Whamcloud - gitweb
update comment.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *
15  *   This file is part of the Lustre file system, http://www.lustre.org
16  *   Lustre is a trademark of Cluster File Systems, Inc.
17  *
18  *   You may have signed or agreed to another license before downloading
19  *   this software.  If so, you are bound by the terms and conditions
20  *   of that agreement, and the following does not apply to you.  See the
21  *   LICENSE file included with this distribution for more information.
22  *
23  *   If you did not agree to a different license, then this copy of Lustre
24  *   is open source software; you can redistribute it and/or modify it
25  *   under the terms of version 2 of the GNU General Public License as
26  *   published by the Free Software Foundation.
27  *
28  *   In either case, Lustre is distributed in the hope that it will be
29  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
30  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
31  *   license text for more details.
32  */
33
34 #ifndef EXPORT_SYMTAB
35 # define EXPORT_SYMTAB
36 #endif
37 #define DEBUG_SUBSYSTEM S_MDS
38
39 #include <linux/module.h>
40
41 /* LUSTRE_VERSION_CODE */
42 #include <lustre_ver.h>
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * MDT_FAIL_CHECK
46  */
47 #include <obd_support.h>
48 /* struct ptlrpc_request */
49 #include <lustre_net.h>
50 /* struct obd_export */
51 #include <lustre_export.h>
52 /* struct obd_device */
53 #include <obd.h>
54 /* lu2dt_dev() */
55 #include <dt_object.h>
56 #include <lustre_mds.h>
57 #include <lustre_mdt.h>
58 #include "mdt_internal.h"
59 #include <linux/lustre_acl.h>
60 #include <lustre_param.h>
61 /*
62  * Initialized in mdt_mod_init().
 *
 * NOTE(review): judging by the name this is the MDT service thread count;
 * its readers are outside this part of the file - confirm before relying
 * on that.
63  */
64 unsigned long mdt_num_threads;
65
66 /* Descriptor of one ptlrpc request handler for MDT. Handlers are
67  * grouped into several slices - struct mdt_opc_slice - and the slices
68  * are kept in tables such as mdt_regular_handlers[] declared below.
69  */
70 struct mdt_handler {
71         /* Human-readable name of this handler. */
72         const char *mh_name;
73         /* Fail id for this handler, checked at the beginning of the handler. */
74         int         mh_fail_id;
75         /* Operation code served by this handler. */
76         __u32       mh_opc;
77         /* Bitmask of enum mdt_handler_flags values (defined below). */
78         __u32       mh_flags;
79         /* The actual handler function to execute. */
80         int (*mh_act)(struct mdt_thread_info *info);
81         /* Request format describing the buffers of this request. */
82         const struct req_format *mh_fmt;
83 };
84
/*
 * Per-handler flag bits stored in mdt_handler::mh_flags. Each flag tells the
 * generic request code what the incoming message carries or what has to be
 * checked before the handler itself is run.
 */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = 0x1,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 0x2,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of this request is fixed, so the reply message
         * can be packed by generic code.
         */
        HABEO_REFERO = 0x4,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request is going to modify something: check whether the
         * filesystem is read-only and, if so, return -EROFS to the client
         * as early as possible.
         */
        MUTABOR      = 0x8
};
114
/*
 * One slice of a handler table: a range of operation codes together with
 * the struct mdt_handler array that serves it.
 *
 * NOTE(review): the code that walks these slices is not visible here, so
 * whether mos_opc_end is inclusive or exclusive still has to be confirmed.
 * Note also that the two bounds have different signedness (__u32 vs int).
 */
115 struct mdt_opc_slice {
116         __u32               mos_opc_start; /* first opcode of the slice */
117         int                 mos_opc_end;   /* end of the opcode range */
118         struct mdt_handler *mos_hs;        /* handlers for this range */
119 };
120
/*
 * Forward declarations of the per-service handler slice tables. Their
 * definitions are not in this part of the file.
 */
121 static struct mdt_opc_slice mdt_regular_handlers[];
122 static struct mdt_opc_slice mdt_readpage_handlers[];
123 static struct mdt_opc_slice mdt_seq_handlers[];
124 static struct mdt_opc_slice mdt_fld_handlers[];
125
/* Forward declarations of helpers that are used before they are defined. */
126 static struct mdt_device *mdt_dev(struct lu_device *d);
127 static int mdt_regular_handle(struct ptlrpc_request *req);
128 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
129
/* Object operations vector of the MDT layer; forward declaration only. */
130 static struct lu_object_operations mdt_obj_ops;
131
132 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
133 {
134         if (!rep)
135                 return 0;
136         return (rep->lock_policy_res1 & flag);
137 }
138
139 void mdt_clear_disposition(struct mdt_thread_info *info,
140                            struct ldlm_reply *rep, int flag)
141 {
142         if (info)
143                 info->mti_opdata &= ~flag;
144         if (rep)
145                 rep->lock_policy_res1 &= ~flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, int flag)
150 {
151         if (info)
152                 info->mti_opdata |= flag;
153         if (rep)
154                 rep->lock_policy_res1 |= flag;
155 }
156
157 static int mdt_getstatus(struct mdt_thread_info *info)
158 {
159         struct mdt_device *mdt  = info->mti_mdt;
160         struct md_device  *next = mdt->mdt_child;
161         struct mdt_body   *body;
162         int                rc;
163
164         ENTRY;
165
166         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
167                 RETURN(err_serious(-ENOMEM));
168
169         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
170         rc = next->md_ops->mdo_root_get(info->mti_env, next, &body->fid1);
171         if (rc != 0)
172                 RETURN(rc);
173
174         body->valid |= OBD_MD_FLID;
175
176         if (mdt->mdt_opts.mo_mds_capa) {
177                 struct mdt_object  *root;
178                 struct lustre_capa *capa;
179
180                 root = mdt_object_find(info->mti_env, mdt, &body->fid1);
181                 if (IS_ERR(root))
182                         RETURN(PTR_ERR(root));
183
184                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
185                 LASSERT(capa);
186                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
187
188                 rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa,
189                                  0);
190                 mdt_object_put(info->mti_env, root);
191                 if (rc == 0)
192                         body->valid |= OBD_MD_FLMDSCAPA;
193         }
194
195         RETURN(rc);
196 }
197
198 static int mdt_statfs(struct mdt_thread_info *info)
199 {
200         struct md_device  *next  = info->mti_mdt->mdt_child;
201         struct obd_statfs *osfs;
202         int                rc;
203
204         ENTRY;
205
206         /* This will trigger a watchdog timeout */
207         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
208                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
209
210
211         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
212                 rc = err_serious(-ENOMEM);
213         } else {
214                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
215                 /* XXX max_age optimisation is needed here. See mds_statfs */
216                 rc = next->md_ops->mdo_statfs(info->mti_env, next,
217                                               &info->mti_u.ksfs);
218                 statfs_pack(osfs, &info->mti_u.ksfs);
219         }
220         RETURN(rc);
221 }
222
223 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
224                         struct mdt_object *o)
225 {
226         /* Check if Size-on-MDS is enabled. */
227         if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
228                 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
229                 b->size = attr->la_size;
230                 b->blocks = attr->la_blocks;
231         }
232 }
233
234 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
235                         const struct lu_attr *attr, const struct lu_fid *fid)
236 {
237         /*XXX should pack the reply body according to lu_valid*/
238         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
239                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
240                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
241                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
242
243         if (!S_ISREG(attr->la_mode))
244                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
245
246         b->atime      = attr->la_atime;
247         b->mtime      = attr->la_mtime;
248         b->ctime      = attr->la_ctime;
249         b->mode       = attr->la_mode;
250         b->size       = attr->la_size;
251         b->blocks     = attr->la_blocks;
252         b->uid        = attr->la_uid;
253         b->gid        = attr->la_gid;
254         b->flags      = attr->la_flags;
255         b->nlink      = attr->la_nlink;
256         b->rdev       = attr->la_rdev;
257
258         if (fid) {
259                 b->fid1 = *fid;
260                 b->valid |= OBD_MD_FLID;
261                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
262                                 PFID(fid), b->nlink, b->mode, b->size);
263         }
264
265         if (info)
266                 mdt_body_reverse_idmap(info, b);
267 }
268
269 static inline int mdt_body_has_lov(const struct lu_attr *la,
270                                    const struct mdt_body *body)
271 {
272         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
273                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
274 }
275
276 static int mdt_getattr_internal(struct mdt_thread_info *info,
277                                 struct mdt_object *o)
278 {
279         struct md_object        *next = mdt_object_child(o);
280         struct mdt_device       *mdt = info->mti_mdt;
281         const struct mdt_body   *reqbody = info->mti_body;
282         struct ptlrpc_request   *req = mdt_info_req(info);
283         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
284         struct md_attr          *ma = &info->mti_attr;
285         struct lu_attr          *la = &ma->ma_attr;
286         struct req_capsule      *pill = &info->mti_pill;
287         const struct lu_env     *env = info->mti_env;
288         struct mdt_body         *repbody;
289         struct lu_buf           *buffer = &info->mti_buf;
290         int                     rc;
291         ENTRY;
292
293         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
294                 RETURN(err_serious(-ENOMEM));
295
296         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
297         repbody->eadatasize = 0;
298         repbody->aclsize = 0;
299
300         if (reqbody->valid & OBD_MD_MEA) {
301                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
302                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
303                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
304                                                              RCL_SERVER);
305                 ma->ma_need = MA_INODE | MA_LMV;
306         } else {
307                 ma->ma_need = MA_INODE | MA_LOV ;
308                 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
309                 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
310                                                              RCL_SERVER);
311         }
312         rc = mo_attr_get(env, next, ma);
313         if (rc == -EREMOTE) {
314                 /* This object is located on remote node.*/
315                 repbody->fid1 = *mdt_object_fid(o);
316                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
317                 RETURN(0);
318         } else if (rc) {
319                 CERROR("getattr error for "DFID": %d\n",
320                         PFID(mdt_object_fid(o)), rc);
321                 RETURN(rc);
322         }
323
324         if (ma->ma_valid & MA_INODE)
325                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
326         else
327                 RETURN(-EFAULT);
328
329         if (mdt_body_has_lov(la, reqbody)) {
330                 if (ma->ma_valid & MA_LOV) {
331                         LASSERT(ma->ma_lmm_size);
332                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
333                         repbody->eadatasize = ma->ma_lmm_size;
334                         if (S_ISDIR(la->la_mode))
335                                 repbody->valid |= OBD_MD_FLDIREA;
336                         else
337                                 repbody->valid |= OBD_MD_FLEASIZE;
338                 }
339                 if (ma->ma_valid & MA_LMV) {
340                         LASSERT(S_ISDIR(la->la_mode));
341                         repbody->eadatasize = ma->ma_lmv_size;
342                         repbody->valid |= OBD_MD_FLDIREA;
343                         repbody->valid |= OBD_MD_MEA;
344                 }
345         } else if (S_ISLNK(la->la_mode) &&
346                           reqbody->valid & OBD_MD_LINKNAME) {
347                 /* FIXME: Is this buffer long enough? */
348                 buffer->lb_buf = ma->ma_lmm;
349                 buffer->lb_len = ma->ma_lmm_size;
350                 rc = mo_readlink(env, next, buffer);
351                 if (rc <= 0) {
352                         CERROR("readlink failed: %d\n", rc);
353                         rc = -EFAULT;
354                 } else {
355                         repbody->valid |= OBD_MD_LINKNAME;
356                         repbody->eadatasize = rc + 1;
357                         ((char*)ma->ma_lmm)[rc] = 0; /* NULL terminate */
358                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
359                                         (char*)ma->ma_lmm, rc);
360                         rc = 0;
361                 }
362         }
363
364         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
365                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
366                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
367                 repbody->valid |= OBD_MD_FLMODEASIZE;
368                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
369                                 "MAX_COOKIE to : %d:%d\n",
370                                 repbody->max_mdsize,
371                                 repbody->max_cookiesize);
372         }
373
374         if (med->med_rmtclient && (reqbody->valid & OBD_MD_FLRMTPERM)) {
375                 void *buf = req_capsule_server_get(pill, &RMF_ACL);
376
377                 /* mdt_getattr_lock only */
378                 rc = mdt_pack_remote_perm(info, o, buf);
379                 if (rc) {
380                         repbody->valid &= ~OBD_MD_FLRMTPERM;
381                         repbody->aclsize = 0;
382                         RETURN(rc);
383                 } else {
384                         repbody->valid |= OBD_MD_FLRMTPERM;
385                         repbody->aclsize = sizeof(struct mdt_remote_perm);
386                 }
387         }
388 #ifdef CONFIG_FS_POSIX_ACL
389         else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
390                  (reqbody->valid & OBD_MD_FLACL)) {
391                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
392                 buffer->lb_len = req_capsule_get_size(pill,
393                                                       &RMF_ACL, RCL_SERVER);
394                 if (buffer->lb_len > 0) {
395                         rc = mo_xattr_get(env, next, buffer,
396                                           XATTR_NAME_ACL_ACCESS);
397                         if (rc < 0) {
398                                 if (rc == -ENODATA || rc == -EOPNOTSUPP)
399                                         rc = 0;
400                                 else
401                                         CERROR("got acl size: %d\n", rc);
402                         } else {
403                                 repbody->aclsize = rc;
404                                 repbody->valid |= OBD_MD_FLACL;
405                                 rc = 0;
406                         }
407                 }
408         }
409 #endif
410
411         if ((reqbody->valid & OBD_MD_FLMDSCAPA) && mdt->mdt_opts.mo_mds_capa) {
412                 struct lustre_capa *capa;
413
414                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
415                 LASSERT(capa);
416                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
417                 rc = mo_capa_get(env, next, capa, 0);
418                 if (rc)
419                         RETURN(rc);
420                 repbody->valid |= OBD_MD_FLMDSCAPA;
421         }
422
423         RETURN(rc);
424 }
425
426 static int mdt_renew_capa(struct mdt_thread_info *info)
427 {
428         struct mdt_device  *mdt = info->mti_mdt;
429         struct mdt_object  *obj = info->mti_object;
430         struct mdt_body    *body;
431         struct lustre_capa *capa, *c;
432         int rc;
433         ENTRY;
434
435         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
436         LASSERT(body != NULL);
437
438         /* NB: see mdt_unpack_req_pack_rep */
439         if (!obj)
440                 GOTO(out, rc = -ENOENT);
441
442         c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
443         LASSERT(c);
444
445         if (!mdt->mdt_opts.mo_mds_capa) {
446                 DEBUG_CAPA(D_SEC, c,
447                            "mds has disabled capability, skip renew for");
448                 GOTO(out, rc = -ENOENT);
449         }
450
451         capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
452         LASSERT(capa);
453
454         *capa = *c;
455         rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa, 1);
456         if (rc)
457                 GOTO(out, rc);
458
459         body->valid |= OBD_MD_FLOSSCAPA;
460         EXIT;
461 out:
462         /* NB: capability renewal might fail because object has been removed,
463          * or server has disabled capability, but this is not error, llite
464          * will handle this internally, see mdc_interpret_renew_capa.
465          * body->flags is borrowed to store errno.
466          */
467         body->flags = (__u32)-rc;
468         return 0;
469 }
470
471 static int mdt_getattr(struct mdt_thread_info *info)
472 {
473         struct mdt_object *obj = info->mti_object;
474         struct mdt_body   *reqbody;
475         int rc;
476         ENTRY;
477
478         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
479         if (reqbody == NULL)
480                 GOTO(out, rc = -EFAULT);
481
482         if (reqbody->valid & OBD_MD_FLOSSCAPA) {
483                 rc = mdt_renew_capa(info);
484                 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
485                 RETURN(rc);
486         }
487
488         LASSERT(obj != NULL);
489         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
490
491         if (reqbody->valid & OBD_MD_FLRMTPERM) {
492                 rc = mdt_init_ucred(info, reqbody);
493                 if (rc)
494                         GOTO(out, rc);
495         }
496
497         rc = mdt_getattr_internal(info, obj);
498         if (reqbody->valid & OBD_MD_FLRMTPERM)
499                 mdt_exit_ucred(info);
500         EXIT;
501 out:
502         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
503         return rc;
504 }
505
506 static int mdt_is_subdir(struct mdt_thread_info *info)
507 {
508         struct mdt_object   *obj = info->mti_object;
509         struct req_capsule  *pill = &info->mti_pill;
510         struct mdt_body     *repbody;
511         int                  rc;
512
513         obj = info->mti_object;
514         LASSERT(obj != NULL);
515         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
516         ENTRY;
517
518         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
519
520         /*
521          * We save last checked parent fid to @repbody->fid1 for remote
522          * directory case.
523          */
524         LASSERT(fid_is_sane(&info->mti_body->fid2));
525         rc = mdo_is_subdir(info->mti_env, mdt_object_child(obj),
526                            &info->mti_body->fid2, &repbody->fid1);
527         if (rc < 0)
528                 RETURN(rc);
529
530         /*
531          * Save error code to ->mode. Later it it is used for detecting the case
532          * of remote subdir.
533          */
534         repbody->mode = rc;
535         repbody->valid = OBD_MD_FLMODE;
536
537         if (rc == -EREMOTE)
538                 repbody->valid |= OBD_MD_FLID;
539
540         RETURN(0);
541 }
542
543 /*
544  * UPDATE lock should be taken against parent, and be released before exit;
545  * child_bits lock should be taken against child, and be returned back:
546  *            (1)normal request should release the child lock;
547  *            (2)intent request will grant the lock to client.
 *
 * The parent is info->mti_object and the child name is taken from RMF_NAME
 * in the request. An empty name means "getattr on mti_object itself".
 * A handle already in use in @lhc marks a resent request: the lock it names
 * is validated against the child fid instead of taking a new lock.
 *
 * On success the child lock is left in @lhc for the caller to release or
 * grant; if the getattr itself fails the child lock is dropped here.
 * Lookup results are recorded in @ldlm_rep through mdt_set_disposition().
 *
 * Returns 0 on success, err_serious(-EFAULT) when the name is missing,
 * -ESTALE when the parent does not exist, or the error of the step that
 * failed (parent lock, lookup, object find, child lock, getattr).
548  */
549 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
550                                  struct mdt_lock_handle *lhc,
551                                  __u64 child_bits,
552                                  struct ldlm_reply *ldlm_rep)
553 {
554         struct ptlrpc_request *req = mdt_info_req(info);
555         struct mdt_object     *parent = info->mti_object;
556         struct mdt_object     *child;
557         struct md_object      *next = mdt_object_child(info->mti_object);
558         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
559         int                    is_resent, rc;
560         const char            *name;
561         struct mdt_lock_handle *lhp;
562         struct ldlm_lock      *lock;
563         ENTRY;
564
        /* A used child handle is only legal on a request flagged as resent. */
565         is_resent = lustre_handle_is_used(&lhc->mlh_lh);
566         if (is_resent)
567                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
568
569         LASSERT(info->mti_object != NULL);
570         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
571         if (name == NULL)
572                 RETURN(err_serious(-EFAULT));
573
574         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
575                PFID(mdt_object_fid(parent)), name, ldlm_rep);
576
577         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
578
        /*
         * 0 means the parent does not exist; a negative value means it
         * lives on a remote server, which is treated as a fatal bug here.
         */
