Whamcloud - gitweb
mdt: export ptlrpc stats from mdt services.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *   Author: Yury Umanets <umka@clusterfs.com>
15  *
16  *   This file is part of the Lustre file system, http://www.lustre.org
17  *   Lustre is a trademark of Cluster File Systems, Inc.
18  *
19  *   You may have signed or agreed to another license before downloading
20  *   this software.  If so, you are bound by the terms and conditions
21  *   of that agreement, and the following does not apply to you.  See the
22  *   LICENSE file included with this distribution for more information.
23  *
24  *   If you did not agree to a different license, then this copy of Lustre
25  *   is open source software; you can redistribute it and/or modify it
26  *   under the terms of version 2 of the GNU General Public License as
27  *   published by the Free Software Foundation.
28  *
29  *   In either case, Lustre is distributed in the hope that it will be
30  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
31  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
32  *   license text for more details.
33  */
34
35 #ifndef EXPORT_SYMTAB
36 # define EXPORT_SYMTAB
37 #endif
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 /* LUSTRE_VERSION_CODE */
43 #include <lustre_ver.h>
44 /*
45  * struct OBD_{ALLOC,FREE}*()
46  * MDT_FAIL_CHECK
47  */
48 #include <obd_support.h>
49 /* struct ptlrpc_request */
50 #include <lustre_net.h>
51 /* struct obd_export */
52 #include <lustre_export.h>
53 /* struct obd_device */
54 #include <obd.h>
55 /* lu2dt_dev() */
56 #include <dt_object.h>
57 #include <lustre_mds.h>
58 #include <lustre_mdt.h>
59 #include "mdt_internal.h"
60 #include <linux/lustre_acl.h>
61 #include <lustre_param.h>
62
63 mdl_mode_t mdt_mdl_lock_modes[] = {
64         [LCK_MINMODE] = MDL_MINMODE,
65         [LCK_EX]      = MDL_EX,
66         [LCK_PW]      = MDL_PW,
67         [LCK_PR]      = MDL_PR,
68         [LCK_CW]      = MDL_CW,
69         [LCK_CR]      = MDL_CR,
70         [LCK_NL]      = MDL_NL,
71         [LCK_GROUP]   = MDL_GROUP
72 };
73
74 ldlm_mode_t mdt_dlm_lock_modes[] = {
75         [MDL_MINMODE] = LCK_MINMODE,
76         [MDL_EX]      = LCK_EX,
77         [MDL_PW]      = LCK_PW,
78         [MDL_PR]      = LCK_PR,
79         [MDL_CW]      = LCK_CW,
80         [MDL_CR]      = LCK_CR,
81         [MDL_NL]      = LCK_NL,
82         [MDL_GROUP]   = LCK_GROUP
83 };
84
/*
 * Set up by mdt_mod_init().
 * NOTE(review): presumably the number of MDT service threads - confirm
 * against mdt_mod_init(), which is not visible here.
 */
