Whamcloud - gitweb
mdt_stack_fini(): cosmetic fixes
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
57
58 /*
59  * Initialized in mdt_mod_init().
60  */
61 unsigned long mdt_num_threads;
62
63 static int                mdt_handle    (struct ptlrpc_request *req);
64 static struct mdt_device *mdt_dev       (struct lu_device *d);
65 static struct lu_fid     *mdt_object_fid(struct mdt_object *o);
66
67 static struct lu_context_key mdt_thread_key;
68
69 /* object operations */
70 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
71                         struct lu_fid *pfid, const char *name,
72                         struct lu_fid *cfid)
73 {
74         struct mdt_object      *o;
75         struct mdt_object      *child;
76         struct mdt_lock_handle *lh;
77
78         int result;
79
80         lh = &info->mti_lh[MDT_LH_PARENT];
81         lh->mlh_mode = LCK_PW;
82
83         o = mdt_object_find_lock(info->mti_ctxt,
84                                  d, pfid, lh, MDS_INODELOCK_UPDATE);
85         if (IS_ERR(o))
86                 return PTR_ERR(o);
87
88         child = mdt_object_find(info->mti_ctxt, d, cfid);
89         if (!IS_ERR(child)) {
90                 struct md_object *next = mdt_object_child(o);
91
92                 result = next->mo_ops->moo_mkdir(info->mti_ctxt, next, name,
93                                                  mdt_object_child(child));
94                 mdt_object_put(info->mti_ctxt, child);
95         } else
96                 result = PTR_ERR(child);
97         mdt_object_unlock(d->mdt_namespace, o, lh);
98         mdt_object_put(info->mti_ctxt, o);
99         return result;
100 }
101
102 static int mdt_getstatus(struct mdt_thread_info *info,
103                          struct ptlrpc_request *req, int offset)
104 {
105         struct md_device *next  = info->mti_mdt->mdt_child;
106         struct mdt_body  *body;
107         int               size = sizeof *body;
108         int               result;
109
110         ENTRY;
111
112         result = lustre_pack_reply(req, 1, &size, NULL);
113         if (result)
114                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
115                        size);
116         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
117                 result = -ENOMEM;
118         else {
119                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
120                 result = next->md_ops->mdo_root_get(info->mti_ctxt,
121                                                     next, &body->fid1);
122         }
123
124         /* the last_committed and last_xid fields are filled in for all
125          * replies already - no need to do so here also.
126          */
127         RETURN(result);
128 }
129
130 static int mdt_statfs(struct mdt_thread_info *info,
131                       struct ptlrpc_request *req, int offset)
132 {
133         struct md_device  *next  = info->mti_mdt->mdt_child;
134         struct obd_statfs *osfs;
135         struct kstatfs    sfs;
136         int               result;
137         int               size = sizeof(struct obd_statfs);
138
139         ENTRY;
140
141         result = lustre_pack_reply(req, 1, &size, NULL);
142         if (result)
143                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
144                        size);
145         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
146                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
147                 result = -ENOMEM;
148         } else {
149                 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
150                 /* XXX max_age optimisation is needed here. See mds_statfs */
151                 result = next->md_ops->mdo_statfs(info->mti_ctxt, next, &sfs);
152                 statfs_pack(osfs, &sfs);
153         }
154
155         RETURN(result);
156 }
157
158 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
159 {
160         b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
161                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
162                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
163
164         if (!S_ISREG(attr->la_mode))
165                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
166                             OBD_MD_FLMTIME;
167
168         b->atime      = attr->la_atime;
169         b->mtime      = attr->la_mtime;
170         b->ctime      = attr->la_ctime;
171         b->mode       = attr->la_mode;
172         b->size       = attr->la_size;
173         b->blocks     = attr->la_blocks;
174         b->uid        = attr->la_uid;
175         b->gid        = attr->la_gid;
176         b->flags      = attr->la_flags;
177         b->nlink      = attr->la_nlink;
178 }
179
180 static int mdt_getattr(struct mdt_thread_info *info,
181                        struct ptlrpc_request *req, int offset)
182 {
183         struct mdt_body *body;
184         int              size = sizeof (*body);
185         int              result;
186
187         LASSERT(info->mti_object != NULL);
188
189         ENTRY;
190
191         result = lustre_pack_reply(req, 1, &size, NULL);
192         if (result)
193                 CERROR(LUSTRE_MDT0_NAME" cannot pack size=%d, rc=%d\n",
194                        size, result);
195         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
196                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
197                 result = -ENOMEM;
198         } else {
199                 struct md_object *next = mdt_object_child(info->mti_object);
200
201                 result = next->mo_ops->moo_attr_get(info->mti_ctxt, next,
202                                                     &info->mti_ctxt->lc_attr);
203                 if (result == 0) {
204                         body = lustre_msg_buf(req->rq_repmsg, 0, size);
205                         mdt_pack_attr2body(body, &info->mti_ctxt->lc_attr);
206                         body->fid1 = *mdt_object_fid(info->mti_object);
207                 }
208         }
209         RETURN(result);
210 }
211
212 static struct lu_device_operations mdt_lu_ops;
213
214 static int lu_device_is_mdt(struct lu_device *d)
215 {
216         /*
217          * XXX for now. Tags in lu_device_type->ldt_something are needed.
218          */
219         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
220 }
221
222 static struct mdt_device *mdt_dev(struct lu_device *d)
223 {
224         LASSERT(lu_device_is_mdt(d));
225         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
226 }
227
228 static int mdt_connect(struct mdt_thread_info *info,
229                        struct ptlrpc_request *req, int offset)
230 {
231         int result;
232
233         result = target_handle_connect(req, mdt_handle);
234         if (result == 0) {
235                 struct obd_connect_data *data;
236
237                 LASSERT(req->rq_export != NULL);
238                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
239
240                 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
241                 result = seq_mgr_alloc(info->mti_ctxt,
242                                        info->mti_mdt->mdt_seq_mgr,
243                                        &data->ocd_seq);
244         }
245         return result;
246 }
247
248 static int mdt_disconnect(struct mdt_thread_info *info,
249                           struct ptlrpc_request *req, int offset)
250 {
251         //return -EOPNOTSUPP;
252         return target_handle_disconnect(req);
253 }
254
255 static int mdt_getattr_name(struct mdt_thread_info *info,
256                             struct ptlrpc_request *req, int offset)
257 {
258         return -EOPNOTSUPP;
259 }
260
261 static int mdt_setxattr(struct mdt_thread_info *info,
262                         struct ptlrpc_request *req, int offset)
263 {
264         return -EOPNOTSUPP;
265 }
266
267 static int mdt_getxattr(struct mdt_thread_info *info,
268                         struct ptlrpc_request *req, int offset)
269 {
270         return -EOPNOTSUPP;
271 }
272
273 static int mdt_readpage(struct mdt_thread_info *info,
274                         struct ptlrpc_request *req, int offset)
275 {
276         return -EOPNOTSUPP;
277 }
278
279 static int mdt_reint(struct mdt_thread_info *info,
280                      struct ptlrpc_request *req, int offset)
281 {
282         return -EOPNOTSUPP;
283 }
284
285 static int mdt_close(struct mdt_thread_info *info,
286                      struct ptlrpc_request *req, int offset)
287 {
288         return -EOPNOTSUPP;
289 }
290
291 static int mdt_done_writing(struct mdt_thread_info *info,
292                             struct ptlrpc_request *req, int offset)
293 {
294         return -EOPNOTSUPP;
295 }
296
297 static int mdt_pin(struct mdt_thread_info *info,
298                    struct ptlrpc_request *req, int offset)
299 {
300         return -EOPNOTSUPP;
301 }
302
303 static int mdt_sync(struct mdt_thread_info *info,
304                     struct ptlrpc_request *req, int offset)
305 {
306         return -EOPNOTSUPP;
307 }
308
309 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
310                                  struct ptlrpc_request *req, int offset)
311 {
312         return -EOPNOTSUPP;
313 }
314
315 static int mdt_handle_quotactl(struct mdt_thread_info *info,
316                                struct ptlrpc_request *req, int offset)
317 {
318         return -EOPNOTSUPP;
319 }
320
321 /*
322  * DLM handlers.
323  */
324
325 static struct ldlm_callback_suite cbs = {
326         .lcs_completion = ldlm_server_completion_ast,
327         .lcs_blocking   = ldlm_server_blocking_ast,
328         .lcs_glimpse    = NULL
329 };
330
331 static int mdt_enqueue(struct mdt_thread_info *info,
332                        struct ptlrpc_request *req, int offset)
333 {
334         /*
335          * info->mti_dlm_req already contains swapped and (if necessary)
336          * converted dlm request.
337          */
338         LASSERT(info->mti_dlm_req != NULL);
339
340         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
341         return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
342 }
343
344 static int mdt_convert(struct mdt_thread_info *info,
345                        struct ptlrpc_request *req, int offset)
346 {
347         LASSERT(info->mti_dlm_req);
348         return ldlm_handle_convert0(req, info->mti_dlm_req);
349 }
350
351 static int mdt_bl_callback(struct mdt_thread_info *info,
352                            struct ptlrpc_request *req, int offset)
353 {
354         CERROR("bl callbacks should not happen on MDS\n");
355         LBUG();
356         return -EOPNOTSUPP;
357 }
358
359 static int mdt_cp_callback(struct mdt_thread_info *info,
360                            struct ptlrpc_request *req, int offset)
361 {
362         CERROR("cp callbacks should not happen on MDS\n");
363         LBUG();
364         return -EOPNOTSUPP;
365 }
366
367 /*
368  * Build (DLM) resource name from fid.
369  */
370 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
371                                        struct ldlm_res_id *name)
372 {
373         memset(name, 0, sizeof *name);
374         /* we use fid_num() whoch includes also object version instread of raw
375          * fid_oid(). */
376         name->name[0] = fid_seq(f);
377         name->name[1] = fid_num(f);
378         return name;
379 }
380
381 /*
382  * Return true if resource is for object identified by fid.
383  */
384 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
385 {
386         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
387 }
388
389 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
390 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
391              struct lustre_handle *lh, ldlm_mode_t mode,
392              ldlm_policy_data_t *policy)
393 {
394         struct ldlm_res_id res_id;
395         int flags = 0, rc;
396         ENTRY;
397
398         LASSERT(ns != NULL);
399         LASSERT(lh != NULL);
400         LASSERT(f != NULL);
401
402         /* FIXME: is that correct to have @flags=0 here? */
403         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
404                               LDLM_IBITS, policy, mode, &flags,
405                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
406                               NULL, NULL, 0, NULL, lh);
407         RETURN (rc == ELDLM_OK ? 0 : -EIO);
408 }
409
410 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
411                 struct lustre_handle *lh, ldlm_mode_t mode)
412 {
413         struct ldlm_lock *lock;
414         ENTRY;
415
416         /* FIXME: this is debug stuff, remove it later. */
417         lock = ldlm_handle2lock(lh);
418         if (!lock) {
419                 CERROR("invalid lock handle "LPX64, lh->cookie);
420                 LBUG();
421         }
422
423         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
424
425         ldlm_lock_decref(lh, mode);
426         EXIT;
427 }
428
429 static struct mdt_object *mdt_obj(struct lu_object *o)
430 {
431         LASSERT(lu_device_is_mdt(o->lo_dev));
432         return container_of(o, struct mdt_object, mot_obj.mo_lu);
433 }
434
435 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
436                                    struct mdt_device *d,
437                                    struct lu_fid *f)
438 {
439         struct lu_object *o;
440
441         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
442         if (IS_ERR(o))
443                 return (struct mdt_object *)o;
444         else
445                 return mdt_obj(o);
446 }
447
448 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
449 {
450         lu_object_put(ctxt, &o->mot_obj.mo_lu);
451 }
452
453 static struct lu_fid *mdt_object_fid(struct mdt_object *o)
454 {
455         return lu_object_fid(&o->mot_obj.mo_lu);
456 }
457
458 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
459                     struct mdt_lock_handle *lh, __u64 ibits)
460 {
461         ldlm_policy_data_t p = {
462                 .l_inodebits = {
463                         .bits = ibits
464                 }
465         };
466         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
467         LASSERT(lh->mlh_mode != LCK_MINMODE);
468
469         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
470 }
471
472 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
473                               struct mdt_lock_handle *lh)
474 {
475         if (lustre_handle_is_used(&lh->mlh_lh)) {
476                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
477                 lh->mlh_lh.cookie = 0;
478         }
479 }
480
481 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
482                                         struct mdt_device *d,
483                                         struct lu_fid *f,
484                                         struct mdt_lock_handle *lh,
485                                         __u64 ibits)
486 {
487         struct mdt_object *o;
488
489         o = mdt_object_find(ctxt, d, f);
490         if (!IS_ERR(o)) {
491                 int result;
492
493                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
494                 if (result != 0) {
495                         mdt_object_put(ctxt, o);
496                         o = ERR_PTR(result);
497                 }
498         }
499         return o;
500 }
501
502 struct mdt_handler {
503         const char *mh_name;
504         int         mh_fail_id;
505         __u32       mh_opc;
506         __u32       mh_flags;
507         int (*mh_act)(struct mdt_thread_info *info,
508                       struct ptlrpc_request *req, int offset);
509 };
510
511 enum mdt_handler_flags {
512         /*
513          * struct mdt_body is passed in the 0-th incoming buffer.
514          */
515         HABEO_CORPUS = (1 << 0),
516         /*
517          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
518          * incoming buffer.
519          */
520         HABEO_CLAVIS   = (1 << 1)
521 };
522
523 struct mdt_opc_slice {
524         __u32               mos_opc_start;
525         int                 mos_opc_end;
526         struct mdt_handler *mos_hs;
527 };
528
529 static struct mdt_opc_slice mdt_handlers[];
530
531 static struct mdt_handler *mdt_handler_find(__u32 opc)
532 {
533         struct mdt_opc_slice *s;
534         struct mdt_handler   *h;
535
536         h = NULL;
537         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
538                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
539                         h = s->mos_hs + (opc - s->mos_opc_start);
540                         if (h->mh_opc != 0)
541                                 LASSERT(h->mh_opc == opc);
542                         else
543                                 h = NULL; /* unsupported opc */
544                         break;
545                 }
546         }
547         return h;
548 }
549
550 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
551 {
552         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
553 }
554
555 static int mdt_lock_resname_compat(struct mdt_device *m,
556                                    struct ldlm_request *req)
557 {
558         /* XXX something... later. */
559         return 0;
560 }
561
562 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
563 {
564         /* XXX something... later. */
565         return 0;
566 }
567
568 /*
569  * Invoke handler for this request opc. Also do necessary preprocessing
570  * (according to handler ->mh_flags), and post-processing (setting of
571  * ->last_{xid,committed}).
572  */
573 static int mdt_req_handle(struct mdt_thread_info *info,
574                           struct mdt_handler *h, struct ptlrpc_request *req,
575                           int shift)
576 {
577         int result;
578         int off;
579
580         ENTRY;
581
582         LASSERT(h->mh_act != NULL);
583         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
584         LASSERT(current->journal_info == NULL);
585
586         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
587
588         if (h->mh_fail_id != 0)
589                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
590
591         off = MDS_REQ_REC_OFF + shift;
592
593         result = 0;
594         if (h->mh_flags & HABEO_CORPUS) {
595                 struct mdt_body *body;
596
597                 body = info->mti_body =
598                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
599                                            lustre_swab_mdt_body);
600                 if (body != NULL) {
601                         info->mti_object = mdt_object_find(info->mti_ctxt,
602                                                            info->mti_mdt,
603                                                            &body->fid1);
604                         if (IS_ERR(info->mti_object)) {
605                                 result = PTR_ERR(info->mti_object);
606                                 info->mti_object = NULL;
607                         }
608                 } else {
609                         CERROR("Can't unpack body\n");
610                         result = -EFAULT;
611                 }
612         } else if (h->mh_flags & HABEO_CLAVIS) {
613                 struct ldlm_request *dlm;
614
615                 LASSERT(shift == 0);
616                 dlm = info->mti_dlm_req =
617                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
618                                            sizeof *dlm,
619                                            lustre_swab_ldlm_request);
620                 if (dlm != NULL) {
621                         if (info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME)
622                                 result = mdt_lock_resname_compat(info->mti_mdt,
623                                                                  dlm);
624                 } else {
625                         CERROR("Can't unpack dlm request\n");
626                         result = -EFAULT;
627                 }
628         }
629         if (result == 0)
630                 /*
631                  * Process request.
632                  */
633                 result = h->mh_act(info, req, off);
634         /*
635          * XXX result value is unconditionally shoved into ->rq_status
636          * (original code sometimes placed error code into ->rq_status, and
637          * sometimes returned it to the
638          * caller). ptlrpc_server_handle_request() doesn't check return value
639          * anyway.
640          */
641         req->rq_status = result;
642
643         LASSERT(current->journal_info == NULL);
644
645         if (h->mh_flags & HABEO_CLAVIS &&
646             info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME) {
647                 struct ldlm_reply *rep;
648
649                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
650                 if (rep != NULL)
651                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
652         }
653
654         /* If we're DISCONNECTing, the mds_export_data is already freed */
655         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
656                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
657                 target_committed_to_req(req);
658         }
659         RETURN(result);
660 }
661
662 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
663 {
664         lh->mlh_lh.cookie = 0ull;
665         lh->mlh_mode = LCK_MINMODE;
666 }
667
668 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
669 {
670         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
671 }
672
673 static void mdt_thread_info_init(struct mdt_thread_info *info)
674 {
675         int i;
676
677         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
678         /*
679          * Poison size array.
680          */
681         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
682                 info->mti_rep_buf_size[i] = ~0;
683         info->mti_rep_buf_nr = i;
684         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
685                 mdt_lock_handle_init(&info->mti_lh[i]);
686         lu_context_enter(info->mti_ctxt);
687 }
688
689 static void mdt_thread_info_fini(struct mdt_thread_info *info)
690 {
691         int i;
692
693         lu_context_exit(info->mti_ctxt);
694         if (info->mti_object != NULL) {
695                 mdt_object_put(info->mti_ctxt, info->mti_object);
696                 info->mti_object = NULL;
697         }
698         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
699                 mdt_lock_handle_fini(&info->mti_lh[i]);
700 }
701
702 static int mds_msg_check_version(struct lustre_msg *msg)
703 {
704         int rc;
705
706         /* TODO: enable the below check while really introducing msg version.
707          * it's disabled because it will break compatibility with b1_4.
708          */
709         return (0);
710
711         switch (msg->opc) {
712         case MDS_CONNECT:
713         case MDS_DISCONNECT:
714         case OBD_PING:
715                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
716                 if (rc)
717                         CERROR("bad opc %u version %08x, expecting %08x\n",
718                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
719                 break;
720         case MDS_GETSTATUS:
721         case MDS_GETATTR:
722         case MDS_GETATTR_NAME:
723         case MDS_STATFS:
724         case MDS_READPAGE:
725         case MDS_REINT:
726         case MDS_CLOSE:
727         case MDS_DONE_WRITING:
728         case MDS_PIN:
729         case MDS_SYNC:
730         case MDS_GETXATTR:
731         case MDS_SETXATTR:
732         case MDS_SET_INFO:
733         case MDS_QUOTACHECK:
734         case MDS_QUOTACTL:
735         case QUOTA_DQACQ:
736         case QUOTA_DQREL:
737                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
738                 if (rc)
739                         CERROR("bad opc %u version %08x, expecting %08x\n",
740                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
741                 break;
742         case LDLM_ENQUEUE:
743         case LDLM_CONVERT:
744         case LDLM_BL_CALLBACK:
745         case LDLM_CP_CALLBACK:
746                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
747                 if (rc)
748                         CERROR("bad opc %u version %08x, expecting %08x\n",
749                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
750                 break;
751         case OBD_LOG_CANCEL:
752         case LLOG_ORIGIN_HANDLE_CREATE:
753         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
754         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
755         case LLOG_ORIGIN_HANDLE_READ_HEADER:
756         case LLOG_ORIGIN_HANDLE_CLOSE:
757         case LLOG_CATINFO:
758                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
759                 if (rc)
760                         CERROR("bad opc %u version %08x, expecting %08x\n",
761                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
762                 break;
763         default:
764                 CERROR("MDS unknown opcode %d\n", msg->opc);
765                 rc = -ENOTSUPP;
766         }
767         return rc;
768 }
769
770 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
771                                        struct obd_device *obd, int *process)
772 {
773         switch (req->rq_reqmsg->opc) {
774         case MDS_CONNECT: /* This will never get here, but for completeness. */
775         case OST_CONNECT: /* This will never get here, but for completeness. */
776         case MDS_DISCONNECT:
777         case OST_DISCONNECT:
778                *process = 1;
779                RETURN(0);
780
781         case MDS_CLOSE:
782         case MDS_SYNC: /* used in unmounting */
783         case OBD_PING:
784         case MDS_REINT:
785         case LDLM_ENQUEUE:
786                 *process = target_queue_recovery_request(req, obd);
787                 RETURN(0);
788
789         default:
790                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
791                 *process = 0;
792                 /* XXX what should we set rq_status to here? */
793                 req->rq_status = -EAGAIN;
794                 RETURN(ptlrpc_error(req));
795         }
796 }
797
798 /*
799  * Handle recovery. Return:
800  *        +1: continue request processing;
801  *       -ve: abort immediately with the given error code;
802  *         0: send reply with error code in req->rq_status;
803  */
804 static int mdt_recovery(struct ptlrpc_request *req)
805 {
806         int recovering;
807         int abort_recovery;
808         struct obd_device *obd;
809
810         ENTRY;
811
812         if (req->rq_reqmsg->opc == MDS_CONNECT)
813                 RETURN(+1);
814
815         if (req->rq_export == NULL) {
816                 CERROR("operation %d on unconnected MDS from %s\n",
817                        req->rq_reqmsg->opc,
818                        libcfs_id2str(req->rq_peer));
819                 req->rq_status = -ENOTCONN;
820                 RETURN(0);
821         }
822
823         /* sanity check: if the xid matches, the request must be marked as a
824          * resent or replayed */
825         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
826                       lustre_msg_get_flags(req->rq_reqmsg) &
827                       (MSG_RESENT | MSG_REPLAY)),
828                  "rq_xid "LPU64" matches last_xid, "
829                  "expected RESENT flag\n", req->rq_xid);
830
831         /* else: note the opposite is not always true; a RESENT req after a
832          * failover will usually not match the last_xid, since it was likely
833          * never committed. A REPLAYed request will almost never match the
834          * last xid, however it could for a committed, but still retained,
835          * open. */
836
837         obd = req->rq_export->exp_obd;
838
839         /* Check for aborted recovery... */
840         spin_lock_bh(&obd->obd_processing_task_lock);
841         abort_recovery = obd->obd_abort_recovery;
842         recovering = obd->obd_recovering;
843         spin_unlock_bh(&obd->obd_processing_task_lock);
844         if (abort_recovery) {
845                 target_abort_recovery(obd);
846         } else if (recovering) {
847                 int rc;
848                 int should_process;
849
850                 rc = mdt_filter_recovery_request(req, obd, &should_process);
851                 if (rc != 0 || !should_process) {
852                         LASSERT(rc < 0);
853                         RETURN(rc);
854                 }
855         }
856         RETURN(+1);
857 }
858
859 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
860 {
861         struct obd_device *obd;
862
863         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
864                 if (req->rq_reqmsg->opc != OBD_PING)
865                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
866
867                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
868                 if (obd && obd->obd_recovering) {
869                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
870                         RETURN(target_queue_final_reply(req, req->rq_status));
871                 } else {
872                         /* Lost a race with recovery; let the error path
873                          * DTRT. */
874                         req->rq_status = -ENOTCONN;
875                 }
876         }
877         target_send_reply(req, req->rq_status, info->mti_fail_id);
878         RETURN(req->rq_status);
879 }
880
881 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
882 {
883         struct mdt_handler *h;
884         struct lustre_msg  *msg;
885         int                 result;
886
887         ENTRY;
888
889         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
890
891         LASSERT(current->journal_info == NULL);
892
893         msg = req->rq_reqmsg;
894         result = mds_msg_check_version(msg);
895         if (result == 0) {
896                 result = mdt_recovery(req);
897                 switch (result) {
898                 case +1:
899                         h = mdt_handler_find(msg->opc);
900                         if (h != NULL)
901                                 result = mdt_req_handle(info, h, req, 0);
902                         else {
903                                 req->rq_status = -ENOTSUPP;
904                                 result = ptlrpc_error(req);
905                                 break;
906                         }
907                         /* fall through */
908                 case 0:
909                         result = mdt_reply(req, info);
910                 }
911         } else
912                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
913         RETURN(result);
914 }
915
916 static int mdt_handle(struct ptlrpc_request *req)
917 {
918         int result;
919         struct lu_context      *ctx;
920         struct mdt_thread_info *info;
921         ENTRY;
922
923         ctx = req->rq_svc_thread->t_ctx;
924         LASSERT(ctx != NULL);
925         LASSERT(ctx->lc_thread == req->rq_svc_thread);
926
927         info = lu_context_key_get(ctx, &mdt_thread_key);
928         LASSERT(info != NULL);
929
930         mdt_thread_info_init(info);
931         /* it can be NULL while CONNECT */
932         if (req->rq_export)
933                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
934
935         result = mdt_handle0(req, info);
936         mdt_thread_info_fini(info);
937         return result;
938 }
939
940 static int mdt_intent_policy(struct ldlm_namespace *ns,
941                              struct ldlm_lock **lockp, void *req_cookie,
942                              ldlm_mode_t mode, int flags, void *data)
943 {
944         ENTRY;
945         RETURN(ELDLM_LOCK_ABORTED);
946 }
947
948 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
949                                             svc_handler_t h, char *name,
950                                             struct proc_dir_entry *proc_entry,
951                                             svcreq_printfn_t prntfn)
952 {
953         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
954                                c->psc_max_req_size, c->psc_max_reply_size,
955                                c->psc_req_portal, c->psc_rep_portal,
956                                c->psc_watchdog_timeout,
957                                h, name, proc_entry,
958                                prntfn, c->psc_num_threads);
959 }
960
961 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
962                       const char *name, void *buf, int size, int mode)
963 {
964         struct md_device *child = m->mdt_child;
965         ENTRY;
966         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
967 }
968
969 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
970                            int mode)
971 {
972         struct mdt_device *m = opaque;
973         int rc;
974         ENTRY;
975
976         rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
977                         seq, sizeof(*seq),
978                         mode);
979         RETURN(rc);
980 }
981
982 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
983 {
984         ENTRY;
985         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
986 }
987
988 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
989 {
990         ENTRY;
991         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
992 }
993
994 struct lu_seq_mgr_ops seq_mgr_ops = {
995         .smo_read  = mdt_seq_mgr_read,
996         .smo_write = mdt_seq_mgr_write
997 };
998
999 /* device init/fini methods */
1000
1001 static int mdt_fld(struct mdt_thread_info *info,
1002                    struct ptlrpc_request *req, int offset)
1003 {
1004         struct lu_site *ls  = info->mti_mdt->mdt_md_dev.md_lu_dev.ld_site;
1005         struct md_fld mf, *p, *reply;
1006         int size = sizeof(*reply);
1007         __u32 *opt;
1008         int rc;
1009         ENTRY;
1010
1011         rc = lustre_pack_reply(req, 1, &size, NULL);
1012         if (rc)
1013                 RETURN(rc);
1014
1015         opt = lustre_swab_reqbuf(req, 0, sizeof(*opt), lustre_swab_generic_32s);
1016         p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
1017         mf = *p;
1018
1019         rc = fld_handle(ls->ls_fld, *opt, &mf);
1020         if (rc)
1021                 RETURN(rc);
1022
1023         reply = lustre_msg_buf(req->rq_repmsg, 0, size);
1024         *reply = mf;
1025         RETURN(rc);
1026 }
1027
1028 struct dt_device *md2_bottom_dev(struct mdt_device *m)
1029 {
1030         /*FIXME: get dt device here*/
1031         RETURN (NULL);
1032 }
1033
1034 static int mdt_fld_init(struct mdt_device *m)
1035 {
1036         struct dt_device *dt;
1037         struct lu_site   *ls;
1038         int rc;
1039         ENTRY;
1040
1041         dt = md2_bottom_dev(m);
1042
1043         ls = m->mdt_md_dev.md_lu_dev.ld_site;
1044
1045         OBD_ALLOC_PTR(ls->ls_fld);
1046
1047         if (!ls->ls_fld)
1048              RETURN(-ENOMEM);
1049
1050         rc = fld_server_init(ls->ls_fld, dt);
1051
1052         RETURN(rc);
1053 }
1054
1055 static int mdt_fld_fini(struct mdt_device *m)
1056 {
1057         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
1058         int rc = 0;
1059
1060         if (ls && ls->ls_fld) {
1061                 fld_server_fini(ls->ls_fld);
1062                 OBD_FREE_PTR(ls->ls_fld);
1063         }
1064         RETURN(rc);
1065 }
1066
1067 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
1068 {
1069         if (m->mdt_service != NULL) {
1070                 ptlrpc_unregister_service(m->mdt_service);
1071                 m->mdt_service = NULL;
1072         }
1073         if (m->mdt_fld_service != NULL) {
1074                 ptlrpc_unregister_service(m->mdt_fld_service);
1075                 m->mdt_fld_service = NULL;
1076         }
1077 }
1078
1079 static int mdt_start_ptlrpc_service(struct mdt_device *m)
1080 {
1081         int rc;
1082         ENTRY;
1083
1084         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1085         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1086         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1087         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1088         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1089         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1090         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1091         /*
1092          * We'd like to have a mechanism to set this on a per-device basis,
1093          * but alas...
1094          */
1095         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1096                                                       MDT_MIN_THREADS),
1097                                                   MDT_MAX_THREADS);
1098
1099         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1100                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1101
1102         m->mdt_service =
1103                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1104                                      LUSTRE_MDT0_NAME,
1105                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1106                                      NULL);
1107         if (m->mdt_service == NULL)
1108                 RETURN(-ENOMEM);
1109
1110         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1111         if (rc)
1112                 GOTO(err_mdt_svc, rc);
1113
1114         /*start mdt fld service */
1115
1116         m->mdt_service_conf.psc_req_portal = MDS_FLD_PORTAL;
1117
1118         m->mdt_fld_service =
1119                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1120                                      LUSTRE_FLD0_NAME,
1121                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1122                                      NULL);
1123         if (m->mdt_fld_service == NULL)
1124                 RETURN(-ENOMEM);
1125
1126         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, LUSTRE_FLD0_NAME);
1127         if (rc)
1128                 GOTO(err_fld_svc, rc);
1129
1130         RETURN(rc);
1131 err_fld_svc:
1132         ptlrpc_unregister_service(m->mdt_fld_service);
1133         m->mdt_fld_service = NULL;
1134 err_mdt_svc:
1135         ptlrpc_unregister_service(m->mdt_service);
1136         m->mdt_service = NULL;
1137
1138         RETURN(rc);
1139 }
1140
1141 static void mdt_stack_fini(struct mdt_device *m)
1142 {
1143         if (m->mdt_child) {
1144                 struct lu_device *d = md2lu_dev(m->mdt_child);
1145                 /* goes through all stack */
1146                 while (d != NULL) {
1147                         struct lu_device *n;
1148                         struct obd_type *type;
1149                         struct lu_device_type *ldt = d->ld_type;
1150
1151                         lu_device_put(d);
1152
1153                         /* each fini() returns next device in stack of layers
1154                          * so we can avoid the recursion */
1155                         n = ldt->ldt_ops->ldto_device_fini(d);
1156                         ldt->ldt_ops->ldto_device_free(d);
1157
1158                         type = ldt->obd_type;
1159                         type->typ_refcnt--;
1160                         class_put_type(type);
1161                         /* switch to the next device in the layer */
1162                         d = n;
1163                 }
1164         }
1165 }
1166
1167 static struct lu_device *mdt_layer_setup(const char *typename,
1168                                          struct lu_device *child,
1169                                          struct lustre_cfg *cfg)
1170 {
1171         struct obd_type       *type;
1172         struct lu_device_type *ldt;
1173         struct lu_device      *d;
1174         int rc;
1175
1176         /* find the type */
1177         type = class_get_type(typename);
1178         if (!type) {
1179                 CERROR("Unknown type: '%s'\n", typename);
1180                 GOTO(out, rc = -ENODEV);
1181         }
1182
1183         ldt = type->typ_lu;
1184         ldt->obd_type = type;
1185         if (ldt == NULL) {
1186                 CERROR("type: '%s'\n", typename);
1187                 GOTO(out_type, rc = -EINVAL);
1188         }
1189
1190         d = ldt->ldt_ops->ldto_device_alloc(ldt, cfg);
1191         if (IS_ERR(d)) {
1192                 CERROR("Cannot allocate device: '%s'\n", typename);
1193                 GOTO(out_type, rc = -ENODEV);
1194         }
1195
1196         LASSERT(child->ld_site);
1197         d->ld_site = child->ld_site;
1198
1199         type->typ_refcnt++;
1200         rc = ldt->ldt_ops->ldto_device_init(d, child);
1201         if (rc) {
1202                 CERROR("can't init device '%s', rc %d\n", typename, rc);
1203                 GOTO(out_alloc, rc);
1204         }
1205         lu_device_get(d);
1206
1207         RETURN(d);
1208 out_alloc:
1209         ldt->ldt_ops->ldto_device_free(d);
1210         type->typ_refcnt--;
1211 out_type:
1212         class_put_type(type);
1213 out:
1214         RETURN(ERR_PTR(rc));
1215 }
1216
1217 static int mdt_stack_init(struct mdt_device *m, struct lustre_cfg *cfg)
1218 {
1219         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
1220         int rc;
1221
1222         /* init the stack */
1223         d = mdt_layer_setup(LUSTRE_OSD0_NAME, d, cfg);
1224         if (IS_ERR(d)) {
1225                 GOTO(out, rc = PTR_ERR(d));
1226         }
1227
1228         d = mdt_layer_setup(LUSTRE_MDD0_NAME, d, cfg);
1229         if (IS_ERR(d)) {
1230                 GOTO(out, rc = PTR_ERR(d));
1231         }
1232
1233         d = mdt_layer_setup(LUSTRE_CMM0_NAME, d, cfg);
1234         if (IS_ERR(d)) {
1235                 GOTO(out, rc = PTR_ERR(d));
1236         }
1237
1238         m->mdt_child = lu2md_dev(d);
1239
1240         RETURN(0);
1241 out:
1242         mdt_stack_fini(m);
1243         return rc;
1244 }
1245
1246 static void mdt_fini(struct mdt_device *m)
1247 {
1248         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1249
1250         mdt_stop_ptlrpc_service(m);
1251
1252         /* finish the stack */
1253         mdt_stack_fini(m);
1254
1255         if (d->ld_site != NULL) {
1256                 struct lustre_mount_info *lmi = d->ld_site->ls_lmi;
1257                 if (lmi)
1258                         server_put_mount(lmi->lmi_name, lmi->lmi_mnt);
1259                 lu_site_fini(d->ld_site);
1260                 OBD_FREE_PTR(d->ld_site);
1261                 d->ld_site = NULL;
1262         }
1263         if (m->mdt_namespace != NULL) {
1264                 ldlm_namespace_free(m->mdt_namespace, 0);
1265                 m->mdt_namespace = NULL;
1266         }
1267
1268         if (m->mdt_seq_mgr) {
1269                 seq_mgr_fini(m->mdt_seq_mgr);
1270                 m->mdt_seq_mgr = NULL;
1271         }
1272
1273         LASSERT(atomic_read(&d->ld_ref) == 0);
1274         md_device_fini(&m->mdt_md_dev);
1275 }
1276
1277 static int mdt_init0(struct mdt_device *m,
1278                      struct lu_device_type *t, struct lustre_cfg *cfg)
1279 {
1280         int rc;
1281         struct lu_site *s;
1282         char   ns_name[48];
1283         struct lu_context ctx;
1284         const char *dev = lustre_cfg_string(cfg, 0);
1285         struct lustre_mount_info *lmi;
1286
1287         ENTRY;
1288
1289         OBD_ALLOC_PTR(s);
1290         if (s == NULL)
1291                 RETURN(-ENOMEM);
1292
1293         md_device_init(&m->mdt_md_dev, t);
1294         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1295
1296         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1297         if (rc) {
1298                 CERROR("can't init lu_site, rc %d\n", rc);
1299                 GOTO(err_fini_site, rc);
1300         }
1301
1302         /* get mount */
1303         lmi = server_get_mount(dev);
1304         if (lmi == NULL) {
1305                 CERROR("Cannot get mount info for %s!\n", dev);
1306                 GOTO(err_fini_site, rc = -EFAULT);
1307         }
1308         //put lmi into lu_site
1309         s->ls_lmi = lmi;
1310
1311         /* init the stack */
1312         rc = mdt_stack_init(m, cfg);
1313         if (rc) {
1314                 CERROR("can't init device stack, rc %d\n", rc);
1315                 GOTO(err_fini_mount, rc);
1316         }
1317
1318         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1319         if (!m->mdt_seq_mgr) {
1320                 CERROR("can't initialize sequence manager\n");
1321                 GOTO(err_fini_stack, rc);
1322         }
1323
1324         rc = lu_context_init(&ctx);
1325         if (rc != 0)
1326                 GOTO(err_fini_mgr, rc);
1327
1328         lu_context_enter(&ctx);
1329         /* init sequence info after device stack is initialized. */
1330         rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1331         lu_context_exit(&ctx);
1332         if (rc)
1333                 GOTO(err_fini_ctx, rc);
1334
1335         lu_context_fini(&ctx);
1336
1337         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1338         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1339         if (m->mdt_namespace == NULL)
1340                 GOTO(err_fini_site, rc = -ENOMEM);
1341
1342         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1343
1344         rc = mdt_fld_init(m);
1345         if (rc)
1346                 GOTO(err_free_ns, rc);
1347
1348         rc = mdt_start_ptlrpc_service(m);
1349         if (rc)
1350                 GOTO(err_free_fld, rc);
1351         RETURN(0);
1352
1353 err_free_fld:
1354         mdt_fld_fini(m);
1355 err_free_ns:
1356         ldlm_namespace_free(m->mdt_namespace, 0);
1357         m->mdt_namespace = NULL;
1358 err_fini_ctx:
1359         lu_context_fini(&ctx);
1360 err_fini_mgr:
1361         seq_mgr_fini(m->mdt_seq_mgr);
1362         m->mdt_seq_mgr = NULL;
1363 err_fini_stack:
1364         mdt_stack_fini(m);
1365 err_fini_mount:
1366         server_put_mount(lmi->lmi_name, lmi->lmi_mnt);
1367 err_fini_site:
1368         lu_site_fini(s);
1369         OBD_FREE_PTR(s);
1370         RETURN(rc);
1371 }
1372
1373 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1374                                           struct lu_device *d)
1375 {
1376         struct mdt_object *mo;
1377
1378         OBD_ALLOC_PTR(mo);
1379         if (mo != NULL) {
1380                 struct lu_object *o;
1381                 struct lu_object_header *h;
1382
1383                 o = &mo->mot_obj.mo_lu;
1384                 h = &mo->mot_header;
1385                 lu_object_header_init(h);
1386                 lu_object_init(o, h, d);
1387                 lu_object_add_top(h, o);
1388                 return o;
1389         } else
1390                 return NULL;
1391 }
1392
1393 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1394 {
1395         struct mdt_device *d = mdt_dev(o->lo_dev);
1396         struct lu_device  *under;
1397         struct lu_object  *below;
1398
1399         under = &d->mdt_child->md_lu_dev;
1400         below = under->ld_ops->ldo_object_alloc(ctxt, under);
1401         if (below != NULL) {
1402                 lu_object_add(o, below);
1403                 return 0;
1404         } else
1405                 return -ENOMEM;
1406 }
1407
1408 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1409 {
1410         struct mdt_object *mo = mdt_obj(o);
1411         struct lu_object_header *h;
1412
1413         h = o->lo_header;
1414         lu_object_fini(o);
1415         lu_object_header_fini(h);
1416         OBD_FREE_PTR(mo);
1417 }
1418
1419 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
1420 {
1421 }
1422
1423 static int mdt_object_print(struct lu_context *ctxt,
1424                             struct seq_file *f, const struct lu_object *o)
1425 {
1426         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1427 }
1428
1429 static struct lu_device_operations mdt_lu_ops = {
1430         .ldo_object_alloc   = mdt_object_alloc,
1431         .ldo_object_init    = mdt_object_init,
1432         .ldo_object_free    = mdt_object_free,
1433         .ldo_object_release = mdt_object_release,
1434         .ldo_object_print   = mdt_object_print
1435 };
1436
1437 /* mds_connect copy */
1438 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1439                            struct obd_uuid *cluuid,
1440                            struct obd_connect_data *data)
1441 {
1442         struct obd_export *exp;
1443         int rc;
1444         struct mdt_device *mdt;
1445         struct mds_export_data *med;
1446         struct mds_client_data *mcd = NULL;
1447         ENTRY;
1448
1449         if (!conn || !obd || !cluuid)
1450                 RETURN(-EINVAL);
1451
1452         mdt = mdt_dev(obd->obd_lu_dev);
1453
1454         rc = class_connect(conn, obd, cluuid);
1455         if (rc)
1456                 RETURN(rc);
1457
1458         exp = class_conn2export(conn);
1459         LASSERT(exp);
1460         med = &exp->exp_mds_data;
1461
1462         OBD_ALLOC_PTR(mcd);
1463         if (!mcd)
1464                 GOTO(out, rc = -ENOMEM);
1465
1466         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1467         med->med_mcd = mcd;
1468
1469 out:
1470         if (rc) {
1471                 class_disconnect(exp);
1472         } else {
1473                 class_export_put(exp);
1474         }
1475
1476         RETURN(rc);
1477 }
1478
1479 static int mdt_obd_disconnect(struct obd_export *exp)
1480 {
1481         struct mds_export_data *med = &exp->exp_mds_data;
1482         unsigned long irqflags;
1483         int rc;
1484         ENTRY;
1485
1486         LASSERT(exp);
1487         class_export_get(exp);
1488
1489         /* Disconnect early so that clients can't keep using export */
1490         rc = class_disconnect(exp);
1491         //ldlm_cancel_locks_for_export(exp);
1492
1493         /* complete all outstanding replies */
1494         spin_lock_irqsave(&exp->exp_lock, irqflags);
1495         while (!list_empty(&exp->exp_outstanding_replies)) {
1496                 struct ptlrpc_reply_state *rs =
1497                         list_entry(exp->exp_outstanding_replies.next,
1498                                    struct ptlrpc_reply_state, rs_exp_list);
1499                 struct ptlrpc_service *svc = rs->rs_service;
1500
1501                 spin_lock(&svc->srv_lock);
1502                 list_del_init(&rs->rs_exp_list);
1503                 ptlrpc_schedule_difficult_reply(rs);
1504                 spin_unlock(&svc->srv_lock);
1505         }
1506         spin_unlock_irqrestore(&exp->exp_lock, irqflags);
1507
1508         OBD_FREE_PTR(med->med_mcd);
1509
1510         class_export_put(exp);
1511         RETURN(rc);
1512 }
1513
1514 static struct obd_ops mdt_obd_device_ops = {
1515         .o_owner = THIS_MODULE,
1516         .o_connect = mdt_obd_connect,
1517         .o_disconnect = mdt_obd_disconnect,
1518 };
1519
1520 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1521                                           struct lustre_cfg *cfg)
1522 {
1523         struct lu_device  *l;
1524         struct mdt_device *m;
1525
1526         OBD_ALLOC_PTR(m);
1527         if (m != NULL) {
1528                 int result;
1529
1530                 l = &m->mdt_md_dev.md_lu_dev;
1531                 result = mdt_init0(m, t, cfg);
1532                 if (result != 0) {
1533                         mdt_fini(m);
1534                         return ERR_PTR(result);
1535                 }
1536
1537         } else
1538                 l = ERR_PTR(-ENOMEM);
1539         return l;
1540 }
1541
1542 static void mdt_device_free(struct lu_device *d)
1543 {
1544         struct mdt_device *m = mdt_dev(d);
1545
1546         mdt_fini(m);
1547         OBD_FREE_PTR(m);
1548 }
1549
1550 static void *mdt_thread_init(struct lu_context *ctx)
1551 {
1552         struct mdt_thread_info *info;
1553
1554         OBD_ALLOC_PTR(info);
1555         if (info != NULL)
1556                 info->mti_ctxt = ctx;
1557         else
1558                 info = ERR_PTR(-ENOMEM);
1559         return info;
1560 }
1561
1562 static void mdt_thread_fini(struct lu_context *ctx, void *data)
1563 {
1564         struct mdt_thread_info *info = data;
1565         OBD_FREE_PTR(info);
1566 }
1567
1568 static struct lu_context_key mdt_thread_key = {
1569         .lct_init = mdt_thread_init,
1570         .lct_fini = mdt_thread_fini
1571 };
1572
1573 static int mdt_type_init(struct lu_device_type *t)
1574 {
1575         return lu_context_key_register(&mdt_thread_key);
1576 }
1577
1578 static void mdt_type_fini(struct lu_device_type *t)
1579 {
1580         lu_context_key_degister(&mdt_thread_key);
1581 }
1582
1583 static struct lu_device_type_operations mdt_device_type_ops = {
1584         .ldto_init = mdt_type_init,
1585         .ldto_fini = mdt_type_fini,
1586
1587         .ldto_device_alloc = mdt_device_alloc,
1588         .ldto_device_free  = mdt_device_free
1589 };
1590
1591 static struct lu_device_type mdt_device_type = {
1592         .ldt_tags = LU_DEVICE_MD,
1593         .ldt_name = LUSTRE_MDT0_NAME,
1594         .ldt_ops  = &mdt_device_type_ops
1595 };
1596
1597 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1598         { 0 }
1599 };
1600
1601 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1602         { 0 }
1603 };
1604
1605 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1606
1607 static int __init mdt_mod_init(void)
1608 {
1609         struct lprocfs_static_vars lvars;
1610
1611         mdt_num_threads = MDT_NUM_THREADS;
1612         lprocfs_init_vars(mdt, &lvars);
1613         return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1614                                    LUSTRE_MDT0_NAME, &mdt_device_type);
1615 }
1616
1617 static void __exit mdt_mod_exit(void)
1618 {
1619         class_unregister_type(LUSTRE_MDT0_NAME);
1620 }
1621
1622
1623 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1624 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1625         .mh_name    = #opc,                                             \
1626         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1627         .mh_opc     = prefix ## _  ## opc,                              \
1628         .mh_flags   = flags,                                            \
1629         .mh_act     = fn                                                \
1630 }
1631
1632 #define DEF_MDT_HNDL(flags, name, fn)                   \
1633         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1634
1635 static struct mdt_handler mdt_mds_ops[] = {
1636         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1637         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1638         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1639         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1640         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1641         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1642         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1643         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1644         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1645         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1646         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1647         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1648         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1649         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1650         DEF_MDT_HNDL(0,            FLD,            mdt_fld),
1651         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1652         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1653 };
1654
1655 static struct mdt_handler mdt_obd_ops[] = {
1656 };
1657
1658 #define DEF_DLM_HNDL(flags, name, fn)                   \
1659         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1660
1661 static struct mdt_handler mdt_dlm_ops[] = {
1662         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1663         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1664         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1665         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1666 };
1667
1668 static struct mdt_handler mdt_llog_ops[] = {
1669 };
1670
1671 static struct mdt_opc_slice mdt_handlers[] = {
1672         {
1673                 .mos_opc_start = MDS_GETATTR,
1674                 .mos_opc_end   = MDS_LAST_OPC,
1675                 .mos_hs        = mdt_mds_ops
1676         },
1677         {
1678                 .mos_opc_start = OBD_PING,
1679                 .mos_opc_end   = OBD_LAST_OPC,
1680                 .mos_hs        = mdt_obd_ops
1681         },
1682         {
1683                 .mos_opc_start = LDLM_ENQUEUE,
1684                 .mos_opc_end   = LDLM_LAST_OPC,
1685                 .mos_hs        = mdt_dlm_ops
1686         },
1687         {
1688                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1689                 .mos_opc_end   = LLOG_LAST_OPC,
1690                 .mos_hs        = mdt_llog_ops
1691         },
1692         {
1693                 .mos_hs        = NULL
1694         }
1695 };
1696
1697 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1698 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1699 MODULE_LICENSE("GPL");
1700
1701 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1702                 "number of mdt service threads to start");
1703
1704 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);