Whamcloud - gitweb
fix module initialization
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
57
58 /*
59  * Initialized in mdt_mod_init().
60  */
61 unsigned long mdt_num_threads;
62
63 static int mdt_handle(struct ptlrpc_request *req);
64 static struct ptlrpc_thread_key mdt_thread_key;
65
66 static int mdt_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
67                      struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
68 {
69         struct mdt_object      *o;
70         struct mdt_object      *child;
71         struct mdt_lock_handle *lh;
72
73         int result;
74
75         lh = &info->mti_lh[MDT_LH_PARENT];
76         lh->mlh_mode = LCK_PW;
77
78         o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
79         if (IS_ERR(o))
80                 return PTR_ERR(o);
81
82         child = mdt_object_find(d, cfid);
83         if (!IS_ERR(child)) {
84                 result = mdt_child_ops(d)->mdo_mkdir(mdt_object_child(o), name,
85                                                      mdt_object_child(child));
86                 mdt_object_put(child);
87         } else
88                 result = PTR_ERR(child);
89         mdt_object_unlock(d->mdt_namespace, o, lh);
90         mdt_object_put(o);
91         return result;
92 }
93 #if 0
94 static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid,
95                           struct md_object_attr *attr)
96 {
97         struct mdt_device *d = info->mti_mdt;
98         struct mdt_object *o;
99         struct iattr
100         int               result;
101
102         o = mdt_object_find(d, fid);
103         if (IS_ERR(o))
104                 return PTR_ERR(o);
105
106         result = mdt_child_ops(d)->mdo_attr_get(mdt_object_child(o), name,
107                                                      mdt_object_child(child));
108                 mdt_object_put(child);
109         } else
110                 result = PTR_ERR(child);
111         mdt_object_unlock(d->mdt_namespace, o, lh);
112         mdt_object_put(o);
113         return result;
114 }
115 #endif
116 static int mdt_getstatus(struct mdt_thread_info *info,
117                          struct ptlrpc_request *req, int offset)
118 {
119         struct md_device *mdd  = info->mti_mdt->mdt_child;
120         struct mdt_body  *body;
121         int               size = sizeof *body;
122         int               result;
123
124         ENTRY;
125
126         result = lustre_pack_reply(req, 1, &size, NULL);
127         if (result)
128                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
129                        size);
130         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
131                 result = -ENOMEM;
132         else {
133                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
134                 result = mdd->md_ops->mdo_root_get(mdd, &body->fid1);
135         }
136
137         /* the last_committed and last_xid fields are filled in for all
138          * replies already - no need to do so here also.
139          */
140         RETURN(result);
141 }
142
143 static int mdt_statfs(struct mdt_thread_info *info,
144                       struct ptlrpc_request *req, int offset)
145 {
146         struct md_device  *child  = info->mti_mdt->mdt_child;
147         struct obd_statfs *osfs;
148         struct kstatfs    sfs;
149         int               result;
150         int               size = sizeof(struct obd_statfs);
151
152         ENTRY;
153
154         result = lustre_pack_reply(req, 1, &size, NULL);
155         if (result)
156                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
157                        size);
158         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
159                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
160                 result = -ENOMEM;
161         } else {
162                 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
163                 /* XXX max_age optimisation is needed here. See mds_statfs */
164                 result = child->md_ops->mdo_statfs(child, &sfs);
165                 statfs_pack(osfs, &sfs);
166         }
167
168         RETURN(result);
169 }
170 #if 0
171 static int mdt_getattr(struct mdt_thread_info *info,
172                        struct ptlrpc_request *req, int offset)
173 {
174         struct mdt_body        *body;
175         int                    size = sizeof (*body);
176         struct md_obj_attr     attr;
177         int result;
178
179         ENTRY;
180
181         result = lustre_pack_reply(req, 1, &size, NULL);
182         if (result)
183                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
184                        size);
185         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
186                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
187                 result = -ENOMEM;
188         } else {
189                 body = lustre_swab_reqbuf(req, offset, size,
190                                           lustre_swab_mdt_body);
191                 result = mdt_md_getattr(info, body->fid1);
192         }
193 out:
194         RETURN(result);
195 }
196 #else
197 static int mdt_getattr(struct mdt_thread_info *info,
198                        struct ptlrpc_request *req, int offset)
199 {
200          return -EOPNOTSUPP;
201 }
202 #endif
203 static int mdt_connect(struct mdt_thread_info *info,
204                        struct ptlrpc_request *req, int offset)
205 {
206         return target_handle_connect(req, mdt_handle);
207 }
208
209 static int mdt_disconnect(struct mdt_thread_info *info,
210                           struct ptlrpc_request *req, int offset)
211 {
212         return -EOPNOTSUPP;
213 }
214
215 static int mdt_getattr_name(struct mdt_thread_info *info,
216                             struct ptlrpc_request *req, int offset)
217 {
218         return -EOPNOTSUPP;
219 }
220
221 static int mdt_setxattr(struct mdt_thread_info *info,
222                         struct ptlrpc_request *req, int offset)
223 {
224         return -EOPNOTSUPP;
225 }
226
227 static int mdt_getxattr(struct mdt_thread_info *info,
228                         struct ptlrpc_request *req, int offset)
229 {
230         return -EOPNOTSUPP;
231 }
232
233 static int mdt_readpage(struct mdt_thread_info *info,
234                         struct ptlrpc_request *req, int offset)
235 {
236         return -EOPNOTSUPP;
237 }
238
239 static int mdt_reint(struct mdt_thread_info *info,
240                      struct ptlrpc_request *req, int offset)
241 {
242         return -EOPNOTSUPP;
243 }
244
245 static int mdt_close(struct mdt_thread_info *info,
246                      struct ptlrpc_request *req, int offset)
247 {
248         return -EOPNOTSUPP;
249 }
250
251 static int mdt_done_writing(struct mdt_thread_info *info,
252                             struct ptlrpc_request *req, int offset)
253 {
254         return -EOPNOTSUPP;
255 }
256
257 static int mdt_pin(struct mdt_thread_info *info,
258                    struct ptlrpc_request *req, int offset)
259 {
260         return -EOPNOTSUPP;
261 }
262
263 static int mdt_sync(struct mdt_thread_info *info,
264                     struct ptlrpc_request *req, int offset)
265 {
266         return -EOPNOTSUPP;
267 }
268
269 static int mdt_set_info(struct mdt_thread_info *info,
270                         struct ptlrpc_request *req, int offset)
271 {
272         return -EOPNOTSUPP;
273 }
274
275 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
276                                  struct ptlrpc_request *req, int offset)
277 {
278         return -EOPNOTSUPP;
279 }
280
281 static int mdt_handle_quotactl(struct mdt_thread_info *info,
282                                struct ptlrpc_request *req, int offset)
283 {
284         return -EOPNOTSUPP;
285 }
286
287 /*
288  * DLM handlers.
289  */
290
291 static struct ldlm_callback_suite cbs = {
292         .lcs_completion = ldlm_server_completion_ast,
293         .lcs_blocking   = ldlm_server_blocking_ast,
294         .lcs_glimpse    = NULL
295 };
296
297 static int mdt_enqueue(struct mdt_thread_info *info,
298                        struct ptlrpc_request *req, int offset)
299 {
300         /*
301          * info->mti_dlm_req already contains swapped and (if necessary)
302          * converted dlm request.
303          */
304         LASSERT(info->mti_dlm_req);
305
306         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
307         return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
308 }
309
310 static int mdt_convert(struct mdt_thread_info *info,
311                        struct ptlrpc_request *req, int offset)
312 {
313         LASSERT(info->mti_dlm_req);
314         return ldlm_handle_convert0(req, info->mti_dlm_req);
315 }
316
317 static int mdt_bl_callback(struct mdt_thread_info *info,
318                            struct ptlrpc_request *req, int offset)
319 {
320         CERROR("bl callbacks should not happen on MDS\n");
321         LBUG();
322         return -EOPNOTSUPP;
323 }
324
325 static int mdt_cp_callback(struct mdt_thread_info *info,
326                            struct ptlrpc_request *req, int offset)
327 {
328         CERROR("cp callbacks should not happen on MDS\n");
329         LBUG();
330         return -EOPNOTSUPP;
331 }
332
333 /*
334  * Build (DLM) resource name from fid.
335  */
336 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
337                                        struct ldlm_res_id *name)
338 {
339         memset(name, 0, sizeof *name);
340         /* we use fid_num() whoch includes also object version instread of raw
341          * fid_oid(). */
342         name->name[0] = fid_seq(f);
343         name->name[1] = fid_num(f);
344         return name;
345 }
346
347 /*
348  * Return true if resource is for object identified by fid.
349  */
350 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
351 {
352         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
353 }
354
355 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
356 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
357              struct lustre_handle *lh, ldlm_mode_t mode,
358              ldlm_policy_data_t *policy)
359 {
360         struct ldlm_res_id res_id;
361         int flags = 0, rc;
362         ENTRY;
363
364         LASSERT(ns != NULL);
365         LASSERT(lh != NULL);
366         LASSERT(f != NULL);
367
368         /* FIXME: is that correct to have @flags=0 here? */
369         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
370                               LDLM_IBITS, policy, mode, &flags,
371                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
372                               NULL, NULL, 0, NULL, lh);
373         RETURN (rc == ELDLM_OK ? 0 : -EIO);
374 }
375
376 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
377                 struct lustre_handle *lh, ldlm_mode_t mode)
378 {
379         struct ldlm_lock *lock;
380         ENTRY;
381
382         /* FIXME: this is debug stuff, remove it later. */
383         lock = ldlm_handle2lock(lh);
384         if (!lock) {
385                 CERROR("invalid lock handle "LPX64, lh->cookie);
386                 LBUG();
387         }
388
389         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
390
391         ldlm_lock_decref(lh, mode);
392         EXIT;
393 }
394
395 static struct lu_device_operations mdt_lu_ops;
396
397 static int lu_device_is_mdt(struct lu_device *d)
398 {
399         /*
400          * XXX for now. Tags in lu_device_type->ldt_something are needed.
401          */
402         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
403 }
404
405 static struct mdt_object *mdt_obj(struct lu_object *o)
406 {
407         LASSERT(lu_device_is_mdt(o->lo_dev));
408         return container_of(o, struct mdt_object, mot_obj.mo_lu);
409 }
410
411 struct mdt_object *mdt_object_find(struct mdt_device *d,
412                                    struct lu_fid *f)
413 {
414         struct lu_object *o;
415
416         o = lu_object_find(d->mdt_md_dev.md_lu_dev.ld_site, f);
417         if (IS_ERR(o))
418                 return (struct mdt_object *)o;
419         else
420                 return mdt_obj(o);
421 }
422
423 void mdt_object_put(struct mdt_object *o)
424 {
425         lu_object_put(&o->mot_obj.mo_lu);
426 }
427
428 struct lu_fid *mdt_object_fid(struct mdt_object *o)
429 {
430         return lu_object_fid(&o->mot_obj.mo_lu);
431 }
432
433 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
434                     struct mdt_lock_handle *lh, __u64 ibits)
435 {
436         ldlm_policy_data_t p = {
437                 .l_inodebits = {
438                         .bits = ibits
439                 }
440         };
441         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
442         LASSERT(lh->mlh_mode != LCK_MINMODE);
443
444         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
445 }
446
447 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
448                               struct mdt_lock_handle *lh)
449 {
450         if (lustre_handle_is_used(&lh->mlh_lh)) {
451                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
452                 lh->mlh_lh.cookie = 0;
453         }
454 }
455
456 struct mdt_object *mdt_object_find_lock(struct mdt_device *d,
457                                         struct lu_fid *f,
458                                         struct mdt_lock_handle *lh,
459                                         __u64 ibits)
460 {
461         struct mdt_object *o;
462
463         o = mdt_object_find(d, f);
464         if (!IS_ERR(o)) {
465                 int result;
466
467                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
468                 if (result != 0) {
469                         mdt_object_put(o);
470                         o = ERR_PTR(result);
471                 }
472         }
473         return o;
474 }
475
476 struct mdt_handler {
477         const char *mh_name;
478         int         mh_fail_id;
479         __u32       mh_opc;
480         __u32       mh_flags;
481         int (*mh_act)(struct mdt_thread_info *info,
482                       struct ptlrpc_request *req, int offset);
483 };
484
485 enum mdt_handler_flags {
486         /*
487          * struct mdt_body is passed in the 0-th incoming buffer.
488          */
489         HABEO_CORPUS = (1 << 0),
490         /*
491          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
492          * incoming buffer.
493          */
494         HABEO_CLAVIS   = (1 << 1)
495 };
496
497 struct mdt_opc_slice {
498         __u32               mos_opc_start;
499         int                 mos_opc_end;
500         struct mdt_handler *mos_hs;
501 };
502
503 static struct mdt_opc_slice mdt_handlers[];
504
505 static struct mdt_handler *mdt_handler_find(__u32 opc)
506 {
507         struct mdt_opc_slice *s;
508         struct mdt_handler   *h;
509
510         h = NULL;
511         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
512                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
513                         h = s->mos_hs + (opc - s->mos_opc_start);
514                         if (h->mh_opc != 0)
515                                 LASSERT(h->mh_opc == opc);
516                         else
517                                 h = NULL; /* unsupported opc */
518                         break;
519                 }
520         }
521         return h;
522 }
523
524 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
525 {
526         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
527 }
528
529 static int mdt_lock_resname_compat(struct mdt_device *m,
530                                    struct ldlm_request *req)
531 {
532         /* XXX something... later. */
533         return 0;
534 }
535
536 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
537 {
538         /* XXX something... later. */
539         return 0;
540 }
541
542 /*
543  * Invoke handler for this request opc. Also do necessary preprocessing
544  * (according to handler ->mh_flags), and post-processing (setting of
545  * ->last_{xid,committed}).
546  */
547 static int mdt_req_handle(struct mdt_thread_info *info,
548                           struct mdt_handler *h, struct ptlrpc_request *req,
549                           int shift)
550 {
551         int result;
552         int off;
553         int lock_conv;
554
555         ENTRY;
556
557         LASSERT(h->mh_act != NULL);
558         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
559         LASSERT(current->journal_info == NULL);
560
561         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
562
563         if (h->mh_fail_id != 0)
564                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
565
566         off = MDS_REQ_REC_OFF + shift;
567         lock_conv =
568                 h->mh_flags & HABEO_CLAVIS &&
569                 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
570
571         result = 0;
572         if (h->mh_flags & HABEO_CORPUS) {
573                 struct mdt_body *body;
574
575                 body = info->mti_body =
576                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
577                                            lustre_swab_mdt_body);
578                 if (body != NULL) {
579                         info->mti_object = mdt_object_find(info->mti_mdt,
580                                                            &body->fid1);
581                         if (IS_ERR(info->mti_object))
582                                 result = PTR_ERR(info->mti_object);
583                 } else {
584                         CERROR("Can't unpack body\n");
585                         result = -EFAULT;
586                 }
587         } else if (lock_conv) {
588                 struct ldlm_request *dlm;
589
590                 LASSERT(shift == 0);
591                 dlm = info->mti_dlm_req =
592                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
593                                            sizeof *dlm,
594                                            lustre_swab_ldlm_request);
595                 if (dlm != NULL)
596                         result = mdt_lock_resname_compat(info->mti_mdt, dlm);
597                 else {
598                         CERROR("Can't unpack dlm request\n");
599                         result = -EFAULT;
600                 }
601         }
602         if (result == 0)
603                 /*
604                  * Process request.
605                  */
606                 result = h->mh_act(info, req, off);
607         /*
608          * XXX result value is unconditionally shoved into ->rq_status
609          * (original code sometimes placed error code into ->rq_status, and
610          * sometimes returned it to the
611          * caller). ptlrpc_server_handle_request() doesn't check return value
612          * anyway.
613          */
614         req->rq_status = result;
615
616         LASSERT(current->journal_info == NULL);
617
618         if (lock_conv) {
619                 struct ldlm_reply *rep;
620
621                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
622                 if (rep != NULL)
623                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
624         }
625
626         /* If we're DISCONNECTing, the mds_export_data is already freed */
627         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
628                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
629                 target_committed_to_req(req);
630         }
631         RETURN(result);
632 }
633
634 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
635 {
636         lh->mlh_lh.cookie = 0ull;
637         lh->mlh_mode = LCK_MINMODE;
638 }
639
640 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
641 {
642         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
643 }
644
645 static void mdt_thread_info_init(struct mdt_thread_info *info)
646 {
647         int i;
648
649         memset(info, 0, sizeof *info);
650         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
651         /*
652          * Poison size array.
653          */
654         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
655                 info->mti_rep_buf_size[i] = ~0;
656         info->mti_rep_buf_nr = i;
657         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
658                 mdt_lock_handle_init(&info->mti_lh[i]);
659 }
660
661 static void mdt_thread_info_fini(struct mdt_thread_info *info)
662 {
663         int i;
664
665         if (info->mti_object != NULL) {
666                 mdt_object_put(info->mti_object);
667                 info->mti_object = NULL;
668         }
669         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
670                 mdt_lock_handle_fini(&info->mti_lh[i]);
671 }
672
673 static int mds_msg_check_version(struct lustre_msg *msg)
674 {
675         int rc;
676
677         /* TODO: enable the below check while really introducing msg version.
678          * it's disabled because it will break compatibility with b1_4.
679          */
680         return (0);
681
682         switch (msg->opc) {
683         case MDS_CONNECT:
684         case MDS_DISCONNECT:
685         case OBD_PING:
686                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
687                 if (rc)
688                         CERROR("bad opc %u version %08x, expecting %08x\n",
689                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
690                 break;
691         case MDS_GETSTATUS:
692         case MDS_GETATTR:
693         case MDS_GETATTR_NAME:
694         case MDS_STATFS:
695         case MDS_READPAGE:
696         case MDS_REINT:
697         case MDS_CLOSE:
698         case MDS_DONE_WRITING:
699         case MDS_PIN:
700         case MDS_SYNC:
701         case MDS_GETXATTR:
702         case MDS_SETXATTR:
703         case MDS_SET_INFO:
704         case MDS_QUOTACHECK:
705         case MDS_QUOTACTL:
706         case QUOTA_DQACQ:
707         case QUOTA_DQREL:
708                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
709                 if (rc)
710                         CERROR("bad opc %u version %08x, expecting %08x\n",
711                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
712                 break;
713         case LDLM_ENQUEUE:
714         case LDLM_CONVERT:
715         case LDLM_BL_CALLBACK:
716         case LDLM_CP_CALLBACK:
717                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
718                 if (rc)
719                         CERROR("bad opc %u version %08x, expecting %08x\n",
720                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
721                 break;
722         case OBD_LOG_CANCEL:
723         case LLOG_ORIGIN_HANDLE_CREATE:
724         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
725         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
726         case LLOG_ORIGIN_HANDLE_READ_HEADER:
727         case LLOG_ORIGIN_HANDLE_CLOSE:
728         case LLOG_CATINFO:
729                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
730                 if (rc)
731                         CERROR("bad opc %u version %08x, expecting %08x\n",
732                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
733                 break;
734         default:
735                 CERROR("MDS unknown opcode %d\n", msg->opc);
736                 rc = -ENOTSUPP;
737         }
738         return rc;
739 }
740
741 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
742                                        struct obd_device *obd, int *process)
743 {
744         switch (req->rq_reqmsg->opc) {
745         case MDS_CONNECT: /* This will never get here, but for completeness. */
746         case OST_CONNECT: /* This will never get here, but for completeness. */
747         case MDS_DISCONNECT:
748         case OST_DISCONNECT:
749                *process = 1;
750                RETURN(0);
751
752         case MDS_CLOSE:
753         case MDS_SYNC: /* used in unmounting */
754         case OBD_PING:
755         case MDS_REINT:
756         case LDLM_ENQUEUE:
757                 *process = target_queue_recovery_request(req, obd);
758                 RETURN(0);
759
760         default:
761                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
762                 *process = 0;
763                 /* XXX what should we set rq_status to here? */
764                 req->rq_status = -EAGAIN;
765                 RETURN(ptlrpc_error(req));
766         }
767 }
768
769 /*
770  * Handle recovery. Return:
771  *        +1: continue request processing;
772  *       -ve: abort immediately with the given error code;
773  *         0: send reply with error code in req->rq_status;
774  */
775 static int mdt_recovery(struct ptlrpc_request *req)
776 {
777         int recovering;
778         int abort_recovery;
779         struct obd_device *obd;
780
781         ENTRY;
782
783         if (req->rq_reqmsg->opc == MDS_CONNECT)
784                 RETURN(+1);
785
786         if (req->rq_export == NULL) {
787                 CERROR("operation %d on unconnected MDS from %s\n",
788                        req->rq_reqmsg->opc,
789                        libcfs_id2str(req->rq_peer));
790                 req->rq_status = -ENOTCONN;
791                 RETURN(0);
792         }
793
794         /* sanity check: if the xid matches, the request must be marked as a
795          * resent or replayed */
796         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
797                       lustre_msg_get_flags(req->rq_reqmsg) &
798                       (MSG_RESENT | MSG_REPLAY)),
799                  "rq_xid "LPU64" matches last_xid, "
800                  "expected RESENT flag\n", req->rq_xid);
801
802         /* else: note the opposite is not always true; a RESENT req after a
803          * failover will usually not match the last_xid, since it was likely
804          * never committed. A REPLAYed request will almost never match the
805          * last xid, however it could for a committed, but still retained,
806          * open. */
807
808         obd = req->rq_export->exp_obd;
809
810         /* Check for aborted recovery... */
811         spin_lock_bh(&obd->obd_processing_task_lock);
812         abort_recovery = obd->obd_abort_recovery;
813         recovering = obd->obd_recovering;
814         spin_unlock_bh(&obd->obd_processing_task_lock);
815         if (abort_recovery) {
816                 target_abort_recovery(obd);
817         } else if (recovering) {
818                 int rc;
819                 int should_process;
820
821                 rc = mdt_filter_recovery_request(req, obd, &should_process);
822                 if (rc != 0 || !should_process) {
823                         LASSERT(rc < 0);
824                         RETURN(rc);
825                 }
826         }
827         RETURN(+1);
828 }
829
830 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
831 {
832         struct obd_device *obd;
833
834         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
835                 if (req->rq_reqmsg->opc != OBD_PING)
836                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
837
838                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
839                 if (obd && obd->obd_recovering) {
840                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
841                         RETURN(target_queue_final_reply(req, req->rq_status));
842                 } else {
843                         /* Lost a race with recovery; let the error path
844                          * DTRT. */
845                         req->rq_status = -ENOTCONN;
846                 }
847         }
848         target_send_reply(req, req->rq_status, info->mti_fail_id);
849         RETURN(req->rq_status);
850 }
851
852 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
853 {
854         struct mdt_handler *h;
855         struct lustre_msg  *msg;
856         int                 result;
857
858         ENTRY;
859
860         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
861
862         LASSERT(current->journal_info == NULL);
863
864         msg = req->rq_reqmsg;
865         result = mds_msg_check_version(msg);
866         if (result == 0) {
867                 result = mdt_recovery(req);
868                 switch (result) {
869                 case +1:
870                         h = mdt_handler_find(msg->opc);
871                         if (h != NULL)
872                                 result = mdt_req_handle(info, h, req, 0);
873                         else {
874                                 req->rq_status = -ENOTSUPP;
875                                 result = ptlrpc_error(req);
876                                 break;
877                         }
878                         /* fall through */
879                 case 0:
880                         result = mdt_reply(req, info);
881                 }
882         } else
883                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
884         RETURN(result);
885 }
886
887 static struct mdt_device *mdt_dev(struct lu_device *d)
888 {
889         LASSERT(lu_device_is_mdt(d));
890         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
891 }
892
893 static int mdt_handle(struct ptlrpc_request *req)
894 {
895         int result;
896
897         struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
898                                                              &mdt_thread_key);
899         ENTRY;
900
901         mdt_thread_info_init(info);
902         /* it can be NULL while CONNECT */
903         if (req->rq_export)
904                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
905
906         result = mdt_handle0(req, info);
907         mdt_thread_info_fini(info);
908         return result;
909 }
910
911 static int mdt_intent_policy(struct ldlm_namespace *ns,
912                              struct ldlm_lock **lockp, void *req_cookie,
913                              ldlm_mode_t mode, int flags, void *data)
914 {
915         ENTRY;
916         RETURN(ELDLM_LOCK_ABORTED);
917 }
918
919 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
920                                             svc_handler_t h, char *name,
921                                             struct proc_dir_entry *proc_entry,
922                                             svcreq_printfn_t prntfn)
923 {
924         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
925                                c->psc_max_req_size, c->psc_max_reply_size,
926                                c->psc_req_portal, c->psc_rep_portal,
927                                c->psc_watchdog_timeout,
928                                h, name, proc_entry,
929                                prntfn, c->psc_num_threads);
930 }
931
932 static int mdt_config(struct mdt_device *m, const char *name,
933                       void *buf, int size, int mode)
934 {
935         struct md_device *child = m->mdt_child;
936         int rc;
937         ENTRY;
938
939         if (!child->md_ops->mdo_config)
940                 RETURN(-EOPNOTSUPP);
941
942         rc = child->md_ops->mdo_config(child, name, buf, size, mode);
943         RETURN(rc);
944 }
945
946 /* allocate sequence to client */
947 int mdt_seq_alloc(struct mdt_device *m, __u64 *seq)
948 {
949         int rc = 0;
950         ENTRY;
951
952         LASSERT(m != NULL);
953         LASSERT(seq != NULL);
954
955         down(&m->mdt_seq_sem);
956         m->mdt_seq += 1;
957         *seq = m->mdt_seq;
958
959         /* update new allocated sequence on store */
960         rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
961                         &m->mdt_seq, sizeof(m->mdt_seq),
962                         LUSTRE_CONFIG_SET);
963         if (rc) {
964                 CERROR("can't save new seq, rc %d\n",
965                        rc);
966         }
967
968         up(&m->mdt_seq_sem);
969
970         RETURN(0);
971 }
972 EXPORT_SYMBOL(mdt_seq_alloc);
973
974 /* initialize meta-sequence. First of all try to get it from lower layer down to
975  * back store one. In the case this is first run and there is not meta-sequence
976  * initialized yet - store it to backstore. */
977 static int mdt_seq_init(struct mdt_device *m)
978 {
979         int rc = 0;
980         ENTRY;
981
982         /* allocate next seq after root one */
983         m->mdt_seq = LUSTRE_ROOT_FID_SEQ + 1;
984
985         rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
986                         &m->mdt_seq, sizeof(m->mdt_seq),
987                         LUSTRE_CONFIG_GET);
988
989         if (rc == -EOPNOTSUPP) {
990                 /* provide zero error and let continue with default value of
991                  * sequence. */
992                 GOTO(out, rc = 0);
993         } else if (rc == -ENODATA) {
994                 CWARN("initialize new sequence\n");
995
996                 /*initialize new sequence config as it is not yet created. */
997                 rc = mdt_config(m, LUSTRE_CONFIG_METASEQ,
998                                 &m->mdt_seq, sizeof(m->mdt_seq),
999                                 LUSTRE_CONFIG_SET);
1000                 if (rc == -EOPNOTSUPP) {
1001                         /* provide zero error and let continue with default
1002                          * value of sequence. */
1003                         CERROR("can't update save initial sequence. "
1004                                "No method defined\n");
1005                         GOTO(out, rc = 0);
1006                 } else if (rc) {
1007                         CERROR("can't update config %s, rc %d\n",
1008                                LUSTRE_CONFIG_METASEQ, rc);
1009                         GOTO(out, rc);
1010                 }
1011         } else if (rc) {
1012                 CERROR("can't get config %s, rc %d\n",
1013                        LUSTRE_CONFIG_METASEQ, rc);
1014                 GOTO(out, rc);
1015         }
1016
1017         EXIT;
1018 out:
1019         if (rc == 0)
1020                 CWARN("last used sequence: "LPU64"\n", m->mdt_seq);
1021         return rc;
1022 }
1023
1024 static void mdt_fini(struct mdt_device *m)
1025 {
1026         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1027
1028         if (d->ld_site != NULL) {
1029                 lu_site_fini(d->ld_site);
1030                 OBD_FREE_PTR(d->ld_site);
1031                 d->ld_site = NULL;
1032         }
1033         if (m->mdt_service != NULL) {
1034                 ptlrpc_unregister_service(m->mdt_service);
1035                 m->mdt_service = NULL;
1036         }
1037         if (m->mdt_namespace != NULL) {
1038                 ldlm_namespace_free(m->mdt_namespace, 0);
1039                 m->mdt_namespace = NULL;
1040         }
1041         /* finish the stack */
1042         if (m->mdt_child) {
1043                 struct lu_device *child = md2lu_dev(m->mdt_child);
1044                 child->ld_type->ldt_ops->ldto_device_fini(child);
1045         }
1046
1047         LASSERT(atomic_read(&d->ld_ref) == 0);
1048         md_device_fini(&m->mdt_md_dev);
1049 }
1050
1051 static int mdt_init0(struct mdt_device *m,
1052                      struct lu_device_type *t, struct lustre_cfg *cfg)
1053 {
1054         int rc;
1055         struct lu_site *s;
1056         char   ns_name[48];
1057         struct obd_device *obd;
1058         struct lu_device  *mdt_child;
1059         const char *top   = lustre_cfg_string(cfg, 0);
1060         const char *child = lustre_cfg_string(cfg, 1);
1061
1062         ENTRY;
1063
1064         /* get next layer */
1065         obd = class_name2obd((char *)child);
1066         if (obd && obd->obd_lu_dev) {
1067                 CDEBUG(D_INFO, "Child device is %s\n", child);
1068                 m->mdt_child = lu2md_dev(obd->obd_lu_dev);
1069                 mdt_child = md2lu_dev(m->mdt_child);
1070         } else {
1071                 CDEBUG(D_INFO, "Child device %s is not found\n", child);
1072                 RETURN(-EINVAL);
1073         }
1074
1075         OBD_ALLOC_PTR(s);
1076         if (s == NULL)
1077                 RETURN(-ENOMEM);
1078
1079         md_device_init(&m->mdt_md_dev, t);
1080         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1081         lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1082
1083         sema_init(&m->mdt_seq_sem, 1);
1084
1085         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1086         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1087         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1088         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1089         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1090         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1091         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1092         /*
1093          * We'd like to have a mechanism to set this on a per-device basis,
1094          * but alas...
1095          */
1096         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1097                                                       MDT_MIN_THREADS),
1098                                                   MDT_MAX_THREADS);
1099         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1100         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1101         if (m->mdt_namespace == NULL)
1102                 GOTO(err_fini_site, rc = -ENOMEM);
1103
1104         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1105
1106         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1107                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1108
1109         m->mdt_service =
1110                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1111                                      LUSTRE_MDT0_NAME,
1112                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1113                                      NULL);
1114         if (m->mdt_service == NULL)
1115                 GOTO(err_free_ns, rc = -ENOMEM);
1116
1117         /* init the stack */
1118         LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
1119         rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
1120         if (rc) {
1121                 CERROR("can't init device stack, rc %d\n", rc);
1122                 GOTO(err_free_svc, rc);
1123         }
1124
1125         /* init sequence info after device stack is initialized. */
1126         rc = mdt_seq_init(m);
1127         if (rc)
1128                 GOTO(err_fini_child, rc);
1129
1130         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1131         if (rc)
1132                 GOTO(err_fini_child, rc);
1133
1134         RETURN(0);
1135
1136 err_fini_child:
1137         mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
1138 err_free_svc:
1139         ptlrpc_unregister_service(m->mdt_service);
1140         m->mdt_service = NULL;
1141 err_free_ns:
1142         ldlm_namespace_free(m->mdt_namespace, 0);
1143         m->mdt_namespace = NULL;
1144 err_fini_site:
1145         lu_site_fini(s);
1146         OBD_FREE_PTR(s);
1147         RETURN(rc);
1148 }
1149
1150 static struct lu_object *mdt_object_alloc(struct lu_device *d)
1151 {
1152         struct mdt_object *mo;
1153
1154         OBD_ALLOC_PTR(mo);
1155         if (mo != NULL) {
1156                 struct lu_object *o;
1157                 struct lu_object_header *h;
1158
1159                 o = &mo->mot_obj.mo_lu;
1160                 h = &mo->mot_header;
1161                 lu_object_header_init(h);
1162                 lu_object_init(o, h, d);
1163                 lu_object_add_top(h, o);
1164                 return o;
1165         } else
1166                 return NULL;
1167 }
1168
1169 static int mdt_object_init(struct lu_object *o)
1170 {
1171         struct mdt_device *d = mdt_dev(o->lo_dev);
1172         struct lu_device  *under;
1173         struct lu_object  *below;
1174
1175         under = &d->mdt_child->md_lu_dev;
1176         below = under->ld_ops->ldo_object_alloc(under);
1177         if (below != NULL) {
1178                 lu_object_add(o, below);
1179                 return 0;
1180         } else
1181                 return -ENOMEM;
1182 }
1183
1184 static void mdt_object_free(struct lu_object *o)
1185 {
1186         struct lu_object_header *h;
1187
1188         h = o->lo_header;
1189         lu_object_fini(o);
1190         lu_object_header_fini(h);
1191 }
1192
1193 static void mdt_object_release(struct lu_object *o)
1194 {
1195 }
1196
1197 static int mdt_object_print(struct seq_file *f, const struct lu_object *o)
1198 {
1199         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1200 }
1201
1202 static struct lu_device_operations mdt_lu_ops = {
1203         .ldo_object_alloc   = mdt_object_alloc,
1204         .ldo_object_init    = mdt_object_init,
1205         .ldo_object_free    = mdt_object_free,
1206         .ldo_object_release = mdt_object_release,
1207         .ldo_object_print   = mdt_object_print
1208 };
1209
1210 /* mds_connect copy */
1211 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1212                            struct obd_uuid *cluuid, struct obd_connect_data *data)
1213 {
1214         struct obd_export *exp;
1215         int rc, abort_recovery;
1216         struct mds_export_data *med;
1217         struct mds_client_data *mcd = NULL;
1218
1219         ENTRY;
1220
1221         if (!conn || !obd || !cluuid)
1222                 RETURN(-EINVAL);
1223
1224         /* Check for aborted recovery. */
1225         spin_lock_bh(&obd->obd_processing_task_lock);
1226         abort_recovery = obd->obd_abort_recovery;
1227         spin_unlock_bh(&obd->obd_processing_task_lock);
1228         if (abort_recovery)
1229                 target_abort_recovery(obd);
1230
1231         /* XXX There is a small race between checking the list and adding a
1232          * new connection for the same UUID, but the real threat (list
1233          * corruption when multiple different clients connect) is solved.
1234          *
1235          * There is a second race between adding the export to the list,
1236          * and filling in the client data below.  Hence skipping the case
1237          * of NULL mcd above.  We should already be controlling multiple
1238          * connects at the client, and we can't hold the spinlock over
1239          * memory allocations without risk of deadlocking.
1240          */
1241         rc = class_connect(conn, obd, cluuid);
1242         if (rc)
1243                 RETURN(rc);
1244         exp = class_conn2export(conn);
1245         LASSERT(exp);
1246         med = &exp->exp_mds_data;
1247
1248         OBD_ALLOC(mcd, sizeof(*mcd));
1249         if (!mcd)
1250                 GOTO(out, rc = -ENOMEM);
1251
1252         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1253         med->med_mcd = mcd;
1254
1255         rc = mdt_seq_alloc(mdt_dev(obd->obd_lu_dev),
1256                            &data->ocd_seq);
1257         if (rc)
1258                 GOTO(out, rc);
1259 out:
1260         if (rc) {
1261                 if (mcd) {
1262                         OBD_FREE(mcd, sizeof(*mcd));
1263                         med->med_mcd = NULL;
1264                 }
1265                 class_disconnect(exp);
1266         } else {
1267                 class_export_put(exp);
1268         }
1269
1270         RETURN(rc);
1271 }
1272
1273 static struct obd_ops mdt_obd_device_ops = {
1274         .o_owner = THIS_MODULE,
1275         .o_connect = mdt_obd_connect
1276 };
1277
1278 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1279                                           struct lustre_cfg *cfg)
1280 {
1281         struct lu_device  *l;
1282         struct mdt_device *m;
1283
1284         OBD_ALLOC_PTR(m);
1285         if (m != NULL) {
1286                 int result;
1287
1288                 l = &m->mdt_md_dev.md_lu_dev;
1289                 result = mdt_init0(m, t, cfg);
1290                 if (result != 0) {
1291                         mdt_fini(m);
1292                         return ERR_PTR(result);
1293                 }
1294
1295         } else
1296                 l = ERR_PTR(-ENOMEM);
1297         return l;
1298 }
1299
1300 static void mdt_device_free(struct lu_device *d)
1301 {
1302         struct mdt_device *m = mdt_dev(d);
1303
1304         mdt_fini(m);
1305         OBD_FREE_PTR(m);
1306 }
1307
1308 static void *mdt_thread_init(struct ptlrpc_thread *t)
1309 {
1310         struct mdt_thread_info *info;
1311
1312         return OBD_ALLOC_PTR(info) ? : ERR_PTR(-ENOMEM);
1313 }
1314
1315 static void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
1316 {
1317         struct mdt_thread_info *info = data;
1318         OBD_FREE_PTR(info);
1319 }
1320
1321 static struct ptlrpc_thread_key mdt_thread_key = {
1322         .ptk_init = mdt_thread_init,
1323         .ptk_fini = mdt_thread_fini
1324 };
1325
1326 static int mdt_type_init(struct lu_device_type *t)
1327 {
1328         return ptlrpc_thread_key_register(&mdt_thread_key);
1329 }
1330
1331 static void mdt_type_fini(struct lu_device_type *t)
1332 {
1333 }
1334
1335 static struct lu_device_type_operations mdt_device_type_ops = {
1336         .ldto_init = mdt_type_init,
1337         .ldto_fini = mdt_type_fini,
1338
1339         .ldto_device_alloc = mdt_device_alloc,
1340         .ldto_device_free  = mdt_device_free
1341 };
1342
1343 static struct lu_device_type mdt_device_type = {
1344         .ldt_tags = LU_DEVICE_MD,
1345         .ldt_name = LUSTRE_MDT0_NAME,
1346         .ldt_ops  = &mdt_device_type_ops
1347 };
1348
1349 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1350         { 0 }
1351 };
1352
1353 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1354         { 0 }
1355 };
1356
1357 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1358
1359 static int __init mdt_mod_init(void)
1360 {
1361         struct lprocfs_static_vars lvars;
1362
1363         mdt_num_threads = MDT_NUM_THREADS;
1364         lprocfs_init_vars(mdt, &lvars);
1365         return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1366                                    LUSTRE_MDT0_NAME, &mdt_device_type);
1367 }
1368
1369 static void __exit mdt_mod_exit(void)
1370 {
1371         class_unregister_type(LUSTRE_MDT0_NAME);
1372 }
1373
1374
1375 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1376 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1377         .mh_name    = #opc,                                             \
1378         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1379         .mh_opc     = prefix ## _  ## opc,                              \
1380         .mh_flags   = flags,                                            \
1381         .mh_act     = fn                                                \
1382 }
1383
1384 #define DEF_MDT_HNDL(flags, name, fn)                   \
1385         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1386
1387 static struct mdt_handler mdt_mds_ops[] = {
1388         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1389         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1390         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1391         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1392         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1393         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1394         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1395         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1396         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1397         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1398         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1399         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1400         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1401         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1402         DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
1403         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1404         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1405 };
1406
1407 static struct mdt_handler mdt_obd_ops[] = {
1408 };
1409
1410 #define DEF_DLM_HNDL(flags, name, fn)                   \
1411         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1412
1413 static struct mdt_handler mdt_dlm_ops[] = {
1414         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1415         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1416         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1417         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1418 };
1419
1420 static struct mdt_handler mdt_llog_ops[] = {
1421 };
1422
1423 static struct mdt_opc_slice mdt_handlers[] = {
1424         {
1425                 .mos_opc_start = MDS_GETATTR,
1426                 .mos_opc_end   = MDS_LAST_OPC,
1427                 .mos_hs        = mdt_mds_ops
1428         },
1429         {
1430                 .mos_opc_start = OBD_PING,
1431                 .mos_opc_end   = OBD_LAST_OPC,
1432                 .mos_hs        = mdt_obd_ops
1433         },
1434         {
1435                 .mos_opc_start = LDLM_ENQUEUE,
1436                 .mos_opc_end   = LDLM_LAST_OPC,
1437                 .mos_hs        = mdt_dlm_ops
1438         },
1439         {
1440                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1441                 .mos_opc_end   = LLOG_LAST_OPC,
1442                 .mos_hs        = mdt_llog_ops
1443         },
1444         {
1445                 .mos_hs        = NULL
1446         }
1447 };
1448
1449 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1450 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1451 MODULE_LICENSE("GPL");
1452
1453 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1454                 "number of mdt service threads to start");
1455
1456 cfs_module(mdt, "0.0.3", mdt_mod_init, mdt_mod_exit);