unsigned long mdt_num_threads;
89
90 /* ptlrpc request handler for MDT. All handlers are
91  * grouped into several slices - struct mdt_opc_slice,
92  * and stored in an array - mdt_handlers[].
93  */
94 struct mdt_handler {
95         /* The name of this handler. */
96         const char *mh_name;
97         /* Fail id for this handler, checked at the beginning of this handler*/
98         int         mh_fail_id;
99         /* Operation code for this handler */
100         __u32       mh_opc;
101         /* flags are listed in enum mdt_handler_flags below. */
102         __u32       mh_flags;
103         /* The actual handler function to execute. */
104         int (*mh_act)(struct mdt_thread_info *info);
105         /* Request format for this request. */
106         const struct req_format *mh_fmt;
107 };
108
/* Per-handler behaviour flags, stored in mdt_handler::mh_flags. */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * named by its fid exists on disk.
         */
        HABEO_CORPUS = 0x1,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 0x2,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of this request is fixed, so generic code is
         * able to pack the reply message.
         */
        HABEO_REFERO = 0x4,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         */
        MUTABOR      = 0x8
};
138
139 struct mdt_opc_slice {
140         __u32               mos_opc_start;
141         int                 mos_opc_end;
142         struct mdt_handler *mos_hs;
143 };
144
145 static struct mdt_opc_slice mdt_regular_handlers[];
146 static struct mdt_opc_slice mdt_readpage_handlers[];
147 static struct mdt_opc_slice mdt_seq_handlers[];
148 static struct mdt_opc_slice mdt_fld_handlers[];
149
150 static struct mdt_device *mdt_dev(struct lu_device *d);
151 static int mdt_regular_handle(struct ptlrpc_request *req);
152 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
153
154 static struct lu_object_operations mdt_obj_ops;
155
156 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
157 {
158         if (!rep)
159                 return 0;
160         return (rep->lock_policy_res1 & flag);
161 }
162
163 void mdt_clear_disposition(struct mdt_thread_info *info,
164                            struct ldlm_reply *rep, int flag)
165 {
166         if (info)
167                 info->mti_opdata &= ~flag;
168         if (rep)
169                 rep->lock_policy_res1 &= ~flag;
170 }
171
172 void mdt_set_disposition(struct mdt_thread_info *info,
173                          struct ldlm_reply *rep, int flag)
174 {
175         if (info)
176                 info->mti_opdata |= flag;
177         if (rep)
178                 rep->lock_policy_res1 |= flag;
179 }
180
181 void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
182 {
183         lh->mlh_pdo_hash = 0;
184         lh->mlh_reg_mode = lm;
185         lh->mlh_type = MDT_REG_LOCK;
186 }
187
188 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
189                        const char *name, int namelen)
190 {
191         lh->mlh_reg_mode = lm;
192         lh->mlh_type = MDT_PDO_LOCK;
193
194         if (name != NULL) {
195                 LASSERT(namelen > 0);
196                 lh->mlh_pdo_hash = full_name_hash(name, namelen - 1);
197         } else {
198                 LASSERT(namelen == 0);
199                 lh->mlh_pdo_hash = 0ull;
200         }
201 }
202
203 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
204                               struct mdt_lock_handle *lh)
205 {
206         mdl_mode_t mode;
207         ENTRY;
208
209         /*
210          * Any dir access needs couple of locks:
211          *
212          * 1) on part of dir we gonna take lookup/modify;
213          *
214          * 2) on whole dir to protect it from concurrent splitting and/or to
215          * flush client's cache for readdir().
216          *
217          * so, for a given mode and object this routine decides what lock mode
218          * to use for lock #2:
219          *
220          * 1) if caller's gonna lookup in dir then we need to protect dir from
221          * being splitted only - LCK_CR
222          *
223          * 2) if caller's gonna modify dir then we need to protect dir from
224          * being splitted and to flush cache - LCK_CW
225          *
226          * 3) if caller's gonna modify dir and that dir seems ready for
227          * splitting then we need to protect it from any type of access
228          * (lookup/modify/split) - LCK_EX --bzzz
229          */
230
231         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
232         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
233
234         /*
235          * No pdo locks possible on not existing objects, because pdo lock is
236          * taken on parent dir and parent can't be absent.
237          */
238         LASSERT(mdt_object_exists(o) > 0);
239
240         /*
241          * Ask underlaying level its opinion about preferable PDO lock mode
242          * having access type passed as regular lock mode:
243          *
244          * - MDL_MINMODE means that lower layer does not want to specify lock
245          * mode;
246          *
247          * - MDL_NL means that no PDO lock should be taken. This is used in some
248          * cases. Say, for non-splittable directories no need to use PDO locks
249          * at all.
250          */
251         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
252                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
253
254         if (mode != MDL_MINMODE) {
255                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
256         } else {
257                 /*
258                  * Lower layer does not want to specify locking mode. We do it
259                  * our selves. No special protection is needed, just flush
260                  * client's cache on modification and allow concurrent
261                  * mondification.
262                  */
263                 switch (lh->mlh_reg_mode) {
264                 case LCK_EX:
265                         lh->mlh_pdo_mode = LCK_EX;
266                         break;
267                 case LCK_PR:
268                         lh->mlh_pdo_mode = LCK_CR;
269                         break;
270                 case LCK_PW:
271                         lh->mlh_pdo_mode = LCK_CW;
272                         break;
273                 default:
274                         CERROR("Not expected lock type (0x%x)\n",
275                                (int)lh->mlh_reg_mode);
276                         LBUG();
277                 }
278         }
279
280         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
281         EXIT;
282 }
283
284 static int mdt_getstatus(struct mdt_thread_info *info)
285 {
286         struct mdt_device *mdt  = info->mti_mdt;
287         struct md_device  *next = mdt->mdt_child;
288         struct mdt_body   *repbody;
289         int                rc;
290
291         ENTRY;
292
293         rc = mdt_check_ucred(info);
294         if (rc)
295                 RETURN(err_serious(rc));
296
297         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
298                 RETURN(err_serious(-ENOMEM));
299
300         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
301         rc = next->md_ops->mdo_root_get(info->mti_env, next, &repbody->fid1);
302         if (rc != 0)
303                 RETURN(rc);
304
305         repbody->valid |= OBD_MD_FLID;
306
307         if (mdt->mdt_opts.mo_mds_capa) {
308                 struct mdt_object  *root;
309                 struct lustre_capa *capa;
310
311                 root = mdt_object_find(info->mti_env, mdt, &repbody->fid1);
312                 if (IS_ERR(root))
313                         RETURN(PTR_ERR(root));
314
315                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
316                 LASSERT(capa);
317                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
318
319                 rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa,
320                                  0);
321                 mdt_object_put(info->mti_env, root);
322                 if (rc == 0)
323                         repbody->valid |= OBD_MD_FLMDSCAPA;
324         }
325
326         RETURN(rc);
327 }
328
329 static int mdt_statfs(struct mdt_thread_info *info)
330 {
331         struct md_device  *next  = info->mti_mdt->mdt_child;
332         struct obd_statfs *osfs;
333         int                rc;
334
335         ENTRY;
336
337         /* This will trigger a watchdog timeout */
338         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
339                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
340
341         rc = mdt_check_ucred(info);
342         if (rc)
343                 RETURN(err_serious(rc));
344
345         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
346                 rc = err_serious(-ENOMEM);
347         } else {
348                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
349                 /* XXX max_age optimisation is needed here. See mds_statfs */
350                 rc = next->md_ops->mdo_statfs(info->mti_env, next,
351                                               &info->mti_u.ksfs);
352                 statfs_pack(osfs, &info->mti_u.ksfs);
353         }
354         RETURN(rc);
355 }
356
357 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
358                         struct mdt_object *o)
359 {
360         /* Check if Size-on-MDS is enabled. */
361         if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
362                 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
363                 b->size = attr->la_size;
364                 b->blocks = attr->la_blocks;
365         }
366 }
367
368 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
369                         const struct lu_attr *attr, const struct lu_fid *fid)
370 {
371         /*XXX should pack the reply body according to lu_valid*/
372         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
373                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
374                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
375                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
376
377         if (!S_ISREG(attr->la_mode))
378                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
379
380         b->atime      = attr->la_atime;
381         b->mtime      = attr->la_mtime;
382         b->ctime      = attr->la_ctime;
383         b->mode       = attr->la_mode;
384         b->size       = attr->la_size;
385         b->blocks     = attr->la_blocks;
386         b->uid        = attr->la_uid;
387         b->gid        = attr->la_gid;
388         b->flags      = attr->la_flags;
389         b->nlink      = attr->la_nlink;
390         b->rdev       = attr->la_rdev;
391
392         if (fid) {
393                 b->fid1 = *fid;
394                 b->valid |= OBD_MD_FLID;
395                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
396                                 PFID(fid), b->nlink, b->mode, b->size);
397         }
398
399         if (info)
400                 mdt_body_reverse_idmap(info, b);
401 }
402
403 static inline int mdt_body_has_lov(const struct lu_attr *la,
404                                    const struct mdt_body *body)
405 {
406         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
407                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
408 }
409
410 static int mdt_getattr_internal(struct mdt_thread_info *info,
411                                 struct mdt_object *o)
412 {
413         struct md_object        *next = mdt_object_child(o);
414         const struct mdt_body   *reqbody = info->mti_body;
415         struct ptlrpc_request   *req = mdt_info_req(info);
416         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
417         struct md_attr          *ma = &info->mti_attr;
418         struct lu_attr          *la = &ma->ma_attr;
419         struct req_capsule      *pill = &info->mti_pill;
420         const struct lu_env     *env = info->mti_env;
421         struct mdt_body         *repbody;
422         struct lu_buf           *buffer = &info->mti_buf;
423         int                     rc;
424         ENTRY;
425
426         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
427                 RETURN(err_serious(-ENOMEM));
428
429         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
430
431         if (reqbody->valid & OBD_MD_MEA) {
432                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
433                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
434                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
435                                                              RCL_SERVER);
436                 ma->ma_need = MA_INODE | MA_LMV;
437         } else {
438                 ma->ma_need = MA_INODE | MA_LOV ;
439                 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
440                 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
441                                                              RCL_SERVER);
442         }
443         ma->ma_valid = 0;
444         rc = mo_attr_get(env, next, ma);
445         if (rc == -EREMOTE) {
446                 /* This object is located on remote node.*/
447                 repbody->fid1 = *mdt_object_fid(o);
448                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
449                 RETURN(0);
450         } else if (rc) {
451                 CERROR("getattr error for "DFID": %d\n",
452                         PFID(mdt_object_fid(o)), rc);
453                 RETURN(rc);
454         }
455
456         if (ma->ma_valid & MA_INODE)
457                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
458         else
459                 RETURN(-EFAULT);
460
461         if (mdt_body_has_lov(la, reqbody)) {
462                 if (ma->ma_valid & MA_LOV) {
463                         LASSERT(ma->ma_lmm_size);
464                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
465                         repbody->eadatasize = ma->ma_lmm_size;
466                         if (S_ISDIR(la->la_mode))
467                                 repbody->valid |= OBD_MD_FLDIREA;
468                         else
469                                 repbody->valid |= OBD_MD_FLEASIZE;
470                 }
471                 if (ma->ma_valid & MA_LMV) {
472                         LASSERT(S_ISDIR(la->la_mode));
473                         repbody->eadatasize = ma->ma_lmv_size;
474                         repbody->valid |= OBD_MD_FLDIREA;
475                         repbody->valid |= OBD_MD_MEA;
476                 }
477         } else if (S_ISLNK(la->la_mode) &&
478                           reqbody->valid & OBD_MD_LINKNAME) {
479                 buffer->lb_buf = ma->ma_lmm;
480                 buffer->lb_len = reqbody->eadatasize;
481                 rc = mo_readlink(env, next, buffer);
482                 if (rc <= 0) {
483                         CERROR("readlink failed: %d\n", rc);
484                         rc = -EFAULT;
485                 } else {
486                         repbody->valid |= OBD_MD_LINKNAME;
487                         repbody->eadatasize = rc;
488                         ((char*)ma->ma_lmm)[rc - 1] = 0; /* NULL terminate */
489                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
490                                         (char*)ma->ma_lmm, rc);
491                         rc = 0;
492                 }
493         }
494
495         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
496                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
497                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
498                 repbody->valid |= OBD_MD_FLMODEASIZE;
499                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
500                                 "MAX_COOKIE to : %d:%d\n",
501                                 repbody->max_mdsize,
502                                 repbody->max_cookiesize);
503         }
504
505         if (med->med_rmtclient && (reqbody->valid & OBD_MD_FLRMTPERM)) {
506                 void *buf = req_capsule_server_get(pill, &RMF_ACL);
507
508                 /* mdt_getattr_lock only */
509                 rc = mdt_pack_remote_perm(info, o, buf);
510                 if (rc) {
511                         repbody->valid &= ~OBD_MD_FLRMTPERM;
512                         repbody->aclsize = 0;
513                         RETURN(rc);
514                 } else {
515                         repbody->valid |= OBD_MD_FLRMTPERM;
516                         repbody->aclsize = sizeof(struct mdt_remote_perm);
517                 }
518         }
519 #ifdef CONFIG_FS_POSIX_ACL
520         else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
521                  (reqbody->valid & OBD_MD_FLACL)) {
522                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
523                 buffer->lb_len = req_capsule_get_size(pill,
524                                                       &RMF_ACL, RCL_SERVER);
525                 if (buffer->lb_len > 0) {
526                         rc = mo_xattr_get(env, next, buffer,
527                                           XATTR_NAME_ACL_ACCESS);
528                         if (rc < 0) {
529                                 if (rc == -ENODATA) {
530                                         repbody->aclsize = 0;
531                                         repbody->valid |= OBD_MD_FLACL;
532                                         rc = 0;
533                                 } else if (rc == -EOPNOTSUPP) {
534                                         rc = 0;
535                                 } else {
536                                         CERROR("got acl size: %d\n", rc);
537                                 }
538                         } else {
539                                 repbody->aclsize = rc;
540                                 repbody->valid |= OBD_MD_FLACL;
541                                 rc = 0;
542                         }
543                 }
544         }
545 #endif
546
547         if ((reqbody->valid & OBD_MD_FLMDSCAPA) &&
548             info->mti_mdt->mdt_opts.mo_mds_capa) {
549                 struct lustre_capa *capa;
550
551                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
552                 LASSERT(capa);
553                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
554                 rc = mo_capa_get(env, next, capa, 0);
555                 if (rc)
556                         RETURN(rc);
557                 repbody->valid |= OBD_MD_FLMDSCAPA;
558         }
559         RETURN(rc);
560 }
561
562 static int mdt_renew_capa(struct mdt_thread_info *info)
563 {
564         struct mdt_device  *mdt = info->mti_mdt;
565         struct mdt_object  *obj = info->mti_object;
566         struct mdt_body    *body;
567         struct lustre_capa *capa, *c;
568         int rc;
569         ENTRY;
570
571         /* if object doesn't exist, or server has disabled capability,
572          * return directly, client will find body->valid OBD_MD_FLOSSCAPA
573          * flag not set.
574          */
575         if (!obj || !mdt->mdt_opts.mo_mds_capa)
576                 RETURN(0);
577
578         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
579         LASSERT(body != NULL);
580
581         c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
582         LASSERT(c);
583
584         capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
585         LASSERT(capa);
586
587         *capa = *c;
588         rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa, 1);
589         if (rc == 0)
590                 body->valid |= OBD_MD_FLOSSCAPA;
591
592         RETURN(rc);
593 }
594
595 static int mdt_getattr(struct mdt_thread_info *info)
596 {
597         struct mdt_object       *obj = info->mti_object;
598         struct req_capsule      *pill = &info->mti_pill;
599         struct mdt_body         *reqbody;
600         struct mdt_body         *repbody;
601         mode_t                  mode;
602         int rc;
603         ENTRY;
604
605         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
606         LASSERT(reqbody);
607
608         if (reqbody->valid & OBD_MD_FLOSSCAPA) {
609                 rc = req_capsule_pack(pill);
610                 if (rc)
611                         RETURN(err_serious(rc));
612                 rc = mdt_renew_capa(info);
613                 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
614                 RETURN(rc);
615         }
616
617         LASSERT(obj != NULL);
618         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
619
620         mode = lu_object_attr(&obj->mot_obj.mo_lu);
621         if (S_ISLNK(mode) && (reqbody->valid & OBD_MD_LINKNAME) &&
622             (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize)) {
623                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
624                                      reqbody->eadatasize);
625         } else {
626                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
627                                      info->mti_mdt->mdt_max_mdsize);
628         }
629
630         rc = req_capsule_pack(pill);
631         if (rc != 0)
632                 RETURN(err_serious(rc));
633
634         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
635         LASSERT(repbody);
636         repbody->eadatasize = 0;
637         repbody->aclsize = 0;
638
639         if (reqbody->valid & OBD_MD_FLRMTPERM)
640                 rc = mdt_init_ucred(info, reqbody);
641         else
642                 rc = mdt_check_ucred(info);
643         if (rc)
644                 GOTO(out, rc);
645
646         /* don't check capability at all, because rename might
647          * getattr for remote obj, and at that time no capability
648          * is available. */
649         mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA);
650         rc = mdt_getattr_internal(info, obj);
651         if (reqbody->valid & OBD_MD_FLRMTPERM)
652                 mdt_exit_ucred(info);
653         EXIT;
654 out:
655         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
656         return rc;
657 }
658
659 static int mdt_is_subdir(struct mdt_thread_info *info)
660 {
661         struct mdt_object     *o = info->mti_object;
662         struct req_capsule    *pill = &info->mti_pill;
663         const struct mdt_body *body = info->mti_body;
664         struct mdt_body       *repbody;
665         int                    rc;
666         ENTRY;
667
668         LASSERT(o != NULL);
669
670         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
671
672         /*
673          * We save last checked parent fid to @repbody->fid1 for remote
674          * directory case.
675          */
676         LASSERT(fid_is_sane(&body->fid2));
677         mdt_set_capainfo(info, 0, &body->fid1, BYPASS_CAPA);
678         mdt_set_capainfo(info, 1, &body->fid2, BYPASS_CAPA);
679
680         LASSERT(mdt_object_exists(o) > 0);
681         rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
682                            &body->fid2, &repbody->fid1);
683         if (rc == 0 || rc == -EREMOTE)
684                 repbody->valid |= OBD_MD_FLID;
685
686         RETURN(rc);
687 }
688
689 static int mdt_raw_lookup(struct mdt_thread_info *info,
690                           struct mdt_object *parent,
691                           const char* name,
692                           struct ldlm_reply *ldlm_rep)
693 {
694         struct md_object *next = mdt_object_child(info->mti_object);
695         const struct mdt_body *reqbody = info->mti_body;
696         struct lu_fid *child_fid = &info->mti_tmp_fid1;
697         struct mdt_body *repbody;
698         int rc;
699         ENTRY;
700
701         if (reqbody->valid != OBD_MD_FLID)
702                 RETURN(0);
703
704         /* Only got the fid of this obj by name */
705         rc = mdo_lookup(info->mti_env, next, name, child_fid);
706         if (rc != 0) {
707                 if (rc == -ENOENT)
708                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
709                 RETURN(rc);
710         } else
711                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
712
713         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
714         repbody->fid1 = *child_fid;
715         repbody->valid = OBD_MD_FLID;
716
717         RETURN(1);
718 }
719
720 /*
721  * UPDATE lock should be taken against parent, and be released before exit;
722  * child_bits lock should be taken against child, and be returned back:
723  *            (1)normal request should release the child lock;
724  *            (2)intent request will grant the lock to client.
725  */
726 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
727                                  struct mdt_lock_handle *lhc,
728                                  __u64 child_bits,
729                                  struct ldlm_reply *ldlm_rep)
730 {
731         struct ptlrpc_request *req = mdt_info_req(info);
732         struct mdt_object     *parent = info->mti_object;
733         struct mdt_object     *child;
734         struct md_object      *next = mdt_object_child(info->mti_object);
735         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
736         int                    is_resent, rc, namelen = 0;
737         const char            *name;
738         struct mdt_lock_handle *lhp;
739         struct ldlm_lock      *lock;
740         ENTRY;
741
742         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
743         if (is_resent)
744                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
745
746         LASSERT(info->mti_object != NULL);
747         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
748         if (name == NULL)
749                 RETURN(err_serious(-EFAULT));
750
751         namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME,
752                                        RCL_CLIENT);
753
754         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
755                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
756
757         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
758
759         rc = mdt_object_exists(parent);
760         if (rc == 0)
761                 RETURN(-ESTALE);
762         else if (rc < 0) {
763                 CERROR("Object "DFID" locates on remote server\n",
764                         PFID(mdt_object_fid(parent)));
765                 LBUG();
766         }
767
768         rc = mdt_raw_lookup(info, parent, name, ldlm_rep);
769         if (rc != 0) {
770                 if (rc > 0)
771                         rc = 0;
772                 RETURN(rc);
773         }
774
775         if (name[0] == 0) {
776                 /* Only getattr on the child. Parent is on another node. */
777                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
778                 child = parent;
779                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
780                        "ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
781
782                 if (is_resent) {
783                         /* Do not take lock for resent case. */
784                         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
785                         if (!lock) {
786                                 CERROR("Invalid lock handle "LPX64"\n",
787                                        lhc->mlh_reg_lh.cookie);
788                                 LBUG();
789                         }
790                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
791                                                 &lock->l_resource->lr_name));
792                         LDLM_LOCK_PUT(lock);
793                         rc = 0;
794                 } else {
795                         mdt_lock_handle_init(lhc);
796                         mdt_lock_reg_init(lhc, LCK_PR);
797
798                         /*
799                          * Object's name is on another MDS, no lookup lock is
800                          * needed here but update is.
801                          */
802                         child_bits &= ~MDS_INODELOCK_LOOKUP;
803                         child_bits |= MDS_INODELOCK_UPDATE;
804
805                         rc = mdt_object_lock(info, child, lhc, child_bits,
806                                              MDT_LOCAL_LOCK);
807                 }
808                 if (rc == 0) {
809                         /* Finally, we can get attr for child. */
810                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
811                                          BYPASS_CAPA);
812                         rc = mdt_getattr_internal(info, child);
813                         if (rc != 0)
814                                 mdt_object_unlock(info, child, lhc, 1);
815                 }
816                 GOTO(out, rc);
817         }
818
819         /* step 1: lock parent */
820         lhp = &info->mti_lh[MDT_LH_PARENT];
821         mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
822         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
823                              MDT_LOCAL_LOCK);
824         if (rc != 0)
825                 RETURN(rc);
826
827         /*step 2: lookup child's fid by name */
828         rc = mdo_lookup(info->mti_env, next, name, child_fid);
829         if (rc != 0) {
830                 if (rc == -ENOENT)
831                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
832                 GOTO(out_parent, rc);
833         } else
834                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
835         /*
836          *step 3: find the child object by fid & lock it.
837          *        regardless if it is local or remote.
838          */
839         child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
840         if (IS_ERR(child))
841                 GOTO(out_parent, rc = PTR_ERR(child));
842         if (is_resent) {
843                 /* Do not take lock for resent case. */
844                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
845                 if (!lock) {
846                         CERROR("Invalid lock handle "LPX64"\n",
847                                lhc->mlh_reg_lh.cookie);
848                         LBUG();
849                 }
850                 LASSERT(fid_res_name_eq(child_fid,
851                                         &lock->l_resource->lr_name));
852                 LDLM_LOCK_PUT(lock);
853                 rc = 0;
854         } else {
855                 mdt_lock_handle_init(lhc);
856                 mdt_lock_reg_init(lhc, LCK_PR);
857
858                 rc = mdt_object_lock(info, child, lhc, child_bits,
859                                      MDT_CROSS_LOCK);
860                 if (rc != 0)
861                         GOTO(out_child, rc);
862         }
863
864         /* finally, we can get attr for child. */
865         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
866         rc = mdt_getattr_internal(info, child);
867         if (rc != 0) {
868                 mdt_object_unlock(info, child, lhc, 1);
869         } else {
870                 struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
871                 if (lock) {
872                         struct ldlm_res_id *res_id;
873                         struct mdt_body *repbody;
874                         struct lu_attr *ma;
875
876                         /* Debugging code. */
877                         res_id = &lock->l_resource->lr_name;
878                         LDLM_DEBUG(lock, "we will return this lock client\n");
879                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
880                                                  &lock->l_resource->lr_name),
881                                 "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
882                                 (unsigned long)res_id->name[0],
883                                 (unsigned long)res_id->name[1],
884                                 (unsigned long)res_id->name[2],
885                                 PFID(mdt_object_fid(child)));
886
887                         /* Pack Size-on-MDS inode attributes to the body if
888                          * update lock is given. */
889                         repbody = req_capsule_server_get(&info->mti_pill,
890                                                          &RMF_MDT_BODY);
891                         ma = &info->mti_attr.ma_attr;
892                         if (lock->l_policy_data.l_inodebits.bits &
893                             MDS_INODELOCK_UPDATE)
894                                 mdt_pack_size2body(repbody, ma, child);
895                         LDLM_LOCK_PUT(lock);
896                 }
897         }
898         EXIT;
899 out_child:
900         mdt_object_put(info->mti_env, child);
901 out_parent:
902         mdt_object_unlock(info, parent, lhp, 1);
903 out:
904         return rc;
905 }
906
907 /* normal handler: should release the child lock */
908 static int mdt_getattr_name(struct mdt_thread_info *info)
909 {
910         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
911         struct mdt_body        *reqbody;
912         struct mdt_body        *repbody;
913         int rc;
914         ENTRY;
915
916         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
917         LASSERT(reqbody);
918         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
919         LASSERT(repbody);
920         repbody->eadatasize = 0;
921         repbody->aclsize = 0;
922
923         rc = mdt_init_ucred(info, reqbody);
924         if (rc)
925                 GOTO(out, rc);
926
927         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
928         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
929                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
930                 lhc->mlh_reg_lh.cookie = 0;
931         }
932         mdt_exit_ucred(info);
933         EXIT;
934 out:
935         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
936         return rc;
937 }
938
939 static struct lu_device_operations mdt_lu_ops;
940
941 static int lu_device_is_mdt(struct lu_device *d)
942 {
943         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
944 }
945
946 static int mdt_connect(struct mdt_thread_info *info)
947 {
948         int rc;
949         struct ptlrpc_request *req;
950
951         req = mdt_info_req(info);
952         rc = target_handle_connect(req);
953         if (rc == 0) {
954                 LASSERT(req->rq_export != NULL);
955                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
956                 rc = mdt_init_idmap(info);
957         } else
958                 rc = err_serious(rc);
959         return rc;
960 }
961
/*
 * Handle a disconnect request.
 *
 * Returns 0 on success; any failure from target_handle_disconnect() is
 * reported as a serious error.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        int rc = target_handle_disconnect(mdt_info_req(info));

        if (rc == 0)
                return 0;

        return err_serious(rc);
}
971
972 static int mdt_sendpage(struct mdt_thread_info *info,
973                         struct lu_rdpg *rdpg)
974 {
975         struct ptlrpc_request   *req = mdt_info_req(info);
976         struct ptlrpc_bulk_desc *desc;
977         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
978         int                      tmpcount;
979         int                      tmpsize;
980         int                      i;
981         int                      rc;
982         ENTRY;
983
984         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
985                                     MDS_BULK_PORTAL);
986         if (desc == NULL)
987                 GOTO(out, rc = -ENOMEM);
988
989         for (i = 0, tmpcount = rdpg->rp_count;
990                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
991                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
992                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
993         }
994
995         LASSERT(desc->bd_nob == rdpg->rp_count);
996         rc = ptlrpc_start_bulk_transfer(desc);
997         if (rc)
998                 GOTO(free_desc, rc);
999
1000         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1001                 GOTO(abort_bulk, rc);
1002
1003         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
1004         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
1005         LASSERT (rc == 0 || rc == -ETIMEDOUT);
1006
1007         if (rc == 0) {
1008                 if (desc->bd_success &&
1009                     desc->bd_nob_transferred == rdpg->rp_count)
1010                         GOTO(free_desc, rc);
1011
1012                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
1013         }
1014
1015         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
1016                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
1017                   desc->bd_nob_transferred, rdpg->rp_count,
1018                   req->rq_export->exp_client_uuid.uuid,
1019                   req->rq_export->exp_connection->c_remote_uuid.uuid);
1020
1021         class_fail_export(req->rq_export);
1022
1023         EXIT;
1024 abort_bulk:
1025         ptlrpc_abort_bulk(desc);
1026 free_desc:
1027         ptlrpc_free_bulk(desc);
1028 out:
1029         return rc;
1030 }
1031
1032 #ifdef HAVE_SPLIT_SUPPORT
1033 /*
1034  * Retrieve dir entry from the page and insert it to the slave object, actually,
1035  * this should be in osd layer, but since it will not in the final product, so
1036  * just do it here and do not define more moo api anymore for this.
1037  */
1038 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
1039                               int size)
1040 {
1041         struct mdt_object *object = info->mti_object;
1042         int rc = 0, offset = 0, is_dir;
1043         struct lu_dirpage *dp;
1044         struct lu_dirent *ent;
1045         ENTRY;
1046
1047         /* Make sure we have at least one entry. */
1048         if (size == 0)
1049                 RETURN(-EINVAL);
1050
1051         /*
1052          * Disable trans for this name insert, since it will include many trans
1053          * for this.
1054          */
1055         info->mti_no_need_trans = 1;
1056
1057         kmap(page);
1058         dp = page_address(page);
1059         offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
1060
1061         for (ent = lu_dirent_start(dp); ent != NULL;
1062              ent = lu_dirent_next(ent)) {
1063                 struct lu_fid *lf = &info->mti_tmp_fid2;
1064                 char *name;
1065
1066                 if (le16_to_cpu(ent->lde_namelen) == 0)
1067                         continue;
1068
1069                 fid_le_to_cpu(lf, &ent->lde_fid);
1070                 is_dir = le32_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT;
1071                 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
1072                 if (name == NULL)
1073                         GOTO(out, rc = -ENOMEM);
1074
1075                 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
1076                 rc = mdo_name_insert(info->mti_env,
1077                                      md_object_next(&object->mot_obj),
1078                                      name, lf, is_dir);
1079                 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
1080                 if (rc) {
1081                         CERROR("Can't insert %*.*s, rc %d\n",
1082                                le16_to_cpu(ent->lde_namelen),
1083                                le16_to_cpu(ent->lde_namelen),
1084                                ent->lde_name, rc);
1085                         GOTO(out, rc);
1086                 }
1087
1088                 offset += lu_dirent_size(ent);
1089                 if (offset >= size)
1090                         break;
1091         }
1092         EXIT;
1093 out:
1094         kunmap(page);
1095         return rc;
1096 }
1097
1098 static int mdt_bulk_timeout(void *data)
1099 {
1100         ENTRY;
1101
1102         CERROR("mdt bulk transfer timeout \n");
1103
1104         RETURN(1);
1105 }
1106
1107 static int mdt_writepage(struct mdt_thread_info *info)
1108 {
1109         struct ptlrpc_request   *req = mdt_info_req(info);
1110         struct mdt_body         *reqbody;
1111         struct l_wait_info      *lwi;
1112         struct ptlrpc_bulk_desc *desc;
1113         struct page             *page;
1114         int                rc;
1115         ENTRY;
1116
1117
1118         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
1119         if (reqbody == NULL)
1120                 RETURN(err_serious(-EFAULT));
1121
1122         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
1123         if (!desc)
1124                 RETURN(err_serious(-ENOMEM));
1125
1126         /* allocate the page for the desc */
1127         page = alloc_pages(GFP_KERNEL, 0);
1128         if (!page)
1129                 GOTO(desc_cleanup, rc = -ENOMEM);
1130
1131         CDEBUG(D_INFO, "Received page offset %d size %d \n",
1132                (int)reqbody->size, (int)reqbody->nlink);
1133
1134         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
1135                               (int)reqbody->nlink);
1136
1137         /*
1138          * Check if client was evicted while we were doing i/o before touching
1139          * network.
1140          */
1141         OBD_ALLOC_PTR(lwi);
1142         if (!lwi)
1143                 GOTO(cleanup_page, rc = -ENOMEM);
1144
1145         if (desc->bd_export->exp_failed)
1146                 rc = -ENOTCONN;
1147         else
1148                 rc = ptlrpc_start_bulk_transfer (desc);
1149         if (rc == 0) {
1150                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
1151                                             mdt_bulk_timeout, desc);
1152                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
1153                                   desc->bd_export->exp_failed, lwi);
1154                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
1155                 if (rc == -ETIMEDOUT) {
1156                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
1157                         ptlrpc_abort_bulk(desc);
1158                 } else if (desc->bd_export->exp_failed) {
1159                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1160                         rc = -ENOTCONN;
1161                         ptlrpc_abort_bulk(desc);
1162                 } else if (!desc->bd_success ||
1163                            desc->bd_nob_transferred != desc->bd_nob) {
1164                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1165                                   desc->bd_success ?
1166                                   "truncated" : "network error on",
1167                                   desc->bd_nob_transferred, desc->bd_nob);
1168                         /* XXX should this be a different errno? */
1169                         rc = -ETIMEDOUT;
1170                 }
1171         } else {
1172                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
1173         }
1174         if (rc)
1175                 GOTO(cleanup_lwi, rc);
1176         rc = mdt_write_dir_page(info, page, reqbody->nlink);
1177
1178 cleanup_lwi:
1179         OBD_FREE_PTR(lwi);
1180 cleanup_page:
1181         __free_pages(page, 0);
1182 desc_cleanup:
1183         ptlrpc_free_bulk(desc);
1184         RETURN(rc);
1185 }
1186 #endif
1187
1188 static int mdt_readpage(struct mdt_thread_info *info)
1189 {
1190         struct mdt_object *object = info->mti_object;
1191         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
1192         struct mdt_body   *reqbody;
1193         struct mdt_body   *repbody;
1194         int                rc;
1195         int                i;
1196         ENTRY;
1197
1198         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1199                 RETURN(err_serious(-ENOMEM));
1200
1201         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
1202         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
1203         if (reqbody == NULL || repbody == NULL)
1204                 RETURN(err_serious(-EFAULT));
1205
1206         rc = mdt_check_ucred(info);
1207         if (rc)
1208                 RETURN(err_serious(rc));
1209
1210         /*
1211          * prepare @rdpg before calling lower layers and transfer itself. Here
1212          * reqbody->size contains offset of where to start to read and
1213          * reqbody->nlink contains number bytes to read.
1214          */
1215         rdpg->rp_hash = reqbody->size;
1216         if ((__u64)rdpg->rp_hash != reqbody->size) {
1217                 CERROR("Invalid hash: %#llx != %#llx\n",
1218                        (__u64)rdpg->rp_hash, reqbody->size);
1219                 RETURN(-EFAULT);
1220         }
1221         rdpg->rp_count  = reqbody->nlink;
1222         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
1223         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1224         if (rdpg->rp_pages == NULL)
1225                 RETURN(-ENOMEM);
1226
1227         for (i = 0; i < rdpg->rp_npages; ++i) {
1228                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
1229                 if (rdpg->rp_pages[i] == NULL)
1230                         GOTO(free_rdpg, rc = -ENOMEM);
1231         }
1232
1233         /* call lower layers to fill allocated pages with directory data */
1234         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
1235         if (rc)
1236                 GOTO(free_rdpg, rc);
1237
1238         /* send pages to client */
1239         rc = mdt_sendpage(info, rdpg);
1240
1241         EXIT;
1242 free_rdpg:
1243
1244         for (i = 0; i < rdpg->rp_npages; i++)
1245                 if (rdpg->rp_pages[i] != NULL)
1246                         __free_pages(rdpg->rp_pages[i], 0);
1247         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1248
1249         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
1250
1251         return rc;
1252 }
1253
1254 static int mdt_reint_internal(struct mdt_thread_info *info,
1255                               struct mdt_lock_handle *lhc,
1256                               __u32 op)
1257 {
1258         struct req_capsule      *pill = &info->mti_pill;
1259         struct mdt_device       *mdt = info->mti_mdt;
1260         struct ptlrpc_request   *req = mdt_info_req(info);
1261         struct mdt_body         *repbody;
1262         int                      rc;
1263         ENTRY;
1264
1265         /* pack reply */
1266         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1267                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1268                                      mdt->mdt_max_mdsize);
1269         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1270                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1271                                      mdt->mdt_max_cookiesize);
1272         rc = req_capsule_pack(pill);
1273         if (rc != 0) {
1274                 CERROR("Can't pack response, rc %d\n", rc);
1275                 RETURN(err_serious(rc));
1276         }
1277
1278         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
1279                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1280                 LASSERT(repbody);
1281                 repbody->eadatasize = 0;
1282                 repbody->aclsize = 0;
1283         }
1284
1285         /*
1286          * Check this after packing response, because after we fail here without
1287          * allocating response, caller anyway may want to get ldlm_reply from it
1288          * and will get oops.
1289          */
1290         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
1291                 RETURN(err_serious(-EFAULT));
1292
1293         rc = mdt_reint_unpack(info, op);
1294         if (rc != 0) {
1295                 CERROR("Can't unpack reint, rc %d\n", rc);
1296                 RETURN(err_serious(rc));
1297         }
1298
1299         rc = mdt_init_ucred_reint(info);
1300         if (rc)
1301                 RETURN(rc);
1302
1303         rc = mdt_fix_attr_ucred(info, op);
1304         if (rc != 0)
1305                 GOTO(out, rc);
1306
1307         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1308                 struct mdt_client_data *mcd;
1309
1310                 mcd = req->rq_export->exp_mdt_data.med_mcd;
1311                 if (req_xid_is_last(req)) {
1312                         mdt_reconstruct(info, lhc);
1313                         rc = lustre_msg_get_status(req->rq_repmsg);
1314                         GOTO(out, rc);
1315                 }
1316                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
1317                           mcd->mcd_last_xid);
1318         }
1319         rc = mdt_reint_rec(info, lhc);
1320 out:
1321         mdt_exit_ucred(info);
1322         RETURN(rc);
1323 }
1324
1325 static long mdt_reint_opcode(struct mdt_thread_info *info,
1326                              const struct req_format **fmt)
1327 {
1328         __u32 *ptr;
1329         long opc;
1330
1331         opc = err_serious(-EFAULT);
1332         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
1333         if (ptr != NULL) {
1334                 opc = *ptr;
1335                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
1336                 if (opc < REINT_MAX && fmt[opc] != NULL)
1337                         req_capsule_extend(&info->mti_pill, fmt[opc]);
1338                 else {
1339                         CERROR("Unsupported opc: %ld\n", opc);
1340                         opc = err_serious(opc);
1341                 }
1342         }
1343         return opc;
1344 }
1345
1346 static int mdt_reint(struct mdt_thread_info *info)
1347 {
1348         long opc;
1349         int  rc;
1350
1351         static const struct req_format *reint_fmts[REINT_MAX] = {
1352                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
1353                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
1354                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
1355                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
1356                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
1357                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
1358         };
1359
1360         ENTRY;
1361
1362         opc = mdt_reint_opcode(info, reint_fmts);
1363         if (opc >= 0) {
1364                 /*
1365                  * No lock possible here from client to pass it to reint code
1366                  * path.
1367                  */
1368                 rc = mdt_reint_internal(info, NULL, opc);
1369         } else {
1370                 rc = opc;
1371         }
1372
1373         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1374         RETURN(rc);
1375 }
1376
1377 /* TODO these two methods not available now. */
1378
/*
 * Intended to sync the whole device.  Not implemented yet: @info is ignored
 * and success is always reported.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
1384
/*
 * Intended to sync a single object.  Not implemented yet: @info is ignored
 * and success is always reported.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        int rc = 0;

        return rc;
}
1390
1391 static int mdt_sync(struct mdt_thread_info *info)
1392 {
1393         struct req_capsule *pill = &info->mti_pill;
1394         struct mdt_body *body;
1395         int rc;
1396         ENTRY;
1397
1398         /* The fid may be zero, so we req_capsule_set manually */
1399         req_capsule_set(pill, &RQF_MDS_SYNC);
1400
1401         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1402         if (body == NULL)
1403                 RETURN(err_serious(-EINVAL));
1404
1405         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1406                 RETURN(err_serious(-ENOMEM));
1407
1408         if (fid_seq(&body->fid1) == 0) {
1409                 /* sync the whole device */
1410                 rc = req_capsule_pack(pill);
1411                 if (rc == 0)
1412                         rc = mdt_device_sync(info);
1413                 else
1414                         rc = err_serious(rc);
1415         } else {
1416                 /* sync an object */
1417                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1418                 if (rc == 0) {
1419                         rc = mdt_object_sync(info);
1420                         if (rc == 0) {
1421                                 struct md_object *next;
1422                                 const struct lu_fid *fid;
1423                                 struct lu_attr *la = &info->mti_attr.ma_attr;
1424
1425                                 next = mdt_object_child(info->mti_object);
1426                                 info->mti_attr.ma_need = MA_INODE;
1427                                 info->mti_attr.ma_valid = 0;
1428                                 rc = mo_attr_get(info->mti_env, next,
1429                                                  &info->mti_attr);
1430                                 if (rc == 0) {
1431                                         body = req_capsule_server_get(pill,
1432                                                                 &RMF_MDT_BODY);
1433                                         fid = mdt_object_fid(info->mti_object);
1434                                         mdt_pack_attr2body(info, body, la, fid);
1435                                 }
1436                         }
1437                 } else
1438                         rc = err_serious(rc);
1439         }
1440         RETURN(rc);
1441 }
1442
1443 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
1444 {
1445         return err_serious(-EOPNOTSUPP);
1446 }
1447
1448 static int mdt_quotactl_handle(struct mdt_thread_info *info)
1449 {
1450         return err_serious(-EOPNOTSUPP);
1451 }
1452
1453 /*
1454  * OBD PING and other handlers.
1455  */
1456 static int mdt_obd_ping(struct mdt_thread_info *info)
1457 {
1458         int rc;
1459         ENTRY;
1460         rc = target_handle_ping(mdt_info_req(info));
1461         if (rc < 0)
1462                 rc = err_serious(rc);
1463         RETURN(rc);
1464 }
1465
1466 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
1467 {
1468         return err_serious(-EOPNOTSUPP);
1469 }
1470
1471 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
1472 {
1473         return err_serious(-EOPNOTSUPP);
1474 }
1475
1476
1477 /*
1478  * DLM handlers.
1479  */
1480 static struct ldlm_callback_suite cbs = {
1481         .lcs_completion = ldlm_server_completion_ast,
1482         .lcs_blocking   = ldlm_server_blocking_ast,
1483         .lcs_glimpse    = NULL
1484 };
1485
1486 static int mdt_enqueue(struct mdt_thread_info *info)
1487 {
1488         struct ptlrpc_request *req;
1489         __u64 req_bits;
1490         int rc;
1491
1492         /*
1493          * info->mti_dlm_req already contains swapped and (if necessary)
1494          * converted dlm request.
1495          */
1496         LASSERT(info->mti_dlm_req != NULL);
1497
1498         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
1499                 info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
1500                 return 0;
1501         }
1502
1503         req = mdt_info_req(info);
1504
1505         /*
1506          * Lock without inodebits makes no sense and will oops later in
1507          * ldlm. Let's check it now to see if we have wrong lock from client or
1508          * bits get corrupted somewhere in mdt_intent_policy().
1509          */
1510         req_bits = info->mti_dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
1511         LASSERT(req_bits != 0);
1512
1513         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1514                                   req, info->mti_dlm_req, &cbs);
1515         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1516         return rc ? err_serious(rc) : req->rq_status;
1517 }
1518
1519 static int mdt_convert(struct mdt_thread_info *info)
1520 {
1521         int rc;
1522         struct ptlrpc_request *req;
1523
1524         LASSERT(info->mti_dlm_req);
1525         req = mdt_info_req(info);
1526         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1527         return rc ? err_serious(rc) : req->rq_status;
1528 }
1529
1530 static int mdt_bl_callback(struct mdt_thread_info *info)
1531 {
1532         CERROR("bl callbacks should not happen on MDS\n");
1533         LBUG();
1534         return err_serious(-EOPNOTSUPP);
1535 }
1536
1537 static int mdt_cp_callback(struct mdt_thread_info *info)
1538 {
1539         CERROR("cp callbacks should not happen on MDS\n");
1540         LBUG();
1541         return err_serious(-EOPNOTSUPP);
1542 }
1543
1544 /*
1545  * sec context handlers
1546  */
/* Security context requests are served entirely by mdt_handle_idmap(). */
static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
{
        int rc = mdt_handle_idmap(info);

        return rc;
}
1551
1552 static struct mdt_object *mdt_obj(struct lu_object *o)
1553 {
1554         LASSERT(lu_device_is_mdt(o->lo_dev));
1555         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1556 }
1557
1558 struct mdt_object *mdt_object_find(const struct lu_env *env,
1559                                    struct mdt_device *d,
1560                                    const struct lu_fid *f)
1561 {
1562         struct lu_object *o;
1563         struct mdt_object *m;
1564         ENTRY;
1565
1566         o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
1567         if (IS_ERR(o))
1568                 m = (struct mdt_object *)o;
1569         else
1570                 m = mdt_obj(o);
1571         RETURN(m);
1572 }
1573
1574 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1575                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
1576 {
1577         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1578         ldlm_policy_data_t *policy = &info->mti_policy;
1579         struct ldlm_res_id *res_id = &info->mti_res_id;
1580         int exist = mdt_object_exists(o);
1581         int rc;
1582         ENTRY;
1583
1584         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
1585         LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
1586         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
1587         LASSERT(lh->mlh_type != MDT_NUL_LOCK);
1588
1589         if (exist < 0) {
1590                 if (locality == MDT_CROSS_LOCK) {
1591                         /* cross-ref object fix */
1592                         ibits &= ~MDS_INODELOCK_UPDATE;
1593                         ibits |= MDS_INODELOCK_LOOKUP;
1594                 } else {
1595                         LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1596                         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1597                 }
1598                 /* No PDO lock on remote object */
1599                 LASSERT(lh->mlh_type != MDT_PDO_LOCK);
1600         } else if (exist == 0 && lh->mlh_type == MDT_PDO_LOCK) {
1601                 /*
1602                  * No PDO lock on non-existing object.
1603                  * This may happen on removed $PWD on client.
1604                  */
1605                 RETURN(-ESTALE);
1606         }
1607
1608         memset(policy, 0, sizeof(*policy));
1609         fid_build_reg_res_name(mdt_object_fid(o), res_id);
1610
1611         /*
1612          * Take PDO lock on whole directory and build correct @res_id for lock
1613          * on part of directory.
1614          */
1615         if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) {
1616                 mdt_lock_pdo_mode(info, o, lh);
1617                 if (lh->mlh_pdo_mode != LCK_NL) {
1618                         /*
1619                          * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
1620                          * is never going to be sent to client and we do not
1621                          * want it slowed down due to possible cancels.
1622                          */
1623                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1624                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
1625                                           policy, res_id, LDLM_FL_ATOMIC_CB);
1626                         if (rc)
1627                                 RETURN(rc);
1628                 }
1629
1630                 /*
1631                  * Finish res_id initializing by name hash marking patr of
1632                  * directory which is taking modification.
1633                  */
1634                 res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
1635         }
1636
1637         policy->l_inodebits.bits = ibits;
1638
1639         /*
1640          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1641          * going to be sent to client. If it is - mdt_intent_policy() path will
1642          * fix it up and turns FL_LOCAL flag off.
1643          */
1644         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
1645                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
1646
1647         if (rc && lh->mlh_type == MDT_PDO_LOCK) {
1648                 mdt_fid_unlock(&lh->mlh_pdo_lh, lh->mlh_pdo_mode);
1649                 lh->mlh_pdo_lh.cookie = 0ull;
1650         }
1651
1652         RETURN(rc);
1653 }
1654
1655 /*
1656  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1657  * to save this lock in req.  when transaction committed, req will be released,
1658  * and lock will, too.
1659  */
1660 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1661                        struct mdt_lock_handle *lh, int decref)
1662 {
1663         struct ptlrpc_request *req = mdt_info_req(info);
1664         ENTRY;
1665
1666         if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
1667                 /* Do not save PDO locks to request, just decref. */
1668                 mdt_fid_unlock(&lh->mlh_pdo_lh,
1669                                lh->mlh_pdo_mode);
1670                 lh->mlh_pdo_lh.cookie = 0;
1671         }
1672
1673         if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
1674                 if (decref) {
1675                         mdt_fid_unlock(&lh->mlh_reg_lh,
1676                                        lh->mlh_reg_mode);
1677                 } else {
1678                         ptlrpc_save_lock(req, &lh->mlh_reg_lh,
1679                                          lh->mlh_reg_mode);
1680                 }
1681                 lh->mlh_reg_lh.cookie = 0;
1682         }
1683
1684         EXIT;
1685 }
1686
1687 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1688                                         const struct lu_fid *f,
1689                                         struct mdt_lock_handle *lh,
1690                                         __u64 ibits)
1691 {
1692         struct mdt_object *o;
1693
1694         o = mdt_object_find(info->mti_env, info->mti_mdt, f);
1695         if (!IS_ERR(o)) {
1696                 int rc;
1697
1698                 rc = mdt_object_lock(info, o, lh, ibits,
1699                                      MDT_LOCAL_LOCK);
1700                 if (rc != 0) {
1701                         mdt_object_put(info->mti_env, o);
1702                         o = ERR_PTR(rc);
1703                 }
1704         }
1705         return o;
1706 }
1707
1708 void mdt_object_unlock_put(struct mdt_thread_info * info,
1709                            struct mdt_object * o,
1710                            struct mdt_lock_handle *lh,
1711                            int decref)
1712 {
1713         mdt_object_unlock(info, o, lh, decref);
1714         mdt_object_put(info->mti_env, o);
1715 }
1716
1717 static struct mdt_handler *mdt_handler_find(__u32 opc,
1718                                             struct mdt_opc_slice *supported)
1719 {
1720         struct mdt_opc_slice *s;
1721         struct mdt_handler   *h;
1722
1723         h = NULL;
1724         for (s = supported; s->mos_hs != NULL; s++) {
1725                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1726                         h = s->mos_hs + (opc - s->mos_opc_start);
1727                         if (h->mh_opc != 0)
1728                                 LASSERT(h->mh_opc == opc);
1729                         else
1730                                 h = NULL; /* unsupported opc */
1731                         break;
1732                 }
1733         }
1734         return h;
1735 }
1736
/*
 * Placeholder for resource-name compatibility fix-ups on an incoming DLM
 * request. Nothing is implemented yet: both arguments are ignored and 0 is
 * always returned.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1743
/*
 * Placeholder for resource-name compatibility fix-ups on an outgoing DLM
 * reply. Nothing is implemented yet: both arguments are ignored and 0 is
 * always returned.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1749
1750 /*
1751  * Generic code handling requests that have struct mdt_body passed in:
1752  *
1753  *  - extract mdt_body from request and save it in @info, if present;
1754  *
1755  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1756  *  @info;
1757  *
1758  *  - if HABEO_CORPUS flag is set for this request type check whether object
1759  *  actually exists on storage (lu_object_exists()).
1760  *
1761  */
1762 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1763 {
1764         const struct mdt_body    *body;
1765         struct mdt_object        *obj;
1766         const struct lu_env      *env;
1767         struct req_capsule       *pill;
1768         int                       rc;
1769
1770         env = info->mti_env;
1771         pill = &info->mti_pill;
1772
1773         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1774         if (body == NULL)
1775                 return -EFAULT;
1776
1777         if (!fid_is_sane(&body->fid1)) {
1778                 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1779                 return -EINVAL;
1780         }
1781
1782         /*
1783          * Do not get size or any capa fields before we check that request
1784          * contains capa actually. There are some requests which do not, for
1785          * instance MDS_IS_SUBDIR.
1786          */
1787         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
1788             req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
1789                 mdt_set_capainfo(info, 0, &body->fid1,
1790                                  req_capsule_client_get(pill, &RMF_CAPA1));
1791
1792         obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
1793         if (!IS_ERR(obj)) {
1794                 if ((flags & HABEO_CORPUS) &&
1795                     !mdt_object_exists(obj)) {
1796                         mdt_object_put(env, obj);
1797                         /* for capability renew ENOENT will be handled in
1798                          * mdt_renew_capa */
1799                         if (body->valid & OBD_MD_FLOSSCAPA)
1800                                 rc = 0;
1801                         else
1802                                 rc = -ENOENT;
1803                 } else {
1804                         info->mti_object = obj;
1805                         rc = 0;
1806                 }
1807         } else
1808                 rc = PTR_ERR(obj);
1809
1810         return rc;
1811 }
1812
1813 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1814 {
1815         struct req_capsule *pill;
1816         int rc;
1817
1818         ENTRY;
1819         pill = &info->mti_pill;
1820
1821         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1822                 rc = mdt_body_unpack(info, flags);
1823         else
1824                 rc = 0;
1825
1826         if (rc == 0 && (flags & HABEO_REFERO)) {
1827                 struct mdt_device       *mdt = info->mti_mdt;
1828                 /*pack reply*/
1829                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1830                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1831                                              mdt->mdt_max_mdsize);
1832                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1833                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1834                                              mdt->mdt_max_cookiesize);
1835
1836                 rc = req_capsule_pack(pill);
1837         }
1838         RETURN(rc);
1839 }
1840
1841 static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
1842 {
1843         struct md_device *next = m->mdt_child;
1844
1845         return next->md_ops->mdo_init_capa_ctxt(env, next,
1846                                                 m->mdt_opts.mo_mds_capa,
1847                                                 m->mdt_capa_timeout,
1848                                                 m->mdt_capa_alg,
1849                                                 m->mdt_capa_keys);
1850 }
1851
1852 /*
1853  * Invoke handler for this request opc. Also do necessary preprocessing
1854  * (according to handler ->mh_flags), and post-processing (setting of
1855  * ->last_{xid,committed}).
1856  */
1857 static int mdt_req_handle(struct mdt_thread_info *info,
1858                           struct mdt_handler *h, struct ptlrpc_request *req)
1859 {
1860         int   rc, serious = 0;
1861         __u32 flags;
1862
1863         ENTRY;
1864
1865         LASSERT(h->mh_act != NULL);
1866         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1867         LASSERT(current->journal_info == NULL);
1868
1869         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1870
1871         /*
1872          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1873          * correct handling of failed req later in ldlm due to doing
1874          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1875          * correct actions like it is done in target_send_reply_msg().
1876          */
1877         if (h->mh_fail_id != 0) {
1878                 /*
1879                  * Set to info->mti_fail_id to handler fail_id, it will be used
1880                  * later, and better than use default fail_id.
1881                  */
1882                 if (OBD_FAIL_CHECK(h->mh_fail_id)) {
1883                         info->mti_fail_id = h->mh_fail_id;
1884                         RETURN(0);
1885                 }
1886         }
1887
1888         rc = 0;
1889         flags = h->mh_flags;
1890         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1891
1892         if (h->mh_fmt != NULL) {
1893                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1894                 rc = mdt_unpack_req_pack_rep(info, flags);
1895         }
1896
1897         if (rc == 0 && flags & MUTABOR &&
1898             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1899                 /* should it be rq_status? */
1900                 rc = -EROFS;
1901
1902         if (rc == 0 && flags & HABEO_CLAVIS) {
1903                 struct ldlm_request *dlm_req;
1904
1905                 LASSERT(h->mh_fmt != NULL);
1906
1907                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1908                 if (dlm_req != NULL) {
1909                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1910                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1911                                                              dlm_req);
1912                         info->mti_dlm_req = dlm_req;
1913                 } else {
1914                         CERROR("Can't unpack dlm request\n");
1915                         rc = -EFAULT;
1916                 }
1917         }
1918
1919         /* capability setting changed via /proc, needs reinitialize ctxt */
1920         if (info->mti_mdt && info->mti_mdt->mdt_capa_conf) {
1921                 mdt_init_capa_ctxt(info->mti_env, info->mti_mdt);
1922                 info->mti_mdt->mdt_capa_conf = 0;
1923         }
1924
1925         if (rc == 0) {
1926                 /*
1927                  * Process request, there can be two types of rc:
1928                  * 1) errors with msg unpack/pack, other failures outside the
1929                  * operation itself. This is counted as serious errors;
1930                  * 2) errors during fs operation, should be placed in rq_status
1931                  * only
1932                  */
1933                 rc = h->mh_act(info);
1934                 serious = is_serious(rc);
1935                 rc = clear_serious(rc);
1936         } else
1937                 serious = 1;
1938
1939         req->rq_status = rc;
1940
1941         /*
1942          * ELDLM_* codes which > 0 should be in rq_status only as well as
1943          * all non-serious errors.
1944          */
1945         if (rc > 0 || !serious)
1946                 rc = 0;
1947
1948         LASSERT(current->journal_info == NULL);
1949
1950         if (rc == 0 && (flags & HABEO_CLAVIS)
1951             && info->mti_mdt->mdt_opts.mo_compat_resname) {
1952                 struct ldlm_reply *dlmrep;
1953
1954                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
1955                 if (dlmrep != NULL)
1956                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
1957         }
1958
1959         /* If we're DISCONNECTing, the mdt_export_data is already freed */
1960         if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
1961                 target_committed_to_req(req);
1962
1963         if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
1964             lustre_msg_get_transno(req->rq_reqmsg) == 0) {
1965                 DEBUG_REQ(D_ERROR, req, "transno is 0 during REPLAY\n");
1966                 LBUG();
1967         }
1968
1969         RETURN(rc);
1970 }
1971
1972 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
1973 {
1974         lh->mlh_type = MDT_NUL_LOCK;
1975         lh->mlh_reg_lh.cookie = 0ull;
1976         lh->mlh_reg_mode = LCK_MINMODE;
1977         lh->mlh_pdo_lh.cookie = 0ull;
1978         lh->mlh_pdo_mode = LCK_MINMODE;
1979 }
1980
1981 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
1982 {
1983         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
1984         LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
1985 }
1986
1987 /*
1988  * Initialize fields of struct mdt_thread_info. Other fields are left in
1989  * uninitialized state, because it's too expensive to zero out whole
1990  * mdt_thread_info (> 1K) on each request arrival.
1991  */
1992 static void mdt_thread_info_init(struct ptlrpc_request *req,
1993                                  struct mdt_thread_info *info)
1994 {
1995         int i;
1996
1997         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
1998         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
1999                 info->mti_rep_buf_size[i] = -1;
2000         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
2001                          info->mti_rep_buf_size);
2002
2003         /* lock handle */
2004         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2005                 mdt_lock_handle_init(&info->mti_lh[i]);
2006
2007         /* mdt device: it can be NULL while CONNECT */
2008         if (req->rq_export)
2009                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
2010         else
2011                 info->mti_mdt = NULL;
2012         info->mti_env = req->rq_svc_thread->t_env;
2013
2014         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
2015         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
2016
2017         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
2018         info->mti_body = NULL;
2019         info->mti_object = NULL;
2020         info->mti_dlm_req = NULL;
2021         info->mti_has_trans = 0;
2022         info->mti_no_need_trans = 0;
2023         info->mti_opdata = 0;
2024 }
2025
2026 static void mdt_thread_info_fini(struct mdt_thread_info *info)
2027 {
2028         int i;
2029
2030         req_capsule_fini(&info->mti_pill);
2031         if (info->mti_object != NULL) {
2032                 mdt_object_put(info->mti_env, info->mti_object);
2033                 info->mti_object = NULL;
2034         }
2035         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2036                 mdt_lock_handle_fini(&info->mti_lh[i]);
2037         info->mti_env = NULL;
2038 }
2039
2040 /* mds/handler.c */
2041 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
2042                                        struct obd_device *obd, int *process);
2043 /*
2044  * Handle recovery. Return:
2045  *        +1: continue request processing;
2046  *       -ve: abort immediately with the given error code;
2047  *         0: send reply with error code in req->rq_status;
2048  */
2049 static int mdt_recovery(struct mdt_thread_info *info)
2050 {
2051         struct ptlrpc_request *req = mdt_info_req(info);
2052         int recovering;
2053         struct obd_device *obd;
2054
2055         ENTRY;
2056
2057         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2058         case MDS_CONNECT:
2059         case SEC_CTX_INIT:
2060         case SEC_CTX_INIT_CONT:
2061         case SEC_CTX_FINI:
2062                 {
2063 #if 0
2064                         int rc;
2065
2066                         rc = mdt_handle_idmap(info);
2067                         if (rc)
2068                                 RETURN(rc);
2069                         else
2070 #endif
2071                                 RETURN(+1);
2072                 }
2073         }
2074
2075         if (req->rq_export == NULL) {
2076                 CERROR("operation %d on unconnected MDS from %s\n",
2077                        lustre_msg_get_opc(req->rq_reqmsg),
2078                        libcfs_id2str(req->rq_peer));
2079                 req->rq_status = -ENOTCONN;
2080                 target_send_reply(req, -ENOTCONN, info->mti_fail_id);
2081                 RETURN(0);
2082         }
2083
2084         /* sanity check: if the xid matches, the request must be marked as a
2085          * resent or replayed */
2086         if (req_xid_is_last(req)) {
2087                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
2088                       (MSG_RESENT | MSG_REPLAY))) {
2089                         DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches last_xid, "
2090                                   "expected REPLAY or RESENT flag\n", req->rq_xid);
2091                         LBUG();
2092                         req->rq_status = -ENOTCONN;
2093                         RETURN(-ENOTCONN);
2094                 }
2095         }
2096
2097         /* else: note the opposite is not always true; a RESENT req after a
2098          * failover will usually not match the last_xid, since it was likely
2099          * never committed. A REPLAYed request will almost never match the
2100          * last xid, however it could for a committed, but still retained,
2101          * open. */
2102
2103         obd = req->rq_export->exp_obd;
2104
2105         /* Check for aborted recovery... */
2106         spin_lock_bh(&obd->obd_processing_task_lock);
2107         recovering = obd->obd_recovering;
2108         spin_unlock_bh(&obd->obd_processing_task_lock);
2109         if (recovering) {
2110                 int rc;
2111                 int should_process;
2112                 DEBUG_REQ(D_INFO, req, "Got new replay");
2113                 rc = mds_filter_recovery_request(req, obd, &should_process);
2114                 if (rc != 0 || !should_process)
2115                         RETURN(rc);
2116                 else if (should_process < 0) {
2117                         req->rq_status = should_process;
2118                         rc = ptlrpc_error(req);
2119                         RETURN(rc);
2120                 }
2121         }
2122         RETURN(+1);
2123 }
2124
2125 static int mdt_reply(struct ptlrpc_request *req, int rc,
2126                      struct mdt_thread_info *info)
2127 {
2128         ENTRY;
2129
2130 #if 0
2131         if (req->rq_reply_state == NULL && rc == 0) {
2132                 req->rq_status = rc;
2133                 lustre_pack_reply(req, 1, NULL, NULL);
2134         }
2135 #endif
2136         target_send_reply(req, rc, info->mti_fail_id);
2137         RETURN(0);
2138 }
2139
2140 /* mds/handler.c */
2141 extern int mds_msg_check_version(struct lustre_msg *msg);
2142
2143 static int mdt_handle0(struct ptlrpc_request *req,
2144                        struct mdt_thread_info *info,
2145                        struct mdt_opc_slice *supported)
2146 {
2147         struct mdt_handler *h;
2148         struct lustre_msg  *msg;
2149         int                 rc;
2150
2151         ENTRY;
2152
2153         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
2154
2155         LASSERT(current->journal_info == NULL);
2156
2157         msg = req->rq_reqmsg;
2158         rc = mds_msg_check_version(msg);
2159         if (rc == 0) {
2160                 rc = mdt_recovery(info);
2161                 if (rc == +1) {
2162                         h = mdt_handler_find(lustre_msg_get_opc(msg),
2163                                              supported);
2164                         if (h != NULL) {
2165                                 rc = mdt_req_handle(info, h, req);
2166                                 rc = mdt_reply(req, rc, info);
2167                         } else {
2168                                 req->rq_status = -ENOTSUPP;
2169                                 rc = ptlrpc_error(req);
2170                                 RETURN(rc);
2171                         }
2172                 }
2173         } else
2174                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
2175         RETURN(rc);
2176 }
2177
2178 /*
2179  * MDT handler function called by ptlrpc service thread when request comes.
2180  *
2181  * XXX common "target" functionality should be factored into separate module
2182  * shared by mdt, ost and stand-alone services like fld.
2183  */
2184 static int mdt_handle_common(struct ptlrpc_request *req,
2185                              struct mdt_opc_slice *supported)
2186 {
2187         struct lu_env          *env;
2188         struct mdt_thread_info *info;
2189         int                     rc;
2190         ENTRY;
2191
2192         env = req->rq_svc_thread->t_env;
2193         LASSERT(env != NULL);
2194         LASSERT(env->le_ses != NULL);
2195         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
2196         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2197         LASSERT(info != NULL);
2198
2199         mdt_thread_info_init(req, info);
2200
2201         rc = mdt_handle0(req, info, supported);
2202
2203         mdt_thread_info_fini(info);
2204         RETURN(rc);
2205 }
2206
2207 /*
2208  * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
2209  * as well.
2210  */
2211 int mdt_recovery_handle(struct ptlrpc_request *req)
2212 {
2213         int rc;
2214         ENTRY;
2215
2216         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2217         case FLD_QUERY:
2218                 rc = mdt_handle_common(req, mdt_fld_handlers);
2219                 break;
2220         case SEQ_QUERY:
2221                 rc = mdt_handle_common(req, mdt_seq_handlers);
2222                 break;
2223         default:
2224                 rc = mdt_handle_common(req, mdt_regular_handlers);
2225                 break;
2226         }
2227
2228         RETURN(rc);
2229 }
2230
2231 static int mdt_regular_handle(struct ptlrpc_request *req)
2232 {
2233         return mdt_handle_common(req, mdt_regular_handlers);
2234 }
2235
2236 static int mdt_readpage_handle(struct ptlrpc_request *req)
2237 {
2238         return mdt_handle_common(req, mdt_readpage_handlers);
2239 }
2240
2241 static int mdt_mdsc_handle(struct ptlrpc_request *req)
2242 {
2243         return mdt_handle_common(req, mdt_seq_handlers);
2244 }
2245
2246 static int mdt_mdss_handle(struct ptlrpc_request *req)
2247 {
2248         return mdt_handle_common(req, mdt_seq_handlers);
2249 }
2250
2251 static int mdt_dtss_handle(struct ptlrpc_request *req)
2252 {
2253         return mdt_handle_common(req, mdt_seq_handlers);
2254 }
2255
2256 static int mdt_fld_handle(struct ptlrpc_request *req)
2257 {
2258         return mdt_handle_common(req, mdt_fld_handlers);
2259 }
2260
/*
 * Intent codes used to index the mdt_it_flavor[] table. MDT_IT_NR is not an
 * intent itself but the number of codes defined.
 */