579         rc = mdt_object_exists(parent);
580         if (rc == 0)
581                 RETURN(-ESTALE);
582         else if (rc < 0) {
583                 CERROR("Object "DFID" locates on remote server\n",
584                         PFID(mdt_object_fid(parent)));
585                 LBUG();
586         }
587
588         if (strlen(name) == 0) {
589                 /* Only getattr on the child. Parent is on another node. */
590                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
591                 child = parent;
592                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID
593                        ", ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
594
595                 if (is_resent) {
596                         /* Do not take lock for resent case. */
597                         lock = ldlm_handle2lock(&lhc->mlh_lh);
598                         if (!lock) {
599                                 CERROR("Invalid lock handle "LPX64"\n",
600                                        lhc->mlh_lh.cookie);
601                                 LBUG();
602                         }
603                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
604                                                 &lock->l_resource->lr_name));
605                         LDLM_LOCK_PUT(lock);
606                         rc = 0;
607                 } else {
608                         mdt_lock_handle_init(lhc);
609                         lhc->mlh_mode = LCK_CR;
610
611                         /*
612                          * Object's name is on another MDS, no lookup lock is
613                          * needed here but update is.
614                          */
615                         child_bits &= ~MDS_INODELOCK_LOOKUP;
616                         child_bits |= MDS_INODELOCK_UPDATE;
617                         rc = mdt_object_lock(info, child, lhc, child_bits);
618                 }
619                 if (rc == 0) {
620                         /* Finally, we can get attr for child. */
621                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
622                                          BYPASS_CAPA);
623                         rc = mdt_getattr_internal(info, child);
624                         if (rc != 0)
625                                 mdt_object_unlock(info, child, lhc, 1);
626                 }
                /* No parent lock and no extra object reference to drop. */
627                 GOTO(out, rc);
628         }
629
630         /* step 1: lock parent */
        /* NOTE(review): lhp is used without mdt_lock_handle_init(), unlike
         * lhc below - presumably it is initialised elsewhere; confirm. */
631         lhp = &info->mti_lh[MDT_LH_PARENT];
632         lhp->mlh_mode = LCK_CR;
633         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE);
634         if (rc != 0)
635                 RETURN(rc);
636
637         /* step 2: lookup child's fid by name */
638         rc = mdo_lookup(info->mti_env, next, name, child_fid);
639         if (rc != 0) {
640                 if (rc == -ENOENT)
641                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
642                 GOTO(out_parent, rc);
643         } else
644                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
645         /*
646          * step 3: find the child object by fid & lock it.
647          *         regardless if it is local or remote.
648          */
649         child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
650         if (IS_ERR(child))
651                 GOTO(out_parent, rc = PTR_ERR(child));
652         if (is_resent) {
653                 /* Do not take lock for resent case. */
654                 lock = ldlm_handle2lock(&lhc->mlh_lh);
655                 if (!lock) {
656                         CERROR("Invalid lock handle "LPX64"\n",
657                                lhc->mlh_lh.cookie);
658                         LBUG();
659                 }
660                 LASSERT(fid_res_name_eq(child_fid,
661                                         &lock->l_resource->lr_name));
662                 LDLM_LOCK_PUT(lock);
663         } else {
664                 mdt_lock_handle_init(lhc);
665                 lhc->mlh_mode = LCK_CR;
666                 rc = mdt_object_cr_lock(info, child, lhc, child_bits);
667                 if (rc != 0)
668                         GOTO(out_child, rc);
669         }
670
671         /* finally, we can get attr for child. */
672         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
673         rc = mdt_getattr_internal(info, child);
674         if (rc != 0) {
675                 mdt_object_unlock(info, child, lhc, 1);
676         } else {
                /* NOTE(review): this declaration shadows the function-scope
                 * 'lock' variable declared at the top. */
677                 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh);
678                 if (lock) {
679                         struct ldlm_res_id *res_id;
680                         struct mdt_body *repbody;
                        /* 'ma' is a struct lu_attr here, although the same
                         * name denotes a struct md_attr elsewhere in this
                         * file. */
681                         struct lu_attr *ma;
682
683                         /* Debugging code. */
684                         res_id = &lock->l_resource->lr_name;
685                         LDLM_DEBUG(lock, "we will return this lock client\n");
686                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
687                                                  &lock->l_resource->lr_name),
688                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
689                                 (unsigned long)res_id->name[0],
690                                 (unsigned long)res_id->name[1],
691                                 (unsigned long)res_id->name[2],
692                                 PFID(mdt_object_fid(child)));
693
694                         /* Pack Size-on-MDS inode attributes to the body if
695                          * update lock is given. */
696                         repbody = req_capsule_server_get(&info->mti_pill,
697                                                          &RMF_MDT_BODY);
698                         ma = &info->mti_attr.ma_attr;
699                         if (lock->l_policy_data.l_inodebits.bits &
700                             MDS_INODELOCK_UPDATE)
701                                 mdt_pack_size2body(repbody, ma, child);
702                         LDLM_LOCK_PUT(lock);
703                 }
704         }
705         EXIT;
        /* Drop the reference taken by mdt_object_find(). */
706 out_child:
707         mdt_object_put(info->mti_env, child);
        /* Release the UPDATE lock on the parent taken in step 1. */
708 out_parent:
709         mdt_object_unlock(info, parent, lhp, 1);
710 out:
711         return rc;
712 }
713
714 /* normal handler: should release the child lock */
715 static int mdt_getattr_name(struct mdt_thread_info *info)
716 {
717         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
718         struct mdt_body        *reqbody;
719         int rc;
720         ENTRY;
721
722         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
723         if (reqbody == NULL)
724                 GOTO(out, rc = err_serious(-EFAULT));
725
726         rc = mdt_init_ucred(info, reqbody);
727         if (rc)
728                 GOTO(out, rc);
729
730         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
731         if (lustre_handle_is_used(&lhc->mlh_lh)) {
732                 ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode);
733                 lhc->mlh_lh.cookie = 0;
734         }
735         mdt_exit_ucred(info);
736         EXIT;
737 out:
738         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
739         return rc;
740 }
741
742 static struct lu_device_operations mdt_lu_ops;
743
744 static int lu_device_is_mdt(struct lu_device *d)
745 {
746         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
747 }
748
749 static int mdt_connect(struct mdt_thread_info *info)
750 {
751         int rc;
752         struct ptlrpc_request *req;
753
754         req = mdt_info_req(info);
755         rc = target_handle_connect(req);
756         if (rc == 0) {
757                 LASSERT(req->rq_export != NULL);
758                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
759                 rc = mdt_init_idmap(info);
760         } else
761                 rc = err_serious(rc);
762         return rc;
763 }
764
/*
 * Handler for a disconnect request: delegate to the generic target code
 * and report any failure as a serious error.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        int rc = target_handle_disconnect(mdt_info_req(info));

        return rc != 0 ? err_serious(rc) : rc;
}
774
/*
 * Send the pages described by @rdpg to the client as a bulk PUT on
 * MDS_BULK_PORTAL and wait for the transfer to complete.
 *
 * Returns 0 when all rdpg->rp_count bytes were transferred, -ENOMEM when
 * no bulk descriptor could be allocated, the error of
 * ptlrpc_start_bulk_transfer(), or -ETIMEDOUT when the wait timed out or
 * the transfer finished unsuccessfully or short. In the -ETIMEDOUT case
 * the client export is failed via class_fail_export().
 */
775 static int mdt_sendpage(struct mdt_thread_info *info,
776                         struct lu_rdpg *rdpg)
777 {
778         struct ptlrpc_request   *req = mdt_info_req(info);
779         struct ptlrpc_bulk_desc *desc;
780         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
781         int                      tmpcount;
782         int                      tmpsize;
783         int                      i;
784         int                      rc;
785         ENTRY;
786
787         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
788                                     MDS_BULK_PORTAL);
789         if (desc == NULL)
790                 GOTO(out, rc = -ENOMEM);
791
        /*
         * Attach the pages one by one; tmpcount tracks the bytes still to
         * be attached, so only the last page can be shorter than
         * CFS_PAGE_SIZE.
         */
792         for (i = 0, tmpcount = rdpg->rp_count;
793                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
794                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
795                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
796         }
797
798         LASSERT(desc->bd_nob == rdpg->rp_count);
799         rc = ptlrpc_start_bulk_transfer(desc);
800         if (rc)
801                 GOTO(free_desc, rc);
802
        /* Fault injection: abort the started bulk; rc is still 0 here. */
803         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
804                 GOTO(abort_bulk, rc);
805
        /* Wait until the bulk is no longer active, or the timeout expires. */
806         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
807         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
808         LASSERT (rc == 0 || rc == -ETIMEDOUT);
809
810         if (rc == 0) {
                /* Finished in time: success only if everything was sent. */
811                 if (desc->bd_success &&
812                     desc->bd_nob_transferred == rdpg->rp_count)
813                         GOTO(free_desc, rc);
814
815                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
816         }
817
818         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
819                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
820                   desc->bd_nob_transferred, rdpg->rp_count,
821                   req->rq_export->exp_client_uuid.uuid,
822                   req->rq_export->exp_connection->c_remote_uuid.uuid);
823
824         class_fail_export(req->rq_export);
825
826         EXIT;
        /* Cleanup, outermost first: abort the bulk, then free the descriptor. */
