Whamcloud - gitweb
1. modified the code to #colibri coding guideline.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53 /* lu2dt_dev() */
54 #include <linux/dt_object.h>
55
56 /*LUSTRE_POSIX_ACL_MAX_SIZE*/
57 #include <linux/lustre_acl.h>
58
59
60 /* struct mds_client_data */
61 #include "../mds/mds_internal.h"
62 #include "mdt_internal.h"
63
64 /*
65  * Initialized in mdt_mod_init().
66  */
67 unsigned long mdt_num_threads;
68
69 static int                mdt_handle    (struct ptlrpc_request *req);
70 static struct mdt_device *mdt_dev       (struct lu_device *d);
71 static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
72
73 static struct lu_context_key       mdt_thread_key;
74 static struct lu_object_operations mdt_obj_ops;
75
76 static int mdt_getstatus(struct mdt_thread_info *info,
77                          struct ptlrpc_request *req, int offset)
78 {
79         struct md_device *next  = info->mti_mdt->mdt_child;
80         int               result;
81
82         ENTRY;
83
84         info->mti_rep_buf_size[0] = sizeof (struct mdt_body);
85         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
86         if (result)
87                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
88                        sizeof (struct mdt_body));
89         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
90                 result = -ENOMEM;
91         else {
92                 info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, sizeof (struct mdt_body));
93                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
94                                                     next, &info->mti_body->fid1);
95         }
96
97         /* the last_committed and last_xid fields are filled in for all
98          * replies already - no need to do so here also.
99          */
100         RETURN(result);
101 }
102
103 static int mdt_statfs(struct mdt_thread_info *info,
104                       struct ptlrpc_request *req, int offset)
105 {
106         struct md_device  *next  = info->mti_mdt->mdt_child;
107         struct obd_statfs *osfs;
108         struct kstatfs    *sfs;
109         int               result;
110
111         ENTRY;
112
113         info->mti_rep_buf_size[0] = sizeof(struct obd_statfs);
114         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
115         if (result)
116                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
117                        sizeof(struct obd_statfs));
118         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
119                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
120                 result = -ENOMEM;
121         } else {
122                 osfs = lustre_msg_buf(req->rq_repmsg, 0,  sizeof(struct obd_statfs));
123                 OBD_ALLOC_PTR(sfs);
124                 if(sfs == NULL)
125                         RETURN(-ENOMEM);
126                 /* XXX max_age optimisation is needed here. See mds_statfs */
127                 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, sfs);
128                 statfs_pack(osfs, sfs);
129                 OBD_FREE_PTR(sfs);
130         }
131
132         RETURN(result);
133 }
134
135 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
136 {
137         b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
138                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
139                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
140
141         if (!S_ISREG(attr->la_mode))
142                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
143                             OBD_MD_FLMTIME;
144
145         b->atime      = attr->la_atime;
146         b->mtime      = attr->la_mtime;
147         b->ctime      = attr->la_ctime;
148         b->mode       = attr->la_mode;
149         b->size       = attr->la_size;
150         b->blocks     = attr->la_blocks;
151         b->uid        = attr->la_uid;
152         b->gid        = attr->la_gid;
153         b->flags      = attr->la_flags;
154         b->nlink      = attr->la_nlink;
155 }
156
157 static int mdt_getattr(struct mdt_thread_info *info,
158                        struct ptlrpc_request *req, int offset)
159 {
160         int              result;
161
162         LASSERT(info->mti_object != NULL);
163
164         ENTRY;
165
166         info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
167         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
168         if (result)
169                 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
170                        sizeof(struct mdt_body), result);
171         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
172                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
173                 result = -ENOMEM;
174         } else {
175                 struct md_object *next = mdt_object_child(info->mti_object);
176
177                 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
178                                                     &info->mti_attr);
179                 if (result == 0) {
180                         info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, sizeof(struct mdt_body));
181                         mdt_pack_attr2body(info->mti_body, &info->mti_attr);
182                         info->mti_body->fid1 = *mdt_object_fid(info->mti_object);
183                 }
184         }
185         RETURN(result);
186 }
187
188 static struct lu_device_operations mdt_lu_ops;
189
190 static int lu_device_is_mdt(struct lu_device *d)
191 {
192         /*
193          * XXX for now. Tags in lu_device_type->ldt_something are needed.
194          */
195         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
196 }
197
198 static struct mdt_device *mdt_dev(struct lu_device *d)
199 {
200         LASSERT(lu_device_is_mdt(d));
201         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
202 }
203
204 static int mdt_connect(struct mdt_thread_info *info,
205                        struct ptlrpc_request *req, int offset)
206 {
207         int result;
208
209         result = target_handle_connect(req, mdt_handle);
210         if (result == 0) {
211                 struct obd_connect_data *data;
212
213                 LASSERT(req->rq_export != NULL);
214                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
215
216                 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
217                 result = seq_mgr_alloc(info->mti_ctxt,
218                                        info->mti_mdt->mdt_seq_mgr,
219                                        &data->ocd_seq);
220         }
221         return result;
222 }
223
224 static int mdt_disconnect(struct mdt_thread_info *info,
225                           struct ptlrpc_request *req, int offset)
226 {
227         return target_handle_disconnect(req);
228 }
229
230 static int mdt_getattr_name(struct mdt_thread_info *info,
231                             struct ptlrpc_request *req, int offset)
232 {
233         return -EOPNOTSUPP;
234 }
235
236 static int mdt_setxattr(struct mdt_thread_info *info,
237                         struct ptlrpc_request *req, int offset)
238 {
239         return -EOPNOTSUPP;
240 }
241
242 static int mdt_getxattr(struct mdt_thread_info *info,
243                         struct ptlrpc_request *req, int offset)
244 {
245         return -EOPNOTSUPP;
246 }
247
248 static int mdt_readpage(struct mdt_thread_info *info,
249                         struct ptlrpc_request *req, int offset)
250 {
251         return -EOPNOTSUPP;
252 }
253
254 static int mdt_reint_internal(struct mdt_thread_info *info,
255                        struct ptlrpc_request *req, 
256                        int offset,
257                        struct lustre_handle *lockh)
258 {
259         int rc;
260
261         rc = mdt_reint_unpack(info, req, offset);
262         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
263                 CERROR("invalid record\n");
264                 RETURN(rc = -EINVAL);
265         }
266         rc = mdt_reint_rec(info, lockh);
267         RETURN(rc);
268 }
269
270 static int mdt_reint(struct mdt_thread_info *info,
271                      struct ptlrpc_request *req, int offset)
272 {
273         __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
274                                      sizeof (*opcp));
275         __u32  opc;
276         int rc;
277
278         ENTRY;
279
280         /* NB only peek inside req now; mdt_XXX_unpack() will swab it */
281         if (opcp == NULL) {
282                 CERROR ("Can't inspect opcode\n");
283                 RETURN (-EINVAL);
284         }
285         opc = *opcp;
286         if (lustre_msg_swabbed (req->rq_reqmsg))
287                 __swab32s(&opc);
288
289         DEBUG_REQ(D_INODE, req, "reint opt = %d", opc);
290
291         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
292
293         if (opc == REINT_UNLINK || opc == REINT_RENAME)
294                 info->mti_rep_buf_nr = 3;
295         else if (opc == REINT_OPEN)
296                 info->mti_rep_buf_nr = 2;
297         else
298                 info->mti_rep_buf_nr = 1;
299         info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
300         info->mti_rep_buf_size[1] = sizeof(struct lov_mds_md); /*FIXME:See mds*/
301         info->mti_rep_buf_size[2] = sizeof(struct llog_cookie);/*FIXME:See mds*/
302         rc = lustre_pack_reply(req, info->mti_rep_buf_nr, info->mti_rep_buf_size, NULL);
303         if (rc)
304                 RETURN (rc);
305         rc = mdt_reint_internal(info, req, offset, NULL);
306         RETURN(rc);
307 }
308
309 static int mdt_close(struct mdt_thread_info *info,
310                      struct ptlrpc_request *req, int offset)
311 {
312         return -EOPNOTSUPP;
313 }
314
315 static int mdt_done_writing(struct mdt_thread_info *info,
316                             struct ptlrpc_request *req, int offset)
317 {
318         return -EOPNOTSUPP;
319 }
320
321 static int mdt_pin(struct mdt_thread_info *info,
322                    struct ptlrpc_request *req, int offset)
323 {
324         return -EOPNOTSUPP;
325 }
326
327 static int mdt_sync(struct mdt_thread_info *info,
328                     struct ptlrpc_request *req, int offset)
329 {
330         return -EOPNOTSUPP;
331 }
332
333 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
334                                  struct ptlrpc_request *req, int offset)
335 {
336         return -EOPNOTSUPP;
337 }
338
339 static int mdt_handle_quotactl(struct mdt_thread_info *info,
340                                struct ptlrpc_request *req, int offset)
341 {
342         return -EOPNOTSUPP;
343 }
344
345 /*
346  * DLM handlers.
347  */
348
349 static struct ldlm_callback_suite cbs = {
350         .lcs_completion = ldlm_server_completion_ast,
351         .lcs_blocking   = ldlm_server_blocking_ast,
352         .lcs_glimpse    = NULL
353 };
354
355 static int mdt_enqueue(struct mdt_thread_info *info,
356                        struct ptlrpc_request *req, int offset)
357 {
358         /*
359          * info->mti_dlm_req already contains swapped and (if necessary)
360          * converted dlm request.
361          */
362         LASSERT(info->mti_dlm_req != NULL);
363
364         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
365         return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
366                                     req, info->mti_dlm_req, &cbs);
367 }
368
369 static int mdt_convert(struct mdt_thread_info *info,
370                        struct ptlrpc_request *req, int offset)
371 {
372         LASSERT(info->mti_dlm_req);
373         return ldlm_handle_convert0(req, info->mti_dlm_req);
374 }
375
376 static int mdt_bl_callback(struct mdt_thread_info *info,
377                            struct ptlrpc_request *req, int offset)
378 {
379         CERROR("bl callbacks should not happen on MDS\n");
380         LBUG();
381         return -EOPNOTSUPP;
382 }
383
384 static int mdt_cp_callback(struct mdt_thread_info *info,
385                            struct ptlrpc_request *req, int offset)
386 {
387         CERROR("cp callbacks should not happen on MDS\n");
388         LBUG();
389         return -EOPNOTSUPP;
390 }
391
392 /*
393  * Build (DLM) resource name from fid.
394  */
395 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
396                                        struct ldlm_res_id *name)
397 {
398         memset(name, 0, sizeof *name);
399         /* we use fid_num() whoch includes also object version instread of raw
400          * fid_oid(). */
401         name->name[0] = fid_seq(f);
402         name->name[1] = fid_num(f);
403         return name;
404 }
405
406 /*
407  * Return true if resource is for object identified by fid.
408  */
409 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
410 {
411         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
412 }
413
414 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
415 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
416              struct lustre_handle *lh, ldlm_mode_t mode,
417              ldlm_policy_data_t *policy)
418 {
419         struct ldlm_res_id res_id;
420         int flags = 0, rc;
421         ENTRY;
422
423         LASSERT(ns != NULL);
424         LASSERT(lh != NULL);
425         LASSERT(f != NULL);
426
427         /* FIXME: is that correct to have @flags=0 here? */
428         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
429                               LDLM_IBITS, policy, mode, &flags,
430                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
431                               NULL, NULL, 0, NULL, lh);
432         RETURN (rc == ELDLM_OK ? 0 : -EIO);
433 }
434
435 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
436                 struct lustre_handle *lh, ldlm_mode_t mode)
437 {
438         struct ldlm_lock *lock;
439         ENTRY;
440
441         /* FIXME: this is debug stuff, remove it later. */
442         lock = ldlm_handle2lock(lh);
443         if (!lock) {
444                 CERROR("invalid lock handle "LPX64, lh->cookie);
445                 LBUG();
446         }
447
448         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
449
450         ldlm_lock_decref(lh, mode);
451         EXIT;
452 }
453
454 static struct mdt_object *mdt_obj(struct lu_object *o)
455 {
456         LASSERT(lu_device_is_mdt(o->lo_dev));
457         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
458 }
459
460 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
461                                    struct mdt_device *d,
462                                    struct lu_fid *f)
463 {
464         struct lu_object *o;
465
466         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
467         if (IS_ERR(o))
468                 return (struct mdt_object *)o;
469         else
470                 return mdt_obj(o);
471 }
472
473 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
474 {
475         lu_object_put(ctxt, &o->mot_obj.mo_lu);
476 }
477
478 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
479 {
480         return lu_object_fid(&o->mot_obj.mo_lu);
481 }
482
483 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
484                     struct mdt_lock_handle *lh, __u64 ibits)
485 {
486         ldlm_policy_data_t p = {
487                 .l_inodebits = {
488                         .bits = ibits
489                 }
490         };
491         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
492         LASSERT(lh->mlh_mode != LCK_MINMODE);
493
494         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
495 }
496
497 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
498                               struct mdt_lock_handle *lh)
499 {
500         if (lustre_handle_is_used(&lh->mlh_lh)) {
501                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
502                 lh->mlh_lh.cookie = 0;
503         }
504 }
505
506 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
507                                         struct mdt_device *d,
508                                         struct lu_fid *f,
509                                         struct mdt_lock_handle *lh,
510                                         __u64 ibits)
511 {
512         struct mdt_object *o;
513
514         o = mdt_object_find(ctxt, d, f);
515         if (!IS_ERR(o)) {
516                 int result;
517
518                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
519                 if (result != 0) {
520                         mdt_object_put(ctxt, o);
521                         o = ERR_PTR(result);
522                 }
523         }
524         return o;
525 }
526
527 struct mdt_handler {
528         const char *mh_name;
529         int         mh_fail_id;
530         __u32       mh_opc;
531         __u32       mh_flags;
532         int (*mh_act)(struct mdt_thread_info *info,
533                       struct ptlrpc_request *req, int offset);
534 };
535
536 enum mdt_handler_flags {
537         /*
538          * struct mdt_body is passed in the 0-th incoming buffer.
539          */
540         HABEO_CORPUS = (1 << 0),
541         /*
542          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
543          * incoming buffer.
544          */
545         HABEO_CLAVIS = (1 << 1)
546 };
547
548 struct mdt_opc_slice {
549         __u32               mos_opc_start;
550         int                 mos_opc_end;
551         struct mdt_handler *mos_hs;
552 };
553
554 static struct mdt_opc_slice mdt_handlers[];
555
556 static struct mdt_handler *mdt_handler_find(__u32 opc)
557 {
558         struct mdt_opc_slice *s;
559         struct mdt_handler   *h;
560
561         h = NULL;
562         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
563                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
564                         h = s->mos_hs + (opc - s->mos_opc_start);
565                         if (h->mh_opc != 0)
566                                 LASSERT(h->mh_opc == opc);
567                         else
568                                 h = NULL; /* unsupported opc */
569                         break;
570                 }
571         }
572         return h;
573 }
574
575 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
576 {
577         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
578 }
579
580 static int mdt_lock_resname_compat(struct mdt_device *m,
581                                    struct ldlm_request *req)
582 {
583         /* XXX something... later. */
584         return 0;
585 }
586
587 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
588 {
589         /* XXX something... later. */
590         return 0;
591 }
592
593 /*
594  * Invoke handler for this request opc. Also do necessary preprocessing
595  * (according to handler ->mh_flags), and post-processing (setting of
596  * ->last_{xid,committed}).
597  */
598 static int mdt_req_handle(struct mdt_thread_info *info,
599                           struct mdt_handler *h, struct ptlrpc_request *req,
600                           int shift)
601 {
602         int result;
603         int off;
604
605         ENTRY;
606
607         LASSERT(h->mh_act != NULL);
608         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
609         LASSERT(current->journal_info == NULL);
610
611         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
612
613         if (h->mh_fail_id != 0)
614                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
615
616         off = MDS_REQ_REC_OFF + shift;
617
618         result = 0;
619         if (h->mh_flags & HABEO_CORPUS) {
620                 struct mdt_body *body;
621
622                 body = info->mti_body =
623                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
624                                            lustre_swab_mdt_body);
625                 if (body != NULL) {
626                         info->mti_object = mdt_object_find(info->mti_ctxt,
627                                                            info->mti_mdt,
628                                                            &body->fid1);
629                         if (IS_ERR(info->mti_object)) {
630                                 result = PTR_ERR(info->mti_object);
631                                 info->mti_object = NULL;
632                         }
633                 } else {
634                         CERROR("Can't unpack body\n");
635                         result = -EFAULT;
636                 }
637         } else if (h->mh_flags & HABEO_CLAVIS) {
638                 struct ldlm_request *dlm;
639
640                 LASSERT(shift == 0);
641                 dlm = info->mti_dlm_req =
642                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
643                                            sizeof *dlm,
644                                            lustre_swab_ldlm_request);
645                 if (dlm != NULL) {
646                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
647                                 result = mdt_lock_resname_compat(info->mti_mdt,
648                                                                  dlm);
649                 } else {
650                         CERROR("Can't unpack dlm request\n");
651                         result = -EFAULT;
652                 }
653         }
654         if (result == 0)
655                 /*
656                  * Process request.
657                  */
658                 result = h->mh_act(info, req, off);
659         /*
660          * XXX result value is unconditionally shoved into ->rq_status
661          * (original code sometimes placed error code into ->rq_status, and
662          * sometimes returned it to the
663          * caller). ptlrpc_server_handle_request() doesn't check return value
664          * anyway.
665          */
666         req->rq_status = result;
667
668         LASSERT(current->journal_info == NULL);
669
670         if (h->mh_flags & HABEO_CLAVIS &&
671             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
672                 struct ldlm_reply *rep;
673
674                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
675                 if (rep != NULL)
676                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
677         }
678
679         /* If we're DISCONNECTing, the mds_export_data is already freed */
680         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
681                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
682                 target_committed_to_req(req);
683         }
684         RETURN(result);
685 }
686
687 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
688 {
689         lh->mlh_lh.cookie = 0ull;
690         lh->mlh_mode = LCK_MINMODE;
691 }
692
693 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
694 {
695         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
696 }
697
698 static void mdt_thread_info_init(struct mdt_thread_info *info)
699 {
700         int i;
701
702         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
703         /*
704          * Poison size array.
705          */
706         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
707                 info->mti_rep_buf_size[i] = ~0;
708         info->mti_rep_buf_nr = i;
709         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
710                 mdt_lock_handle_init(&info->mti_lh[i]);
711         lu_context_enter(info->mti_ctxt);
712 }
713
714 static void mdt_thread_info_fini(struct mdt_thread_info *info)
715 {
716         int i;
717
718         lu_context_exit(info->mti_ctxt);
719         if (info->mti_object != NULL) {
720                 mdt_object_put(info->mti_ctxt, info->mti_object);
721                 info->mti_object = NULL;
722         }
723         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
724                 mdt_lock_handle_fini(&info->mti_lh[i]);
725 }
726
727 static int mds_msg_check_version(struct lustre_msg *msg)
728 {
729         int rc;
730
731         /* TODO: enable the below check while really introducing msg version.
732          * it's disabled because it will break compatibility with b1_4.
733          */
734         return (0);
735
736         switch (msg->opc) {
737         case MDS_CONNECT:
738         case MDS_DISCONNECT:
739         case OBD_PING:
740                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
741                 if (rc)
742                         CERROR("bad opc %u version %08x, expecting %08x\n",
743                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
744                 break;
745         case MDS_GETSTATUS:
746         case MDS_GETATTR:
747         case MDS_GETATTR_NAME:
748         case MDS_STATFS:
749         case MDS_READPAGE:
750         case MDS_REINT:
751         case MDS_CLOSE:
752         case MDS_DONE_WRITING:
753         case MDS_PIN:
754         case MDS_SYNC:
755         case MDS_GETXATTR:
756         case MDS_SETXATTR:
757         case MDS_SET_INFO:
758         case MDS_QUOTACHECK:
759         case MDS_QUOTACTL:
760         case QUOTA_DQACQ:
761         case QUOTA_DQREL:
762                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
763                 if (rc)
764                         CERROR("bad opc %u version %08x, expecting %08x\n",
765                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
766                 break;
767         case LDLM_ENQUEUE:
768         case LDLM_CONVERT:
769         case LDLM_BL_CALLBACK:
770         case LDLM_CP_CALLBACK:
771                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
772                 if (rc)
773                         CERROR("bad opc %u version %08x, expecting %08x\n",
774                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
775                 break;
776         case OBD_LOG_CANCEL:
777         case LLOG_ORIGIN_HANDLE_CREATE:
778         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
779         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
780         case LLOG_ORIGIN_HANDLE_READ_HEADER:
781         case LLOG_ORIGIN_HANDLE_CLOSE:
782         case LLOG_CATINFO:
783                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
784                 if (rc)
785                         CERROR("bad opc %u version %08x, expecting %08x\n",
786                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
787                 break;
788         default:
789                 CERROR("MDS unknown opcode %d\n", msg->opc);
790                 rc = -ENOTSUPP;
791         }
792         return rc;
793 }
794
795 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
796                                        struct obd_device *obd, int *process)
797 {
798         switch (req->rq_reqmsg->opc) {
799         case MDS_CONNECT: /* This will never get here, but for completeness. */
800         case OST_CONNECT: /* This will never get here, but for completeness. */
801         case MDS_DISCONNECT:
802         case OST_DISCONNECT:
803                *process = 1;
804                RETURN(0);
805
806         case MDS_CLOSE:
807         case MDS_SYNC: /* used in unmounting */
808         case OBD_PING:
809         case MDS_REINT:
810         case LDLM_ENQUEUE:
811                 *process = target_queue_recovery_request(req, obd);
812                 RETURN(0);
813
814         default:
815                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
816                 *process = 0;
817                 /* XXX what should we set rq_status to here? */
818                 req->rq_status = -EAGAIN;
819                 RETURN(ptlrpc_error(req));
820         }
821 }
822
823 /*
824  * Handle recovery. Return:
825  *        +1: continue request processing;
826  *       -ve: abort immediately with the given error code;
827  *         0: send reply with error code in req->rq_status;
828  */
829 static int mdt_recovery(struct ptlrpc_request *req)
830 {
831         int recovering;
832         int abort_recovery;
833         struct obd_device *obd;
834
835         ENTRY;
836
837         if (req->rq_reqmsg->opc == MDS_CONNECT)
838                 RETURN(+1);
839
840         if (req->rq_export == NULL) {
841                 CERROR("operation %d on unconnected MDS from %s\n",
842                        req->rq_reqmsg->opc,
843                        libcfs_id2str(req->rq_peer));
844                 req->rq_status = -ENOTCONN;
845                 RETURN(0);
846         }
847
848         /* sanity check: if the xid matches, the request must be marked as a
849          * resent or replayed */
850         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
851                       lustre_msg_get_flags(req->rq_reqmsg) &
852                       (MSG_RESENT | MSG_REPLAY)),
853                  "rq_xid "LPU64" matches last_xid, "
854                  "expected RESENT flag\n", req->rq_xid);
855
856         /* else: note the opposite is not always true; a RESENT req after a
857          * failover will usually not match the last_xid, since it was likely
858          * never committed. A REPLAYed request will almost never match the
859          * last xid, however it could for a committed, but still retained,
860          * open. */
861
862         obd = req->rq_export->exp_obd;
863
864         /* Check for aborted recovery... */
865         spin_lock_bh(&obd->obd_processing_task_lock);
866         abort_recovery = obd->obd_abort_recovery;
867         recovering = obd->obd_recovering;
868         spin_unlock_bh(&obd->obd_processing_task_lock);
869         if (abort_recovery) {
870                 target_abort_recovery(obd);
871         } else if (recovering) {
872                 int rc;
873                 int should_process;
874
875                 rc = mdt_filter_recovery_request(req, obd, &should_process);
876                 if (rc != 0 || !should_process) {
877                         LASSERT(rc < 0);
878                         RETURN(rc);
879                 }
880         }
881         RETURN(+1);
882 }
883
884 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
885 {
886         struct obd_device *obd;
887
888         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
889                 if (req->rq_reqmsg->opc != OBD_PING)
890                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
891
892                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
893                 if (obd && obd->obd_recovering) {
894                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
895                         RETURN(target_queue_final_reply(req, req->rq_status));
896                 } else {
897                         /* Lost a race with recovery; let the error path
898                          * DTRT. */
899                         req->rq_status = -ENOTCONN;
900                 }
901         }
902         target_send_reply(req, req->rq_status, info->mti_fail_id);
903         RETURN(req->rq_status);
904 }
905
906 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
907 {
908         struct mdt_handler *h;
909         struct lustre_msg  *msg;
910         int                 result;
911
912         ENTRY;
913
914         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
915
916         LASSERT(current->journal_info == NULL);
917
918         msg = req->rq_reqmsg;
919         result = mds_msg_check_version(msg);
920         if (result == 0) {
921                 result = mdt_recovery(req);
922                 switch (result) {
923                 case +1:
924                         h = mdt_handler_find(msg->opc);
925                         if (h != NULL)
926                                 result = mdt_req_handle(info, h, req, 0);
927                         else {
928                                 req->rq_status = -ENOTSUPP;
929                                 result = ptlrpc_error(req);
930                                 break;
931                         }
932                         /* fall through */
933                 case 0:
934                         result = mdt_reply(req, info);
935                 }
936         } else
937                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
938         RETURN(result);
939 }
940
941 /*
942  * MDT handler function called by ptlrpc service thread when request comes.
943  *
944  * XXX common "target" functionality should be factored into separate module
945  * shared by mdt, ost and stand-alone services like fld.
946  */
947 static int mdt_handle(struct ptlrpc_request *req)
948 {
949         int result;
950         struct lu_context      *ctx;
951         struct mdt_thread_info *info;
952         ENTRY;
953
954         ctx = req->rq_svc_thread->t_ctx;
955         LASSERT(ctx != NULL);
956         LASSERT(ctx->lc_thread == req->rq_svc_thread);
957
958         info = lu_context_key_get(ctx, &mdt_thread_key);
959         LASSERT(info != NULL);
960
961         mdt_thread_info_init(info);
962         /* it can be NULL while CONNECT */
963         if (req->rq_export)
964                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
965
966         result = mdt_handle0(req, info);
967         mdt_thread_info_fini(info);
968         RETURN(result);
969 }
970
971 /*Please move these function from mds to mdt*/
972 int intent_disposition(struct ldlm_reply *rep, int flag)
973 {
974         if (!rep)
975                 return 0;
976         return (rep->lock_policy_res1 & flag);
977 }
978
979 void intent_set_disposition(struct ldlm_reply *rep, int flag)
980 {
981         if (!rep)
982                 return;
983         rep->lock_policy_res1 |= flag;
984 }
985
986 static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
987                                         struct ptlrpc_request *req, 
988                                         int offset,
989                                         struct ldlm_lock *new_lock,
990                                         struct ldlm_lock **old_lock,
991                                         struct lustre_handle *lockh)
992 {
993         struct obd_export *exp = req->rq_export;
994         struct mdt_device * mdt = info->mti_mdt;
995         struct ldlm_request *dlmreq =
996                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
997         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
998         struct list_head *iter;
999
1000         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1001                 return;
1002
1003         l_lock(&mdt->mdt_namespace->ns_lock);
1004         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1005                 struct ldlm_lock *lock;
1006                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1007                 if (lock == new_lock)
1008                         continue;
1009                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1010                         lockh->cookie = lock->l_handle.h_cookie;
1011                         LDLM_DEBUG(lock, "restoring lock cookie");
1012                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1013                                   lockh->cookie);
1014                         if (old_lock)
1015                                 *old_lock = LDLM_LOCK_GET(lock);
1016                         l_unlock(&mdt->mdt_namespace->ns_lock);
1017                         return;
1018                 }
1019         }
1020         l_unlock(&mdt->mdt_namespace->ns_lock);
1021
1022         /* If the xid matches, then we know this is a resent request,
1023          * and allow it. (It's probably an OPEN, for which we don't
1024          * send a lock */
1025         if (req->rq_xid ==
1026             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
1027                 return;
1028
1029         /* This remote handle isn't enqueued, so we never received or
1030          * processed this request.  Clear MSG_RESENT, because it can
1031          * be handled like any normal request now. */
1032
1033         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1034
1035         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1036                   remote_hdl.cookie);
1037 }
1038
1039 static int mdt_intent_policy(struct ldlm_namespace *ns,
1040                              struct ldlm_lock **lockp, void *req_cookie,
1041                              ldlm_mode_t mode, int flags, void *data)
1042 {
1043         struct ptlrpc_request *req = req_cookie;
1044         struct ldlm_lock *lock = *lockp;
1045         struct ldlm_intent *it;
1046         struct ldlm_reply *rep;
1047         struct lustre_handle lockh = { 0 };
1048         struct ldlm_lock *new_lock = NULL;
1049         int getattr_part = MDS_INODELOCK_UPDATE;
1050         int offset = MDS_REQ_INTENT_REC_OFF;
1051         int rc;
1052         struct mdt_thread_info *info;
1053
1054         ENTRY;
1055
1056         LASSERT(req != NULL);
1057
1058         /* We already got it in mdt_handle. But we have to do it again*/
1059         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1060         mdt_thread_info_init(info);
1061
1062
1063         if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
1064                 /* No intent was provided */
1065                 int size = sizeof(struct ldlm_reply);
1066                 rc = lustre_pack_reply(req, 1, &size, NULL);
1067                 LASSERT(rc == 0);
1068                 RETURN(0);
1069         }
1070
1071         it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
1072                                 lustre_swab_ldlm_intent);
1073         if (it == NULL) {
1074                 CERROR("Intent missing\n");
1075                 RETURN(req->rq_status = -EFAULT);
1076         }
1077
1078         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
1079         info->mti_rep_buf_nr = 3;
1080         info->mti_rep_buf_size[0] = sizeof(*rep);
1081         info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
1082         info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
1083
1084         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
1085             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))){
1086                 /* we should never allow OBD_CONNECT_ACL if not configured */
1087                 info->mti_rep_buf_size[info->mti_rep_buf_nr++] = 
1088                                         LUSTRE_POSIX_ACL_MAX_SIZE;
1089         }
1090         else if (it->opc & IT_UNLINK){
1091                 info->mti_rep_buf_size[info->mti_rep_buf_nr++] = 
1092                                         sizeof(struct llog_cookie); 
1093                                         /*FIXME:See mds*/
1094         }
1095
1096         rc = lustre_pack_reply(req, info->mti_rep_buf_nr, 
1097                                info->mti_rep_buf_size, NULL);
1098         if (rc)
1099                 RETURN(req->rq_status = rc);
1100
1101         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1102         intent_set_disposition(rep, DISP_IT_EXECD);
1103
1104
1105         /* execute policy */
1106         switch ((long)it->opc) {
1107         case IT_OPEN:
1108         case IT_CREAT|IT_OPEN:
1109                 fixup_handle_for_resent_req(info,req, MDS_REQ_INTENT_LOCKREQ_OFF,
1110                                             lock, NULL, &lockh);
1111                 /* XXX swab here to assert that an mds_open reint
1112                  * packet is following */
1113                 rep->lock_policy_res2 = mdt_reint_internal(info, req,
1114                                                 offset, &lockh);
1115 #if 0
1116                 /* We abort the lock if the lookup was negative and
1117                  * we did not make it to the OPEN portion */
1118                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1119                         RETURN(ELDLM_LOCK_ABORTED);
1120                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1121                     !intent_disposition(rep, DISP_OPEN_OPEN))
1122                         RETURN(ELDLM_LOCK_ABORTED);
1123 #endif
1124                 break;
1125         case IT_LOOKUP:
1126                         getattr_part = MDS_INODELOCK_LOOKUP;
1127         case IT_GETATTR:
1128                         getattr_part |= MDS_INODELOCK_LOOKUP;
1129         case IT_READDIR:
1130 #if 0
1131                 fixup_handle_for_resent_req(info, req, MDS_REQ_INTENT_LOCKREQ_OFF,
1132                                             lock, &new_lock, &lockh);
1133
1134                 /* INODEBITS_INTEROP: if this lock was converted from a
1135                  * plain lock (client does not support inodebits), then
1136                  * child lock must be taken with both lookup and update
1137                  * bits set for all operations.
1138                  */
1139                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
1140                         getattr_part = MDS_INODELOCK_LOOKUP |
1141                                        MDS_INODELOCK_UPDATE;
1142
1143                 rep->lock_policy_res2 = mds_getattr_name(offset, req,
1144                                                          getattr_part, &lockh);
1145                 /* FIXME: LDLM can set req->rq_status. MDS sets
1146                    policy_res{1,2} with disposition and status.
1147                    - replay: returns 0 & req->status is old status
1148                    - otherwise: returns req->status */
1149                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
1150                         rep->lock_policy_res2 = 0;
1151                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
1152                     rep->lock_policy_res2)
1153                         RETURN(ELDLM_LOCK_ABORTED);
1154                 if (req->rq_status != 0) {
1155                         LBUG();
1156                         rep->lock_policy_res2 = req->rq_status;
1157                         RETURN(ELDLM_LOCK_ABORTED);
1158                 }
1159 #endif
1160                 RETURN(ELDLM_LOCK_ABORTED);
1161                 break;
1162         default:
1163                 CERROR("Unhandled intent "LPD64"\n", it->opc);
1164                 LBUG();
1165         }
1166
1167         /* By this point, whatever function we called above must have either
1168          * filled in 'lockh', been an intent replay, or returned an error.  We
1169          * want to allow replayed RPCs to not get a lock, since we would just
1170          * drop it below anyways because lock replay is done separately by the
1171          * client afterwards.  For regular RPCs we want to give the new lock to
1172          * the client instead of whatever lock it was about to get. */
1173         if (new_lock == NULL)
1174                 new_lock = ldlm_handle2lock(&lockh);
1175         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1176                 RETURN(0);
1177
1178         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
1179                  it->opc, lockh.cookie);
1180
1181         /* If we've already given this lock to a client once, then we should
1182          * have no readers or writers.  Otherwise, we should have one reader
1183          * _or_ writer ref (which will be zeroed below) before returning the
1184          * lock to a client. */
1185         if (new_lock->l_export == req->rq_export) {
1186                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1187         } else {
1188                 LASSERT(new_lock->l_export == NULL);
1189                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1190         }
1191
1192         *lockp = new_lock;
1193
1194         if (new_lock->l_export == req->rq_export) {
1195                 /* Already gave this to the client, which means that we
1196                  * reconstructed a reply. */
1197                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1198                         MSG_RESENT);
1199                 RETURN(ELDLM_LOCK_REPLACED);
1200         }
1201
1202         /* Fixup the lock to be given to the client */
1203         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1204         new_lock->l_readers = 0;
1205         new_lock->l_writers = 0;
1206
1207         new_lock->l_export = class_export_get(req->rq_export);
1208         list_add(&new_lock->l_export_chain,
1209                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1210
1211         new_lock->l_blocking_ast = lock->l_blocking_ast;
1212         new_lock->l_completion_ast = lock->l_completion_ast;
1213
1214         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1215                sizeof(lock->l_remote_handle));
1216
1217         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1218
1219         LDLM_LOCK_PUT(new_lock);
1220         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1221
1222         RETURN(ELDLM_LOCK_REPLACED);
1223 }
1224
1225 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
1226                       const char *name, void *buf, int size, int mode)
1227 {
1228         struct md_device *child = m->mdt_child;
1229         ENTRY;
1230         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
1231 }
1232
1233 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
1234                            int mode)
1235 {
1236         struct mdt_device *m = opaque;
1237         int rc;
1238         ENTRY;
1239
1240         rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
1241                         seq, sizeof(*seq), mode);
1242         RETURN(rc);
1243 }
1244
1245 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
1246 {
1247         ENTRY;
1248         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
1249 }
1250
1251 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
1252 {
1253         ENTRY;
1254         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
1255 }
1256
1257 struct lu_seq_mgr_ops seq_mgr_ops = {
1258         .smo_read  = mdt_seq_mgr_read,
1259         .smo_write = mdt_seq_mgr_write
1260 };
1261
1262 /*
1263  * FLD wrappers
1264  */
1265
1266 static int mdt_fld_init(struct mdt_device *m)
1267 {
1268         struct lu_site *ls;
1269         int rc;
1270         ENTRY;
1271
1272         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1273
1274         OBD_ALLOC_PTR(ls->ls_fld);
1275
1276         if (ls->ls_fld != NULL)
1277                 rc = fld_server_init(ls->ls_fld, m->mdt_bottom);
1278         else
1279                 rc = -ENOMEM;
1280
1281         RETURN(rc);
1282 }
1283
1284 static int mdt_fld_fini(struct mdt_device *m)
1285 {
1286         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1287         int rc = 0;
1288
1289         if (ls && ls->ls_fld) {
1290                 fld_server_fini(ls->ls_fld);
1291                 OBD_FREE_PTR(ls->ls_fld);
1292         }
1293         RETURN(rc);
1294 }
1295
1296 /* device init/fini methods */
1297
1298 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1299 {
1300         if (m->mdt_service != NULL) {
1301                 ptlrpc_unregister_service(m->mdt_service);
1302                 m->mdt_service = NULL;
1303         }
1304 }
1305
1306 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1307 {
1308         int rc;
1309         struct ptlrpc_service_conf conf = {
1310                 .psc_nbufs            = MDS_NBUFS,
1311                 .psc_bufsize          = MDS_BUFSIZE,
1312                 .psc_max_req_size     = MDS_MAXREQSIZE,
1313                 .psc_max_reply_size   = MDS_MAXREPSIZE,
1314                 .psc_req_portal       = MDS_REQUEST_PORTAL,
1315                 .psc_rep_portal       = MDC_REPLY_PORTAL,
1316                 .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
1317                 /*
1318                  * We'd like to have a mechanism to set this on a per-device
1319                  * basis, but alas...
1320                  */
1321                 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
1322                                        MDT_MAX_THREADS)
1323         };
1324
1325         ENTRY;
1326
1327
1328         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1329                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1330
1331         m->mdt_service =
1332                 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
1333                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1334                                      NULL);
1335         if (m->mdt_service == NULL)
1336                 RETURN(-ENOMEM);
1337
1338         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1339         if (rc)
1340                 GOTO(err_mdt_svc, rc);
1341
1342         RETURN(rc);
1343 err_mdt_svc:
1344         ptlrpc_unregister_service(m->mdt_service);
1345         m->mdt_service = NULL;
1346
1347         RETURN(rc);
1348 }
1349
1350 static void mdt_stack_fini(struct mdt_device *m, struct lu_device *d)
1351 {
1352         /* goes through all stack */
1353         while (d != NULL) {
1354                 struct lu_device *n;
1355                 struct obd_type *type;
1356                 struct lu_device_type *ldt = d->ld_type;
1357
1358                 lu_device_put(d);
1359
1360                 /* each fini() returns next device in stack of layers
1361                  * * so we can avoid the recursion */
1362                 n = ldt->ldt_ops->ldto_device_fini(d);
1363                 ldt->ldt_ops->ldto_device_free(d);
1364
1365                 type = ldt->ldt_obd_type;
1366                 type->typ_refcnt--;
1367                 class_put_type(type);
1368                 /* switch to the next device in the layer */
1369                 d = n;
1370         }
1371         m->mdt_child = NULL;
1372 }
1373
1374 static struct lu_device *mdt_layer_setup(const char *typename,
1375                                          struct lu_device *child,
1376                                          struct lustre_cfg *cfg)
1377 {
1378         struct obd_type       *type;
1379         struct lu_device_type *ldt;
1380         struct lu_device      *d;
1381         int rc;
1382
1383         /* find the type */
1384         type = class_get_type(typename);
1385         if (!type) {
1386                 CERROR("Unknown type: '%s'\n", typename);
1387                 GOTO(out, rc = -ENODEV);
1388         }
1389
1390         ldt = type->typ_lu;
1391         ldt->ldt_obd_type = type;
1392         if (ldt == NULL) {
1393                 CERROR("type: '%s'\n", typename);
1394                 GOTO(out_type, rc = -EINVAL);
1395         }
1396
1397         d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1398         if (IS_ERR(d)) {
1399                 CERROR("Cannot allocate device: '%s'\n", typename);
1400                 GOTO(out_type, rc = -ENODEV);
1401         }
1402
1403         LASSERT(child->ld_site);
1404         d->ld_site = child->ld_site;
1405
1406         type->typ_refcnt++;
1407         rc = ldt->ldt_ops->ldto_device_init(d, child);
1408         if (rc) {
1409                 CERROR("can't init device '%s', rc %d\n", typename, rc);
1410                 GOTO(out_alloc, rc);
1411         }
1412         lu_device_get(d);
1413
1414         RETURN(d);
1415 out_alloc:
1416         ldt->ldt_ops->ldto_device_free(d);
1417         type->typ_refcnt--;
1418 out_type:
1419         class_put_type(type);
1420 out:
1421         RETURN(ERR_PTR(rc));
1422 }
1423
1424 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
1425 {
1426         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
1427         struct lu_device  *tmp;
1428         int rc;
1429
1430         /* init the stack */
1431         tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1432         if (IS_ERR(tmp)) {
1433                 RETURN (PTR_ERR(tmp));
1434         }
1435         m->mdt_bottom = lu2dt_dev(tmp);
1436         d = tmp;
1437         tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1438         if (IS_ERR(tmp)) {
1439                 GOTO(out, rc = PTR_ERR(tmp));
1440         }
1441         d = tmp;
1442         tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1443         if (IS_ERR(tmp)) {
1444                 GOTO(out, rc = PTR_ERR(tmp));
1445         }
1446         d = tmp;
1447         m->mdt_child = lu2md_dev(d);
1448
1449         /* process setup config */
1450         tmp = &m->mdt_md_dev.md_lu_dev;
1451         rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
1452
1453 out:
1454         /* fini from last known good lu_device */
1455         if (rc)
1456                 mdt_stack_fini(m, d);
1457
1458         return rc;
1459 }
1460
1461 static void mdt_fini(struct mdt_device *m)
1462 {
1463         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1464
1465         ENTRY;
1466
1467         mdt_stop_ptlrpc_service(m);
1468
1469         /* finish the stack */
1470         mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1471
1472         if (d->ld_site != NULL) {
1473                 lu_site_fini(d->ld_site);
1474                 OBD_FREE_PTR(d->ld_site);
1475                 d->ld_site = NULL;
1476         }
1477         if (m->mdt_namespace != NULL) {
1478                 ldlm_namespace_free(m->mdt_namespace, 0);
1479                 m->mdt_namespace = NULL;
1480         }
1481
1482         if (m->mdt_seq_mgr) {
1483                 seq_mgr_fini(m->mdt_seq_mgr);
1484                 m->mdt_seq_mgr = NULL;
1485         }
1486
1487         LASSERT(atomic_read(&d->ld_ref) == 0);
1488         md_device_fini(&m->mdt_md_dev);
1489         EXIT;
1490 }
1491
1492 static int mdt_init0(struct mdt_device *m,
1493                      struct lu_device_type *t, struct lustre_cfg *cfg)
1494 {
1495         int rc;
1496         struct lu_site *s;
1497         char   ns_name[48];
1498         struct lu_context ctx;
1499         const char *dev = lustre_cfg_string(cfg, 0);
1500         struct obd_device *obd;
1501         ENTRY;
1502
1503         obd = class_name2obd(dev);
1504         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
1505
1506         OBD_ALLOC_PTR(s);
1507         if (s == NULL)
1508                 RETURN(-ENOMEM);
1509
1510         md_device_init(&m->mdt_md_dev, t);
1511         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1512
1513         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1514         if (rc) {
1515                 CERROR("can't init lu_site, rc %d\n", rc);
1516                 GOTO(err_fini_site, rc);
1517         }
1518
1519         /* init the stack */
1520         rc = mdt_stack_init(m, cfg);
1521         if (rc) {
1522                 CERROR("can't init device stack, rc %d\n", rc);
1523                 GOTO(err_fini_site, rc);
1524         }
1525
1526         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1527         if (!m->mdt_seq_mgr) {
1528                 CERROR("can't initialize sequence manager\n");
1529                 GOTO(err_fini_stack, rc);
1530         }
1531
1532         rc = lu_context_init(&ctx);
1533         if (rc != 0)
1534                 GOTO(err_fini_mgr, rc);
1535
1536         lu_context_enter(&ctx);
1537         /* init sequence info after device stack is initialized. */
1538         rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1539         lu_context_exit(&ctx);
1540         if (rc)
1541                 GOTO(err_fini_ctx, rc);
1542
1543         lu_context_fini(&ctx);
1544
1545         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1546         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1547         if (m->mdt_namespace == NULL)
1548                 GOTO(err_fini_site, rc = -ENOMEM);
1549
1550         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1551
1552         rc = mdt_fld_init(m);
1553         if (rc)
1554                 GOTO(err_free_ns, rc);
1555
1556         rc = mdt_start_ptlrpc_service(m);
1557         if (rc)
1558                 GOTO(err_free_fld, rc);
1559
1560         RETURN(0);
1561
1562 err_free_fld:
1563         mdt_fld_fini(m);
1564 err_free_ns:
1565         ldlm_namespace_free(m->mdt_namespace, 0);
1566         m->mdt_namespace = NULL;
1567 err_fini_ctx:
1568         lu_context_fini(&ctx);
1569 err_fini_mgr:
1570         seq_mgr_fini(m->mdt_seq_mgr);
1571         m->mdt_seq_mgr = NULL;
1572 err_fini_stack:
1573         mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1574 err_fini_site:
1575         lu_site_fini(s);
1576         OBD_FREE_PTR(s);
1577         RETURN(rc);
1578 }
1579
1580 /* used by MGS to process specific configurations */
1581 static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
1582 {
1583         struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
1584         int err;
1585         ENTRY;
1586
1587         switch (cfg->lcfg_command) {
1588                 /* all MDT specific commands should be here */
1589         default:
1590                 /* others are passed further */
1591                 err = next->ld_ops->ldo_process_config(next, cfg);
1592         }
1593         RETURN(err);
1594 }
1595
1596 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1597                                           struct lu_device *d)
1598 {
1599         struct mdt_object *mo;
1600
1601         OBD_ALLOC_PTR(mo);
1602         if (mo != NULL) {
1603                 struct lu_object *o;
1604                 struct lu_object_header *h;
1605
1606                 o = &mo->mot_obj.mo_lu;
1607                 h = &mo->mot_header;
1608                 lu_object_header_init(h);
1609                 lu_object_init(o, h, d);
1610                 lu_object_add_top(h, o);
1611                 o->lo_ops = &mdt_obj_ops;
1612                 return o;
1613         } else
1614                 return NULL;
1615 }
1616
1617 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1618 {
1619         struct mdt_device *d = mdt_dev(o->lo_dev);
1620         struct lu_device  *under;
1621         struct lu_object  *below;
1622
1623         under = &d->mdt_child->md_lu_dev;
1624         below = under->ld_ops->ldo_object_alloc(ctxt, under);
1625         if (below != NULL) {
1626                 lu_object_add(o, below);
1627                 return 0;
1628         } else
1629                 return -ENOMEM;
1630 }
1631
1632 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1633 {
1634         struct mdt_object *mo = mdt_obj(o);
1635         struct lu_object_header *h;
1636
1637         h = o->lo_header;
1638         lu_object_fini(o);
1639         lu_object_header_fini(h);
1640         OBD_FREE_PTR(mo);
1641 }
1642
1643 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
1644 {
1645 }
1646
1647 static int mdt_object_exists(struct lu_context *ctx, struct lu_object *o)
1648 {
1649         struct lu_object *next = lu_object_next(o);
1650
1651         return next->lo_ops->loo_object_exists(ctx, next);
1652 }
1653
1654 static int mdt_object_print(struct lu_context *ctxt,
1655                             struct seq_file *f, const struct lu_object *o)
1656 {
1657         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1658 }
1659
1660 static struct lu_device_operations mdt_lu_ops = {
1661         .ldo_object_alloc   = mdt_object_alloc,
1662         .ldo_object_free    = mdt_object_free,
1663         .ldo_process_config = mdt_process_config
1664 };
1665
1666 static struct lu_object_operations mdt_obj_ops = {
1667         .loo_object_init    = mdt_object_init,
1668         .loo_object_release = mdt_object_release,
1669         .loo_object_print   = mdt_object_print,
1670         .loo_object_exists  = mdt_object_exists
1671 };
1672
1673 /* mds_connect_internal */
1674 static int mdt_connect0(struct mdt_device *mdt,
1675                         struct obd_export *exp, struct obd_connect_data *data)
1676 {
1677         if (data != NULL) {
1678                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
1679                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
1680
1681                 /* If no known bits (which should not happen, probably,
1682                    as everybody should support LOOKUP and UPDATE bits at least)
1683                    revert to compat mode with plain locks. */
1684                 if (!data->ocd_ibits_known &&
1685                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
1686                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
1687
1688                 if (!mdt->mdt_opts.mo_acl)
1689                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
1690
1691                 if (!mdt->mdt_opts.mo_user_xattr)
1692                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
1693
1694                 exp->exp_connect_flags = data->ocd_connect_flags;
1695                 data->ocd_version = LUSTRE_VERSION_CODE;
1696                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
1697         }
1698
1699         if (mdt->mdt_opts.mo_acl &&
1700             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
1701                 CWARN("%s: MDS requires ACL support but client does not\n",
1702                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
1703                 return -EBADE;
1704         }
1705         return 0;
1706 }
1707
1708 /* mds_connect copy */
1709 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1710                            struct obd_uuid *cluuid,
1711                            struct obd_connect_data *data)
1712 {
1713         struct obd_export *exp;
1714         int rc;
1715         struct mdt_device *mdt;
1716         struct mds_export_data *med;
1717         struct mds_client_data *mcd = NULL;
1718         ENTRY;
1719
1720         if (!conn || !obd || !cluuid)
1721                 RETURN(-EINVAL);
1722
1723         mdt = mdt_dev(obd->obd_lu_dev);
1724
1725         rc = class_connect(conn, obd, cluuid);
1726         if (rc)
1727                 RETURN(rc);
1728
1729         exp = class_conn2export(conn);
1730         LASSERT(exp != NULL);
1731         med = &exp->exp_mds_data;
1732
1733         rc = mdt_connect0(mdt, exp, data);
1734         if (rc == 0) {
1735                 OBD_ALLOC_PTR(mcd);
1736                 if (mcd != NULL) {
1737                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
1738                         med->med_mcd = mcd;
1739                 } else
1740                         rc = -ENOMEM;
1741         }
1742         if (rc)
1743                 class_disconnect(exp);
1744         else
1745                 class_export_put(exp);
1746
1747         RETURN(rc);
1748 }
1749
1750 static int mdt_obd_disconnect(struct obd_export *exp)
1751 {
1752         struct mds_export_data *med = &exp->exp_mds_data;
1753         unsigned long irqflags;
1754         int rc;
1755         ENTRY;
1756
1757         LASSERT(exp);
1758         class_export_get(exp);
1759
1760         /* Disconnect early so that clients can't keep using export */
1761         rc = class_disconnect(exp);
1762         //ldlm_cancel_locks_for_export(exp);
1763
1764         /* complete all outstanding replies */
1765         spin_lock_irqsave(&exp->exp_lock, irqflags);
1766         while (!list_empty(&exp->exp_outstanding_replies)) {
1767                 struct ptlrpc_reply_state *rs =
1768                         list_entry(exp->exp_outstanding_replies.next,
1769                                    struct ptlrpc_reply_state, rs_exp_list);
1770                 struct ptlrpc_service *svc = rs->rs_service;
1771
1772                 spin_lock(&svc->srv_lock);
1773                 list_del_init(&rs->rs_exp_list);
1774                 ptlrpc_schedule_difficult_reply(rs);
1775                 spin_unlock(&svc->srv_lock);
1776         }
1777         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1778
1779         OBD_FREE_PTR(med->med_mcd);
1780
1781         class_export_put(exp);
1782         RETURN(rc);
1783 }
1784
1785 static struct obd_ops mdt_obd_device_ops = {
1786         .o_owner = THIS_MODULE,
1787         .o_connect = mdt_obd_connect,
1788         .o_disconnect = mdt_obd_disconnect,
1789 };
1790
1791 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1792                                           struct lustre_cfg *cfg)
1793 {
1794         struct lu_device  *l;
1795         struct mdt_device *m;
1796
1797         OBD_ALLOC_PTR(m);
1798         if (m != NULL) {
1799                 int result;
1800
1801                 l = &m->mdt_md_dev.md_lu_dev;
1802                 result = mdt_init0(m, t, cfg);
1803                 if (result != 0) {
1804                         mdt_fini(m);
1805                         return ERR_PTR(result);
1806                 }
1807
1808         } else
1809                 l = ERR_PTR(-ENOMEM);
1810         return l;
1811 }
1812
1813 static void mdt_device_free(struct lu_device *d)
1814 {
1815         struct mdt_device *m = mdt_dev(d);
1816
1817         mdt_fini(m);
1818         OBD_FREE_PTR(m);
1819 }
1820
1821 static void *mdt_thread_init(struct lu_context *ctx)
1822 {
1823         struct mdt_thread_info *info;
1824
1825         OBD_ALLOC_PTR(info);
1826         if (info != NULL)
1827                 info->mti_ctxt = ctx;
1828         else
1829                 info = ERR_PTR(-ENOMEM);
1830         return info;
1831 }
1832
1833 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1834 {
1835         struct mdt_thread_info *info = data;
1836         OBD_FREE_PTR(info);
1837 }
1838
1839 static struct lu_context_key mdt_thread_key = {
1840         .lct_init = mdt_thread_init,
1841         .lct_fini = mdt_thread_fini
1842 };
1843
1844 static int mdt_type_init(struct lu_device_type *t)
1845 {
1846         return lu_context_key_register(&mdt_thread_key);
1847 }
1848
1849 static void mdt_type_fini(struct lu_device_type *t)
1850 {
1851         lu_context_key_degister(&mdt_thread_key);
1852 }
1853
1854 static struct lu_device_type_operations mdt_device_type_ops = {
1855         .ldto_init = mdt_type_init,
1856         .ldto_fini = mdt_type_fini,
1857
1858         .ldto_device_alloc = mdt_device_alloc,
1859         .ldto_device_free  = mdt_device_free
1860 };
1861
1862 static struct lu_device_type mdt_device_type = {
1863         .ldt_tags = LU_DEVICE_MD,
1864         .ldt_name = LUSTRE_MDT0_NAME,
1865         .ldt_ops  = &mdt_device_type_ops
1866 };
1867
1868 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1869         { 0 }
1870 };
1871
1872 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1873         { 0 }
1874 };
1875
1876 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1877
1878 static int __init mdt_mod_init(void)
1879 {
1880         struct lprocfs_static_vars lvars;
1881
1882         mdt_num_threads = MDT_NUM_THREADS;
1883         lprocfs_init_vars(mdt, &lvars);
1884         return class_register_type(&mdt_obd_device_ops, NULL,
1885                                    lvars.module_vars, LUSTRE_MDT0_NAME,
1886                                    &mdt_device_type);
1887 }
1888
1889 static void __exit mdt_mod_exit(void)
1890 {
1891         class_unregister_type(LUSTRE_MDT0_NAME);
1892 }
1893
1894
1895 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1896 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1897         .mh_name    = #opc,                                             \
1898         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1899         .mh_opc     = prefix ## _  ## opc,                              \
1900         .mh_flags   = flags,                                            \
1901         .mh_act     = fn                                                \
1902 }
1903
1904 #define DEF_MDT_HNDL(flags, name, fn)                   \
1905         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1906
1907 static struct mdt_handler mdt_mds_ops[] = {
1908         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1909         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1910         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1911         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1912         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1913         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1914         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1915         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1916         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1917         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1918         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1919         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1920         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1921         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1922         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1923         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1924 };
1925
1926 static struct mdt_handler mdt_obd_ops[] = {
1927 };
1928
1929 #define DEF_DLM_HNDL(flags, name, fn)                   \
1930         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1931
1932 static struct mdt_handler mdt_dlm_ops[] = {
1933         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1934         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1935         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1936         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1937 };
1938
1939 static struct mdt_handler mdt_llog_ops[] = {
1940 };
1941
1942 static struct mdt_opc_slice mdt_handlers[] = {
1943         {
1944                 .mos_opc_start = MDS_GETATTR,
1945                 .mos_opc_end   = MDS_LAST_OPC,
1946                 .mos_hs        = mdt_mds_ops
1947         },
1948         {
1949                 .mos_opc_start = OBD_PING,
1950                 .mos_opc_end   = OBD_LAST_OPC,
1951                 .mos_hs        = mdt_obd_ops
1952         },
1953         {
1954                 .mos_opc_start = LDLM_ENQUEUE,
1955                 .mos_opc_end   = LDLM_LAST_OPC,
1956                 .mos_hs        = mdt_dlm_ops
1957         },
1958         {
1959                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1960                 .mos_opc_end   = LLOG_LAST_OPC,
1961                 .mos_hs        = mdt_llog_ops
1962         },
1963         {
1964                 .mos_hs        = NULL
1965         }
1966 };
1967
1968 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1969 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1970 MODULE_LICENSE("GPL");
1971
1972 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1973                 "number of mdt service threads to start");
1974
1975 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);