enum mdt_it_code {
        MDT_IT_OPEN = 0,
        MDT_IT_OCREAT,
        MDT_IT_CREATE,
        MDT_IT_GETATTR,
        MDT_IT_READDIR,
        MDT_IT_LOOKUP,
        MDT_IT_UNLINK,
        MDT_IT_TRUNC,
        MDT_IT_GETXATTR,
        MDT_IT_NR       /* must stay last */
};
2273
/*
 * Forward declarations of the intent handlers referenced by the
 * mdt_it_flavor[] table below.
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **lockp,
                              int flags);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **lockp,
                            int flags);
2282
2283 static struct mdt_it_flavor {
2284         const struct req_format *it_fmt;
2285         __u32                    it_flags;
2286         int                    (*it_act)(enum mdt_it_code ,
2287                                          struct mdt_thread_info *,
2288                                          struct ldlm_lock **,
2289                                          int);
2290         long                     it_reint;
2291 } mdt_it_flavor[] = {
2292         [MDT_IT_OPEN]     = {
2293                 .it_fmt   = &RQF_LDLM_INTENT,
2294                 /*.it_flags = HABEO_REFERO,*/
2295                 .it_flags = 0,
2296                 .it_act   = mdt_intent_reint,
2297                 .it_reint = REINT_OPEN
2298         },
2299         [MDT_IT_OCREAT]   = {
2300                 .it_fmt   = &RQF_LDLM_INTENT,
2301                 .it_flags = MUTABOR,
2302                 .it_act   = mdt_intent_reint,
2303                 .it_reint = REINT_OPEN
2304         },
2305         [MDT_IT_CREATE]   = {
2306                 .it_fmt   = &RQF_LDLM_INTENT,
2307                 .it_flags = MUTABOR,
2308                 .it_act   = mdt_intent_reint,
2309                 .it_reint = REINT_CREATE
2310         },
2311         [MDT_IT_GETATTR]  = {
2312                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2313                 .it_flags = HABEO_REFERO,
2314                 .it_act   = mdt_intent_getattr
2315         },
2316         [MDT_IT_READDIR]  = {
2317                 .it_fmt   = NULL,
2318                 .it_flags = 0,
2319                 .it_act   = NULL
2320         },
2321         [MDT_IT_LOOKUP]   = {
2322                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2323                 .it_flags = HABEO_REFERO,
2324                 .it_act   = mdt_intent_getattr
2325         },
2326         [MDT_IT_UNLINK]   = {
2327                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
2328                 .it_flags = MUTABOR,
2329                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
2330                 .it_reint = REINT_UNLINK
2331         },
2332         [MDT_IT_TRUNC]    = {
2333                 .it_fmt   = NULL,
2334                 .it_flags = MUTABOR,
2335                 .it_act   = NULL
2336         },
2337         [MDT_IT_GETXATTR] = {
2338                 .it_fmt   = NULL,
2339                 .it_flags = 0,
2340                 .it_act   = NULL
2341         }
2342 };
2343
2344 int mdt_intent_lock_replace(struct mdt_thread_info *info,
2345                             struct ldlm_lock **lockp,
2346                             struct ldlm_lock *new_lock,
2347                             struct mdt_lock_handle *lh,
2348                             int flags)
2349 {
2350         struct ptlrpc_request  *req = mdt_info_req(info);
2351         struct ldlm_lock       *lock = *lockp;
2352
2353         /*
2354          * Get new lock only for cases when possible resent did not find any
2355          * lock.
2356          */
2357         if (new_lock == NULL)
2358                 new_lock = ldlm_handle2lock(&lh->mlh_reg_lh);
2359
2360         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
2361                 lh->mlh_reg_lh.cookie = 0;
2362                 RETURN(0);
2363         }
2364
2365         LASSERTF(new_lock != NULL,
2366                  "lockh "LPX64"\n", lh->mlh_reg_lh.cookie);
2367
2368         /*
2369          * If we've already given this lock to a client once, then we should
2370          * have no readers or writers.  Otherwise, we should have one reader
2371          * _or_ writer ref (which will be zeroed below) before returning the
2372          * lock to a client.
2373          */
2374         if (new_lock->l_export == req->rq_export) {
2375                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2376         } else {
2377                 LASSERT(new_lock->l_export == NULL);
2378                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2379         }
2380
2381         *lockp = new_lock;
2382
2383         if (new_lock->l_export == req->rq_export) {
2384                 /*
2385                  * Already gave this to the client, which means that we
2386                  * reconstructed a reply.
2387                  */
2388                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2389                         MSG_RESENT);
2390                 lh->mlh_reg_lh.cookie = 0;
2391                 RETURN(ELDLM_LOCK_REPLACED);
2392         }
2393
2394         /* Fixup the lock to be given to the client */
2395         lock_res_and_lock(new_lock);
2396         new_lock->l_readers = 0;
2397         new_lock->l_writers = 0;
2398
2399         new_lock->l_export = class_export_get(req->rq_export);
2400         spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
2401         list_add(&new_lock->l_export_chain,
2402                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2403         spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
2404
2405         new_lock->l_blocking_ast = lock->l_blocking_ast;
2406         new_lock->l_completion_ast = lock->l_completion_ast;
2407         new_lock->l_remote_handle = lock->l_remote_handle;
2408         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2409
2410         unlock_res_and_lock(new_lock);
2411         LDLM_LOCK_PUT(new_lock);
2412         lh->mlh_reg_lh.cookie = 0;
2413
2414         RETURN(ELDLM_LOCK_REPLACED);
2415 }
2416
2417 static void mdt_intent_fixup_resent(struct req_capsule *pill,
2418                                     struct ldlm_lock *new_lock,
2419                                     struct ldlm_lock **old_lock,
2420                                     struct mdt_lock_handle *lh)
2421 {
2422         struct ptlrpc_request  *req = pill->rc_req;
2423         struct obd_export      *exp = req->rq_export;
2424         struct lustre_handle    remote_hdl;
2425         struct ldlm_request    *dlmreq;
2426         struct list_head       *iter;
2427
2428         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2429                 return;
2430
2431         dlmreq = req_capsule_client_get(pill, &RMF_DLM_REQ);
2432         remote_hdl = dlmreq->lock_handle1;
2433
2434         spin_lock(&exp->exp_ldlm_data.led_lock);
2435         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2436                 struct ldlm_lock *lock;
2437                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2438                 if (lock == new_lock)
2439                         continue;
2440                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2441                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
2442                         lh->mlh_reg_mode = lock->l_granted_mode;
2443
2444                         LDLM_DEBUG(lock, "restoring lock cookie");
2445                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2446                                   lh->mlh_reg_lh.cookie);
2447                         if (old_lock)
2448                                 *old_lock = LDLM_LOCK_GET(lock);
2449                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2450                         return;
2451                 }
2452         }
2453         spin_unlock(&exp->exp_ldlm_data.led_lock);
2454
2455         /*
2456          * If the xid matches, then we know this is a resent request, and allow
2457          * it. (It's probably an OPEN, for which we don't send a lock.
2458          */
2459         if (req_xid_is_last(req))
2460                 return;
2461
2462         /*
2463          * This remote handle isn't enqueued, so we never received or processed
2464          * this request.  Clear MSG_RESENT, because it can be handled like any
2465          * normal request now.
2466          */
2467         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2468
2469         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2470                   remote_hdl.cookie);
2471 }
2472
2473 static int mdt_intent_getattr(enum mdt_it_code opcode,
2474                               struct mdt_thread_info *info,
2475                               struct ldlm_lock **lockp,
2476                               int flags)
2477 {
2478         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2479         struct ldlm_lock       *new_lock = NULL;
2480         __u64                   child_bits;
2481         struct ldlm_reply      *ldlm_rep;
2482         struct ptlrpc_request  *req;
2483         struct mdt_body        *reqbody;
2484         struct mdt_body        *repbody;
2485         int                     rc;
2486         ENTRY;
2487
2488         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
2489         LASSERT(reqbody);
2490         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
2491         LASSERT(repbody);
2492         repbody->eadatasize = 0;
2493         repbody->aclsize = 0;
2494
2495         switch (opcode) {
2496         case MDT_IT_LOOKUP:
2497                 child_bits = MDS_INODELOCK_LOOKUP;
2498                 break;
2499         case MDT_IT_GETATTR:
2500                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2501                 break;
2502         default:
2503                 CERROR("Unhandled till now");
2504                 GOTO(out, rc = -EINVAL);
2505         }
2506
2507         rc = mdt_init_ucred(info, reqbody);
2508         if (rc)
2509                 GOTO(out, rc);
2510
2511         req = info->mti_pill.rc_req;
2512         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2513         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2514
2515         /* Get lock from request for possible resent case. */
2516         mdt_intent_fixup_resent(&info->mti_pill, *lockp, &new_lock, lhc);
2517
2518         ldlm_rep->lock_policy_res2 =
2519                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2520
2521         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2522                 ldlm_rep->lock_policy_res2 = 0;
2523         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2524             ldlm_rep->lock_policy_res2) {
2525                 lhc->mlh_reg_lh.cookie = 0ull;
2526                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
2527         }
2528
2529         rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2530 out_ucred:
2531         mdt_exit_ucred(info);
2532         GOTO(out, rc);
2533 out:
2534         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 0);
2535         return rc;
2536 }
2537
2538 static int mdt_intent_reint(enum mdt_it_code opcode,
2539                             struct mdt_thread_info *info,
2540                             struct ldlm_lock **lockp,
2541                             int flags)
2542 {
2543         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2544         struct ldlm_reply      *rep;
2545         long                    opc;
2546         int                     rc;
2547
2548         static const struct req_format *intent_fmts[REINT_MAX] = {
2549                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2550                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2551         };
2552
2553         ENTRY;
2554
2555         opc = mdt_reint_opcode(info, intent_fmts);
2556         if (opc < 0)
2557                 RETURN(opc);
2558
2559         if (mdt_it_flavor[opcode].it_reint != opc) {
2560                 CERROR("Reint code %ld doesn't match intent: %d\n",
2561                        opc, opcode);
2562                 RETURN(err_serious(-EPROTO));
2563         }
2564
2565         /* Get lock from request for possible resent case. */
2566         mdt_intent_fixup_resent(&info->mti_pill, *lockp, NULL, lhc);
2567
2568         rc = mdt_reint_internal(info, lhc, opc);
2569
2570         rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2571         if (rep == NULL)
2572                 RETURN(err_serious(-EFAULT));
2573
2574         /* MDC expects this in any case */
2575         if (rc != 0)
2576                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2577
2578         /* cross-ref case, the lock should be returned to the client */
2579         if (rc == -EREMOTE) {
2580                 LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
2581                 rep->lock_policy_res2 = 0;
2582                 RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags));
2583         }
2584         rep->lock_policy_res2 = clear_serious(rc);
2585
2586         lhc->mlh_reg_lh.cookie = 0ull;
2587         RETURN(ELDLM_LOCK_ABORTED);
2588 }
2589
2590 static int mdt_intent_code(long itcode)
2591 {
2592         int rc;
2593
2594         switch(itcode) {
2595         case IT_OPEN:
2596                 rc = MDT_IT_OPEN;
2597                 break;
2598         case IT_OPEN|IT_CREAT:
2599                 rc = MDT_IT_OCREAT;
2600                 break;
2601         case IT_CREAT:
2602                 rc = MDT_IT_CREATE;
2603                 break;
2604         case IT_READDIR:
2605                 rc = MDT_IT_READDIR;
2606                 break;
2607         case IT_GETATTR:
2608                 rc = MDT_IT_GETATTR;
2609                 break;
2610         case IT_LOOKUP:
2611                 rc = MDT_IT_LOOKUP;
2612                 break;
2613         case IT_UNLINK:
2614                 rc = MDT_IT_UNLINK;
2615                 break;
2616         case IT_TRUNC:
2617                 rc = MDT_IT_TRUNC;
2618                 break;
2619         case IT_GETXATTR:
2620                 rc = MDT_IT_GETXATTR;
2621                 break;
2622         default:
2623                 CERROR("Unknown intent opcode: %ld\n", itcode);
2624                 rc = -EINVAL;
2625                 break;
2626         }
2627         return rc;
2628 }
2629
2630 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2631                           struct ldlm_lock **lockp, int flags)
2632 {
2633         struct req_capsule   *pill;
2634         struct mdt_it_flavor *flv;
2635         int opc;
2636         int rc;
2637         ENTRY;
2638
2639         opc = mdt_intent_code(itopc);
2640         if (opc < 0)
2641                 RETURN(-EINVAL);
2642
2643         pill = &info->mti_pill;
2644         flv  = &mdt_it_flavor[opc];
2645
2646         if (flv->it_fmt != NULL)
2647                 req_capsule_extend(pill, flv->it_fmt);
2648
2649         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2650         if (rc == 0) {
2651                 struct ptlrpc_request *req = mdt_info_req(info);
2652                 if (flv->it_flags & MUTABOR &&
2653                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2654                         rc = -EROFS;
2655         }
2656         if (rc == 0 && flv->it_act != NULL) {
2657                 /* execute policy */
2658                 rc = flv->it_act(opc, info, lockp, flags);
2659         } else
2660                 rc = -EOPNOTSUPP;
2661         RETURN(rc);
2662 }
2663
2664 static int mdt_intent_policy(struct ldlm_namespace *ns,
2665                              struct ldlm_lock **lockp, void *req_cookie,
2666                              ldlm_mode_t mode, int flags, void *data)
2667 {
2668         struct mdt_thread_info *info;
2669         struct ptlrpc_request  *req  =  req_cookie;
2670         struct ldlm_intent     *it;
2671         struct req_capsule     *pill;
2672         struct ldlm_lock       *lock = *lockp;
2673         int rc;
2674
2675         ENTRY;
2676
2677         LASSERT(req != NULL);
2678
2679         info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
2680                                   &mdt_thread_key);
2681         LASSERT(info != NULL);
2682         pill = &info->mti_pill;
2683         LASSERT(pill->rc_req == req);
2684
2685         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2686                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2687                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2688                 if (it != NULL) {
2689                         const struct ldlm_request *dlmreq;
2690                         __u64 req_bits;
2691
2692                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2693                                    ldlm_it2str(it->opc));
2694
2695                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2696                         if (rc == 0)
2697                                 rc = ELDLM_OK;
2698
2699                         /*
2700                          * Lock without inodebits makes no sense and will oops
2701                          * later in ldlm. Let's check it now to see if we have
2702                          * wrong lock from client or bits get corrupted
2703                          * somewhere in mdt_intent_opc().
2704                          */
2705                         dlmreq = info->mti_dlm_req;
2706                         req_bits = dlmreq->lock_desc.l_policy_data.l_inodebits.bits;
2707                         LASSERT(req_bits != 0);
2708
2709                 } else
2710                         rc = err_serious(-EFAULT);
2711         } else {
2712                 /* No intent was provided */
2713                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2714                 rc = req_capsule_pack(pill);
2715                 if (rc)
2716                         rc = err_serious(rc);
2717         }
2718         RETURN(rc);
2719 }
2720
2721 /*
2722  * Seq wrappers
2723  */
2724 static int mdt_seq_fini(const struct lu_env *env,
2725                         struct mdt_device *m)
2726 {
2727         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2728         ENTRY;
2729
2730         if (ls && ls->ls_server_seq) {
2731                 seq_server_fini(ls->ls_server_seq, env);
2732                 OBD_FREE_PTR(ls->ls_server_seq);
2733                 ls->ls_server_seq = NULL;
2734         }
2735
2736         if (ls && ls->ls_control_seq) {
2737                 seq_server_fini(ls->ls_control_seq, env);
2738                 OBD_FREE_PTR(ls->ls_control_seq);
2739                 ls->ls_control_seq = NULL;
2740         }
2741
2742         if (ls && ls->ls_client_seq) {
2743                 seq_client_fini(ls->ls_client_seq);
2744                 OBD_FREE_PTR(ls->ls_client_seq);
2745                 ls->ls_client_seq = NULL;
2746         }
2747
2748         RETURN(0);
2749 }
2750
2751 static int mdt_seq_init(const struct lu_env *env,
2752                         const char *uuid,
2753                         struct mdt_device *m)
2754 {
2755         struct lu_site *ls;
2756         char *prefix;
2757         int rc;
2758         ENTRY;
2759
2760         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2761
2762         /*
2763          * This is sequence-controller node. Init seq-controller server on local
2764          * MDT.
2765          */
2766         if (ls->ls_node_id == 0) {
2767                 LASSERT(ls->ls_control_seq == NULL);
2768
2769                 OBD_ALLOC_PTR(ls->ls_control_seq);
2770                 if (ls->ls_control_seq == NULL)
2771                         RETURN(-ENOMEM);
2772
2773                 rc = seq_server_init(ls->ls_control_seq,
2774                                      m->mdt_bottom, uuid,
2775                                      LUSTRE_SEQ_CONTROLLER,
2776                                      env);
2777
2778                 if (rc)
2779                         GOTO(out_seq_fini, rc);
2780
2781                 OBD_ALLOC_PTR(ls->ls_client_seq);
2782                 if (ls->ls_client_seq == NULL)
2783                         GOTO(out_seq_fini, rc = -ENOMEM);
2784
2785                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2786                 if (prefix == NULL) {
2787                         OBD_FREE_PTR(ls->ls_client_seq);
2788                         GOTO(out_seq_fini, rc = -ENOMEM);
2789                 }
2790
2791                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2792                          uuid);
2793
2794                 /*
2795                  * Init seq-controller client after seq-controller server is
2796                  * ready. Pass ls->ls_control_seq to it for direct talking.
2797                  */
2798                 rc = seq_client_init(ls->ls_client_seq, NULL,
2799                                      LUSTRE_SEQ_METADATA, prefix,
2800                                      ls->ls_control_seq);
2801                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2802
2803                 if (rc)
2804                         GOTO(out_seq_fini, rc);
2805         }
2806
2807         /* Init seq-server on local MDT */
2808         LASSERT(ls->ls_server_seq == NULL);
2809
2810         OBD_ALLOC_PTR(ls->ls_server_seq);
2811         if (ls->ls_server_seq == NULL)
2812                 GOTO(out_seq_fini, rc = -ENOMEM);
2813
2814         rc = seq_server_init(ls->ls_server_seq,
2815                              m->mdt_bottom, uuid,
2816                              LUSTRE_SEQ_SERVER,
2817                              env);
2818         if (rc)
2819                 GOTO(out_seq_fini, rc = -ENOMEM);
2820
2821         /* Assign seq-controller client to local seq-server. */
2822         if (ls->ls_node_id == 0) {
2823                 LASSERT(ls->ls_client_seq != NULL);
2824
2825                 rc = seq_server_set_cli(ls->ls_server_seq,
2826                                         ls->ls_client_seq,
2827                                         env);
2828         }
2829
2830         EXIT;
2831 out_seq_fini:
2832         if (rc)
2833                 mdt_seq_fini(env, m);
2834
2835         return rc;
2836 }
2837 /*
2838  * Init client sequence manager which is used by local MDS to talk to sequence
2839  * controller on remote node.
2840  */
2841 static int mdt_seq_init_cli(const struct lu_env *env,
2842                             struct mdt_device *m,
2843                             struct lustre_cfg *cfg)
2844 {
2845         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2846         struct obd_device *mdc;
2847         struct obd_uuid   *uuidp, *mdcuuidp;
2848         char              *uuid_str, *mdc_uuid_str;
2849         int                rc;
2850         int                index;
2851         struct mdt_thread_info *info;
2852         char *p, *index_string = lustre_cfg_string(cfg, 2);
2853         ENTRY;
2854
2855         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2856         uuidp = &info->mti_u.uuid[0];
2857         mdcuuidp = &info->mti_u.uuid[1];
2858
2859         LASSERT(index_string);
2860
2861         index = simple_strtol(index_string, &p, 10);
2862         if (*p) {
2863                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2864                 RETURN(-EINVAL);
2865         }
2866
2867         /* check if this is adding the first MDC and controller is not yet
2868          * initialized. */
2869         if (index != 0 || ls->ls_client_seq)
2870                 RETURN(0);
2871
2872         uuid_str = lustre_cfg_string(cfg, 1);
2873         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2874         obd_str2uuid(uuidp, uuid_str);
2875         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2876
2877         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2878         if (!mdc) {
2879                 CERROR("can't find controller MDC by uuid %s\n",
2880                        uuid_str);
2881                 rc = -ENOENT;
2882         } else if (!mdc->obd_set_up) {
2883                 CERROR("target %s not set up\n", mdc->obd_name);
2884                 rc = -EINVAL;
2885         } else {
2886                         LASSERT(ls->ls_control_exp);
2887                         OBD_ALLOC_PTR(ls->ls_client_seq);
2888                         if (ls->ls_client_seq != NULL) {
2889                                 char *prefix;
2890
2891                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2892                                 if (!prefix)
2893                                         RETURN(-ENOMEM);
2894
2895                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2896                                          mdc->obd_name);
2897
2898                                 rc = seq_client_init(ls->ls_client_seq,
2899                                                      ls->ls_control_exp,
2900                                                      LUSTRE_SEQ_METADATA,
2901                                                      prefix, NULL);
2902                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2903                         } else
2904                                 rc = -ENOMEM;
2905
2906                         if (rc)
2907                                 RETURN(rc);
2908
2909                         LASSERT(ls->ls_server_seq != NULL);
2910
2911                         rc = seq_server_set_cli(ls->ls_server_seq,
2912                                                 ls->ls_client_seq,
2913                                                 env);
2914         }
2915
2916         RETURN(rc);
2917 }
2918
2919 static void mdt_seq_fini_cli(struct mdt_device *m)
2920 {
2921         struct lu_site *ls;
2922
2923         ENTRY;
2924
2925         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2926
2927         if (ls && ls->ls_server_seq)
2928                 seq_server_set_cli(ls->ls_server_seq,
2929                                    NULL, NULL);
2930
2931         if (ls && ls->ls_control_exp) {
2932                 class_export_put(ls->ls_control_exp);
2933                 ls->ls_control_exp = NULL;
2934         }
2935         EXIT;
2936 }
2937
2938 /*
2939  * FLD wrappers
2940  */
2941 static int mdt_fld_fini(const struct lu_env *env,
2942                         struct mdt_device *m)
2943 {
2944         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2945         ENTRY;
2946
2947         if (ls && ls->ls_server_fld) {
2948                 fld_server_fini(ls->ls_server_fld, env);
2949                 OBD_FREE_PTR(ls->ls_server_fld);
2950                 ls->ls_server_fld = NULL;
2951         }
2952
2953         RETURN(0);
2954 }
2955
2956 static int mdt_fld_init(const struct lu_env *env,
2957                         const char *uuid,
2958                         struct mdt_device *m)
2959 {
2960         struct lu_site *ls;
2961         int rc;
2962         ENTRY;
2963
2964         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2965
2966         OBD_ALLOC_PTR(ls->ls_server_fld);
2967         if (ls->ls_server_fld == NULL)
2968                 RETURN(rc = -ENOMEM);
2969
2970         rc = fld_server_init(ls->ls_server_fld,
2971                              m->mdt_bottom, uuid, env);
2972         if (rc) {
2973                 OBD_FREE_PTR(ls->ls_server_fld);
2974                 ls->ls_server_fld = NULL;
2975                 RETURN(rc);
2976         }
2977
2978         RETURN(0);
2979 }
2980
2981 /* device init/fini methods */
2982 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
2983 {
2984         if (m->mdt_regular_service != NULL) {
2985                 ptlrpc_unregister_service(m->mdt_regular_service);
2986                 m->mdt_regular_service = NULL;
2987         }
2988         if (m->mdt_readpage_service != NULL) {
2989                 ptlrpc_unregister_service(m->mdt_readpage_service);
2990                 m->mdt_readpage_service = NULL;
2991         }
2992         if (m->mdt_setattr_service != NULL) {
2993                 ptlrpc_unregister_service(m->mdt_setattr_service);
2994                 m->mdt_setattr_service = NULL;
2995         }
2996         if (m->mdt_mdsc_service != NULL) {
2997                 ptlrpc_unregister_service(m->mdt_mdsc_service);
2998                 m->mdt_mdsc_service = NULL;
2999         }
3000         if (m->mdt_mdss_service != NULL) {
3001                 ptlrpc_unregister_service(m->mdt_mdss_service);
3002                 m->mdt_mdss_service = NULL;
3003         }
3004         if (m->mdt_dtss_service != NULL) {
3005                 ptlrpc_unregister_service(m->mdt_dtss_service);
3006                 m->mdt_dtss_service = NULL;
3007         }
3008         if (m->mdt_fld_service != NULL) {
3009                 ptlrpc_unregister_service(m->mdt_fld_service);
3010                 m->mdt_fld_service = NULL;
3011         }
3012 }
3013
3014 static int mdt_start_ptlrpc_service(struct mdt_device *m)
3015 {
3016         int rc;
3017         static struct ptlrpc_service_conf conf;
3018         cfs_proc_dir_entry_t *procfs_entry;
3019         ENTRY;
3020
3021         procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
3022
3023         conf = (typeof(conf)) {
3024                 .psc_nbufs            = MDS_NBUFS,
3025                 .psc_bufsize          = MDS_BUFSIZE,
3026                 .psc_max_req_size     = MDS_MAXREQSIZE,
3027                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3028                 .psc_req_portal       = MDS_REQUEST_PORTAL,
3029                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3030                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3031                 /*
3032                  * We'd like to have a mechanism to set this on a per-device
3033                  * basis, but alas...
3034                  */
3035                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3036                                        MDT_MAX_THREADS),
3037                 .psc_ctx_tags      = LCT_MD_THREAD
3038         };
3039
3040         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
3041         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
3042                            "mdt_ldlm_client", m->mdt_ldlm_client);
3043
3044         m->mdt_regular_service =
3045                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
3046                                      procfs_entry, NULL);
3047         if (m->mdt_regular_service == NULL)
3048                 RETURN(-ENOMEM);
3049
3050         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
3051         if (rc)
3052                 GOTO(err_mdt_svc, rc);
3053
3054         /*
3055          * readpage service configuration. Parameters have to be adjusted,
3056          * ideally.
3057          */
3058         conf = (typeof(conf)) {
3059                 .psc_nbufs            = MDS_NBUFS,
3060                 .psc_bufsize          = MDS_BUFSIZE,
3061                 .psc_max_req_size     = MDS_MAXREQSIZE,
3062                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3063                 .psc_req_portal       = MDS_READPAGE_PORTAL,
3064                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3065                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3066                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3067                                        MDT_MAX_THREADS),
3068                 .psc_ctx_tags      = LCT_MD_THREAD
3069         };
3070         m->mdt_readpage_service =
3071                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
3072                                      LUSTRE_MDT_NAME "_readpage",
3073                                      procfs_entry, NULL);
3074
3075         if (m->mdt_readpage_service == NULL) {
3076                 CERROR("failed to start readpage service\n");
3077                 GOTO(err_mdt_svc, rc = -ENOMEM);
3078         }
3079
3080         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
3081
3082         /*
3083          * setattr service configuration.
3084          */
3085         conf = (typeof(conf)) {
3086                 .psc_nbufs            = MDS_NBUFS,
3087                 .psc_bufsize          = MDS_BUFSIZE,
3088                 .psc_max_req_size     = MDS_MAXREQSIZE,
3089                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3090                 .psc_req_portal       = MDS_SETATTR_PORTAL,
3091                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3092                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3093                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3094                                        MDT_MAX_THREADS),
3095                 .psc_ctx_tags      = LCT_MD_THREAD
3096         };
3097
3098         m->mdt_setattr_service =
3099                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
3100                                      LUSTRE_MDT_NAME "_setattr",
3101                                      procfs_entry, NULL);
3102
3103         if (!m->mdt_setattr_service) {
3104                 CERROR("failed to start setattr service\n");
3105                 GOTO(err_mdt_svc, rc = -ENOMEM);
3106         }
3107
3108         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
3109         if (rc)
3110                 GOTO(err_mdt_svc, rc);
3111
3112         /*
3113          * sequence controller service configuration
3114          */
3115         conf = (typeof(conf)) {
3116                 .psc_nbufs = MDS_NBUFS,
3117                 .psc_bufsize = MDS_BUFSIZE,
3118                 .psc_max_req_size = SEQ_MAXREQSIZE,
3119                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3120                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
3121                 .psc_rep_portal = MDC_REPLY_PORTAL,
3122                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3123                 .psc_num_threads = SEQ_NUM_THREADS,
3124                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3125         };
3126
3127         m->mdt_mdsc_service =
3128                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
3129                                      LUSTRE_MDT_NAME"_mdsc",
3130                                      procfs_entry, NULL);
3131         if (!m->mdt_mdsc_service) {
3132                 CERROR("failed to start seq controller service\n");
3133                 GOTO(err_mdt_svc, rc = -ENOMEM);
3134         }
3135
3136         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
3137         if (rc)
3138                 GOTO(err_mdt_svc, rc);
3139
3140         /*
3141          * metadata sequence server service configuration
3142          */
3143         conf = (typeof(conf)) {
3144                 .psc_nbufs = MDS_NBUFS,
3145                 .psc_bufsize = MDS_BUFSIZE,
3146                 .psc_max_req_size = SEQ_MAXREQSIZE,
3147                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3148                 .psc_req_portal = SEQ_METADATA_PORTAL,
3149                 .psc_rep_portal = MDC_REPLY_PORTAL,
3150                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3151                 .psc_num_threads = SEQ_NUM_THREADS,
3152                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3153         };
3154
3155         m->mdt_mdss_service =
3156                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
3157                                      LUSTRE_MDT_NAME"_mdss",
3158                                      procfs_entry, NULL);
3159         if (!m->mdt_mdss_service) {
3160                 CERROR("failed to start metadata seq server service\n");
3161                 GOTO(err_mdt_svc, rc = -ENOMEM);
3162         }
3163
3164         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
3165         if (rc)
3166                 GOTO(err_mdt_svc, rc);
3167
3168
3169         /*
3170          * Data sequence server service configuration. We want to have really
3171          * cluster-wide sequences space. This is why we start only one sequence
3172          * controller which manages space.
3173          */
3174         conf = (typeof(conf)) {
3175                 .psc_nbufs = MDS_NBUFS,
3176                 .psc_bufsize = MDS_BUFSIZE,
3177                 .psc_max_req_size = SEQ_MAXREQSIZE,
3178                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3179                 .psc_req_portal = SEQ_DATA_PORTAL,
3180                 .psc_rep_portal = OSC_REPLY_PORTAL,
3181                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3182                 .psc_num_threads = SEQ_NUM_THREADS,
3183                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3184         };
3185
3186         m->mdt_dtss_service =
3187                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
3188                                      LUSTRE_MDT_NAME"_dtss",
3189                                      procfs_entry, NULL);
3190         if (!m->mdt_dtss_service) {
3191                 CERROR("failed to start data seq server service\n");
3192                 GOTO(err_mdt_svc, rc = -ENOMEM);
3193         }
3194
3195         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
3196         if (rc)
3197                 GOTO(err_mdt_svc, rc);
3198
3199         /* FLD service start */
3200         conf = (typeof(conf)) {
3201                 .psc_nbufs            = MDS_NBUFS,
3202                 .psc_bufsize          = MDS_BUFSIZE,
3203                 .psc_max_req_size     = FLD_MAXREQSIZE,
3204                 .psc_max_reply_size   = FLD_MAXREPSIZE,
3205                 .psc_req_portal       = FLD_REQUEST_PORTAL,
3206                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3207                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3208                 .psc_num_threads      = FLD_NUM_THREADS,
3209                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
3210         };
3211
3212         m->mdt_fld_service =
3213                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
3214                                      LUSTRE_MDT_NAME"_fld",
3215                                      procfs_entry, NULL);
3216         if (!m->mdt_fld_service) {
3217                 CERROR("failed to start fld service\n");
3218                 GOTO(err_mdt_svc, rc = -ENOMEM);
3219         }
3220
3221         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
3222         if (rc)
3223                 GOTO(err_mdt_svc, rc);
3224
3225         EXIT;
3226 err_mdt_svc:
3227         if (rc)
3228                 mdt_stop_ptlrpc_service(m);
3229
3230         return rc;
3231 }
3232
3233 static void mdt_stack_fini(const struct lu_env *env,
3234                            struct mdt_device *m, struct lu_device *top)
3235 {
3236         struct lu_device        *d = top, *n;
3237         struct lustre_cfg_bufs  *bufs;
3238         struct lustre_cfg       *lcfg;
3239         struct mdt_thread_info  *info;
3240         ENTRY;
3241
3242         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3243         LASSERT(info != NULL);
3244
3245         bufs = &info->mti_u.bufs;
3246         /* process cleanup, pass mdt obd name to get obd umount flags */
3247         lustre_cfg_bufs_reset(bufs, m->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3248         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
3249         if (!lcfg) {
3250                 CERROR("Cannot alloc lcfg!\n");
3251                 return;
3252         }
3253         LASSERT(top);
3254         top->ld_ops->ldo_process_config(env, top, lcfg);
3255         lustre_cfg_free(lcfg);
3256
3257         lu_site_purge(env, top->ld_site, ~0);
3258         while (d != NULL) {
3259                 struct obd_type *type;
3260                 struct lu_device_type *ldt = d->ld_type;
3261
3262                 /* each fini() returns next device in stack of layers
3263                  * * so we can avoid the recursion */
3264                 n = ldt->ldt_ops->ldto_device_fini(env, d);
3265                 lu_device_put(d);
3266                 ldt->ldt_ops->ldto_device_free(env, d);
3267                 type = ldt->ldt_obd_type;
3268                 type->typ_refcnt--;
3269                 class_put_type(type);
3270
3271                 /* switch to the next device in the layer */
3272                 d = n;
3273         }
3274         m->mdt_child = NULL;
3275 }
3276
3277 static struct lu_device *mdt_layer_setup(const struct lu_env *env,
3278                                          const char *typename,
3279                                          struct lu_device *child,
3280                                          struct lustre_cfg *cfg)
3281 {
3282         const char            *dev = lustre_cfg_string(cfg, 0);
3283         struct obd_type       *type;
3284         struct lu_device_type *ldt;
3285         struct lu_device      *d;
3286         int rc;
3287         ENTRY;
3288
3289         /* find the type */
3290         type = class_get_type(typename);
3291         if (!type) {
3292                 CERROR("Unknown type: '%s'\n", typename);
3293                 GOTO(out, rc = -ENODEV);
3294         }
3295
3296         rc = lu_context_refill(&env->le_ctx);
3297         if (rc != 0) {
3298                 CERROR("Failure to refill context: '%d'\n", rc);
3299                 GOTO(out_type, rc);
3300         }
3301
3302         if (env->le_ses != NULL) {
3303                 rc = lu_context_refill(env->le_ses);
3304                 if (rc != 0) {
3305                         CERROR("Failure to refill session: '%d'\n", rc);
3306                         GOTO(out_type, rc);
3307                 }
3308         }
3309
3310         ldt = type->typ_lu;
3311         if (ldt == NULL) {
3312                 CERROR("type: '%s'\n", typename);
3313                 GOTO(out_type, rc = -EINVAL);
3314         }
3315
3316         ldt->ldt_obd_type = type;
3317         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
3318         if (IS_ERR(d)) {
3319                 CERROR("Cannot allocate device: '%s'\n", typename);
3320                 GOTO(out_type, rc = -ENODEV);
3321         }
3322
3323         LASSERT(child->ld_site);
3324         d->ld_site = child->ld_site;
3325
3326         type->typ_refcnt++;
3327         rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child);
3328         if (rc) {
3329                 CERROR("can't init device '%s', rc %d\n", typename, rc);
3330                 GOTO(out_alloc, rc);
3331         }
3332         lu_device_get(d);
3333
3334         RETURN(d);
3335
3336 out_alloc:
3337         ldt->ldt_ops->ldto_device_free(env, d);
3338         type->typ_refcnt--;
3339 out_type:
3340         class_put_type(type);
3341 out:
3342         return ERR_PTR(rc);
3343 }
3344
3345 static int mdt_stack_init(const struct lu_env *env,
3346                           struct mdt_device *m, struct lustre_cfg *cfg)
3347 {
3348         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3349         struct lu_device  *tmp;
3350         struct md_device  *md;
3351         int rc;
3352         ENTRY;
3353
3354         /* init the stack */
3355         tmp = mdt_layer_setup(env, LUSTRE_OSD_NAME, d, cfg);
3356         if (IS_ERR(tmp)) {
3357                 RETURN(PTR_ERR(tmp));
3358         }
3359         m->mdt_bottom = lu2dt_dev(tmp);
3360         d = tmp;
3361         tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
3362         if (IS_ERR(tmp)) {
3363                 GOTO(out, rc = PTR_ERR(tmp));
3364         }
3365         d = tmp;
3366         md = lu2md_dev(d);
3367
3368         tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
3369         if (IS_ERR(tmp)) {
3370                 GOTO(out, rc = PTR_ERR(tmp));
3371         }
3372         d = tmp;
3373         /*set mdd upcall device*/
3374         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
3375
3376         md = lu2md_dev(d);
3377         /*set cmm upcall device*/
3378         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
3379
3380         m->mdt_child = lu2md_dev(d);
3381
3382         /* process setup config */
3383         tmp = &m->mdt_md_dev.md_lu_dev;
3384         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
3385         GOTO(out, rc);
3386 out:
3387         /* fini from last known good lu_device */
3388         if (rc)
3389                 mdt_stack_fini(env, m, d);
3390
3391         return rc;
3392 }
3393
3394 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
3395 {
3396         struct md_device *next = m->mdt_child;
3397         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3398         struct lu_site    *ls = d->ld_site;
3399
3400         ENTRY;
3401
3402         mdt_fs_cleanup(env, m);
3403
3404         ping_evictor_stop();
3405         mdt_stop_ptlrpc_service(m);
3406
3407         cleanup_capas(CAPA_SITE_SERVER);
3408         del_timer(&m->mdt_ck_timer);
3409         mdt_ck_thread_stop(m);
3410
3411         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3412         m->mdt_rmtacl_cache = NULL;
3413
3414         upcall_cache_cleanup(m->mdt_identity_cache);
3415         m->mdt_identity_cache = NULL;
3416
3417         if (m->mdt_namespace != NULL) {
3418                 ldlm_namespace_free(m->mdt_namespace, 0);
3419                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
3420         }
3421
3422         mdt_seq_fini(env, m);
3423         mdt_seq_fini_cli(m);
3424         mdt_fld_fini(env, m);
3425         lprocfs_obd_cleanup(d->ld_obd);
3426
3427         if (m->mdt_rootsquash_info) {
3428                 OBD_FREE_PTR(m->mdt_rootsquash_info);
3429                 m->mdt_rootsquash_info = NULL;
3430         }
3431
3432         next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
3433         cleanup_capas(CAPA_SITE_SERVER);
3434         del_timer(&m->mdt_ck_timer);
3435         mdt_ck_thread_stop(m);
3436
3437         /* finish the stack */
3438         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3439
3440         if (ls) {
3441                 if (!list_empty(&ls->ls_lru) ||
3442                     ls->ls_total != 0 || ls->ls_busy != 0) {
3443                         /*
3444                          * Uh-oh, objects still exist.
3445                          */
3446                         static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
3447
3448                         lu_site_print(env, ls, &cookie, lu_cdebug_printer);
3449                 }
3450
3451                 lu_site_fini(ls);
3452                 OBD_FREE_PTR(ls);
3453                 d->ld_site = NULL;
3454         }
3455         LASSERT(atomic_read(&d->ld_ref) == 0);
3456         md_device_fini(&m->mdt_md_dev);
3457
3458         EXIT;
3459 }
3460
3461 static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
3462 {
3463         char *p = options;
3464
3465         if (!options)
3466                 return;
3467
3468         while (*options) {
3469                 int len;
3470
3471                 while (*p && *p != ',')
3472                         p++;
3473
3474                 len = p - options;
3475                 if ((len == sizeof("user_xattr") - 1) &&
3476                     (memcmp(options, "user_xattr", len) == 0)) {
3477                         m->mdt_opts.mo_user_xattr = 1;
3478                         LCONSOLE_INFO("Enabling user_xattr\n");
3479                 } else if ((len == sizeof("nouser_xattr") - 1) &&
3480                            (memcmp(options, "nouser_xattr", len) == 0)) {
3481                         m->mdt_opts.mo_user_xattr = 0;
3482                         LCONSOLE_INFO("Disabling user_xattr\n");
3483                 } else if ((len == sizeof("acl") - 1) &&
3484                            (memcmp(options, "acl", len) == 0)) {
3485 #ifdef CONFIG_FS_POSIX_ACL
3486                         m->mdt_opts.mo_acl = 1;
3487                         LCONSOLE_INFO("Enabling ACL\n");
3488 #else
3489                         m->mdt_opts.mo_acl = 0;
3490                         CWARN("ignoring unsupported acl mount option\n");
3491                         LCONSOLE_INFO("Disabling ACL\n");
3492 #endif
3493                 } else if ((len == sizeof("noacl") - 1) &&
3494                            (memcmp(options, "noacl", len) == 0)) {
3495 #ifdef CONFIG_FS_POSIX_ACL
3496                         m->mdt_opts.mo_acl = 0;
3497                         LCONSOLE_INFO("Disabling ACL\n");
3498 #endif
3499                 }
3500
3501                 options = ++p;
3502         }
3503 }
3504
3505 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
3506
3507 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
3508                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
3509 {
3510         struct lprocfs_static_vars lvars;
3511         struct mdt_thread_info    *info;
3512         struct obd_device         *obd;
3513         const char                *dev = lustre_cfg_string(cfg, 0);
3514         const char                *num = lustre_cfg_string(cfg, 2);
3515         struct lustre_mount_info  *lmi;
3516         struct lustre_sb_info     *lsi;
3517         struct vfsmount           *mnt;
3518         struct lu_site            *s;
3519         int                        rc;
3520         ENTRY;
3521
3522         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3523         LASSERT(info != NULL);
3524
3525         obd = class_name2obd(dev);
3526         LASSERT(obd != NULL);
3527
3528         spin_lock_init(&m->mdt_transno_lock);
3529
3530         m->mdt_max_mdsize = MAX_MD_SIZE;
3531         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
3532
3533         m->mdt_opts.mo_user_xattr = 0;
3534         m->mdt_opts.mo_acl = 0;
3535         lmi = server_get_mount_2(dev);
3536         if (lmi == NULL) {
3537                 CERROR("Cannot get mount info for %s!\n", dev);
3538                 RETURN(-EFAULT);
3539         } else {
3540                 lsi = s2lsi(lmi->lmi_sb);
3541                 fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
3542
3543                 mnt = lmi->lmi_mnt;
3544                 OBD_SET_CTXT_MAGIC(&obd->obd_lvfs_ctxt);
3545                 obd->obd_lvfs_ctxt.pwdmnt = mnt;
3546                 obd->obd_lvfs_ctxt.pwd = mnt->mnt_root;
3547                 obd->obd_lvfs_ctxt.fs = get_ds();
3548
3549                 server_put_mount_2(dev, mnt);
3550         }
3551
3552         spin_lock_init(&m->mdt_ioepoch_lock);
3553         m->mdt_opts.mo_compat_resname = 0;
3554         m->mdt_capa_timeout = CAPA_TIMEOUT;
3555         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
3556         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
3557         obd->obd_replayable = 1;
3558         spin_lock_init(&m->mdt_client_bitmap_lock);
3559
3560         OBD_ALLOC_PTR(s);
3561         if (s == NULL)
3562                 RETURN(-ENOMEM);
3563
3564         md_device_init(&m->mdt_md_dev, ldt);
3565         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
3566         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
3567         /* set this lu_device to obd, because error handling need it */
3568         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
3569
3570         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
3571         if (rc) {
3572                 CERROR("can't init lu_site, rc %d\n", rc);
3573                 GOTO(err_free_site, rc);
3574         }
3575
3576         lprocfs_init_vars(mdt, &lvars);
3577         rc = lprocfs_obd_setup(obd, lvars.obd_vars);
3578         if (rc) {
3579                 CERROR("can't init lprocfs, rc %d\n", rc);
3580                 GOTO(err_fini_site, rc);
3581         }
3582         ptlrpc_lprocfs_register_obd(obd);
3583
3584         /* set server index */
3585         LASSERT(num);
3586         s->ls_node_id = simple_strtol(num, NULL, 10);
3587
3588         /* init the stack */
3589         rc = mdt_stack_init(env, m, cfg);
3590         if (rc) {
3591                 CERROR("can't init device stack, rc %d\n", rc);
3592                 GOTO(err_fini_proc, rc);
3593         }
3594
3595         rc = mdt_fld_init(env, obd->obd_name, m);
3596         if (rc)
3597                 GOTO(err_fini_stack, rc);
3598
3599         rc = mdt_seq_init(env, obd->obd_name, m);
3600         if (rc)
3601                 GOTO(err_fini_fld, rc);
3602
3603         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3604                  LUSTRE_MDT_NAME"-%p", m);
3605         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3606                                               LDLM_NAMESPACE_SERVER);
3607         if (m->mdt_namespace == NULL)
3608                 GOTO(err_fini_seq, rc = -ENOMEM);
3609
3610         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3611         /* set obd_namespace for compatibility with old code */
3612         obd->obd_namespace = m->mdt_namespace;
3613
3614         m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
3615                                                   "NONE",
3616                                                   &mdt_identity_upcall_cache_ops);
3617         if (IS_ERR(m->mdt_identity_cache)) {
3618                 rc = PTR_ERR(m->mdt_identity_cache);
3619                 m->mdt_identity_cache = NULL;
3620                 GOTO(err_free_ns, rc);
3621         }
3622
3623         m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
3624                                                 MDT_RMTACL_UPCALL_PATH,
3625                                                 &mdt_rmtacl_upcall_cache_ops);
3626         if (IS_ERR(m->mdt_rmtacl_cache)) {
3627                 rc = PTR_ERR(m->mdt_rmtacl_cache);
3628                 m->mdt_rmtacl_cache = NULL;
3629                 GOTO(err_free_ns, rc);
3630         }
3631
3632         m->mdt_ck_timer.function = mdt_ck_timer_callback;
3633         m->mdt_ck_timer.data = (unsigned long)m;
3634         init_timer(&m->mdt_ck_timer);
3635         rc = mdt_ck_thread_start(m);
3636         if (rc)
3637                 GOTO(err_free_ns, rc);
3638
3639         rc = mdt_start_ptlrpc_service(m);
3640         if (rc)
3641                 GOTO(err_capa, rc);
3642
3643         ping_evictor_start();
3644
3645         rc = mdt_fs_setup(env, m, obd);
3646         if (rc)
3647                 GOTO(err_stop_service, rc);
3648
3649         rc = lu_site_init_finish(s);
3650         if (rc)
3651                 GOTO(err_fs_cleanup, rc);
3652
3653         if (obd->obd_recovering == 0)
3654                 mdt_postrecov(env, m);
3655
3656         mdt_init_capa_ctxt(env, m);
3657         RETURN(0);
3658
3659 err_fs_cleanup:
3660         mdt_fs_cleanup(env, m);
3661 err_stop_service:
3662         mdt_stop_ptlrpc_service(m);
3663 err_capa:
3664         del_timer(&m->mdt_ck_timer);
3665         mdt_ck_thread_stop(m);
3666 err_free_ns:
3667         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3668         m->mdt_rmtacl_cache = NULL;
3669         upcall_cache_cleanup(m->mdt_identity_cache);
3670         m->mdt_identity_cache = NULL;
3671         ldlm_namespace_free(m->mdt_namespace, 0);
3672         obd->obd_namespace = m->mdt_namespace = NULL;
3673 err_fini_seq:
3674         mdt_seq_fini(env, m);
3675 err_fini_fld:
3676         mdt_fld_fini(env, m);
3677 err_fini_stack:
3678         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3679 err_fini_proc:
3680         lprocfs_obd_cleanup(obd);
3681 err_fini_site:
3682         lu_site_fini(s);
3683 err_free_site:
3684         OBD_FREE_PTR(s);
3685
3686         md_device_fini(&m->mdt_md_dev);
3687         return (rc);
3688 }
3689
3690 /* used by MGS to process specific configurations */
3691 static int mdt_process_config(const struct lu_env *env,
3692                               struct lu_device *d, struct lustre_cfg *cfg)
3693 {
3694         struct mdt_device *m = mdt_dev(d);
3695         struct md_device *md_next = m->mdt_child;
3696         struct lu_device *next = md2lu_dev(md_next);
3697         int rc = 0;
3698         ENTRY;
3699
3700         switch (cfg->lcfg_command) {
3701         case LCFG_PARAM: {
3702                 struct lprocfs_static_vars lvars;
3703                 struct obd_device *obd = d->ld_obd;
3704
3705                 lprocfs_init_vars(mdt, &lvars);
3706                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, cfg, obd);
3707                 if (rc)
3708                         /* others are passed further */
3709                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
3710                 break;
3711         }
3712         case LCFG_ADD_MDC:
3713                 /*
3714                  * Add mdc hook to get first MDT uuid and connect it to
3715                  * ls->controller to use for seq manager.
3716                  */
3717                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3718                 if (rc)
3719                         CERROR("Can't add mdc, rc %d\n", rc);
3720                 else
3721                         rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
3722                 break;
3723         default:
3724                 /* others are passed further */
3725                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3726                 break;
3727         }
3728         RETURN(rc);
3729 }
3730
3731 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
3732                                           const struct lu_object_header *hdr,
3733                                           struct lu_device *d)
3734 {
3735         struct mdt_object *mo;
3736
3737         ENTRY;
3738
3739         OBD_ALLOC_PTR(mo);
3740         if (mo != NULL) {
3741                 struct lu_object *o;
3742                 struct lu_object_header *h;
3743
3744                 o = &mo->mot_obj.mo_lu;
3745                 h = &mo->mot_header;
3746                 lu_object_header_init(h);
3747                 lu_object_init(o, h, d);
3748                 lu_object_add_top(h, o);
3749                 o->lo_ops = &mdt_obj_ops;
3750                 RETURN(o);
3751         } else
3752                 RETURN(NULL);
3753 }
3754
3755 static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
3756 {
3757         struct mdt_device *d = mdt_dev(o->lo_dev);
3758         struct lu_device  *under;
3759         struct lu_object  *below;
3760         int                rc = 0;
3761         ENTRY;
3762
3763         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3764                PFID(lu_object_fid(o)));
3765
3766         under = &d->mdt_child->md_lu_dev;
3767         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
3768         if (below != NULL) {
3769                 lu_object_add(o, below);
3770         } else
3771                 rc = -ENOMEM;
3772
3773         RETURN(rc);
3774 }
3775
3776 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
3777 {
3778         struct mdt_object *mo = mdt_obj(o);
3779         struct lu_object_header *h;
3780         ENTRY;
3781
3782         h = o->lo_header;
3783         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3784                PFID(lu_object_fid(o)));
3785
3786         lu_object_fini(o);
3787         lu_object_header_fini(h);
3788         OBD_FREE_PTR(mo);
3789         EXIT;
3790 }
3791
3792 static int mdt_object_print(const struct lu_env *env, void *cookie,
3793                             lu_printer_t p, const struct lu_object *o)
3794 {
3795         return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o);
3796 }
3797
3798 static struct lu_device_operations mdt_lu_ops = {
3799         .ldo_object_alloc   = mdt_object_alloc,
3800         .ldo_process_config = mdt_process_config
3801 };
3802
3803 static struct lu_object_operations mdt_obj_ops = {
3804         .loo_object_init    = mdt_object_init,
3805         .loo_object_free    = mdt_object_free,
3806         .loo_object_print   = mdt_object_print
3807 };
3808
3809 /* mds_connect_internal */
3810 static int mdt_connect_internal(struct obd_export *exp,
3811                                 struct mdt_device *mdt,
3812                                 struct obd_connect_data *data)
3813 {
3814         __u64 flags;
3815
3816         if (data != NULL) {
3817                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3818                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3819
3820                 /* If no known bits (which should not happen, probably,
3821                    as everybody should support LOOKUP and UPDATE bits at least)
3822                    revert to compat mode with plain locks. */
3823                 if (!data->ocd_ibits_known &&
3824                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
3825                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3826
3827                 if (!mdt->mdt_opts.mo_acl)
3828                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3829
3830                 if (!mdt->mdt_opts.mo_user_xattr)
3831                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3832
3833                 if (!mdt->mdt_opts.mo_mds_capa)
3834                         data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
3835
3836                 if (!mdt->mdt_opts.mo_oss_capa)
3837                         data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
3838
3839                 exp->exp_connect_flags = data->ocd_connect_flags;
3840                 data->ocd_version = LUSTRE_VERSION_CODE;
3841                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3842         }
3843
3844 #if 0
3845         if (mdt->mdt_opts.mo_acl &&
3846             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3847                 CWARN("%s: MDS requires ACL support but client does not\n",
3848                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3849                 return -EBADE;
3850         }
3851 #endif
3852
3853         flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
3854         if ((exp->exp_connect_flags & flags) == flags) {
3855                 CWARN("%s: both local and remote client flags are set\n",
3856                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3857                 return -EBADE;
3858         }
3859
3860         if (mdt->mdt_opts.mo_mds_capa &&
3861             ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
3862                 CWARN("%s: MDS requires capability support, but client not\n",
3863                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3864                 return -EBADE;
3865         }
3866
3867         if (mdt->mdt_opts.mo_oss_capa &&
3868             ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
3869                 CWARN("%s: MDS requires OSS capability support, "
3870                       "but client not\n",
3871                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3872                 return -EBADE;
3873         }
3874
3875         return 0;
3876 }
3877
3878 /* mds_connect copy */
3879 static int mdt_obd_connect(const struct lu_env *env,
3880                            struct lustre_handle *conn, struct obd_device *obd,
3881                            struct obd_uuid *cluuid,
3882                            struct obd_connect_data *data)
3883 {
3884         struct mdt_export_data *med;
3885         struct mdt_client_data *mcd;
3886         struct obd_export      *exp;
3887         struct mdt_device      *mdt;
3888         int                     rc;
3889         ENTRY;
3890
3891         LASSERT(env != NULL);
3892         if (!conn || !obd || !cluuid)
3893                 RETURN(-EINVAL);
3894
3895         mdt = mdt_dev(obd->obd_lu_dev);
3896
3897         rc = class_connect(conn, obd, cluuid);
3898         if (rc)
3899                 RETURN(rc);
3900
3901         exp = class_conn2export(conn);
3902         LASSERT(exp != NULL);
3903         med = &exp->exp_mdt_data;
3904
3905         rc = mdt_connect_internal(exp, mdt, data);
3906         if (rc == 0) {
3907                 OBD_ALLOC_PTR(mcd);
3908                 if (mcd != NULL) {
3909                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
3910                         med->med_mcd = mcd;
3911                         rc = mdt_client_new(env, mdt, med);
3912                         if (rc != 0) {
3913                                 OBD_FREE_PTR(mcd);
3914                                 med->med_mcd = NULL;
3915                         }
3916                 } else
3917                         rc = -ENOMEM;
3918         }
3919
3920         if (rc != 0)
3921                 class_disconnect(exp);
3922         else
3923                 class_export_put(exp);
3924
3925         RETURN(rc);
3926 }
3927
3928 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
3929                              struct obd_uuid *cluuid,
3930                              struct obd_connect_data *data)
3931 {
3932         int rc;
3933         ENTRY;
3934
3935         if (exp == NULL || obd == NULL || cluuid == NULL)
3936                 RETURN(-EINVAL);
3937
3938         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
3939
3940         RETURN(rc);
3941 }
3942
3943 static int mdt_obd_disconnect(struct obd_export *exp)
3944 {
3945         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
3946         int rc;
3947         ENTRY;
3948
3949         LASSERT(exp);
3950         class_export_get(exp);
3951
3952         /* Disconnect early so that clients can't keep using export */
3953         rc = class_disconnect(exp);
3954         if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
3955                 ldlm_cancel_locks_for_export(exp);
3956
3957         /* complete all outstanding replies */
3958         spin_lock(&exp->exp_lock);
3959         while (!list_empty(&exp->exp_outstanding_replies)) {
3960                 struct ptlrpc_reply_state *rs =
3961                         list_entry(exp->exp_outstanding_replies.next,
3962                                    struct ptlrpc_reply_state, rs_exp_list);
3963                 struct ptlrpc_service *svc = rs->rs_service;
3964
3965                 spin_lock(&svc->srv_lock);
3966                 list_del_init(&rs->rs_exp_list);
3967                 ptlrpc_schedule_difficult_reply(rs);
3968                 spin_unlock(&svc->srv_lock);
3969         }
3970         spin_unlock(&exp->exp_lock);
3971
3972         class_export_put(exp);
3973         RETURN(rc);
3974 }
3975
3976 /* FIXME: Can we avoid using these two interfaces? */
3977 static int mdt_init_export(struct obd_export *exp)
3978 {
3979         struct mdt_export_data *med = &exp->exp_mdt_data;
3980         ENTRY;
3981
3982         INIT_LIST_HEAD(&med->med_open_head);
3983         spin_lock_init(&med->med_open_lock);
3984         exp->exp_connecting = 1;
3985         RETURN(0);
3986 }
3987
3988 static int mdt_destroy_export(struct obd_export *export)
3989 {
3990         struct mdt_export_data *med;
3991         struct obd_device      *obd = export->exp_obd;
3992         struct mdt_device      *mdt;
3993         struct mdt_thread_info *info;
3994         struct lu_env           env;
3995         struct md_attr         *ma;
3996         int lmm_size;
3997         int cookie_size;
3998         int rc = 0;
3999         ENTRY;
4000
4001         med = &export->exp_mdt_data;
4002         if (med->med_rmtclient)
4003                 mdt_cleanup_idmap(med);
4004
4005         target_destroy_export(export);
4006
4007         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
4008                 RETURN(0);
4009
4010         mdt = mdt_dev(obd->obd_lu_dev);
4011         LASSERT(mdt != NULL);
4012
4013         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4014         if (rc)
4015                 RETURN(rc);
4016
4017         info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
4018         LASSERT(info != NULL);
4019         memset(info, 0, sizeof *info);
4020         info->mti_env = &env;
4021         info->mti_mdt = mdt;
4022
4023         ma = &info->mti_attr;
4024         lmm_size = ma->ma_lmm_size = mdt->mdt_max_mdsize;
4025         cookie_size = ma->ma_cookie_size = mdt->mdt_max_cookiesize;
4026         OBD_ALLOC(ma->ma_lmm, lmm_size);
4027         OBD_ALLOC(ma->ma_cookie, cookie_size);
4028
4029         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
4030                 GOTO(out, rc = -ENOMEM);
4031         ma->ma_need = MA_LOV | MA_COOKIE;
4032         ma->ma_valid = 0;
4033         /* Close any open files (which may also cause orphan unlinking). */
4034         spin_lock(&med->med_open_lock);
4035         while (!list_empty(&med->med_open_head)) {
4036                 struct list_head *tmp = med->med_open_head.next;
4037                 struct mdt_file_data *mfd =
4038                         list_entry(tmp, struct mdt_file_data, mfd_list);
4039
4040                 /* Remove mfd handle so it can't be found again.
4041                  * We are consuming the mfd_list reference here. */
4042                 class_handle_unhash(&mfd->mfd_handle);
4043                 list_del_init(&mfd->mfd_list);
4044                 spin_unlock(&med->med_open_lock);
4045                 mdt_mfd_close(info, mfd);
4046                 /* TODO: if we close the unlinked file,
4047                  * we need to remove it's objects from OST */
4048                 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
4049                 spin_lock(&med->med_open_lock);
4050                 ma->ma_lmm_size = lmm_size;
4051                 ma->ma_cookie_size = cookie_size;
4052                 ma->ma_need = MA_LOV | MA_COOKIE;
4053                 ma->ma_valid = 0;
4054         }
4055         spin_unlock(&med->med_open_lock);
4056         info->mti_mdt = NULL;
4057         mdt_client_del(&env, mdt, med);
4058
4059         EXIT;
4060 out:
4061         if (lmm_size) {
4062                 OBD_FREE(ma->ma_lmm, lmm_size);
4063                 ma->ma_lmm = NULL;
4064         }
4065         if (cookie_size) {
4066                 OBD_FREE(ma->ma_cookie, cookie_size);
4067                 ma->ma_cookie = NULL;
4068         }
4069         lu_env_fini(&env);
4070
4071         return rc;
4072 }
4073
4074 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
4075                       enum md_upcall_event ev)
4076 {
4077         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
4078         struct md_device  *next  = m->mdt_child;
4079         struct mdt_thread_info *mti;
4080         int rc = 0;
4081         ENTRY;
4082
4083         switch (ev) {
4084                 case MD_LOV_SYNC:
4085                         rc = next->md_ops->mdo_maxsize_get(env, next,
4086                                         &m->mdt_max_mdsize,
4087                                         &m->mdt_max_cookiesize);
4088                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
4089                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
4090                         break;
4091                 case MD_NO_TRANS:
4092                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
4093                         mti->mti_no_need_trans = 1;
4094                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
4095                         break;
4096                 default:
4097                         CERROR("invalid event\n");
4098                         rc = -EINVAL;
4099                         break;
4100         }
4101         RETURN(rc);
4102 }
4103
4104 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
4105                          void *karg, void *uarg)
4106 {
4107         struct lu_env      env;
4108         struct obd_device *obd= exp->exp_obd;
4109         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
4110         struct dt_device  *dt = mdt->mdt_bottom;
4111         int rc;
4112
4113         ENTRY;
4114         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
4115         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4116         if (rc)
4117                 RETURN(rc);
4118
4119         switch (cmd) {
4120         case OBD_IOC_SYNC:
4121                 rc = dt->dd_ops->dt_sync(&env, dt);
4122                 break;
4123
4124         case OBD_IOC_SET_READONLY:
4125                 rc = dt->dd_ops->dt_sync(&env, dt);
4126                 dt->dd_ops->dt_ro(&env, dt);
4127                 break;
4128
4129         case OBD_IOC_ABORT_RECOVERY:
4130                 CERROR("aborting recovery for device %s\n", obd->obd_name);
4131                 target_stop_recovery_thread(obd);
4132                 break;
4133
4134         default:
4135                 CERROR("not supported cmd = %d for device %s\n",
4136                        cmd, obd->obd_name);
4137                 rc = -EOPNOTSUPP;
4138         }
4139
4140         lu_env_fini(&env);
4141         RETURN(rc);
4142 }
4143
4144 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
4145 {
4146         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
4147         int rc;
4148         ENTRY;
4149         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
4150         RETURN(rc);
4151 }
4152
4153 int mdt_obd_postrecov(struct obd_device *obd)
4154 {
4155         struct lu_env env;
4156         int rc;
4157
4158         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4159         if (rc)
4160                 RETURN(rc);
4161         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
4162         lu_env_fini(&env);
4163         return rc;
4164 }
4165
4166 static struct obd_ops mdt_obd_device_ops = {
4167         .o_owner          = THIS_MODULE,
4168         .o_connect        = mdt_obd_connect,
4169         .o_reconnect      = mdt_obd_reconnect,
4170         .o_disconnect     = mdt_obd_disconnect,
4171         .o_init_export    = mdt_init_export,
4172         .o_destroy_export = mdt_destroy_export,
4173         .o_iocontrol      = mdt_iocontrol,
4174         .o_postrecov      = mdt_obd_postrecov
4175
4176 };
4177
4178 static struct lu_device* mdt_device_fini(const struct lu_env *env,
4179                                          struct lu_device *d)
4180 {
4181         struct mdt_device *m = mdt_dev(d);
4182
4183         mdt_fini(env, m);
4184         RETURN(NULL);
4185 }
4186
/*
 * lu_device_type_operations::ldto_device_free method: release the
 * mdt_device that embeds \a d (allocated in mdt_device_alloc()).
 */
static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
{
        struct mdt_device *mdt;

        mdt = mdt_dev(d);
        OBD_FREE_PTR(mdt);
}
4193
4194 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
4195                                           struct lu_device_type *t,
4196                                           struct lustre_cfg *cfg)
4197 {
4198         struct lu_device  *l;
4199         struct mdt_device *m;
4200
4201         OBD_ALLOC_PTR(m);
4202         if (m != NULL) {
4203                 int rc;
4204
4205                 l = &m->mdt_md_dev.md_lu_dev;
4206                 rc = mdt_init0(env, m, t, cfg);
4207                 if (rc != 0) {
4208                         OBD_FREE_PTR(m);
4209                         l = ERR_PTR(rc);
4210                         return l;
4211                 }
4212                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
4213         } else
4214                 l = ERR_PTR(-ENOMEM);
4215         return l;
4216 }
4217
4218 /*
4219  * context key constructor/destructor
4220  */
4221 static void *mdt_key_init(const struct lu_context *ctx,
4222                           struct lu_context_key *key)
4223 {
4224         struct mdt_thread_info *info;
4225
4226         /*
4227          * check that no high order allocations are incurred.
4228          */
4229         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
4230         OBD_ALLOC_PTR(info);
4231         if (info == NULL)
4232                 info = ERR_PTR(-ENOMEM);
4233         return info;
4234 }
4235
/*
 * lct_fini method of mdt_thread_key: free the mdt_thread_info passed in
 * \a data (the value produced by mdt_key_init()).
 */
static void mdt_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *mti = data;

        OBD_FREE_PTR(mti);
}
4242
4243 struct lu_context_key mdt_thread_key = {
4244         .lct_tags = LCT_MD_THREAD,
4245         .lct_init = mdt_key_init,
4246         .lct_fini = mdt_key_fini
4247 };
4248
4249 static void *mdt_txn_key_init(const struct lu_context *ctx,
4250                               struct lu_context_key *key)
4251 {
4252         struct mdt_txn_info *txi;
4253
4254         /*
4255          * check that no high order allocations are incurred.
4256          */
4257         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
4258         OBD_ALLOC_PTR(txi);
4259         if (txi == NULL)
4260                 txi = ERR_PTR(-ENOMEM);
4261         return txi;
4262 }
4263
/*
 * lct_fini method of mdt_txn_key: free the mdt_txn_info passed in \a data
 * (the value produced by mdt_txn_key_init()).
 */