827 abort_bulk:
828         ptlrpc_abort_bulk(desc);
829 free_desc:
830         ptlrpc_free_bulk(desc);
831 out:
832         return rc;
833 }
834
835 #ifdef HAVE_SPLIT_SUPPORT
836 /*
837  * Retrieve dir entry from the page and insert it to the
838  * slave object, actually, this should be in osd layer,
839  * but since it will not in the final product, so just do
840  * it here and do not define more moo api anymore for
841  * this.
842  */
843 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
844                               int size)
845 {
846         struct mdt_object *object = info->mti_object;
847         struct lu_dirpage *dp;
848         struct lu_dirent *ent;
849         int rc = 0, offset = 0, is_dir;
850
851         ENTRY;
852
853         /* Disable trans for this name insert, since it will
854          * include many trans for this */
855         info->mti_no_need_trans = 1;
856         kmap(page);
857         dp = page_address(page);
858         offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
859
860         for (ent = lu_dirent_start(dp); ent != NULL;
861                           ent = lu_dirent_next(ent)) {
862                 struct lu_fid *lf = &ent->lde_fid;
863                 char *name;
864
865                 offset += ent->lde_reclen;
866                 if (ent->lde_namelen == 0)
867                         continue;
868
869                 if (offset > size)
870                         break;
871                 is_dir = le32_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT;
872                 OBD_ALLOC(name, ent->lde_namelen + 1);
873                 memcpy(name, ent->lde_name, ent->lde_namelen);
874                 rc = mdo_name_insert(info->mti_env,
875                                      md_object_next(&object->mot_obj),
876                                      name, lf, is_dir);
877                 OBD_FREE(name, ent->lde_namelen + 1);
878                 if (rc)
879                         GOTO(out, rc);
880         }
881 out:
882         kunmap(page);
883         RETURN(rc);
884 }
885
886 static int mdt_bulk_timeout(void *data)
887 {
888         ENTRY;
889
890         CERROR("mdt bulk transfer timeout \n");
891
892         RETURN(1);
893 }
894
895 static int mdt_writepage(struct mdt_thread_info *info)
896 {
897         struct ptlrpc_request   *req = mdt_info_req(info);
898         struct mdt_body         *reqbody;
899         struct l_wait_info      *lwi;
900         struct ptlrpc_bulk_desc *desc;
901         struct page             *page;
902         int                rc;
903         ENTRY;
904
905
906         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
907         if (reqbody == NULL)
908                 RETURN(err_serious(-EFAULT));
909
910         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
911         if (!desc)
912                 RETURN(err_serious(-ENOMEM));
913
914         /* allocate the page for the desc */
915         page = alloc_pages(GFP_KERNEL, 0);
916         if (!page)
917                 GOTO(desc_cleanup, rc = -ENOMEM);
918
919         CDEBUG(D_INFO, "Received page offset %d size %d \n",
920                         (int)reqbody->size, (int)reqbody->nlink);
921
922         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
923                               (int)reqbody->nlink);
924
925         /* FIXME: following parts are copied from ost_brw_write */
926
927         /* Check if client was evicted while we were doing i/o before touching
928            network */
929         OBD_ALLOC_PTR(lwi);
930         if (!lwi)
931                 GOTO(cleanup_page, rc = -ENOMEM);
932
933         if (desc->bd_export->exp_failed)
934                 rc = -ENOTCONN;
935         else
936                 rc = ptlrpc_start_bulk_transfer (desc);
937         if (rc == 0) {
938                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
939                                             mdt_bulk_timeout, desc);
940                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
941                                   desc->bd_export->exp_failed, lwi);
942                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
943                 if (rc == -ETIMEDOUT) {
944                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
945                         ptlrpc_abort_bulk(desc);
946                 } else if (desc->bd_export->exp_failed) {
947                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
948                         rc = -ENOTCONN;
949                         ptlrpc_abort_bulk(desc);
950                 } else if (!desc->bd_success ||
951                            desc->bd_nob_transferred != desc->bd_nob) {
952                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
953                                   desc->bd_success ?
954                                   "truncated" : "network error on",
955                                   desc->bd_nob_transferred, desc->bd_nob);
956                         /* XXX should this be a different errno? */
957                         rc = -ETIMEDOUT;
958                 }
959         } else {
960                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
961         }
962         if (rc)
963                 GOTO(cleanup_lwi, rc);
964         rc = mdt_write_dir_page(info, page, reqbody->nlink);
965
966 cleanup_lwi:
967         OBD_FREE_PTR(lwi);
968 cleanup_page:
969         __free_pages(page, 0);
970 desc_cleanup:
971         ptlrpc_free_bulk(desc);
972         RETURN(rc);
973 }
974 #endif
975
976 static int mdt_readpage(struct mdt_thread_info *info)
977 {
978         struct mdt_object *object = info->mti_object;
979         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
980         struct mdt_body   *reqbody;
981         struct mdt_body   *repbody;
982         int                rc;
983         int                i;
984         ENTRY;
985
986         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
987                 RETURN(err_serious(-ENOMEM));
988
989         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
990         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
991         if (reqbody == NULL || repbody == NULL)
992                 RETURN(err_serious(-EFAULT));
993
994         /*
995          * prepare @rdpg before calling lower layers and transfer itself. Here
996          * reqbody->size contains offset of where to start to read and
997          * reqbody->nlink contains number bytes to read.
998          */
999         rdpg->rp_hash = reqbody->size;
1000         if ((__u64)rdpg->rp_hash != reqbody->size) {
1001                 CERROR("Invalid hash: %#llx != %#llx\n",
1002                        (__u64)rdpg->rp_hash, reqbody->size);
1003                 RETURN(-EFAULT);
1004         }
1005         rdpg->rp_count  = reqbody->nlink;
1006         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
1007         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1008         if (rdpg->rp_pages == NULL)
1009                 RETURN(-ENOMEM);
1010
1011         for (i = 0; i < rdpg->rp_npages; ++i) {
1012                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
1013                 if (rdpg->rp_pages[i] == NULL)
1014                         GOTO(free_rdpg, rc = -ENOMEM);
1015         }
1016
1017         /* call lower layers to fill allocated pages with directory data */
1018         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
1019         if (rc)
1020                 GOTO(free_rdpg, rc);
1021
1022         /* send pages to client */
1023         rc = mdt_sendpage(info, rdpg);
1024
1025         EXIT;
1026 free_rdpg:
1027
1028         for (i = 0; i < rdpg->rp_npages; i++)
1029                 if (rdpg->rp_pages[i] != NULL)
1030                         __free_pages(rdpg->rp_pages[i], 0);
1031         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1032
1033         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
1034
1035         return rc;
1036 }
1037
1038 static int mdt_reint_internal(struct mdt_thread_info *info,
1039                               struct mdt_lock_handle *lhc,
1040                               __u32 op)
1041 {
1042         struct req_capsule      *pill = &info->mti_pill;
1043         struct mdt_device       *mdt = info->mti_mdt;
1044         struct ptlrpc_request   *req = mdt_info_req(info);
1045         int                      rc;
1046         ENTRY;
1047
1048         /* pack reply */
1049         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1050                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1051                                      mdt->mdt_max_mdsize);
1052         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1053                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1054                                      mdt->mdt_max_cookiesize);
1055         rc = req_capsule_pack(pill);
1056         if (rc != 0) {
1057                 CERROR("Can't pack response, rc %d\n", rc);
1058                 RETURN(err_serious(rc));
1059         }
1060
1061         /*
1062          * Check this after packing response, because after we fail here without
1063          * allocating response, caller anyway may want to get ldlm_reply from it
1064          * and will get oops.
1065          */
1066         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
1067                 RETURN(err_serious(-EFAULT));
1068
1069         rc = mdt_reint_unpack(info, op);
1070         if (rc != 0) {
1071                 CERROR("Can't unpack reint, rc %d\n", rc);
1072                 RETURN(err_serious(rc));
1073         }
1074
1075         rc = mdt_init_ucred_reint(info);
1076         if (rc)
1077                 RETURN(rc);
1078
1079         rc = mdt_fix_attr_ucred(info, op);
1080         if (rc != 0)
1081                 GOTO(out, rc);
1082
1083         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1084                 struct mdt_client_data *mcd;
1085
1086                 mcd = req->rq_export->exp_mdt_data.med_mcd;
1087                 if (mcd->mcd_last_xid == req->rq_xid ||
1088                     mcd->mcd_last_close_xid == req->rq_xid) {
1089                         mdt_reconstruct(info, lhc);
1090                         rc = lustre_msg_get_status(req->rq_repmsg);
1091                         GOTO(out, rc);
1092                 }
1093                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
1094                           mcd->mcd_last_xid);
1095         }
1096         rc = mdt_reint_rec(info, lhc);
1097
1098 out:
1099         mdt_exit_ucred(info);
1100         RETURN(rc);
1101 }
1102
1103 static long mdt_reint_opcode(struct mdt_thread_info *info,
1104                              const struct req_format **fmt)
1105 {
1106         __u32 *ptr;
1107         long opc;
1108
1109         opc = err_serious(-EFAULT);
1110         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
1111         if (ptr != NULL) {
1112                 opc = *ptr;
1113                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
1114                 if (opc < REINT_MAX && fmt[opc] != NULL)
1115                         req_capsule_extend(&info->mti_pill, fmt[opc]);
1116                 else {
1117                         CERROR("Unsupported opc: %ld\n", opc);
1118                         opc = err_serious(opc);
1119                 }
1120         }
1121         return opc;
1122 }
1123
1124 static int mdt_reint(struct mdt_thread_info *info)
1125 {
1126         long opc;
1127         int  rc;
1128
1129         static const struct req_format *reint_fmts[REINT_MAX] = {
1130                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
1131                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
1132                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
1133                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
1134                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
1135                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
1136         };
1137
1138         ENTRY;
1139
1140         opc = mdt_reint_opcode(info, reint_fmts);
1141         if (opc >= 0) {
1142                 /*
1143                  * No lock possible here from client to pass it to reint code
1144                  * path.
1145                  */
1146                 rc = mdt_reint_internal(info, NULL, opc);
1147         } else {
1148                 rc = opc;
1149         }
1150
1151         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1152         RETURN(rc);
1153 }
1154
/* TODO: these two methods are not implemented yet. */
1156
/*
 * Meant to sync the whole device.  Currently a placeholder: nothing is done
 * with @info and 0 is returned.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        return 0;
}
1162
/*
 * Meant to sync a single object.  Currently a placeholder: nothing is done
 * with @info and 0 is returned.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        return 0;
}
1168
1169 static int mdt_sync(struct mdt_thread_info *info)
1170 {
1171         struct req_capsule *pill = &info->mti_pill;
1172         struct mdt_body *body;
1173         int rc;
1174         ENTRY;
1175
1176         /* The fid may be zero, so we req_capsule_set manually */
1177         req_capsule_set(pill, &RQF_MDS_SYNC);
1178
1179         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1180         if (body == NULL)
1181                 RETURN(err_serious(-EINVAL));
1182
1183         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1184                 RETURN(err_serious(-ENOMEM));
1185
1186         if (fid_seq(&body->fid1) == 0) {
1187                 /* sync the whole device */
1188                 rc = req_capsule_pack(pill);
1189                 if (rc == 0)
1190                         rc = mdt_device_sync(info);
1191                 else
1192                         rc = err_serious(rc);
1193         } else {
1194                 /* sync an object */
1195                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1196                 if (rc == 0) {
1197                         rc = mdt_object_sync(info);
1198                         if (rc == 0) {
1199                                 struct md_object *next;
1200                                 const struct lu_fid *fid;
1201                                 struct lu_attr *la = &info->mti_attr.ma_attr;
1202
1203                                 next = mdt_object_child(info->mti_object);
1204                                 info->mti_attr.ma_need = MA_INODE;
1205                                 rc = mo_attr_get(info->mti_env, next,
1206                                                  &info->mti_attr);
1207                                 if (rc == 0) {
1208                                         body = req_capsule_server_get(pill,
1209                                                                 &RMF_MDT_BODY);
1210                                         fid = mdt_object_fid(info->mti_object);
1211                                         mdt_pack_attr2body(info, body, la, fid);
1212                                 }
1213                         }
1214                 } else
1215                         rc = err_serious(rc);
1216         }
1217         RETURN(rc);
1218 }
1219
1220 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
1221 {
1222         return err_serious(-EOPNOTSUPP);
1223 }
1224
1225 static int mdt_quotactl_handle(struct mdt_thread_info *info)
1226 {
1227         return err_serious(-EOPNOTSUPP);
1228 }
1229
1230 /*
1231  * OBD PING and other handlers.
1232  */
1233 static int mdt_obd_ping(struct mdt_thread_info *info)
1234 {
1235         int rc;
1236         ENTRY;
1237         rc = target_handle_ping(mdt_info_req(info));
1238         if (rc < 0)
1239                 rc = err_serious(rc);
1240         RETURN(rc);
1241 }
1242
1243 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
1244 {
1245         return err_serious(-EOPNOTSUPP);
1246 }
1247
1248 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
1249 {
1250         return err_serious(-EOPNOTSUPP);
1251 }
1252
1253
1254 /*
1255  * DLM handlers.
1256  */
1257
1258 static struct ldlm_callback_suite cbs = {
1259         .lcs_completion = ldlm_server_completion_ast,
1260         .lcs_blocking   = ldlm_server_blocking_ast,
1261         .lcs_glimpse    = NULL
1262 };
1263
1264 static int mdt_enqueue(struct mdt_thread_info *info)
1265 {
1266         struct ptlrpc_request *req;
1267         int rc;
1268
1269         /*
1270          * info->mti_dlm_req already contains swapped and (if necessary)
1271          * converted dlm request.
1272          */
1273         LASSERT(info->mti_dlm_req != NULL);
1274
1275         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
1276                 info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
1277                 return 0;
1278         }
1279
1280         req = mdt_info_req(info);
1281         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1282                                   req, info->mti_dlm_req, &cbs);
1283         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1284         return rc ? err_serious(rc) : req->rq_status;
1285 }
1286
1287 static int mdt_convert(struct mdt_thread_info *info)
1288 {
1289         int rc;
1290         struct ptlrpc_request *req;
1291
1292         LASSERT(info->mti_dlm_req);
1293         req = mdt_info_req(info);
1294         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1295         return rc ? err_serious(rc) : req->rq_status;
1296 }
1297
1298 static int mdt_bl_callback(struct mdt_thread_info *info)
1299 {
1300         CERROR("bl callbacks should not happen on MDS\n");
1301         LBUG();
1302         return err_serious(-EOPNOTSUPP);
1303 }
1304
1305 static int mdt_cp_callback(struct mdt_thread_info *info)
1306 {
1307         CERROR("cp callbacks should not happen on MDS\n");
1308         LBUG();
1309         return err_serious(-EOPNOTSUPP);
1310 }
1311
1312 /*
1313  * sec context handlers
1314  */
/* Security context handler: the work is done entirely by mdt_handle_idmap(). */
static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
{
        int rc = mdt_handle_idmap(info);

        return rc;
}
1319
1320 static struct mdt_object *mdt_obj(struct lu_object *o)
1321 {
1322         LASSERT(lu_device_is_mdt(o->lo_dev));
1323         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1324 }
1325
1326 struct mdt_object *mdt_object_find(const struct lu_env *env,
1327                                    struct mdt_device *d,
1328                                    const struct lu_fid *f)
1329 {
1330         struct lu_object *o;
1331         struct mdt_object *m;
1332         ENTRY;
1333
1334         o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
1335         if (IS_ERR(o))
1336                 m = (struct mdt_object *)o;
1337         else
1338                 m = mdt_obj(o);
1339         RETURN(m);
1340 }
1341
1342 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1343                     struct mdt_lock_handle *lh, __u64 ibits)
1344 {
1345         ldlm_policy_data_t *policy = &info->mti_policy;
1346         struct ldlm_res_id *res_id = &info->mti_res_id;
1347         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1348         int rc;
1349         ENTRY;
1350
1351         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1352         LASSERT(lh->mlh_mode != LCK_MINMODE);
1353         if (mdt_object_exists(o) < 0) {
1354                 LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1355                 LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1356         }
1357         policy->l_inodebits.bits = ibits;
1358
1359         rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode,
1360                       policy, res_id);
1361         RETURN(rc);
1362 }
1363
1364 /* lock with cross-ref fixes */
1365 int mdt_object_cr_lock(struct mdt_thread_info *info, struct mdt_object *o,
1366                        struct mdt_lock_handle *lh, __u64 ibits)
1367 {
1368         if (mdt_object_exists(o) < 0) {
1369                 /* cross-ref object fix */
1370                 ibits &= ~MDS_INODELOCK_UPDATE;
1371                 ibits |= MDS_INODELOCK_LOOKUP;
1372         }
1373         return mdt_object_lock(info, o, lh, ibits);
1374 }
1375
1376 /*
1377  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1378  * to save this lock in req.  when transaction committed, req will be released,
1379  * and lock will, too.
1380  */
1381 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1382                        struct mdt_lock_handle *lh, int decref)
1383 {
1384         struct ptlrpc_request *req    = mdt_info_req(info);
1385         struct lustre_handle  *handle = &lh->mlh_lh;
1386         ldlm_mode_t            mode   = lh->mlh_mode;
1387         ENTRY;
1388
1389         if (lustre_handle_is_used(handle)) {
1390                 if (decref)
1391                         fid_unlock(mdt_object_fid(o), handle, mode);
1392                 else
1393                         ptlrpc_save_lock(req, handle, mode);
1394                 handle->cookie = 0;
1395         }
1396         EXIT;
1397 }
1398
1399 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1400                                         const struct lu_fid *f,
1401                                         struct mdt_lock_handle *lh,
1402                                         __u64 ibits)
1403 {
1404         struct mdt_object *o;
1405
1406         o = mdt_object_find(info->mti_env, info->mti_mdt, f);
1407         if (!IS_ERR(o)) {
1408                 int rc;
1409
1410                 rc = mdt_object_lock(info, o, lh, ibits);
1411                 if (rc != 0) {
1412                         mdt_object_put(info->mti_env, o);
1413                         o = ERR_PTR(rc);
1414                 }
1415         }
1416         return o;
1417 }
1418
1419 void mdt_object_unlock_put(struct mdt_thread_info * info,
1420                            struct mdt_object * o,
1421                            struct mdt_lock_handle *lh,
1422                            int decref)
1423 {
1424         mdt_object_unlock(info, o, lh, decref);
1425         mdt_object_put(info->mti_env, o);
1426 }
1427
1428 static struct mdt_handler *mdt_handler_find(__u32 opc,
1429                                             struct mdt_opc_slice *supported)
1430 {
1431         struct mdt_opc_slice *s;
1432         struct mdt_handler   *h;
1433
1434         h = NULL;
1435         for (s = supported; s->mos_hs != NULL; s++) {
1436                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1437                         h = s->mos_hs + (opc - s->mos_opc_start);
1438                         if (h->mh_opc != 0)
1439                                 LASSERT(h->mh_opc == opc);
1440                         else
1441                                 h = NULL; /* unsupported opc */
1442                         break;
1443                 }
1444         }
1445         return h;
1446 }
1447
1448 static inline __u64 req_exp_last_xid(struct ptlrpc_request *req)
1449 {
1450         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_xid);
1451 }
1452
1453 static inline __u64 req_exp_last_close_xid(struct ptlrpc_request *req)
1454 {
1455         return le64_to_cpu(req->rq_export->exp_mdt_data.med_mcd->mcd_last_close_xid);
1456 }
1457
/*
 * Placeholder for resource-name compatibility handling of a dlm request.
 * Nothing is done yet: neither argument is touched and 0 is returned.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        /* XXX something... later. */
        return 0;
}
1464
/*
 * Placeholder for compatibility handling of a dlm reply.  Nothing is done
 * yet: neither argument is touched and 0 is returned.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        /* XXX something... later. */
        return 0;
}
1470
1471 /*
1472  * Generic code handling requests that have struct mdt_body passed in:
1473  *
1474  *  - extract mdt_body from request and save it in @info, if present;
1475  *
1476  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1477  *  @info;
1478  *
1479  *  - if HABEO_CORPUS flag is set for this request type check whether object
1480  *  actually exists on storage (lu_object_exists()).
1481  *
1482  */
1483 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1484 {
1485         const struct mdt_body    *body;
1486         struct mdt_object        *obj;
1487         const struct lu_env      *env;
1488         struct req_capsule       *pill;
1489         int                       rc;
1490
1491         env = info->mti_env;
1492         pill = &info->mti_pill;
1493
1494         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1495         if (body == NULL)
1496                 return -EFAULT;
1497
1498         if (!fid_is_sane(&body->fid1)) {
1499                 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1500                 return -EINVAL;
1501         }
1502
1503         /*
1504          * Do not get size or any capa fields before we check that request
1505          * contains capa actually. There are some requests which do not, for
1506          * instance MDS_IS_SUBDIR.
1507          */
1508         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
1509             req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
1510                 mdt_set_capainfo(info, 0, &body->fid1,
1511                                  req_capsule_client_get(pill, &RMF_CAPA1));
1512
1513         obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
1514         if (!IS_ERR(obj)) {
1515                 if ((flags & HABEO_CORPUS) &&
1516                     !mdt_object_exists(obj)) {
1517                         mdt_object_put(env, obj);
1518                         /* for capability renew ENOENT will be handled in 
1519                          * mdt_renew_capa */
1520                         if (body->valid & OBD_MD_FLOSSCAPA)
1521                                 rc = 0;
1522                         else
1523                                 rc = -ENOENT;
1524                 } else {
1525                         info->mti_object = obj;
1526                         rc = 0;
1527                 }
1528         } else
1529                 rc = PTR_ERR(obj);
1530
1531         return rc;
1532 }
1533
1534 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1535 {
1536         struct req_capsule *pill;
1537         int rc;
1538
1539         ENTRY;
1540         pill = &info->mti_pill;
1541
1542         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1543                 rc = mdt_body_unpack(info, flags);
1544         else
1545                 rc = 0;
1546
1547         if (rc == 0 && (flags & HABEO_REFERO)) {
1548                 struct mdt_device       *mdt = info->mti_mdt;
1549                 /*pack reply*/
1550                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1551                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1552                                              mdt->mdt_max_mdsize);
1553                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1554                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1555                                              mdt->mdt_max_cookiesize);
1556
1557                 rc = req_capsule_pack(pill);
1558         }
1559         RETURN(rc);
1560 }
1561
#if 0
struct lu_context_key mdt_txn_key;

/*
 * Disabled code: store the transaction number and last-committed/last-xid
 * information in the reply of the current request.  Nothing is done when
 * there is no device, request or reply message, or when the request is
 * flagged MDT_NONEED_TRANSNO.  @rc is the result of the operation: a
 * non-zero transno together with a non-zero @rc is reported and reset to 0.
 */
