Whamcloud - gitweb
Branch: b_new_cmd
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mds/handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *
14  *   This file is part of the Lustre file system, http://www.lustre.org
15  *   Lustre is a trademark of Cluster File Systems, Inc.
16  *
17  *   You may have signed or agreed to another license before downloading
18  *   this software.  If so, you are bound by the terms and conditions
19  *   of that agreement, and the following does not apply to you.  See the
20  *   LICENSE file included with this distribution for more information.
21  *
22  *   If you did not agree to a different license, then this copy of Lustre
23  *   is open source software; you can redistribute it and/or modify it
24  *   under the terms of version 2 of the GNU General Public License as
25  *   published by the Free Software Foundation.
26  *
27  *   In either case, Lustre is distributed in the hope that it will be
28  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
29  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
30  *   license text for more details.
31  */
32
33 #ifndef EXPORT_SYMTAB
34 # define EXPORT_SYMTAB
35 #endif
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <linux/module.h>
39
40 /* LUSTRE_VERSION_CODE */
41 #include <linux/lustre_ver.h>
42 /*
43  * struct OBD_{ALLOC,FREE}*()
44  * OBD_FAIL_CHECK
45  */
46 #include <linux/obd_support.h>
47 /* struct ptlrpc_request */
48 #include <linux/lustre_net.h>
49 /* struct obd_export */
50 #include <linux/lustre_export.h>
51 /* struct obd_device */
52 #include <linux/obd.h>
53
54 /* struct mds_client_data */
55 #include "../mds/mds_internal.h"
56 #include "mdt_internal.h"
57
58 /*
59  * Initialized in mdt_mod_init().
60  */
61 unsigned long mdt_num_threads;
62
63 static int mdt_handle(struct ptlrpc_request *req);
64 static struct ptlrpc_thread_key mdt_thread_key;
65
66 /* object operations */
67 #if 0
68 static int mdt_md_mkdir(struct mdt_thread_info *info, struct mdt_device *d,
69                         struct lu_fid *pfid, const char *name, struct lu_fid *cfid)
70 {
71         struct mdt_object      *o;
72         struct mdt_object      *child;
73         struct mdt_lock_handle *lh;
74
75         int result;
76
77         lh = &info->mti_lh[MDT_LH_PARENT];
78         lh->mlh_mode = LCK_PW;
79
80         o = mdt_object_find_lock(d, pfid, lh, MDS_INODELOCK_UPDATE);
81         if (IS_ERR(o))
82                 return PTR_ERR(o);
83
84         child = mdt_object_find(d, cfid);
85         if (!IS_ERR(child)) {
86                 struct md_object *next = mdt_object_child(o);
87
88                 result = next->mo_ops->moo_mkdir(&info->mti_ctxt, next, name,
89                                                  mdt_object_child(child));
90                 mdt_object_put(child);
91         } else
92                 result = PTR_ERR(child);
93         mdt_object_unlock(d->mdt_namespace, o, lh);
94         mdt_object_put(o);
95         return result;
96 }
97 #endif
98 static int mdt_md_getattr(struct mdt_thread_info *info, struct lu_fid *fid)
99 {
100         struct mdt_device *d = info->mti_mdt;
101         struct mdt_object *o;
102         int               result;
103
104         ENTRY;
105
106         o = mdt_object_find(&info->mti_ctxt, d, fid);
107         if (IS_ERR(o))
108                 return PTR_ERR(o);
109         /* attr are in mti_ctxt */
110         result = 0;
111         mdt_object_put(&info->mti_ctxt, o);
112
113         RETURN(result);
114 }
115
116 static int mdt_getstatus(struct mdt_thread_info *info,
117                          struct ptlrpc_request *req, int offset)
118 {
119         struct md_device *next  = info->mti_mdt->mdt_child;
120         struct mdt_body  *body;
121         int               size = sizeof *body;
122         int               result;
123
124         ENTRY;
125
126         result = lustre_pack_reply(req, 1, &size, NULL);
127         if (result)
128                 CERROR(LUSTRE_MDT0_NAME" out of memory for message: size=%d\n",
129                        size);
130         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
131                 result = -ENOMEM;
132         else {
133                 body = lustre_msg_buf(req->rq_repmsg, 0, sizeof *body);
134                 result = next->md_ops->mdo_root_get(&info->mti_ctxt,
135                                                     next, &body->fid1);
136         }
137
138         /* the last_committed and last_xid fields are filled in for all
139          * replies already - no need to do so here also.
140          */
141         RETURN(result);
142 }
143
144 static int mdt_statfs(struct mdt_thread_info *info,
145                       struct ptlrpc_request *req, int offset)
146 {
147         struct md_device  *next  = info->mti_mdt->mdt_child;
148         struct obd_statfs *osfs;
149         struct kstatfs    sfs;
150         int               result;
151         int               size = sizeof(struct obd_statfs);
152
153         ENTRY;
154
155         result = lustre_pack_reply(req, 1, &size, NULL);
156         if (result)
157                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
158                        size);
159         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
160                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
161                 result = -ENOMEM;
162         } else {
163                 osfs = lustre_msg_buf(req->rq_repmsg, 0, size);
164                 /* XXX max_age optimisation is needed here. See mds_statfs */
165                 result = next->md_ops->mdo_statfs(&info->mti_ctxt, next, &sfs);
166                 statfs_pack(osfs, &sfs);
167         }
168
169         RETURN(result);
170 }
171
172 static void mdt_pack_attr2body(struct mdt_body *b, struct lu_attr *attr)
173 {
174         b->valid |= OBD_MD_FLID | OBD_MD_FLCTIME | OBD_MD_FLUID |
175                     OBD_MD_FLGID | OBD_MD_FLFLAGS | OBD_MD_FLTYPE |
176                     OBD_MD_FLMODE | OBD_MD_FLNLINK | OBD_MD_FLGENER;
177
178         if (!S_ISREG(attr->la_mode))
179                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLATIME |
180                             OBD_MD_FLMTIME;
181
182         b->atime      = attr->la_atime;
183         b->mtime      = attr->la_mtime;
184         b->ctime      = attr->la_ctime;
185         b->mode       = attr->la_mode;
186         b->size       = attr->la_size;
187         b->blocks     = attr->la_blocks;
188         b->uid        = attr->la_uid;
189         b->gid        = attr->la_gid;
190         b->flags      = attr->la_flags;
191         b->nlink      = attr->la_nlink;
192 }
193
194 static int mdt_getattr(struct mdt_thread_info *info,
195                        struct ptlrpc_request *req, int offset)
196 {
197         struct mdt_body        *body;
198         int                    size = sizeof (*body);
199         struct lu_attr  *attr;
200         int result;
201
202         ENTRY;
203
204         OBD_ALLOC_PTR(attr);
205         if (attr == NULL)
206                 return -ENOMEM;
207
208         result = lustre_pack_reply(req, 1, &size, NULL);
209         if (result)
210                 CERROR(LUSTRE_MDT0_NAME" out of memory for statfs: size=%d\n",
211                        size);
212         else if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK)) {
213                 CERROR(LUSTRE_MDT0_NAME": statfs lustre_pack_reply failed\n");
214                 result = -ENOMEM;
215         } else {
216                 body = lustre_msg_buf(req->rq_repmsg, 0, size);
217                 result = mdt_md_getattr(info, &body->fid1);
218                 if (result == 0)
219                         mdt_pack_attr2body(body, &info->mti_ctxt.lc_attr);
220         }
221         OBD_FREE_PTR(attr);
222         RETURN(result);
223 }
224
225 static int mdt_set_info(struct mdt_thread_info *info,
226                         struct ptlrpc_request *req, int offset)
227 {
228         struct md_device *next  = info->mti_mdt->mdt_child;
229         char *key;
230         int keylen, rc = 0;
231         ENTRY;
232         
233         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
234         if (key == NULL) {
235                 DEBUG_REQ(D_HA, req, "no set_info key");
236                 RETURN(-EFAULT);
237         }
238         keylen = req->rq_reqmsg->buflens[0];
239        
240         if (((keylen >= strlen("fld_create") &&
241             memcmp(key, "fld_create", keylen) == 0)) || 
242             ((keylen >= strlen("fld_delete") &&
243             memcmp(key, "fld_delete", keylen) == 0))) {
244                 struct md_fld mf, *p;
245                 __u32 size = sizeof(struct md_fld);
246
247                 rc = lustre_pack_reply(req, 0, NULL, NULL);
248                 if (rc)
249                         RETURN(rc);
250                 
251                 p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
252                 mf = *p;
253                 rc = next->md_ops->mdo_get_info(&info->mti_ctxt, next, keylen, 
254                                                 key, &size, &mf);
255                 RETURN(rc); 
256         }
257
258         CDEBUG(D_IOCTL, "invalid key\n");
259         RETURN(-EINVAL);
260
261 }
262
263 static int mdt_get_info(struct mdt_thread_info *info,
264                         struct ptlrpc_request *req, int offset)
265 {
266         struct md_device *next  = info->mti_mdt->mdt_child;
267         char *key;
268         int keylen, rc = 0;
269         ENTRY;
270         
271         key = lustre_msg_buf(req->rq_reqmsg, 0, 1);
272         if (key == NULL) {
273                 DEBUG_REQ(D_HA, req, "no set_info key");
274                 RETURN(-EFAULT);
275         }
276         keylen = req->rq_reqmsg->buflens[0];
277        
278         if (((keylen >= strlen("fld_get") &&
279             memcmp(key, "fld_get", keylen) == 0))) {
280                 struct md_fld mf, *p, *reply;
281                 int size = sizeof(*reply);
282                
283                 rc = lustre_pack_reply(req, 1, &size, NULL);
284                 if (rc)
285                         RETURN(rc);
286                 p = lustre_swab_reqbuf(req, 1, sizeof(mf), lustre_swab_md_fld);
287                 mf = *p;
288                 rc = next->md_ops->mdo_get_info(&info->mti_ctxt, next, keylen, 
289                                                 key, &size, &mf);
290                 reply = lustre_msg_buf(req->rq_repmsg, 0, size);
291                 *reply = mf;
292                 RETURN(rc); 
293         }
294
295         CDEBUG(D_IOCTL, "invalid key\n");
296         RETURN(-EINVAL);
297 }
298
299 static struct lu_device_operations mdt_lu_ops;
300
301 static int lu_device_is_mdt(struct lu_device *d)
302 {
303         /*
304          * XXX for now. Tags in lu_device_type->ldt_something are needed.
305          */
306         return ergo(d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
307 }
308
309 static struct mdt_device *mdt_dev(struct lu_device *d)
310 {
311         LASSERT(lu_device_is_mdt(d));
312         return container_of(d, struct mdt_device, mdt_md_dev.md_lu_dev);
313 }
314
315 static int mdt_connect(struct mdt_thread_info *info,
316                        struct ptlrpc_request *req, int offset)
317 {
318         int result;
319
320         result = target_handle_connect(req, mdt_handle);
321         if (result == 0) {
322                 struct mdt_device *mdt = info->mti_mdt;
323                 struct obd_connect_data *data;
324
325                 data = lustre_msg_buf(req->rq_repmsg, 0, sizeof *data);
326                 result = seq_mgr_alloc(&info->mti_ctxt,
327                                        mdt->mdt_seq_mgr, &data->ocd_seq);
328         }
329         return result;
330 }
331
332 static int mdt_disconnect(struct mdt_thread_info *info,
333                           struct ptlrpc_request *req, int offset)
334 {
335         return -EOPNOTSUPP;
336 }
337
338 static int mdt_getattr_name(struct mdt_thread_info *info,
339                             struct ptlrpc_request *req, int offset)
340 {
341         return -EOPNOTSUPP;
342 }
343
344 static int mdt_setxattr(struct mdt_thread_info *info,
345                         struct ptlrpc_request *req, int offset)
346 {
347         return -EOPNOTSUPP;
348 }
349
350 static int mdt_getxattr(struct mdt_thread_info *info,
351                         struct ptlrpc_request *req, int offset)
352 {
353         return -EOPNOTSUPP;
354 }
355
356 static int mdt_readpage(struct mdt_thread_info *info,
357                         struct ptlrpc_request *req, int offset)
358 {
359         return -EOPNOTSUPP;
360 }
361
362 static int mdt_reint(struct mdt_thread_info *info,
363                      struct ptlrpc_request *req, int offset)
364 {
365         return -EOPNOTSUPP;
366 }
367
368 static int mdt_close(struct mdt_thread_info *info,
369                      struct ptlrpc_request *req, int offset)
370 {
371         return -EOPNOTSUPP;
372 }
373
374 static int mdt_done_writing(struct mdt_thread_info *info,
375                             struct ptlrpc_request *req, int offset)
376 {
377         return -EOPNOTSUPP;
378 }
379
380 static int mdt_pin(struct mdt_thread_info *info,
381                    struct ptlrpc_request *req, int offset)
382 {
383         return -EOPNOTSUPP;
384 }
385
386 static int mdt_sync(struct mdt_thread_info *info,
387                     struct ptlrpc_request *req, int offset)
388 {
389         return -EOPNOTSUPP;
390 }
391
392 static int mdt_handle_quotacheck(struct mdt_thread_info *info,
393                                  struct ptlrpc_request *req, int offset)
394 {
395         return -EOPNOTSUPP;
396 }
397
398 static int mdt_handle_quotactl(struct mdt_thread_info *info,
399                                struct ptlrpc_request *req, int offset)
400 {
401         return -EOPNOTSUPP;
402 }
403
404 /*
405  * DLM handlers.
406  */
407
408 static struct ldlm_callback_suite cbs = {
409         .lcs_completion = ldlm_server_completion_ast,
410         .lcs_blocking   = ldlm_server_blocking_ast,
411         .lcs_glimpse    = NULL
412 };
413
414 static int mdt_enqueue(struct mdt_thread_info *info,
415                        struct ptlrpc_request *req, int offset)
416 {
417         /*
418          * info->mti_dlm_req already contains swapped and (if necessary)
419          * converted dlm request.
420          */
421         LASSERT(info->mti_dlm_req);
422
423         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
424         return ldlm_handle_enqueue0(req, info->mti_dlm_req, &cbs);
425 }
426
427 static int mdt_convert(struct mdt_thread_info *info,
428                        struct ptlrpc_request *req, int offset)
429 {
430         LASSERT(info->mti_dlm_req);
431         return ldlm_handle_convert0(req, info->mti_dlm_req);
432 }
433
434 static int mdt_bl_callback(struct mdt_thread_info *info,
435                            struct ptlrpc_request *req, int offset)
436 {
437         CERROR("bl callbacks should not happen on MDS\n");
438         LBUG();
439         return -EOPNOTSUPP;
440 }
441
442 static int mdt_cp_callback(struct mdt_thread_info *info,
443                            struct ptlrpc_request *req, int offset)
444 {
445         CERROR("cp callbacks should not happen on MDS\n");
446         LBUG();
447         return -EOPNOTSUPP;
448 }
449
450 /*
451  * Build (DLM) resource name from fid.
452  */
453 struct ldlm_res_id *fid_build_res_name(const struct lu_fid *f,
454                                        struct ldlm_res_id *name)
455 {
456         memset(name, 0, sizeof *name);
457         /* we use fid_num() whoch includes also object version instread of raw
458          * fid_oid(). */
459         name->name[0] = fid_seq(f);
460         name->name[1] = fid_num(f);
461         return name;
462 }
463
464 /*
465  * Return true if resource is for object identified by fid.
466  */
467 int fid_res_name_eq(const struct lu_fid *f, const struct ldlm_res_id *name)
468 {
469         return name->name[0] == fid_seq(f) && name->name[1] == fid_num(f);
470 }
471
472 /* issues dlm lock on passed @ns, @f stores it lock handle into @lh. */
473 int fid_lock(struct ldlm_namespace *ns, const struct lu_fid *f,
474              struct lustre_handle *lh, ldlm_mode_t mode,
475              ldlm_policy_data_t *policy)
476 {
477         struct ldlm_res_id res_id;
478         int flags = 0, rc;
479         ENTRY;
480
481         LASSERT(ns != NULL);
482         LASSERT(lh != NULL);
483         LASSERT(f != NULL);
484
485         /* FIXME: is that correct to have @flags=0 here? */
486         rc = ldlm_cli_enqueue(NULL, NULL, ns, *fid_build_res_name(f, &res_id),
487                               LDLM_IBITS, policy, mode, &flags,
488                               ldlm_blocking_ast, ldlm_completion_ast, NULL,
489                               NULL, NULL, 0, NULL, lh);
490         RETURN (rc == ELDLM_OK ? 0 : -EIO);
491 }
492
493 void fid_unlock(struct ldlm_namespace *ns, const struct lu_fid *f,
494                 struct lustre_handle *lh, ldlm_mode_t mode)
495 {
496         struct ldlm_lock *lock;
497         ENTRY;
498
499         /* FIXME: this is debug stuff, remove it later. */
500         lock = ldlm_handle2lock(lh);
501         if (!lock) {
502                 CERROR("invalid lock handle "LPX64, lh->cookie);
503                 LBUG();
504         }
505
506         LASSERT(fid_res_name_eq(f, &lock->l_resource->lr_name));
507
508         ldlm_lock_decref(lh, mode);
509         EXIT;
510 }
511
512 static struct mdt_object *mdt_obj(struct lu_object *o)
513 {
514         LASSERT(lu_device_is_mdt(o->lo_dev));
515         return container_of(o, struct mdt_object, mot_obj.mo_lu);
516 }
517
518 struct mdt_object *mdt_object_find(struct lu_context *ctxt,
519                                    struct mdt_device *d,
520                                    struct lu_fid *f)
521 {
522         struct lu_object *o;
523
524         o = lu_object_find(ctxt, d->mdt_md_dev.md_lu_dev.ld_site, f);
525         if (IS_ERR(o))
526                 return (struct mdt_object *)o;
527         else
528                 return mdt_obj(o);
529 }
530
531 void mdt_object_put(struct lu_context *ctxt, struct mdt_object *o)
532 {
533         lu_object_put(ctxt, &o->mot_obj.mo_lu);
534 }
535
536 struct lu_fid *mdt_object_fid(struct mdt_object *o)
537 {
538         return lu_object_fid(&o->mot_obj.mo_lu);
539 }
540
541 int mdt_object_lock(struct ldlm_namespace *ns, struct mdt_object *o,
542                     struct mdt_lock_handle *lh, __u64 ibits)
543 {
544         ldlm_policy_data_t p = {
545                 .l_inodebits = {
546                         .bits = ibits
547                 }
548         };
549         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
550         LASSERT(lh->mlh_mode != LCK_MINMODE);
551
552         return fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, &p);
553 }
554
555 void mdt_object_unlock(struct ldlm_namespace *ns, struct mdt_object *o,
556                               struct mdt_lock_handle *lh)
557 {
558         if (lustre_handle_is_used(&lh->mlh_lh)) {
559                 fid_unlock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode);
560                 lh->mlh_lh.cookie = 0;
561         }
562 }
563
564 struct mdt_object *mdt_object_find_lock(struct lu_context *ctxt,
565                                         struct mdt_device *d,
566                                         struct lu_fid *f,
567                                         struct mdt_lock_handle *lh,
568                                         __u64 ibits)
569 {
570         struct mdt_object *o;
571
572         o = mdt_object_find(ctxt, d, f);
573         if (!IS_ERR(o)) {
574                 int result;
575
576                 result = mdt_object_lock(d->mdt_namespace, o, lh, ibits);
577                 if (result != 0) {
578                         mdt_object_put(ctxt, o);
579                         o = ERR_PTR(result);
580                 }
581         }
582         return o;
583 }
584
585 struct mdt_handler {
586         const char *mh_name;
587         int         mh_fail_id;
588         __u32       mh_opc;
589         __u32       mh_flags;
590         int (*mh_act)(struct mdt_thread_info *info,
591                       struct ptlrpc_request *req, int offset);
592 };
593
594 enum mdt_handler_flags {
595         /*
596          * struct mdt_body is passed in the 0-th incoming buffer.
597          */
598         HABEO_CORPUS = (1 << 0),
599         /*
600          * struct ldlm_request is passed in MDS_REQ_INTENT_LOCKREQ_OFF-th
601          * incoming buffer.
602          */
603         HABEO_CLAVIS   = (1 << 1)
604 };
605
606 struct mdt_opc_slice {
607         __u32               mos_opc_start;
608         int                 mos_opc_end;
609         struct mdt_handler *mos_hs;
610 };
611
612 static struct mdt_opc_slice mdt_handlers[];
613
614 static struct mdt_handler *mdt_handler_find(__u32 opc)
615 {
616         struct mdt_opc_slice *s;
617         struct mdt_handler   *h;
618
619         h = NULL;
620         for (s = mdt_handlers; s->mos_hs != NULL; s++) {
621                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
622                         h = s->mos_hs + (opc - s->mos_opc_start);
623                         if (h->mh_opc != 0)
624                                 LASSERT(h->mh_opc == opc);
625                         else
626                                 h = NULL; /* unsupported opc */
627                         break;
628                 }
629         }
630         return h;
631 }
632
633 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
634 {
635         return req->rq_export->exp_mds_data.med_mcd->mcd_last_xid;
636 }
637
638 static int mdt_lock_resname_compat(struct mdt_device *m,
639                                    struct ldlm_request *req)
640 {
641         /* XXX something... later. */
642         return 0;
643 }
644
645 static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
646 {
647         /* XXX something... later. */
648         return 0;
649 }
650
651 /*
652  * Invoke handler for this request opc. Also do necessary preprocessing
653  * (according to handler ->mh_flags), and post-processing (setting of
654  * ->last_{xid,committed}).
655  */
656 static int mdt_req_handle(struct mdt_thread_info *info,
657                           struct mdt_handler *h, struct ptlrpc_request *req,
658                           int shift)
659 {
660         int result;
661         int off;
662         int lock_conv;
663
664         ENTRY;
665
666         LASSERT(h->mh_act != NULL);
667         LASSERT(h->mh_opc == req->rq_reqmsg->opc);
668         LASSERT(current->journal_info == NULL);
669
670         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
671
672         if (h->mh_fail_id != 0)
673                 OBD_FAIL_RETURN(h->mh_fail_id, 0);
674
675         off = MDS_REQ_REC_OFF + shift;
676         lock_conv =
677                 h->mh_flags & HABEO_CLAVIS &&
678                 info->mti_mdt->mdt_flags & MDT_CL_COMPAT_RESNAME;
679
680         result = 0;
681         if (h->mh_flags & HABEO_CORPUS) {
682                 struct mdt_body *body;
683
684                 body = info->mti_body =
685                         lustre_swab_reqbuf(req, off, sizeof *info->mti_body,
686                                            lustre_swab_mdt_body);
687                 if (body != NULL) {
688                         info->mti_object = mdt_object_find(&info->mti_ctxt,
689                                                            info->mti_mdt,
690                                                            &body->fid1);
691                         if (IS_ERR(info->mti_object))
692                                 result = PTR_ERR(info->mti_object);
693                 } else {
694                         CERROR("Can't unpack body\n");
695                         result = -EFAULT;
696                 }
697         } else if (lock_conv) {
698                 struct ldlm_request *dlm;
699
700                 LASSERT(shift == 0);
701                 dlm = info->mti_dlm_req =
702                         lustre_swab_reqbuf(req, MDS_REQ_INTENT_LOCKREQ_OFF,
703                                            sizeof *dlm,
704                                            lustre_swab_ldlm_request);
705                 if (dlm != NULL)
706                         result = mdt_lock_resname_compat(info->mti_mdt, dlm);
707                 else {
708                         CERROR("Can't unpack dlm request\n");
709                         result = -EFAULT;
710                 }
711         }
712         if (result == 0)
713                 /*
714                  * Process request.
715                  */
716                 result = h->mh_act(info, req, off);
717         /*
718          * XXX result value is unconditionally shoved into ->rq_status
719          * (original code sometimes placed error code into ->rq_status, and
720          * sometimes returned it to the
721          * caller). ptlrpc_server_handle_request() doesn't check return value
722          * anyway.
723          */
724         req->rq_status = result;
725
726         LASSERT(current->journal_info == NULL);
727
728         if (lock_conv) {
729                 struct ldlm_reply *rep;
730
731                 rep = lustre_msg_buf(req->rq_repmsg, 0, sizeof *rep);
732                 if (rep != NULL)
733                         result = mdt_lock_reply_compat(info->mti_mdt, rep);
734         }
735
736         /* If we're DISCONNECTing, the mds_export_data is already freed */
737         if (result == 0 && h->mh_opc != MDS_DISCONNECT) {
738                 req->rq_reqmsg->last_xid = le64_to_cpu(req_exp_last_xid(req));
739                 target_committed_to_req(req);
740         }
741         RETURN(result);
742 }
743
744 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
745 {
746         lh->mlh_lh.cookie = 0ull;
747         lh->mlh_mode = LCK_MINMODE;
748 }
749
750 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
751 {
752         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
753 }
754
755 static void mdt_thread_info_init(struct mdt_thread_info *info)
756 {
757         int i;
758
759         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
760         /*
761          * Poison size array.
762          */
763         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
764                 info->mti_rep_buf_size[i] = ~0;
765         info->mti_rep_buf_nr = i;
766         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
767                 mdt_lock_handle_init(&info->mti_lh[i]);
768         lu_context_enter(&info->mti_ctxt);
769 }
770
771 static void mdt_thread_info_fini(struct mdt_thread_info *info)
772 {
773         int i;
774
775         lu_context_exit(&info->mti_ctxt);
776         if (info->mti_object != NULL) {
777                 mdt_object_put(&info->mti_ctxt, info->mti_object);
778                 info->mti_object = NULL;
779         }
780         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
781                 mdt_lock_handle_fini(&info->mti_lh[i]);
782 }
783
784 static int mds_msg_check_version(struct lustre_msg *msg)
785 {
786         int rc;
787
788         /* TODO: enable the below check while really introducing msg version.
789          * it's disabled because it will break compatibility with b1_4.
790          */
791         return (0);
792
793         switch (msg->opc) {
794         case MDS_CONNECT:
795         case MDS_DISCONNECT:
796         case OBD_PING:
797                 rc = lustre_msg_check_version(msg, LUSTRE_OBD_VERSION);
798                 if (rc)
799                         CERROR("bad opc %u version %08x, expecting %08x\n",
800                                msg->opc, msg->version, LUSTRE_OBD_VERSION);
801                 break;
802         case MDS_GETSTATUS:
803         case MDS_GETATTR:
804         case MDS_GETATTR_NAME:
805         case MDS_STATFS:
806         case MDS_READPAGE:
807         case MDS_REINT:
808         case MDS_CLOSE:
809         case MDS_DONE_WRITING:
810         case MDS_PIN:
811         case MDS_SYNC:
812         case MDS_GETXATTR:
813         case MDS_SETXATTR:
814         case MDS_SET_INFO:
815         case MDS_QUOTACHECK:
816         case MDS_QUOTACTL:
817         case QUOTA_DQACQ:
818         case QUOTA_DQREL:
819                 rc = lustre_msg_check_version(msg, LUSTRE_MDS_VERSION);
820                 if (rc)
821                         CERROR("bad opc %u version %08x, expecting %08x\n",
822                                msg->opc, msg->version, LUSTRE_MDS_VERSION);
823                 break;
824         case LDLM_ENQUEUE:
825         case LDLM_CONVERT:
826         case LDLM_BL_CALLBACK:
827         case LDLM_CP_CALLBACK:
828                 rc = lustre_msg_check_version(msg, LUSTRE_DLM_VERSION);
829                 if (rc)
830                         CERROR("bad opc %u version %08x, expecting %08x\n",
831                                msg->opc, msg->version, LUSTRE_DLM_VERSION);
832                 break;
833         case OBD_LOG_CANCEL:
834         case LLOG_ORIGIN_HANDLE_CREATE:
835         case LLOG_ORIGIN_HANDLE_NEXT_BLOCK:
836         case LLOG_ORIGIN_HANDLE_PREV_BLOCK:
837         case LLOG_ORIGIN_HANDLE_READ_HEADER:
838         case LLOG_ORIGIN_HANDLE_CLOSE:
839         case LLOG_CATINFO:
840                 rc = lustre_msg_check_version(msg, LUSTRE_LOG_VERSION);
841                 if (rc)
842                         CERROR("bad opc %u version %08x, expecting %08x\n",
843                                msg->opc, msg->version, LUSTRE_LOG_VERSION);
844                 break;
845         default:
846                 CERROR("MDS unknown opcode %d\n", msg->opc);
847                 rc = -ENOTSUPP;
848         }
849         return rc;
850 }
851
852 static int mdt_filter_recovery_request(struct ptlrpc_request *req,
853                                        struct obd_device *obd, int *process)
854 {
855         switch (req->rq_reqmsg->opc) {
856         case MDS_CONNECT: /* This will never get here, but for completeness. */
857         case OST_CONNECT: /* This will never get here, but for completeness. */
858         case MDS_DISCONNECT:
859         case OST_DISCONNECT:
860                *process = 1;
861                RETURN(0);
862
863         case MDS_CLOSE:
864         case MDS_SYNC: /* used in unmounting */
865         case OBD_PING:
866         case MDS_REINT:
867         case LDLM_ENQUEUE:
868                 *process = target_queue_recovery_request(req, obd);
869                 RETURN(0);
870
871         default:
872                 DEBUG_REQ(D_ERROR, req, "not permitted during recovery");
873                 *process = 0;
874                 /* XXX what should we set rq_status to here? */
875                 req->rq_status = -EAGAIN;
876                 RETURN(ptlrpc_error(req));
877         }
878 }
879
880 /*
881  * Handle recovery. Return:
882  *        +1: continue request processing;
883  *       -ve: abort immediately with the given error code;
884  *         0: send reply with error code in req->rq_status;
885  */
886 static int mdt_recovery(struct ptlrpc_request *req)
887 {
888         int recovering;
889         int abort_recovery;
890         struct obd_device *obd;
891
892         ENTRY;
893
894         if (req->rq_reqmsg->opc == MDS_CONNECT)
895                 RETURN(+1);
896
897         if (req->rq_export == NULL) {
898                 CERROR("operation %d on unconnected MDS from %s\n",
899                        req->rq_reqmsg->opc,
900                        libcfs_id2str(req->rq_peer));
901                 req->rq_status = -ENOTCONN;
902                 RETURN(0);
903         }
904
905         /* sanity check: if the xid matches, the request must be marked as a
906          * resent or replayed */
907         LASSERTF(ergo(req->rq_xid == req_exp_last_xid(req),
908                       lustre_msg_get_flags(req->rq_reqmsg) &
909                       (MSG_RESENT | MSG_REPLAY)),
910                  "rq_xid "LPU64" matches last_xid, "
911                  "expected RESENT flag\n", req->rq_xid);
912
913         /* else: note the opposite is not always true; a RESENT req after a
914          * failover will usually not match the last_xid, since it was likely
915          * never committed. A REPLAYed request will almost never match the
916          * last xid, however it could for a committed, but still retained,
917          * open. */
918
919         obd = req->rq_export->exp_obd;
920
921         /* Check for aborted recovery... */
922         spin_lock_bh(&obd->obd_processing_task_lock);
923         abort_recovery = obd->obd_abort_recovery;
924         recovering = obd->obd_recovering;
925         spin_unlock_bh(&obd->obd_processing_task_lock);
926         if (abort_recovery) {
927                 target_abort_recovery(obd);
928         } else if (recovering) {
929                 int rc;
930                 int should_process;
931
932                 rc = mdt_filter_recovery_request(req, obd, &should_process);
933                 if (rc != 0 || !should_process) {
934                         LASSERT(rc < 0);
935                         RETURN(rc);
936                 }
937         }
938         RETURN(+1);
939 }
940
941 static int mdt_reply(struct ptlrpc_request *req, struct mdt_thread_info *info)
942 {
943         struct obd_device *obd;
944
945         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_LAST_REPLAY) {
946                 if (req->rq_reqmsg->opc != OBD_PING)
947                         DEBUG_REQ(D_ERROR, req, "Unexpected MSG_LAST_REPLAY");
948
949                 obd = req->rq_export != NULL ? req->rq_export->exp_obd : NULL;
950                 if (obd && obd->obd_recovering) {
951                         DEBUG_REQ(D_HA, req, "LAST_REPLAY, queuing reply");
952                         RETURN(target_queue_final_reply(req, req->rq_status));
953                 } else {
954                         /* Lost a race with recovery; let the error path
955                          * DTRT. */
956                         req->rq_status = -ENOTCONN;
957                 }
958         }
959         target_send_reply(req, req->rq_status, info->mti_fail_id);
960         RETURN(req->rq_status);
961 }
962
963 static int mdt_handle0(struct ptlrpc_request *req, struct mdt_thread_info *info)
964 {
965         struct mdt_handler *h;
966         struct lustre_msg  *msg;
967         int                 result;
968
969         ENTRY;
970
971         OBD_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
972
973         LASSERT(current->journal_info == NULL);
974
975         msg = req->rq_reqmsg;
976         result = mds_msg_check_version(msg);
977         if (result == 0) {
978                 result = mdt_recovery(req);
979                 switch (result) {
980                 case +1:
981                         h = mdt_handler_find(msg->opc);
982                         if (h != NULL)
983                                 result = mdt_req_handle(info, h, req, 0);
984                         else {
985                                 req->rq_status = -ENOTSUPP;
986                                 result = ptlrpc_error(req);
987                                 break;
988                         }
989                         /* fall through */
990                 case 0:
991                         result = mdt_reply(req, info);
992                 }
993         } else
994                 CERROR(LUSTRE_MDT0_NAME" drops mal-formed request\n");
995         RETURN(result);
996 }
997
998 static int mdt_handle(struct ptlrpc_request *req)
999 {
1000         int result;
1001
1002         struct mdt_thread_info *info = ptlrpc_thread_key_get(req->rq_svc_thread,
1003                                                              &mdt_thread_key);
1004         ENTRY;
1005
1006         mdt_thread_info_init(info);
1007         /* it can be NULL while CONNECT */
1008         if (req->rq_export)
1009                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1010         info->mti_ctxt.lc_thread = req->rq_svc_thread;
1011
1012         result = mdt_handle0(req, info);
1013         mdt_thread_info_fini(info);
1014         return result;
1015 }
1016
1017 static int mdt_intent_policy(struct ldlm_namespace *ns,
1018                              struct ldlm_lock **lockp, void *req_cookie,
1019                              ldlm_mode_t mode, int flags, void *data)
1020 {
1021         ENTRY;
1022         RETURN(ELDLM_LOCK_ABORTED);
1023 }
1024
1025 struct ptlrpc_service *ptlrpc_init_svc_conf(struct ptlrpc_service_conf *c,
1026                                             svc_handler_t h, char *name,
1027                                             struct proc_dir_entry *proc_entry,
1028                                             svcreq_printfn_t prntfn)
1029 {
1030         return ptlrpc_init_svc(c->psc_nbufs, c->psc_bufsize,
1031                                c->psc_max_req_size, c->psc_max_reply_size,
1032                                c->psc_req_portal, c->psc_rep_portal,
1033                                c->psc_watchdog_timeout,
1034                                h, name, proc_entry,
1035                                prntfn, c->psc_num_threads);
1036 }
1037
1038 static int mdt_config(struct lu_context *ctx, struct mdt_device *m,
1039                       const char *name, void *buf, int size, int mode)
1040 {
1041         struct md_device *child = m->mdt_child;
1042         ENTRY;
1043         RETURN(child->md_ops->mdo_config(ctx, child, name, buf, size, mode));
1044 }
1045
1046 static int mdt_seq_mgr_hpr(struct lu_context *ctx, void *opaque, __u64 *seq,
1047                            int mode)
1048 {
1049         struct mdt_device *m = opaque;
1050         int rc;
1051         ENTRY;
1052
1053         rc = mdt_config(ctx, m, LUSTRE_CONFIG_METASEQ,
1054                         seq, sizeof(*seq),
1055                         mode);
1056         RETURN(rc);
1057 }
1058
1059 static int mdt_seq_mgr_read(struct lu_context *ctx, void *opaque, __u64 *seq)
1060 {
1061         ENTRY;
1062         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_GET));
1063 }
1064
1065 static int mdt_seq_mgr_write(struct lu_context *ctx, void *opaque, __u64 *seq)
1066 {
1067         ENTRY;
1068         RETURN(mdt_seq_mgr_hpr(ctx, opaque, seq, LUSTRE_CONFIG_SET));
1069 }
1070
1071 struct lu_seq_mgr_ops seq_mgr_ops = {
1072         .smo_read  = mdt_seq_mgr_read,
1073         .smo_write = mdt_seq_mgr_write
1074 };
1075
1076 static void mdt_fini(struct mdt_device *m)
1077 {
1078         struct lu_device *d = &m->mdt_md_dev.md_lu_dev;
1079
1080         if (d->ld_site != NULL) {
1081                 lu_site_fini(d->ld_site);
1082                 OBD_FREE_PTR(d->ld_site);
1083                 d->ld_site = NULL;
1084         }
1085         if (m->mdt_service != NULL) {
1086                 ptlrpc_unregister_service(m->mdt_service);
1087                 m->mdt_service = NULL;
1088         }
1089         if (m->mdt_namespace != NULL) {
1090                 ldlm_namespace_free(m->mdt_namespace, 0);
1091                 m->mdt_namespace = NULL;
1092         }
1093         /* finish the stack */
1094         if (m->mdt_child) {
1095                 struct lu_device *child = md2lu_dev(m->mdt_child);
1096                 child->ld_type->ldt_ops->ldto_device_fini(child);
1097         }
1098
1099         if (m->mdt_seq_mgr) {
1100                 seq_mgr_fini(m->mdt_seq_mgr);
1101                 m->mdt_seq_mgr = NULL;
1102         }
1103
1104         LASSERT(atomic_read(&d->ld_ref) == 0);
1105         md_device_fini(&m->mdt_md_dev);
1106 }
1107
1108 static int mdt_init0(struct mdt_device *m,
1109                      struct lu_device_type *t, struct lustre_cfg *cfg)
1110 {
1111         int rc;
1112         struct lu_site *s;
1113         char   ns_name[48];
1114         struct obd_device *obd;
1115         struct lu_device  *mdt_child;
1116         const char *top   = lustre_cfg_string(cfg, 0);
1117         const char *child = lustre_cfg_string(cfg, 1);
1118         struct lu_context ctx;
1119
1120         ENTRY;
1121
1122         /* get next layer */
1123         obd = class_name2obd((char *)child);
1124         if (obd && obd->obd_lu_dev) {
1125                 CDEBUG(D_INFO, "Child device is %s\n", child);
1126                 m->mdt_child = lu2md_dev(obd->obd_lu_dev);
1127                 mdt_child = md2lu_dev(m->mdt_child);
1128         } else {
1129                 CDEBUG(D_INFO, "Child device %s is not found\n", child);
1130                 RETURN(-EINVAL);
1131         }
1132
1133         OBD_ALLOC_PTR(s);
1134         if (s == NULL)
1135                 RETURN(-ENOMEM);
1136
1137         md_device_init(&m->mdt_md_dev, t);
1138         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
1139         lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
1140
1141         m->mdt_service_conf.psc_nbufs            = MDS_NBUFS;
1142         m->mdt_service_conf.psc_bufsize          = MDS_BUFSIZE;
1143         m->mdt_service_conf.psc_max_req_size     = MDS_MAXREQSIZE;
1144         m->mdt_service_conf.psc_max_reply_size   = MDS_MAXREPSIZE;
1145         m->mdt_service_conf.psc_req_portal       = MDS_REQUEST_PORTAL;
1146         m->mdt_service_conf.psc_rep_portal       = MDC_REPLY_PORTAL;
1147         m->mdt_service_conf.psc_watchdog_timeout = MDS_SERVICE_WATCHDOG_TIMEOUT;
1148         /*
1149          * We'd like to have a mechanism to set this on a per-device basis,
1150          * but alas...
1151          */
1152         m->mdt_service_conf.psc_num_threads = min(max(mdt_num_threads,
1153                                                       MDT_MIN_THREADS),
1154                                                   MDT_MAX_THREADS);
1155         snprintf(ns_name, sizeof ns_name, LUSTRE_MDT0_NAME"-%p", m);
1156         m->mdt_namespace = ldlm_namespace_new(ns_name, LDLM_NAMESPACE_SERVER);
1157         if (m->mdt_namespace == NULL)
1158                 GOTO(err_fini_site, rc = -ENOMEM);
1159
1160         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
1161
1162         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
1163                            "mdt_ldlm_client", &m->mdt_ldlm_client);
1164
1165         m->mdt_service =
1166                 ptlrpc_init_svc_conf(&m->mdt_service_conf, mdt_handle,
1167                                      LUSTRE_MDT0_NAME,
1168                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
1169                                      NULL);
1170         if (m->mdt_service == NULL)
1171                 GOTO(err_free_ns, rc = -ENOMEM);
1172
1173         /* init the stack */
1174         LASSERT(mdt_child->ld_type->ldt_ops->ldto_device_init != NULL);
1175         rc = mdt_child->ld_type->ldt_ops->ldto_device_init(mdt_child, top);
1176         if (rc) {
1177                 CERROR("can't init device stack, rc %d\n", rc);
1178                 GOTO(err_free_svc, rc);
1179         }
1180
1181         m->mdt_seq_mgr = seq_mgr_init(&seq_mgr_ops, m);
1182         if (!m->mdt_seq_mgr) {
1183                 CERROR("can't initialize sequence manager\n");
1184                 GOTO(err_fini_child, rc);
1185         }
1186
1187         rc = lu_context_init(&ctx);
1188         if (rc != 0)
1189                 GOTO(err_fini_mgr, rc);
1190
1191         lu_context_enter(&ctx);
1192         /* init sequence info after device stack is initialized. */
1193         rc = seq_mgr_setup(&ctx, m->mdt_seq_mgr);
1194         lu_context_exit(&ctx);
1195         if (rc)
1196                 GOTO(err_fini_ctx, rc);
1197
1198         rc = ptlrpc_start_threads(NULL, m->mdt_service, LUSTRE_MDT0_NAME);
1199         if (rc)
1200                 GOTO(err_fini_ctx, rc);
1201
1202         lu_context_fini(&ctx);
1203         RETURN(0);
1204
1205 err_fini_ctx:
1206         lu_context_fini(&ctx);
1207 err_fini_mgr:
1208         seq_mgr_fini(m->mdt_seq_mgr);
1209         m->mdt_seq_mgr = NULL;
1210 err_fini_child:
1211         mdt_child->ld_type->ldt_ops->ldto_device_fini(mdt_child);
1212 err_free_svc:
1213         ptlrpc_unregister_service(m->mdt_service);
1214         m->mdt_service = NULL;
1215 err_free_ns:
1216         ldlm_namespace_free(m->mdt_namespace, 0);
1217         m->mdt_namespace = NULL;
1218 err_fini_site:
1219         lu_site_fini(s);
1220         OBD_FREE_PTR(s);
1221         RETURN(rc);
1222 }
1223
1224 static struct lu_object *mdt_object_alloc(struct lu_context *ctxt,
1225                                           struct lu_device *d)
1226 {
1227         struct mdt_object *mo;
1228
1229         OBD_ALLOC_PTR(mo);
1230         if (mo != NULL) {
1231                 struct lu_object *o;
1232                 struct lu_object_header *h;
1233
1234                 o = &mo->mot_obj.mo_lu;
1235                 h = &mo->mot_header;
1236                 lu_object_header_init(h);
1237                 lu_object_init(o, h, d);
1238                 lu_object_add_top(h, o);
1239                 return o;
1240         } else
1241                 return NULL;
1242 }
1243
1244 static int mdt_object_init(struct lu_context *ctxt, struct lu_object *o)
1245 {
1246         struct mdt_device *d = mdt_dev(o->lo_dev);
1247         struct lu_device  *under;
1248         struct lu_object  *below;
1249
1250         under = &d->mdt_child->md_lu_dev;
1251         below = under->ld_ops->ldo_object_alloc(ctxt, under);
1252         if (below != NULL) {
1253                 lu_object_add(o, below);
1254                 return 0;
1255         } else
1256                 return -ENOMEM;
1257 }
1258
1259 static void mdt_object_free(struct lu_context *ctxt, struct lu_object *o)
1260 {
1261         struct lu_object_header *h;
1262
1263         h = o->lo_header;
1264         lu_object_fini(o);
1265         lu_object_header_fini(h);
1266 }
1267
1268 static void mdt_object_release(struct lu_context *ctxt, struct lu_object *o)
1269 {
1270 }
1271
1272 static int mdt_object_print(struct lu_context *ctxt,
1273                             struct seq_file *f, const struct lu_object *o)
1274 {
1275         return seq_printf(f, LUSTRE_MDT0_NAME"-object@%p", o);
1276 }
1277
1278 static struct lu_device_operations mdt_lu_ops = {
1279         .ldo_object_alloc   = mdt_object_alloc,
1280         .ldo_object_init    = mdt_object_init,
1281         .ldo_object_free    = mdt_object_free,
1282         .ldo_object_release = mdt_object_release,
1283         .ldo_object_print   = mdt_object_print
1284 };
1285
1286 /* mds_connect copy */
1287 static int mdt_obd_connect(struct lustre_handle *conn, struct obd_device *obd,
1288                            struct obd_uuid *cluuid,
1289                            struct obd_connect_data *data)
1290 {
1291         struct obd_export *exp;
1292         int rc, abort_recovery;
1293         struct mdt_device *mdt;
1294         struct mds_export_data *med;
1295         struct mds_client_data *mcd = NULL;
1296
1297         ENTRY;
1298
1299         if (!conn || !obd || !cluuid)
1300                 RETURN(-EINVAL);
1301
1302         mdt = mdt_dev(obd->obd_lu_dev);
1303
1304         /* Check for aborted recovery. */
1305         spin_lock_bh(&obd->obd_processing_task_lock);
1306         abort_recovery = obd->obd_abort_recovery;
1307         spin_unlock_bh(&obd->obd_processing_task_lock);
1308         if (abort_recovery)
1309                 target_abort_recovery(obd);
1310
1311         /* XXX There is a small race between checking the list and adding a
1312          * new connection for the same UUID, but the real threat (list
1313          * corruption when multiple different clients connect) is solved.
1314          *
1315          * There is a second race between adding the export to the list,
1316          * and filling in the client data below.  Hence skipping the case
1317          * of NULL mcd above.  We should already be controlling multiple
1318          * connects at the client, and we can't hold the spinlock over
1319          * memory allocations without risk of deadlocking.
1320          */
1321         rc = class_connect(conn, obd, cluuid);
1322         if (rc)
1323                 RETURN(rc);
1324         exp = class_conn2export(conn);
1325         LASSERT(exp);
1326         med = &exp->exp_mds_data;
1327
1328         OBD_ALLOC(mcd, sizeof(*mcd));
1329         if (!mcd)
1330                 GOTO(out, rc = -ENOMEM);
1331
1332         memcpy(mcd->mcd_uuid, cluuid, sizeof(mcd->mcd_uuid));
1333         med->med_mcd = mcd;
1334
1335 out:
1336         if (rc) {
1337                 if (mcd) {
1338                         OBD_FREE(mcd, sizeof(*mcd));
1339                         med->med_mcd = NULL;
1340                 }
1341                 class_disconnect(exp);
1342         } else {
1343                 class_export_put(exp);
1344         }
1345
1346         RETURN(rc);
1347 }
1348
1349 static struct obd_ops mdt_obd_device_ops = {
1350         .o_owner = THIS_MODULE,
1351         .o_connect = mdt_obd_connect
1352 };
1353
1354 static struct lu_device *mdt_device_alloc(struct lu_device_type *t,
1355                                           struct lustre_cfg *cfg)
1356 {
1357         struct lu_device  *l;
1358         struct mdt_device *m;
1359
1360         OBD_ALLOC_PTR(m);
1361         if (m != NULL) {
1362                 int result;
1363
1364                 l = &m->mdt_md_dev.md_lu_dev;
1365                 result = mdt_init0(m, t, cfg);
1366                 if (result != 0) {
1367                         mdt_fini(m);
1368                         return ERR_PTR(result);
1369                 }
1370
1371         } else
1372                 l = ERR_PTR(-ENOMEM);
1373         return l;
1374 }
1375
1376 static void mdt_device_free(struct lu_device *d)
1377 {
1378         struct mdt_device *m = mdt_dev(d);
1379
1380         mdt_fini(m);
1381         OBD_FREE_PTR(m);
1382 }
1383
1384 static void *mdt_thread_init(struct ptlrpc_thread *t)
1385 {
1386         struct mdt_thread_info *info;
1387         int result;
1388
1389         OBD_ALLOC_PTR(info);
1390         if (info != NULL)
1391                 result = lu_context_init(&info->mti_ctxt);
1392         else
1393                 result = -ENOMEM;
1394         if (result != 0)
1395                 info = ERR_PTR(result);
1396         return info;
1397 }
1398
1399 static void mdt_thread_fini(struct ptlrpc_thread *t, void *data)
1400 {
1401         struct mdt_thread_info *info = data;
1402         lu_context_fini(&info->mti_ctxt);
1403         OBD_FREE_PTR(info);
1404 }
1405
1406 static struct ptlrpc_thread_key mdt_thread_key = {
1407         .ptk_init = mdt_thread_init,
1408         .ptk_fini = mdt_thread_fini
1409 };
1410
1411 static int mdt_type_init(struct lu_device_type *t)
1412 {
1413         return ptlrpc_thread_key_register(&mdt_thread_key);
1414 }
1415
1416 static void mdt_type_fini(struct lu_device_type *t)
1417 {
1418 }
1419
1420 static struct lu_device_type_operations mdt_device_type_ops = {
1421         .ldto_init = mdt_type_init,
1422         .ldto_fini = mdt_type_fini,
1423
1424         .ldto_device_alloc = mdt_device_alloc,
1425         .ldto_device_free  = mdt_device_free
1426 };
1427
1428 static struct lu_device_type mdt_device_type = {
1429         .ldt_tags = LU_DEVICE_MD,
1430         .ldt_name = LUSTRE_MDT0_NAME,
1431         .ldt_ops  = &mdt_device_type_ops
1432 };
1433
1434 static struct lprocfs_vars lprocfs_mdt_obd_vars[] = {
1435         { 0 }
1436 };
1437
1438 static struct lprocfs_vars lprocfs_mdt_module_vars[] = {
1439         { 0 }
1440 };
1441
1442 LPROCFS_INIT_VARS(mdt, lprocfs_mdt_module_vars, lprocfs_mdt_obd_vars);
1443
1444 static int __init mdt_mod_init(void)
1445 {
1446         struct lprocfs_static_vars lvars;
1447
1448         mdt_num_threads = MDT_NUM_THREADS;
1449         lprocfs_init_vars(mdt, &lvars);
1450         return class_register_type(&mdt_obd_device_ops, lvars.module_vars,
1451                                    LUSTRE_MDT0_NAME, &mdt_device_type);
1452 }
1453
1454 static void __exit mdt_mod_exit(void)
1455 {
1456         class_unregister_type(LUSTRE_MDT0_NAME);
1457 }
1458
1459
1460 #define DEF_HNDL(prefix, base, suffix, flags, opc, fn)                  \
1461 [prefix ## _ ## opc - prefix ## _ ## base] = {                          \
1462         .mh_name    = #opc,                                             \
1463         .mh_fail_id = OBD_FAIL_ ## prefix ## _  ## opc ## suffix,       \
1464         .mh_opc     = prefix ## _  ## opc,                              \
1465         .mh_flags   = flags,                                            \
1466         .mh_act     = fn                                                \
1467 }
1468
1469 #define DEF_MDT_HNDL(flags, name, fn)                   \
1470         DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn)
1471
1472 static struct mdt_handler mdt_mds_ops[] = {
1473         DEF_MDT_HNDL(0,            CONNECT,        mdt_connect),
1474         DEF_MDT_HNDL(0,            DISCONNECT,     mdt_disconnect),
1475         DEF_MDT_HNDL(0,            GETSTATUS,      mdt_getstatus),
1476         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR,        mdt_getattr),
1477         DEF_MDT_HNDL(HABEO_CORPUS, GETATTR_NAME,   mdt_getattr_name),
1478         DEF_MDT_HNDL(HABEO_CORPUS, SETXATTR,       mdt_setxattr),
1479         DEF_MDT_HNDL(HABEO_CORPUS, GETXATTR,       mdt_getxattr),
1480         DEF_MDT_HNDL(0,            STATFS,         mdt_statfs),
1481         DEF_MDT_HNDL(HABEO_CORPUS, READPAGE,       mdt_readpage),
1482         DEF_MDT_HNDL(0,            REINT,          mdt_reint),
1483         DEF_MDT_HNDL(HABEO_CORPUS, CLOSE,          mdt_close),
1484         DEF_MDT_HNDL(HABEO_CORPUS, DONE_WRITING,   mdt_done_writing),
1485         DEF_MDT_HNDL(0,            PIN,            mdt_pin),
1486         DEF_MDT_HNDL(HABEO_CORPUS, SYNC,           mdt_sync),
1487         DEF_MDT_HNDL(0,            SET_INFO,       mdt_set_info),
1488         DEF_MDT_HNDL(0,            GET_INFO,       mdt_get_info),
1489         DEF_MDT_HNDL(0,            QUOTACHECK,     mdt_handle_quotacheck),
1490         DEF_MDT_HNDL(0,            QUOTACTL,       mdt_handle_quotactl)
1491 };
1492
1493 static struct mdt_handler mdt_obd_ops[] = {
1494 };
1495
1496 #define DEF_DLM_HNDL(flags, name, fn)                   \
1497         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn)
1498
1499 static struct mdt_handler mdt_dlm_ops[] = {
1500         DEF_DLM_HNDL(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
1501         DEF_DLM_HNDL(HABEO_CLAVIS, CONVERT,        mdt_convert),
1502         DEF_DLM_HNDL(0,            BL_CALLBACK,    mdt_bl_callback),
1503         DEF_DLM_HNDL(0,            CP_CALLBACK,    mdt_cp_callback)
1504 };
1505
1506 static struct mdt_handler mdt_llog_ops[] = {
1507 };
1508
1509 static struct mdt_opc_slice mdt_handlers[] = {
1510         {
1511                 .mos_opc_start = MDS_GETATTR,
1512                 .mos_opc_end   = MDS_LAST_OPC,
1513                 .mos_hs        = mdt_mds_ops
1514         },
1515         {
1516                 .mos_opc_start = OBD_PING,
1517                 .mos_opc_end   = OBD_LAST_OPC,
1518                 .mos_hs        = mdt_obd_ops
1519         },
1520         {
1521                 .mos_opc_start = LDLM_ENQUEUE,
1522                 .mos_opc_end   = LDLM_LAST_OPC,
1523                 .mos_hs        = mdt_dlm_ops
1524         },
1525         {
1526                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
1527                 .mos_opc_end   = LLOG_LAST_OPC,
1528                 .mos_hs        = mdt_llog_ops
1529         },
1530         {
1531                 .mos_hs        = NULL
1532         }
1533 };
1534
1535 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
1536 MODULE_DESCRIPTION("Lustre Meta-data Target Prototype ("LUSTRE_MDT0_NAME")");
1537 MODULE_LICENSE("GPL");
1538
1539 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
1540                 "number of mdt service threads to start");
1541
1542 cfs_module(mdt, "0.0.4", mdt_mod_init, mdt_mod_exit);