static void mdt_txn_key_fini(const struct lu_context *ctx,
                             struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn_info = data;

        OBD_FREE_PTR(txn_info);
}
4270
4271 struct lu_context_key mdt_txn_key = {
4272         .lct_tags = LCT_TX_HANDLE,
4273         .lct_init = mdt_txn_key_init,
4274         .lct_fini = mdt_txn_key_fini
4275 };
4276
4277 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
4278 {
4279         return md_ucred(info->mti_env);
4280 }
4281
4282 static int mdt_type_init(struct lu_device_type *t)
4283 {
4284         int rc;
4285
4286         rc = lu_context_key_register(&mdt_thread_key);
4287         if (rc == 0)
4288                 rc = lu_context_key_register(&mdt_txn_key);
4289         return rc;
4290 }
4291
4292 static void mdt_type_fini(struct lu_device_type *t)
4293 {
4294         lu_context_key_degister(&mdt_thread_key);
4295         lu_context_key_degister(&mdt_txn_key);
4296 }
4297
4298 static struct lu_device_type_operations mdt_device_type_ops = {
4299         .ldto_init = mdt_type_init,
4300         .ldto_fini = mdt_type_fini,
4301
4302         .ldto_device_alloc = mdt_device_alloc,
4303         .ldto_device_free  = mdt_device_free,
4304         .ldto_device_fini  = mdt_device_fini
4305 };
4306
4307 static struct lu_device_type mdt_device_type = {
4308         .ldt_tags     = LU_DEVICE_MD,
4309         .ldt_name     = LUSTRE_MDT_NAME,
4310         .ldt_ops      = &mdt_device_type_ops,
4311         .ldt_ctx_tags = LCT_MD_THREAD
4312 };
4313
4314 static int __init mdt_mod_init(void)
4315 {
4316         struct lprocfs_static_vars lvars;
4317         int rc;
4318
4319         mdt_num_threads = MDT_NUM_THREADS;
4320         lprocfs_init_vars(mdt, &lvars);
4321         rc = class_register_type(&mdt_obd_device_ops, NULL,
4322                                  lvars.module_vars, LUSTRE_MDT_NAME,
4323                                  &mdt_device_type);
4324
4325         return rc;
4326 }
4327
4328 static void __exit mdt_mod_exit(void)
4329 {
4330         class_unregister_type(LUSTRE_MDT_NAME);
4331 }
4332
4333
/*
 * Build one struct mdt_handler initializer, placed in its table at index
 * (prefix_opc - prefix_base).
 *
 *   prefix  opcode family (MDS, OBD, LDLM, SEQ, FLD, SEC_CTX, ...)
 *   base    first opcode of the family, i.e. the one stored at index 0
 *   suffix  tail appended to the OBD_FAIL_<prefix>_<opc> identifier
 *           (may be empty)
 *   flags   value stored in mh_flags
 *   opc     opcode name without the family prefix
 *   fn      handler function stored in mh_act
 *   fmt     request format stored in mh_fmt (may be NULL)
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)             \
[prefix ## _ ## opc - prefix ## _ ## base] = {                          \
        .mh_name    = #opc,                                             \
        .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix,        \
        .mh_opc     = prefix ## _ ## opc,                               \
        .mh_flags   = flags,                                            \
        .mh_act     = fn,                                               \
        .mh_fmt     = fmt                                               \
}

/*
 * Per-family wrappers taking an explicit request format.
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)

#define DEF_SEQ_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)

#define DEF_FLD_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)

/*
 * Request with a format known in advance: the format is derived from the
 * opcode name as RQF_<family>_<name>.
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)

#define DEF_SEQ_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

#define DEF_FLD_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)

/*
 * Request with a format we do not yet know: mh_fmt is left NULL.
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
4368
4369 static struct mdt_handler mdt_mds_ops[] = {
4370 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
4371 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
4372 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
4373 DEF_MDT_HNDL_F(HABEO_CORPUS             , GETATTR,      mdt_getattr),
4374 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
4375 DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR,      SETXATTR,     mdt_setxattr),
4376 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
4377 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
4378 DEF_MDT_HNDL_F(0                        |MUTABOR,
4379                                           REINT,        mdt_reint),
4380 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
4381 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
4382 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
4383 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
4384 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
4385 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
4386 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
4387 };
4388
4389 #define DEF_OBD_HNDL(flags, name, fn)                   \
4390         DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
4391
4392
4393 static struct mdt_handler mdt_obd_ops[] = {
4394         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
4395         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
4396         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
4397 };
4398
4399 #define DEF_DLM_HNDL_0(flags, name, fn)                   \
4400         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
4401 #define DEF_DLM_HNDL_F(flags, name, fn)                   \
4402         DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
4403
4404 static struct mdt_handler mdt_dlm_ops[] = {
4405         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
4406         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
4407         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
4408         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
4409 };
4410
4411 static struct mdt_handler mdt_llog_ops[] = {
4412 };
4413
4414 #define DEF_SEC_CTX_HNDL(name, fn)                      \
4415         DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
4416
4417 static struct mdt_handler mdt_sec_ctx_ops[] = {
4418         DEF_SEC_CTX_HNDL(INIT,          mdt_sec_ctx_handle),
4419         DEF_SEC_CTX_HNDL(INIT_CONT,     mdt_sec_ctx_handle),
4420         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
4421 };
4422
4423 static struct mdt_opc_slice mdt_regular_handlers[] = {
4424         {
4425                 .mos_opc_start = MDS_GETATTR,
4426                 .mos_opc_end   = MDS_LAST_OPC,
4427                 .mos_hs        = mdt_mds_ops
4428         },
4429         {
4430                 .mos_opc_start = OBD_PING,
4431                 .mos_opc_end   = OBD_LAST_OPC,
4432                 .mos_hs        = mdt_obd_ops
4433         },
4434         {
4435                 .mos_opc_start = LDLM_ENQUEUE,
4436                 .mos_opc_end   = LDLM_LAST_OPC,
4437                 .mos_hs        = mdt_dlm_ops
4438         },
4439         {
4440                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
4441                 .mos_opc_end   = LLOG_LAST_OPC,
4442                 .mos_hs        = mdt_llog_ops
4443         },
4444         {
4445                 .mos_opc_start = SEC_CTX_INIT,
4446                 .mos_opc_end   = SEC_LAST_OPC,
4447                 .mos_hs        = mdt_sec_ctx_ops
4448         },
4449         {
4450                 .mos_hs        = NULL
4451         }
4452 };
4453
4454 static struct mdt_handler mdt_readpage_ops[] = {
4455         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
4456 #ifdef HAVE_SPLIT_SUPPORT
4457         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
4458 #endif
4459
4460         /*
4461          * XXX: this is ugly and should be fixed one day, see mdc_close() for
4462          * detailed comments. --umka
4463          */
4464         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
4465         DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING,    mdt_done_writing),
4466 };
4467
4468 static struct mdt_opc_slice mdt_readpage_handlers[] = {
4469         {
4470                 .mos_opc_start = MDS_GETATTR,
4471                 .mos_opc_end   = MDS_LAST_OPC,
4472                 .mos_hs        = mdt_readpage_ops
4473         },
4474         {
4475                 .mos_hs        = NULL
4476         }
4477 };
4478
4479 static struct mdt_handler mdt_seq_ops[] = {
4480         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
4481 };
4482
4483 static struct mdt_opc_slice mdt_seq_handlers[] = {
4484         {
4485                 .mos_opc_start = SEQ_QUERY,
4486                 .mos_opc_end   = SEQ_LAST_OPC,
4487                 .mos_hs        = mdt_seq_ops
4488         },
4489         {
4490                 .mos_hs        = NULL
4491         }
4492 };
4493
4494 static struct mdt_handler mdt_fld_ops[] = {
4495         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
4496 };
4497
4498 static struct mdt_opc_slice mdt_fld_handlers[] = {
4499         {
4500                 .mos_opc_start = FLD_QUERY,
4501                 .mos_opc_end   = FLD_LAST_OPC,
4502                 .mos_hs        = mdt_fld_ops
4503         },
4504         {
4505                 .mos_hs        = NULL
4506         }
4507 };
4508
4509 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4510 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
4511 MODULE_LICENSE("GPL");
4512
4513 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
4514                 "number of mdt service threads to start");
4515
4516 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);