static inline void mdt_finish_reply(struct mdt_thread_info *info, int rc)
{
        struct mdt_device     *mdt = info->mti_mdt;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct obd_export     *exp;

        /* sometimes the reply message has not been successfully packed */
        if (mdt == NULL || req == NULL || req->rq_repmsg == NULL)
                return;

        if (info->mti_trans_flags & MDT_NONEED_TRANSNO)
                return;

        /* only dereference req once it is known to be non-NULL */
        exp = req->rq_export;

        /*XXX: assert on this when all code will be finished */
        if (rc != 0 && info->mti_transno != 0) {
                info->mti_transno = 0;
                CERROR("Transno is not 0 while rc is %i!\n", rc);
        }

        CDEBUG(D_INODE, "transno = %llu, last_committed = %llu\n",
               info->mti_transno, exp->exp_obd->obd_last_committed);

        spin_lock(&mdt->mdt_transno_lock);
        req->rq_transno = info->mti_transno;
        lustre_msg_set_transno(req->rq_repmsg, info->mti_transno);

        target_committed_to_req(req);

        spin_unlock(&mdt->mdt_transno_lock);
        lustre_msg_set_last_xid(req->rq_repmsg, req_exp_last_xid(req));
        //lustre_msg_set_last_xid(req->rq_repmsg, req->rq_xid);
}
#endif
1597
1598
1599 static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
1600 {
1601         struct md_device *next = m->mdt_child;
1602
1603         return next->md_ops->mdo_init_capa_ctxt(env, next,
1604                                                 m->mdt_opts.mo_mds_capa,
1605                                                 m->mdt_capa_timeout,
1606                                                 m->mdt_capa_alg,
1607                                                 m->mdt_capa_keys);
1608 }
1609
1610 /*
1611  * Invoke handler for this request opc. Also do necessary preprocessing
1612  * (according to handler ->mh_flags), and post-processing (setting of
1613  * ->last_{xid,committed}).
1614  */
1615 static int mdt_req_handle(struct mdt_thread_info *info,
1616                           struct mdt_handler *h, struct ptlrpc_request *req)
1617 {
1618         int   rc, serious = 0;
1619         __u32 flags;
1620
1621         ENTRY;
1622
1623         LASSERT(h->mh_act != NULL);
1624         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1625         LASSERT(current->journal_info == NULL);
1626
1627         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1628
1629         /*
1630          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1631          * correct handling of failed req later in ldlm due to doing
1632          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1633          * correct actions like it is done in target_send_reply_msg().
1634          */
1635         if (h->mh_fail_id != 0) {
1636                 /*
1637                  * Set to info->mti_fail_id to handler fail_id, it will be used
1638                  * later, and better than use default fail_id.
1639                  */
1640                 if (OBD_FAIL_CHECK(h->mh_fail_id)) {
1641                         info->mti_fail_id = h->mh_fail_id;
1642                         RETURN(0);
1643                 }
1644         }
1645
1646         rc = 0;
1647         flags = h->mh_flags;
1648         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1649
1650         if (h->mh_fmt != NULL) {
1651                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1652                 rc = mdt_unpack_req_pack_rep(info, flags);
1653         }
1654
1655         if (rc == 0 && flags & MUTABOR &&
1656             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1657                 /* should it be rq_status? */
1658                 rc = -EROFS;
1659
1660         if (rc == 0 && flags & HABEO_CLAVIS) {
1661                 struct ldlm_request *dlm_req;
1662
1663                 LASSERT(h->mh_fmt != NULL);
1664
1665                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1666                 if (dlm_req != NULL) {
1667                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1668                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1669                                                              dlm_req);
1670                         info->mti_dlm_req = dlm_req;
1671                 } else {
1672                         CERROR("Can't unpack dlm request\n");
1673                         rc = -EFAULT;
1674                 }
1675         }
1676
1677         /* capability setting changed via /proc, needs reinitialize ctxt */
1678         if (info->mti_mdt && info->mti_mdt->mdt_capa_conf) {
1679                 mdt_init_capa_ctxt(info->mti_env, info->mti_mdt);
1680                 info->mti_mdt->mdt_capa_conf = 0;
1681         }
1682
1683         if (rc == 0) {
1684                 /*
1685                  * Process request, there can be two types of rc:
1686                  * 1) errors with msg unpack/pack, other failures outside the
1687                  * operation itself. This is counted as serious errors;
1688                  * 2) errors during fs operation, should be placed in rq_status
1689                  * only
1690                  */
1691                 rc = h->mh_act(info);
1692                 serious = is_serious(rc);
1693                 rc = clear_serious(rc);
1694         } else
1695                 serious = 1;
1696
1697         req->rq_status = rc;
1698
1699         /*
1700          * ELDLM_* codes which > 0 should be in rq_status only as well as
1701          * all non-serious errors.
1702          */
1703         if (rc > 0 || !serious)
1704                 rc = 0;
1705
1706         LASSERT(current->journal_info == NULL);
1707
1708         if (rc == 0 && (flags & HABEO_CLAVIS)
1709             && info->mti_mdt->mdt_opts.mo_compat_resname) {
1710                 struct ldlm_reply *dlmrep;
1711
1712                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1713                 if (dlmrep != NULL)
1714                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1715         }
1716
1717         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1718         if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
1719                 target_committed_to_req(req);
1720
1721         RETURN(rc);
1722 }
1723
1724 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1725 {
1726         lh->mlh_lh.cookie = 0ull;
1727         lh->mlh_mode = LCK_MINMODE;
1728 }
1729
1730 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1731 {
1732         LASSERT(!lustre_handle_is_used(&lh->mlh_lh));
1733 }
1734
1735 static void mdt_thread_info_init(struct ptlrpc_request *req,
1736                                  struct mdt_thread_info *info)
1737 {
1738         int i;
1739
1740         LASSERT(info->mti_env != req->rq_svc_thread->t_env);
1741         memset(info, 0, sizeof(*info));
1742
1743         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1744         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1745                 info->mti_rep_buf_size[i] = -1;
1746
1747         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1748                 mdt_lock_handle_init(&info->mti_lh[i]);
1749
1750         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
1751         info->mti_env = req->rq_svc_thread->t_env;
1752         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
1753
1754         /* it can be NULL while CONNECT */
1755         if (req->rq_export)
1756                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1757         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
1758                          info->mti_rep_buf_size);
1759 }
1760
1761 static void mdt_thread_info_fini(struct mdt_thread_info *info)
1762 {
1763         int i;
1764
1765         req_capsule_fini(&info->mti_pill);
1766         if (info->mti_object != NULL) {
1767                 mdt_object_put(info->mti_env, info->mti_object);
1768                 info->mti_object = NULL;
1769         }
1770         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
1771                 mdt_lock_handle_fini(&info->mti_lh[i]);
1772         info->mti_env = NULL;
1773 }
1774
1775 /* mds/handler.c */
1776 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
1777                                        struct obd_device *obd, int *process);
1778 /*
1779  * Handle recovery. Return:
1780  *        +1: continue request processing;
1781  *       -ve: abort immediately with the given error code;
1782  *         0: send reply with error code in req->rq_status;
1783  */
1784 static int mdt_recovery(struct mdt_thread_info *info)
1785 {
1786         struct ptlrpc_request *req = mdt_info_req(info);
1787         int recovering;
1788         struct obd_device *obd;
1789
1790         ENTRY;
1791
1792         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1793         case MDS_CONNECT:
1794         case SEC_CTX_INIT:
1795         case SEC_CTX_INIT_CONT:
1796         case SEC_CTX_FINI:
1797                 mdt_handle_idmap(info);
1798                 RETURN(+1);
1799         }
1800
1801         if (req->rq_export == NULL) {
1802                 CERROR("operation %d on unconnected MDS from %s\n",
1803                        lustre_msg_get_opc(req->rq_reqmsg),
1804                        libcfs_id2str(req->rq_peer));
1805                 req->rq_status = -ENOTCONN;
1806                 target_send_reply(req, -ENOTCONN, info->mti_fail_id);
1807                 RETURN(0);
1808         }
1809
1810         /* sanity check: if the xid matches, the request must be marked as a
1811          * resent or replayed */
1812         if (req->rq_xid == req_exp_last_xid(req) ||
1813             req->rq_xid == req_exp_last_close_xid(req)) {
1814                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
1815                       (MSG_RESENT | MSG_REPLAY))) {
1816                         CERROR("rq_xid "LPU64" matches last_xid, "
1817                                 "expected RESENT flag\n", req->rq_xid);
1818                         LBUG();
1819                         req->rq_status = -ENOTCONN;
1820                         RETURN(-ENOTCONN);
1821                 }
1822         }
1823
1824         /* else: note the opposite is not always true; a RESENT req after a
1825          * failover will usually not match the last_xid, since it was likely
1826          * never committed. A REPLAYed request will almost never match the
1827          * last xid, however it could for a committed, but still retained,
1828          * open. */
1829
1830         obd = req->rq_export->exp_obd;
1831
1832         /* Check for aborted recovery... */
1833         spin_lock_bh(&obd->obd_processing_task_lock);
1834         recovering = obd->obd_recovering;
1835         spin_unlock_bh(&obd->obd_processing_task_lock);
1836         if (recovering) {
1837                 int rc;
1838                 int should_process;
1839                 DEBUG_REQ(D_WARNING, req, "Got new replay");
1840                 rc = mds_filter_recovery_request(req, obd, &should_process);
1841                 if (rc != 0 || !should_process)
1842                         RETURN(rc);
1843                 else if (should_process < 0) {
1844                         req->rq_status = should_process;
1845                         rc = ptlrpc_error(req);
1846                         RETURN(rc);
1847                 }
1848         }
1849         RETURN(+1);
1850 }
1851
1852 static int mdt_reply(struct ptlrpc_request *req, int rc,
1853                      struct mdt_thread_info *info)
1854 {
1855         ENTRY;
1856
1857 #if 0
1858         if (req->rq_reply_state == NULL && rc == 0) {
1859                 req->rq_status = rc;
1860                 lustre_pack_reply(req, 1, NULL, NULL);
1861         }
1862 #endif
1863         target_send_reply(req, rc, info->mti_fail_id);
1864         RETURN(0);
1865 }
1866
1867 /* mds/handler.c */
1868 extern int mds_msg_check_version(struct lustre_msg *msg);
1869
1870 static int mdt_handle0(struct ptlrpc_request *req,
1871                        struct mdt_thread_info *info,
1872                        struct mdt_opc_slice *supported)
1873 {
1874         struct mdt_handler *h;
1875         struct lustre_msg  *msg;
1876         int                 rc;
1877
1878         ENTRY;
1879
1880         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
1881
1882         LASSERT(current->journal_info == NULL);
1883
1884         msg = req->rq_reqmsg;
1885         rc = mds_msg_check_version(msg);
1886         if (rc == 0) {
1887                 rc = mdt_recovery(info);
1888                 if (rc == +1) {
1889                         h = mdt_handler_find(lustre_msg_get_opc(msg),
1890                                              supported);
1891                         if (h != NULL) {
1892                                 rc = mdt_req_handle(info, h, req);
1893                                 rc = mdt_reply(req, rc, info);
1894                         } else {
1895                                 req->rq_status = -ENOTSUPP;
1896                                 rc = ptlrpc_error(req);
1897                                 RETURN(rc);
1898                         }
1899                 }
1900         } else
1901                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
1902         RETURN(rc);
1903 }
1904
1905 /*
1906  * MDT handler function called by ptlrpc service thread when request comes.
1907  *
1908  * XXX common "target" functionality should be factored into separate module
1909  * shared by mdt, ost and stand-alone services like fld.
1910  */
1911 static int mdt_handle_common(struct ptlrpc_request *req,
1912                              struct mdt_opc_slice *supported)
1913 {
1914         struct lu_env          *env;
1915         struct mdt_thread_info *info;
1916         int                     rc;
1917         ENTRY;
1918
1919         env = req->rq_svc_thread->t_env;
1920         LASSERT(env != NULL);
1921         LASSERT(env->le_ses != NULL);
1922         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
1923         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
1924         LASSERT(info != NULL);
1925
1926         mdt_thread_info_init(req, info);
1927
1928         rc = mdt_handle0(req, info, supported);
1929
1930         mdt_thread_info_fini(info);
1931         RETURN(rc);
1932 }
1933
1934 /*
1935  * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
1936  * as well.
1937  */
1938 int mdt_recovery_handle(struct ptlrpc_request *req)
1939 {
1940         int rc;
1941         ENTRY;
1942
1943         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
1944         case FLD_QUERY:
1945                 rc = mdt_handle_common(req, mdt_fld_handlers);
1946                 break;
1947         case SEQ_QUERY:
1948                 rc = mdt_handle_common(req, mdt_seq_handlers);
1949                 break;
1950         default:
1951                 rc = mdt_handle_common(req, mdt_regular_handlers);
1952                 break;
1953         }
1954
1955         RETURN(rc);
1956 }
1957
1958 static int mdt_regular_handle(struct ptlrpc_request *req)
1959 {
1960         return mdt_handle_common(req, mdt_regular_handlers);
1961 }
1962
1963 static int mdt_readpage_handle(struct ptlrpc_request *req)
1964 {
1965         return mdt_handle_common(req, mdt_readpage_handlers);
1966 }
1967
1968 static int mdt_mdsc_handle(struct ptlrpc_request *req)
1969 {
1970         return mdt_handle_common(req, mdt_seq_handlers);
1971 }
1972
1973 static int mdt_mdss_handle(struct ptlrpc_request *req)
1974 {
1975         return mdt_handle_common(req, mdt_seq_handlers);
1976 }
1977
1978 static int mdt_dtss_handle(struct ptlrpc_request *req)
1979 {
1980         return mdt_handle_common(req, mdt_seq_handlers);
1981 }
1982
1983 static int mdt_fld_handle(struct ptlrpc_request *req)
1984 {
1985         return mdt_handle_common(req, mdt_fld_handlers);
1986 }
1987
/*
 * MDT-internal intent codes. They index the mdt_it_flavor[] table and are
 * produced from the wire IT_* values by mdt_intent_code().
 */
