Whamcloud - gitweb
(1) use mdt_lock_hancle instead of lustre_handle.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53 /* lu2dt_dev() */
54 #include <linux/dt_object.h>
55
56 /*LUSTRE_POSIX_ACL_MAX_SIZE*/
57 #include <linux/lustre_acl.h>
58
59
60 /* struct mds_client_data */
61 #include "../mds/mds_internal.h"
62 #include "mdt_internal.h"
63
64 /*
65  * Initialized in mdt_mod_init().
66  */
67 unsigned long mdt_num_threads;
68
69 static int                mdt_handle    (struct ptlrpc_request *req);
70 static struct mdt_device *mdt_dev       (struct lu_device *d);
71 static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
72
73 static struct lu_context_key       mdt_thread_key;
74 static struct lu_object_operations mdt_obj_ops;
75
76 static int mdt_getstatus(struct mdt_thread_info *info,
77                          struct ptlrpc_request *req, int offset)
78 {
79         struct md_device *next  = info->mti_mdt->mdt_child;
80         int               result;
81
82         ENTRY;
83
84         info->mti_rep_buf_size[0] = sizeof (struct mdt_body);
85         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
86         if (result)
87                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
88                        sizeof (struct mdt_body));
89         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
90                 result = -ENOMEM;
91         else {
92                 info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, 
93                                                 sizeof (struct mdt_body));
94                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
95                                                     next, 
96                                                     &info->mti_body->fid1);
97         }
98
99         /* the last_committed and last_xid fields are filled in for all
100          * replies already - no need to do so here also.
101          */
102         RETURN(result);
103 }
104
105 static int mdt_statfs(struct mdt_thread_info *info,
106                       struct ptlrpc_request *req, int offset)
107 {
108         struct md_device  *next  = info->mti_mdt->mdt_child;
109         struct obd_statfs *osfs;
110         struct kstatfs    *sfs;
111         int               result;
112
113         ENTRY;
114
115         info->mti_rep_buf_size[0] = sizeof(struct obd_statfs);
116         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
117         if (result)
118                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
119                        sizeof(struct obd_statfs));
120         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
121                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
122                 result = -ENOMEM;
123         } else {
124                 osfs = lustre_msg_buf(req->rq_repmsg, 0,  
125                                       sizeof(struct obd_statfs));
126                 OBD_ALLOC_PTR(sfs);
127                 if(sfs == NULL)
128                         RETURN(-ENOMEM);
129                 /* XXX max_age optimisation is needed here. See mds_statfs */
130                 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, sfs);
131                 statfs_pack(osfs, sfs);
132                 OBD_FREE_PTR(sfs);
133         }
134
135         RETURN(result);
136 }
137
138 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
139 {
140         b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
141                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
142                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
143
144         if (!S_ISREG(attr->la_mode))
145                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
146                             OBD_MD_FLMTIME;
147
148         b->atime      = attr->la_atime;
149         b->mtime      = attr->la_mtime;
150         b->ctime      = attr->la_ctime;
151         b->mode       = attr->la_mode;
152         b->size       = attr->la_size;
153         b->blocks     = attr->la_blocks;
154         b->uid        = attr->la_uid;
155         b->gid        = attr->la_gid;
156         b->flags      = attr->la_flags;
157         b->nlink      = attr->la_nlink;
158 }
159
160 static int mdt_getattr(struct mdt_thread_info *info,
161                        struct ptlrpc_request *req, int offset)
162 {
163         int              result;
164
165         LASSERT(info->mti_object != NULL);
166
167         ENTRY;
168
169         info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
170         result = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
171         if (result)
172                 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
173                        sizeof(struct mdt_body), result);
174         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
175                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
176                 result = -ENOMEM;
177         } else {
178                 struct md_object *next = mdt_object_child(info->mti_object);
179
180                 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
181                                                     &info->mti_attr);
182                 if (result == 0) {
183                         info->mti_body = lustre_msg_buf(req->rq_repmsg, 0, 
184                                                        sizeof(struct mdt_body));
185                         mdt_pack_attr2body(info->mti_body, &info->mti_attr);
186                         info->mti_body->fid1 = *mdt_object_fid(info->mti_object);
187                 }
188         }
189         RETURN(result);
190 }
191
192 static struct lu_device_operations mdt_lu_ops;
193
194 static int lu_device_is_mdt(struct lu_device *d)
195 {
196         /*
197          * XXX for now. Tags in lu_device_type->ldt_something are needed.
198          */
199         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
200 }
201
202 static struct mdt_device *mdt_dev(struct lu_device *d)
203 {
204         LASSERT(lu_device_is_mdt(d));
205         return container_of0(d, struct mdt_device, mdt_md_dev.md_lu_dev);
206 }
207
208 static int mdt_connect(struct mdt_thread_info *info,
209                        struct ptlrpc_request *req, int offset)
210 {
211         int result;
212
213         result = target_handle_connect(req, mdt_handle);
214         if (result == 0) {
215                 struct obd_connect_data *data;
216
217                 LASSERT(req->rq_export != NULL);
218                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
219
220                 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
221                 result = seq_mgr_alloc(info->mti_ctxt,
222                                        info->mti_mdt->mdt_seq_mgr,
223                                        &data->ocd_seq);
224         }
225         return result;
226 }
227
228 static int mdt_disconnect(struct mdt_thread_info *info,
229                           struct ptlrpc_request *req, int offset)
230 {
231         return target_handle_disconnect(req);
232 }
233
234 static int mdt_getattr_name(struct mdt_thread_info *info,
235                             struct ptlrpc_request *req, int offset)
236 {
237         return -EOPNOTSUPP;
238 }
239
240 static int mdt_setxattr(struct mdt_thread_info *info,
241                         struct ptlrpc_request *req, int offset)
242 {
243         return -EOPNOTSUPP;
244 }
245
246 static int mdt_getxattr(struct mdt_thread_info *info,
247                         struct ptlrpc_request *req, int offset)
248 {
249         return -EOPNOTSUPP;
250 }
251
252 static int mdt_readpage(struct mdt_thread_info *info,
253                         struct ptlrpc_request *req, int offset)
254 {
255         return -EOPNOTSUPP;
256 }
257
258 static int mdt_reint_internal(struct mdt_thread_info *info,
259                        struct ptlrpc_request *req, 
260                        int offset,
261                        struct mdt_lock_handle *lockh)
262 {
263         int rc;
264
265         rc = mdt_reint_unpack(info, req, offset);
266         if (rc || OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK)) {
267                 CERROR("invalid record\n");
268                 RETURN(rc = -EINVAL);
269         }
270         rc = mdt_reint_rec(info, lockh);
271         RETURN(rc);
272 }
273
274 static int mdt_reint(struct mdt_thread_info *info,
275                      struct ptlrpc_request *req, int offset)
276 {
277         __u32 *opcp = lustre_msg_buf(req->rq_reqmsg, MDS_REQ_REC_OFF,
278                                      sizeof (*opcp));
279         __u32  opc;
280         int rc;
281
282         ENTRY;
283
284         /* NB only peek inside req now; mdt_XXX_unpack() will swab it */
285         if (opcp == NULL) {
286                 CERROR ("Can't inspect opcode\n");
287                 RETURN (-EINVAL);
288         }
289         opc = *opcp;
290         if (lustre_msg_swabbed (req->rq_reqmsg))
291                 __swab32s(&opc);
292
293         DEBUG_REQ(D_INODE, req, "reint opt = %d", opc);
294
295         OBD_FAIL_RETURN(OBD_FAIL_MDS_REINT_NET, 0);
296
297         if (opc == REINT_UNLINK || opc == REINT_RENAME)
298                 info->mti_rep_buf_nr = 3;
299         else if (opc == REINT_OPEN)
300                 info->mti_rep_buf_nr = 2;
301         else
302                 info->mti_rep_buf_nr = 1;
303         info->mti_rep_buf_size[0] = sizeof(struct mdt_body);
304         info->mti_rep_buf_size[1] = sizeof(struct lov_mds_md); /*FIXME:See mds*/
305         info->mti_rep_buf_size[2] = sizeof(struct llog_cookie);/*FIXME:See mds*/
306         rc = lustre_pack_reply(req, info->mti_rep_buf_nr,
307                                info->mti_rep_buf_size, NULL);
308         if (rc)
309                 RETURN (rc);
310         rc = mdt_reint_internal(info, req, offset, NULL);
311         RETURN(rc);
312 }
313
314 static int mdt_close(struct mdt_thread_info *info,
315                      struct ptlrpc_request *req, int offset)
316 {
317         return -EOPNOTSUPP;
318 }
319
320 static int mdt_done_writing(struct mdt_thread_info *info,
321                             struct ptlrpc_request *req, int offset)
322 {
323         return -EOPNOTSUPP;
324 }
325
326 static int mdt_pin(struct mdt_thread_info *info,
327                    struct ptlrpc_request *req, int offset)
328 {
329         return -EOPNOTSUPP;
330 }
331
332 static int mdt_sync(struct mdt_thread_info *info,
333                     struct ptlrpc_request *req, int offset)
334 {
335         return -EOPNOTSUPP;
336 }
337
338 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
339                                  struct ptlrpc_request *req, int offset)
340 {
341         return -EOPNOTSUPP;
342 }
343
344 static int mdt_handle_quotactl(struct mdt_thread_info *info,
345                                struct ptlrpc_request *req, int offset)
346 {
347         return -EOPNOTSUPP;
348 }
349
350 /*
351  * DLM handlers.
352  */
353
354 static struct ldlm_callback_suite cbs = {
355         .lcs_completion = ldlm_server_completion_ast,
356         .lcs_blocking   = ldlm_server_blocking_ast,
357         .lcs_glimpse    = NULL
358 };
359
360 static int mdt_enqueue(struct mdt_thread_info *info,
361                        struct ptlrpc_request *req, int offset)
362 {
363         /*
364          * info->mti_dlm_req already contains swapped and (if necessary)
365          * converted dlm request.
366          */
367         LASSERT(info->mti_dlm_req != NULL);
368
369         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
370         return ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
371                                     req, info->mti_dlm_req, &cbs);
372 }
373
374 static int mdt_convert(struct mdt_thread_info *info,
375                        struct ptlrpc_request *req, int offset)
376 {
377         LASSERT(info->mti_dlm_req);
378         return ldlm_handle_convert0(req, info->mti_dlm_req);
379 }
380
381 static int mdt_bl_callback(struct mdt_thread_info *info,
382                            struct ptlrpc_request *req, int offset)
383 {
384         CERROR("bl callbacks should not happen on MDS\n");
385         LBUG();
386         return -EOPNOTSUPP;
387 }
388
389 static int mdt_cp_callback(struct mdt_thread_info *info,
390                            struct ptlrpc_request *req, int offset)
391 {
392         CERROR("cp callbacks should not happen on MDS\n");
393         LBUG();
394         return -EOPNOTSUPP;
395 }
396
397 /*
398  * Build (DLM) resource name from fid.
399  */
400 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
401                                        struct ldlm_res_id *name)
402 {
403         memset(name, 0, sizeof *name);
404         /* we use fid_num() whoch includes also object version instread of raw
405          * fid_oid(). */
406         name->name[0] = fid_seq(f);
407         name->name[1] = fid_num(f);
408         return name;
409 }
410
411 /*
412  * Return true if resource is for object identified by fid.
413  */
414 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
415 {
416         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
417 }
418
419 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
420 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
421              struct lustre_handle *lh, ldlm_mode_t mode,
422              ldlm_policy_data_t *policy)
423 {
424         struct ldlm_res_id res_id;
425         int flags = 0, rc;
426         ENTRY;
427
428         LASSERT(ns != NULL);
429         LASSERT(lh != NULL);
430         LASSERT(f != NULL);
431
432         /* FIXME: is that correct to have @flags=0 here? */
433         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
434                               LDLM_IBITS, policy, mode, &flags,
435                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
436                               NULL, NULL, 0, NULL, lh);
437         RETURN (rc == ELDLM_OK ? 0 : -EIO);
438 }
439
440 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
441                 struct lustre_handle *lh, ldlm_mode_t mode)
442 {
443         struct ldlm_lock *lock;
444         ENTRY;
445
446         /* FIXME: this is debug stuff, remove it later. */
447         lock = ldlm_handle2lock(lh);
448         if (!lock) {
449                 CERROR("invalid lock handle "LPX64, lh->cookie);
450                 LBUG();
451         }
452
453         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
454
455         ldlm_lock_decref(lh, mode);
456         EXIT;
457 }
458
459 static struct mdt_object *mdt_obj(struct lu_object *o)
460 {
461         LASSERT(lu_device_is_mdt(o->lo_dev));
462         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
463 }
464
465 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
466                                    struct mdt_device *d,
467                                    struct lu_fid *f)
468 {
469         struct lu_object *o;
470
471         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
472         if (IS_ERR(o))
473                 return (struct mdt_object *)o;
474         else
475                 return mdt_obj(o);
476 }
477
478 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
479 {
480         lu_object_put(ctxt, &o->mot_obj.mo_lu);
481 }
482
483 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
484 {
485         return lu_object_fid(&o->mot_obj.mo_lu);
486 }
487
488 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
489                     struct mdt_lock_handle *lh, __u64 ibits)
490 {
491         ldlm_policy_data_t p = {
492                 .l_inodebits = {
493                         .bits = ibits
494                 }
495         };
496         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
497         LASSERT(lh->mlh_mode != LCK_MINMODE);
498
499         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
500 }
501
502 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
503                               struct mdt_lock_handle *lh)
504 {
505         if (lustre_handle_is_used(&lh->mlh_lh)) {
506                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
507                 lh->mlh_lh.cookie = 0;
508         }
509 }
510
511 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
512                                         struct mdt_device *d,
513                                         struct lu_fid *f,
514                                         struct mdt_lock_handle *lh,
515                                         __u64 ibits)
516 {
517         struct mdt_object *o;
518
519         o = mdt_object_find(ctxt, d, f);
520         if (!IS_ERR(o)) {
521                 int result;
522
523                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
524                 if (result != 0) {
525                         mdt_object_put(ctxt, o);
526                         o = ERR_PTR(result);
527                 }
528         }
529         return o;
530 }
531
532 struct mdt_handler {
533         const char *mh_name;
534         int         mh_fail_id;
535         __u32       mh_opc;
536         __u32       mh_flags;
537         int (*mh_act)(struct mdt_thread_info *info,
538                       struct ptlrpc_request *req, int offset);
539 };
540
541 enum mdt_handler_flags {
542         /*
543          * struct mdt_body is passed in the 0-th incoming buffer.
544          */
545         HABEO_CORPUS = (1 << 0),
546         /*
547          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
548          * incoming buffer.
549          */
550         HABEO_CLAVIS = (1 << 1)
551 };
552
553 struct mdt_opc_slice {
554         __u32               mos_opc_start;
555         int                 mos_opc_end;
556         struct mdt_handler *mos_hs;
557 };
558
559 static struct mdt_opc_slice mdt_handlers[];
560
561 static struct mdt_handler *mdt_handler_find(__u32 opc)
562 {
563         struct mdt_opc_slice *s;
564         struct mdt_handler   *h;
565
566         h = NULL;
567         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
568                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
569                         h = s->mos_hs + (opc - s->mos_opc_start);
570                         if (h->mh_opc != 0)
571                                 LASSERT(h->mh_opc == opc);
572                         else
573                                 h = NULL; /* unsupported opc */
574                         break;
575                 }
576         }
577         return h;
578 }
579
580 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
581 {
582         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
583 }
584
585 static int mdt_lock_resname_compat(struct mdt_device *m,
586                                    struct ldlm_request *req)
587 {
588         /* XXX something... later. */
589         return 0;
590 }
591
592 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
593 {
594         /* XXX something... later. */
595         return 0;
596 }
597
598 /*
599  * Invoke handler for this request opc. Also do necessary preprocessing
600  * (according to handler ->mh_flags), and post-processing (setting of
601  * ->last_{xid,committed}).
602  */
603 static int mdt_req_handle(struct mdt_thread_info *info,
604                           struct mdt_handler *h, struct ptlrpc_request *req,
605                           int shift)
606 {
607         int result;
608         int off;
609
610         ENTRY;
611
612         LASSERT(h->mh_act != NULL);
613         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
614         LASSERT(current->journal_info == NULL);
615
616         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
617
618         if (h->mh_fail_id != 0)
619                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
620
621         off = MDS_REQ_REC_OFF + shift;
622
623         result = 0;
624         if (h->mh_flags & HABEO_CORPUS) {
625                 struct mdt_body *body;
626
627                 body = info->mti_body =
628                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
629                                            lustre_swab_mdt_body);
630                 if (body != NULL) {
631                         info->mti_object = mdt_object_find(info->mti_ctxt,
632                                                            info->mti_mdt,
633                                                            &body->fid1);
634                         if (IS_ERR(info->mti_object)) {
635                                 result = PTR_ERR(info->mti_object);
636                                 info->mti_object = NULL;
637                         }
638                 } else {
639                         CERROR("Can't unpack body\n");
640                         result = -EFAULT;
641                 }
642         } else if (h->mh_flags & HABEO_CLAVIS) {
643                 struct ldlm_request *dlm;
644
645                 LASSERT(shift == 0);
646                 dlm = info->mti_dlm_req =
647                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
648                                            sizeof *dlm,
649                                            lustre_swab_ldlm_request);
650                 if (dlm != NULL) {
651                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
652                                 result = mdt_lock_resname_compat(info->mti_mdt,
653                                                                  dlm);
654                 } else {
655                         CERROR("Can't unpack dlm request\n");
656                         result = -EFAULT;
657                 }
658         }
659         if (result == 0)
660                 /*
661                  * Process request.
662                  */
663                 result = h->mh_act(info, req, off);
664         /*
665          * XXX result value is unconditionally shoved into ->rq_status
666          * (original code sometimes placed error code into ->rq_status, and
667          * sometimes returned it to the
668          * caller). ptlrpc_server_handle_request() doesn't check return value
669          * anyway.
670          */
671         req->rq_status = result;
672
673         LASSERT(current->journal_info == NULL);
674
675         if (h->mh_flags & HABEO_CLAVIS &&
676             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
677                 struct ldlm_reply *rep;
678
679                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
680                 if (rep != NULL)
681                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
682         }
683
684         /* If we're DISCONNECTing, the mds_export_data is already freed */
685         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
686                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
687                 target_committed_to_req(req);
688         }
689         RETURN(result);
690 }
691
692 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
693 {
694         lh->mlh_lh.cookie = 0ull;
695         lh->mlh_mode = LCK_MINMODE;
696 }
697
698 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
699 {
700         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
701 }
702
703 static void mdt_thread_info_init(struct mdt_thread_info *info)
704 {
705         int i;
706
707         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
708         /*
709          * Poison size array.
710          */
711         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
712                 info->mti_rep_buf_size[i] = ~0;
713         info->mti_rep_buf_nr = i;
714         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
715                 mdt_lock_handle_init(&info->mti_lh[i]);
716         lu_context_enter(info->mti_ctxt);
717 }
718
719 static void mdt_thread_info_fini(struct mdt_thread_info *info)
720 {
721         int i;
722
723         lu_context_exit(info->mti_ctxt);
724         if (info->mti_object != NULL) {
725                 mdt_object_put(info->mti_ctxt, info->mti_object);
726                 info->mti_object = NULL;
727         }
728         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
729                 mdt_lock_handle_fini(&info->mti_lh[i]);
730 }
731
732 static int mds_msg_check_version(struct lustre_msg *msg)
733 {
734         int rc;
735
736         /* TODO: enable the below check while really introducing msg version.
737          * it's disabled because it will break compatibility with b1_4.
738          */
739         return (0);
740
741         switch (msg->opc) {
742         case MDS_CONNECT:
743         case MDS_DISCONNECT:
744         case OBD_PING:
745                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
746                 if (rc)
747                         CERROR("bad opc %u version %08x, expecting %08x\n",
748                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
749                 break;
750         case MDS_GETSTATUS:
751         case MDS_GETATTR:
752         case MDS_GETATTR_NAME:
753         case MDS_STATFS:
754         case MDS_READPAGE:
755         case MDS_REINT:
756         case MDS_CLOSE:
757         case MDS_DONE_WRITING:
758         case MDS_PIN:
759         case MDS_SYNC:
760         case MDS_GETXATTR:
761         case MDS_SETXATTR:
762         case MDS_SET_INFO:
763         case MDS_QUOTACHECK:
764         case MDS_QUOTACTL:
765         case QUOTA_DQACQ:
766         case QUOTA_DQREL:
767                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
768                 if (rc)
769                         CERROR("bad opc %u version %08x, expecting %08x\n",
770                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
771                 break;
772         case LDLM_ENQUEUE:
773         case LDLM_CONVERT:
774         case LDLM_BL_CALLBACK:
775         case LDLM_CP_CALLBACK:
776                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
777                 if (rc)
778                         CERROR("bad opc %u version %08x, expecting %08x\n",
779                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
780                 break;
781         case OBD_LOG_CANCEL:
782         case LLOG_ORIGIN_HANDLE_CREATE:
783         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
784         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
785         case LLOG_ORIGIN_HANDLE_READ_HEADER:
786         case LLOG_ORIGIN_HANDLE_CLOSE:
787         case LLOG_CATINFO:
788                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
789                 if (rc)
790                         CERROR("bad opc %u version %08x, expecting %08x\n",
791                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
792                 break;
793         default:
794                 CERROR("MDS unknown opcode %d\n", msg->opc);
795                 rc = -ENOTSUPP;
796         }
797         return rc;
798 }
799
800 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
801                                        struct obd_device *obd, int *process)
802 {
803         switch (req->rq_reqmsg->opc) {
804         case MDS_CONNECT: /* This will never get here, but for completeness. */
805         case OST_CONNECT: /* This will never get here, but for completeness. */
806         case MDS_DISCONNECT:
807         case OST_DISCONNECT:
808                *process = 1;
809                RETURN(0);
810
811         case MDS_CLOSE:
812         case MDS_SYNC: /* used in unmounting */
813         case OBD_PING:
814         case MDS_REINT:
815         case LDLM_ENQUEUE:
816                 *process = target_queue_recovery_request(req, obd);
817                 RETURN(0);
818
819         default:
820                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
821                 *process = 0;
822                 /* XXX what should we set rq_status to here? */
823                 req->rq_status = -EAGAIN;
824                 RETURN(ptlrpc_error(req));
825         }
826 }
827
828 /*
829  * Handle recovery. Return:
830  *        +1: continue request processing;
831  *       -ve: abort immediately with the given error code;
832  *         0: send reply with error code in req->rq_status;
833  */
834 static int mdt_recovery(struct ptlrpc_request *req)
835 {
836         int recovering;
837         int abort_recovery;
838         struct obd_device *obd;
839
840         ENTRY;
841
842         if (req->rq_reqmsg->opc == MDS_CONNECT)
843                 RETURN(+1);
844
845         if (req->rq_export == NULL) {
846                 CERROR("operation %d on unconnected MDS from %s\n",
847                        req->rq_reqmsg->opc,
848                        libcfs_id2str(req->rq_peer));
849                 req->rq_status = -ENOTCONN;
850                 RETURN(0);
851         }
852
853         /* sanity check: if the xid matches, the request must be marked as a
854          * resent or replayed */
855         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
856                       lustre_msg_get_flags(req->rq_reqmsg) &
857                       (MSG_RESENT | MSG_REPLAY)),
858                  "rq_xid "LPU64" matches last_xid, "
859                  "expected RESENT flag\n", req->rq_xid);
860
861         /* else: note the opposite is not always true; a RESENT req after a
862          * failover will usually not match the last_xid, since it was likely
863          * never committed. A REPLAYed request will almost never match the
864          * last xid, however it could for a committed, but still retained,
865          * open. */
866
867         obd = req->rq_export->exp_obd;
868
869         /* Check for aborted recovery... */
870         spin_lock_bh(&obd->obd_processing_task_lock);
871         abort_recovery = obd->obd_abort_recovery;
872         recovering = obd->obd_recovering;
873         spin_unlock_bh(&obd->obd_processing_task_lock);
874         if (abort_recovery) {
875                 target_abort_recovery(obd);
876         } else if (recovering) {
877                 int rc;
878                 int should_process;
879
880                 rc = mdt_filter_recovery_request(req, obd, &should_process);
881                 if (rc != 0 || !should_process) {
882                         LASSERT(rc < 0);
883                         RETURN(rc);
884                 }
885         }
886         RETURN(+1);
887 }
888
889 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
890 {
891         struct obd_device *obd;
892
893         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
894                 if (req->rq_reqmsg->opc != OBD_PING)
895                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
896
897                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
898                 if (obd && obd->obd_recovering) {
899                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
900                         RETURN(target_queue_final_reply(req, req->rq_status));
901                 } else {
902                         /* Lost a race with recovery; let the error path
903                          * DTRT. */
904                         req->rq_status = -ENOTCONN;
905                 }
906         }
907         target_send_reply(req, req->rq_status, info->mti_fail_id);
908         RETURN(req->rq_status);
909 }
910
911 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
912 {
913         struct mdt_handler *h;
914         struct lustre_msg  *msg;
915         int                 result;
916
917         ENTRY;
918
919         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
920
921         LASSERT(current->journal_info == NULL);
922
923         msg = req->rq_reqmsg;
924         result = mds_msg_check_version(msg);
925         if (result == 0) {
926                 result = mdt_recovery(req);
927                 switch (result) {
928                 case +1:
929                         h = mdt_handler_find(msg->opc);
930                         if (h != NULL)
931                                 result = mdt_req_handle(info, h, req, 0);
932                         else {
933                                 req->rq_status = -ENOTSUPP;
934                                 result = ptlrpc_error(req);
935                                 break;
936                         }
937                         /* fall through */
938                 case 0:
939                         result = mdt_reply(req, info);
940                 }
941         } else
942                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
943         RETURN(result);
944 }
945
946 /*
947  * MDT handler function called by ptlrpc service thread when request comes.
948  *
949  * XXX common "target" functionality should be factored into separate module
950  * shared by mdt, ost and stand-alone services like fld.
951  */
952 static int mdt_handle(struct ptlrpc_request *req)
953 {
954         int result;
955         struct lu_context      *ctx;
956         struct mdt_thread_info *info;
957         ENTRY;
958
959         ctx = req->rq_svc_thread->t_ctx;
960         LASSERT(ctx != NULL);
961         LASSERT(ctx->lc_thread == req->rq_svc_thread);
962
963         info = lu_context_key_get(ctx, &mdt_thread_key);
964         LASSERT(info != NULL);
965
966         mdt_thread_info_init(info);
967         /* it can be NULL while CONNECT */
968         if (req->rq_export)
969                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
970
971         result = mdt_handle0(req, info);
972         mdt_thread_info_fini(info);
973         RETURN(result);
974 }
975
976 /*Please move these function from mds to mdt*/
977 int intent_disposition(struct ldlm_reply *rep, int flag)
978 {
979         if (!rep)
980                 return 0;
981         return (rep->lock_policy_res1 & flag);
982 }
983
984 void intent_set_disposition(struct ldlm_reply *rep, int flag)
985 {
986         if (!rep)
987                 return;
988         rep->lock_policy_res1 |= flag;
989 }
990
991 static void fixup_handle_for_resent_req(struct mdt_thread_info *info,
992                                         struct ptlrpc_request *req, 
993                                         int offset,
994                                         struct ldlm_lock *new_lock,
995                                         struct ldlm_lock **old_lock,
996                                         struct mdt_lock_handle *lockh)
997 {
998         struct obd_export *exp = req->rq_export;
999         struct mdt_device * mdt = info->mti_mdt;
1000         struct ldlm_request *dlmreq =
1001                 lustre_msg_buf(req->rq_reqmsg, offset, sizeof (*dlmreq));
1002         struct lustre_handle remote_hdl = dlmreq->lock_handle1;
1003         struct list_head *iter;
1004
1005         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
1006                 return;
1007
1008         l_lock(&mdt->mdt_namespace->ns_lock);
1009         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
1010                 struct ldlm_lock *lock;
1011                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
1012                 if (lock == new_lock)
1013                         continue;
1014                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
1015                         lockh->mlh_lh.cookie = lock->l_handle.h_cookie;
1016                         LDLM_DEBUG(lock, "restoring lock cookie");
1017                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
1018                                   lockh->mlh_lh.cookie);
1019                         if (old_lock)
1020                                 *old_lock = LDLM_LOCK_GET(lock);
1021                         l_unlock(&mdt->mdt_namespace->ns_lock);
1022                         return;
1023                 }
1024         }
1025         l_unlock(&mdt->mdt_namespace->ns_lock);
1026
1027         /* If the xid matches, then we know this is a resent request,
1028          * and allow it. (It's probably an OPEN, for which we don't
1029          * send a lock */
1030         if (req->rq_xid ==
1031             le64_to_cpu(exp->exp_mds_data.med_mcd->mcd_last_xid))
1032                 return;
1033
1034         /* This remote handle isn't enqueued, so we never received or
1035          * processed this request.  Clear MSG_RESENT, because it can
1036          * be handled like any normal request now. */
1037
1038         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1039
1040         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
1041                   remote_hdl.cookie);
1042 }
1043
1044 static int mdt_intent_policy(struct ldlm_namespace *ns,
1045                              struct ldlm_lock **lockp, void *req_cookie,
1046                              ldlm_mode_t mode, int flags, void *data)
1047 {
1048         struct ptlrpc_request *req = req_cookie;
1049         struct ldlm_lock *lock = *lockp;
1050         struct ldlm_intent *it;
1051         struct ldlm_reply *rep;
1052         struct mdt_lock_handle lockh = { {0} };
1053         struct ldlm_lock *new_lock = NULL;
1054         int getattr_part = MDS_INODELOCK_UPDATE;
1055         int offset = MDS_REQ_INTENT_REC_OFF;
1056         int rc;
1057         struct mdt_thread_info *info;
1058
1059         ENTRY;
1060
1061         LASSERT(req != NULL);
1062
1063         /* We already got it in mdt_handle. But we have to do it again*/
1064         info = lu_context_key_get(req->rq_svc_thread->t_ctx, &mdt_thread_key);
1065         LASSERT(info != NULL);
1066         mdt_thread_info_init(info);
1067
1068
1069         if (req->rq_reqmsg->bufcount <= MDS_REQ_INTENT_IT_OFF) {
1070                 /* No intent was provided */
1071                 info->mti_rep_buf_size[0] =  sizeof(struct ldlm_reply);
1072                 rc = lustre_pack_reply(req, 1, info->mti_rep_buf_size, NULL);
1073                 LASSERT(rc == 0);
1074                 mdt_thread_info_fini(info);
1075                 RETURN(0);
1076         }
1077
1078         it = lustre_swab_reqbuf(req, MDS_REQ_INTENT_IT_OFF, sizeof(*it),
1079                                 lustre_swab_ldlm_intent);
1080         if (it == NULL) {
1081                 CERROR("Intent missing\n");
1082                 mdt_thread_info_fini(info);
1083                 RETURN(req->rq_status = -EFAULT);
1084         }
1085
1086         LDLM_DEBUG(lock, "intent policy, opc: %s", ldlm_it2str(it->opc));
1087         info->mti_rep_buf_nr = 3;
1088         info->mti_rep_buf_size[0] = sizeof(*rep);
1089         info->mti_rep_buf_size[1] = sizeof(struct mdt_body);
1090         info->mti_rep_buf_size[2] = sizeof(struct lov_mds_md);/*FIXME:See mds*/
1091
1092         if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
1093             (it->opc & (IT_OPEN | IT_GETATTR | IT_LOOKUP))){
1094                 /* we should never allow OBD_CONNECT_ACL if not configured */
1095                 info->mti_rep_buf_size[info->mti_rep_buf_nr++] = 
1096                                         LUSTRE_POSIX_ACL_MAX_SIZE;
1097         }
1098         else if (it->opc & IT_UNLINK){
1099                 info->mti_rep_buf_size[info->mti_rep_buf_nr++] = 
1100                                         sizeof(struct llog_cookie); 
1101                                         /*FIXME:See mds*/
1102         }
1103
1104         rc = lustre_pack_reply(req, info->mti_rep_buf_nr, 
1105                                info->mti_rep_buf_size, NULL);
1106         if (rc){
1107                 mdt_thread_info_fini(info);
1108                 RETURN(req->rq_status = rc);
1109         }
1110
1111         rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof (*rep));
1112         intent_set_disposition(rep, DISP_IT_EXECD);
1113
1114
1115         /* execute policy */
1116         switch ((long)it->opc) {
1117         case IT_OPEN:
1118         case IT_CREAT|IT_OPEN:
1119                 fixup_handle_for_resent_req(info, req, 
1120                                             MDS_REQ_INTENT_LOCKREQ_OFF,
1121                                             lock, NULL, &lockh);
1122                 /* XXX swab here to assert that an mds_open reint
1123                  * packet is following */
1124                 rep->lock_policy_res2 = mdt_reint_internal(info, req,
1125                                                 offset, &lockh);
1126 #if 0
1127                 /* We abort the lock if the lookup was negative and
1128                  * we did not make it to the OPEN portion */
1129                 if (!intent_disposition(rep, DISP_LOOKUP_EXECD))
1130                         RETURN(ELDLM_LOCK_ABORTED);
1131                 if (intent_disposition(rep, DISP_LOOKUP_NEG) &&
1132                     !intent_disposition(rep, DISP_OPEN_OPEN))
1133                         RETURN(ELDLM_LOCK_ABORTED);
1134 #endif
1135                 break;
1136         case IT_LOOKUP:
1137                         getattr_part = MDS_INODELOCK_LOOKUP;
1138         case IT_GETATTR:
1139                         getattr_part |= MDS_INODELOCK_LOOKUP;
1140         case IT_READDIR:
1141 #if 0
1142                 fixup_handle_for_resent_req(info, req, 
1143                                             MDS_REQ_INTENT_LOCKREQ_OFF,
1144                                             lock, &new_lock, &lockh);
1145
1146                 /* INODEBITS_INTEROP: if this lock was converted from a
1147                  * plain lock (client does not support inodebits), then
1148                  * child lock must be taken with both lookup and update
1149                  * bits set for all operations.
1150                  */
1151                 if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_IBITS))
1152                         getattr_part = MDS_INODELOCK_LOOKUP |
1153                                        MDS_INODELOCK_UPDATE;
1154
1155                 rep->lock_policy_res2 = mds_getattr_name(offset, req,
1156                                                          getattr_part, &lockh);
1157                 /* FIXME: LDLM can set req->rq_status. MDS sets
1158                    policy_res{1,2} with disposition and status.
1159                    - replay: returns 0 & req->status is old status
1160                    - otherwise: returns req->status */
1161                 if (intent_disposition(rep, DISP_LOOKUP_NEG))
1162                         rep->lock_policy_res2 = 0;
1163                 if (!intent_disposition(rep, DISP_LOOKUP_POS) ||
1164                     rep->lock_policy_res2)
1165                         RETURN(ELDLM_LOCK_ABORTED);
1166                 if (req->rq_status != 0) {
1167                         LBUG();
1168                         rep->lock_policy_res2 = req->rq_status;
1169                         RETURN(ELDLM_LOCK_ABORTED);
1170                 }
1171 #endif
1172                 RETURN(ELDLM_LOCK_ABORTED);
1173                 break;
1174         default:
1175                 CERROR("Unhandled intent "LPD64"\n", it->opc);
1176                 LBUG();
1177         }
1178
1179         /* By this point, whatever function we called above must have either
1180          * filled in 'lockh', been an intent replay, or returned an error.  We
1181          * want to allow replayed RPCs to not get a lock, since we would just
1182          * drop it below anyways because lock replay is done separately by the
1183          * client afterwards.  For regular RPCs we want to give the new lock to
1184          * the client instead of whatever lock it was about to get. */
1185         if (new_lock == NULL)
1186                 new_lock = ldlm_handle2lock(&lockh.mlh_lh);
1187         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY))
1188                 RETURN(0);
1189
1190         LASSERTF(new_lock != NULL, "op "LPX64" lockh "LPX64"\n",
1191                  it->opc, lockh.mlh_lh.cookie);
1192
1193         /* If we've already given this lock to a client once, then we should
1194          * have no readers or writers.  Otherwise, we should have one reader
1195          * _or_ writer ref (which will be zeroed below) before returning the
1196          * lock to a client. */
1197         if (new_lock->l_export == req->rq_export) {
1198                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
1199         } else {
1200                 LASSERT(new_lock->l_export == NULL);
1201                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
1202         }
1203
1204         *lockp = new_lock;
1205
1206         if (new_lock->l_export == req->rq_export) {
1207                 /* Already gave this to the client, which means that we
1208                  * reconstructed a reply. */
1209                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
1210                         MSG_RESENT);
1211                 RETURN(ELDLM_LOCK_REPLACED);
1212         }
1213
1214         /* Fixup the lock to be given to the client */
1215         l_lock(&new_lock->l_resource->lr_namespace->ns_lock);
1216         new_lock->l_readers = 0;
1217         new_lock->l_writers = 0;
1218
1219         new_lock->l_export = class_export_get(req->rq_export);
1220         list_add(&new_lock->l_export_chain,
1221                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
1222
1223         new_lock->l_blocking_ast = lock->l_blocking_ast;
1224         new_lock->l_completion_ast = lock->l_completion_ast;
1225
1226         memcpy(&new_lock->l_remote_handle, &lock->l_remote_handle,
1227                sizeof(lock->l_remote_handle));
1228
1229         new_lock->l_flags &= ~LDLM_FL_LOCAL;
1230
1231         LDLM_LOCK_PUT(new_lock);
1232         l_unlock(&new_lock->l_resource->lr_namespace->ns_lock);
1233
1234         RETURN(ELDLM_LOCK_REPLACED);
1235 }
1236
1237 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
1238                       const char *name, void *buf, int size, int mode)
1239 {
1240         struct md_device *child = m->mdt_child;
1241         ENTRY;
1242         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
1243 }
1244
1245 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
1246                            int mode)
1247 {
1248         struct mdt_device *m = opaque;
1249         int rc;
1250         ENTRY;
1251
1252         rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
1253                         seq, sizeof(*seq), mode);
1254         RETURN(rc);
1255 }
1256
1257 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
1258 {
1259         ENTRY;
1260         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
1261 }
1262
1263 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
1264 {
1265         ENTRY;
1266         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
1267 }
1268
1269 struct lu_seq_mgr_ops seq_mgr_ops = {
1270         .smo_read  = mdt_seq_mgr_read,
1271         .smo_write = mdt_seq_mgr_write
1272 };
1273
1274 /*
1275  * FLD wrappers
1276  */
1277
1278 static int mdt_fld_init(struct mdt_device *m)
1279 {
1280         struct lu_site *ls;
1281         int rc;
1282         ENTRY;
1283
1284         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1285
1286         OBD_ALLOC_PTR(ls->ls_fld);
1287
1288         if (ls->ls_fld != NULL)
1289                 rc = fld_server_init(ls->ls_fld, m->mdt_bottom);
1290         else
1291                 rc = -ENOMEM;
1292
1293         RETURN(rc);
1294 }
1295
1296 static int mdt_fld_fini(struct mdt_device *m)
1297 {
1298         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1299         int rc = 0;
1300
1301         if (ls && ls->ls_fld) {
1302                 fld_server_fini(ls->ls_fld);
1303                 OBD_FREE_PTR(ls->ls_fld);
1304         }
1305         RETURN(rc);
1306 }
1307
1308 /* device init/fini methods */
1309
1310 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1311 {
1312         if (m->mdt_service != NULL) {
1313                 ptlrpc_unregister_service(m->mdt_service);
1314                 m->mdt_service = NULL;
1315         }
1316 }
1317
1318 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1319 {
1320         int rc;
1321         struct ptlrpc_service_conf conf = {
1322                 .psc_nbufs            = MDS_NBUFS,
1323                 .psc_bufsize          = MDS_BUFSIZE,
1324                 .psc_max_req_size     = MDS_MAXREQSIZE,
1325                 .psc_max_reply_size   = MDS_MAXREPSIZE,
1326                 .psc_req_portal       = MDS_REQUEST_PORTAL,
1327                 .psc_rep_portal       = MDC_REPLY_PORTAL,
1328                 .psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT,
1329                 /*
1330                  * We'd like to have a mechanism to set this on a per-device
1331                  * basis, but alas...
1332                  */
1333                 .psc_num_threads = min(max(mdt_num_threads, MDT_MIN_THREADS),
1334                                        MDT_MAX_THREADS)
1335         };
1336
1337         ENTRY;
1338
1339
1340         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1341                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1342
1343         m->mdt_service =
1344                 ptlrpc_init_svc_conf(&conf, mdt_handle, LUSTRE_MDT0_NAME,
1345                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1346                                      NULL);
1347         if (m->mdt_service == NULL)
1348                 RETURN(-ENOMEM);
1349
1350         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1351         if (rc)
1352                 GOTO(err_mdt_svc, rc);
1353
1354         RETURN(rc);
1355 err_mdt_svc:
1356         ptlrpc_unregister_service(m->mdt_service);
1357         m->mdt_service = NULL;
1358
1359         RETURN(rc);
1360 }
1361
1362 static void mdt_stack_fini(struct mdt_device *m, struct lu_device *d)
1363 {
1364         /* goes through all stack */
1365         while (d != NULL) {
1366                 struct lu_device *n;
1367                 struct obd_type *type;
1368                 struct lu_device_type *ldt = d->ld_type;
1369
1370                 lu_device_put(d);
1371
1372                 /* each fini() returns next device in stack of layers
1373                  * * so we can avoid the recursion */
1374                 n = ldt->ldt_ops->ldto_device_fini(d);
1375                 ldt->ldt_ops->ldto_device_free(d);
1376
1377                 type = ldt->ldt_obd_type;
1378                 type->typ_refcnt--;
1379                 class_put_type(type);
1380                 /* switch to the next device in the layer */
1381                 d = n;
1382         }
1383         m->mdt_child = NULL;
1384 }
1385
1386 static struct lu_device *mdt_layer_setup(const char *typename,
1387                                          struct lu_device *child,
1388                                          struct lustre_cfg *cfg)
1389 {
1390         struct obd_type       *type;
1391         struct lu_device_type *ldt;
1392         struct lu_device      *d;
1393         int rc;
1394
1395         /* find the type */
1396         type = class_get_type(typename);
1397         if (!type) {
1398                 CERROR("Unknown type: '%s'\n", typename);
1399                 GOTO(out, rc = -ENODEV);
1400         }
1401
1402         ldt = type->typ_lu;
1403         ldt->ldt_obd_type = type;
1404         if (ldt == NULL) {
1405                 CERROR("type: '%s'\n", typename);
1406                 GOTO(out_type, rc = -EINVAL);
1407         }
1408
1409         d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1410         if (IS_ERR(d)) {
1411                 CERROR("Cannot allocate device: '%s'\n", typename);
1412                 GOTO(out_type, rc = -ENODEV);
1413         }
1414
1415         LASSERT(child->ld_site);
1416         d->ld_site = child->ld_site;
1417
1418         type->typ_refcnt++;
1419         rc = ldt->ldt_ops->ldto_device_init(d, child);
1420         if (rc) {
1421                 CERROR("can't init device '%s', rc %d\n", typename, rc);
1422                 GOTO(out_alloc, rc);
1423         }
1424         lu_device_get(d);
1425
1426         RETURN(d);
1427 out_alloc:
1428         ldt->ldt_ops->ldto_device_free(d);
1429         type->typ_refcnt--;
1430 out_type:
1431         class_put_type(type);
1432 out:
1433         RETURN(ERR_PTR(rc));
1434 }
1435
1436 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
1437 {
1438         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
1439         struct lu_device  *tmp;
1440         int rc;
1441
1442         /* init the stack */
1443         tmp = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1444         if (IS_ERR(tmp)) {
1445                 RETURN (PTR_ERR(tmp));
1446         }
1447         m->mdt_bottom = lu2dt_dev(tmp);
1448         d = tmp;
1449         tmp = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1450         if (IS_ERR(tmp)) {
1451                 GOTO(out, rc = PTR_ERR(tmp));
1452         }
1453         d = tmp;
1454         tmp = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1455         if (IS_ERR(tmp)) {
1456                 GOTO(out, rc = PTR_ERR(tmp));
1457         }
1458         d = tmp;
1459         m->mdt_child = lu2md_dev(d);
1460
1461         /* process setup config */
1462         tmp = &m->mdt_md_dev.md_lu_dev;
1463         rc = tmp->ld_ops->ldo_process_config(tmp, cfg);
1464
1465 out:
1466         /* fini from last known good lu_device */
1467         if (rc)
1468                 mdt_stack_fini(m, d);
1469
1470         return rc;
1471 }
1472
1473 static void mdt_fini(struct mdt_device *m)
1474 {
1475         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1476
1477         ENTRY;
1478
1479         mdt_stop_ptlrpc_service(m);
1480
1481         /* finish the stack */
1482         mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1483
1484         if (d->ld_site != NULL) {
1485                 lu_site_fini(d->ld_site);
1486                 OBD_FREE_PTR(d->ld_site);
1487                 d->ld_site = NULL;
1488         }
1489         if (m->mdt_namespace != NULL) {
1490                 ldlm_namespace_free(m->mdt_namespace, 0);
1491                 m->mdt_namespace = NULL;
1492         }
1493
1494         if (m->mdt_seq_mgr) {
1495                 seq_mgr_fini(m->mdt_seq_mgr);
1496                 m->mdt_seq_mgr = NULL;
1497         }
1498
1499         LASSERT(atomic_read(&d->ld_ref) == 0);
1500         md_device_fini(&m->mdt_md_dev);
1501         EXIT;
1502 }
1503
1504 static int mdt_init0(struct mdt_device *m,
1505                      struct lu_device_type *t, struct lustre_cfg *cfg)
1506 {
1507         int rc;
1508         struct lu_site *s;
1509         char   ns_name[48];
1510         struct lu_context ctx;
1511         const char *dev = lustre_cfg_string(cfg, 0);
1512         struct obd_device *obd;
1513         ENTRY;
1514
1515         obd = class_name2obd(dev);
1516         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
1517
1518         OBD_ALLOC_PTR(s);
1519         if (s == NULL)
1520                 RETURN(-ENOMEM);
1521
1522         md_device_init(&m->mdt_md_dev, t);
1523         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1524
1525         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1526         if (rc) {
1527                 CERROR("can't init lu_site, rc %d\n", rc);
1528                 GOTO(err_fini_site, rc);
1529         }
1530
1531         /* init the stack */
1532         rc = mdt_stack_init(m, cfg);
1533         if (rc) {
1534                 CERROR("can't init device stack, rc %d\n", rc);
1535                 GOTO(err_fini_site, rc);
1536         }
1537
1538         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1539         if (!m->mdt_seq_mgr) {
1540                 CERROR("can't initialize sequence manager\n");
1541                 GOTO(err_fini_stack, rc);
1542         }
1543
1544         rc = lu_context_init(&ctx);
1545         if (rc != 0)
1546                 GOTO(err_fini_mgr, rc);
1547
1548         lu_context_enter(&ctx);
1549         /* init sequence info after device stack is initialized. */
1550         rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1551         lu_context_exit(&ctx);
1552         if (rc)
1553                 GOTO(err_fini_ctx, rc);
1554
1555         lu_context_fini(&ctx);
1556
1557         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1558         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1559         if (m->mdt_namespace == NULL)
1560                 GOTO(err_fini_site, rc = -ENOMEM);
1561
1562         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1563
1564         rc = mdt_fld_init(m);
1565         if (rc)
1566                 GOTO(err_free_ns, rc);
1567
1568         rc = mdt_start_ptlrpc_service(m);
1569         if (rc)
1570                 GOTO(err_free_fld, rc);
1571
1572         RETURN(0);
1573
1574 err_free_fld:
1575         mdt_fld_fini(m);
1576 err_free_ns:
1577         ldlm_namespace_free(m->mdt_namespace, 0);
1578         m->mdt_namespace = NULL;
1579 err_fini_ctx:
1580         lu_context_fini(&ctx);
1581 err_fini_mgr:
1582         seq_mgr_fini(m->mdt_seq_mgr);
1583         m->mdt_seq_mgr = NULL;
1584 err_fini_stack:
1585         mdt_stack_fini(m, md2lu_dev(m->mdt_child));
1586 err_fini_site:
1587         lu_site_fini(s);
1588         OBD_FREE_PTR(s);
1589         RETURN(rc);
1590 }
1591
1592 /* used by MGS to process specific configurations */
1593 static int mdt_process_config(struct lu_device *d, struct lustre_cfg *cfg)
1594 {
1595         struct lu_device *next = md2lu_dev(mdt_dev(d)->mdt_child);
1596         int err;
1597         ENTRY;
1598
1599         switch (cfg->lcfg_command) {
1600                 /* all MDT specific commands should be here */
1601         default:
1602                 /* others are passed further */
1603                 err = next->ld_ops->ldo_process_config(next, cfg);
1604         }
1605         RETURN(err);
1606 }
1607
1608 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1609                                           struct lu_device *d)
1610 {
1611         struct mdt_object *mo;
1612
1613         OBD_ALLOC_PTR(mo);
1614         if (mo != NULL) {
1615                 struct lu_object *o;
1616                 struct lu_object_header *h;
1617
1618                 o = &mo->mot_obj.mo_lu;
1619                 h = &mo->mot_header;
1620                 lu_object_header_init(h);
1621                 lu_object_init(o, h, d);
1622                 lu_object_add_top(h, o);
1623                 o->lo_ops = &mdt_obj_ops;
1624                 return o;
1625         } else
1626                 return NULL;
1627 }
1628
1629 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1630 {
1631         struct mdt_device *d = mdt_dev(o->lo_dev);
1632         struct lu_device  *under;
1633         struct lu_object  *below;
1634
1635         under = &d->mdt_child->md_lu_dev;
1636         below = under->ld_ops->ldo_object_alloc(ctxt, under);
1637         if (below != NULL) {
1638                 lu_object_add(o, below);
1639                 return 0;
1640         } else
1641                 return -ENOMEM;
1642 }
1643
1644 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1645 {
1646         struct mdt_object *mo = mdt_obj(o);
1647         struct lu_object_header *h;
1648
1649         h = o->lo_header;
1650         lu_object_fini(o);
1651         lu_object_header_fini(h);
1652         OBD_FREE_PTR(mo);
1653 }
1654
1655 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
1656 {
1657 }
1658
1659 static int mdt_object_exists(struct lu_context *ctx, struct lu_object *o)
1660 {
1661         struct lu_object *next = lu_object_next(o);
1662
1663         return next->lo_ops->loo_object_exists(ctx, next);
1664 }
1665
1666 static int mdt_object_print(struct lu_context *ctxt,
1667                             struct seq_file *f, const struct lu_object *o)
1668 {
1669         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1670 }
1671
1672 static struct lu_device_operations mdt_lu_ops = {
1673         .ldo_object_alloc   = mdt_object_alloc,
1674         .ldo_object_free    = mdt_object_free,
1675         .ldo_process_config = mdt_process_config
1676 };
1677
1678 static struct lu_object_operations mdt_obj_ops = {
1679         .loo_object_init    = mdt_object_init,
1680         .loo_object_release = mdt_object_release,
1681         .loo_object_print   = mdt_object_print,
1682         .loo_object_exists  = mdt_object_exists
1683 };
1684
1685 /* mds_connect_internal */
1686 static int mdt_connect0(struct mdt_device *mdt,
1687                         struct obd_export *exp, struct obd_connect_data *data)
1688 {
1689         if (data != NULL) {
1690                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
1691                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
1692
1693                 /* If no known bits (which should not happen, probably,
1694                    as everybody should support LOOKUP and UPDATE bits at least)
1695                    revert to compat mode with plain locks. */
1696                 if (!data->ocd_ibits_known &&
1697                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
1698                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
1699
1700                 if (!mdt->mdt_opts.mo_acl)
1701                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
1702
1703                 if (!mdt->mdt_opts.mo_user_xattr)
1704                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
1705
1706                 exp->exp_connect_flags = data->ocd_connect_flags;
1707                 data->ocd_version = LUSTRE_VERSION_CODE;
1708                 exp->exp_mds_data.med_ibits_known = data->ocd_ibits_known;
1709         }
1710
1711         if (mdt->mdt_opts.mo_acl &&
1712             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
1713                 CWARN("%s: MDS requires ACL support but client does not\n",
1714                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
1715                 return -EBADE;
1716         }
1717         return 0;
1718 }
1719
1720 /* mds_connect copy */
1721 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1722                            struct obd_uuid *cluuid,
1723                            struct obd_connect_data *data)
1724 {
1725         struct obd_export *exp;
1726         int rc;
1727         struct mdt_device *mdt;
1728         struct mds_export_data *med;
1729         struct mds_client_data *mcd = NULL;
1730         ENTRY;
1731
1732         if (!conn || !obd || !cluuid)
1733                 RETURN(-EINVAL);
1734
1735         mdt = mdt_dev(obd->obd_lu_dev);
1736
1737         rc = class_connect(conn, obd, cluuid);
1738         if (rc)
1739                 RETURN(rc);
1740
1741         exp = class_conn2export(conn);
1742         LASSERT(exp != NULL);
1743         med = &exp->exp_mds_data;
1744
1745         rc = mdt_connect0(mdt, exp, data);
1746         if (rc == 0) {
1747                 OBD_ALLOC_PTR(mcd);
1748                 if (mcd != NULL) {
1749                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
1750                         med->med_mcd = mcd;
1751                 } else
1752                         rc = -ENOMEM;
1753         }
1754         if (rc)
1755                 class_disconnect(exp);
1756         else
1757                 class_export_put(exp);
1758
1759         RETURN(rc);
1760 }
1761
1762 static int mdt_obd_disconnect(struct obd_export *exp)
1763 {
1764         struct mds_export_data *med = &exp->exp_mds_data;
1765         unsigned long irqflags;
1766         int rc;
1767         ENTRY;
1768
1769         LASSERT(exp);
1770         class_export_get(exp);
1771
1772         /* Disconnect early so that clients can't keep using export */
1773         rc = class_disconnect(exp);
1774         //ldlm_cancel_locks_for_export(exp);
1775
1776         /* complete all outstanding replies */
1777         spin_lock_irqsave(&exp->exp_lock, irqflags);
1778         while (!list_empty(&exp->exp_outstanding_replies)) {
1779                 struct ptlrpc_reply_state *rs =
1780                         list_entry(exp->exp_outstanding_replies.next,
1781                                    struct ptlrpc_reply_state, rs_exp_list);
1782                 struct ptlrpc_service *svc = rs->rs_service;
1783
1784                 spin_lock(&svc->srv_lock);
1785                 list_del_init(&rs->rs_exp_list);
1786                 ptlrpc_schedule_difficult_reply(rs);
1787                 spin_unlock(&svc->srv_lock);
1788         }
1789         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1790
1791         OBD_FREE_PTR(med->med_mcd);
1792
1793         class_export_put(exp);
1794         RETURN(rc);
1795 }
1796
1797 static struct obd_ops mdt_obd_device_ops = {
1798         .o_owner = THIS_MODULE,
1799         .o_connect = mdt_obd_connect,
1800         .o_disconnect = mdt_obd_disconnect,
1801 };
1802
1803 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1804                                           struct lustre_cfg *cfg)
1805 {
1806         struct lu_device  *l;
1807         struct mdt_device *m;
1808
1809         OBD_ALLOC_PTR(m);
1810         if (m != NULL) {
1811                 int result;
1812
1813                 l = &m->mdt_md_dev.md_lu_dev;
1814                 result = mdt_init0(m, t, cfg);
1815                 if (result != 0) {
1816                         mdt_fini(m);
1817                         return ERR_PTR(result);
1818                 }
1819
1820         } else
1821                 l = ERR_PTR(-ENOMEM);
1822         return l;
1823 }
1824
1825 static void mdt_device_free(struct lu_device *d)
1826 {
1827         struct mdt_device *m = mdt_dev(d);
1828
1829         mdt_fini(m);
1830         OBD_FREE_PTR(m);
1831 }
1832
1833 static void *mdt_thread_init(struct lu_context *ctx)
1834 {
1835         struct mdt_thread_info *info;
1836
1837         OBD_ALLOC_PTR(info);
1838         if (info != NULL)
1839                 info->mti_ctxt = ctx;
1840         else
1841                 info = ERR_PTR(-ENOMEM);
1842         return info;
1843 }
1844
1845 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1846 {
1847         struct mdt_thread_info *info = data;
1848         OBD_FREE_PTR(info);
1849 }
1850
1851 static struct lu_context_key mdt_thread_key = {
1852         .lct_init = mdt_thread_init,
1853         .lct_fini = mdt_thread_fini
1854 };
1855
1856 static int mdt_type_init(struct lu_device_type *t)
1857 {
1858         return lu_context_key_register(&mdt_thread_key);
1859 }
1860
1861 static void mdt_type_fini(struct lu_device_type *t)
1862 {
1863         lu_context_key_degister(&mdt_thread_key);
1864 }
1865
1866 static struct lu_device_type_operations mdt_device_type_ops = {
1867         .ldto_init = mdt_type_init,
1868         .ldto_fini = mdt_type_fini,
1869
1870         .ldto_device_alloc = mdt_device_alloc,
1871         .ldto_device_free  = mdt_device_free
1872 };
1873
1874 static struct lu_device_type mdt_device_type = {
1875         .ldt_tags = LU_DEVICE_MD,
1876         .ldt_name = LUSTRE_MDT0_NAME,
1877         .ldt_ops  = &mdt_device_type_ops
1878 };
1879
1880 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1881         { 0 }
1882 };
1883
1884 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1885         { 0 }
1886 };
1887
1888 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1889
1890 static int __init mdt_mod_init(void)
1891 {
1892         struct lprocfs_static_vars lvars;
1893
1894         mdt_num_threads = MDT_NUM_THREADS;
1895         lprocfs_init_vars(mdt, &lvars);
1896         return class_register_type(&mdt_obd_device_ops, NULL,
1897                                    lvars.module_vars, LUSTRE_MDT0_NAME,
1898                                    &mdt_device_type);
1899 }
1900
1901 static void __exit mdt_mod_exit(void)
1902 {
1903         class_unregister_type(LUSTRE_MDT0_NAME);
1904 }
1905
1906
1907 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1908 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1909         .mh_name    = #opc,                                             \
1910         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1911         .mh_opc     = prefix ## _  ## opc,                              \
1912         .mh_flags   = flags,                                            \
1913         .mh_act     = fn                                                \
1914 }
1915
1916 #define DEF_MDT_HNDL(flags, name, fn)                   \
1917         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1918
1919 static struct mdt_handler mdt_mds_ops[] = {
1920         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1921         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1922         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1923         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1924         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1925         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1926         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1927         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1928         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1929         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1930         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1931         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1932         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1933         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1934         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1935         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1936 };
1937
1938 static struct mdt_handler mdt_obd_ops[] = {
1939 };
1940
1941 #define DEF_DLM_HNDL(flags, name, fn)                   \
1942         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1943
1944 static struct mdt_handler mdt_dlm_ops[] = {
1945         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1946         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1947         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1948         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1949 };
1950
1951 static struct mdt_handler mdt_llog_ops[] = {
1952 };
1953
1954 static struct mdt_opc_slice mdt_handlers[] = {
1955         {
1956                 .mos_opc_start = MDS_GETATTR,
1957                 .mos_opc_end   = MDS_LAST_OPC,
1958                 .mos_hs        = mdt_mds_ops
1959         },
1960         {
1961                 .mos_opc_start = OBD_PING,
1962                 .mos_opc_end   = OBD_LAST_OPC,
1963                 .mos_hs        = mdt_obd_ops
1964         },
1965         {
1966                 .mos_opc_start = LDLM_ENQUEUE,
1967                 .mos_opc_end   = LDLM_LAST_OPC,
1968                 .mos_hs        = mdt_dlm_ops
1969         },
1970         {
1971                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1972                 .mos_opc_end   = LLOG_LAST_OPC,
1973                 .mos_hs        = mdt_llog_ops
1974         },
1975         {
1976                 .mos_hs        = NULL
1977         }
1978 };
1979
1980 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1981 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1982 MODULE_LICENSE("GPL");
1983
1984 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1985                 "number of mdt service threads to start");
1986
1987 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);