Whamcloud - gitweb
add mds-laeyring prototype
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 #if 0
2 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  *  lustre/mds/handler.c
6  *  Lustre Metadata Target (mdt) request handler
7  *
8  *  Copyright (c) 2006 Cluster File Systems, Inc.
9  *   Author: Peter Braam <braam@clusterfs.com>
10  *   Author: Andreas Dilger <adilger@clusterfs.com>
11  *   Author: Phil Schwan <phil@clusterfs.com>
12  *   Author: Mike Shaver <shaver@clusterfs.com>
13  *   Author: Nikita Danilov <nikita@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40 #include <linux/lu_object.h>
41
42 #include "mdt.h"
43
44 int mdt_num_threads;
45
46 static int mdt_connect_internal(struct obd_export *exp,
47                                 struct obd_connect_data *data)
48 {
49         struct obd_device *obd = exp->exp_obd;
50         if (data != NULL) {
51                 data->ocd_connect_flags &= MDS_CONNECT_SUPPORTED;
52                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
53
54                 /* If no known bits (which should not happen, probably,
55                    as everybody should support LOOKUP and UPDATE bits at least)
56                    revert to compat mode with plain locks. */
57                 if (!data->ocd_ibits_known &&
58                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
59                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
60
61                 if (!obd->u.mds.mdt_fl_acl)
62                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
63
64                 if (!obd->u.mds.mdt_fl_user_xattr)
65                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
66
67                 exp->exp_connect_flags = data->ocd_connect_flags;
68                 data->ocd_version = LUSTRE_VERSION_CODE;
69                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
70         }
71
72         if (obd->u.mds.mdt_fl_acl &&
73             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
74                 CWARN("%s: MDS requires ACL support but client does not\n",
75                       obd->obd_name);
76                 return -EBADE;
77         }
78         return 0;
79 }
80
81 static int mdt_reconnect(struct obd_export *exp, struct obd_device *obd,
82                          struct obd_uuid *cluuid,
83                          struct obd_connect_data *data)
84 {
85         int rc;
86         ENTRY;
87
88         if (exp == NULL || obd == NULL || cluuid == NULL)
89                 RETURN(-EINVAL);
90
91         rc = mdt_connect_internal(exp, data);
92
93         RETURN(rc);
94 }
95
96 /* Establish a connection to the MDS.
97  *
98  * This will set up an export structure for the client to hold state data
99  * about that client, like open files, the last operation number it did
100  * on the server, etc.
101  */
102 static int mdt_connect(struct lustre_handle *conn, struct obd_device *obd,
103                        struct obd_uuid *cluuid, struct obd_connect_data *data)
104 {
105         struct obd_export *exp;
106         struct mdt_export_data *med;
107         struct mdt_client_data *mcd = NULL;
108         int rc, abort_recovery;
109         ENTRY;
110
111         if (!conn || !obd || !cluuid)
112                 RETURN(-EINVAL);
113
114         /* Check for aborted recovery. */
115         spin_lock_bh(&obd->obd_processing_task_lock);
116         abort_recovery = obd->obd_abort_recovery;
117         spin_unlock_bh(&obd->obd_processing_task_lock);
118         if (abort_recovery)
119                 target_abort_recovery(obd);
120
121         /* XXX There is a small race between checking the list and adding a
122          * new connection for the same UUID, but the real threat (list
123          * corruption when multiple different clients connect) is solved.
124          *
125          * There is a second race between adding the export to the list,
126          * and filling in the client data below.  Hence skipping the case
127          * of NULL mcd above.  We should already be controlling multiple
128          * connects at the client, and we can't hold the spinlock over
129          * memory allocations without risk of deadlocking.
130          */
131         rc = class_connect(conn, obd, cluuid);
132         if (rc)
133                 RETURN(rc);
134         exp = class_conn2export(conn);
135         LASSERT(exp);
136         med = &exp->exp_mdt_data;
137
138         rc = mdt_connect_internal(exp, data);
139         if (rc)
140                 GOTO(out, rc);
141
142         OBD_ALLOC_PTR(mcd);
143         if (!mcd)
144                 GOTO(out, rc = -ENOMEM);
145
146         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
147         med->med_mcd = mcd;
148
149         rc = mdt_client_add(obd, &obd->u.mds, med, -1);
150         GOTO(out, rc);
151
152 out:
153         if (rc) {
154                 if (mcd) {
155                         OBD_FREE_PTR(mcd);
156                         med->med_mcd = NULL;
157                 }
158                 class_disconnect(exp);
159         } else {
160                 class_export_put(exp);
161         }
162
163         RETURN(rc);
164 }
165
166 int mdt_init_export(struct obd_export *exp)
167 {
168         struct mdt_export_data *med = &exp->exp_mdt_data;
169
170         INIT_LIST_HEAD(&med->med_open_head);
171         spin_lock_init(&med->med_open_lock);
172         exp->exp_connecting = 1;
173         RETURN(0);
174 }
175
176 static int mdt_destroy_export(struct obd_export *export)
177 {
178         struct mdt_export_data *med;
179         struct obd_device *obd = export->exp_obd;
180         struct lvfs_run_ctxt saved;
181         int rc = 0;
182         ENTRY;
183
184         med = &export->exp_mdt_data;
185         target_destroy_export(export);
186
187         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
188                 RETURN(0);
189
190         push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
191         /* Close any open files (which may also cause orphan unlinking). */
192         spin_lock(&med->med_open_lock);
193         while (!list_empty(&med->med_open_head)) {
194                 struct list_head *tmp = med->med_open_head.next;
195                 struct mdt_file_data *mfd =
196                         list_entry(tmp, struct mdt_file_data, mfd_list);
197                 struct dentry *dentry = mfd->mfd_dentry;
198
199                 /* Remove mfd handle so it can't be found again.
200                  * We are consuming the mfd_list reference here. */
201                 mdt_mfd_unlink(mfd, 0);
202                 spin_unlock(&med->med_open_lock);
203
204                 /* If you change this message, be sure to update
205                  * replay_single:test_46 */
206                 CDEBUG(D_INODE|D_IOCTL, "%s: force closing file handle for "
207                        "%.*s (ino %lu)\n", obd->obd_name, dentry->d_name.len,
208                        dentry->d_name.name, dentry->d_inode->i_ino);
209                 /* child orphan sem protects orphan_dec_test and
210                  * is_orphan race, mdt_mfd_close drops it */
211                 MDT_DOWN_WRITE_ORPHAN_SEM(dentry->d_inode);
212                 rc = mdt_mfd_close(NULL, MDS_REQ_REC_OFF, obd, mfd,
213                                    !(export->exp_flags & OBD_OPT_FAILOVER));
214
215                 if (rc)
216                         CDEBUG(D_INODE|D_IOCTL, "Error closing file: %d\n", rc);
217                 spin_lock(&med->med_open_lock);
218         }
219         spin_unlock(&med->med_open_lock);
220         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
221         mdt_client_free(export);
222
223         RETURN(rc);
224 }
225
226 static int mdt_disconnect(struct obd_export *exp)
227 {
228         unsigned long irqflags;
229         int rc;
230         ENTRY;
231
232         LASSERT(exp);
233         class_export_get(exp);
234
235         /* Disconnect early so that clients can't keep using export */
236         rc = class_disconnect(exp);
237         ldlm_cancel_locks_for_export(exp);
238
239         /* complete all outstanding replies */
240         spin_lock_irqsave(&exp->exp_lock, irqflags);
241         while (!list_empty(&exp->exp_outstanding_replies)) {
242                 struct ptlrpc_reply_state *rs =
243                         list_entry(exp->exp_outstanding_replies.next,
244                                    struct ptlrpc_reply_state, rs_exp_list);
245                 struct ptlrpc_service *svc = rs->rs_service;
246
247                 spin_lock(&svc->srv_lock);
248                 list_del_init(&rs->rs_exp_list);
249                 ptlrpc_schedule_difficult_reply(rs);
250                 spin_unlock(&svc->srv_lock);
251         }
252         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
253
254         class_export_put(exp);
255         RETURN(rc);
256 }
257
258 static int mdt_getstatus(struct mdt_thread_info *info,
259                          struct ptlrpc_request *req)
260 {
261         struct md_device *mdd  = info->mti_mdt->mdt_mdd;
262         int               size = sizeof *body;
263         struct mds_body  *body;
264         int               result;
265
266         ENTRY;
267
268         result = lustre_pack_reply(req, 1, &size, NULL);
269         if (result)
270                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
271                        size);
272         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
273                 result = -ENOMEM;
274         else {
275                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
276                 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
277         }
278
279         /* the last_committed and last_xid fields are filled in for all
280          * replies already - no need to do so here also.
281          */
282         RETURN(result);
283 }
284
285 static int mdt_getattr_internal(struct obd_device *obd, struct dentry *dentry,
286                                 struct ptlrpc_request *req,
287                                 struct mds_body *reqbody, int reply_off)
288 {
289         struct mds_body *body;
290         struct inode *inode = dentry->d_inode;
291         int rc = 0;
292         ENTRY;
293
294         if (inode == NULL)
295                 RETURN(-ENOENT);
296
297         body = lustre_msg_buf(req->rq_repmsg, reply_off, sizeof(*body));
298         LASSERT(body != NULL);                 /* caller prepped reply */
299
300         mdt_pack_inode2fid(&body->fid1, inode);
301         mdt_pack_inode2body(body, inode);
302         reply_off++;
303
304         if ((S_ISREG(inode->i_mode) && (reqbody->valid & OBD_MD_FLEASIZE)) ||
305             (S_ISDIR(inode->i_mode) && (reqbody->valid & OBD_MD_FLDIREA))) {
306                 rc = mdt_pack_md(obd, req->rq_repmsg, reply_off, body,
307                                  inode, 1);
308
309                 /* If we have LOV EA data, the OST holds size, atime, mtime */
310                 if (!(body->valid & OBD_MD_FLEASIZE) &&
311                     !(body->valid & OBD_MD_FLDIREA))
312                         body->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS |
313                                         OBD_MD_FLATIME | OBD_MD_FLMTIME);
314
315                 lustre_shrink_reply(req, reply_off, body->eadatasize, 0);
316                 if (body->eadatasize)
317                         reply_off++;
318         } else if (S_ISLNK(inode->i_mode) &&
319                    (reqbody->valid & OBD_MD_LINKNAME) != 0) {
320                 char *symname = lustre_msg_buf(req->rq_repmsg, reply_off, 0);
321                 int len;
322
323                 LASSERT (symname != NULL);       /* caller prepped reply */
324                 len = req->rq_repmsg->buflens[reply_off];
325
326                 rc = inode->i_op->readlink(dentry, symname, len);
327                 if (rc < 0) {
328                         CERROR("readlink failed: %d\n", rc);
329                 } else if (rc != len - 1) {
330                         CERROR ("Unexpected readlink rc %d: expecting %d\n",
331                                 rc, len - 1);
332                         rc = -EINVAL;
333                 } else {
334                         CDEBUG(D_INODE, "read symlink dest %s\n", symname);
335                         body->valid |= OBD_MD_LINKNAME;
336                         body->eadatasize = rc + 1;
337                         symname[rc] = 0;        /* NULL terminate */
338                         rc = 0;
339                 }
340                 reply_off++;
341         }
342
343         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
344                 struct mdt_obd *mds = mdt_req2mds(req);
345                 body->max_cookiesize = mds->mdt_max_cookiesize;
346                 body->max_mdsize = mds->mdt_max_mdsize;
347                 body->valid |= OBD_MD_FLMODEASIZE;
348         }
349
350         if (rc)
351                 RETURN(rc);
352
353         RETURN(rc);
354 }
355
356 static int mdt_getattr_pack_msg(struct ptlrpc_request *req, struct inode *inode,
357                                 int offset)
358 {
359         struct mdt_obd *mds = mdt_req2mds(req);
360         struct mds_body *body;
361         int rc, size[2] = {sizeof(*body)}, bufcount = 1;
362         ENTRY;
363
364         body = lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*body));
365         LASSERT(body != NULL);                 /* checked by caller */
366         LASSERT_REQSWABBED(req, offset);       /* swabbed by caller */
367
368         if ((S_ISREG(inode->i_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
369             (S_ISDIR(inode->i_mode) && (body->valid & OBD_MD_FLDIREA))) {
370                 LOCK_INODE_MUTEX(inode);
371                 rc = fsfilt_get_md(req->rq_export->exp_obd, inode, NULL, 0,
372                                    "lov");
373                 UNLOCK_INODE_MUTEX(inode);
374                 CDEBUG(D_INODE, "got %d bytes MD data for inode %lu\n",
375                        rc, inode->i_ino);
376                 if (rc < 0) {
377                         if (rc != -ENODATA) {
378                                 CERROR("error getting inode %lu MD: rc = %d\n",
379                                        inode->i_ino, rc);
380                                 RETURN(rc);
381                         }
382                         size[bufcount] = 0;
383                 } else if (rc > mds->mdt_max_mdsize) {
384                         size[bufcount] = 0;
385                         CERROR("MD size %d larger than maximum possible %u\n",
386                                rc, mds->mdt_max_mdsize);
387                 } else {
388                         size[bufcount] = rc;
389                 }
390                 bufcount++;
391         } else if (S_ISLNK(inode->i_mode) && (body->valid & OBD_MD_LINKNAME)) {
392                 if (inode->i_size + 1 != body->eadatasize)
393                         CERROR("symlink size: %Lu, reply space: %d\n",
394                                inode->i_size + 1, body->eadatasize);
395                 size[bufcount] = min_t(int, inode->i_size+1, body->eadatasize);
396                 bufcount++;
397                 CDEBUG(D_INODE, "symlink size: %Lu, reply space: %d\n",
398                        inode->i_size + 1, body->eadatasize);
399         }
400
401         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
402                 CERROR("failed MDT_GETATTR_PACK test\n");
403                 req->rq_status = -ENOMEM;
404                 RETURN(-ENOMEM);
405         }
406
407         rc = lustre_pack_reply(req, bufcount, size, NULL);
408         if (rc) {
409                 CERROR("lustre_pack_reply failed: rc %d\n", rc);
410                 req->rq_status = rc;
411                 RETURN(rc);
412         }
413
414         RETURN(0);
415 }
416
417 static int mdt_getattr_name(int offset, struct ptlrpc_request *req,
418                             int child_part, struct lustre_handle *child_lockh)
419 {
420         struct obd_device *obd = req->rq_export->exp_obd;
421         struct mdt_obd *mds = &obd->u.mds;
422         struct ldlm_reply *rep = NULL;
423         struct lvfs_run_ctxt saved;
424         struct mds_body *body;
425         struct dentry *dparent = NULL, *dchild = NULL;
426         struct lvfs_ucred uc = {NULL,};
427         struct lustre_handle parent_lockh;
428         int namesize;
429         int rc = 0, cleanup_phase = 0, resent_req = 0;
430         char *name;
431         ENTRY;
432
433         LASSERT(!strcmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME));
434
435         /* Swab now, before anyone looks inside the request */
436
437         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
438                                   lustre_swab_mdt_body);
439         if (body == NULL) {
440                 CERROR("Can't swab mdt_body\n");
441                 RETURN(-EFAULT);
442         }
443
444         LASSERT_REQSWAB(req, offset + 1);
445         name = lustre_msg_string(req->rq_reqmsg, offset + 1, 0);
446         if (name == NULL) {
447                 CERROR("Can't unpack name\n");
448                 RETURN(-EFAULT);
449         }
450         namesize = lustre_msg_buflen(req->rq_reqmsg, offset + 1);
451
452         rc = mdt_init_ucred(&uc, req, offset);
453         if (rc)
454                 GOTO(cleanup, rc);
455
456         LASSERT (offset == MDS_REQ_REC_OFF || offset == MDS_REQ_INTENT_REC_OFF);
457         /* if requests were at offset 2, the getattr reply goes back at 1 */
458         if (offset == MDS_REQ_INTENT_REC_OFF) {
459                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
460                 offset = 1;
461         }
462
463         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
464         cleanup_phase = 1; /* kernel context */
465         intent_set_disposition(rep, DISP_LOOKUP_EXECD);
466
467         if (lustre_handle_is_used(child_lockh)) {
468                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
469                 resent_req = 1;
470         }
471
472         if (resent_req == 0) {
473             if (name) {
474                 rc = mdt_get_parent_child_locked(obd, &obd->u.mds, &body->fid1,
475                                                  &parent_lockh, &dparent,
476                                                  LCK_CR,
477                                                  MDS_INODELOCK_UPDATE,
478                                                  name, namesize,
479                                                  child_lockh, &dchild, LCK_CR,
480                                                  child_part);
481             } else {
482                         /* For revalidate by fid we always take UPDATE lock */
483                         dchild = mdt_fid2locked_dentry(obd, &body->fid2, NULL,
484                                                        LCK_CR, child_lockh,
485                                                        NULL, 0,
486                                                        MDT_INODELOCK_UPDATE);
487                         LASSERT(dchild);
488                         if (IS_ERR(dchild))
489                                 rc = PTR_ERR(dchild);
490             }
491             if (rc)
492                     GOTO(cleanup, rc);
493         } else {
494                 struct ldlm_lock *granted_lock;
495                 struct ll_fid child_fid;
496                 struct ldlm_resource *res;
497                 DEBUG_REQ(D_DLMTRACE, req, "resent, not enqueuing new locks");
498                 granted_lock = ldlm_handle2lock(child_lockh);
499                 LASSERTF(granted_lock != NULL, LPU64"/%u lockh "LPX64"\n",
500                          body->fid1.id, body->fid1.generation,
501                          child_lockh->cookie);
502
503
504                 res = granted_lock->l_resource;
505                 child_fid.id = res->lr_name.name[0];
506                 child_fid.generation = res->lr_name.name[1];
507                 dchild = mdt_fid2dentry(&obd->u.mds, &child_fid, NULL);
508                 LASSERT(!IS_ERR(dchild));
509                 LDLM_LOCK_PUT(granted_lock);
510         }
511
512         cleanup_phase = 2; /* dchild, dparent, locks */
513
514         if (dchild->d_inode == NULL) {
515                 intent_set_disposition(rep, DISP_LOOKUP_NEG);
516                 /* in the intent case, the policy clears this error:
517                    the disposition is enough */
518                 GOTO(cleanup, rc = -ENOENT);
519         } else {
520                 intent_set_disposition(rep, DISP_LOOKUP_POS);
521         }
522
523         if (req->rq_repmsg == NULL) {
524                 rc = mdt_getattr_pack_msg(req, dchild->d_inode, offset);
525                 if (rc != 0) {
526                         CERROR ("mdt_getattr_pack_msg: %d\n", rc);
527                         GOTO (cleanup, rc);
528                 }
529         }
530
531         rc = mdt_getattr_internal(obd, dchild, req, body, offset);
532         GOTO(cleanup, rc); /* returns the lock to the client */
533
534  cleanup:
535         switch (cleanup_phase) {
536         case 2:
537                 if (resent_req == 0) {
538                         if (rc && dchild->d_inode)
539                                 ldlm_lock_decref(child_lockh, LCK_CR);
540                         ldlm_lock_decref(&parent_lockh, LCK_CR);
541                         l_dput(dparent);
542                 }
543                 l_dput(dchild);
544         case 1:
545                 pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
546         default:
547                 mds_exit_ucred(&uc, mds);
548                 if (req->rq_reply_state == NULL) {
549                         req->rq_status = rc;
550                         lustre_pack_reply(req, 0, NULL, NULL);
551                 }
552         }
553         return rc;
554 }
555
556 static int mds_getattr(struct ptlrpc_request *req, int offset)
557 {
558         struct mds_obd *mds = mds_req2mds(req);
559         struct obd_device *obd = req->rq_export->exp_obd;
560         struct lvfs_run_ctxt saved;
561         struct dentry *de;
562         struct mds_body *body;
563         struct lvfs_ucred uc = {NULL,};
564         int rc = 0;
565         ENTRY;
566
567         body = lustre_swab_reqbuf(req, offset, sizeof(*body),
568                                   lustre_swab_mds_body);
569         if (body == NULL)
570                 RETURN(-EFAULT);
571
572         rc = mds_init_ucred(&uc, req, offset);
573         if (rc)
574                 GOTO(out_ucred, rc);
575
576         push_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
577         de = mds_fid2dentry(mds, &body->fid1, NULL);
578         if (IS_ERR(de)) {
579                 rc = req->rq_status = PTR_ERR(de);
580                 GOTO(out_pop, rc);
581         }
582
583         rc = mds_getattr_pack_msg(req, de->d_inode, offset);
584         if (rc != 0) {
585                 CERROR("mds_getattr_pack_msg: %d\n", rc);
586                 GOTO(out_pop, rc);
587         }
588
589         req->rq_status = mds_getattr_internal(obd, de, req, body, 0);
590
591         l_dput(de);
592         GOTO(out_pop, rc);
593 out_pop:
594         pop_ctxt(&saved, &obd->obd_lvfs_ctxt, &uc);
595 out_ucred:
596         if (req->rq_reply_state == NULL) {
597                 req->rq_status = rc;
598                 lustre_pack_reply(req, 0, NULL, NULL);
599         }
600         mds_exit_ucred(&uc, mds);
601         return rc;
602 }
603
604
605 static int mds_obd_statfs(struct obd_device *obd, struct obd_statfs *osfs,
606                           unsigned long max_age)
607 {
608         int rc;
609
610         spin_lock(&obd->obd_osfs_lock);
611         rc = fsfilt_statfs(obd, obd->u.obt.obt_sb, max_age);
612         if (rc == 0)
613                 memcpy(osfs, &obd->obd_osfs, sizeof(*osfs));
614         spin_unlock(&obd->obd_osfs_lock);
615
616         return rc;
617 }
618
619 static int mds_statfs(struct ptlrpc_request *req)
620 {
621         struct obd_device *obd = req->rq_export->exp_obd;
622         int rc, size = sizeof(struct obd_statfs);
623         ENTRY;
624
625         /* This will trigger a watchdog timeout */
626         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
627                          (MDS_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
628
629         rc = lustre_pack_reply(req, 1, &size, NULL);
630         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
631                 CERROR("mds: statfs lustre_pack_reply failed: rc = %d\n", rc);
632                 GOTO(out, rc);
633         }
634
635         /* We call this so that we can cache a bit - 1 jiffie worth */
636         rc = mds_obd_statfs(obd, lustre_msg_buf(req->rq_repmsg, 0, size),
637                             jiffies - HZ);
638         if (rc) {
639                 CERROR("mds_obd_statfs failed: rc %d\n", rc);
640                 GOTO(out, rc);
641         }
642
643         EXIT;
644 out:
645         req->rq_status = rc;
646         return 0;
647 }
648
649 static int mds_set_info(struct obd_export *exp, struct ptlrpc_request *req)
650 {
651         char *key;
652         __u32 *val;
653         int keylen, rc = 0;
654         ENTRY;
655
656         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
657         if (key == NULL) {
658                 DEBUG_REQ(D_HA, req, "no set_info key");
659                 RETURN(-EFAULT);
660         }
661         keylen = req->rq_reqmsg->buflens[0];
662
663         val = lustre_msg_buf(req->rq_reqmsg, 1, sizeof(*val));
664         if (val == NULL) {
665                 DEBUG_REQ(D_HA, req, "no set_info val");
666                 RETURN(-EFAULT);
667         }
668
669         rc = lustre_pack_reply(req, 0, NULL, NULL);
670         if (rc)
671                 RETURN(rc);
672         req->rq_repmsg->status = 0;
673
674         if (keylen < strlen("read-only") ||
675             memcmp(key, "read-only", keylen) != 0)
676                 RETURN(-EINVAL);
677
678         if (*val)
679                 exp->exp_connect_flags |= OBD_CONNECT_RDONLY;
680         else
681                 exp->exp_connect_flags &= ~OBD_CONNECT_RDONLY;
682
683         RETURN(0);
684 }
685
686 enum mdt_handler_flags {
687         /*
688          * struct mds_body is passed in the 0-th incoming buffer.
689          */
690         HABEO_CORPUS = (1 << 0)
691 };
692
693 struct mdt_handler {
694         const char *mh_name;
695         int         mh_fail_id;
696         __u32       mh_opc;
697         __u32       mh_flags;
698         int (*mh_act)(struct mdt_thread_info *info, struct ptlrpc_request *req);
699 };
700
701 #define DEF_HNDL(prefix, base, flags, name, fn)                 \
702 [prefix ## name - prefix ## base] = {                           \
703         .mh_name    = #name,                                    \
704         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## _NET, \
705         .mh_opc     = prefix ## _  ## opc,                      \
706         .mh_flags   = flags,                                    \
707         .mh_act     = fn                                        \
708 }
709
710 #define DEF_MDT_HNDL(flags, name, fn) DEF_HNDL(mdt, CONNECT, flags, name, fn)
711
712 static struct mdt_handler mdt_mds_ops[] = {
713         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
714         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
715         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
716         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
717         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
718         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
719         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
720         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
721         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
722         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
723         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
724         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
725         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
726         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
727         DEF_MDT_HNDL(0,            0 /*SET_INFO*/, mdt_set_info),
728         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
729         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
730 };
731
732 static struct mdt_handler mdt_obd_ops[] = {
733 };
734
735 static struct mdt_handler mdt_dlm_ops[] = {
736 };
737
738 static struct mdt_handler mdt_llog_ops[] = {
739 };
740
741 static struct mdt_opc_slice {
742         __u32               mos_opc_start;
743         int                 mos_opc_end;
744         struct mdt_handler *mos_hs;
745 } mdt_handlers[] = {
746         {
747                 .mos_opc_start = MDS_GETATTR,
748                 .mos_opc_end   = MDS_LAST_OPC,
749                 .mos_hs        = mdt_mds_ops
750         },
751         {
752                 .mos_opc_start = OBD_PING,
753                 .mos_opc_end   = OBD_LAST_OPC,
754                 .mos_hs        = mdt_obd_ops
755         },
756         {
757                 .mos_opc_start = LDLM_ENQUEUE,
758                 .mos_opc_end   = LDLM_LAST_OPC,
759                 .mos_hs        = mdt_dlm_ops
760         },
761         {
762                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
763                 .mos_opc_end   = LLOG_LAST_OPC,
764                 .mos_hs        = mdt_llog_ops
765         }
766 };
767
768 enum {
769         MDT_REP_BUF_NR_MAX = 8
770 };
771
772 /*
773  * Common data shared by mdt-level handlers. This is allocated per-thread to
774  * reduce stack consumption.
775  */
776 struct mdt_thread_info {
777         struct mdt_device *mti_mdt;
778         /*
779          * number of buffers in reply message.
780          */
781         int                mti_rep_buf_nr;
782         /*
783          * sizes of reply buffers.
784          */
785         int                mti_rep_buf_size[MDT_REP_BUF_NR_MAX];
786         /*
787          * Body for "habeo corpus" operations.
788          */
789         struct mds_body   *mti_body;
790         /*
791          * Host object. This is released at the end of mdt_handler().
792          */
793         struct mdt_object *mti_object;
794         /*
795          * Additional fail id that can be set by handler. Passed to
796          * target_send_reply().
797          */
798         int                mti_fail_id;
799         /*
800          * Offset of incoming buffers. 0 for top-level request processing. +ve
801          * for intent handling.
802          */
803         int                mti_offset;
804 };
805
806 struct mdt_handler *mdt_handler_find(__u32 opc)
807 {
808         int i;
809         struct mdt_opc_slice *s;
810         struct mdt_handler *h;
811
812         h = NULL;
813         for (i = 0, s = mdt_handlers; i < ARRAY_SIZE(mdt_handlers); i++, s++) {
814                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
815                         h = s->mos_hs + (opc - s->mos_opc_start);
816                         if (h->mos_opc != 0)
817                                 LASSERT(h->mos_opc == opc);
818                         else
819                                 h = NULL; /* unsupported opc */
820                         break;
821                 }
822         }
823         return h;
824 }
825
826 struct mdt_object *mdt_object_find(struct mdt_device *d, struct lfid *f)
827 {
828         struct lu_object *o;
829
830         o = lu_object_find(&d->mdt_lu_dev.ld_site, f);
831         if (IS_ERR(o))
832                 return (struct mdd_object *)o;
833         else
834                 return container_of(o, struct mdt_object, mot_obj.mo_lu);
835 }
836
837 void mdt_object_put(struct mdt_object *o)
838 {
839         lu_object_put(&o->mot_obj.mo_lu);
840 }
841
842 static int mdt_req_handle(struct mdt_thread_info *info,
843                           struct mdt_handler *h, struct ptlrpc_request *req,
844                           int shift)
845 {
846         int result;
847
848         ENTRY;
849
850         LASSERT(h->mh_act != NULL);
851         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
852
853         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
854
855         if (h->mh_fail_id != 0)
856                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
857
858         h->mh_offset = MDS_REQ_REC_OFF + shift;
859         if (h->mh_flags & HABEO_CORPUS) {
860                 info->mti_body = lustre_swab_reqbuf(req, h->mh_offset,
861                                                     sizeof *info->mti_body,
862                                                     lustre_swab_mds_body);
863                 if (info->mti_body == NULL) {
864                         CERROR("Can't unpack body\n");
865                         result = req->rq_status = -EFAULT;
866                 }
867                 info->mti_object = mdt_object_find(info->mti_mdt,
868                                                    info->mti_body.fid1);
869                 if (IS_ERR(info->mti_object))
870                         result = PTR_ERR(info->mti_object);
871         }
872         if (result == 0)
873                 result = h->mh_act(info, h, req);
874         /*
875          * XXX result value is unconditionally shoved into ->rq_status
876          * (original code sometimes placed error code into ->rq_status, and
877          * sometimes returned it to the
878          * caller). ptlrpc_server_handle_request() doesn't check return value
879          * anyway.
880          */
881         req->rq_status = result;
882         RETURN(result);
883 }
884
885 static void mdt_thread_info_init(struct mdt_thread_info *info)
886 {
887         memset(info, 0, sizeof *info);
888         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
889         /*
890          * Poison size array.
891          */
892         for (info->mti_rep_buf_nr = 0;
893              info->mti_rep_buf_nr < MDT_REP_BUF_NR_MAX; info->mti_rep_buf_nr++)
894                 info->mti_rep_buf_size[info->mti_rep_buf_nr] = ~0;
895 }
896
897 static void mdt_thread_info_fini(struct mdt_thread_info *info)
898 {
899         if (info->mti_object != NULL) {
900                 mdt_object_put(info->mti_object);
901                 info->mti_object = NULL;
902         }
903 }
904
905 int mdt_handle(struct ptlrpc_request *req)
906 {
907         int should_process,
908         int rc = 0;
909         struct mds_obd *mds = NULL; /* quell gcc overwarning */
910         struct obd_device *obd = NULL;
911         struct mdt_thread_info info; /* XXX on stack for now */
912         struct mdt_handler *h;
913
914         ENTRY;
915
916         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
917
918         LASSERT(current->journal_info == NULL);
919
920         rc = mds_msg_check_version(req->rq_reqmsg);
921         if (rc) {
922                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
923                 RETURN(rc);
924         }
925
926         /* XXX identical to OST */
927         if (req->rq_reqmsg->opc != MDS_CONNECT) {
928                 struct mds_export_data *med;
929                 int recovering, abort_recovery;
930
931                 if (req->rq_export == NULL) {
932                         CERROR("operation %d on unconnected MDS from %s\n",
933                                req->rq_reqmsg->opc,
934                                libcfs_id2str(req->rq_peer));
935                         req->rq_status = -ENOTCONN;
936                         GOTO(out, rc = -ENOTCONN);
937                 }
938
939                 med = &req->rq_export->exp_mds_data;
940                 obd = req->rq_export->exp_obd;
941                 mds = &obd->u.mds;
942
943                 /* sanity check: if the xid matches, the request must
944                  * be marked as a resent or replayed */
945                 if (req->rq_xid == med->med_mcd->mcd_last_xid)
946                         LASSERTF(lustre_msg_get_flags(req->rq_reqmsg) &
947                                  (MSG_RESENT | MSG_REPLAY),
948                                  "rq_xid "LPU64" matches last_xid, "
949                                  "expected RESENT flag\n",
950                                  req->rq_xid);
951                 /* else: note the opposite is not always true; a
952                  * RESENT req after a failover will usually not match
953                  * the last_xid, since it was likely never
954                  * committed. A REPLAYed request will almost never
955                  * match the last xid, however it could for a
956                  * committed, but still retained, open. */
957
958                 /* Check for aborted recovery. */
959                 spin_lock_bh(&obd->obd_processing_task_lock);
960                 abort_recovery = obd->obd_abort_recovery;
961                 recovering = obd->obd_recovering;
962                 spin_unlock_bh(&obd->obd_processing_task_lock);
963                 if (abort_recovery) {
964                         target_abort_recovery(obd);
965                 } else if (recovering) {
966                         rc = mds_filter_recovery_request(req, obd,
967                                                          &should_process);
968                         if (rc || !should_process)
969                                 RETURN(rc);
970                 }
971         }
972
973         h = mdt_handler_find(req->rq_reqmsg->opc);
974         if (h != NULL) {
975                 rc = mdt_handle_req(&info, h, req, 0);
976         } else {
977                 req->rq_status = -ENOTSUPP;
978                 rc = ptlrpc_error(req);
979                 RETURN(rc);
980         }
981
982         LASSERT(current->journal_info == NULL);
983
984         /* If we're DISCONNECTing, the mds_export_data is already freed */
985         if (!rc && req->rq_reqmsg->opc != MDS_DISCONNECT) {
986                 struct mds_export_data *med = &req->rq_export->exp_mds_data;
987                 req->rq_repmsg->last_xid =
988                         le64_to_cpu(med->med_mcd->mcd_last_xid);
989
990                 target_committed_to_req(req);
991         }
992
993         EXIT;
994  out:
995
996         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
997                 if (obd && obd->obd_recovering) {
998                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
999                         return target_queue_final_reply(req, rc);
1000                 }
1001                 /* Lost a race with recovery; let the error path DTRT. */
1002                 rc = req->rq_status = -ENOTCONN;
1003         }
1004
1005         target_send_reply(req, rc, info.mti_fail_id);
1006         return 0;
1007 }
1008
1009 static int mdt_intent_policy(struct ldlm_namespace *ns,
1010                              struct ldlm_lock **lockp, void *req_cookie,
1011                              ldlm_mode_t mode, int flags, void *data)
1012 {
1013         RETURN(ELDLM_LOCK_ABORTED);
1014 }
1015
1016 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1017                                             svc_handler_t h, char *name,
1018                                             struct proc_dir_entry *proc_entry,
1019                                             svcreq_printfn_t prntfn)
1020 {
1021         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1022                                c->psc_max_req_size, c->psc_max_reply_size,
1023                                c->psc_req_portal, c->psc_rep_portal,
1024                                c->psc_watchdog_timeout,
1025                                h, char name, proc_entry,
1026                                prntfn, c->psc_num_threads);
1027 }
1028
1029 int md_device_init(struct md_device *md)
1030 {
1031         return lu_device_init(&md->md_lu_dev);
1032 }
1033
1034 void md_device_fini(struct md_device *md)
1035 {
1036         lu_device_fini(&md->md_lu_dev);
1037 }
1038
1039 static struct lu_device_operations mdt_lu_ops;
1040
1041 static int mdt_device_init(struct mdt_device *m)
1042 {
1043         md_device_init(&m->mdt_md_dev);
1044
1045         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1046
1047         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1048         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1049         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1050         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1051         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1052         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1053         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1054         /*
1055          * We'd like to have a mechanism to set this on a per-device basis,
1056          * but alas...
1057          */
1058         if (mds_num_threads < 2)
1059                 mds_num_threads = MDS_DEF_THREADS;
1060         m->mdt_service_conf.psc_num_threads = min(mds_num_threads,
1061                                                   MDS_MAX_THREADS);
1062         return 0;
1063 }
1064
1065 static void mdt_device_fini(struct mdt_device *m)
1066 {
1067         md_device_fini(&m->mdt_md_dev);
1068 }
1069
1070 static int lu_device_is_mdt(struct lu_device *d)
1071 {
1072         /*
1073          * XXX for now. Tags in lu_device_type->ldt_something are needed.
1074          */
1075         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
1076 }
1077
1078 static struct mdt_device *mdt_dev(struct lu_device *d)
1079 {
1080         LASSERT(lu_device_is_mdt(d));
1081         return container_of(d, struct mdt_device, mdt_lu_dev);
1082 }
1083
1084 static struct mdt_object *mdt_obj(struct lu_object *o)
1085 {
1086         LASSERT(lu_device_is_mdt(o->lo_dev));
1087         return container_of(o, struct mdt_object, mot_obj.mo_lu);
1088 }
1089
1090 static void mdt_fini(struct lu_device *d)
1091 {
1092         struct mdt_device *m = mdt_dev(d);
1093
1094         if (d->ld_site != NULL) {
1095                 lu_site_fini(d->ld_site);
1096                 d->ld_site = NULL;
1097         }
1098         if (m->mdt_service != NULL) {
1099                 ptlrpc_unregister_service(m->mdt_service);
1100                 m->mdt_service = NULL;
1101         }
1102         if (m->mdt_namespace != NULL) {
1103                 ldlm_namespace_free(m->mdt_namespace, 0);
1104                 m->mdt_namespace = NULL;
1105         }
1106         
1107         LASSERT(atomic_read(&d->ld_ref) == 0);
1108 }
1109
1110 static int mdt_init0(struct lu_device *d)
1111 {
1112         struct mdt_device *m = mdt_dev(d);
1113         struct lu_site *s;
1114         char   ns_name[48];
1115
1116         ENTRY;
1117
1118         OBD_ALLOC_PTR(s);
1119         if (s == NULL)
1120                 return -ENOMEM;
1121
1122         mdt_device_init(m);
1123         lu_site_init(s, m);
1124
1125         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1126         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1127         if (m->mdt_namespace == NULL)
1128                 return -ENOMEM;
1129         ldlm_register_intent(m->mst_namespace, mdt_intent_policy);
1130
1131         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1132                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1133
1134         m->mdt_service = ptlrpc_init_svc_conf(&mdt->mdt_service_conf,
1135                                               mdt_handle, LUSTRE_MDT0_NAME,
1136                                               mdt->mdt_lu_dev.ld_proc_entry
1137                                               NULL);
1138         if (m->mdt_service == NULL)
1139                 return -ENOMEM;
1140
1141         return ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1142 }
1143
1144 static int mdt_init(struct lu_device *d)
1145 {
1146         int result;
1147
1148         result = mdt_init0(d);
1149         if (result != 0)
1150                 mdt_fini(d);
1151         return result;
1152 }
1153
1154 struct lu_object *mdt_object_alloc(struct lu_device *d)
1155 {
1156         struct mdt_object *mo;
1157
1158         OBD_ALLOC_PTR(mo);
1159         if (mo != NULL) {
1160                 struct lu_object *o;
1161                 struct lu_object_header *h;
1162
1163                 o = &mo->mot_obj.mo_lu;
1164                 h = &mo->mot_header;
1165                 lu_object_header_init(h);
1166                 lu_object_init(o, h, d);
1167                 /* ->lo_depth and ->lo_flags are automatically 0 */
1168                 lu_object_add_top(h, o);
1169         } else
1170                 return NULL;
1171 }
1172
1173 int mdt_object_init(struct lu_object *o)
1174 {
1175         struct mdt_device *d = mdt_dev(o->lo_dev);
1176         struct lu_device  *under;
1177         struct lu_object  *below;
1178
1179         under = &d->mdt_mdd->md_lu_dev;
1180         below = under->ld_ops->ldo_alloc(under);
1181         if (below != NULL) {
1182                 lu_object_add(o, below);
1183                 return 0;
1184         } else
1185                 return -ENOMEM;
1186 }
1187
1188 void mdt_object_free(struct lu_object *o)
1189 {
1190         struct lu_object_header;
1191
1192         h = o->lo_header;
1193         lu_object_fini(o);
1194         lu_object_header_fini(h);
1195 }
1196
1197 void mdt_object_release(struct lu_object *o)
1198 {
1199 }
1200
1201 int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1202 {
1203         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1204 }
1205
1206 static struct lu_device_operations mdt_lu_ops = {
1207         .ldo_init           = mdt_init,
1208         .ldo_fini           = mdt_fini,
1209         .ldo_object_alloc   = mdt_object_alloc,
1210         .ldo_object_init    = mdt_object_init,
1211         .ldo_object_free    = mdt_object_free,
1212         .ldo_object_release = mdt_object_release,
1213         .ldo_object_print   = mdt_object_print
1214 }
1215
1216 int mdt_mkdir(struct mdt_device *d, struct lfid *pfid, const char *name)
1217 {
1218         struct mdt_object *o;
1219         struct lock_handle lh;
1220         int result;
1221
1222         o = mdt_object_find(d, pfid);
1223         if (IS_ERR(o))
1224                 return PTR_ERR(o);
1225         result = fid_lock(pfid, LCK_PW, &lh);
1226         if (result == 0) {
1227                 result = d->mdt_dev.md_ops->mdo_mkdir(o, name);
1228                 fid_unlock(&lh);
1229         }
1230         mdt_object_put(o);
1231         return result;
1232 }
1233
1234 static struct obd_ops mdt_ops = {
1235         .o_owner           = THIS_MODULE,
1236         .o_connect         = mds_connect,
1237         .o_reconnect       = mds_reconnect,
1238         .o_init_export     = mds_init_export,
1239         .o_destroy_export  = mds_destroy_export,
1240         .o_disconnect      = mds_disconnect,
1241         .o_setup           = mds_setup,
1242         .o_precleanup      = mds_precleanup,
1243         .o_cleanup         = mds_cleanup,
1244         .o_postrecov       = mds_postrecov,
1245         .o_statfs          = mds_obd_statfs,
1246         .o_iocontrol       = mds_iocontrol,
1247         .o_create          = mds_obd_create,
1248         .o_destroy         = mds_obd_destroy,
1249         .o_llog_init       = mds_llog_init,
1250         .o_llog_finish     = mds_llog_finish,
1251         .o_notify          = mds_notify,
1252         .o_health_check    = mds_health_check,
1253 };
1254
1255 static int __init mdt_mod_init(void)
1256 {
1257         return 0;
1258 }
1259
1260 static void __exit mdt_mod_exit(void)
1261 {
1262 }
1263
1264 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1265 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1266 MODULE_LICENSE("GPL");
1267
1268 CFS_MODULE_PARM(mdt_num_threads, "i", int, 0444,
1269                 "number of mdt service threads to start");
1270
1271 cfs_module(mdt, "0.0.2", mdt_mod_init, mdt_mod_exit);
1272
1273 #endif /* 0 */