enum mdt_it_code {
        MDT_IT_OPEN,            /* IT_OPEN */
        MDT_IT_OCREAT,          /* IT_OPEN|IT_CREAT */
        MDT_IT_CREATE,          /* IT_CREAT */
        MDT_IT_GETATTR,         /* IT_GETATTR */
        MDT_IT_READDIR,         /* IT_READDIR */
        MDT_IT_LOOKUP,          /* IT_LOOKUP */
        MDT_IT_UNLINK,          /* IT_UNLINK */
        MDT_IT_TRUNC,           /* IT_TRUNC */
        MDT_IT_GETXATTR,        /* IT_GETXATTR */
        MDT_IT_NR               /* number of intent codes, keep last */
};
2000
/*
 * Intent handlers, declared ahead of the mdt_it_flavor[] table that refers to
 * them.
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **lockp,
                              int flags);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **lockp,
                            int flags);
2009
2010 static struct mdt_it_flavor {
2011         const struct req_format *it_fmt;
2012         __u32                    it_flags;
2013         int                    (*it_act)(enum mdt_it_code ,
2014                                          struct mdt_thread_info *,
2015                                          struct ldlm_lock **,
2016                                          int);
2017         long                     it_reint;
2018 } mdt_it_flavor[] = {
2019         [MDT_IT_OPEN]     = {
2020                 .it_fmt   = &RQF_LDLM_INTENT,
2021                 /*.it_flags = HABEO_REFERO,*/
2022                 .it_flags = 0,
2023                 .it_act   = mdt_intent_reint,
2024                 .it_reint = REINT_OPEN
2025         },
2026         [MDT_IT_OCREAT]   = {
2027                 .it_fmt   = &RQF_LDLM_INTENT,
2028                 .it_flags = MUTABOR,
2029                 .it_act   = mdt_intent_reint,
2030                 .it_reint = REINT_OPEN
2031         },
2032         [MDT_IT_CREATE]   = {
2033                 .it_fmt   = &RQF_LDLM_INTENT,
2034                 .it_flags = MUTABOR,
2035                 .it_act   = mdt_intent_reint,
2036                 .it_reint = REINT_CREATE
2037         },
2038         [MDT_IT_GETATTR]  = {
2039                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2040                 .it_flags = HABEO_REFERO,
2041                 .it_act   = mdt_intent_getattr
2042         },
2043         [MDT_IT_READDIR]  = {
2044                 .it_fmt   = NULL,
2045                 .it_flags = 0,
2046                 .it_act   = NULL
2047         },
2048         [MDT_IT_LOOKUP]   = {
2049                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2050                 .it_flags = HABEO_REFERO,
2051                 .it_act   = mdt_intent_getattr
2052         },
2053         [MDT_IT_UNLINK]   = {
2054                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
2055                 .it_flags = MUTABOR,
2056                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
2057                 .it_reint = REINT_UNLINK
2058         },
2059         [MDT_IT_TRUNC]    = {
2060                 .it_fmt   = NULL,
2061                 .it_flags = MUTABOR,
2062                 .it_act   = NULL
2063         },
2064         [MDT_IT_GETXATTR] = {
2065                 .it_fmt   = NULL,
2066                 .it_flags = 0,
2067                 .it_act   = NULL
2068         }
2069 };
2070
2071 int mdt_intent_lock_replace(struct mdt_thread_info *info,
2072                             struct ldlm_lock **lockp,
2073                             struct ldlm_lock *new_lock,
2074                             struct mdt_lock_handle *lh,
2075                             int flags)
2076 {
2077         struct ptlrpc_request  *req = mdt_info_req(info);
2078         struct ldlm_lock       *lock = *lockp;
2079
2080         /*
2081          * Get new lock only for cases when possible resent did not find any
2082          * lock.
2083          */
2084         if (new_lock == NULL)
2085                 new_lock = ldlm_handle2lock(&lh->mlh_lh);
2086
2087         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
2088                 lh->mlh_lh.cookie = 0;
2089                 RETURN(0);
2090         }
2091
2092         LASSERTF(new_lock != NULL,
2093                  "lockh "LPX64"\n", lh->mlh_lh.cookie);
2094
2095         /*
2096          * If we've already given this lock to a client once, then we should
2097          * have no readers or writers.  Otherwise, we should have one reader
2098          * _or_ writer ref (which will be zeroed below) before returning the
2099          * lock to a client.
2100          */
2101         if (new_lock->l_export == req->rq_export) {
2102                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2103         } else {
2104                 LASSERT(new_lock->l_export == NULL);
2105                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2106         }
2107
2108         *lockp = new_lock;
2109
2110         if (new_lock->l_export == req->rq_export) {
2111                 /*
2112                  * Already gave this to the client, which means that we
2113                  * reconstructed a reply.
2114                  */
2115                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2116                         MSG_RESENT);
2117                 lh->mlh_lh.cookie = 0;
2118                 RETURN(ELDLM_LOCK_REPLACED);
2119         }
2120
2121         /* Fixup the lock to be given to the client */
2122         lock_res_and_lock(new_lock);
2123         new_lock->l_readers = 0;
2124         new_lock->l_writers = 0;
2125
2126         new_lock->l_export = class_export_get(req->rq_export);
2127         list_add(&new_lock->l_export_chain,
2128                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2129
2130         new_lock->l_blocking_ast = lock->l_blocking_ast;
2131         new_lock->l_completion_ast = lock->l_completion_ast;
2132         new_lock->l_remote_handle = lock->l_remote_handle;
2133         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2134
2135         unlock_res_and_lock(new_lock);
2136         LDLM_LOCK_PUT(new_lock);
2137         lh->mlh_lh.cookie = 0;
2138
2139         RETURN(ELDLM_LOCK_REPLACED);
2140 }
2141
2142 static void mdt_intent_fixup_resent(struct req_capsule *pill,
2143                                     struct ldlm_lock *new_lock,
2144                                     struct ldlm_lock **old_lock,
2145                                     struct mdt_lock_handle *lh)
2146 {
2147         struct ptlrpc_request  *req = pill->rc_req;
2148         struct obd_export      *exp = req->rq_export;
2149         struct lustre_handle    remote_hdl;
2150         struct ldlm_request    *dlmreq;
2151         struct list_head       *iter;
2152
2153         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2154                 return;
2155
2156         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
2157         remote_hdl = dlmreq->lock_handle1;
2158
2159         spin_lock(&exp->exp_ldlm_data.led_lock);
2160         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2161                 struct ldlm_lock *lock;
2162                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2163                 if (lock == new_lock)
2164                         continue;
2165                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2166                         lh->mlh_lh.cookie = lock->l_handle.h_cookie;
2167                         lh->mlh_mode = lock->l_granted_mode;
2168
2169                         LDLM_DEBUG(lock, "restoring lock cookie");
2170                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2171                                   lh->mlh_lh.cookie);
2172                         if (old_lock)
2173                                 *old_lock = LDLM_LOCK_GET(lock);
2174                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2175                         return;
2176                 }
2177         }
2178         spin_unlock(&exp->exp_ldlm_data.led_lock);
2179
2180         /*
2181          * If the xid matches, then we know this is a resent request, and allow
2182          * it. (It's probably an OPEN, for which we don't send a lock.
2183          */
2184         if (req->rq_xid == req_exp_last_xid(req))
2185                 return;
2186
2187         if (req->rq_xid == req_exp_last_close_xid(req))
2188                 return;
2189
2190         /*
2191          * This remote handle isn't enqueued, so we never received or processed
2192          * this request.  Clear MSG_RESENT, because it can be handled like any
2193          * normal request now.
2194          */
2195         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2196
2197         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2198                   remote_hdl.cookie);
2199 }
2200
2201 static int mdt_intent_getattr(enum mdt_it_code opcode,
2202                               struct mdt_thread_info *info,
2203                               struct ldlm_lock **lockp,
2204                               int flags)
2205 {
2206         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2207         struct ldlm_lock       *new_lock = NULL;
2208         __u64                   child_bits;
2209         struct ldlm_reply      *ldlm_rep;
2210         struct ptlrpc_request  *req;
2211         struct mdt_body        *reqbody;
2212         int                     rc;
2213
2214         ENTRY;
2215
2216         switch (opcode) {
2217         case MDT_IT_LOOKUP:
2218                 child_bits = MDS_INODELOCK_LOOKUP;
2219                 break;
2220         case MDT_IT_GETATTR:
2221                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2222                 break;
2223         default:
2224                 CERROR("Unhandled till now");
2225                 GOTO(out, rc = -EINVAL);
2226         }
2227
2228         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
2229         if (reqbody == NULL)
2230                 GOTO(out, rc = err_serious(-EFAULT));
2231
2232         rc = mdt_init_ucred(info, reqbody);
2233         if (rc)
2234                 GOTO(out, rc);
2235
2236         req = info->mti_pill.rc_req;
2237         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2238         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2239
2240         /* Get lock from request for possible resent case. */
2241         mdt_intent_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2242
2243         ldlm_rep->lock_policy_res2 =
2244                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2245
2246         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2247                 ldlm_rep->lock_policy_res2 = 0;
2248         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2249             ldlm_rep->lock_policy_res2) {
2250                 lhc->mlh_lh.cookie = 0ull;
2251                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
2252         }
2253
2254         rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2255 out_ucred:
2256         mdt_exit_ucred(info);
2257         GOTO(out, rc);
2258 out:
2259         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 0);
2260         return rc;
2261 }
2262
2263 static int mdt_intent_reint(enum mdt_it_code opcode,
2264                             struct mdt_thread_info *info,
2265                             struct ldlm_lock **lockp,
2266                             int flags)
2267 {
2268         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2269         struct ldlm_reply      *rep;
2270         long                    opc;
2271         int                     rc;
2272
2273         static const struct req_format *intent_fmts[REINT_MAX] = {
2274                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2275                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2276         };
2277
2278         ENTRY;
2279
2280         opc = mdt_reint_opcode(info, intent_fmts);
2281         if (opc < 0)
2282                 RETURN(opc);
2283
2284         if (mdt_it_flavor[opcode].it_reint != opc) {
2285                 CERROR("Reint code %ld doesn't match intent: %d\n",
2286                        opc, opcode);
2287                 RETURN(err_serious(-EPROTO));
2288         }
2289
2290         /* Get lock from request for possible resent case. */
2291         mdt_intent_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2292
2293         rc = mdt_reint_internal(info, lhc, opc);
2294
2295         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2296         if (rep == NULL)
2297                 RETURN(err_serious(-EFAULT));
2298
2299         /* MDC expects this in any case */
2300         if (rc != 0)
2301                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2302
2303         /* cross-ref case, the lock should be returned to the client */
2304         if (rc == -EREMOTE) {
2305                 LASSERT(lustre_handle_is_used(&lhc->mlh_lh));
2306                 rep->lock_policy_res2 = 0;
2307                 RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags));
2308         }
2309         rep->lock_policy_res2 = clear_serious(rc);
2310
2311         lhc->mlh_lh.cookie = 0ull;
2312         RETURN(ELDLM_LOCK_ABORTED);
2313 }
2314
2315 static int mdt_intent_code(long itcode)
2316 {
2317         int rc;
2318
2319         switch(itcode) {
2320         case IT_OPEN:
2321                 rc = MDT_IT_OPEN;
2322                 break;
2323         case IT_OPEN|IT_CREAT:
2324                 rc = MDT_IT_OCREAT;
2325                 break;
2326         case IT_CREAT:
2327                 rc = MDT_IT_CREATE;
2328                 break;
2329         case IT_READDIR:
2330                 rc = MDT_IT_READDIR;
2331                 break;
2332         case IT_GETATTR:
2333                 rc = MDT_IT_GETATTR;
2334                 break;
2335         case IT_LOOKUP:
2336                 rc = MDT_IT_LOOKUP;
2337                 break;
2338         case IT_UNLINK:
2339                 rc = MDT_IT_UNLINK;
2340                 break;
2341         case IT_TRUNC:
2342                 rc = MDT_IT_TRUNC;
2343                 break;
2344         case IT_GETXATTR:
2345                 rc = MDT_IT_GETXATTR;
2346                 break;
2347         default:
2348                 CERROR("Unknown intent opcode: %ld\n", itcode);
2349                 rc = -EINVAL;
2350                 break;
2351         }
2352         return rc;
2353 }
2354
2355 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2356                           struct ldlm_lock **lockp, int flags)
2357 {
2358         struct req_capsule   *pill;
2359         struct mdt_it_flavor *flv;
2360         int opc;
2361         int rc;
2362         ENTRY;
2363
2364         opc = mdt_intent_code(itopc);
2365         if (opc < 0)
2366                 RETURN(-EINVAL);
2367
2368         pill = &info->mti_pill;
2369         flv  = &mdt_it_flavor[opc];
2370
2371         if (flv->it_fmt != NULL)
2372                 req_capsule_extend(pill, flv->it_fmt);
2373
2374         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2375         if (rc == 0) {
2376                 struct ptlrpc_request *req = mdt_info_req(info);
2377                 if (flv->it_flags & MUTABOR &&
2378                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2379                         rc = -EROFS;
2380         }
2381         if (rc == 0 && flv->it_act != NULL) {
2382                 /* execute policy */
2383                 rc = flv->it_act(opc, info, lockp, flags);
2384         } else
2385                 rc = -EOPNOTSUPP;
2386         RETURN(rc);
2387 }
2388
2389 static int mdt_intent_policy(struct ldlm_namespace *ns,
2390                              struct ldlm_lock **lockp, void *req_cookie,
2391                              ldlm_mode_t mode, int flags, void *data)
2392 {
2393         struct mdt_thread_info *info;
2394         struct ptlrpc_request  *req  =  req_cookie;
2395         struct ldlm_intent     *it;
2396         struct req_capsule     *pill;
2397         struct ldlm_lock       *lock = *lockp;
2398         int rc;
2399
2400         ENTRY;
2401
2402         LASSERT(req != NULL);
2403
2404         info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
2405                                   &mdt_thread_key);
2406         LASSERT(info != NULL);
2407         pill = &info->mti_pill;
2408         LASSERT(pill->rc_req == req);
2409
2410         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2411                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2412                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2413                 if (it != NULL) {
2414                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2415                                    ldlm_it2str(it->opc));
2416
2417                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2418                         if (rc == 0)
2419                                 rc = ELDLM_OK;
2420                 } else
2421                         rc = err_serious(-EFAULT);
2422         } else {
2423                 /* No intent was provided */
2424                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2425                 rc = req_capsule_pack(pill);
2426                 if (rc)
2427                         rc = err_serious(rc);
2428         }
2429         RETURN(rc);
2430 }
2431
2432 /*
2433  * Seq wrappers
2434  */
2435 static int mdt_seq_fini(const struct lu_env *env,
2436                         struct mdt_device *m)
2437 {
2438         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2439         ENTRY;
2440
2441         if (ls && ls->ls_server_seq) {
2442                 seq_server_fini(ls->ls_server_seq, env);
2443                 OBD_FREE_PTR(ls->ls_server_seq);
2444                 ls->ls_server_seq = NULL;
2445         }
2446
2447         if (ls && ls->ls_control_seq) {
2448                 seq_server_fini(ls->ls_control_seq, env);
2449                 OBD_FREE_PTR(ls->ls_control_seq);
2450                 ls->ls_control_seq = NULL;
2451         }
2452
2453         if (ls && ls->ls_client_seq) {
2454                 seq_client_fini(ls->ls_client_seq);
2455                 OBD_FREE_PTR(ls->ls_client_seq);
2456                 ls->ls_client_seq = NULL;
2457         }
2458
2459         RETURN(0);
2460 }
2461
2462 static int mdt_seq_init(const struct lu_env *env,
2463                         const char *uuid,
2464                         struct mdt_device *m)
2465 {
2466         struct lu_site *ls;
2467         char *prefix;
2468         int rc;
2469         ENTRY;
2470
2471         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2472
2473         /*
2474          * This is sequence-controller node. Init seq-controller server on local
2475          * MDT.
2476          */
2477         if (ls->ls_node_id == 0) {
2478                 LASSERT(ls->ls_control_seq == NULL);
2479
2480                 OBD_ALLOC_PTR(ls->ls_control_seq);
2481                 if (ls->ls_control_seq == NULL)
2482                         RETURN(-ENOMEM);
2483
2484                 rc = seq_server_init(ls->ls_control_seq,
2485                                      m->mdt_bottom, uuid,
2486                                      LUSTRE_SEQ_CONTROLLER,
2487                                      env);
2488
2489                 if (rc)
2490                         GOTO(out_seq_fini, rc);
2491
2492                 OBD_ALLOC_PTR(ls->ls_client_seq);
2493                 if (ls->ls_client_seq == NULL)
2494                         GOTO(out_seq_fini, rc = -ENOMEM);
2495
2496                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2497                 if (prefix == NULL) {
2498                         OBD_FREE_PTR(ls->ls_client_seq);
2499                         GOTO(out_seq_fini, rc = -ENOMEM);
2500                 }
2501
2502                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2503                          uuid);
2504
2505                 /*
2506                  * Init seq-controller client after seq-controller server is
2507                  * ready. Pass ls->ls_control_seq to it for direct talking.
2508                  */
2509                 rc = seq_client_init(ls->ls_client_seq, NULL,
2510                                      LUSTRE_SEQ_METADATA, prefix,
2511                                      ls->ls_control_seq);
2512                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2513
2514                 if (rc)
2515                         GOTO(out_seq_fini, rc);
2516         }
2517
2518         /* Init seq-server on local MDT */
2519         LASSERT(ls->ls_server_seq == NULL);
2520
2521         OBD_ALLOC_PTR(ls->ls_server_seq);
2522         if (ls->ls_server_seq == NULL)
2523                 GOTO(out_seq_fini, rc = -ENOMEM);
2524
2525         rc = seq_server_init(ls->ls_server_seq,
2526                              m->mdt_bottom, uuid,
2527                              LUSTRE_SEQ_SERVER,
2528                              env);
2529         if (rc)
2530                 GOTO(out_seq_fini, rc = -ENOMEM);
2531
2532         /* Assign seq-controller client to local seq-server. */
2533         if (ls->ls_node_id == 0) {
2534                 LASSERT(ls->ls_client_seq != NULL);
2535
2536                 rc = seq_server_set_cli(ls->ls_server_seq,
2537                                         ls->ls_client_seq,
2538                                         env);
2539         }
2540
2541         EXIT;
2542 out_seq_fini:
2543         if (rc)
2544                 mdt_seq_fini(env, m);
2545
2546         return rc;
2547 }
2548
2549 static int mdt_md_connect(const struct lu_env *env,
2550                           struct lustre_handle *conn,
2551                           struct obd_device *mdc)
2552 {
2553         struct obd_connect_data *ocd;
2554         int rc;
2555
2556         OBD_ALLOC_PTR(ocd);
2557         if (!ocd)
2558                 RETURN(-ENOMEM);
2559         /* The connection between MDS must be local */
2560         ocd->ocd_connect_flags |= OBD_CONNECT_LCL_CLIENT;
2561         rc = obd_connect(env, conn, mdc, &mdc->obd_uuid, ocd);
2562
2563         OBD_FREE_PTR(ocd);
2564
2565         RETURN(rc);
2566 }
2567
2568 /*
2569  * Init client sequence manager which is used by local MDS to talk to sequence
2570  * controller on remote node.
2571  */
2572 static int mdt_seq_init_cli(const struct lu_env *env,
2573                             struct mdt_device *m,
2574                             struct lustre_cfg *cfg)
2575 {
2576         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2577         struct obd_device *mdc;
2578         struct obd_uuid   *uuidp, *mdcuuidp;
2579         char              *uuid_str, *mdc_uuid_str;
2580         int                rc;
2581         int                index;
2582         struct mdt_thread_info *info;
2583         char *p, *index_string = lustre_cfg_string(cfg, 2);
2584         ENTRY;
2585
2586         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2587         uuidp = &info->mti_u.uuid[0];
2588         mdcuuidp = &info->mti_u.uuid[1];
2589
2590         LASSERT(index_string);
2591
2592         index = simple_strtol(index_string, &p, 10);
2593         if (*p) {
2594                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2595                 RETURN(-EINVAL);
2596         }
2597
2598         /* check if this is adding the first MDC and controller is not yet
2599          * initialized. */
2600         if (index != 0 || ls->ls_client_seq)
2601                 RETURN(0);
2602
2603         uuid_str = lustre_cfg_string(cfg, 1);
2604         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2605         obd_str2uuid(uuidp, uuid_str);
2606         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2607
2608         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2609         if (!mdc) {
2610                 CERROR("can't find controller MDC by uuid %s\n",
2611                        uuid_str);
2612                 rc = -ENOENT;
2613         } else if (!mdc->obd_set_up) {
2614                 CERROR("target %s not set up\n", mdc->obd_name);
2615                 rc = -EINVAL;
2616         } else {
2617                 struct lustre_handle conn = {0, };
2618
2619                 CDEBUG(D_CONFIG, "connect to controller %s(%s)\n",
2620                        mdc->obd_name, mdc->obd_uuid.uuid);
2621
2622                 rc = mdt_md_connect(env, &conn, mdc);
2623                 if (rc) {
2624                         CERROR("target %s connect error %d\n",
2625                                mdc->obd_name, rc);
2626                 } else {
2627                         ls->ls_control_exp = class_conn2export(&conn);
2628
2629                         OBD_ALLOC_PTR(ls->ls_client_seq);
2630
2631                         if (ls->ls_client_seq != NULL) {
2632                                 char *prefix;
2633
2634                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2635                                 if (!prefix)
2636                                         RETURN(-ENOMEM);
2637
2638                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2639                                          mdc->obd_name);
2640
2641                                 rc = seq_client_init(ls->ls_client_seq,
2642                                                      ls->ls_control_exp,
2643                                                      LUSTRE_SEQ_METADATA,
2644                                                      prefix, NULL);
2645                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2646                         } else
2647                                 rc = -ENOMEM;
2648
2649                         if (rc)
2650                                 RETURN(rc);
2651
2652                         LASSERT(ls->ls_server_seq != NULL);
2653
2654                         rc = seq_server_set_cli(ls->ls_server_seq,
2655                                                 ls->ls_client_seq,
2656                                                 env);
2657                 }
2658         }
2659
2660         RETURN(rc);
2661 }
2662
2663 static void mdt_seq_fini_cli(struct mdt_device *m)
2664 {
2665         struct lu_site *ls;
2666         int rc;
2667
2668         ENTRY;
2669
2670         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2671
2672         if (ls && ls->ls_server_seq)
2673                 seq_server_set_cli(ls->ls_server_seq,
2674                                    NULL, NULL);
2675
2676         if (ls && ls->ls_control_exp) {
2677                 rc = obd_disconnect(ls->ls_control_exp);
2678                 if (rc) {
2679                         CERROR("failure to disconnect "
2680                                "obd: %d\n", rc);
2681                 }
2682                 ls->ls_control_exp = NULL;
2683         }
2684         EXIT;
2685 }
2686
2687 /*
2688  * FLD wrappers
2689  */
2690 static int mdt_fld_fini(const struct lu_env *env,
2691                         struct mdt_device *m)
2692 {
2693         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2694         ENTRY;
2695
2696         if (ls && ls->ls_server_fld) {
2697                 fld_server_fini(ls->ls_server_fld, env);
2698                 OBD_FREE_PTR(ls->ls_server_fld);
2699                 ls->ls_server_fld = NULL;
2700         }
2701
2702         if (ls && ls->ls_client_fld != NULL) {
2703                 fld_client_fini(ls->ls_client_fld);
2704                 OBD_FREE_PTR(ls->ls_client_fld);
2705                 ls->ls_client_fld = NULL;
2706         }
2707
2708         RETURN(0);
2709 }
2710
2711 static int mdt_fld_init(const struct lu_env *env,
2712                         const char *uuid,
2713                         struct mdt_device *m)
2714 {
2715         struct lu_fld_target target;
2716         struct lu_site *ls;
2717         int rc;
2718         ENTRY;
2719
2720         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2721
2722         OBD_ALLOC_PTR(ls->ls_server_fld);
2723         if (ls->ls_server_fld == NULL)
2724                 RETURN(rc = -ENOMEM);
2725
2726         rc = fld_server_init(ls->ls_server_fld,
2727                              m->mdt_bottom, uuid, env);
2728         if (rc) {
2729                 OBD_FREE_PTR(ls->ls_server_fld);
2730                 ls->ls_server_fld = NULL;
2731         }
2732
2733         OBD_ALLOC_PTR(ls->ls_client_fld);
2734         if (!ls->ls_client_fld)
2735                 GOTO(out_fld_fini, rc = -ENOMEM);
2736
2737         rc = fld_client_init(ls->ls_client_fld, uuid,
2738                              LUSTRE_CLI_FLD_HASH_DHT);
2739         if (rc) {
2740                 CERROR("can't init FLD, err %d\n",  rc);
2741                 OBD_FREE_PTR(ls->ls_client_fld);
2742                 GOTO(out_fld_fini, rc);
2743         }
2744
2745         target.ft_srv = ls->ls_server_fld;
2746         target.ft_idx = ls->ls_node_id;
2747         target.ft_exp = NULL;
2748
2749         fld_client_add_target(ls->ls_client_fld, &target);
2750         EXIT;
2751 out_fld_fini:
2752         if (rc)
2753                 mdt_fld_fini(env, m);
2754         return rc;
2755 }
2756
2757 /* device init/fini methods */
2758 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2759 {
2760         if (m->mdt_regular_service != NULL) {
2761                 ptlrpc_unregister_service(m->mdt_regular_service);
2762                 m->mdt_regular_service = NULL;
2763         }
2764         if (m->mdt_readpage_service != NULL) {
2765                 ptlrpc_unregister_service(m->mdt_readpage_service);
2766                 m->mdt_readpage_service = NULL;
2767         }
2768         if (m->mdt_setattr_service != NULL) {
2769                 ptlrpc_unregister_service(m->mdt_setattr_service);
2770                 m->mdt_setattr_service = NULL;
2771         }
2772         if (m->mdt_mdsc_service != NULL) {
2773                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2774                 m->mdt_mdsc_service = NULL;
2775         }
2776         if (m->mdt_mdss_service != NULL) {
2777                 ptlrpc_unregister_service(m->mdt_mdss_service);
2778                 m->mdt_mdss_service = NULL;
2779         }
2780         if (m->mdt_dtss_service != NULL) {
2781                 ptlrpc_unregister_service(m->mdt_dtss_service);
2782                 m->mdt_dtss_service = NULL;
2783         }
2784         if (m->mdt_fld_service != NULL) {
2785                 ptlrpc_unregister_service(m->mdt_fld_service);
2786                 m->mdt_fld_service = NULL;
2787         }
2788 }
2789
2790 static int mdt_start_ptlrpc_service(struct mdt_device *m)
2791 {
2792         int rc;
2793         static struct ptlrpc_service_conf conf;
2794         ENTRY;
2795
2796         conf = (typeof(conf)) {
2797                 .psc_nbufs            = MDS_NBUFS,
2798                 .psc_bufsize          = MDS_BUFSIZE,
2799                 .psc_max_req_size     = MDS_MAXREQSIZE,
2800                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2801                 .psc_req_portal       = MDS_REQUEST_PORTAL,
2802                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2803                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2804                 /*
2805                  * We'd like to have a mechanism to set this on a per-device
2806                  * basis, but alas...
2807                  */
2808                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2809                                        MDT_MAX_THREADS),
2810                 .psc_ctx_tags      = LCT_MD_THREAD
2811         };
2812
2813         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
2814         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
2815                            "mdt_ldlm_client", m->mdt_ldlm_client);
2816
2817         m->mdt_regular_service =
2818                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
2819                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2820                                      NULL);
2821         if (m->mdt_regular_service == NULL)
2822                 RETURN(-ENOMEM);
2823
2824         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
2825         if (rc)
2826                 GOTO(err_mdt_svc, rc);
2827
2828         /*
2829          * readpage service configuration. Parameters have to be adjusted,
2830          * ideally.
2831          */
2832         conf = (typeof(conf)) {
2833                 .psc_nbufs            = MDS_NBUFS,
2834                 .psc_bufsize          = MDS_BUFSIZE,
2835                 .psc_max_req_size     = MDS_MAXREQSIZE,
2836                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2837                 .psc_req_portal       = MDS_READPAGE_PORTAL,
2838                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2839                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2840                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2841                                        MDT_MAX_THREADS),
2842                 .psc_ctx_tags      = LCT_MD_THREAD
2843         };
2844         m->mdt_readpage_service =
2845                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
2846                                      LUSTRE_MDT_NAME "_readpage",
2847                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2848                                      NULL);
2849
2850         if (m->mdt_readpage_service == NULL) {
2851                 CERROR("failed to start readpage service\n");
2852                 GOTO(err_mdt_svc, rc = -ENOMEM);
2853         }
2854
2855         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
2856
2857         /*
2858          * setattr service configuration.
2859          */
2860         conf = (typeof(conf)) {
2861                 .psc_nbufs            = MDS_NBUFS,
2862                 .psc_bufsize          = MDS_BUFSIZE,
2863                 .psc_max_req_size     = MDS_MAXREQSIZE,
2864                 .psc_max_reply_size   = MDS_MAXREPSIZE,
2865                 .psc_req_portal       = MDS_SETATTR_PORTAL,
2866                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2867                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2868                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
2869                                        MDT_MAX_THREADS),
2870                 .psc_ctx_tags      = LCT_MD_THREAD
2871         };
2872
2873         m->mdt_setattr_service =
2874                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
2875                                      LUSTRE_MDT_NAME "_setattr",
2876                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2877                                      NULL);
2878
2879         if (!m->mdt_setattr_service) {
2880                 CERROR("failed to start setattr service\n");
2881                 GOTO(err_mdt_svc, rc = -ENOMEM);
2882         }
2883
2884         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
2885         if (rc)
2886                 GOTO(err_mdt_svc, rc);
2887
2888         /*
2889          * sequence controller service configuration
2890          */
2891         conf = (typeof(conf)) {
2892                 .psc_nbufs = MDS_NBUFS,
2893                 .psc_bufsize = MDS_BUFSIZE,
2894                 .psc_max_req_size = SEQ_MAXREQSIZE,
2895                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2896                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
2897                 .psc_rep_portal = MDC_REPLY_PORTAL,
2898                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2899                 .psc_num_threads = SEQ_NUM_THREADS,
2900                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2901         };
2902
2903         m->mdt_mdsc_service =
2904                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
2905                                      LUSTRE_MDT_NAME"_mdsc",
2906                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2907                                      NULL);
2908         if (!m->mdt_mdsc_service) {
2909                 CERROR("failed to start seq controller service\n");
2910                 GOTO(err_mdt_svc, rc = -ENOMEM);
2911         }
2912
2913         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
2914         if (rc)
2915                 GOTO(err_mdt_svc, rc);
2916
2917         /*
2918          * metadata sequence server service configuration
2919          */
2920         conf = (typeof(conf)) {
2921                 .psc_nbufs = MDS_NBUFS,
2922                 .psc_bufsize = MDS_BUFSIZE,
2923                 .psc_max_req_size = SEQ_MAXREQSIZE,
2924                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2925                 .psc_req_portal = SEQ_METADATA_PORTAL,
2926                 .psc_rep_portal = MDC_REPLY_PORTAL,
2927                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2928                 .psc_num_threads = SEQ_NUM_THREADS,
2929                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2930         };
2931
2932         m->mdt_mdss_service =
2933                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
2934                                      LUSTRE_MDT_NAME"_mdss",
2935                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2936                                      NULL);
2937         if (!m->mdt_mdss_service) {
2938                 CERROR("failed to start metadata seq server service\n");
2939                 GOTO(err_mdt_svc, rc = -ENOMEM);
2940         }
2941
2942         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
2943         if (rc)
2944                 GOTO(err_mdt_svc, rc);
2945
2946
2947         /*
2948          * Data sequence server service configuration. We want to have really
2949          * cluster-wide sequences space. This is why we start only one sequence
2950          * controller which manages space.
2951          */
2952         conf = (typeof(conf)) {
2953                 .psc_nbufs = MDS_NBUFS,
2954                 .psc_bufsize = MDS_BUFSIZE,
2955                 .psc_max_req_size = SEQ_MAXREQSIZE,
2956                 .psc_max_reply_size = SEQ_MAXREPSIZE,
2957                 .psc_req_portal = SEQ_DATA_PORTAL,
2958                 .psc_rep_portal = OSC_REPLY_PORTAL,
2959                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2960                 .psc_num_threads = SEQ_NUM_THREADS,
2961                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
2962         };
2963
2964         m->mdt_dtss_service =
2965                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
2966                                      LUSTRE_MDT_NAME"_dtss",
2967                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2968                                      NULL);
2969         if (!m->mdt_dtss_service) {
2970                 CERROR("failed to start data seq server service\n");
2971                 GOTO(err_mdt_svc, rc = -ENOMEM);
2972         }
2973
2974         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
2975         if (rc)
2976                 GOTO(err_mdt_svc, rc);
2977
2978         /* FLD service start */
2979         conf = (typeof(conf)) {
2980                 .psc_nbufs            = MDS_NBUFS,
2981                 .psc_bufsize          = MDS_BUFSIZE,
2982                 .psc_max_req_size     = FLD_MAXREQSIZE,
2983                 .psc_max_reply_size   = FLD_MAXREPSIZE,
2984                 .psc_req_portal       = FLD_REQUEST_PORTAL,
2985                 .psc_rep_portal       = MDC_REPLY_PORTAL,
2986                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
2987                 .psc_num_threads      = FLD_NUM_THREADS,
2988                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
2989         };
2990
2991         m->mdt_fld_service =
2992                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
2993                                      LUSTRE_MDT_NAME"_fld",
2994                                      m->mdt_md_dev.md_lu_dev.ld_proc_entry,
2995                                      NULL);
2996         if (!m->mdt_fld_service) {
2997                 CERROR("failed to start fld service\n");
2998                 GOTO(err_mdt_svc, rc = -ENOMEM);
2999         }
3000
3001         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
3002         if (rc)
3003                 GOTO(err_mdt_svc, rc);
3004
3005         EXIT;
3006 err_mdt_svc:
3007         if (rc)
3008                 mdt_stop_ptlrpc_service(m);
3009
3010         return rc;
3011 }
3012
3013 static void mdt_stack_fini(const struct lu_env *env,
3014                            struct mdt_device *m, struct lu_device *top)
3015 {
3016         struct lu_device        *d = top, *n;
3017         struct lustre_cfg_bufs  *bufs;
3018         struct lustre_cfg       *lcfg;
3019         struct mdt_thread_info  *info;
3020         ENTRY;
3021
3022         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3023         LASSERT(info != NULL);
3024
3025         bufs = &info->mti_u.bufs;
3026         /* process cleanup */
3027         lustre_cfg_bufs_reset(bufs, NULL);
3028         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
3029         if (!lcfg) {
3030                 CERROR("Cannot alloc lcfg!\n");
3031                 return;
3032         }
3033         LASSERT(top);
3034         top->ld_ops->ldo_process_config(env, top, lcfg);
3035         lustre_cfg_free(lcfg);
3036
3037         lu_site_purge(env, top->ld_site, ~0);
3038         while (d != NULL) {
3039                 struct obd_type *type;
3040                 struct lu_device_type *ldt = d->ld_type;
3041
3042                 /* each fini() returns next device in stack of layers
3043                  * * so we can avoid the recursion */
3044                 n = ldt->ldt_ops->ldto_device_fini(env, d);
3045                 lu_device_put(d);
3046                 ldt->ldt_ops->ldto_device_free(env, d);
3047                 type = ldt->ldt_obd_type;
3048                 type->typ_refcnt--;
3049                 class_put_type(type);
3050
3051                 /* switch to the next device in the layer */
3052                 d = n;
3053         }
3054         m->mdt_child = NULL;
3055 }
3056
3057 static struct lu_device *mdt_layer_setup(const struct lu_env *env,
3058                                          const char *typename,
3059                                          struct lu_device *child,
3060                                          struct lustre_cfg *cfg)
3061 {
3062         struct obd_type       *type;
3063         struct lu_device_type *ldt;
3064         struct lu_device      *d;
3065         int rc;
3066         ENTRY;
3067
3068         /* find the type */
3069         type = class_get_type(typename);
3070         if (!type) {
3071                 CERROR("Unknown type: '%s'\n", typename);
3072                 GOTO(out, rc = -ENODEV);
3073         }
3074
3075         rc = lu_context_refill(&env->le_ctx);
3076         if (rc != 0) {
3077                 CERROR("Failure to refill context: '%d'\n", rc);
3078                 GOTO(out_type, rc);
3079         }
3080
3081         if (env->le_ses != NULL) {
3082                 rc = lu_context_refill(env->le_ses);
3083                 if (rc != 0) {
3084                         CERROR("Failure to refill session: '%d'\n", rc);
3085                         GOTO(out_type, rc);
3086                 }
3087         }
3088
3089         ldt = type->typ_lu;
3090         if (ldt == NULL) {
3091                 CERROR("type: '%s'\n", typename);
3092                 GOTO(out_type, rc = -EINVAL);
3093         }
3094
3095         ldt->ldt_obd_type = type;
3096         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
3097         if (IS_ERR(d)) {
3098                 CERROR("Cannot allocate device: '%s'\n", typename);
3099                 GOTO(out_type, rc = -ENODEV);
3100         }
3101
3102         LASSERT(child->ld_site);
3103         d->ld_site = child->ld_site;
3104
3105         type->typ_refcnt++;
3106         rc = ldt->ldt_ops->ldto_device_init(env, d, child);
3107         if (rc) {
3108                 CERROR("can't init device '%s', rc %d\n", typename, rc);
3109                 GOTO(out_alloc, rc);
3110         }
3111         lu_device_get(d);
3112
3113         RETURN(d);
3114
3115 out_alloc:
3116         ldt->ldt_ops->ldto_device_free(env, d);
3117         type->typ_refcnt--;
3118 out_type:
3119         class_put_type(type);
3120 out:
3121         return ERR_PTR(rc);
3122 }
3123
3124 static int mdt_stack_init(const struct lu_env *env,
3125                           struct mdt_device *m, struct lustre_cfg *cfg)
3126 {
3127         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3128         struct lu_device  *tmp;
3129         struct md_device  *md;
3130         int rc;
3131         ENTRY;
3132
3133         /* init the stack */
3134         tmp = mdt_layer_setup(env, LUSTRE_OSD_NAME, d, cfg);
3135         if (IS_ERR(tmp)) {
3136                 RETURN(PTR_ERR(tmp));
3137         }
3138         m->mdt_bottom = lu2dt_dev(tmp);
3139         d = tmp;
3140         tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
3141         if (IS_ERR(tmp)) {
3142                 GOTO(out, rc = PTR_ERR(tmp));
3143         }
3144         d = tmp;
3145         md = lu2md_dev(d);
3146
3147         tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
3148         if (IS_ERR(tmp)) {
3149                 GOTO(out, rc = PTR_ERR(tmp));
3150         }
3151         d = tmp;
3152         /*set mdd upcall device*/
3153         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
3154
3155         md = lu2md_dev(d);
3156         /*set cmm upcall device*/
3157         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
3158
3159         m->mdt_child = lu2md_dev(d);
3160
3161         /* process setup config */
3162         tmp = &m->mdt_md_dev.md_lu_dev;
3163         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
3164         GOTO(out, rc);
3165 out:
3166         /* fini from last known good lu_device */
3167         if (rc)
3168                 mdt_stack_fini(env, m, d);
3169
3170         return rc;
3171 }
3172
3173 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
3174 {
3175         struct md_device *next = m->mdt_child;
3176         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3177         struct lu_site    *ls = d->ld_site;
3178
3179         ENTRY;
3180
3181         mdt_fs_cleanup(env, m);
3182
3183         ping_evictor_stop();
3184         mdt_stop_ptlrpc_service(m);
3185
3186         cleanup_capas(CAPA_SITE_SERVER);
3187         del_timer(&m->mdt_ck_timer);
3188         mdt_ck_thread_stop(m);
3189
3190         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3191         m->mdt_rmtacl_cache = NULL;
3192
3193         upcall_cache_cleanup(m->mdt_identity_cache);
3194         m->mdt_identity_cache = NULL;
3195
3196         if (m->mdt_namespace != NULL) {
3197                 ldlm_namespace_free(m->mdt_namespace, 0);
3198                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
3199         }
3200
3201         mdt_seq_fini(env, m);
3202         mdt_seq_fini_cli(m);
3203         mdt_fld_fini(env, m);
3204
3205         if (m->mdt_rootsquash_info) {
3206                 OBD_FREE_PTR(m->mdt_rootsquash_info);
3207                 m->mdt_rootsquash_info = NULL;
3208         }
3209
3210         next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
3211         cleanup_capas(CAPA_SITE_SERVER);
3212         del_timer(&m->mdt_ck_timer);
3213         mdt_ck_thread_stop(m);
3214
3215         /* finish the stack */
3216         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3217
3218         if (ls) {
3219                 lu_site_fini(ls);
3220                 OBD_FREE_PTR(ls);
3221                 d->ld_site = NULL;
3222         }
3223         LASSERT(atomic_read(&d->ld_ref) == 0);
3224         md_device_fini(&m->mdt_md_dev);
3225
3226         EXIT;
3227 }
3228
3229 static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
3230 {
3231         char *p = options;
3232
3233         if (!options)
3234                 return;
3235
3236         while (*options) {
3237                 int len;
3238
3239                 while (*p && *p != ',')
3240                         p++;
3241
3242                 len = p - options;
3243                 if ((len == sizeof("user_xattr") - 1) &&
3244                     (memcmp(options, "user_xattr", len) == 0)) {
3245                         m->mdt_opts.mo_user_xattr = 1;
3246                         LCONSOLE_INFO("Enabling user_xattr\n");
3247                 } else if ((len == sizeof("nouser_xattr") - 1) &&
3248                            (memcmp(options, "nouser_xattr", len) == 0)) {
3249                         m->mdt_opts.mo_user_xattr = 0;
3250                         LCONSOLE_INFO("Disabling user_xattr\n");
3251                 } else if ((len == sizeof("acl") - 1) &&
3252                            (memcmp(options, "acl", len) == 0)) {
3253 #ifdef CONFIG_FS_POSIX_ACL
3254                         m->mdt_opts.mo_acl = 1;
3255                         LCONSOLE_INFO("Enabling ACL\n");
3256 #else
3257                         m->mdt_opts.mo_acl = 0;
3258                         CWARN("ignoring unsupported acl mount option\n");
3259                         LCONSOLE_INFO("Disabling ACL\n");
3260 #endif
3261                 } else if ((len == sizeof("noacl") - 1) &&
3262                            (memcmp(options, "noacl", len) == 0)) {
3263 #ifdef CONFIG_FS_POSIX_ACL
3264                         m->mdt_opts.mo_acl = 0;
3265                         LCONSOLE_INFO("Disabling ACL\n");
3266 #endif
3267                 }
3268
3269                 options = ++p;
3270         }
3271 }
3272
3273 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
3274
3275 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
3276                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
3277 {
3278         struct lprocfs_static_vars lvars;
3279         struct mdt_thread_info    *info;
3280         struct obd_device         *obd;
3281         const char                *dev = lustre_cfg_string(cfg, 0);
3282         const char                *num = lustre_cfg_string(cfg, 2);
3283         struct lustre_mount_info  *lmi;
3284         struct lustre_sb_info     *lsi;
3285         struct lu_site            *s;
3286         int                        rc;
3287         ENTRY;
3288
3289         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3290         LASSERT(info != NULL);
3291
3292         obd = class_name2obd(dev);
3293         LASSERT(obd);
3294
3295         spin_lock_init(&m->mdt_transno_lock);
3296
3297         m->mdt_max_mdsize = MAX_MD_SIZE;
3298         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
3299
3300         m->mdt_opts.mo_user_xattr = 0;
3301         m->mdt_opts.mo_acl = 0;
3302         lmi = server_get_mount_2(dev);
3303         if (lmi == NULL) {
3304                 CERROR("Cannot get mount info for %s! "
3305                        "set mdt_opts by default!\n", dev);
3306         } else {
3307                 lsi = s2lsi(lmi->lmi_sb);
3308                 fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
3309                 server_put_mount_2(dev, lmi->lmi_mnt);
3310         }
3311
3312         spin_lock_init(&m->mdt_ioepoch_lock);
3313         m->mdt_opts.mo_compat_resname = 0;
3314         m->mdt_capa_timeout = 320; //CAPA_TIMEOUT;
3315         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
3316         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
3317         obd->obd_replayable = 1;
3318         spin_lock_init(&m->mdt_client_bitmap_lock);
3319
3320         OBD_ALLOC_PTR(s);
3321         if (s == NULL)
3322                 RETURN(-ENOMEM);
3323
3324         md_device_init(&m->mdt_md_dev, ldt);
3325         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
3326         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
3327         /* set this lu_device to obd, because error handling need it */
3328         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
3329
3330         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
3331         if (rc) {
3332                 CERROR("can't init lu_site, rc %d\n", rc);
3333                 GOTO(err_free_site, rc);
3334         }
3335
3336         lprocfs_init_vars(mdt, &lvars);
3337         rc = lprocfs_obd_setup(obd, lvars.obd_vars);
3338         if (rc) {
3339                 CERROR("can't init lprocfs, rc %d\n", rc);
3340                 GOTO(err_fini_site, rc);
3341         }
3342
3343         /* init the stack */
3344         rc = mdt_stack_init(env, m, cfg);
3345         if (rc) {
3346                 CERROR("can't init device stack, rc %d\n", rc);
3347                 GOTO(err_fini_site, rc);
3348         }
3349
3350         /* set server index */
3351         LASSERT(num);
3352         s->ls_node_id = simple_strtol(num, NULL, 10);
3353
3354         rc = mdt_fld_init(env, obd->obd_name, m);
3355         if (rc)
3356                 GOTO(err_fini_stack, rc);
3357
3358         rc = mdt_seq_init(env, obd->obd_name, m);
3359         if (rc)
3360                 GOTO(err_fini_fld, rc);
3361
3362         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3363                  LUSTRE_MDT_NAME"-%p", m);
3364         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3365                                               LDLM_NAMESPACE_SERVER);
3366         if (m->mdt_namespace == NULL)
3367                 GOTO(err_fini_seq, rc = -ENOMEM);
3368
3369         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3370         /* set obd_namespace for compatibility with old code */
3371         obd->obd_namespace = m->mdt_namespace;
3372
3373         m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
3374                                                   "NONE",
3375                                                   &mdt_identity_upcall_cache_ops);
3376         if (IS_ERR(m->mdt_identity_cache)) {
3377                 rc = PTR_ERR(m->mdt_identity_cache);
3378                 m->mdt_identity_cache = NULL;
3379                 GOTO(err_free_ns, rc);
3380         }
3381
3382         m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
3383                                                 MDT_RMTACL_UPCALL_PATH,
3384                                                 &mdt_rmtacl_upcall_cache_ops);
3385         if (IS_ERR(m->mdt_rmtacl_cache)) {
3386                 rc = PTR_ERR(m->mdt_rmtacl_cache);
3387                 m->mdt_rmtacl_cache = NULL;
3388                 GOTO(err_free_ns, rc);
3389         }
3390
3391         m->mdt_ck_timer.function = mdt_ck_timer_callback;
3392         m->mdt_ck_timer.data = (unsigned long)m;
3393         init_timer(&m->mdt_ck_timer);
3394         rc = mdt_ck_thread_start(m);
3395         if (rc)
3396                 GOTO(err_free_ns, rc);
3397
3398         rc = mdt_start_ptlrpc_service(m);
3399         if (rc)
3400                 GOTO(err_capa, rc);
3401
3402         ping_evictor_start();
3403
3404         rc = mdt_fs_setup(env, m, obd);
3405         if (rc)
3406                 GOTO(err_stop_service, rc);
3407
3408         rc = lu_site_init_finish(s);
3409         if (rc)
3410                 GOTO(err_fs_cleanup, rc);
3411
3412         if (obd->obd_recovering == 0)
3413                 mdt_postrecov(env, m);
3414
3415         mdt_init_capa_ctxt(env, m);
3416         RETURN(0);
3417
3418 err_fs_cleanup:
3419         mdt_fs_cleanup(env, m);
3420 err_stop_service:
3421         mdt_stop_ptlrpc_service(m);
3422 err_capa:
3423         del_timer(&m->mdt_ck_timer);
3424         mdt_ck_thread_stop(m);
3425 err_free_ns:
3426         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3427         m->mdt_rmtacl_cache = NULL;
3428         upcall_cache_cleanup(m->mdt_identity_cache);
3429         m->mdt_identity_cache = NULL;
3430         ldlm_namespace_free(m->mdt_namespace, 0);
3431         obd->obd_namespace = m->mdt_namespace = NULL;
3432 err_fini_seq:
3433         mdt_seq_fini(env, m);
3434 err_fini_fld:
3435         mdt_fld_fini(env, m);
3436 err_fini_stack:
3437         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3438 err_fini_site:
3439         lu_site_fini(s);
3440 err_free_site:
3441         OBD_FREE_PTR(s);
3442
3443         md_device_fini(&m->mdt_md_dev);
3444         return (rc);
3445 }
3446
3447 /* used by MGS to process specific configurations */
3448 static int mdt_process_config(const struct lu_env *env,
3449                               struct lu_device *d, struct lustre_cfg *cfg)
3450 {
3451         struct mdt_device *m = mdt_dev(d);
3452         struct md_device *md_next = m->mdt_child;
3453         struct lu_device *next = md2lu_dev(md_next);
3454         int rc = 0;
3455         ENTRY;
3456
3457         switch (cfg->lcfg_command) {
3458         case LCFG_PARAM: {
3459                 struct lprocfs_static_vars lvars;
3460                 struct obd_device *obd = d->ld_obd;
3461
3462                 lprocfs_init_vars(mdt, &lvars);
3463                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, cfg, obd);
3464                 if (rc)
3465                         /* others are passed further */
3466                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
3467                 break;
3468         }
3469         case LCFG_ADD_MDC:
3470                 /*
3471                  * Add mdc hook to get first MDT uuid and connect it to
3472                  * ls->controller to use for seq manager.
3473                  */
3474                 rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
3475                 if (rc) {
3476                         CERROR("can't initialize controller export, "
3477                                "rc %d\n", rc);
3478                 }
3479         default:
3480                 /* others are passed further */
3481                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3482                 break;
3483         }
3484         RETURN(rc);
3485 }
3486
3487 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
3488                                           const struct lu_object_header *hdr,
3489                                           struct lu_device *d)
3490 {
3491         struct mdt_object *mo;
3492
3493         ENTRY;
3494
3495         OBD_ALLOC_PTR(mo);
3496         if (mo != NULL) {
3497                 struct lu_object *o;
3498                 struct lu_object_header *h;
3499
3500                 o = &mo->mot_obj.mo_lu;
3501                 h = &mo->mot_header;
3502                 lu_object_header_init(h);
3503                 lu_object_init(o, h, d);
3504                 lu_object_add_top(h, o);
3505                 o->lo_ops = &mdt_obj_ops;
3506                 RETURN(o);
3507         } else
3508                 RETURN(NULL);
3509 }
3510
3511 static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
3512 {
3513         struct mdt_device *d = mdt_dev(o->lo_dev);
3514         struct lu_device  *under;
3515         struct lu_object  *below;
3516         int                rc = 0;
3517         ENTRY;
3518
3519         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3520                PFID(lu_object_fid(o)));
3521
3522         under = &d->mdt_child->md_lu_dev;
3523         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
3524         if (below != NULL) {
3525                 lu_object_add(o, below);
3526         } else
3527                 rc = -ENOMEM;
3528
3529         RETURN(rc);
3530 }
3531
3532 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
3533 {
3534         struct mdt_object *mo = mdt_obj(o);
3535         struct lu_object_header *h;
3536         ENTRY;
3537
3538         h = o->lo_header;
3539         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3540                PFID(lu_object_fid(o)));
3541
3542         lu_object_fini(o);
3543         lu_object_header_fini(h);
3544         OBD_FREE_PTR(mo);
3545         EXIT;
3546 }
3547
3548 static int mdt_object_print(const struct lu_env *env, void *cookie,
3549                             lu_printer_t p, const struct lu_object *o)
3550 {
3551         return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o);
3552 }
3553
3554 static struct lu_device_operations mdt_lu_ops = {
3555         .ldo_object_alloc   = mdt_object_alloc,
3556         .ldo_process_config = mdt_process_config
3557 };
3558
3559 static struct lu_object_operations mdt_obj_ops = {
3560         .loo_object_init    = mdt_object_init,
3561         .loo_object_free    = mdt_object_free,
3562         .loo_object_print   = mdt_object_print
3563 };
3564
3565 /* mds_connect_internal */
3566 static int mdt_connect_internal(struct obd_export *exp,
3567                                 struct mdt_device *mdt,
3568                                 struct obd_connect_data *data)
3569 {
3570         __u64 flags;
3571
3572         if (data != NULL) {
3573                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3574                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3575
3576                 /* If no known bits (which should not happen, probably,
3577                    as everybody should support LOOKUP and UPDATE bits at least)
3578                    revert to compat mode with plain locks. */
3579                 if (!data->ocd_ibits_known &&
3580                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
3581                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3582
3583                 if (!mdt->mdt_opts.mo_acl)
3584                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3585
3586                 if (!mdt->mdt_opts.mo_user_xattr)
3587                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3588
3589                 if (!mdt->mdt_opts.mo_mds_capa)
3590                         data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
3591
3592                 if (!mdt->mdt_opts.mo_oss_capa)
3593                         data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
3594
3595                 exp->exp_connect_flags = data->ocd_connect_flags;
3596                 data->ocd_version = LUSTRE_VERSION_CODE;
3597                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3598         }
3599
3600 #if 0
3601         if (mdt->mdt_opts.mo_acl &&
3602             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3603                 CWARN("%s: MDS requires ACL support but client does not\n",
3604                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3605                 return -EBADE;
3606         }
3607 #endif
3608
3609         flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
3610         if ((exp->exp_connect_flags & flags) == flags) {
3611                 CWARN("%s: both local and remote client flags are set\n",
3612                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3613                 return -EBADE;
3614         }
3615
3616         if (mdt->mdt_opts.mo_mds_capa &&
3617             ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
3618                 CWARN("%s: MDS requires capability support, but client not\n",
3619                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3620                 return -EBADE;
3621         }
3622
3623         if (mdt->mdt_opts.mo_oss_capa &&
3624             ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
3625                 CWARN("%s: MDS requires OSS capability support, "
3626                       "but client not\n",
3627                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3628                 return -EBADE;
3629         }
3630
3631         return 0;
3632 }
3633
3634 /* mds_connect copy */
3635 static int mdt_obd_connect(const struct lu_env *env,
3636                            struct lustre_handle *conn, struct obd_device *obd,
3637                            struct obd_uuid *cluuid,
3638                            struct obd_connect_data *data)
3639 {
3640         struct mdt_export_data *med;
3641         struct mdt_client_data *mcd;
3642         struct obd_export      *exp;
3643         struct mdt_device      *mdt;
3644         int                     rc;
3645         ENTRY;
3646
3647         LASSERT(env != NULL);
3648         if (!conn || !obd || !cluuid)
3649                 RETURN(-EINVAL);
3650
3651         mdt = mdt_dev(obd->obd_lu_dev);
3652
3653         rc = class_connect(conn, obd, cluuid);
3654         if (rc)
3655                 RETURN(rc);
3656
3657         exp = class_conn2export(conn);
3658         LASSERT(exp != NULL);
3659         med = &exp->exp_mdt_data;
3660
3661         rc = mdt_connect_internal(exp, mdt, data);
3662         if (rc == 0) {
3663                 OBD_ALLOC_PTR(mcd);
3664                 if (mcd != NULL) {
3665                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3666                         med->med_mcd = mcd;
3667                         rc = mdt_client_new(env, mdt, med);
3668                         if (rc != 0) {
3669                                 OBD_FREE_PTR(mcd);
3670                                 med->med_mcd = NULL;
3671                         }
3672                 } else
3673                         rc = -ENOMEM;
3674         }
3675
3676         if (rc != 0)
3677                 class_disconnect(exp);
3678         else
3679                 class_export_put(exp);
3680
3681         RETURN(rc);
3682 }
3683
3684 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3685                              struct obd_uuid *cluuid,
3686                              struct obd_connect_data *data)
3687 {
3688         int rc;
3689         ENTRY;
3690
3691         if (exp == NULL || obd == NULL || cluuid == NULL)
3692                 RETURN(-EINVAL);
3693
3694         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
3695
3696         RETURN(rc);
3697 }
3698
3699 static int mdt_obd_disconnect(struct obd_export *exp)
3700 {
3701         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3702         int rc;
3703         ENTRY;
3704
3705         LASSERT(exp);
3706         class_export_get(exp);
3707
3708         /* Disconnect early so that clients can't keep using export */
3709         rc = class_disconnect(exp);
3710         if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3711                 ldlm_cancel_locks_for_export(exp);
3712
3713         /* complete all outstanding replies */
3714         spin_lock(&exp->exp_lock);
3715         while (!list_empty(&exp->exp_outstanding_replies)) {
3716                 struct ptlrpc_reply_state *rs =
3717                         list_entry(exp->exp_outstanding_replies.next,
3718                                    struct ptlrpc_reply_state, rs_exp_list);
3719                 struct ptlrpc_service *svc = rs->rs_service;
3720
3721                 spin_lock(&svc->srv_lock);
3722                 list_del_init(&rs->rs_exp_list);
3723                 ptlrpc_schedule_difficult_reply(rs);
3724                 spin_unlock(&svc->srv_lock);
3725         }
3726         spin_unlock(&exp->exp_lock);
3727
3728         class_export_put(exp);
3729         RETURN(rc);
3730 }
3731
3732 /* FIXME: Can we avoid using these two interfaces? */
3733 static int mdt_init_export(struct obd_export *exp)
3734 {
3735         struct mdt_export_data *med = &exp->exp_mdt_data;
3736         ENTRY;
3737
3738         INIT_LIST_HEAD(&med->med_open_head);
3739         spin_lock_init(&med->med_open_lock);
3740         exp->exp_connecting = 1;
3741         RETURN(0);
3742 }
3743
3744 static int mdt_destroy_export(struct obd_export *export)
3745 {
3746         struct mdt_export_data *med;
3747         struct obd_device      *obd = export->exp_obd;
3748         struct mdt_device      *mdt;
3749         struct mdt_thread_info *info;
3750         struct lu_env           env;
3751         struct md_attr         *ma;
3752         int lmm_size;
3753         int cookie_size;
3754         int rc = 0;
3755         ENTRY;
3756
3757         med = &export->exp_mdt_data;
3758         if (med->med_rmtclient)
3759                 mdt_cleanup_idmap(med);
3760
3761         target_destroy_export(export);
3762
3763         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
3764                 RETURN(0);
3765
3766         mdt = mdt_dev(obd->obd_lu_dev);
3767         LASSERT(mdt != NULL);
3768
3769         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3770         if (rc)
3771                 RETURN(rc);
3772
3773         info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
3774         LASSERT(info != NULL);
3775         memset(info, 0, sizeof *info);
3776         info->mti_env = &env;
3777         info->mti_mdt = mdt;
3778
3779         ma = &info->mti_attr;
3780         lmm_size = ma->ma_lmm_size = mdt->mdt_max_mdsize;
3781         cookie_size = ma->ma_cookie_size = mdt->mdt_max_cookiesize;
3782         OBD_ALLOC(ma->ma_lmm, lmm_size);
3783         OBD_ALLOC(ma->ma_cookie, cookie_size);
3784
3785         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
3786                 GOTO(out, rc = -ENOMEM);
3787         ma->ma_need = MA_LOV | MA_COOKIE;
3788
3789         /* Close any open files (which may also cause orphan unlinking). */
3790         spin_lock(&med->med_open_lock);
3791         while (!list_empty(&med->med_open_head)) {
3792                 struct list_head *tmp = med->med_open_head.next;
3793                 struct mdt_file_data *mfd =
3794                         list_entry(tmp, struct mdt_file_data, mfd_list);
3795
3796                 /* Remove mfd handle so it can't be found again.
3797                  * We are consuming the mfd_list reference here. */
3798                 class_handle_unhash(&mfd->mfd_handle);
3799                 list_del_init(&mfd->mfd_list);
3800                 spin_unlock(&med->med_open_lock);
3801                 mdt_mfd_close(info, mfd);
3802                 /* TODO: if we close the unlinked file,
3803                  * we need to remove it's objects from OST */
3804                 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
3805                 spin_lock(&med->med_open_lock);
3806                 ma->ma_lmm_size = lmm_size;
3807                 ma->ma_cookie_size = cookie_size;
3808                 ma->ma_need = MA_LOV | MA_COOKIE;
3809         }
3810         spin_unlock(&med->med_open_lock);
3811         info->mti_mdt = NULL;
3812         mdt_client_del(&env, mdt, med);
3813
3814 out:
3815         if (lmm_size)
3816                 OBD_FREE(ma->ma_lmm, lmm_size);
3817         if (cookie_size)
3818                 OBD_FREE(ma->ma_cookie, cookie_size);
3819         lu_env_fini(&env);
3820
3821         RETURN(rc);
3822 }
3823
3824 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
3825                       enum md_upcall_event ev)
3826 {
3827         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
3828         struct md_device  *next  = m->mdt_child;
3829         struct mdt_thread_info *mti;
3830         int rc = 0;
3831         ENTRY;
3832
3833         switch (ev) {
3834                 case MD_LOV_SYNC:
3835                         rc = next->md_ops->mdo_maxsize_get(env, next,
3836                                         &m->mdt_max_mdsize,
3837                                         &m->mdt_max_cookiesize);
3838                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
3839                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
3840                         break;
3841                 case MD_NO_TRANS:
3842                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3843                         mti->mti_no_need_trans = 1;
3844                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
3845                         break;
3846                 default:
3847                         CERROR("invalid event\n");
3848                         rc = -EINVAL;
3849                         break;
3850         }
3851         RETURN(rc);
3852 }
3853
3854 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
3855                          void *karg, void *uarg)
3856 {
3857         struct lu_env      env;
3858         struct obd_device *obd= exp->exp_obd;
3859         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3860         struct dt_device  *dt = mdt->mdt_bottom;
3861         int rc;
3862
3863         ENTRY;
3864         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
3865         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3866         if (rc)
3867                 RETURN(rc);
3868
3869         switch (cmd) {
3870         case OBD_IOC_SYNC:
3871                 rc = dt->dd_ops->dt_sync(&env, dt);
3872                 break;
3873
3874         case OBD_IOC_SET_READONLY:
3875                 rc = dt->dd_ops->dt_sync(&env, dt);
3876                 dt->dd_ops->dt_ro(&env, dt);
3877                 break;
3878
3879         case OBD_IOC_ABORT_RECOVERY:
3880                 CERROR("aborting recovery for device %s\n", obd->obd_name);
3881                 target_stop_recovery_thread(obd);
3882                 break;
3883
3884         default:
3885                 CERROR("not supported cmd = %d for device %s\n",
3886                        cmd, obd->obd_name);
3887                 rc = -EOPNOTSUPP;
3888         }
3889
3890         lu_env_fini(&env);
3891         RETURN(rc);
3892 }
3893
3894 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
3895 {
3896         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
3897         int rc;
3898         ENTRY;
3899         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
3900         RETURN(rc);
3901 }
3902
3903 int mdt_obd_postrecov(struct obd_device *obd)
3904 {
3905         struct lu_env env;
3906         int rc;
3907
3908         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
3909         if (rc)
3910                 RETURN(rc);
3911         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
3912         lu_env_fini(&env);
3913         return rc;
3914 }
3915
3916 static struct obd_ops mdt_obd_device_ops = {
3917         .o_owner          = THIS_MODULE,
3918         .o_connect        = mdt_obd_connect,
3919         .o_reconnect      = mdt_obd_reconnect,
3920         .o_disconnect     = mdt_obd_disconnect,
3921         .o_init_export    = mdt_init_export,
3922         .o_destroy_export = mdt_destroy_export,
3923         .o_iocontrol      = mdt_iocontrol,
3924         .o_postrecov      = mdt_obd_postrecov
3925
3926 };
3927
3928 static struct lu_device* mdt_device_fini(const struct lu_env *env,
3929                                          struct lu_device *d)
3930 {
3931         struct mdt_device *m = mdt_dev(d);
3932
3933         mdt_fini(env, m);
3934         RETURN(NULL);
3935 }
3936
/* lu_device_type "free" method: release the mdt device embedding \a d. */
static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
{
        struct mdt_device *mdt;

        mdt = mdt_dev(d);
        OBD_FREE_PTR(mdt);
}
3943
3944 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
3945                                           struct lu_device_type *t,
3946                                           struct lustre_cfg *cfg)
3947 {
3948         struct lu_device  *l;
3949         struct mdt_device *m;
3950
3951         OBD_ALLOC_PTR(m);
3952         if (m != NULL) {
3953                 int rc;
3954
3955                 l = &m->mdt_md_dev.md_lu_dev;
3956                 rc = mdt_init0(env, m, t, cfg);
3957                 if (rc != 0) {
3958                         OBD_FREE_PTR(m);
3959                         l = ERR_PTR(rc);
3960                         return l;
3961                 }
3962                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
3963         } else
3964                 l = ERR_PTR(-ENOMEM);
3965         return l;
3966 }
3967
3968 /*
3969  * context key constructor/destructor
3970  */
3971 static void *mdt_key_init(const struct lu_context *ctx,
3972                           struct lu_context_key *key)
3973 {
3974         struct mdt_thread_info *info;
3975
3976         /*
3977          * check that no high order allocations are incurred.
3978          */
3979         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
3980         OBD_ALLOC_PTR(info);
3981         if (info == NULL)
3982                 info = ERR_PTR(-ENOMEM);
3983         return info;
3984 }
3985
/* Destructor of the per-thread mdt info: frees what mdt_key_init() made. */
static void mdt_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *thread_info;

        thread_info = data;
        OBD_FREE_PTR(thread_info);
}
3992
3993 struct lu_context_key mdt_thread_key = {
3994         .lct_tags = LCT_MD_THREAD,
3995         .lct_init = mdt_key_init,
3996         .lct_fini = mdt_key_fini
3997 };
3998
3999 static void *mdt_txn_key_init(const struct lu_context *ctx,
4000                               struct lu_context_key *key)
4001 {
4002         struct mdt_txn_info *txi;
4003
4004         /*
4005          * check that no high order allocations are incurred.
4006          */
4007         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
4008         OBD_ALLOC_PTR(txi);
4009         if (txi == NULL)
4010                 txi = ERR_PTR(-ENOMEM);
4011         return txi;
4012 }
4013
/* Destructor of the mdt transaction info made by mdt_txn_key_init(). */
static void mdt_txn_key_fini(const struct lu_context *ctx,
                             struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn_info;

        txn_info = data;
        OBD_FREE_PTR(txn_info);
}
4020
4021 struct lu_context_key mdt_txn_key = {
4022         .lct_tags = LCT_TX_HANDLE,
4023         .lct_init = mdt_txn_key_init,
4024         .lct_fini = mdt_txn_key_fini
4025 };
4026
4027 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
4028 {
4029         return md_ucred(info->mti_env);
4030 }
4031
4032 static int mdt_type_init(struct lu_device_type *t)
4033 {
4034         int rc;
4035
4036         rc = lu_context_key_register(&mdt_thread_key);
4037         if (rc == 0)
4038                 rc = lu_context_key_register(&mdt_txn_key);
4039         return rc;
4040 }
4041
4042 static void mdt_type_fini(struct lu_device_type *t)
4043 {
4044         lu_context_key_degister(&mdt_thread_key);
4045         lu_context_key_degister(&mdt_txn_key);
4046 }
4047
4048 static struct lu_device_type_operations mdt_device_type_ops = {
4049         .ldto_init = mdt_type_init,
4050         .ldto_fini = mdt_type_fini,
4051
4052         .ldto_device_alloc = mdt_device_alloc,
4053         .ldto_device_free  = mdt_device_free,
4054         .ldto_device_fini  = mdt_device_fini
4055 };
4056
4057 static struct lu_device_type mdt_device_type = {
4058         .ldt_tags     = LU_DEVICE_MD,
4059         .ldt_name     = LUSTRE_MDT_NAME,
4060         .ldt_ops      = &mdt_device_type_ops,
4061         .ldt_ctx_tags = LCT_MD_THREAD
4062 };
4063
4064 static int __init mdt_mod_init(void)
4065 {
4066         struct lprocfs_static_vars lvars;
4067         int rc;
4068
4069         printk(KERN_INFO "Lustre: MetaData Target; info@clusterfs.com\n");
4070
4071         mdt_num_threads = MDT_NUM_THREADS;
4072         lprocfs_init_vars(mdt, &lvars);
4073         rc = class_register_type(&mdt_obd_device_ops, NULL,
4074                                  lvars.module_vars, LUSTRE_MDT_NAME,
4075                                  &mdt_device_type);
4076
4077         return rc;
4078 }
4079
4080 static void __exit mdt_mod_exit(void)
4081 {
4082         class_unregister_type(LUSTRE_MDT_NAME);
4083 }
4084
4085
/*
 * Build one designated entry of a struct mdt_handler table.
 *
 * The entry is indexed by the distance of opcode prefix_opc from the first
 * opcode of the group (prefix_base); its fail id is
 * OBD_FAIL_<prefix>_<opc><suffix>.
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)             \
[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
        .mh_name    = #opc,                                             \
        .mh_opc     = prefix ## _ ## opc,                               \
        .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix,        \
        .mh_flags   = flags,                                            \
        .mh_fmt     = fmt,                                              \
        .mh_act     = fn                                                \
}

/*
 * Request with an explicitly supplied format
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)

#define DEF_SEQ_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)

#define DEF_FLD_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)

/*
 * Request with a format known in advance
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)

#define DEF_SEQ_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

#define DEF_FLD_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)

/*
 * Request with a format we do not yet know
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
4120
/*
 * Handler table for the MDS_* opcodes; referenced by the MDS_GETATTR ..
 * MDS_LAST_OPC slice of mdt_regular_handlers[].
 *
 * Each entry gives (flags, opcode name, handler function).  Entries declared
 * with DEF_MDT_HNDL_F() carry an RQF_MDS_<name> request format; SYNC,
 * QUOTACHECK and QUOTACTL use DEF_MDT_HNDL_0() and therefore have none.
 * CLOSE and DONE_WRITING are also listed, with the same flags and handlers,
 * in mdt_readpage_ops[].
 *
 * NOTE(review): the HABEO_CORPUS / HABEO_REFERO / MUTABOR flag bits are
 * defined outside this section -- check their definitions before changing
 * the flags of any entry.
 */
