Whamcloud - gitweb
add layers initialization here. MDT as top device will init/fini other devices.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
57
58 /*
59  * Initialized in mdt_mod_init().
60  */
61 unsigned long mdt_num_threads;
62
63 static int                mdt_handle    (struct ptlrpc_request *req);
64 static struct mdt_device *mdt_dev       (struct lu_device *d);
65 static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
66
67 static struct lu_context_key mdt_thread_key;
68
69 /* object operations */
70 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
71                         struct lu_fid *pfid, const char *name,
72                         struct lu_fid *cfid)
73 {
74         struct mdt_object      *o;
75         struct mdt_object      *child;
76         struct mdt_lock_handle *lh;
77
78         int result;
79
80         lh = &info->mti_lh[MDT_LH_PARENT];
81         lh->mlh_mode = LCK_PW;
82
83         o = mdt_object_find_lock(info->mti_ctxt,
84                                  d, pfid, lh, MDS_INODELOCK_UPDATE);
85         if (IS_ERR(o))
86                 return PTR_ERR(o);
87
88         child = mdt_object_find(info->mti_ctxt, d, cfid);
89         if (!IS_ERR(child)) {
90                 struct md_object *next = mdt_object_child(o);
91
92                 result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
93                                                  mdt_object_child(child));
94                 mdt_object_put(info->mti_ctxt, child);
95         } else
96                 result = PTR_ERR(child);
97         mdt_object_unlock(d->mdt_namespace, o, lh);
98         mdt_object_put(info->mti_ctxt, o);
99         return result;
100 }
101
102 static int mdt_getstatus(struct mdt_thread_info *info,
103                          struct ptlrpc_request *req, int offset)
104 {
105         struct md_device *next  = info->mti_mdt->mdt_child;
106         struct mdt_body  *body;
107         int               size = sizeof *body;
108         int               result;
109
110         ENTRY;
111
112         result = lustre_pack_reply(req, 1, &size, NULL);
113         if (result)
114                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
115                        size);
116         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
117                 result = -ENOMEM;
118         else {
119                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
120                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
121                                                     next, &body->fid1);
122         }
123
124         /* the last_committed and last_xid fields are filled in for all
125          * replies already - no need to do so here also.
126          */
127         RETURN(result);
128 }
129
130 static int mdt_statfs(struct mdt_thread_info *info,
131                       struct ptlrpc_request *req, int offset)
132 {
133         struct md_device  *next  = info->mti_mdt->mdt_child;
134         struct obd_statfs *osfs;
135         struct kstatfs    sfs;
136         int               result;
137         int               size = sizeof(struct obd_statfs);
138
139         ENTRY;
140
141         result = lustre_pack_reply(req, 1, &size, NULL);
142         if (result)
143                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
144                        size);
145         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
146                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
147                 result = -ENOMEM;
148         } else {
149                 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
150                 /* XXX max_age optimisation is needed here. See mds_statfs */
151                 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, &sfs);
152                 statfs_pack(osfs, &sfs);
153         }
154
155         RETURN(result);
156 }
157
158 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
159 {
160         b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
161                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
162                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
163
164         if (!S_ISREG(attr->la_mode))
165                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
166                             OBD_MD_FLMTIME;
167
168         b->atime      = attr->la_atime;
169         b->mtime      = attr->la_mtime;
170         b->ctime      = attr->la_ctime;
171         b->mode       = attr->la_mode;
172         b->size       = attr->la_size;
173         b->blocks     = attr->la_blocks;
174         b->uid        = attr->la_uid;
175         b->gid        = attr->la_gid;
176         b->flags      = attr->la_flags;
177         b->nlink      = attr->la_nlink;
178 }
179
180 static int mdt_getattr(struct mdt_thread_info *info,
181                        struct ptlrpc_request *req, int offset)
182 {
183         struct mdt_body *body;
184         int              size = sizeof (*body);
185         int              result;
186
187         LASSERT(info->mti_object != NULL);
188
189         ENTRY;
190
191         result = lustre_pack_reply(req, 1, &size, NULL);
192         if (result)
193                 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
194                        size, result);
195         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
196                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
197                 result = -ENOMEM;
198         } else {
199                 struct md_object *next = mdt_object_child(info->mti_object);
200
201                 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
202                                                     &info->mti_ctxt->lc_attr);
203                 if (result == 0) {
204                         body = lustre_msg_buf(req->rq_repmsg, 0, size);
205                         mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
206                         body->fid1 = *mdt_object_fid(info->mti_object);
207                 }
208         }
209         RETURN(result);
210 }
211
212 static struct lu_device_operations mdt_lu_ops;
213
214 static int lu_device_is_mdt(struct lu_device *d)
215 {
216         /*
217          * XXX for now. Tags in lu_device_type->ldt_something are needed.
218          */
219         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
220 }
221
222 static struct mdt_device *mdt_dev(struct lu_device *d)
223 {
224         LASSERT(lu_device_is_mdt(d));
225         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
226 }
227
228 static int mdt_connect(struct mdt_thread_info *info,
229                        struct ptlrpc_request *req, int offset)
230 {
231         int result;
232
233         result = target_handle_connect(req, mdt_handle);
234         if (result == 0) {
235                 struct obd_connect_data *data;
236
237                 LASSERT(req->rq_export != NULL);
238                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
239
240                 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
241                 result = seq_mgr_alloc(info->mti_ctxt,
242                                        info->mti_mdt->mdt_seq_mgr,
243                                        &data->ocd_seq);
244         }
245         return result;
246 }
247
248 static int mdt_disconnect(struct mdt_thread_info *info,
249                           struct ptlrpc_request *req, int offset)
250 {
251         //return -EOPNOTSUPP;
252         return target_handle_disconnect(req);
253 }
254
255 static int mdt_getattr_name(struct mdt_thread_info *info,
256                             struct ptlrpc_request *req, int offset)
257 {
258         return -EOPNOTSUPP;
259 }
260
261 static int mdt_setxattr(struct mdt_thread_info *info,
262                         struct ptlrpc_request *req, int offset)
263 {
264         return -EOPNOTSUPP;
265 }
266
267 static int mdt_getxattr(struct mdt_thread_info *info,
268                         struct ptlrpc_request *req, int offset)
269 {
270         return -EOPNOTSUPP;
271 }
272
273 static int mdt_readpage(struct mdt_thread_info *info,
274                         struct ptlrpc_request *req, int offset)
275 {
276         return -EOPNOTSUPP;
277 }
278
279 static int mdt_reint(struct mdt_thread_info *info,
280                      struct ptlrpc_request *req, int offset)
281 {
282         return -EOPNOTSUPP;
283 }
284
285 static int mdt_close(struct mdt_thread_info *info,
286                      struct ptlrpc_request *req, int offset)
287 {
288         return -EOPNOTSUPP;
289 }
290
291 static int mdt_done_writing(struct mdt_thread_info *info,
292                             struct ptlrpc_request *req, int offset)
293 {
294         return -EOPNOTSUPP;
295 }
296
297 static int mdt_pin(struct mdt_thread_info *info,
298                    struct ptlrpc_request *req, int offset)
299 {
300         return -EOPNOTSUPP;
301 }
302
303 static int mdt_sync(struct mdt_thread_info *info,
304                     struct ptlrpc_request *req, int offset)
305 {
306         return -EOPNOTSUPP;
307 }
308
309 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
310                                  struct ptlrpc_request *req, int offset)
311 {
312         return -EOPNOTSUPP;
313 }
314
315 static int mdt_handle_quotactl(struct mdt_thread_info *info,
316                                struct ptlrpc_request *req, int offset)
317 {
318         return -EOPNOTSUPP;
319 }
320
321 /*
322  * DLM handlers.
323  */
324
325 static struct ldlm_callback_suite cbs = {
326         .lcs_completion = ldlm_server_completion_ast,
327         .lcs_blocking   = ldlm_server_blocking_ast,
328         .lcs_glimpse    = NULL
329 };
330
331 static int mdt_enqueue(struct mdt_thread_info *info,
332                        struct ptlrpc_request *req, int offset)
333 {
334         /*
335          * info->mti_dlm_req already contains swapped and (if necessary)
336          * converted dlm request.
337          */
338         LASSERT(info->mti_dlm_req != NULL);
339
340         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
341         return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
342 }
343
344 static int mdt_convert(struct mdt_thread_info *info,
345                        struct ptlrpc_request *req, int offset)
346 {
347         LASSERT(info->mti_dlm_req);
348         return ldlm_handle_convert0(req, info->mti_dlm_req);
349 }
350
351 static int mdt_bl_callback(struct mdt_thread_info *info,
352                            struct ptlrpc_request *req, int offset)
353 {
354         CERROR("bl callbacks should not happen on MDS\n");
355         LBUG();
356         return -EOPNOTSUPP;
357 }
358
359 static int mdt_cp_callback(struct mdt_thread_info *info,
360                            struct ptlrpc_request *req, int offset)
361 {
362         CERROR("cp callbacks should not happen on MDS\n");
363         LBUG();
364         return -EOPNOTSUPP;
365 }
366
367 /*
368  * Build (DLM) resource name from fid.
369  */
370 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
371                                        struct ldlm_res_id *name)
372 {
373         memset(name, 0, sizeof *name);
374         /* we use fid_num() whoch includes also object version instread of raw
375          * fid_oid(). */
376         name->name[0] = fid_seq(f);
377         name->name[1] = fid_num(f);
378         return name;
379 }
380
381 /*
382  * Return true if resource is for object identified by fid.
383  */
384 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
385 {
386         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
387 }
388
389 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
390 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
391              struct lustre_handle *lh, ldlm_mode_t mode,
392              ldlm_policy_data_t *policy)
393 {
394         struct ldlm_res_id res_id;
395         int flags = 0, rc;
396         ENTRY;
397
398         LASSERT(ns != NULL);
399         LASSERT(lh != NULL);
400         LASSERT(f != NULL);
401
402         /* FIXME: is that correct to have @flags=0 here? */
403         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
404                               LDLM_IBITS, policy, mode, &flags,
405                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
406                               NULL, NULL, 0, NULL, lh);
407         RETURN (rc == ELDLM_OK ? 0 : -EIO);
408 }
409
410 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
411                 struct lustre_handle *lh, ldlm_mode_t mode)
412 {
413         struct ldlm_lock *lock;
414         ENTRY;
415
416         /* FIXME: this is debug stuff, remove it later. */
417         lock = ldlm_handle2lock(lh);
418         if (!lock) {
419                 CERROR("invalid lock handle "LPX64, lh->cookie);
420                 LBUG();
421         }
422
423         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
424
425         ldlm_lock_decref(lh, mode);
426         EXIT;
427 }
428
429 static struct mdt_object *mdt_obj(struct lu_object *o)
430 {
431         LASSERT(lu_device_is_mdt(o->lo_dev));
432         return container_of(o, struct mdt_object, mot_obj.mo_lu);
433 }
434
435 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
436                                    struct mdt_device *d,
437                                    struct lu_fid *f)
438 {
439         struct lu_object *o;
440
441         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
442         if (IS_ERR(o))
443                 return (struct mdt_object *)o;
444         else
445                 return mdt_obj(o);
446 }
447
448 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
449 {
450         lu_object_put(ctxt, &o->mot_obj.mo_lu);
451 }
452
453 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
454 {
455         return lu_object_fid(&o->mot_obj.mo_lu);
456 }
457
458 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
459                     struct mdt_lock_handle *lh, __u64 ibits)
460 {
461         ldlm_policy_data_t p = {
462                 .l_inodebits = {
463                         .bits = ibits
464                 }
465         };
466         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
467         LASSERT(lh->mlh_mode != LCK_MINMODE);
468
469         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
470 }
471
472 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
473                               struct mdt_lock_handle *lh)
474 {
475         if (lustre_handle_is_used(&lh->mlh_lh)) {
476                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
477                 lh->mlh_lh.cookie = 0;
478         }
479 }
480
481 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
482                                         struct mdt_device *d,
483                                         struct lu_fid *f,
484                                         struct mdt_lock_handle *lh,
485                                         __u64 ibits)
486 {
487         struct mdt_object *o;
488
489         o = mdt_object_find(ctxt, d, f);
490         if (!IS_ERR(o)) {
491                 int result;
492
493                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
494                 if (result != 0) {
495                         mdt_object_put(ctxt, o);
496                         o = ERR_PTR(result);
497                 }
498         }
499         return o;
500 }
501
502 struct mdt_handler {
503         const char *mh_name;
504         int         mh_fail_id;
505         __u32       mh_opc;
506         __u32       mh_flags;
507         int (*mh_act)(struct mdt_thread_info *info,
508                       struct ptlrpc_request *req, int offset);
509 };
510
511 enum mdt_handler_flags {
512         /*
513          * struct mdt_body is passed in the 0-th incoming buffer.
514          */
515         HABEO_CORPUS = (1 << 0),
516         /*
517          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
518          * incoming buffer.
519          */
520         HABEO_CLAVIS   = (1 << 1)
521 };
522
523 struct mdt_opc_slice {
524         __u32               mos_opc_start;
525         int                 mos_opc_end;
526         struct mdt_handler *mos_hs;
527 };
528
529 static struct mdt_opc_slice mdt_handlers[];
530
531 static struct mdt_handler *mdt_handler_find(__u32 opc)
532 {
533         struct mdt_opc_slice *s;
534         struct mdt_handler   *h;
535
536         h = NULL;
537         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
538                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
539                         h = s->mos_hs + (opc - s->mos_opc_start);
540                         if (h->mh_opc != 0)
541                                 LASSERT(h->mh_opc == opc);
542                         else
543                                 h = NULL; /* unsupported opc */
544                         break;
545                 }
546         }
547         return h;
548 }
549
550 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
551 {
552         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
553 }
554
555 static int mdt_lock_resname_compat(struct mdt_device *m,
556                                    struct ldlm_request *req)
557 {
558         /* XXX something... later. */
559         return 0;
560 }
561
562 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
563 {
564         /* XXX something... later. */
565         return 0;
566 }
567
568 /*
569  * Invoke handler for this request opc. Also do necessary preprocessing
570  * (according to handler ->mh_flags), and post-processing (setting of
571  * ->last_{xid,committed}).
572  */
573 static int mdt_req_handle(struct mdt_thread_info *info,
574                           struct mdt_handler *h, struct ptlrpc_request *req,
575                           int shift)
576 {
577         int result;
578         int off;
579
580         ENTRY;
581
582         LASSERT(h->mh_act != NULL);
583         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
584         LASSERT(current->journal_info == NULL);
585
586         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
587
588         if (h->mh_fail_id != 0)
589                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
590
591         off = MDS_REQ_REC_OFF + shift;
592
593         result = 0;
594         if (h->mh_flags & HABEO_CORPUS) {
595                 struct mdt_body *body;
596
597                 body = info->mti_body =
598                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
599                                            lustre_swab_mdt_body);
600                 if (body != NULL) {
601                         info->mti_object = mdt_object_find(info->mti_ctxt,
602                                                            info->mti_mdt,
603                                                            &body->fid1);
604                         if (IS_ERR(info->mti_object)) {
605                                 result = PTR_ERR(info->mti_object);
606                                 info->mti_object = NULL;
607                         }
608                 } else {
609                         CERROR("Can't unpack body\n");
610                         result = -EFAULT;
611                 }
612         } else if (h->mh_flags & HABEO_CLAVIS) {
613                 struct ldlm_request *dlm;
614
615                 LASSERT(shift == 0);
616                 dlm = info->mti_dlm_req =
617                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
618                                            sizeof *dlm,
619                                            lustre_swab_ldlm_request);
620                 if (dlm != NULL) {
621                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
622                                 result = mdt_lock_resname_compat(info->mti_mdt,
623                                                                  dlm);
624                 } else {
625                         CERROR("Can't unpack dlm request\n");
626                         result = -EFAULT;
627                 }
628         }
629         if (result == 0)
630                 /*
631                  * Process request.
632                  */
633                 result = h->mh_act(info, req, off);
634         /*
635          * XXX result value is unconditionally shoved into ->rq_status
636          * (original code sometimes placed error code into ->rq_status, and
637          * sometimes returned it to the
638          * caller). ptlrpc_server_handle_request() doesn't check return value
639          * anyway.
640          */
641         req->rq_status = result;
642
643         LASSERT(current->journal_info == NULL);
644
645         if (h->mh_flags & HABEO_CLAVIS &&
646             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
647                 struct ldlm_reply *rep;
648
649                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
650                 if (rep != NULL)
651                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
652         }
653
654         /* If we're DISCONNECTing, the mds_export_data is already freed */
655         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
656                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
657                 target_committed_to_req(req);
658         }
659         RETURN(result);
660 }
661
662 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
663 {
664         lh->mlh_lh.cookie = 0ull;
665         lh->mlh_mode = LCK_MINMODE;
666 }
667
668 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
669 {
670         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
671 }
672
673 static void mdt_thread_info_init(struct mdt_thread_info *info)
674 {
675         int i;
676
677         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
678         /*
679          * Poison size array.
680          */
681         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
682                 info->mti_rep_buf_size[i] = ~0;
683         info->mti_rep_buf_nr = i;
684         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
685                 mdt_lock_handle_init(&info->mti_lh[i]);
686         lu_context_enter(info->mti_ctxt);
687 }
688
689 static void mdt_thread_info_fini(struct mdt_thread_info *info)
690 {
691         int i;
692
693         lu_context_exit(info->mti_ctxt);
694         if (info->mti_object != NULL) {
695                 mdt_object_put(info->mti_ctxt, info->mti_object);
696                 info->mti_object = NULL;
697         }
698         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
699                 mdt_lock_handle_fini(&info->mti_lh[i]);
700 }
701
702 static int mds_msg_check_version(struct lustre_msg *msg)
703 {
704         int rc;
705
706         /* TODO: enable the below check while really introducing msg version.
707          * it's disabled because it will break compatibility with b1_4.
708          */
709         return (0);
710
711         switch (msg->opc) {
712         case MDS_CONNECT:
713         case MDS_DISCONNECT:
714         case OBD_PING:
715                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
716                 if (rc)
717                         CERROR("bad opc %u version %08x, expecting %08x\n",
718                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
719                 break;
720         case MDS_GETSTATUS:
721         case MDS_GETATTR:
722         case MDS_GETATTR_NAME:
723         case MDS_STATFS:
724         case MDS_READPAGE:
725         case MDS_REINT:
726         case MDS_CLOSE:
727         case MDS_DONE_WRITING:
728         case MDS_PIN:
729         case MDS_SYNC:
730         case MDS_GETXATTR:
731         case MDS_SETXATTR:
732         case MDS_SET_INFO:
733         case MDS_QUOTACHECK:
734         case MDS_QUOTACTL:
735         case QUOTA_DQACQ:
736         case QUOTA_DQREL:
737                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
738                 if (rc)
739                         CERROR("bad opc %u version %08x, expecting %08x\n",
740                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
741                 break;
742         case LDLM_ENQUEUE:
743         case LDLM_CONVERT:
744         case LDLM_BL_CALLBACK:
745         case LDLM_CP_CALLBACK:
746                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
747                 if (rc)
748                         CERROR("bad opc %u version %08x, expecting %08x\n",
749                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
750                 break;
751         case OBD_LOG_CANCEL:
752         case LLOG_ORIGIN_HANDLE_CREATE:
753         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
754         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
755         case LLOG_ORIGIN_HANDLE_READ_HEADER:
756         case LLOG_ORIGIN_HANDLE_CLOSE:
757         case LLOG_CATINFO:
758                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
759                 if (rc)
760                         CERROR("bad opc %u version %08x, expecting %08x\n",
761                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
762                 break;
763         default:
764                 CERROR("MDS unknown opcode %d\n", msg->opc);
765                 rc = -ENOTSUPP;
766         }
767         return rc;
768 }
769
770 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
771                                        struct obd_device *obd, int *process)
772 {
773         switch (req->rq_reqmsg->opc) {
774         case MDS_CONNECT: /* This will never get here, but for completeness. */
775         case OST_CONNECT: /* This will never get here, but for completeness. */
776         case MDS_DISCONNECT:
777         case OST_DISCONNECT:
778                *process = 1;
779                RETURN(0);
780
781         case MDS_CLOSE:
782         case MDS_SYNC: /* used in unmounting */
783         case OBD_PING:
784         case MDS_REINT:
785         case LDLM_ENQUEUE:
786                 *process = target_queue_recovery_request(req, obd);
787                 RETURN(0);
788
789         default:
790                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
791                 *process = 0;
792                 /* XXX what should we set rq_status to here? */
793                 req->rq_status = -EAGAIN;
794                 RETURN(ptlrpc_error(req));
795         }
796 }
797
798 /*
799  * Handle recovery. Return:
800  *        +1: continue request processing;
801  *       -ve: abort immediately with the given error code;
802  *         0: send reply with error code in req->rq_status;
803  */
804 static int mdt_recovery(struct ptlrpc_request *req)
805 {
806         int recovering;
807         int abort_recovery;
808         struct obd_device *obd;
809
810         ENTRY;
811
812         if (req->rq_reqmsg->opc == MDS_CONNECT)
813                 RETURN(+1);
814
815         if (req->rq_export == NULL) {
816                 CERROR("operation %d on unconnected MDS from %s\n",
817                        req->rq_reqmsg->opc,
818                        libcfs_id2str(req->rq_peer));
819                 req->rq_status = -ENOTCONN;
820                 RETURN(0);
821         }
822
823         /* sanity check: if the xid matches, the request must be marked as a
824          * resent or replayed */
825         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
826                       lustre_msg_get_flags(req->rq_reqmsg) &
827                       (MSG_RESENT | MSG_REPLAY)),
828                  "rq_xid "LPU64" matches last_xid, "
829                  "expected RESENT flag\n", req->rq_xid);
830
831         /* else: note the opposite is not always true; a RESENT req after a
832          * failover will usually not match the last_xid, since it was likely
833          * never committed. A REPLAYed request will almost never match the
834          * last xid, however it could for a committed, but still retained,
835          * open. */
836
837         obd = req->rq_export->exp_obd;
838
839         /* Check for aborted recovery... */
840         spin_lock_bh(&obd->obd_processing_task_lock);
841         abort_recovery = obd->obd_abort_recovery;
842         recovering = obd->obd_recovering;
843         spin_unlock_bh(&obd->obd_processing_task_lock);
844         if (abort_recovery) {
845                 target_abort_recovery(obd);
846         } else if (recovering) {
847                 int rc;
848                 int should_process;
849
850                 rc = mdt_filter_recovery_request(req, obd, &should_process);
851                 if (rc != 0 || !should_process) {
852                         LASSERT(rc < 0);
853                         RETURN(rc);
854                 }
855         }
856         RETURN(+1);
857 }
858
859 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
860 {
861         struct obd_device *obd;
862
863         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
864                 if (req->rq_reqmsg->opc != OBD_PING)
865                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
866
867                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
868                 if (obd && obd->obd_recovering) {
869                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
870                         RETURN(target_queue_final_reply(req, req->rq_status));
871                 } else {
872                         /* Lost a race with recovery; let the error path
873                          * DTRT. */
874                         req->rq_status = -ENOTCONN;
875                 }
876         }
877         target_send_reply(req, req->rq_status, info->mti_fail_id);
878         RETURN(req->rq_status);
879 }
880
881 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
882 {
883         struct mdt_handler *h;
884         struct lustre_msg  *msg;
885         int                 result;
886
887         ENTRY;
888
889         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
890
891         LASSERT(current->journal_info == NULL);
892
893         msg = req->rq_reqmsg;
894         result = mds_msg_check_version(msg);
895         if (result == 0) {
896                 result = mdt_recovery(req);
897                 switch (result) {
898                 case +1:
899                         h = mdt_handler_find(msg->opc);
900                         if (h != NULL)
901                                 result = mdt_req_handle(info, h, req, 0);
902                         else {
903                                 req->rq_status = -ENOTSUPP;
904                                 result = ptlrpc_error(req);
905                                 break;
906                         }
907                         /* fall through */
908                 case 0:
909                         result = mdt_reply(req, info);
910                 }
911         } else
912                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
913         RETURN(result);
914 }
915
916 static int mdt_handle(struct ptlrpc_request *req)
917 {
918         int result;
919         struct lu_context      *ctx;
920         struct mdt_thread_info *info;
921         ENTRY;
922
923         ctx = req->rq_svc_thread->t_ctx;
924         LASSERT(ctx != NULL);
925         LASSERT(ctx->lc_thread == req->rq_svc_thread);
926
927         info = lu_context_key_get(ctx, &mdt_thread_key);
928         LASSERT(info != NULL);
929
930         mdt_thread_info_init(info);
931         /* it can be NULL while CONNECT */
932         if (req->rq_export)
933                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
934
935         result = mdt_handle0(req, info);
936         mdt_thread_info_fini(info);
937         return result;
938 }
939
940 static int mdt_intent_policy(struct ldlm_namespace *ns,
941                              struct ldlm_lock **lockp, void *req_cookie,
942                              ldlm_mode_t mode, int flags, void *data)
943 {
944         ENTRY;
945         RETURN(ELDLM_LOCK_ABORTED);
946 }
947
948 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
949                                             svc_handler_t h, char *name,
950                                             struct proc_dir_entry *proc_entry,
951                                             svcreq_printfn_t prntfn)
952 {
953         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
954                                c->psc_max_req_size, c->psc_max_reply_size,
955                                c->psc_req_portal, c->psc_rep_portal,
956                                c->psc_watchdog_timeout,
957                                h, name, proc_entry,
958                                prntfn, c->psc_num_threads);
959 }
960
961 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
962                       const char *name, void *buf, int size, int mode)
963 {
964         struct md_device *child = m->mdt_child;
965         ENTRY;
966         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
967 }
968
969 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
970                            int mode)
971 {
972         struct mdt_device *m = opaque;
973         int rc;
974         ENTRY;
975
976         rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
977                         seq, sizeof(*seq),
978                         mode);
979         RETURN(rc);
980 }
981
982 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
983 {
984         ENTRY;
985         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
986 }
987
988 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
989 {
990         ENTRY;
991         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
992 }
993
994 struct lu_seq_mgr_ops seq_mgr_ops = {
995         .smo_read  = mdt_seq_mgr_read,
996         .smo_write = mdt_seq_mgr_write
997 };
998
999 /* device init/fini methods */
1000
1001 static int mdt_fld(struct mdt_thread_info *info,
1002                    struct ptlrpc_request *req, int offset)
1003 {
1004         struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
1005         struct md_fld mf, *p, *reply;
1006         int size = sizeof(*reply);
1007         __u32 *opt; 
1008         int rc;
1009         ENTRY;
1010
1011         rc = lustre_pack_reply(req, 1, &size, NULL);
1012         if (rc)
1013                 RETURN(rc);
1014
1015         opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
1016         p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
1017         mf = *p;
1018         
1019         rc = fld_handle(ls->ls_fld, *opt, &mf);
1020         if (rc)
1021                 RETURN(rc);        
1022
1023         reply = lustre_msg_buf(req->rq_repmsg, 0, size);
1024         *reply = mf;
1025         RETURN(rc);
1026 }
1027
1028 struct dt_device *md2_bottom_dev(struct mdt_device *m)
1029 {
1030         /*FIXME: get dt device here*/
1031         RETURN (NULL);
1032 }
1033
1034 static int mdt_fld_init(struct mdt_device *m)
1035 {
1036         struct dt_device *dt;
1037         struct lu_site   *ls;
1038         int rc;
1039         ENTRY;
1040
1041         dt = md2_bottom_dev(m);
1042        
1043         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1044         
1045         OBD_ALLOC_PTR(ls->ls_fld);
1046         
1047         if (!ls->ls_fld)
1048              RETURN(-ENOMEM);
1049         
1050         rc = fld_server_init(ls->ls_fld, dt);
1051         
1052         RETURN(rc);
1053 }
1054
1055 static int mdt_fld_fini(struct mdt_device *m)
1056 {
1057         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1058         int rc = 0;
1059  
1060         if (ls && ls->ls_fld) {
1061                 fld_server_fini(ls->ls_fld);
1062                 OBD_FREE_PTR(ls->ls_fld);
1063         }
1064         RETURN(rc);
1065 }
1066
1067 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1068 {
1069         if (m->mdt_service != NULL) {
1070                 ptlrpc_unregister_service(m->mdt_service);
1071                 m->mdt_service = NULL;
1072         }
1073         if (m->mdt_fld_service != NULL) {
1074                 ptlrpc_unregister_service(m->mdt_fld_service);
1075                 m->mdt_fld_service = NULL;
1076         }
1077 }
1078
1079 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1080 {
1081         int rc;
1082         ENTRY;
1083
1084         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1085         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1086         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1087         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1088         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1089         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1090         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1091         /*
1092          * We'd like to have a mechanism to set this on a per-device basis,
1093          * but alas...
1094          */
1095         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1096                                                       MDT_MIN_THREADS),
1097                                                   MDT_MAX_THREADS);
1098
1099         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1100                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1101
1102         m->mdt_service =
1103                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1104                                      LUSTRE_MDT0_NAME,
1105                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1106                                      NULL);
1107         if (m->mdt_service == NULL)
1108                 RETURN(-ENOMEM);
1109
1110         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1111         if (rc)
1112                 GOTO(err_mdt_svc, rc);
1113
1114         /*start mdt fld service */
1115
1116         m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
1117
1118         m->mdt_fld_service =
1119                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1120                                      LUSTRE_FLD0_NAME,
1121                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1122                                      NULL);
1123         if (m->mdt_fld_service == NULL)
1124                 RETURN(-ENOMEM);
1125
1126         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
1127         if (rc)
1128                 GOTO(err_fld_svc, rc);
1129
1130         RETURN(rc);
1131 err_fld_svc:
1132         ptlrpc_unregister_service(m->mdt_fld_service);
1133         m->mdt_fld_service = NULL;
1134 err_mdt_svc:
1135         ptlrpc_unregister_service(m->mdt_service);
1136         m->mdt_service = NULL;
1137
1138         RETURN(rc);
1139 }
1140
1141 static void mdt_stack_fini(struct mdt_device *m)
1142 {
1143         if (m->mdt_child) {
1144                 struct lu_device *d = md2lu_dev(m->mdt_child);
1145                 /* goes through all stack */
1146                 while (d != NULL) {
1147                         struct lu_device *n;
1148                         struct obd_type *type;
1149                         struct lu_device_type *ldt = d->ld_type;
1150
1151                         lu_device_put(d);
1152                         
1153                         /* each fini() returns next device in stack of layers 
1154                          * so we can avoid the recursion */
1155                         n = d->ld_type->ldt_ops->ldto_device_fini(d);
1156                         d->ld_type->ldt_ops->ldto_device_free(d);
1157
1158                         type = ldt->obd_type;
1159                         type->typ_refcnt--;
1160                         class_put_type(type);
1161                         /* switch to the next device in the layer */
1162                         d = n;
1163                 }
1164         }
1165         return;
1166 }
1167
1168 static struct lu_device *mdt_layer_setup(const char *typename,
1169                                          struct lu_device *child,
1170                                          struct lustre_cfg *cfg)
1171 {
1172         struct obd_type       *type;
1173         struct lu_device_type *ldt;
1174         struct lu_device      *d;
1175         int rc;
1176
1177         /* find the type */
1178         type = class_get_type(typename);
1179         if (!type) {
1180                 CERROR("Unknown type: '%s'\n", typename);
1181                 GOTO(out, rc = -ENODEV);
1182         }
1183
1184         ldt = type->typ_lu;
1185         ldt->obd_type = type;
1186         if (ldt == NULL) {
1187                 CERROR("type: '%s'\n", typename);
1188                 GOTO(out_type, rc = -EINVAL);
1189         }
1190                 
1191         d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1192         if (IS_ERR(d)) {
1193                 CERROR("Cannot allocate device: '%s'\n", typename);
1194                 GOTO(out_type, rc = -ENODEV);
1195         }
1196         
1197         LASSERT(child->ld_site);
1198         d->ld_site = child->ld_site;
1199
1200         type->typ_refcnt++;
1201         rc = ldt->ldt_ops->ldto_device_init(d, child);
1202         if (rc) {
1203                 CERROR("can't init device '%s', rc %d\n", typename, rc);
1204                 GOTO(out_alloc, rc);
1205         }
1206         lu_device_get(d);
1207
1208         RETURN(d);
1209 out_alloc:
1210         ldt->ldt_ops->ldto_device_free(d);
1211         type->typ_refcnt--;
1212 out_type:
1213         class_put_type(type);
1214 out:
1215         RETURN(ERR_PTR(rc));
1216 }
1217
1218 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg) 
1219 {
1220         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
1221         int rc;
1222         
1223         /* init the stack */
1224         d = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1225         if (IS_ERR(d)) {
1226                 GOTO(out, rc = PTR_ERR(d));
1227         }
1228
1229         d = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1230         if (IS_ERR(d)) {
1231                 GOTO(out, rc = PTR_ERR(d));
1232         }
1233
1234         d = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1235         if (IS_ERR(d)) {
1236                 GOTO(out, rc = PTR_ERR(d));
1237         }
1238
1239         m->mdt_child = lu2md_dev(d);
1240
1241         RETURN(0);
1242 out:
1243         mdt_stack_fini(m);
1244         return rc;
1245 }
1246
1247 static void mdt_fini(struct mdt_device *m)
1248 {
1249         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1250
1251         mdt_stop_ptlrpc_service(m);
1252         if (d->ld_site != NULL) {
1253                 lu_site_fini(d->ld_site);
1254                 OBD_FREE_PTR(d->ld_site);
1255                 d->ld_site = NULL;
1256         }
1257         if (m->mdt_namespace != NULL) {
1258                 ldlm_namespace_free(m->mdt_namespace, 0);
1259                 m->mdt_namespace = NULL;
1260         }
1261
1262         /* finish the stack */
1263         mdt_stack_fini(m);
1264
1265         if (m->mdt_seq_mgr) {
1266                 seq_mgr_fini(m->mdt_seq_mgr);
1267                 m->mdt_seq_mgr = NULL;
1268         }
1269
1270         LASSERT(atomic_read(&d->ld_ref) == 0);
1271         md_device_fini(&m->mdt_md_dev);
1272 }
1273
1274 static int mdt_init0(struct mdt_device *m,
1275                      struct lu_device_type *t, struct lustre_cfg *cfg)
1276 {
1277         int rc;
1278         struct lu_site *s;
1279         char   ns_name[48];
1280         struct lu_context ctx;
1281         ENTRY;
1282
1283         OBD_ALLOC_PTR(s);
1284         if (s == NULL)
1285                 RETURN(-ENOMEM);
1286
1287         md_device_init(&m->mdt_md_dev, t);
1288         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1289         
1290         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev, cfg);
1291         if (rc) {
1292                 CERROR("can't init lu_site, rc %d\n", rc);
1293                 GOTO(err_fini_site, rc);
1294         }
1295         /* init the stack */
1296         rc = mdt_stack_init(m, cfg);
1297         if (rc) {
1298                 CERROR("can't init device stack, rc %d\n", rc);
1299                 GOTO(err_fini_site, rc);
1300         }
1301
1302         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1303         if (!m->mdt_seq_mgr) {
1304                 CERROR("can't initialize sequence manager\n");
1305                 GOTO(err_fini_stack, rc);
1306         }
1307
1308         rc = lu_context_init(&ctx);
1309         if (rc != 0)
1310                 GOTO(err_fini_mgr, rc);
1311
1312         lu_context_enter(&ctx);
1313         /* init sequence info after device stack is initialized. */
1314         rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1315         lu_context_exit(&ctx);
1316         if (rc)
1317                 GOTO(err_fini_ctx, rc);
1318
1319         lu_context_fini(&ctx);
1320
1321         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1322         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1323         if (m->mdt_namespace == NULL)
1324                 GOTO(err_fini_site, rc = -ENOMEM);
1325
1326         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1327
1328         rc = mdt_fld_init(m);
1329         if (rc)
1330                 GOTO(err_free_ns, rc);
1331
1332         rc = mdt_start_ptlrpc_service(m);
1333         if (rc)
1334                 GOTO(err_free_fld, rc);
1335         RETURN(0);
1336
1337 err_free_fld:
1338         mdt_fld_fini(m);
1339 err_free_ns:
1340         ldlm_namespace_free(m->mdt_namespace, 0);
1341         m->mdt_namespace = NULL;
1342 err_fini_ctx:
1343         lu_context_fini(&ctx);
1344 err_fini_mgr:
1345         seq_mgr_fini(m->mdt_seq_mgr);
1346         m->mdt_seq_mgr = NULL;
1347 err_fini_stack:
1348         mdt_stack_fini(m);
1349
1350 err_fini_site:
1351         lu_site_fini(s);
1352         OBD_FREE_PTR(s);
1353         RETURN(rc);
1354 }
1355
1356 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1357                                           struct lu_device *d)
1358 {
1359         struct mdt_object *mo;
1360
1361         OBD_ALLOC_PTR(mo);
1362         if (mo != NULL) {
1363                 struct lu_object *o;
1364                 struct lu_object_header *h;
1365
1366                 o = &mo->mot_obj.mo_lu;
1367                 h = &mo->mot_header;
1368                 lu_object_header_init(h);
1369                 lu_object_init(o, h, d);
1370                 lu_object_add_top(h, o);
1371                 return o;
1372         } else
1373                 return NULL;
1374 }
1375
1376 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1377 {
1378         struct mdt_device *d = mdt_dev(o->lo_dev);
1379         struct lu_device  *under;
1380         struct lu_object  *below;
1381
1382         under = &d->mdt_child->md_lu_dev;
1383         below = under->ld_ops->ldo_object_alloc(ctxt, under);
1384         if (below != NULL) {
1385                 lu_object_add(o, below);
1386                 return 0;
1387         } else
1388                 return -ENOMEM;
1389 }
1390
1391 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1392 {
1393         struct mdt_object *mo = lu2mdt_obj(o);
1394         struct lu_object_header *h;
1395
1396         h = o->lo_header;
1397         lu_object_fini(o);
1398         lu_object_header_fini(h);
1399         OBD_FREE_PTR(mo);
1400 }
1401
1402 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
1403 {
1404 }
1405
1406 static int mdt_object_print(struct lu_context *ctxt,
1407                             struct seq_file *f, const struct lu_object *o)
1408 {
1409         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1410 }
1411
1412 static struct lu_device_operations mdt_lu_ops = {
1413         .ldo_object_alloc   = mdt_object_alloc,
1414         .ldo_object_init    = mdt_object_init,
1415         .ldo_object_free    = mdt_object_free,
1416         .ldo_object_release = mdt_object_release,
1417         .ldo_object_print   = mdt_object_print
1418 };
1419
1420 /* mds_connect copy */
1421 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1422                            struct obd_uuid *cluuid,
1423                            struct obd_connect_data *data)
1424 {
1425         struct obd_export *exp;
1426         int rc;
1427         struct mdt_device *mdt;
1428         struct mds_export_data *med;
1429         struct mds_client_data *mcd = NULL;
1430         ENTRY;
1431
1432         if (!conn || !obd || !cluuid)
1433                 RETURN(-EINVAL);
1434
1435         mdt = mdt_dev(obd->obd_lu_dev);
1436
1437         rc = class_connect(conn, obd, cluuid);
1438         if (rc)
1439                 RETURN(rc);
1440
1441         exp = class_conn2export(conn);
1442         LASSERT(exp);
1443         med = &exp->exp_mds_data;
1444         
1445         OBD_ALLOC_PTR(mcd);
1446         if (!mcd)
1447                 GOTO(out, rc = -ENOMEM);
1448
1449         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1450         med->med_mcd = mcd;
1451
1452 out:
1453         if (rc) {
1454                 class_disconnect(exp);
1455         } else {
1456                 class_export_put(exp);
1457         }
1458
1459         RETURN(rc);
1460 }
1461
1462 static int mdt_obd_disconnect(struct obd_export *exp)
1463 {
1464         struct mds_export_data *med = &exp->exp_mds_data;
1465         unsigned long irqflags;
1466         int rc;
1467         ENTRY;
1468
1469         LASSERT(exp);
1470         class_export_get(exp);
1471
1472         /* Disconnect early so that clients can't keep using export */
1473         rc = class_disconnect(exp);
1474         //ldlm_cancel_locks_for_export(exp);
1475
1476         /* complete all outstanding replies */
1477         spin_lock_irqsave(&exp->exp_lock, irqflags);
1478         while (!list_empty(&exp->exp_outstanding_replies)) {
1479                 struct ptlrpc_reply_state *rs =
1480                         list_entry(exp->exp_outstanding_replies.next,
1481                                    struct ptlrpc_reply_state, rs_exp_list);
1482                 struct ptlrpc_service *svc = rs->rs_service;
1483
1484                 spin_lock(&svc->srv_lock);
1485                 list_del_init(&rs->rs_exp_list);
1486                 ptlrpc_schedule_difficult_reply(rs);
1487                 spin_unlock(&svc->srv_lock);
1488         }
1489         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1490         
1491         OBD_FREE_PTR(med->med_mcd);
1492
1493         class_export_put(exp);
1494         RETURN(rc);
1495 }
1496
1497 static struct obd_ops mdt_obd_device_ops = {
1498         .o_owner = THIS_MODULE,
1499         .o_connect = mdt_obd_connect,
1500         .o_disconnect = mdt_obd_disconnect,
1501 };
1502
1503 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1504                                           struct lustre_cfg *cfg)
1505 {
1506         struct lu_device  *l;
1507         struct mdt_device *m;
1508
1509         OBD_ALLOC_PTR(m);
1510         if (m != NULL) {
1511                 int result;
1512
1513                 l = &m->mdt_md_dev.md_lu_dev;
1514                 result = mdt_init0(m, t, cfg);
1515                 if (result != 0) {
1516                         mdt_fini(m);
1517                         return ERR_PTR(result);
1518                 }
1519
1520         } else
1521                 l = ERR_PTR(-ENOMEM);
1522         return l;
1523 }
1524
1525 static void mdt_device_free(struct lu_device *d)
1526 {
1527         struct mdt_device *m = mdt_dev(d);
1528
1529         mdt_fini(m);
1530         OBD_FREE_PTR(m);
1531 }
1532
1533 static void *mdt_thread_init(struct lu_context *ctx)
1534 {
1535         struct mdt_thread_info *info;
1536
1537         OBD_ALLOC_PTR(info);
1538         if (info != NULL)
1539                 info->mti_ctxt = ctx;
1540         else
1541                 info = ERR_PTR(-ENOMEM);
1542         return info;
1543 }
1544
1545 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1546 {
1547         struct mdt_thread_info *info = data;
1548         OBD_FREE_PTR(info);
1549 }
1550
1551 static struct lu_context_key mdt_thread_key = {
1552         .lct_init = mdt_thread_init,
1553         .lct_fini = mdt_thread_fini
1554 };
1555
1556 static int mdt_type_init(struct lu_device_type *t)
1557 {
1558         return lu_context_key_register(&mdt_thread_key);
1559 }
1560
1561 static void mdt_type_fini(struct lu_device_type *t)
1562 {
1563         lu_context_key_degister(&mdt_thread_key);
1564 }
1565
1566 static struct lu_device_type_operations mdt_device_type_ops = {
1567         .ldto_init = mdt_type_init,
1568         .ldto_fini = mdt_type_fini,
1569
1570         .ldto_device_alloc = mdt_device_alloc,
1571         .ldto_device_free  = mdt_device_free
1572 };
1573
1574 static struct lu_device_type mdt_device_type = {
1575         .ldt_tags = LU_DEVICE_MD,
1576         .ldt_name = LUSTRE_MDT0_NAME,
1577         .ldt_ops  = &mdt_device_type_ops
1578 };
1579
1580 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1581         { 0 }
1582 };
1583
1584 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1585         { 0 }
1586 };
1587
1588 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1589
1590 static int __init mdt_mod_init(void)
1591 {
1592         struct lprocfs_static_vars lvars;
1593
1594         mdt_num_threads = MDT_NUM_THREADS;
1595         lprocfs_init_vars(mdt, &lvars);
1596         return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1597                                    LUSTRE_MDT0_NAME, &mdt_device_type);
1598 }
1599
1600 static void __exit mdt_mod_exit(void)
1601 {
1602         class_unregister_type(LUSTRE_MDT0_NAME);
1603 }
1604
1605
1606 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1607 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1608         .mh_name    = #opc,                                             \
1609         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1610         .mh_opc     = prefix ## _  ## opc,                              \
1611         .mh_flags   = flags,                                            \
1612         .mh_act     = fn                                                \
1613 }
1614
1615 #define DEF_MDT_HNDL(flags, name, fn)                   \
1616         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1617
1618 static struct mdt_handler mdt_mds_ops[] = {
1619         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1620         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1621         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1622         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1623         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1624         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1625         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1626         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1627         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1628         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1629         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1630         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1631         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1632         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1633         DEF_MDT_HNDL(0,            FLD,            mdt_fld),
1634         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1635         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1636 };
1637
1638 static struct mdt_handler mdt_obd_ops[] = {
1639 };
1640
1641 #define DEF_DLM_HNDL(flags, name, fn)                   \
1642         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1643
1644 static struct mdt_handler mdt_dlm_ops[] = {
1645         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1646         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1647         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1648         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1649 };
1650
1651 static struct mdt_handler mdt_llog_ops[] = {
1652 };
1653
1654 static struct mdt_opc_slice mdt_handlers[] = {
1655         {
1656                 .mos_opc_start = MDS_GETATTR,
1657                 .mos_opc_end   = MDS_LAST_OPC,
1658                 .mos_hs        = mdt_mds_ops
1659         },
1660         {
1661                 .mos_opc_start = OBD_PING,
1662                 .mos_opc_end   = OBD_LAST_OPC,
1663                 .mos_hs        = mdt_obd_ops
1664         },
1665         {
1666                 .mos_opc_start = LDLM_ENQUEUE,
1667                 .mos_opc_end   = LDLM_LAST_OPC,
1668                 .mos_hs        = mdt_dlm_ops
1669         },
1670         {
1671                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1672                 .mos_opc_end   = LLOG_LAST_OPC,
1673                 .mos_hs        = mdt_llog_ops
1674         },
1675         {
1676                 .mos_hs        = NULL
1677         }
1678 };
1679
1680 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1681 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1682 MODULE_LICENSE("GPL");
1683
1684 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1685                 "number of mdt service threads to start");
1686
1687 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);