4121 static struct mdt_handler mdt_mds_ops[] = {
4122 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
4123 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
4124 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
4125 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR,      mdt_getattr),
4126 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
4127 DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR,      SETXATTR,     mdt_setxattr),
4128 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
4129 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
4130 DEF_MDT_HNDL_F(0                        |MUTABOR,
4131                                           REINT,        mdt_reint),
4132 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
4133 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
4134 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
4135 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
4136 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
4137 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
4138 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
4139 };
4140
/*
 * Declare an mdt_handler table entry for the OBD opcode `name': forwards to
 * DEF_HNDL() with prefix OBD, base opcode PING, suffix _NET and no request
 * format (NULL).
 */
4141 #define DEF_OBD_HNDL(flags, name, fn)                   \
4142         DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
4143
4144
/*
 * Handler table for the OBD_* opcodes; referenced by the OBD_PING ..
 * OBD_LAST_OPC slice of mdt_regular_handlers[].  All entries use flags 0.
 */
4145 static struct mdt_handler mdt_obd_ops[] = {
4146         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
4147         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
4148         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
4149 };
4150
/*
 * Entry declarators for the LDLM opcode family (prefix LDLM, base opcode
 * ENQUEUE).  Unlike the MDS/OBD/SEQ/FLD/SEC_CTX wrappers, which pass _NET,
 * the third DEF_HNDL() argument is intentionally left empty here.
 *
 * DEF_DLM_HNDL_0: no request format (NULL).
 */
4151 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
4152         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
/* DEF_DLM_HNDL_F: request format is &RQF_LDLM_<name>. */
4153 #define DEF_DLM_HNDL_F(flags, name, fn)                   \
4154         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
4155
/*
 * Handler table for the LDLM_* opcodes; referenced by the LDLM_ENQUEUE ..
 * LDLM_LAST_OPC slice of mdt_regular_handlers[].  Only ENQUEUE has a request
 * format (RQF_LDLM_ENQUEUE); ENQUEUE and CONVERT carry HABEO_CLAVIS, the two
 * callback entries use flags 0.
 */
4156 static struct mdt_handler mdt_dlm_ops[] = {
4157         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
4158         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
4159         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
4160         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
4161 };
4162
/*
 * Handler table for the LLOG_* opcodes; referenced by the
 * LLOG_ORIGIN_HANDLE_CREATE .. LLOG_LAST_OPC slice of mdt_regular_handlers[].
 * It is deliberately empty: no llog handlers are registered here.
 *
 * NOTE(review): an empty initializer list for an array of unspecified size
 * is a compiler extension (not ISO C before C23) and yields a zero-length
 * array -- confirm the slice lookup never indexes into it.
 */
4163 static struct mdt_handler mdt_llog_ops[] = {
4164 };
4165
/*
 * Declare an mdt_handler table entry for the SEC_CTX opcode `name': forwards
 * to DEF_HNDL() with prefix SEC_CTX, base opcode INIT, suffix _NET, flags
 * fixed at 0 and no request format (NULL).
 */
4166 #define DEF_SEC_CTX_HNDL(name, fn)                      \
4167         DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
4168
/*
 * Handler table for the SEC_CTX_* opcodes; referenced by the SEC_CTX_INIT ..
 * SEC_LAST_OPC slice of mdt_regular_handlers[].  All three opcodes are
 * routed to the same function, mdt_sec_ctx_handle().
 */
4169 static struct mdt_handler mdt_sec_ctx_ops[] = {
4170         DEF_SEC_CTX_HNDL(INIT,          mdt_sec_ctx_handle),
4171         DEF_SEC_CTX_HNDL(INIT_CONT,     mdt_sec_ctx_handle),
4172         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
4173 };
4174
/*
 * Opcode slices for the regular request path.  Each slice names an opcode
 * range (.mos_opc_start, .mos_opc_end) and the handler table (.mos_hs) that
 * serves it: MDS, OBD, LDLM, LLOG and SEC_CTX, in that order.
 *
 * NOTE(review): every mdt_opc_slice array in this file ends with an entry
 * whose .mos_hs is NULL, which presumably acts as the end-of-list sentinel;
 * whether .mos_opc_end is inclusive or exclusive is decided by the lookup
 * code, which is not in this section.
 */
4175 static struct mdt_opc_slice mdt_regular_handlers[] = {
4176         {
4177                 .mos_opc_start = MDS_GETATTR,
4178                 .mos_opc_end   = MDS_LAST_OPC,
4179                 .mos_hs        = mdt_mds_ops
4180         },
4181         {
4182                 .mos_opc_start = OBD_PING,
4183                 .mos_opc_end   = OBD_LAST_OPC,
4184                 .mos_hs        = mdt_obd_ops
4185         },
4186         {
4187                 .mos_opc_start = LDLM_ENQUEUE,
4188                 .mos_opc_end   = LDLM_LAST_OPC,
4189                 .mos_hs        = mdt_dlm_ops
4190         },
4191         {
4192                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
4193                 .mos_opc_end   = LLOG_LAST_OPC,
4194                 .mos_hs        = mdt_llog_ops
4195         },
4196         {
4197                 .mos_opc_start = SEC_CTX_INIT,
4198                 .mos_opc_end   = SEC_LAST_OPC,
4199                 .mos_hs        = mdt_sec_ctx_ops
4200         },
4201         {
4202                 .mos_hs        = NULL /* last entry: no handler table */
4203         }
4204 };
4205
/*
 * Handler table referenced by mdt_readpage_handlers[].  It registers
 * READPAGE, WRITEPAGE (only when HAVE_SPLIT_SUPPORT is defined), and
 * duplicates the CLOSE and DONE_WRITING entries of mdt_mds_ops[] with the
 * same flags and handler functions.
 */
4206 static struct mdt_handler mdt_readpage_ops[] = {
4207         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
4208 #ifdef HAVE_SPLIT_SUPPORT
4209         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
4210 #endif
4211
4212         /*
4213          * XXX: this is ugly and should be fixed one day, see mdc_close() for
4214          * detailed comments. --umka
4215          */
4216         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
4217         DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING,    mdt_done_writing),
4218 };
4219
/*
 * Opcode slices for the readpage request path: a single slice covering
 * MDS_GETATTR .. MDS_LAST_OPC backed by mdt_readpage_ops[], followed by an
 * entry whose .mos_hs is NULL.
 */
4220 static struct mdt_opc_slice mdt_readpage_handlers[] = {
4221         {
4222                 .mos_opc_start = MDS_GETATTR,
4223                 .mos_opc_end   = MDS_LAST_OPC,
4224                 .mos_hs        = mdt_readpage_ops
4225         },
4226         {
4227                 .mos_hs        = NULL /* last entry: no handler table */
4228         }
4229 };
4230
/*
 * Handler table for the SEQ_* opcodes: SEQ_QUERY is served by seq_query().
 *
 * NOTE(review): seq_query is cast to int (*)(struct mdt_thread_info *),
 * which suggests its declared prototype differs from the handler type.
 * Calling a function through an incompatible pointer type is undefined
 * behaviour in ISO C -- confirm that the real signature is call-compatible.
 */
4231 static struct mdt_handler mdt_seq_ops[] = {
4232         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
4233 };
4234
/*
 * Opcode slices for the sequence service: a single slice covering
 * SEQ_QUERY .. SEQ_LAST_OPC backed by mdt_seq_ops[], followed by an entry
 * whose .mos_hs is NULL.
 */
4235 static struct mdt_opc_slice mdt_seq_handlers[] = {
4236         {
4237                 .mos_opc_start = SEQ_QUERY,
4238                 .mos_opc_end   = SEQ_LAST_OPC,
4239                 .mos_hs        = mdt_seq_ops
4240         },
4241         {
4242                 .mos_hs        = NULL /* last entry: no handler table */
4243         }
4244 };
4245
/*
 * Handler table for the FLD_* opcodes: FLD_QUERY is served by fld_query().
 *
 * NOTE(review): fld_query is cast to int (*)(struct mdt_thread_info *),
 * which suggests its declared prototype differs from the handler type.
 * Calling a function through an incompatible pointer type is undefined
 * behaviour in ISO C -- confirm that the real signature is call-compatible.
 */
4246 static struct mdt_handler mdt_fld_ops[] = {
4247         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
4248 };
4249
/*
 * Opcode slices for the FLD service: a single slice covering
 * FLD_QUERY .. FLD_LAST_OPC backed by mdt_fld_ops[], followed by an entry
 * whose .mos_hs is NULL.
 */
4250 static struct mdt_opc_slice mdt_fld_handlers[] = {
4251         {
4252                 .mos_opc_start = FLD_QUERY,
4253                 .mos_opc_end   = FLD_LAST_OPC,
4254                 .mos_hs        = mdt_fld_ops
4255         },
4256         {
4257                 .mos_hs        = NULL /* last entry: no handler table */
4258         }
4259 };
4260
/* Module metadata; the description string embeds LUSTRE_MDT_NAME. */
4261 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4262 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
4263 MODULE_LICENSE("GPL");
4264
/*
 * Expose mdt_num_threads as an unsigned-long module parameter.
 * NOTE(review): 0444 is presumably the parameter's permission mode
 * (read-only) -- confirm against the CFS_MODULE_PARM definition.
 */
4265 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
4266                 "number of mdt service threads to start");
4267
/*
 * Declare the module "mdt", version "0.2.0", with mdt_mod_init and
 * mdt_mod_exit passed as its init and exit routines.
 */
4268 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);