Whamcloud - gitweb
(1) Drop unnecessary permission check for name_{insert,remove}.
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  lustre/mdt/mdt_handler.c
5  *  Lustre Metadata Target (mdt) request handler
6  *
7  *  Copyright (c) 2006 Cluster File Systems, Inc.
8  *   Author: Peter Braam <braam@clusterfs.com>
9  *   Author: Andreas Dilger <adilger@clusterfs.com>
10  *   Author: Phil Schwan <phil@clusterfs.com>
11  *   Author: Mike Shaver <shaver@clusterfs.com>
12  *   Author: Nikita Danilov <nikita@clusterfs.com>
13  *   Author: Huang Hua <huanghua@clusterfs.com>
14  *   Author: Yury Umanets <umka@clusterfs.com>
15  *
16  *   This file is part of the Lustre file system, http://www.lustre.org
17  *   Lustre is a trademark of Cluster File Systems, Inc.
18  *
19  *   You may have signed or agreed to another license before downloading
20  *   this software.  If so, you are bound by the terms and conditions
21  *   of that agreement, and the following does not apply to you.  See the
22  *   LICENSE file included with this distribution for more information.
23  *
24  *   If you did not agree to a different license, then this copy of Lustre
25  *   is open source software; you can redistribute it and/or modify it
26  *   under the terms of version 2 of the GNU General Public License as
27  *   published by the Free Software Foundation.
28  *
29  *   In either case, Lustre is distributed in the hope that it will be
30  *   useful, but WITHOUT ANY WARRANTY; without even the implied warranty
31  *   of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
32  *   license text for more details.
33  */
34
35 #ifndef EXPORT_SYMTAB
36 # define EXPORT_SYMTAB
37 #endif
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <linux/module.h>
41
42 /* LUSTRE_VERSION_CODE */
43 #include <lustre_ver.h>
44 /*
45  * struct OBD_{ALLOC,FREE}*()
46  * MDT_FAIL_CHECK
47  */
48 #include <obd_support.h>
49 /* struct ptlrpc_request */
50 #include <lustre_net.h>
51 /* struct obd_export */
52 #include <lustre_export.h>
53 /* struct obd_device */
54 #include <obd.h>
55 /* lu2dt_dev() */
56 #include <dt_object.h>
57 #include <lustre_mds.h>
58 #include <lustre_mdt.h>
59 #include "mdt_internal.h"
60 #include <linux/lustre_acl.h>
61 #include <lustre_param.h>
62
63 mdl_mode_t mdt_mdl_lock_modes[] = {
64         [LCK_MINMODE] = MDL_MINMODE,
65         [LCK_EX]      = MDL_EX,
66         [LCK_PW]      = MDL_PW,
67         [LCK_PR]      = MDL_PR,
68         [LCK_CW]      = MDL_CW,
69         [LCK_CR]      = MDL_CR,
70         [LCK_NL]      = MDL_NL,
71         [LCK_GROUP]   = MDL_GROUP
72 };
73
74 ldlm_mode_t mdt_dlm_lock_modes[] = {
75         [MDL_MINMODE] = LCK_MINMODE,
76         [MDL_EX]      = LCK_EX,
77         [MDL_PW]      = LCK_PW,
78         [MDL_PR]      = LCK_PR,
79         [MDL_CW]      = LCK_CW,
80         [MDL_CR]      = LCK_CR,
81         [MDL_NL]      = LCK_NL,
82         [MDL_GROUP]   = LCK_GROUP
83 };
84
/*
 * MDT thread-count setting (going by the name -- confirm against its
 * users).  The value is assigned in mdt_mod_init().
 */
unsigned long mdt_num_threads;
89
90 /* ptlrpc request handler for MDT. All handlers are
91  * grouped into several slices - struct mdt_opc_slice,
92  * and stored in an array - mdt_handlers[].
93  */
94 struct mdt_handler {
95         /* The name of this handler. */
96         const char *mh_name;
97         /* Fail id for this handler, checked at the beginning of this handler*/
98         int         mh_fail_id;
99         /* Operation code for this handler */
100         __u32       mh_opc;
101         /* flags are listed in enum mdt_handler_flags below. */
102         __u32       mh_flags;
103         /* The actual handler function to execute. */
104         int (*mh_act)(struct mdt_thread_info *info);
105         /* Request format for this request. */
106         const struct req_format *mh_fmt;
107 };
108
/*
 * Per-handler flags, stored in mdt_handler::mh_flags.  Each flag occupies
 * its own bit so that flags can be OR-ed together.
 */
enum mdt_handler_flags {
        /*
         * "habeo corpus" == "I have a body"
         *
         * The incoming message carries a struct mdt_body, and the object
         * identified by its fid exists on disk.
         */
        HABEO_CORPUS = 0x1,
        /*
         * "habeo clavis" == "I have a key"
         *
         * The incoming message carries a struct ldlm_request.
         */
        HABEO_CLAVIS = 0x2,
        /*
         * "habeo refero" == "I have a reply"
         *
         * The reply format of the request is fixed, so generic code is
         * able to pack the reply message.
         */
        HABEO_REFERO = 0x4,
        /*
         * "mutabor" == "I shall modify"
         *
         * The request modifies something: check whether the filesystem is
         * read-only and, if so, return -EROFS to the client as early as
         * possible.
         */
        MUTABOR      = 0x8
};
138
139 struct mdt_opc_slice {
140         __u32               mos_opc_start;
141         int                 mos_opc_end;
142         struct mdt_handler *mos_hs;
143 };
144
145 static struct mdt_opc_slice mdt_regular_handlers[];
146 static struct mdt_opc_slice mdt_readpage_handlers[];
147 static struct mdt_opc_slice mdt_seq_handlers[];
148 static struct mdt_opc_slice mdt_fld_handlers[];
149
150 static struct mdt_device *mdt_dev(struct lu_device *d);
151 static int mdt_regular_handle(struct ptlrpc_request *req);
152 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
153
154 static struct lu_object_operations mdt_obj_ops;
155
156 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
157 {
158         if (!rep)
159                 return 0;
160         return (rep->lock_policy_res1 & flag);
161 }
162
163 void mdt_clear_disposition(struct mdt_thread_info *info,
164                            struct ldlm_reply *rep, int flag)
165 {
166         if (info)
167                 info->mti_opdata &= ~flag;
168         if (rep)
169                 rep->lock_policy_res1 &= ~flag;
170 }
171
172 void mdt_set_disposition(struct mdt_thread_info *info,
173                          struct ldlm_reply *rep, int flag)
174 {
175         if (info)
176                 info->mti_opdata |= flag;
177         if (rep)
178                 rep->lock_policy_res1 |= flag;
179 }
180
181 void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
182 {
183         lh->mlh_pdo_hash = 0;
184         lh->mlh_reg_mode = lm;
185         lh->mlh_type = MDT_REG_LOCK;
186 }
187
188 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
189                        const char *name, int namelen)
190 {
191         lh->mlh_reg_mode = lm;
192         lh->mlh_type = MDT_PDO_LOCK;
193
194         if (name != NULL) {
195                 LASSERT(namelen > 0);
196                 lh->mlh_pdo_hash = full_name_hash(name, namelen - 1);
197         } else {
198                 LASSERT(namelen == 0);
199                 lh->mlh_pdo_hash = 0ull;
200         }
201 }
202
203 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
204                               struct mdt_lock_handle *lh)
205 {
206         mdl_mode_t mode;
207         ENTRY;
208
209         /*
210          * Any dir access needs couple of locks:
211          *
212          * 1) on part of dir we gonna take lookup/modify;
213          *
214          * 2) on whole dir to protect it from concurrent splitting and/or to
215          * flush client's cache for readdir().
216          *
217          * so, for a given mode and object this routine decides what lock mode
218          * to use for lock #2:
219          *
220          * 1) if caller's gonna lookup in dir then we need to protect dir from
221          * being splitted only - LCK_CR
222          *
223          * 2) if caller's gonna modify dir then we need to protect dir from
224          * being splitted and to flush cache - LCK_CW
225          *
226          * 3) if caller's gonna modify dir and that dir seems ready for
227          * splitting then we need to protect it from any type of access
228          * (lookup/modify/split) - LCK_EX --bzzz
229          */
230
231         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
232         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
233
234         /*
235          * No pdo locks possible on not existing objects, because pdo lock is
236          * taken on parent dir and parent can't be absent.
237          */
238         LASSERT(mdt_object_exists(o) > 0);
239
240         /*
241          * Ask underlaying level its opinion about preferable PDO lock mode
242          * having access type passed as regular lock mode:
243          *
244          * - MDL_MINMODE means that lower layer does not want to specify lock
245          * mode;
246          *
247          * - MDL_NL means that no PDO lock should be taken. This is used in some
248          * cases. Say, for non-splittable directories no need to use PDO locks
249          * at all.
250          */
251         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
252                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
253
254         if (mode != MDL_MINMODE) {
255                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
256         } else {
257                 /*
258                  * Lower layer does not want to specify locking mode. We do it
259                  * our selves. No special protection is needed, just flush
260                  * client's cache on modification and allow concurrent
261                  * mondification.
262                  */
263                 switch (lh->mlh_reg_mode) {
264                 case LCK_EX:
265                         lh->mlh_pdo_mode = LCK_EX;
266                         break;
267                 case LCK_PR:
268                         lh->mlh_pdo_mode = LCK_CR;
269                         break;
270                 case LCK_PW:
271                         lh->mlh_pdo_mode = LCK_CW;
272                         break;
273                 default:
274                         CERROR("Not expected lock type (0x%x)\n",
275                                (int)lh->mlh_reg_mode);
276                         LBUG();
277                 }
278         }
279
280         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
281         EXIT;
282 }
283
284 static int mdt_getstatus(struct mdt_thread_info *info)
285 {
286         struct mdt_device *mdt  = info->mti_mdt;
287         struct md_device  *next = mdt->mdt_child;
288         struct mdt_body   *repbody;
289         int                rc;
290
291         ENTRY;
292
293         rc = mdt_check_ucred(info);
294         if (rc)
295                 RETURN(err_serious(rc));
296
297         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
298                 RETURN(err_serious(-ENOMEM));
299
300         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
301         rc = next->md_ops->mdo_root_get(info->mti_env, next, &repbody->fid1);
302         if (rc != 0)
303                 RETURN(rc);
304
305         repbody->valid |= OBD_MD_FLID;
306
307         if (mdt->mdt_opts.mo_mds_capa) {
308                 struct mdt_object  *root;
309                 struct lustre_capa *capa;
310
311                 root = mdt_object_find(info->mti_env, mdt, &repbody->fid1);
312                 if (IS_ERR(root))
313                         RETURN(PTR_ERR(root));
314
315                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
316                 LASSERT(capa);
317                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
318
319                 rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa,
320                                  0);
321                 mdt_object_put(info->mti_env, root);
322                 if (rc == 0)
323                         repbody->valid |= OBD_MD_FLMDSCAPA;
324         }
325
326         RETURN(rc);
327 }
328
329 static int mdt_statfs(struct mdt_thread_info *info)
330 {
331         struct md_device  *next  = info->mti_mdt->mdt_child;
332         struct obd_statfs *osfs;
333         int                rc;
334
335         ENTRY;
336
337         /* This will trigger a watchdog timeout */
338         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
339                          (MDT_SERVICE_WATCHDOG_TIMEOUT / 1000) + 1);
340
341         rc = mdt_check_ucred(info);
342         if (rc)
343                 RETURN(err_serious(rc));
344
345         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK)) {
346                 rc = err_serious(-ENOMEM);
347         } else {
348                 osfs = req_capsule_server_get(&info->mti_pill,&RMF_OBD_STATFS);
349                 /* XXX max_age optimisation is needed here. See mds_statfs */
350                 rc = next->md_ops->mdo_statfs(info->mti_env, next,
351                                               &info->mti_u.ksfs);
352                 statfs_pack(osfs, &info->mti_u.ksfs);
353         }
354         RETURN(rc);
355 }
356
357 void mdt_pack_size2body(struct mdt_body *b, const struct lu_attr *attr,
358                         struct mdt_object *o)
359 {
360         /* Check if Size-on-MDS is enabled. */
361         if (S_ISREG(attr->la_mode) && mdt_sizeonmds_enabled(o)) {
362                 b->valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
363                 b->size = attr->la_size;
364                 b->blocks = attr->la_blocks;
365         }
366 }
367
368 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
369                         const struct lu_attr *attr, const struct lu_fid *fid)
370 {
371         /*XXX should pack the reply body according to lu_valid*/
372         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
373                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
374                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
375                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
376
377         if (!S_ISREG(attr->la_mode))
378                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
379
380         b->atime      = attr->la_atime;
381         b->mtime      = attr->la_mtime;
382         b->ctime      = attr->la_ctime;
383         b->mode       = attr->la_mode;
384         b->size       = attr->la_size;
385         b->blocks     = attr->la_blocks;
386         b->uid        = attr->la_uid;
387         b->gid        = attr->la_gid;
388         b->flags      = attr->la_flags;
389         b->nlink      = attr->la_nlink;
390         b->rdev       = attr->la_rdev;
391
392         if (fid) {
393                 b->fid1 = *fid;
394                 b->valid |= OBD_MD_FLID;
395                 CDEBUG(D_INODE, ""DFID": nlink=%d, mode=%o, size="LPU64"\n",
396                                 PFID(fid), b->nlink, b->mode, b->size);
397         }
398
399         if (info)
400                 mdt_body_reverse_idmap(info, b);
401 }
402
403 static inline int mdt_body_has_lov(const struct lu_attr *la,
404                                    const struct mdt_body *body)
405 {
406         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
407                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
408 }
409
410 static int mdt_getattr_internal(struct mdt_thread_info *info,
411                                 struct mdt_object *o)
412 {
413         struct md_object        *next = mdt_object_child(o);
414         const struct mdt_body   *reqbody = info->mti_body;
415         struct ptlrpc_request   *req = mdt_info_req(info);
416         struct mdt_export_data  *med = &req->rq_export->exp_mdt_data;
417         struct md_attr          *ma = &info->mti_attr;
418         struct lu_attr          *la = &ma->ma_attr;
419         struct req_capsule      *pill = &info->mti_pill;
420         const struct lu_env     *env = info->mti_env;
421         struct mdt_body         *repbody;
422         struct lu_buf           *buffer = &info->mti_buf;
423         int                     rc;
424         ENTRY;
425
426         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
427                 RETURN(err_serious(-ENOMEM));
428
429         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
430
431         if (reqbody->valid & OBD_MD_MEA) {
432                 /* Assumption: MDT_MD size is enough for lmv size FIXME */
433                 ma->ma_lmv = req_capsule_server_get(pill, &RMF_MDT_MD);
434                 ma->ma_lmv_size = req_capsule_get_size(pill, &RMF_MDT_MD,
435                                                              RCL_SERVER);
436                 ma->ma_need = MA_INODE | MA_LMV;
437         } else {
438                 ma->ma_need = MA_INODE | MA_LOV ;
439                 ma->ma_lmm = req_capsule_server_get(pill, &RMF_MDT_MD);
440                 ma->ma_lmm_size = req_capsule_get_size(pill, &RMF_MDT_MD,
441                                                              RCL_SERVER);
442         }
443         ma->ma_valid = 0;
444         rc = mo_attr_get(env, next, ma);
445         if (rc == -EREMOTE) {
446                 /* This object is located on remote node.*/
447                 repbody->fid1 = *mdt_object_fid(o);
448                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
449                 RETURN(0);
450         } else if (rc) {
451                 CERROR("getattr error for "DFID": %d\n",
452                         PFID(mdt_object_fid(o)), rc);
453                 RETURN(rc);
454         }
455
456         if (ma->ma_valid & MA_INODE)
457                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
458         else
459                 RETURN(-EFAULT);
460
461         if (mdt_body_has_lov(la, reqbody)) {
462                 if (ma->ma_valid & MA_LOV) {
463                         LASSERT(ma->ma_lmm_size);
464                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
465                         repbody->eadatasize = ma->ma_lmm_size;
466                         if (S_ISDIR(la->la_mode))
467                                 repbody->valid |= OBD_MD_FLDIREA;
468                         else
469                                 repbody->valid |= OBD_MD_FLEASIZE;
470                 }
471                 if (ma->ma_valid & MA_LMV) {
472                         LASSERT(S_ISDIR(la->la_mode));
473                         repbody->eadatasize = ma->ma_lmv_size;
474                         repbody->valid |= OBD_MD_FLDIREA;
475                         repbody->valid |= OBD_MD_MEA;
476                 }
477         } else if (S_ISLNK(la->la_mode) &&
478                    reqbody->valid & OBD_MD_LINKNAME) {
479                 buffer->lb_buf = ma->ma_lmm;
480                 buffer->lb_len = reqbody->eadatasize;
481                 rc = mo_readlink(env, next, buffer);
482                 if (rc <= 0) {
483                         CERROR("readlink failed: %d\n", rc);
484                         rc = -EFAULT;
485                 } else {
486                         repbody->valid |= OBD_MD_LINKNAME;
487                         repbody->eadatasize = rc;
488                         ((char*)ma->ma_lmm)[rc - 1] = 0; /* NULL terminate */
489                         CDEBUG(D_INODE, "symlink dest %s, len = %d\n",
490                                         (char*)ma->ma_lmm, rc);
491                         rc = 0;
492                 }
493         }
494
495         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
496                 repbody->max_cookiesize = info->mti_mdt->mdt_max_cookiesize;
497                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
498                 repbody->valid |= OBD_MD_FLMODEASIZE;
499                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
500                        "MAX_COOKIE to : %d:%d\n", repbody->max_mdsize,
501                        repbody->max_cookiesize);
502         }
503
504         if (med->med_rmtclient && (reqbody->valid & OBD_MD_FLRMTPERM)) {
505                 void *buf = req_capsule_server_get(pill, &RMF_ACL);
506
507                 /* mdt_getattr_lock only */
508                 rc = mdt_pack_remote_perm(info, o, buf);
509                 if (rc) {
510                         repbody->valid &= ~OBD_MD_FLRMTPERM;
511                         repbody->aclsize = 0;
512                         RETURN(rc);
513                 } else {
514                         repbody->valid |= OBD_MD_FLRMTPERM;
515                         repbody->aclsize = sizeof(struct mdt_remote_perm);
516                 }
517         }
518 #ifdef CONFIG_FS_POSIX_ACL
519         else if ((req->rq_export->exp_connect_flags & OBD_CONNECT_ACL) &&
520                  (reqbody->valid & OBD_MD_FLACL)) {
521                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
522                 buffer->lb_len = req_capsule_get_size(pill,
523                                                       &RMF_ACL, RCL_SERVER);
524                 if (buffer->lb_len > 0) {
525                         rc = mo_xattr_get(env, next, buffer,
526                                           XATTR_NAME_ACL_ACCESS);
527                         if (rc < 0) {
528                                 if (rc == -ENODATA) {
529                                         repbody->aclsize = 0;
530                                         repbody->valid |= OBD_MD_FLACL;
531                                         rc = 0;
532                                 } else if (rc == -EOPNOTSUPP) {
533                                         rc = 0;
534                                 } else {
535                                         CERROR("got acl size: %d\n", rc);
536                                 }
537                         } else {
538                                 repbody->aclsize = rc;
539                                 repbody->valid |= OBD_MD_FLACL;
540                                 rc = 0;
541                         }
542                 }
543         }
544 #endif
545
546         if ((reqbody->valid & OBD_MD_FLMDSCAPA) &&
547             info->mti_mdt->mdt_opts.mo_mds_capa) {
548                 struct lustre_capa *capa;
549
550                 capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
551                 LASSERT(capa);
552                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
553                 rc = mo_capa_get(env, next, capa, 0);
554                 if (rc)
555                         RETURN(rc);
556                 repbody->valid |= OBD_MD_FLMDSCAPA;
557         }
558         RETURN(rc);
559 }
560
561 static int mdt_renew_capa(struct mdt_thread_info *info)
562 {
563         struct mdt_device  *mdt = info->mti_mdt;
564         struct mdt_object  *obj = info->mti_object;
565         struct mdt_body    *body;
566         struct lustre_capa *capa, *c;
567         int rc;
568         ENTRY;
569
570         /* if object doesn't exist, or server has disabled capability,
571          * return directly, client will find body->valid OBD_MD_FLOSSCAPA
572          * flag not set.
573          */
574         if (!obj || !mdt->mdt_opts.mo_mds_capa)
575                 RETURN(0);
576
577         body = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
578         LASSERT(body != NULL);
579
580         c = req_capsule_client_get(&info->mti_pill, &RMF_CAPA1);
581         LASSERT(c);
582
583         capa = req_capsule_server_get(&info->mti_pill, &RMF_CAPA1);
584         LASSERT(capa);
585
586         *capa = *c;
587         rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa, 1);
588         if (rc == 0)
589                 body->valid |= OBD_MD_FLOSSCAPA;
590
591         RETURN(rc);
592 }
593
594 static int mdt_getattr(struct mdt_thread_info *info)
595 {
596         struct mdt_object       *obj = info->mti_object;
597         struct req_capsule      *pill = &info->mti_pill;
598         struct mdt_body         *reqbody;
599         struct mdt_body         *repbody;
600         mode_t                  mode;
601         int rc;
602         ENTRY;
603
604         mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
605                                LPROC_MDT_GETATTR);
606         
607         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
608         LASSERT(reqbody);
609
610         if (reqbody->valid & OBD_MD_FLOSSCAPA) {
611                 rc = req_capsule_pack(pill);
612                 if (rc)
613                         RETURN(err_serious(rc));
614                 rc = mdt_renew_capa(info);
615                 mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
616                 GOTO(out, rc);
617         }
618
619         LASSERT(obj != NULL);
620         LASSERT(lu_object_assert_exists(&obj->mot_obj.mo_lu));
621
622         mode = lu_object_attr(&obj->mot_obj.mo_lu);
623         if (S_ISLNK(mode) && (reqbody->valid & OBD_MD_LINKNAME) &&
624             (reqbody->eadatasize > info->mti_mdt->mdt_max_mdsize)) {
625                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
626                                      reqbody->eadatasize);
627         } else {
628                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
629                                      info->mti_mdt->mdt_max_mdsize);
630         }
631
632         rc = req_capsule_pack(pill);
633         if (rc != 0)
634                 GOTO(out, rc = err_serious(rc));
635
636         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
637         LASSERT(repbody != NULL);
638         repbody->eadatasize = 0;
639         repbody->aclsize = 0;
640
641         if (reqbody->valid & OBD_MD_FLRMTPERM)
642                 rc = mdt_init_ucred(info, reqbody);
643         else
644                 rc = mdt_check_ucred(info);
645         if (rc)
646                 GOTO(out_shrink, rc);
647
648         info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
649         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
650         
651         /*
652          * Don't check capability at all, because rename might getattr for
653          * remote obj, and at that time no capability is available.
654          */
655         mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA);
656         rc = mdt_getattr_internal(info, obj);
657         if (reqbody->valid & OBD_MD_FLRMTPERM)
658                 mdt_exit_ucred(info);
659         EXIT;
660 out_shrink:
661         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
662 out:
663         mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
664                              LPROC_MDT_GETATTR);
665         return rc;
666 }
667
668 static int mdt_is_subdir(struct mdt_thread_info *info)
669 {
670         struct mdt_object     *o = info->mti_object;
671         struct req_capsule    *pill = &info->mti_pill;
672         const struct mdt_body *body = info->mti_body;
673         struct mdt_body       *repbody;
674         int                    rc;
675         ENTRY;
676
677         LASSERT(o != NULL);
678
679         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
680
681         /*
682          * We save last checked parent fid to @repbody->fid1 for remote
683          * directory case.
684          */
685         LASSERT(fid_is_sane(&body->fid2));
686         mdt_set_capainfo(info, 0, &body->fid1, BYPASS_CAPA);
687         mdt_set_capainfo(info, 1, &body->fid2, BYPASS_CAPA);
688
689         LASSERT(mdt_object_exists(o) > 0);
690         rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
691                            &body->fid2, &repbody->fid1);
692         if (rc == 0 || rc == -EREMOTE)
693                 repbody->valid |= OBD_MD_FLID;
694
695         RETURN(rc);
696 }
697
698 static int mdt_raw_lookup(struct mdt_thread_info *info,
699                           struct mdt_object *parent,
700                           const char* name,
701                           struct ldlm_reply *ldlm_rep)
702 {
703         struct md_object *next = mdt_object_child(info->mti_object);
704         const struct mdt_body *reqbody = info->mti_body;
705         struct lu_fid *child_fid = &info->mti_tmp_fid1;
706         struct mdt_body *repbody;
707         int rc;
708         ENTRY;
709
710         if (reqbody->valid != OBD_MD_FLID)
711                 RETURN(0);
712
713         LASSERT(!info->mti_cross_ref);
714         
715         /* Only got the fid of this obj by name */
716         rc = mdo_lookup(info->mti_env, next, name, child_fid,
717                         &info->mti_spec);
718 #if 0
719         /* XXX is raw_lookup possible as intent operation? */
720         if (rc != 0) {
721                 if (rc == -ENOENT)
722                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
723                 RETURN(rc);
724         } else
725                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
726
727         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
728 #endif
729         if (rc == 0) {
730                 repbody = req_capsule_server_get(&info->mti_pill,
731                                                  &RMF_MDT_BODY);
732                 repbody->fid1 = *child_fid;
733                 repbody->valid = OBD_MD_FLID;
734         }
735         RETURN(1);
736 }
737
/*
 * UPDATE lock should be taken against parent, and be released before exit;
 * child_bits lock should be taken against child, and be returned back:
 *            (1)normal request should release the child lock;
 *            (2)intent request will grant the lock to client.
 */
744 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
745                                  struct mdt_lock_handle *lhc,
746                                  __u64 child_bits,
747                                  struct ldlm_reply *ldlm_rep)
748 {
749         struct ptlrpc_request *req = mdt_info_req(info);
750         struct mdt_object     *parent = info->mti_object;
751         struct mdt_object     *child;
752         struct md_object      *next = mdt_object_child(info->mti_object);
753         struct lu_fid         *child_fid = &info->mti_tmp_fid1;
754         int                    is_resent, rc, namelen = 0;
755         const char            *name;
756         struct mdt_lock_handle *lhp;
757         struct ldlm_lock      *lock;
758         struct ldlm_res_id *res_id;
759         ENTRY;
760
761         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
762         if (is_resent)
763                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT);
764
765         LASSERT(info->mti_object != NULL);
766         name = req_capsule_client_get(&info->mti_pill, &RMF_NAME);
767         if (name == NULL)
768                 RETURN(err_serious(-EFAULT));
769
770         namelen = req_capsule_get_size(&info->mti_pill, &RMF_NAME,
771                                        RCL_CLIENT);
772
773         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, ldlm_rep = %p\n",
774                         PFID(mdt_object_fid(parent)), name, ldlm_rep);
775
776         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
777
778         rc = mdt_object_exists(parent);
779         if (rc == 0) {
780                 LU_OBJECT_DEBUG(D_WARNING, info->mti_env, &parent->mot_obj.mo_lu,
781                                 "Parent doesn't exist!\n");
782                 RETURN(-ESTALE);
783         }
784         else if (rc < 0) {
785                 CERROR("Object "DFID" locates on remote server\n",
786                        PFID(mdt_object_fid(parent)));
787                 LBUG();
788         }
789
790         rc = mdt_raw_lookup(info, parent, name, ldlm_rep);
791         if (rc != 0) {
792                 if (rc > 0)
793                         rc = 0;
794                 RETURN(rc);
795         }
796
797         if (info->mti_cross_ref) {
798                 /* Only getattr on the child. Parent is on another node. */
799                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
800                 child = parent;
801                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
802                        "ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
803
804                 if (is_resent) {
805                         /* Do not take lock for resent case. */
806                         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
807                         if (!lock) {
808                                 CERROR("Invalid lock handle "LPX64"\n",
809                                        lhc->mlh_reg_lh.cookie);
810                                 LBUG();
811                         }
812                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
813                                                 &lock->l_resource->lr_name));
814                         LDLM_LOCK_PUT(lock);
815                         rc = 0;
816                 } else {
817                         mdt_lock_handle_init(lhc);
818                         mdt_lock_reg_init(lhc, LCK_PR);
819
820                         /*
821                          * Object's name is on another MDS, no lookup lock is
822                          * needed here but update is.
823                          */
824                         child_bits &= ~MDS_INODELOCK_LOOKUP;
825                         child_bits |= MDS_INODELOCK_UPDATE;
826
827                         rc = mdt_object_lock(info, child, lhc, child_bits,
828                                              MDT_LOCAL_LOCK);
829                 }
830                 if (rc == 0) {
831                         /* Finally, we can get attr for child. */
832                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
833                                          BYPASS_CAPA);
834                         rc = mdt_getattr_internal(info, child);
835                         if (rc != 0)
836                                 mdt_object_unlock(info, child, lhc, 1);
837                 }
838                 GOTO(out, rc);
839         }
840
841         /* step 1: lock parent */
842         lhp = &info->mti_lh[MDT_LH_PARENT];
843         mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
844         rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
845                              MDT_LOCAL_LOCK);
846         if (rc != 0)
847                 RETURN(rc);
848
849         /* step 2: lookup child's fid by name */
850         rc = mdo_lookup(info->mti_env, next, name, child_fid,
851                         &info->mti_spec);
852         if (rc != 0) {
853                 if (rc == -ENOENT)
854                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
855                 GOTO(out_parent, rc);
856         } else
857                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
858         /*
859          *step 3: find the child object by fid & lock it.
860          *        regardless if it is local or remote.
861          */
862         child = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
863         if (IS_ERR(child))
864                 GOTO(out_parent, rc = PTR_ERR(child));
865         if (is_resent) {
866                 /* Do not take lock for resent case. */
867                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
868                 if (!lock) {
869                         CERROR("Invalid lock handle "LPX64"\n",
870                                lhc->mlh_reg_lh.cookie);
871                         LBUG();
872                 }
873                 res_id = &lock->l_resource->lr_name;
874                 LASSERTF(fid_res_name_eq(child_fid,
875                                          &lock->l_resource->lr_name),
876                         "Lock res_id: %lx/%lx/%lx/%lx, Fid: "DFID".\n",
877                         (unsigned long)res_id->name[0],
878                         (unsigned long)res_id->name[1],
879                         (unsigned long)res_id->name[2],
880                         (unsigned long)res_id->name[3],
881                         PFID(mdt_object_fid(child)));
882                 LDLM_LOCK_PUT(lock);
883                 rc = 0;
884         } else {
885                 mdt_lock_handle_init(lhc);
886                 mdt_lock_reg_init(lhc, LCK_PR);
887                 
888                 if (mdt_object_exists(child) == 0) {
889                         LU_OBJECT_DEBUG(D_WARNING, info->mti_env,
890                                         &child->mot_obj.mo_lu,
891                                         "Object doesn't exist!\n");
892                 }
893                 rc = mdt_object_lock(info, child, lhc, child_bits,
894                                      MDT_CROSS_LOCK);
895                 if (rc != 0)
896                         GOTO(out_child, rc);
897         }
898
899         /* finally, we can get attr for child. */
900         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
901         rc = mdt_getattr_internal(info, child);
902         if (rc != 0) {
903                 mdt_object_unlock(info, child, lhc, 1);
904         } else {
905                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
906                 if (lock) {
907                         struct mdt_body *repbody;
908                         struct lu_attr *ma;
909
910                         /* Debugging code. */
911                         res_id = &lock->l_resource->lr_name;
912                         LDLM_DEBUG(lock, "We will return this lock client\n");
913                         LASSERTF(fid_res_name_eq(mdt_object_fid(child),
914                                                  &lock->l_resource->lr_name),
915                                  "Lock res_id: %lu/%lu/%lu, Fid: "DFID".\n",
916                                  (unsigned long)res_id->name[0],
917                                  (unsigned long)res_id->name[1],
918                                  (unsigned long)res_id->name[2],
919                                  PFID(mdt_object_fid(child)));
920
921                         /*
922                          * Pack Size-on-MDS inode attributes to the body if
923                          * update lock is given.
924                          */
925                         repbody = req_capsule_server_get(&info->mti_pill,
926                                                          &RMF_MDT_BODY);
927                         ma = &info->mti_attr.ma_attr;
928                         if (lock->l_policy_data.l_inodebits.bits &
929                             MDS_INODELOCK_UPDATE)
930                                 mdt_pack_size2body(repbody, ma, child);
931                         LDLM_LOCK_PUT(lock);
932                 }
933         }
934         EXIT;
935 out_child:
936         mdt_object_put(info->mti_env, child);
937 out_parent:
938         mdt_object_unlock(info, parent, lhp, 1);
939 out:
940         return rc;
941 }
942
943 /* normal handler: should release the child lock */
944 static int mdt_getattr_name(struct mdt_thread_info *info)
945 {
946         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
947         struct mdt_body        *reqbody;
948         struct mdt_body        *repbody;
949         int rc;
950         ENTRY;
951
952         mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
953                                LPROC_MDT_GETATTR_NAME);
954         
955         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
956         LASSERT(reqbody != NULL);
957         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
958         LASSERT(repbody != NULL);
959
960         info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
961         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
962         repbody->eadatasize = 0;
963         repbody->aclsize = 0;
964
965         rc = mdt_init_ucred(info, reqbody);
966         if (rc)
967                 GOTO(out, rc);
968
969         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
970         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
971                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
972                 lhc->mlh_reg_lh.cookie = 0;
973         }
974         mdt_exit_ucred(info);
975         EXIT;
976 out:
977         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 1, 0);
978         mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
979                              LPROC_MDT_GETATTR_NAME);
980         return rc;
981 }
982
983 static struct lu_device_operations mdt_lu_ops;
984
985 static int lu_device_is_mdt(struct lu_device *d)
986 {
987         return ergo(d != NULL && d->ld_ops != NULL, d->ld_ops == &mdt_lu_ops);
988 }
989
990 static int mdt_connect(struct mdt_thread_info *info)
991 {
992         int rc;
993         struct ptlrpc_request *req;
994
995         req = mdt_info_req(info);
996         rc = target_handle_connect(req);
997         if (rc == 0) {
998                 LASSERT(req->rq_export != NULL);
999                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
1000                 rc = mdt_init_idmap(info);
1001         } else
1002                 rc = err_serious(rc);
1003         return rc;
1004 }
1005
/*
 * Disconnect handler: forwards the request to target_handle_disconnect().
 * A non-zero result is returned through err_serious(); success returns 0.
 */
static int mdt_disconnect(struct mdt_thread_info *info)
{
        int rc = target_handle_disconnect(mdt_info_req(info));

        return rc != 0 ? err_serious(rc) : 0;
}
1015
1016 static int mdt_sendpage(struct mdt_thread_info *info,
1017                         struct lu_rdpg *rdpg)
1018 {
1019         struct ptlrpc_request   *req = mdt_info_req(info);
1020         struct ptlrpc_bulk_desc *desc;
1021         struct l_wait_info      *lwi = &info->mti_u.rdpg.mti_wait_info;
1022         int                      tmpcount;
1023         int                      tmpsize;
1024         int                      i;
1025         int                      rc;
1026         ENTRY;
1027
1028         desc = ptlrpc_prep_bulk_exp(req, rdpg->rp_npages, BULK_PUT_SOURCE,
1029                                     MDS_BULK_PORTAL);
1030         if (desc == NULL)
1031                 GOTO(out, rc = -ENOMEM);
1032
1033         for (i = 0, tmpcount = rdpg->rp_count;
1034                 i < rdpg->rp_npages; i++, tmpcount -= tmpsize) {
1035                 tmpsize = min_t(int, tmpcount, CFS_PAGE_SIZE);
1036                 ptlrpc_prep_bulk_page(desc, rdpg->rp_pages[i], 0, tmpsize);
1037         }
1038
1039         LASSERT(desc->bd_nob == rdpg->rp_count);
1040         rc = ptlrpc_start_bulk_transfer(desc);
1041         if (rc)
1042                 GOTO(free_desc, rc);
1043
1044         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1045                 GOTO(abort_bulk, rc);
1046
1047         *lwi = LWI_TIMEOUT(obd_timeout * HZ / 4, NULL, NULL);
1048         rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc), lwi);
1049         LASSERT (rc == 0 || rc == -ETIMEDOUT);
1050
1051         if (rc == 0) {
1052                 if (desc->bd_success &&
1053                     desc->bd_nob_transferred == rdpg->rp_count)
1054                         GOTO(free_desc, rc);
1055
1056                 rc = -ETIMEDOUT; /* XXX should this be a different errno? */
1057         }
1058
1059         DEBUG_REQ(D_ERROR, req, "bulk failed: %s %d(%d), evicting %s@%s\n",
1060                   (rc == -ETIMEDOUT) ? "timeout" : "network error",
1061                   desc->bd_nob_transferred, rdpg->rp_count,
1062                   req->rq_export->exp_client_uuid.uuid,
1063                   req->rq_export->exp_connection->c_remote_uuid.uuid);
1064
1065         class_fail_export(req->rq_export);
1066
1067         EXIT;
1068 abort_bulk:
1069         ptlrpc_abort_bulk(desc);
1070 free_desc:
1071         ptlrpc_free_bulk(desc);
1072 out:
1073         return rc;
1074 }
1075
1076 #ifdef HAVE_SPLIT_SUPPORT
1077 /*
1078  * Retrieve dir entry from the page and insert it to the slave object, actually,
1079  * this should be in osd layer, but since it will not in the final product, so
1080  * just do it here and do not define more moo api anymore for this.
1081  */
1082 static int mdt_write_dir_page(struct mdt_thread_info *info, struct page *page,
1083                               int size)
1084 {
1085         struct mdt_object *object = info->mti_object;
1086         int rc = 0, offset = 0, is_dir;
1087         struct lu_dirpage *dp;
1088         struct lu_dirent *ent;
1089         ENTRY;
1090
1091         /* Make sure we have at least one entry. */
1092         if (size == 0)
1093                 RETURN(-EINVAL);
1094
1095         /*
1096          * Disable trans for this name insert, since it will include many trans
1097          * for this.
1098          */
1099         info->mti_no_need_trans = 1;
1100
1101         kmap(page);
1102         dp = page_address(page);
1103         offset = (int)((__u32)lu_dirent_start(dp) - (__u32)dp);
1104
1105         for (ent = lu_dirent_start(dp); ent != NULL;
1106              ent = lu_dirent_next(ent)) {
1107                 struct lu_fid *lf = &info->mti_tmp_fid2;
1108                 char *name;
1109
1110                 if (le16_to_cpu(ent->lde_namelen) == 0)
1111                         continue;
1112
1113                 fid_le_to_cpu(lf, &ent->lde_fid);
1114                 is_dir = le32_to_cpu(ent->lde_hash) & MAX_HASH_HIGHEST_BIT;
1115                 OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1);
1116                 if (name == NULL)
1117                         GOTO(out, rc = -ENOMEM);
1118
1119                 memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen));
1120                 /* No permission check for name_insert when write_dir_page */
1121                 rc = mdo_name_insert(info->mti_env,
1122                                      md_object_next(&object->mot_obj),
1123                                      name, lf, is_dir);
1124                 OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1);
1125                 if (rc) {
1126                         CERROR("Can't insert %*.*s, rc %d\n",
1127                                le16_to_cpu(ent->lde_namelen),
1128                                le16_to_cpu(ent->lde_namelen),
1129                                ent->lde_name, rc);
1130                         GOTO(out, rc);
1131                 }
1132
1133                 offset += lu_dirent_size(ent);
1134                 if (offset >= size)
1135                         break;
1136         }
1137         EXIT;
1138 out:
1139         kunmap(page);
1140         return rc;
1141 }
1142
1143 static int mdt_bulk_timeout(void *data)
1144 {
1145         ENTRY;
1146
1147         CERROR("mdt bulk transfer timeout \n");
1148
1149         RETURN(1);
1150 }
1151
1152 static int mdt_writepage(struct mdt_thread_info *info)
1153 {
1154         struct ptlrpc_request   *req = mdt_info_req(info);
1155         struct mdt_body         *reqbody;
1156         struct l_wait_info      *lwi;
1157         struct ptlrpc_bulk_desc *desc;
1158         struct page             *page;
1159         int                rc;
1160         ENTRY;
1161
1162
1163         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
1164         if (reqbody == NULL)
1165                 RETURN(err_serious(-EFAULT));
1166
1167         desc = ptlrpc_prep_bulk_exp (req, 1, BULK_GET_SINK, MDS_BULK_PORTAL);
1168         if (!desc)
1169                 RETURN(err_serious(-ENOMEM));
1170
1171         /* allocate the page for the desc */
1172         page = alloc_pages(GFP_KERNEL, 0);
1173         if (!page)
1174                 GOTO(desc_cleanup, rc = -ENOMEM);
1175
1176         CDEBUG(D_INFO, "Received page offset %d size %d \n",
1177                (int)reqbody->size, (int)reqbody->nlink);
1178
1179         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
1180                               (int)reqbody->nlink);
1181
1182         /*
1183          * Check if client was evicted while we were doing i/o before touching
1184          * network.
1185          */
1186         OBD_ALLOC_PTR(lwi);
1187         if (!lwi)
1188                 GOTO(cleanup_page, rc = -ENOMEM);
1189
1190         if (desc->bd_export->exp_failed)
1191                 rc = -ENOTCONN;
1192         else
1193                 rc = ptlrpc_start_bulk_transfer (desc);
1194         if (rc == 0) {
1195                 *lwi = LWI_TIMEOUT_INTERVAL(obd_timeout * HZ / 4, HZ,
1196                                             mdt_bulk_timeout, desc);
1197                 rc = l_wait_event(desc->bd_waitq, !ptlrpc_bulk_active(desc) ||
1198                                   desc->bd_export->exp_failed, lwi);
1199                 LASSERT(rc == 0 || rc == -ETIMEDOUT);
1200                 if (rc == -ETIMEDOUT) {
1201                         DEBUG_REQ(D_ERROR, req, "timeout on bulk GET");
1202                         ptlrpc_abort_bulk(desc);
1203                 } else if (desc->bd_export->exp_failed) {
1204                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
1205                         rc = -ENOTCONN;
1206                         ptlrpc_abort_bulk(desc);
1207                 } else if (!desc->bd_success ||
1208                            desc->bd_nob_transferred != desc->bd_nob) {
1209                         DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
1210                                   desc->bd_success ?
1211                                   "truncated" : "network error on",
1212                                   desc->bd_nob_transferred, desc->bd_nob);
1213                         /* XXX should this be a different errno? */
1214                         rc = -ETIMEDOUT;
1215                 }
1216         } else {
1217                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d\n", rc);
1218         }
1219         if (rc)
1220                 GOTO(cleanup_lwi, rc);
1221         rc = mdt_write_dir_page(info, page, reqbody->nlink);
1222
1223 cleanup_lwi:
1224         OBD_FREE_PTR(lwi);
1225 cleanup_page:
1226         __free_pages(page, 0);
1227 desc_cleanup:
1228         ptlrpc_free_bulk(desc);
1229         RETURN(rc);
1230 }
1231 #endif
1232
1233 static int mdt_readpage(struct mdt_thread_info *info)
1234 {
1235         struct mdt_object *object = info->mti_object;
1236         struct lu_rdpg    *rdpg = &info->mti_u.rdpg.mti_rdpg;
1237         struct mdt_body   *reqbody;
1238         struct mdt_body   *repbody;
1239         int                rc;
1240         int                i;
1241         ENTRY;
1242
1243         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1244                 RETURN(err_serious(-ENOMEM));
1245
1246         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
1247         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
1248         if (reqbody == NULL || repbody == NULL)
1249                 RETURN(err_serious(-EFAULT));
1250
1251         rc = mdt_check_ucred(info);
1252         if (rc)
1253                 RETURN(err_serious(rc));
1254
1255         /*
1256          * prepare @rdpg before calling lower layers and transfer itself. Here
1257          * reqbody->size contains offset of where to start to read and
1258          * reqbody->nlink contains number bytes to read.
1259          */
1260         rdpg->rp_hash = reqbody->size;
1261         if ((__u64)rdpg->rp_hash != reqbody->size) {
1262                 CERROR("Invalid hash: %#llx != %#llx\n",
1263                        (__u64)rdpg->rp_hash, reqbody->size);
1264                 RETURN(-EFAULT);
1265         }
1266         rdpg->rp_count  = reqbody->nlink;
1267         rdpg->rp_npages = (rdpg->rp_count + CFS_PAGE_SIZE - 1)>>CFS_PAGE_SHIFT;
1268         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1269         if (rdpg->rp_pages == NULL)
1270                 RETURN(-ENOMEM);
1271
1272         for (i = 0; i < rdpg->rp_npages; ++i) {
1273                 rdpg->rp_pages[i] = alloc_pages(GFP_KERNEL, 0);
1274                 if (rdpg->rp_pages[i] == NULL)
1275                         GOTO(free_rdpg, rc = -ENOMEM);
1276         }
1277
1278         /* call lower layers to fill allocated pages with directory data */
1279         rc = mo_readpage(info->mti_env, mdt_object_child(object), rdpg);
1280         if (rc)
1281                 GOTO(free_rdpg, rc);
1282
1283         /* send pages to client */
1284         rc = mdt_sendpage(info, rdpg);
1285
1286         EXIT;
1287 free_rdpg:
1288
1289         for (i = 0; i < rdpg->rp_npages; i++)
1290                 if (rdpg->rp_pages[i] != NULL)
1291                         __free_pages(rdpg->rp_pages[i], 0);
1292         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1293
1294         MDT_FAIL_RETURN(OBD_FAIL_MDS_SENDPAGE, 0);
1295
1296         return rc;
1297 }
1298
1299 static int mdt_reint_internal(struct mdt_thread_info *info,
1300                               struct mdt_lock_handle *lhc,
1301                               __u32 op)
1302 {
1303         struct req_capsule      *pill = &info->mti_pill;
1304         struct mdt_device       *mdt = info->mti_mdt;
1305         struct ptlrpc_request   *req = mdt_info_req(info);
1306         struct mdt_body         *repbody;
1307         int                      need_shrink = 0;
1308         int                      rc;
1309         ENTRY;
1310
1311         /* pack reply */
1312         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
1313                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1314                                      mdt->mdt_max_mdsize);
1315                 need_shrink = 1;
1316         }
1317         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER)) {
1318                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1319                                      mdt->mdt_max_cookiesize);
1320                 need_shrink = 1;
1321         }
1322         rc = req_capsule_pack(pill);
1323         if (rc != 0) {
1324                 CERROR("Can't pack response, rc %d\n", rc);
1325                 RETURN(err_serious(rc));
1326         }
1327
1328         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
1329                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1330                 LASSERT(repbody);
1331                 repbody->eadatasize = 0;
1332                 repbody->aclsize = 0;
1333         }
1334
1335         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNPACK))
1336                 GOTO(out_shrink, rc = err_serious(-EFAULT));
1337
1338         rc = mdt_reint_unpack(info, op);
1339         if (rc != 0) {
1340                 CERROR("Can't unpack reint, rc %d\n", rc);
1341                 GOTO(out_shrink, rc = err_serious(rc));
1342         }
1343
1344         rc = mdt_init_ucred_reint(info);
1345         if (rc)
1346                 GOTO(out_shrink, rc = err_serious(rc));
1347
1348         rc = mdt_fix_attr_ucred(info, op);
1349         if (rc != 0)
1350                 GOTO(out_ucred, rc = err_serious(rc));
1351
1352         if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1353                 struct mdt_client_data *mcd;
1354
1355                 mcd = req->rq_export->exp_mdt_data.med_mcd;
1356                 if (req_xid_is_last(req)) {
1357                         need_shrink = 0;
1358                         mdt_reconstruct(info, lhc);
1359                         rc = lustre_msg_get_status(req->rq_repmsg);
1360                         GOTO(out_ucred, rc);
1361                 }
1362                 DEBUG_REQ(D_HA, req, "no reply for RESENT (xid "LPD64")",
1363                           mcd->mcd_last_xid);
1364         }
1365         
1366         need_shrink = 0;
1367         rc = mdt_reint_rec(info, lhc);
1368         EXIT;
1369 out_ucred:
1370         mdt_exit_ucred(info);
1371 out_shrink:
1372         if (need_shrink) {
1373                 if (info->mti_pill.rc_fmt == &RQF_LDLM_INTENT_OPEN)
1374                         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 0, 0);
1375                 else
1376                         mdt_shrink_reply(info, REPLY_REC_OFF + 1, 0, 0);
1377         }
1378         return rc;
1379 }
1380
1381 static long mdt_reint_opcode(struct mdt_thread_info *info,
1382                              const struct req_format **fmt)
1383 {
1384         __u32 *ptr;
1385         long opc;
1386
1387         opc = err_serious(-EFAULT);
1388         ptr = req_capsule_client_get(&info->mti_pill, &RMF_REINT_OPC);
1389         if (ptr != NULL) {
1390                 opc = *ptr;
1391                 DEBUG_REQ(D_INODE, mdt_info_req(info), "reint opt = %ld", opc);
1392                 if (opc < REINT_MAX && fmt[opc] != NULL)
1393                         req_capsule_extend(&info->mti_pill, fmt[opc]);
1394                 else {
1395                         CERROR("Unsupported opc: %ld\n", opc);
1396                         opc = err_serious(opc);
1397                 }
1398         }
1399         return opc;
1400 }
1401
1402 static int mdt_reint(struct mdt_thread_info *info)
1403 {
1404         long opc;
1405         int  rc;
1406
1407         static const struct req_format *reint_fmts[REINT_MAX] = {
1408                 [REINT_SETATTR] = &RQF_MDS_REINT_SETATTR,
1409                 [REINT_CREATE]  = &RQF_MDS_REINT_CREATE,
1410                 [REINT_LINK]    = &RQF_MDS_REINT_LINK,
1411                 [REINT_UNLINK]  = &RQF_MDS_REINT_UNLINK,
1412                 [REINT_RENAME]  = &RQF_MDS_REINT_RENAME,
1413                 [REINT_OPEN]    = &RQF_MDS_REINT_OPEN
1414         };
1415
1416         ENTRY;
1417
1418         opc = mdt_reint_opcode(info, reint_fmts);
1419         if (opc >= 0) {
1420                 /*
1421                  * No lock possible here from client to pass it to reint code
1422                  * path.
1423                  */
1424                 rc = mdt_reint_internal(info, NULL, opc);
1425         } else {
1426                 rc = opc;
1427         }
1428
1429         info->mti_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1430         RETURN(rc);
1431 }
1432
1433 /* TODO these two methods not available now. */
1434
/*
 * Placeholder for syncing the whole device.  Not implemented yet: @info is
 * ignored and success (0) is always reported.
 */
static int mdt_device_sync(struct mdt_thread_info *info)
{
        return 0;
}
1440
/*
 * Placeholder for syncing a single object.  Not implemented yet: @info is
 * ignored and success (0) is always reported.
 */
static int mdt_object_sync(struct mdt_thread_info *info)
{
        return 0;
}
1446
1447 static int mdt_sync(struct mdt_thread_info *info)
1448 {
1449         struct req_capsule *pill = &info->mti_pill;
1450         struct mdt_body *body;
1451         int rc;
1452         ENTRY;
1453
1454         /* The fid may be zero, so we req_capsule_set manually */
1455         req_capsule_set(pill, &RQF_MDS_SYNC);
1456
1457         body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1458         if (body == NULL)
1459                 RETURN(err_serious(-EINVAL));
1460
1461         if (MDT_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1462                 RETURN(err_serious(-ENOMEM));
1463
1464         if (fid_seq(&body->fid1) == 0) {
1465                 /* sync the whole device */
1466                 rc = req_capsule_pack(pill);
1467                 if (rc == 0)
1468                         rc = mdt_device_sync(info);
1469                 else
1470                         rc = err_serious(rc);
1471         } else {
1472                 /* sync an object */
1473                 rc = mdt_unpack_req_pack_rep(info, HABEO_CORPUS|HABEO_REFERO);
1474                 if (rc == 0) {
1475                         rc = mdt_object_sync(info);
1476                         if (rc == 0) {
1477                                 struct md_object *next;
1478                                 const struct lu_fid *fid;
1479                                 struct lu_attr *la = &info->mti_attr.ma_attr;
1480
1481                                 next = mdt_object_child(info->mti_object);
1482                                 info->mti_attr.ma_need = MA_INODE;
1483                                 info->mti_attr.ma_valid = 0;
1484                                 rc = mo_attr_get(info->mti_env, next,
1485                                                  &info->mti_attr);
1486                                 if (rc == 0) {
1487                                         body = req_capsule_server_get(pill,
1488                                                                 &RMF_MDT_BODY);
1489                                         fid = mdt_object_fid(info->mti_object);
1490                                         mdt_pack_attr2body(info, body, la, fid);
1491                                 }
1492                         }
1493                 } else
1494                         rc = err_serious(rc);
1495         }
1496         RETURN(rc);
1497 }
1498
1499 static int mdt_quotacheck_handle(struct mdt_thread_info *info)
1500 {
1501         return err_serious(-EOPNOTSUPP);
1502 }
1503
1504 static int mdt_quotactl_handle(struct mdt_thread_info *info)
1505 {
1506         return err_serious(-EOPNOTSUPP);
1507 }
1508
1509 /*
1510  * OBD PING and other handlers.
1511  */
1512 static int mdt_obd_ping(struct mdt_thread_info *info)
1513 {
1514         int rc;
1515         ENTRY;
1516         rc = target_handle_ping(mdt_info_req(info));
1517         if (rc < 0)
1518                 rc = err_serious(rc);
1519         RETURN(rc);
1520 }
1521
1522 static int mdt_obd_log_cancel(struct mdt_thread_info *info)
1523 {
1524         return err_serious(-EOPNOTSUPP);
1525 }
1526
1527 static int mdt_obd_qc_callback(struct mdt_thread_info *info)
1528 {
1529         return err_serious(-EOPNOTSUPP);
1530 }
1531
1532
1533 /*
1534  * DLM handlers.
1535  */
1536 static struct ldlm_callback_suite cbs = {
1537         .lcs_completion = ldlm_server_completion_ast,
1538         .lcs_blocking   = ldlm_server_blocking_ast,
1539         .lcs_glimpse    = NULL
1540 };
1541
1542 static int mdt_enqueue(struct mdt_thread_info *info)
1543 {
1544         struct ptlrpc_request *req;
1545         __u64 req_bits;
1546         int rc;
1547
1548         /*
1549          * info->mti_dlm_req already contains swapped and (if necessary)
1550          * converted dlm request.
1551          */
1552         LASSERT(info->mti_dlm_req != NULL);
1553
1554         if (OBD_FAIL_CHECK(OBD_FAIL_LDLM_ENQUEUE)) {
1555                 info->mti_fail_id = OBD_FAIL_LDLM_ENQUEUE;
1556                 return 0;
1557         }
1558
1559         req = mdt_info_req(info);
1560
1561         /*
1562          * Lock without inodebits makes no sense and will oops later in
1563          * ldlm. Let's check it now to see if we have wrong lock from client or
1564          * bits get corrupted somewhere in mdt_intent_policy().
1565          */
1566         req_bits = info->mti_dlm_req->lock_desc.l_policy_data.l_inodebits.bits;
1567         LASSERT(req_bits != 0);
1568
1569         rc = ldlm_handle_enqueue0(info->mti_mdt->mdt_namespace,
1570                                   req, info->mti_dlm_req, &cbs);
1571         info->mti_fail_id = OBD_FAIL_LDLM_REPLY;
1572         return rc ? err_serious(rc) : req->rq_status;
1573 }
1574
1575 static int mdt_convert(struct mdt_thread_info *info)
1576 {
1577         int rc;
1578         struct ptlrpc_request *req;
1579
1580         LASSERT(info->mti_dlm_req);
1581         req = mdt_info_req(info);
1582         rc = ldlm_handle_convert0(req, info->mti_dlm_req);
1583         return rc ? err_serious(rc) : req->rq_status;
1584 }
1585
1586 static int mdt_bl_callback(struct mdt_thread_info *info)
1587 {
1588         CERROR("bl callbacks should not happen on MDS\n");
1589         LBUG();
1590         return err_serious(-EOPNOTSUPP);
1591 }
1592
1593 static int mdt_cp_callback(struct mdt_thread_info *info)
1594 {
1595         CERROR("cp callbacks should not happen on MDS\n");
1596         LBUG();
1597         return err_serious(-EOPNOTSUPP);
1598 }
1599
1600 /*
1601  * sec context handlers
1602  */
1603 static int mdt_sec_ctx_handle(struct mdt_thread_info *info)
1604 {
1605         int rc;
1606
1607         rc = mdt_handle_idmap(info);
1608
1609         if (unlikely(rc)) {
1610                 struct ptlrpc_request *req = mdt_info_req(info);
1611                 __u32                  opc;
1612
1613                 opc = lustre_msg_get_opc(req->rq_reqmsg);
1614                 if (opc == SEC_CTX_INIT || opc == SEC_CTX_INIT_CONT)
1615                         sptlrpc_svc_ctx_invalidate(req);
1616         }
1617
1618         return rc;
1619 }
1620
1621 static struct mdt_object *mdt_obj(struct lu_object *o)
1622 {
1623         LASSERT(lu_device_is_mdt(o->lo_dev));
1624         return container_of0(o, struct mdt_object, mot_obj.mo_lu);
1625 }
1626
1627 struct mdt_object *mdt_object_find(const struct lu_env *env,
1628                                    struct mdt_device *d,
1629                                    const struct lu_fid *f)
1630 {
1631         struct lu_object *o;
1632         struct mdt_object *m;
1633         ENTRY;
1634
1635         o = lu_object_find(env, d->mdt_md_dev.md_lu_dev.ld_site, f);
1636         if (IS_ERR(o))
1637                 m = (struct mdt_object *)o;
1638         else
1639                 m = mdt_obj(o);
1640         RETURN(m);
1641 }
1642
1643 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
1644                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
1645 {
1646         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1647         ldlm_policy_data_t *policy = &info->mti_policy;
1648         struct ldlm_res_id *res_id = &info->mti_res_id;
1649         int exist = mdt_object_exists(o);
1650         int rc;
1651         ENTRY;
1652
1653         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
1654         LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
1655         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
1656         LASSERT(lh->mlh_type != MDT_NUL_LOCK);
1657
1658         if (exist < 0) {
1659                 if (locality == MDT_CROSS_LOCK) {
1660                         /* cross-ref object fix */
1661                         ibits &= ~MDS_INODELOCK_UPDATE;
1662                         ibits |= MDS_INODELOCK_LOOKUP;
1663                 } else {
1664                         LASSERT(!(ibits & MDS_INODELOCK_UPDATE));
1665                         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1666                 }
1667                 /* No PDO lock on remote object */
1668                 LASSERT(lh->mlh_type != MDT_PDO_LOCK);
1669         } else if (exist == 0 && lh->mlh_type == MDT_PDO_LOCK) {
1670                 /*
1671                  * No PDO lock on non-existing object.
1672                  * This may happen on removed $PWD on client.
1673                  */
1674                 RETURN(-ESTALE);
1675         }
1676
1677         memset(policy, 0, sizeof(*policy));
1678         fid_build_reg_res_name(mdt_object_fid(o), res_id);
1679
1680         /*
1681          * Take PDO lock on whole directory and build correct @res_id for lock
1682          * on part of directory.
1683          */
1684         if (lh->mlh_type == MDT_PDO_LOCK && lh->mlh_pdo_hash != 0) {
1685                 mdt_lock_pdo_mode(info, o, lh);
1686                 if (lh->mlh_pdo_mode != LCK_NL) {
1687                         /*
1688                          * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
1689                          * is never going to be sent to client and we do not
1690                          * want it slowed down due to possible cancels.
1691                          */
1692                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1693                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
1694                                           policy, res_id, LDLM_FL_ATOMIC_CB);
1695                         if (rc)
1696                                 RETURN(rc);
1697                 }
1698
1699                 /*
1700                  * Finish res_id initializing by name hash marking patr of
1701                  * directory which is taking modification.
1702                  */
1703                 res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
1704         }
1705
1706         policy->l_inodebits.bits = ibits;
1707
1708         /*
1709          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1710          * going to be sent to client. If it is - mdt_intent_policy() path will
1711          * fix it up and turns FL_LOCAL flag off.
1712          */
1713         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
1714                           res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB);
1715
1716         if (rc && lh->mlh_type == MDT_PDO_LOCK) {
1717                 mdt_fid_unlock(&lh->mlh_pdo_lh, lh->mlh_pdo_mode);
1718                 lh->mlh_pdo_lh.cookie = 0ull;
1719         }
1720
1721         RETURN(rc);
1722 }
1723
1724 /*
1725  * Just call ldlm_lock_decref() if decref, else we only call ptlrpc_save_lock()
1726  * to save this lock in req.  when transaction committed, req will be released,
1727  * and lock will, too.
1728  */
1729 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
1730                        struct mdt_lock_handle *lh, int decref)
1731 {
1732         struct ptlrpc_request *req = mdt_info_req(info);
1733         ENTRY;
1734
1735         if (lustre_handle_is_used(&lh->mlh_pdo_lh)) {
1736                 /* Do not save PDO locks to request, just decref. */
1737                 mdt_fid_unlock(&lh->mlh_pdo_lh,
1738                                lh->mlh_pdo_mode);
1739                 lh->mlh_pdo_lh.cookie = 0;
1740         }
1741
1742         if (lustre_handle_is_used(&lh->mlh_reg_lh)) {
1743                 if (decref) {
1744                         mdt_fid_unlock(&lh->mlh_reg_lh,
1745                                        lh->mlh_reg_mode);
1746                 } else {
1747                         ptlrpc_save_lock(req, &lh->mlh_reg_lh,
1748                                          lh->mlh_reg_mode);
1749                 }
1750                 lh->mlh_reg_lh.cookie = 0;
1751         }
1752
1753         EXIT;
1754 }
1755
1756 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
1757                                         const struct lu_fid *f,
1758                                         struct mdt_lock_handle *lh,
1759                                         __u64 ibits)
1760 {
1761         struct mdt_object *o;
1762
1763         o = mdt_object_find(info->mti_env, info->mti_mdt, f);
1764         if (!IS_ERR(o)) {
1765                 int rc;
1766
1767                 rc = mdt_object_lock(info, o, lh, ibits,
1768                                      MDT_LOCAL_LOCK);
1769                 if (rc != 0) {
1770                         mdt_object_put(info->mti_env, o);
1771                         o = ERR_PTR(rc);
1772                 }
1773         }
1774         return o;
1775 }
1776
1777 void mdt_object_unlock_put(struct mdt_thread_info * info,
1778                            struct mdt_object * o,
1779                            struct mdt_lock_handle *lh,
1780                            int decref)
1781 {
1782         mdt_object_unlock(info, o, lh, decref);
1783         mdt_object_put(info->mti_env, o);
1784 }
1785
1786 static struct mdt_handler *mdt_handler_find(__u32 opc,
1787                                             struct mdt_opc_slice *supported)
1788 {
1789         struct mdt_opc_slice *s;
1790         struct mdt_handler   *h;
1791
1792         h = NULL;
1793         for (s = supported; s->mos_hs != NULL; s++) {
1794                 if (s->mos_opc_start <= opc && opc < s->mos_opc_end) {
1795                         h = s->mos_hs + (opc - s->mos_opc_start);
1796                         if (h->mh_opc != 0)
1797                                 LASSERT(h->mh_opc == opc);
1798                         else
1799                                 h = NULL; /* unsupported opc */
1800                         break;
1801                 }
1802         }
1803         return h;
1804 }
1805
/*
 * Placeholder for resource name compatibility fix-up of an incoming dlm
 * request. Not implemented yet: both arguments are ignored and 0 is
 * returned.
 */
static int mdt_lock_resname_compat(struct mdt_device *m,
                                   struct ldlm_request *req)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1812
/*
 * Placeholder for compatibility fix-up of an outgoing dlm reply. Not
 * implemented yet: both arguments are ignored and 0 is returned.
 */
static int mdt_lock_reply_compat(struct mdt_device *m, struct ldlm_reply *rep)
{
        int rc = 0;

        /* XXX something... later. */
        return rc;
}
1818
1819 /*
1820  * Generic code handling requests that have struct mdt_body passed in:
1821  *
1822  *  - extract mdt_body from request and save it in @info, if present;
1823  *
1824  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
1825  *  @info;
1826  *
1827  *  - if HABEO_CORPUS flag is set for this request type check whether object
1828  *  actually exists on storage (lu_object_exists()).
1829  *
1830  */
1831 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
1832 {
1833         const struct mdt_body    *body;
1834         struct mdt_object        *obj;
1835         const struct lu_env      *env;
1836         struct req_capsule       *pill;
1837         int                       rc;
1838
1839         env = info->mti_env;
1840         pill = &info->mti_pill;
1841
1842         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
1843         if (body == NULL)
1844                 return -EFAULT;
1845
1846         if (!fid_is_sane(&body->fid1)) {
1847                 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
1848                 return -EINVAL;
1849         }
1850
1851         /*
1852          * Do not get size or any capa fields before we check that request
1853          * contains capa actually. There are some requests which do not, for
1854          * instance MDS_IS_SUBDIR.
1855          */
1856         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
1857             req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
1858                 mdt_set_capainfo(info, 0, &body->fid1,
1859                                  req_capsule_client_get(pill, &RMF_CAPA1));
1860
1861         obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
1862         if (!IS_ERR(obj)) {
1863                 if ((flags & HABEO_CORPUS) &&
1864                     !mdt_object_exists(obj)) {
1865                         mdt_object_put(env, obj);
1866                         /* for capability renew ENOENT will be handled in
1867                          * mdt_renew_capa */
1868                         if (body->valid & OBD_MD_FLOSSCAPA)
1869                                 rc = 0;
1870                         else
1871                                 rc = -ENOENT;
1872                 } else {
1873                         info->mti_object = obj;
1874                         rc = 0;
1875                 }
1876         } else
1877                 rc = PTR_ERR(obj);
1878
1879         return rc;
1880 }
1881
1882 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
1883 {
1884         struct req_capsule *pill;
1885         int rc;
1886
1887         ENTRY;
1888         pill = &info->mti_pill;
1889
1890         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
1891                 rc = mdt_body_unpack(info, flags);
1892         else
1893                 rc = 0;
1894
1895         if (rc == 0 && (flags & HABEO_REFERO)) {
1896                 struct mdt_device *mdt = info->mti_mdt;
1897
1898                 /* Pack reply. */
1899                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1900                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1901                                              mdt->mdt_max_mdsize);
1902                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1903                         req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER,
1904                                              mdt->mdt_max_cookiesize);
1905
1906                 rc = req_capsule_pack(pill);
1907         }
1908         RETURN(rc);
1909 }
1910
1911 static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
1912 {
1913         struct md_device *next = m->mdt_child;
1914
1915         return next->md_ops->mdo_init_capa_ctxt(env, next,
1916                                                 m->mdt_opts.mo_mds_capa,
1917                                                 m->mdt_capa_timeout,
1918                                                 m->mdt_capa_alg,
1919                                                 m->mdt_capa_keys);
1920 }
1921
1922 /*
1923  * Invoke handler for this request opc. Also do necessary preprocessing
1924  * (according to handler ->mh_flags), and post-processing (setting of
1925  * ->last_{xid,committed}).
1926  */
1927 static int mdt_req_handle(struct mdt_thread_info *info,
1928                           struct mdt_handler *h, struct ptlrpc_request *req)
1929 {
1930         int   rc, serious = 0;
1931         __u32 flags;
1932
1933         ENTRY;
1934
1935         LASSERT(h->mh_act != NULL);
1936         LASSERT(h->mh_opc == lustre_msg_get_opc(req->rq_reqmsg));
1937         LASSERT(current->journal_info == NULL);
1938
1939         DEBUG_REQ(D_INODE, req, "%s", h->mh_name);
1940
1941         /*
1942          * Do not use *_FAIL_CHECK_ONCE() macros, because they will stop
1943          * correct handling of failed req later in ldlm due to doing
1944          * obd_fail_loc |= OBD_FAIL_ONCE | OBD_FAILED without actually
1945          * correct actions like it is done in target_send_reply_msg().
1946          */
1947         if (h->mh_fail_id != 0) {
1948                 /*
1949                  * Set to info->mti_fail_id to handler fail_id, it will be used
1950                  * later, and better than use default fail_id.
1951                  */
1952                 if (OBD_FAIL_CHECK(h->mh_fail_id)) {
1953                         info->mti_fail_id = h->mh_fail_id;
1954                         RETURN(0);
1955                 }
1956         }
1957
1958         rc = 0;
1959         flags = h->mh_flags;
1960         LASSERT(ergo(flags & (HABEO_CORPUS|HABEO_REFERO), h->mh_fmt != NULL));
1961
1962         if (h->mh_fmt != NULL) {
1963                 req_capsule_set(&info->mti_pill, h->mh_fmt);
1964                 rc = mdt_unpack_req_pack_rep(info, flags);
1965         }
1966
1967         if (rc == 0 && flags & MUTABOR &&
1968             req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
1969                 /* should it be rq_status? */
1970                 rc = -EROFS;
1971
1972         if (rc == 0 && flags & HABEO_CLAVIS) {
1973                 struct ldlm_request *dlm_req;
1974
1975                 LASSERT(h->mh_fmt != NULL);
1976
1977                 dlm_req = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
1978                 if (dlm_req != NULL) {
1979                         if (info->mti_mdt->mdt_opts.mo_compat_resname)
1980                                 rc = mdt_lock_resname_compat(info->mti_mdt,
1981                                                              dlm_req);
1982                         info->mti_dlm_req = dlm_req;
1983                 } else {
1984                         CERROR("Can't unpack dlm request\n");
1985                         rc = -EFAULT;
1986                 }
1987         }
1988
1989         /* capability setting changed via /proc, needs reinitialize ctxt */
1990         if (info->mti_mdt && info->mti_mdt->mdt_capa_conf) {
1991                 mdt_init_capa_ctxt(info->mti_env, info->mti_mdt);
1992                 info->mti_mdt->mdt_capa_conf = 0;
1993         }
1994
1995         if (rc == 0) {
1996                 /*
1997                  * Process request, there can be two types of rc:
1998                  * 1) errors with msg unpack/pack, other failures outside the
1999                  * operation itself. This is counted as serious errors;
2000                  * 2) errors during fs operation, should be placed in rq_status
2001                  * only
2002                  */
2003                 rc = h->mh_act(info);
2004                 serious = is_serious(rc);
2005                 rc = clear_serious(rc);
2006         } else
2007                 serious = 1;
2008
2009         req->rq_status = rc;
2010
2011         /*
2012          * ELDLM_* codes which > 0 should be in rq_status only as well as
2013          * all non-serious errors.
2014          */
2015         if (rc > 0 || !serious)
2016                 rc = 0;
2017
2018         LASSERT(current->journal_info == NULL);
2019
2020         if (rc == 0 && (flags & HABEO_CLAVIS)
2021             && info->mti_mdt->mdt_opts.mo_compat_resname) {
2022                 struct ldlm_reply *dlmrep;
2023
2024                 dlmrep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2025                 if (dlmrep != NULL)
2026                         rc = mdt_lock_reply_compat(info->mti_mdt, dlmrep);
2027         }
2028
2029         /* If we're DISCONNECTing, the mdt_export_data is already freed */
2030         if (rc == 0 && h->mh_opc != MDS_DISCONNECT)
2031                 target_committed_to_req(req);
2032
2033         if ((lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) &&
2034             lustre_msg_get_transno(req->rq_reqmsg) == 0) {
2035                 DEBUG_REQ(D_ERROR, req, "transno is 0 during REPLAY\n");
2036                 LBUG();
2037         }
2038
2039         RETURN(rc);
2040 }
2041
2042 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
2043 {
2044         lh->mlh_type = MDT_NUL_LOCK;
2045         lh->mlh_reg_lh.cookie = 0ull;
2046         lh->mlh_reg_mode = LCK_MINMODE;
2047         lh->mlh_pdo_lh.cookie = 0ull;
2048         lh->mlh_pdo_mode = LCK_MINMODE;
2049 }
2050
2051 void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
2052 {
2053         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
2054         LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
2055 }
2056
2057 /*
2058  * Initialize fields of struct mdt_thread_info. Other fields are left in
2059  * uninitialized state, because it's too expensive to zero out whole
2060  * mdt_thread_info (> 1K) on each request arrival.
2061  */
2062 static void mdt_thread_info_init(struct ptlrpc_request *req,
2063                                  struct mdt_thread_info *info)
2064 {
2065         int i;
2066
2067         info->mti_rep_buf_nr = ARRAY_SIZE(info->mti_rep_buf_size);
2068         for (i = 0; i < ARRAY_SIZE(info->mti_rep_buf_size); i++)
2069                 info->mti_rep_buf_size[i] = -1;
2070         req_capsule_init(&info->mti_pill, req, RCL_SERVER,
2071                          info->mti_rep_buf_size);
2072
2073         /* lock handle */
2074         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2075                 mdt_lock_handle_init(&info->mti_lh[i]);
2076
2077         /* mdt device: it can be NULL while CONNECT */
2078         if (req->rq_export)
2079                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
2080         else
2081                 info->mti_mdt = NULL;
2082         info->mti_env = req->rq_svc_thread->t_env;
2083
2084         info->mti_fail_id = OBD_FAIL_MDS_ALL_REPLY_NET;
2085         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
2086
2087         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
2088         info->mti_body = NULL;
2089         info->mti_object = NULL;
2090         info->mti_dlm_req = NULL;
2091         info->mti_has_trans = 0;
2092         info->mti_no_need_trans = 0;
2093         info->mti_cross_ref = 0;
2094         info->mti_opdata = 0;
2095
2096         /* To not check for split by default. */
2097         info->mti_spec.sp_ck_split = 0;
2098 }
2099
2100 static void mdt_thread_info_fini(struct mdt_thread_info *info)
2101 {
2102         int i;
2103
2104         req_capsule_fini(&info->mti_pill);
2105         if (info->mti_object != NULL) {
2106                 mdt_object_put(info->mti_env, info->mti_object);
2107                 info->mti_object = NULL;
2108         }
2109         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2110                 mdt_lock_handle_fini(&info->mti_lh[i]);
2111         info->mti_env = NULL;
2112 }
2113
2114 /* mds/handler.c */
2115 extern int mds_filter_recovery_request(struct ptlrpc_request *req,
2116                                        struct obd_device *obd, int *process);
2117 /*
2118  * Handle recovery. Return:
2119  *        +1: continue request processing;
2120  *       -ve: abort immediately with the given error code;
2121  *         0: send reply with error code in req->rq_status;
2122  */
2123 static int mdt_recovery(struct mdt_thread_info *info)
2124 {
2125         struct ptlrpc_request *req = mdt_info_req(info);
2126         int recovering;
2127         struct obd_device *obd;
2128
2129         ENTRY;
2130
2131         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2132         case MDS_CONNECT:
2133         case SEC_CTX_INIT:
2134         case SEC_CTX_INIT_CONT:
2135         case SEC_CTX_FINI:
2136                 {
2137 #if 0
2138                         int rc;
2139
2140                         rc = mdt_handle_idmap(info);
2141                         if (rc)
2142                                 RETURN(rc);
2143                         else
2144 #endif
2145                                 RETURN(+1);
2146                 }
2147         }
2148
2149         if (req->rq_export == NULL) {
2150                 CERROR("operation %d on unconnected MDS from %s\n",
2151                        lustre_msg_get_opc(req->rq_reqmsg),
2152                        libcfs_id2str(req->rq_peer));
2153                 req->rq_status = -ENOTCONN;
2154                 target_send_reply(req, -ENOTCONN, info->mti_fail_id);
2155                 RETURN(0);
2156         }
2157
2158         /* sanity check: if the xid matches, the request must be marked as a
2159          * resent or replayed */
2160         if (req_xid_is_last(req)) {
2161                 if (!(lustre_msg_get_flags(req->rq_reqmsg) &
2162                       (MSG_RESENT | MSG_REPLAY))) {
2163                         DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches last_xid, "
2164                                   "expected REPLAY or RESENT flag\n", req->rq_xid);
2165                         LBUG();
2166                         req->rq_status = -ENOTCONN;
2167                         RETURN(-ENOTCONN);
2168                 }
2169         }
2170
2171         /* else: note the opposite is not always true; a RESENT req after a
2172          * failover will usually not match the last_xid, since it was likely
2173          * never committed. A REPLAYed request will almost never match the
2174          * last xid, however it could for a committed, but still retained,
2175          * open. */
2176
2177         obd = req->rq_export->exp_obd;
2178
2179         /* Check for aborted recovery... */
2180         spin_lock_bh(&obd->obd_processing_task_lock);
2181         recovering = obd->obd_recovering;
2182         spin_unlock_bh(&obd->obd_processing_task_lock);
2183         if (recovering) {
2184                 int rc;
2185                 int should_process;
2186                 DEBUG_REQ(D_INFO, req, "Got new replay");
2187                 rc = mds_filter_recovery_request(req, obd, &should_process);
2188                 if (rc != 0 || !should_process)
2189                         RETURN(rc);
2190                 else if (should_process < 0) {
2191                         req->rq_status = should_process;
2192                         rc = ptlrpc_error(req);
2193                         RETURN(rc);
2194                 }
2195         }
2196         RETURN(+1);
2197 }
2198
2199 static int mdt_reply(struct ptlrpc_request *req, int rc,
2200                      struct mdt_thread_info *info)
2201 {
2202         ENTRY;
2203
2204 #if 0
2205         if (req->rq_reply_state == NULL && rc == 0) {
2206                 req->rq_status = rc;
2207                 lustre_pack_reply(req, 1, NULL, NULL);
2208         }
2209 #endif
2210         target_send_reply(req, rc, info->mti_fail_id);
2211         RETURN(0);
2212 }
2213
2214 /* mds/handler.c */
2215 extern int mds_msg_check_version(struct lustre_msg *msg);
2216
2217 static int mdt_handle0(struct ptlrpc_request *req,
2218                        struct mdt_thread_info *info,
2219                        struct mdt_opc_slice *supported)
2220 {
2221         struct mdt_handler *h;
2222         struct lustre_msg  *msg;
2223         int                 rc;
2224
2225         ENTRY;
2226
2227         MDT_FAIL_RETURN(OBD_FAIL_MDS_ALL_REQUEST_NET | OBD_FAIL_ONCE, 0);
2228
2229         LASSERT(current->journal_info == NULL);
2230
2231         msg = req->rq_reqmsg;
2232         rc = mds_msg_check_version(msg);
2233         if (rc == 0) {
2234                 rc = mdt_recovery(info);
2235                 if (rc == +1) {
2236                         h = mdt_handler_find(lustre_msg_get_opc(msg),
2237                                              supported);
2238                         if (h != NULL) {
2239                                 rc = mdt_req_handle(info, h, req);
2240                                 rc = mdt_reply(req, rc, info);
2241                         } else {
2242                                 req->rq_status = -ENOTSUPP;
2243                                 rc = ptlrpc_error(req);
2244                                 RETURN(rc);
2245                         }
2246                 }
2247         } else
2248                 CERROR(LUSTRE_MDT_NAME" drops mal-formed request\n");
2249         RETURN(rc);
2250 }
2251
2252 /*
2253  * MDT handler function called by ptlrpc service thread when request comes.
2254  *
2255  * XXX common "target" functionality should be factored into separate module
2256  * shared by mdt, ost and stand-alone services like fld.
2257  */
2258 static int mdt_handle_common(struct ptlrpc_request *req,
2259                              struct mdt_opc_slice *supported)
2260 {
2261         struct lu_env          *env;
2262         struct mdt_thread_info *info;
2263         int                     rc;
2264         ENTRY;
2265
2266         env = req->rq_svc_thread->t_env;
2267         LASSERT(env != NULL);
2268         LASSERT(env->le_ses != NULL);
2269         LASSERT(env->le_ctx.lc_thread == req->rq_svc_thread);
2270         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2271         LASSERT(info != NULL);
2272
2273         mdt_thread_info_init(req, info);
2274
2275         rc = mdt_handle0(req, info, supported);
2276
2277         mdt_thread_info_fini(info);
2278         RETURN(rc);
2279 }
2280
2281 /*
2282  * This is called from recovery code as handler of _all_ RPC types, FLD and SEQ
2283  * as well.
2284  */
2285 int mdt_recovery_handle(struct ptlrpc_request *req)
2286 {
2287         int rc;
2288         ENTRY;
2289
2290         switch (lustre_msg_get_opc(req->rq_reqmsg)) {
2291         case FLD_QUERY:
2292                 rc = mdt_handle_common(req, mdt_fld_handlers);
2293                 break;
2294         case SEQ_QUERY:
2295                 rc = mdt_handle_common(req, mdt_seq_handlers);
2296                 break;
2297         default:
2298                 rc = mdt_handle_common(req, mdt_regular_handlers);
2299                 break;
2300         }
2301
2302         RETURN(rc);
2303 }
2304
2305 static int mdt_regular_handle(struct ptlrpc_request *req)
2306 {
2307         return mdt_handle_common(req, mdt_regular_handlers);
2308 }
2309
2310 static int mdt_readpage_handle(struct ptlrpc_request *req)
2311 {
2312         return mdt_handle_common(req, mdt_readpage_handlers);
2313 }
2314
2315 static int mdt_mdsc_handle(struct ptlrpc_request *req)
2316 {
2317         return mdt_handle_common(req, mdt_seq_handlers);
2318 }
2319
2320 static int mdt_mdss_handle(struct ptlrpc_request *req)
2321 {
2322         return mdt_handle_common(req, mdt_seq_handlers);
2323 }
2324
2325 static int mdt_dtss_handle(struct ptlrpc_request *req)
2326 {
2327         return mdt_handle_common(req, mdt_seq_handlers);
2328 }
2329
2330 static int mdt_fld_handle(struct ptlrpc_request *req)
2331 {
2332         return mdt_handle_common(req, mdt_fld_handlers);
2333 }
2334
/*
 * Intent codes; used as indices into the mdt_it_flavor[] table.
 * MDT_IT_NR is not an intent, it is the number of codes.
 */
enum mdt_it_code {
        MDT_IT_OPEN     = 0,
        MDT_IT_OCREAT   = 1,
        MDT_IT_CREATE   = 2,
        MDT_IT_GETATTR  = 3,
        MDT_IT_READDIR  = 4,
        MDT_IT_LOOKUP   = 5,
        MDT_IT_UNLINK   = 6,
        MDT_IT_TRUNC    = 7,
        MDT_IT_GETXATTR = 8,
        MDT_IT_NR
};
2347
/*
 * Forward declarations of the intent handlers referenced by the
 * mdt_it_flavor[] table; both share the ->it_act signature.
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **, int);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **, int);
2356
2357 static struct mdt_it_flavor {
2358         const struct req_format *it_fmt;
2359         __u32                    it_flags;
2360         int                    (*it_act)(enum mdt_it_code ,
2361                                          struct mdt_thread_info *,
2362                                          struct ldlm_lock **,
2363                                          int);
2364         long                     it_reint;
2365 } mdt_it_flavor[] = {
2366         [MDT_IT_OPEN]     = {
2367                 .it_fmt   = &RQF_LDLM_INTENT,
2368                 /*.it_flags = HABEO_REFERO,*/
2369                 .it_flags = 0,
2370                 .it_act   = mdt_intent_reint,
2371                 .it_reint = REINT_OPEN
2372         },
2373         [MDT_IT_OCREAT]   = {
2374                 .it_fmt   = &RQF_LDLM_INTENT,
2375                 .it_flags = MUTABOR,
2376                 .it_act   = mdt_intent_reint,
2377                 .it_reint = REINT_OPEN
2378         },
2379         [MDT_IT_CREATE]   = {
2380                 .it_fmt   = &RQF_LDLM_INTENT,
2381                 .it_flags = MUTABOR,
2382                 .it_act   = mdt_intent_reint,
2383                 .it_reint = REINT_CREATE
2384         },
2385         [MDT_IT_GETATTR]  = {
2386                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2387                 .it_flags = HABEO_REFERO,
2388                 .it_act   = mdt_intent_getattr
2389         },
2390         [MDT_IT_READDIR]  = {
2391                 .it_fmt   = NULL,
2392                 .it_flags = 0,
2393                 .it_act   = NULL
2394         },
2395         [MDT_IT_LOOKUP]   = {
2396                 .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
2397                 .it_flags = HABEO_REFERO,
2398                 .it_act   = mdt_intent_getattr
2399         },
2400         [MDT_IT_UNLINK]   = {
2401                 .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
2402                 .it_flags = MUTABOR,
2403                 .it_act   = NULL, /* XXX can be mdt_intent_reint, ? */
2404                 .it_reint = REINT_UNLINK
2405         },
2406         [MDT_IT_TRUNC]    = {
2407                 .it_fmt   = NULL,
2408                 .it_flags = MUTABOR,
2409                 .it_act   = NULL
2410         },
2411         [MDT_IT_GETXATTR] = {
2412                 .it_fmt   = NULL,
2413                 .it_flags = 0,
2414                 .it_act   = NULL
2415         }
2416 };
2417
2418 int mdt_intent_lock_replace(struct mdt_thread_info *info,
2419                             struct ldlm_lock **lockp,
2420                             struct ldlm_lock *new_lock,
2421                             struct mdt_lock_handle *lh,
2422                             int flags)
2423 {
2424         struct ptlrpc_request  *req = mdt_info_req(info);
2425         struct ldlm_lock       *lock = *lockp;
2426
2427         /*
2428          * Get new lock only for cases when possible resent did not find any
2429          * lock.
2430          */
2431         if (new_lock == NULL)
2432                 new_lock = ldlm_handle2lock(&lh->mlh_reg_lh);
2433
2434         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
2435                 lh->mlh_reg_lh.cookie = 0;
2436                 RETURN(0);
2437         }
2438
2439         LASSERTF(new_lock != NULL,
2440                  "lockh "LPX64"\n", lh->mlh_reg_lh.cookie);
2441
2442         /*
2443          * If we've already given this lock to a client once, then we should
2444          * have no readers or writers.  Otherwise, we should have one reader
2445          * _or_ writer ref (which will be zeroed below) before returning the
2446          * lock to a client.
2447          */
2448         if (new_lock->l_export == req->rq_export) {
2449                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2450         } else {
2451                 LASSERT(new_lock->l_export == NULL);
2452                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2453         }
2454
2455         *lockp = new_lock;
2456
2457         if (new_lock->l_export == req->rq_export) {
2458                 /*
2459                  * Already gave this to the client, which means that we
2460                  * reconstructed a reply.
2461                  */
2462                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2463                         MSG_RESENT);
2464                 lh->mlh_reg_lh.cookie = 0;
2465                 RETURN(ELDLM_LOCK_REPLACED);
2466         }
2467
2468         /* Fixup the lock to be given to the client */
2469         lock_res_and_lock(new_lock);
2470         new_lock->l_readers = 0;
2471         new_lock->l_writers = 0;
2472
2473         new_lock->l_export = class_export_get(req->rq_export);
2474         spin_lock(&req->rq_export->exp_ldlm_data.led_lock);
2475         list_add(&new_lock->l_export_chain,
2476                  &new_lock->l_export->exp_ldlm_data.led_held_locks);
2477         spin_unlock(&req->rq_export->exp_ldlm_data.led_lock);
2478
2479         new_lock->l_blocking_ast = lock->l_blocking_ast;
2480         new_lock->l_completion_ast = lock->l_completion_ast;
2481         new_lock->l_remote_handle = lock->l_remote_handle;
2482         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2483
2484         unlock_res_and_lock(new_lock);
2485         LDLM_LOCK_PUT(new_lock);
2486         lh->mlh_reg_lh.cookie = 0;
2487
2488         RETURN(ELDLM_LOCK_REPLACED);
2489 }
2490
2491 static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
2492                                     struct ldlm_lock *new_lock,
2493                                     struct ldlm_lock **old_lock,
2494                                     struct mdt_lock_handle *lh)
2495 {
2496         struct ptlrpc_request  *req = mdt_info_req(info);
2497         struct obd_export      *exp = req->rq_export;
2498         struct lustre_handle    remote_hdl;
2499         struct ldlm_request    *dlmreq;
2500         struct list_head       *iter;
2501
2502         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2503                 return;
2504
2505         dlmreq = req_capsule_client_get(&info->mti_pill, &RMF_DLM_REQ);
2506         remote_hdl = dlmreq->lock_handle1;
2507
2508         spin_lock(&exp->exp_ldlm_data.led_lock);
2509         list_for_each(iter, &exp->exp_ldlm_data.led_held_locks) {
2510                 struct ldlm_lock *lock;
2511                 lock = list_entry(iter, struct ldlm_lock, l_export_chain);
2512                 if (lock == new_lock)
2513                         continue;
2514                 if (lock->l_remote_handle.cookie == remote_hdl.cookie) {
2515                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
2516                         lh->mlh_reg_mode = lock->l_granted_mode;
2517
2518                         LDLM_DEBUG(lock, "restoring lock cookie");
2519                         DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64,
2520                                   lh->mlh_reg_lh.cookie);
2521                         if (old_lock)
2522                                 *old_lock = LDLM_LOCK_GET(lock);
2523                         spin_unlock(&exp->exp_ldlm_data.led_lock);
2524                         return;
2525                 }
2526         }
2527         spin_unlock(&exp->exp_ldlm_data.led_lock);
2528
2529         /*
2530          * If the xid matches, then we know this is a resent request, and allow
2531          * it. (It's probably an OPEN, for which we don't send a lock.
2532          */
2533         if (req_xid_is_last(req))
2534                 return;
2535
2536         /*
2537          * This remote handle isn't enqueued, so we never received or processed
2538          * this request.  Clear MSG_RESENT, because it can be handled like any
2539          * normal request now.
2540          */
2541         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2542
2543         DEBUG_REQ(D_HA, req, "no existing lock with rhandle "LPX64,
2544                   remote_hdl.cookie);
2545 }
2546
2547 static int mdt_intent_getattr(enum mdt_it_code opcode,
2548                               struct mdt_thread_info *info,
2549                               struct ldlm_lock **lockp,
2550                               int flags)
2551 {
2552         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2553         struct ldlm_lock       *new_lock = NULL;
2554         __u64                   child_bits;
2555         struct ldlm_reply      *ldlm_rep;
2556         struct ptlrpc_request  *req;
2557         struct mdt_body        *reqbody;
2558         struct mdt_body        *repbody;
2559         int                     rc;
2560         ENTRY;
2561
2562         mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
2563                                LPROC_MDT_INTENT_GETATTR);
2564
2565         reqbody = req_capsule_client_get(&info->mti_pill, &RMF_MDT_BODY);
2566         LASSERT(reqbody);
2567
2568         repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY);
2569         LASSERT(repbody);
2570
2571         info->mti_spec.sp_ck_split = !!(reqbody->valid & OBD_MD_FLCKSPLIT);
2572         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
2573         repbody->eadatasize = 0;
2574         repbody->aclsize = 0;
2575
2576         switch (opcode) {
2577         case MDT_IT_LOOKUP:
2578                 child_bits = MDS_INODELOCK_LOOKUP;
2579                 break;
2580         case MDT_IT_GETATTR:
2581                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2582                 break;
2583         default:
2584                 CERROR("Unhandled till now");
2585                 GOTO(out, rc = -EINVAL);
2586         }
2587
2588         rc = mdt_init_ucred(info, reqbody);
2589         if (rc)
2590                 GOTO(out, rc);
2591
2592         req = info->mti_pill.rc_req;
2593         ldlm_rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2594         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
2595
2596         /* Get lock from request for possible resent case. */
2597         mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc);
2598
2599         ldlm_rep->lock_policy_res2 =
2600                 mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
2601
2602         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
2603                 ldlm_rep->lock_policy_res2 = 0;
2604         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
2605             ldlm_rep->lock_policy_res2) {
2606                 lhc->mlh_reg_lh.cookie = 0ull;
2607                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
2608         }
2609
2610         rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
2611         EXIT;
2612 out_ucred:
2613         mdt_exit_ucred(info);
2614 out:
2615         mdt_shrink_reply(info, DLM_REPLY_REC_OFF + 1, 1, 0);
2616         mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
2617                              LPROC_MDT_INTENT_GETATTR);
2618         return rc;
2619 }
2620
2621 static int mdt_intent_reint(enum mdt_it_code opcode,
2622                             struct mdt_thread_info *info,
2623                             struct ldlm_lock **lockp,
2624                             int flags)
2625 {
2626         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2627         struct ldlm_reply      *rep = NULL;
2628         long                    opc;
2629         int                     rc;
2630
2631         static const struct req_format *intent_fmts[REINT_MAX] = {
2632                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
2633                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
2634         };
2635
2636         ENTRY;
2637
2638         mdt_lprocfs_time_start(info->mti_mdt, &info->mti_time,
2639                                LPROC_MDT_INTENT_REINT);
2640
2641         opc = mdt_reint_opcode(info, intent_fmts);
2642         if (opc < 0)
2643                 GOTO(out, rc = opc);
2644
2645         if (mdt_it_flavor[opcode].it_reint != opc) {
2646                 CERROR("Reint code %ld doesn't match intent: %d\n",
2647                        opc, opcode);
2648                 GOTO(out, rc = err_serious(-EPROTO));
2649         }
2650
2651         /* Get lock from request for possible resent case. */
2652         mdt_intent_fixup_resent(info, *lockp, NULL, lhc);
2653
2654         rc = mdt_reint_internal(info, lhc, opc);
2655         
2656         /* Check whether the reply has been packed successfully. */
2657         if (mdt_info_req(info)->rq_repmsg != NULL)
2658                 rep = req_capsule_server_get(&info->mti_pill, &RMF_DLM_REP);
2659         if (rep == NULL)
2660                 GOTO(out, rc = err_serious(-EFAULT));
2661
2662         /* MDC expects this in any case */
2663         if (rc != 0)
2664                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
2665
2666         /* Cross-ref case, the lock should be returned to the client */
2667         if (rc == -EREMOTE) {
2668                 LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
2669                 rep->lock_policy_res2 = 0;
2670                 rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
2671                 GOTO(out, rc);
2672         }
2673         rep->lock_policy_res2 = clear_serious(rc);
2674
2675         lhc->mlh_reg_lh.cookie = 0ull;
2676         rc = ELDLM_LOCK_ABORTED;
2677         EXIT;
2678 out:
2679         mdt_lprocfs_time_end(info->mti_mdt, &info->mti_time,
2680                              LPROC_MDT_INTENT_REINT);
2681         return rc;
2682 }
2683
2684 static int mdt_intent_code(long itcode)
2685 {
2686         int rc;
2687
2688         switch(itcode) {
2689         case IT_OPEN:
2690                 rc = MDT_IT_OPEN;
2691                 break;
2692         case IT_OPEN|IT_CREAT:
2693                 rc = MDT_IT_OCREAT;
2694                 break;
2695         case IT_CREAT:
2696                 rc = MDT_IT_CREATE;
2697                 break;
2698         case IT_READDIR:
2699                 rc = MDT_IT_READDIR;
2700                 break;
2701         case IT_GETATTR:
2702                 rc = MDT_IT_GETATTR;
2703                 break;
2704         case IT_LOOKUP:
2705                 rc = MDT_IT_LOOKUP;
2706                 break;
2707         case IT_UNLINK:
2708                 rc = MDT_IT_UNLINK;
2709                 break;
2710         case IT_TRUNC:
2711                 rc = MDT_IT_TRUNC;
2712                 break;
2713         case IT_GETXATTR:
2714                 rc = MDT_IT_GETXATTR;
2715                 break;
2716         default:
2717                 CERROR("Unknown intent opcode: %ld\n", itcode);
2718                 rc = -EINVAL;
2719                 break;
2720         }
2721         return rc;
2722 }
2723
2724 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
2725                           struct ldlm_lock **lockp, int flags)
2726 {
2727         struct req_capsule   *pill;
2728         struct mdt_it_flavor *flv;
2729         int opc;
2730         int rc;
2731         ENTRY;
2732
2733         opc = mdt_intent_code(itopc);
2734         if (opc < 0)
2735                 RETURN(-EINVAL);
2736
2737         pill = &info->mti_pill;
2738         flv  = &mdt_it_flavor[opc];
2739
2740         if (flv->it_fmt != NULL)
2741                 req_capsule_extend(pill, flv->it_fmt);
2742
2743         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
2744         if (rc == 0) {
2745                 struct ptlrpc_request *req = mdt_info_req(info);
2746                 if (flv->it_flags & MUTABOR &&
2747                     req->rq_export->exp_connect_flags & OBD_CONNECT_RDONLY)
2748                         rc = -EROFS;
2749         }
2750         if (rc == 0 && flv->it_act != NULL) {
2751                 /* execute policy */
2752                 rc = flv->it_act(opc, info, lockp, flags);
2753         } else
2754                 rc = -EOPNOTSUPP;
2755         RETURN(rc);
2756 }
2757
2758 static int mdt_intent_policy(struct ldlm_namespace *ns,
2759                              struct ldlm_lock **lockp, void *req_cookie,
2760                              ldlm_mode_t mode, int flags, void *data)
2761 {
2762         struct mdt_thread_info *info;
2763         struct ptlrpc_request  *req  =  req_cookie;
2764         struct ldlm_intent     *it;
2765         struct req_capsule     *pill;
2766         struct ldlm_lock       *lock = *lockp;
2767         int rc;
2768
2769         ENTRY;
2770
2771         LASSERT(req != NULL);
2772
2773         info = lu_context_key_get(&req->rq_svc_thread->t_env->le_ctx,
2774                                   &mdt_thread_key);
2775         LASSERT(info != NULL);
2776         pill = &info->mti_pill;
2777         LASSERT(pill->rc_req == req);
2778
2779         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
2780                 req_capsule_extend(pill, &RQF_LDLM_INTENT);
2781                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
2782                 if (it != NULL) {
2783                         const struct ldlm_request *dlmreq;
2784                         __u64 req_bits;
2785
2786                         LDLM_DEBUG(lock, "intent policy opc: %s\n",
2787                                    ldlm_it2str(it->opc));
2788
2789                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
2790                         if (rc == 0)
2791                                 rc = ELDLM_OK;
2792
2793                         /*
2794                          * Lock without inodebits makes no sense and will oops
2795                          * later in ldlm. Let's check it now to see if we have
2796                          * wrong lock from client or bits get corrupted
2797                          * somewhere in mdt_intent_opc().
2798                          */
2799                         dlmreq = info->mti_dlm_req;
2800                         req_bits = dlmreq->lock_desc.l_policy_data.l_inodebits.bits;
2801                         LASSERT(req_bits != 0);
2802
2803                 } else
2804                         rc = err_serious(-EFAULT);
2805         } else {
2806                 /* No intent was provided */
2807                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
2808                 rc = req_capsule_pack(pill);
2809                 if (rc)
2810                         rc = err_serious(rc);
2811         }
2812         RETURN(rc);
2813 }
2814
2815 /*
2816  * Seq wrappers
2817  */
2818 static int mdt_seq_fini(const struct lu_env *env,
2819                         struct mdt_device *m)
2820 {
2821         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2822         ENTRY;
2823
2824         if (ls && ls->ls_server_seq) {
2825                 seq_server_fini(ls->ls_server_seq, env);
2826                 OBD_FREE_PTR(ls->ls_server_seq);
2827                 ls->ls_server_seq = NULL;
2828         }
2829
2830         if (ls && ls->ls_control_seq) {
2831                 seq_server_fini(ls->ls_control_seq, env);
2832                 OBD_FREE_PTR(ls->ls_control_seq);
2833                 ls->ls_control_seq = NULL;
2834         }
2835
2836         if (ls && ls->ls_client_seq) {
2837                 seq_client_fini(ls->ls_client_seq);
2838                 OBD_FREE_PTR(ls->ls_client_seq);
2839                 ls->ls_client_seq = NULL;
2840         }
2841
2842         RETURN(0);
2843 }
2844
2845 static int mdt_seq_init(const struct lu_env *env,
2846                         const char *uuid,
2847                         struct mdt_device *m)
2848 {
2849         struct lu_site *ls;
2850         char *prefix;
2851         int rc;
2852         ENTRY;
2853
2854         ls = m->mdt_md_dev.md_lu_dev.ld_site;
2855
2856         /*
2857          * This is sequence-controller node. Init seq-controller server on local
2858          * MDT.
2859          */
2860         if (ls->ls_node_id == 0) {
2861                 LASSERT(ls->ls_control_seq == NULL);
2862
2863                 OBD_ALLOC_PTR(ls->ls_control_seq);
2864                 if (ls->ls_control_seq == NULL)
2865                         RETURN(-ENOMEM);
2866
2867                 rc = seq_server_init(ls->ls_control_seq,
2868                                      m->mdt_bottom, uuid,
2869                                      LUSTRE_SEQ_CONTROLLER,
2870                                      env);
2871
2872                 if (rc)
2873                         GOTO(out_seq_fini, rc);
2874
2875                 OBD_ALLOC_PTR(ls->ls_client_seq);
2876                 if (ls->ls_client_seq == NULL)
2877                         GOTO(out_seq_fini, rc = -ENOMEM);
2878
2879                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2880                 if (prefix == NULL) {
2881                         OBD_FREE_PTR(ls->ls_client_seq);
2882                         GOTO(out_seq_fini, rc = -ENOMEM);
2883                 }
2884
2885                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2886                          uuid);
2887
2888                 /*
2889                  * Init seq-controller client after seq-controller server is
2890                  * ready. Pass ls->ls_control_seq to it for direct talking.
2891                  */
2892                 rc = seq_client_init(ls->ls_client_seq, NULL,
2893                                      LUSTRE_SEQ_METADATA, prefix,
2894                                      ls->ls_control_seq);
2895                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2896
2897                 if (rc)
2898                         GOTO(out_seq_fini, rc);
2899         }
2900
2901         /* Init seq-server on local MDT */
2902         LASSERT(ls->ls_server_seq == NULL);
2903
2904         OBD_ALLOC_PTR(ls->ls_server_seq);
2905         if (ls->ls_server_seq == NULL)
2906                 GOTO(out_seq_fini, rc = -ENOMEM);
2907
2908         rc = seq_server_init(ls->ls_server_seq,
2909                              m->mdt_bottom, uuid,
2910                              LUSTRE_SEQ_SERVER,
2911                              env);
2912         if (rc)
2913                 GOTO(out_seq_fini, rc = -ENOMEM);
2914
2915         /* Assign seq-controller client to local seq-server. */
2916         if (ls->ls_node_id == 0) {
2917                 LASSERT(ls->ls_client_seq != NULL);
2918
2919                 rc = seq_server_set_cli(ls->ls_server_seq,
2920                                         ls->ls_client_seq,
2921                                         env);
2922         }
2923
2924         EXIT;
2925 out_seq_fini:
2926         if (rc)
2927                 mdt_seq_fini(env, m);
2928
2929         return rc;
2930 }
2931 /*
2932  * Init client sequence manager which is used by local MDS to talk to sequence
2933  * controller on remote node.
2934  */
2935 static int mdt_seq_init_cli(const struct lu_env *env,
2936                             struct mdt_device *m,
2937                             struct lustre_cfg *cfg)
2938 {
2939         struct lu_site    *ls = m->mdt_md_dev.md_lu_dev.ld_site;
2940         struct obd_device *mdc;
2941         struct obd_uuid   *uuidp, *mdcuuidp;
2942         char              *uuid_str, *mdc_uuid_str;
2943         int                rc;
2944         int                index;
2945         struct mdt_thread_info *info;
2946         char *p, *index_string = lustre_cfg_string(cfg, 2);
2947         ENTRY;
2948
2949         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
2950         uuidp = &info->mti_u.uuid[0];
2951         mdcuuidp = &info->mti_u.uuid[1];
2952
2953         LASSERT(index_string);
2954
2955         index = simple_strtol(index_string, &p, 10);
2956         if (*p) {
2957                 CERROR("Invalid index in lustre_cgf, offset 2\n");
2958                 RETURN(-EINVAL);
2959         }
2960
2961         /* check if this is adding the first MDC and controller is not yet
2962          * initialized. */
2963         if (index != 0 || ls->ls_client_seq)
2964                 RETURN(0);
2965
2966         uuid_str = lustre_cfg_string(cfg, 1);
2967         mdc_uuid_str = lustre_cfg_string(cfg, 4);
2968         obd_str2uuid(uuidp, uuid_str);
2969         obd_str2uuid(mdcuuidp, mdc_uuid_str);
2970
2971         mdc = class_find_client_obd(uuidp, LUSTRE_MDC_NAME, mdcuuidp);
2972         if (!mdc) {
2973                 CERROR("can't find controller MDC by uuid %s\n",
2974                        uuid_str);
2975                 rc = -ENOENT;
2976         } else if (!mdc->obd_set_up) {
2977                 CERROR("target %s not set up\n", mdc->obd_name);
2978                 rc = -EINVAL;
2979         } else {
2980                         LASSERT(ls->ls_control_exp);
2981                         OBD_ALLOC_PTR(ls->ls_client_seq);
2982                         if (ls->ls_client_seq != NULL) {
2983                                 char *prefix;
2984
2985                                 OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
2986                                 if (!prefix)
2987                                         RETURN(-ENOMEM);
2988
2989                                 snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s",
2990                                          mdc->obd_name);
2991
2992                                 rc = seq_client_init(ls->ls_client_seq,
2993                                                      ls->ls_control_exp,
2994                                                      LUSTRE_SEQ_METADATA,
2995                                                      prefix, NULL);
2996                                 OBD_FREE(prefix, MAX_OBD_NAME + 5);
2997                         } else
2998                                 rc = -ENOMEM;
2999
3000                         if (rc)
3001                                 RETURN(rc);
3002
3003                         LASSERT(ls->ls_server_seq != NULL);
3004
3005                         rc = seq_server_set_cli(ls->ls_server_seq,
3006                                                 ls->ls_client_seq,
3007                                                 env);
3008         }
3009
3010         RETURN(rc);
3011 }
3012
3013 static void mdt_seq_fini_cli(struct mdt_device *m)
3014 {
3015         struct lu_site *ls;
3016
3017         ENTRY;
3018
3019         ls = m->mdt_md_dev.md_lu_dev.ld_site;
3020
3021         if (ls && ls->ls_server_seq)
3022                 seq_server_set_cli(ls->ls_server_seq,
3023                                    NULL, NULL);
3024
3025         if (ls && ls->ls_control_exp) {
3026                 class_export_put(ls->ls_control_exp);
3027                 ls->ls_control_exp = NULL;
3028         }
3029         EXIT;
3030 }
3031
3032 /*
3033  * FLD wrappers
3034  */
3035 static int mdt_fld_fini(const struct lu_env *env,
3036                         struct mdt_device *m)
3037 {
3038         struct lu_site *ls = m->mdt_md_dev.md_lu_dev.ld_site;
3039         ENTRY;
3040
3041         if (ls && ls->ls_server_fld) {
3042                 fld_server_fini(ls->ls_server_fld, env);
3043                 OBD_FREE_PTR(ls->ls_server_fld);
3044                 ls->ls_server_fld = NULL;
3045         }
3046
3047         RETURN(0);
3048 }
3049
3050 static int mdt_fld_init(const struct lu_env *env,
3051                         const char *uuid,
3052                         struct mdt_device *m)
3053 {
3054         struct lu_site *ls;
3055         int rc;
3056         ENTRY;
3057
3058         ls = m->mdt_md_dev.md_lu_dev.ld_site;
3059
3060         OBD_ALLOC_PTR(ls->ls_server_fld);
3061         if (ls->ls_server_fld == NULL)
3062                 RETURN(rc = -ENOMEM);
3063
3064         rc = fld_server_init(ls->ls_server_fld,
3065                              m->mdt_bottom, uuid, env);
3066         if (rc) {
3067                 OBD_FREE_PTR(ls->ls_server_fld);
3068                 ls->ls_server_fld = NULL;
3069                 RETURN(rc);
3070         }
3071
3072         RETURN(0);
3073 }
3074
3075 /* device init/fini methods */
3076 static void mdt_stop_ptlrpc_service(struct mdt_device *m)
3077 {
3078         if (m->mdt_regular_service != NULL) {
3079                 ptlrpc_unregister_service(m->mdt_regular_service);
3080                 m->mdt_regular_service = NULL;
3081         }
3082         if (m->mdt_readpage_service != NULL) {
3083                 ptlrpc_unregister_service(m->mdt_readpage_service);
3084                 m->mdt_readpage_service = NULL;
3085         }
3086         if (m->mdt_setattr_service != NULL) {
3087                 ptlrpc_unregister_service(m->mdt_setattr_service);
3088                 m->mdt_setattr_service = NULL;
3089         }
3090         if (m->mdt_mdsc_service != NULL) {
3091                 ptlrpc_unregister_service(m->mdt_mdsc_service);
3092                 m->mdt_mdsc_service = NULL;
3093         }
3094         if (m->mdt_mdss_service != NULL) {
3095                 ptlrpc_unregister_service(m->mdt_mdss_service);
3096                 m->mdt_mdss_service = NULL;
3097         }
3098         if (m->mdt_dtss_service != NULL) {
3099                 ptlrpc_unregister_service(m->mdt_dtss_service);
3100                 m->mdt_dtss_service = NULL;
3101         }
3102         if (m->mdt_fld_service != NULL) {
3103                 ptlrpc_unregister_service(m->mdt_fld_service);
3104                 m->mdt_fld_service = NULL;
3105         }
3106 }
3107
3108 static int mdt_start_ptlrpc_service(struct mdt_device *m)
3109 {
3110         int rc;
3111         static struct ptlrpc_service_conf conf;
3112         cfs_proc_dir_entry_t *procfs_entry;
3113         ENTRY;
3114
3115         procfs_entry = m->mdt_md_dev.md_lu_dev.ld_obd->obd_proc_entry;
3116
3117         conf = (typeof(conf)) {
3118                 .psc_nbufs            = MDS_NBUFS,
3119                 .psc_bufsize          = MDS_BUFSIZE,
3120                 .psc_max_req_size     = MDS_MAXREQSIZE,
3121                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3122                 .psc_req_portal       = MDS_REQUEST_PORTAL,
3123                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3124                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3125                 /*
3126                  * We'd like to have a mechanism to set this on a per-device
3127                  * basis, but alas...
3128                  */
3129                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3130                                        MDT_MAX_THREADS),
3131                 .psc_ctx_tags      = LCT_MD_THREAD
3132         };
3133
3134         m->mdt_ldlm_client = &m->mdt_md_dev.md_lu_dev.ld_obd->obd_ldlm_client;
3135         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
3136                            "mdt_ldlm_client", m->mdt_ldlm_client);
3137
3138         m->mdt_regular_service =
3139                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle, LUSTRE_MDT_NAME,
3140                                      procfs_entry, NULL);
3141         if (m->mdt_regular_service == NULL)
3142                 RETURN(-ENOMEM);
3143
3144         rc = ptlrpc_start_threads(NULL, m->mdt_regular_service, LUSTRE_MDT_NAME);
3145         if (rc)
3146                 GOTO(err_mdt_svc, rc);
3147
3148         /*
3149          * readpage service configuration. Parameters have to be adjusted,
3150          * ideally.
3151          */
3152         conf = (typeof(conf)) {
3153                 .psc_nbufs            = MDS_NBUFS,
3154                 .psc_bufsize          = MDS_BUFSIZE,
3155                 .psc_max_req_size     = MDS_MAXREQSIZE,
3156                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3157                 .psc_req_portal       = MDS_READPAGE_PORTAL,
3158                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3159                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3160                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3161                                        MDT_MAX_THREADS),
3162                 .psc_ctx_tags      = LCT_MD_THREAD
3163         };
3164         m->mdt_readpage_service =
3165                 ptlrpc_init_svc_conf(&conf, mdt_readpage_handle,
3166                                      LUSTRE_MDT_NAME "_readpage",
3167                                      procfs_entry, NULL);
3168
3169         if (m->mdt_readpage_service == NULL) {
3170                 CERROR("failed to start readpage service\n");
3171                 GOTO(err_mdt_svc, rc = -ENOMEM);
3172         }
3173
3174         rc = ptlrpc_start_threads(NULL, m->mdt_readpage_service, "mdt_rdpg");
3175
3176         /*
3177          * setattr service configuration.
3178          */
3179         conf = (typeof(conf)) {
3180                 .psc_nbufs            = MDS_NBUFS,
3181                 .psc_bufsize          = MDS_BUFSIZE,
3182                 .psc_max_req_size     = MDS_MAXREQSIZE,
3183                 .psc_max_reply_size   = MDS_MAXREPSIZE,
3184                 .psc_req_portal       = MDS_SETATTR_PORTAL,
3185                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3186                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3187                 .psc_num_threads   = min(max(mdt_num_threads, MDT_MIN_THREADS),
3188                                        MDT_MAX_THREADS),
3189                 .psc_ctx_tags      = LCT_MD_THREAD
3190         };
3191
3192         m->mdt_setattr_service =
3193                 ptlrpc_init_svc_conf(&conf, mdt_regular_handle,
3194                                      LUSTRE_MDT_NAME "_setattr",
3195                                      procfs_entry, NULL);
3196
3197         if (!m->mdt_setattr_service) {
3198                 CERROR("failed to start setattr service\n");
3199                 GOTO(err_mdt_svc, rc = -ENOMEM);
3200         }
3201
3202         rc = ptlrpc_start_threads(NULL, m->mdt_setattr_service, "mdt_attr");
3203         if (rc)
3204                 GOTO(err_mdt_svc, rc);
3205
3206         /*
3207          * sequence controller service configuration
3208          */
3209         conf = (typeof(conf)) {
3210                 .psc_nbufs = MDS_NBUFS,
3211                 .psc_bufsize = MDS_BUFSIZE,
3212                 .psc_max_req_size = SEQ_MAXREQSIZE,
3213                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3214                 .psc_req_portal = SEQ_CONTROLLER_PORTAL,
3215                 .psc_rep_portal = MDC_REPLY_PORTAL,
3216                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3217                 .psc_num_threads = SEQ_NUM_THREADS,
3218                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3219         };
3220
3221         m->mdt_mdsc_service =
3222                 ptlrpc_init_svc_conf(&conf, mdt_mdsc_handle,
3223                                      LUSTRE_MDT_NAME"_mdsc",
3224                                      procfs_entry, NULL);
3225         if (!m->mdt_mdsc_service) {
3226                 CERROR("failed to start seq controller service\n");
3227                 GOTO(err_mdt_svc, rc = -ENOMEM);
3228         }
3229
3230         rc = ptlrpc_start_threads(NULL, m->mdt_mdsc_service, "mdt_mdsc");
3231         if (rc)
3232                 GOTO(err_mdt_svc, rc);
3233
3234         /*
3235          * metadata sequence server service configuration
3236          */
3237         conf = (typeof(conf)) {
3238                 .psc_nbufs = MDS_NBUFS,
3239                 .psc_bufsize = MDS_BUFSIZE,
3240                 .psc_max_req_size = SEQ_MAXREQSIZE,
3241                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3242                 .psc_req_portal = SEQ_METADATA_PORTAL,
3243                 .psc_rep_portal = MDC_REPLY_PORTAL,
3244                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3245                 .psc_num_threads = SEQ_NUM_THREADS,
3246                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3247         };
3248
3249         m->mdt_mdss_service =
3250                 ptlrpc_init_svc_conf(&conf, mdt_mdss_handle,
3251                                      LUSTRE_MDT_NAME"_mdss",
3252                                      procfs_entry, NULL);
3253         if (!m->mdt_mdss_service) {
3254                 CERROR("failed to start metadata seq server service\n");
3255                 GOTO(err_mdt_svc, rc = -ENOMEM);
3256         }
3257
3258         rc = ptlrpc_start_threads(NULL, m->mdt_mdss_service, "mdt_mdss");
3259         if (rc)
3260                 GOTO(err_mdt_svc, rc);
3261
3262
3263         /*
3264          * Data sequence server service configuration. We want to have really
3265          * cluster-wide sequences space. This is why we start only one sequence
3266          * controller which manages space.
3267          */
3268         conf = (typeof(conf)) {
3269                 .psc_nbufs = MDS_NBUFS,
3270                 .psc_bufsize = MDS_BUFSIZE,
3271                 .psc_max_req_size = SEQ_MAXREQSIZE,
3272                 .psc_max_reply_size = SEQ_MAXREPSIZE,
3273                 .psc_req_portal = SEQ_DATA_PORTAL,
3274                 .psc_rep_portal = OSC_REPLY_PORTAL,
3275                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3276                 .psc_num_threads = SEQ_NUM_THREADS,
3277                 .psc_ctx_tags = LCT_MD_THREAD|LCT_DT_THREAD
3278         };
3279
3280         m->mdt_dtss_service =
3281                 ptlrpc_init_svc_conf(&conf, mdt_dtss_handle,
3282                                      LUSTRE_MDT_NAME"_dtss",
3283                                      procfs_entry, NULL);
3284         if (!m->mdt_dtss_service) {
3285                 CERROR("failed to start data seq server service\n");
3286                 GOTO(err_mdt_svc, rc = -ENOMEM);
3287         }
3288
3289         rc = ptlrpc_start_threads(NULL, m->mdt_dtss_service, "mdt_dtss");
3290         if (rc)
3291                 GOTO(err_mdt_svc, rc);
3292
3293         /* FLD service start */
3294         conf = (typeof(conf)) {
3295                 .psc_nbufs            = MDS_NBUFS,
3296                 .psc_bufsize          = MDS_BUFSIZE,
3297                 .psc_max_req_size     = FLD_MAXREQSIZE,
3298                 .psc_max_reply_size   = FLD_MAXREPSIZE,
3299                 .psc_req_portal       = FLD_REQUEST_PORTAL,
3300                 .psc_rep_portal       = MDC_REPLY_PORTAL,
3301                 .psc_watchdog_timeout = MDT_SERVICE_WATCHDOG_TIMEOUT,
3302                 .psc_num_threads      = FLD_NUM_THREADS,
3303                 .psc_ctx_tags         = LCT_DT_THREAD|LCT_MD_THREAD
3304         };
3305
3306         m->mdt_fld_service =
3307                 ptlrpc_init_svc_conf(&conf, mdt_fld_handle,
3308                                      LUSTRE_MDT_NAME"_fld",
3309                                      procfs_entry, NULL);
3310         if (!m->mdt_fld_service) {
3311                 CERROR("failed to start fld service\n");
3312                 GOTO(err_mdt_svc, rc = -ENOMEM);
3313         }
3314
3315         rc = ptlrpc_start_threads(NULL, m->mdt_fld_service, "mdt_fld");
3316         if (rc)
3317                 GOTO(err_mdt_svc, rc);
3318
3319         EXIT;
3320 err_mdt_svc:
3321         if (rc)
3322                 mdt_stop_ptlrpc_service(m);
3323
3324         return rc;
3325 }
3326
3327 static void mdt_stack_fini(const struct lu_env *env,
3328                            struct mdt_device *m, struct lu_device *top)
3329 {
3330         struct lu_device        *d = top, *n;
3331         struct lustre_cfg_bufs  *bufs;
3332         struct lustre_cfg       *lcfg;
3333         struct mdt_thread_info  *info;
3334         ENTRY;
3335
3336         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3337         LASSERT(info != NULL);
3338
3339         bufs = &info->mti_u.bufs;
3340         /* process cleanup, pass mdt obd name to get obd umount flags */
3341         lustre_cfg_bufs_reset(bufs, m->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3342         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
3343         if (!lcfg) {
3344                 CERROR("Cannot alloc lcfg!\n");
3345                 return;
3346         }
3347         LASSERT(top);
3348         top->ld_ops->ldo_process_config(env, top, lcfg);
3349         lustre_cfg_free(lcfg);
3350
3351         lu_site_purge(env, top->ld_site, ~0);
3352         while (d != NULL) {
3353                 struct obd_type *type;
3354                 struct lu_device_type *ldt = d->ld_type;
3355
3356                 /* each fini() returns next device in stack of layers
3357                  * * so we can avoid the recursion */
3358                 n = ldt->ldt_ops->ldto_device_fini(env, d);
3359                 lu_device_put(d);
3360                 ldt->ldt_ops->ldto_device_free(env, d);
3361                 type = ldt->ldt_obd_type;
3362                 type->typ_refcnt--;
3363                 class_put_type(type);
3364
3365                 /* switch to the next device in the layer */
3366                 d = n;
3367         }
3368         m->mdt_child = NULL;
3369 }
3370
3371 static struct lu_device *mdt_layer_setup(const struct lu_env *env,
3372                                          const char *typename,
3373                                          struct lu_device *child,
3374                                          struct lustre_cfg *cfg)
3375 {
3376         const char            *dev = lustre_cfg_string(cfg, 0);
3377         struct obd_type       *type;
3378         struct lu_device_type *ldt;
3379         struct lu_device      *d;
3380         int rc;
3381         ENTRY;
3382
3383         /* find the type */
3384         type = class_get_type(typename);
3385         if (!type) {
3386                 CERROR("Unknown type: '%s'\n", typename);
3387                 GOTO(out, rc = -ENODEV);
3388         }
3389
3390         rc = lu_context_refill(&env->le_ctx);
3391         if (rc != 0) {
3392                 CERROR("Failure to refill context: '%d'\n", rc);
3393                 GOTO(out_type, rc);
3394         }
3395
3396         if (env->le_ses != NULL) {
3397                 rc = lu_context_refill(env->le_ses);
3398                 if (rc != 0) {
3399                         CERROR("Failure to refill session: '%d'\n", rc);
3400                         GOTO(out_type, rc);
3401                 }
3402         }
3403
3404         ldt = type->typ_lu;
3405         if (ldt == NULL) {
3406                 CERROR("type: '%s'\n", typename);
3407                 GOTO(out_type, rc = -EINVAL);
3408         }
3409
3410         ldt->ldt_obd_type = type;
3411         d = ldt->ldt_ops->ldto_device_alloc(env, ldt, cfg);
3412         if (IS_ERR(d)) {
3413                 CERROR("Cannot allocate device: '%s'\n", typename);
3414                 GOTO(out_type, rc = -ENODEV);
3415         }
3416
3417         LASSERT(child->ld_site);
3418         d->ld_site = child->ld_site;
3419
3420         type->typ_refcnt++;
3421         rc = ldt->ldt_ops->ldto_device_init(env, d, dev, child);
3422         if (rc) {
3423                 CERROR("can't init device '%s', rc %d\n", typename, rc);
3424                 GOTO(out_alloc, rc);
3425         }
3426         lu_device_get(d);
3427
3428         RETURN(d);
3429
3430 out_alloc:
3431         ldt->ldt_ops->ldto_device_free(env, d);
3432         type->typ_refcnt--;
3433 out_type:
3434         class_put_type(type);
3435 out:
3436         return ERR_PTR(rc);
3437 }
3438
3439 static int mdt_stack_init(const struct lu_env *env,
3440                           struct mdt_device *m, struct lustre_cfg *cfg)
3441 {
3442         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3443         struct lu_device  *tmp;
3444         struct md_device  *md;
3445         int rc;
3446         ENTRY;
3447
3448         /* init the stack */
3449         tmp = mdt_layer_setup(env, LUSTRE_OSD_NAME, d, cfg);
3450         if (IS_ERR(tmp)) {
3451                 RETURN(PTR_ERR(tmp));
3452         }
3453         m->mdt_bottom = lu2dt_dev(tmp);
3454         d = tmp;
3455         tmp = mdt_layer_setup(env, LUSTRE_MDD_NAME, d, cfg);
3456         if (IS_ERR(tmp)) {
3457                 GOTO(out, rc = PTR_ERR(tmp));
3458         }
3459         d = tmp;
3460         md = lu2md_dev(d);
3461
3462         tmp = mdt_layer_setup(env, LUSTRE_CMM_NAME, d, cfg);
3463         if (IS_ERR(tmp)) {
3464                 GOTO(out, rc = PTR_ERR(tmp));
3465         }
3466         d = tmp;
3467         /*set mdd upcall device*/
3468         md->md_upcall.mu_upcall_dev = lu2md_dev(d);
3469
3470         md = lu2md_dev(d);
3471         /*set cmm upcall device*/
3472         md->md_upcall.mu_upcall_dev = &m->mdt_md_dev;
3473
3474         m->mdt_child = lu2md_dev(d);
3475
3476         /* process setup config */
3477         tmp = &m->mdt_md_dev.md_lu_dev;
3478         rc = tmp->ld_ops->ldo_process_config(env, tmp, cfg);
3479         GOTO(out, rc);
3480 out:
3481         /* fini from last known good lu_device */
3482         if (rc)
3483                 mdt_stack_fini(env, m, d);
3484
3485         return rc;
3486 }
3487
3488 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
3489 {
3490         struct md_device *next = m->mdt_child;
3491         struct lu_device  *d = &m->mdt_md_dev.md_lu_dev;
3492         struct lu_site    *ls = d->ld_site;
3493
3494         ENTRY;
3495
3496         mdt_fs_cleanup(env, m);
3497
3498         ping_evictor_stop();
3499         mdt_stop_ptlrpc_service(m);
3500
3501         cleanup_capas(CAPA_SITE_SERVER);
3502         del_timer(&m->mdt_ck_timer);
3503         mdt_ck_thread_stop(m);
3504
3505         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3506         m->mdt_rmtacl_cache = NULL;
3507
3508         upcall_cache_cleanup(m->mdt_identity_cache);
3509         m->mdt_identity_cache = NULL;
3510
3511         if (m->mdt_namespace != NULL) {
3512                 ldlm_namespace_free(m->mdt_namespace, 0);
3513                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
3514         }
3515
3516         mdt_seq_fini(env, m);
3517         mdt_seq_fini_cli(m);
3518         mdt_fld_fini(env, m);
3519         mdt_procfs_fini(m);
3520         ptlrpc_lprocfs_unregister_obd(d->ld_obd);
3521         lprocfs_obd_cleanup(d->ld_obd);
3522
3523         if (m->mdt_rootsquash_info) {
3524                 OBD_FREE_PTR(m->mdt_rootsquash_info);
3525                 m->mdt_rootsquash_info = NULL;
3526         }
3527
3528         next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
3529         cleanup_capas(CAPA_SITE_SERVER);
3530         del_timer(&m->mdt_ck_timer);
3531         mdt_ck_thread_stop(m);
3532
3533         /* finish the stack */
3534         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3535
3536         if (ls) {
3537                 if (!list_empty(&ls->ls_lru) ||
3538                     ls->ls_total != 0 || ls->ls_busy != 0) {
3539                         /*
3540                          * Uh-oh, objects still exist.
3541                          */
3542                         static DECLARE_LU_CDEBUG_PRINT_INFO(cookie, D_ERROR);
3543
3544                         lu_site_print(env, ls, &cookie, lu_cdebug_printer);
3545                 }
3546
3547                 lu_site_fini(ls);
3548                 OBD_FREE_PTR(ls);
3549                 d->ld_site = NULL;
3550         }
3551         LASSERT(atomic_read(&d->ld_ref) == 0);
3552         md_device_fini(&m->mdt_md_dev);
3553
3554         EXIT;
3555 }
3556
3557 static void fsoptions_to_mdt_flags(struct mdt_device *m, char *options)
3558 {
3559         char *p = options;
3560
3561         if (!options)
3562                 return;
3563
3564         while (*options) {
3565                 int len;
3566
3567                 while (*p && *p != ',')
3568                         p++;
3569
3570                 len = p - options;
3571                 if ((len == sizeof("user_xattr") - 1) &&
3572                     (memcmp(options, "user_xattr", len) == 0)) {
3573                         m->mdt_opts.mo_user_xattr = 1;
3574                         LCONSOLE_INFO("Enabling user_xattr\n");
3575                 } else if ((len == sizeof("nouser_xattr") - 1) &&
3576                            (memcmp(options, "nouser_xattr", len) == 0)) {
3577                         m->mdt_opts.mo_user_xattr = 0;
3578                         LCONSOLE_INFO("Disabling user_xattr\n");
3579                 } else if ((len == sizeof("acl") - 1) &&
3580                            (memcmp(options, "acl", len) == 0)) {
3581 #ifdef CONFIG_FS_POSIX_ACL
3582                         m->mdt_opts.mo_acl = 1;
3583                         LCONSOLE_INFO("Enabling ACL\n");
3584 #else
3585                         m->mdt_opts.mo_acl = 0;
3586                         CWARN("ignoring unsupported acl mount option\n");
3587                         LCONSOLE_INFO("Disabling ACL\n");
3588 #endif
3589                 } else if ((len == sizeof("noacl") - 1) &&
3590                            (memcmp(options, "noacl", len) == 0)) {
3591 #ifdef CONFIG_FS_POSIX_ACL
3592                         m->mdt_opts.mo_acl = 0;
3593                         LCONSOLE_INFO("Disabling ACL\n");
3594 #endif
3595                 }
3596
3597                 options = ++p;
3598         }
3599 }
3600
3601 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
3602
3603 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
3604                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
3605 {
3606         struct lprocfs_static_vars lvars;
3607         struct mdt_thread_info    *info;
3608         struct obd_device         *obd;
3609         const char                *dev = lustre_cfg_string(cfg, 0);
3610         const char                *num = lustre_cfg_string(cfg, 2);
3611         struct lustre_mount_info  *lmi;
3612         struct lustre_sb_info     *lsi;
3613         struct lu_site            *s;
3614         int                        rc;
3615         ENTRY;
3616
3617         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3618         LASSERT(info != NULL);
3619
3620         obd = class_name2obd(dev);
3621         LASSERT(obd != NULL);
3622
3623         spin_lock_init(&m->mdt_transno_lock);
3624
3625         m->mdt_max_mdsize = MAX_MD_SIZE;
3626         m->mdt_max_cookiesize = sizeof(struct llog_cookie);
3627
3628         m->mdt_opts.mo_user_xattr = 0;
3629         m->mdt_opts.mo_acl = 0;
3630         lmi = server_get_mount_2(dev);
3631         if (lmi == NULL) {
3632                 CERROR("Cannot get mount info for %s!\n", dev);
3633                 RETURN(-EFAULT);
3634         } else {
3635                 lsi = s2lsi(lmi->lmi_sb);
3636                 fsoptions_to_mdt_flags(m, lsi->lsi_lmd->lmd_opts);
3637                 server_put_mount_2(dev, lmi->lmi_mnt);
3638         }
3639
3640         spin_lock_init(&m->mdt_ioepoch_lock);
3641         m->mdt_opts.mo_compat_resname = 0;
3642         m->mdt_capa_timeout = CAPA_TIMEOUT;
3643         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
3644         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
3645         obd->obd_replayable = 1;
3646         spin_lock_init(&m->mdt_client_bitmap_lock);
3647
3648         OBD_ALLOC_PTR(s);
3649         if (s == NULL)
3650                 RETURN(-ENOMEM);
3651
3652         md_device_init(&m->mdt_md_dev, ldt);
3653         m->mdt_md_dev.md_lu_dev.ld_ops = &mdt_lu_ops;
3654         m->mdt_md_dev.md_lu_dev.ld_obd = obd;
3655         /* set this lu_device to obd, because error handling need it */
3656         obd->obd_lu_dev = &m->mdt_md_dev.md_lu_dev;
3657
3658         rc = lu_site_init(s, &m->mdt_md_dev.md_lu_dev);
3659         if (rc) {
3660                 CERROR("Can't init lu_site, rc %d\n", rc);
3661                 GOTO(err_free_site, rc);
3662         }
3663
3664         lprocfs_init_vars(mdt, &lvars);
3665         rc = lprocfs_obd_setup(obd, lvars.obd_vars);
3666         if (rc) {
3667                 CERROR("Can't init lprocfs, rc %d\n", rc);
3668                 GOTO(err_fini_site, rc);
3669         }
3670         ptlrpc_lprocfs_register_obd(obd);
3671
3672         rc = mdt_procfs_init(m, dev);
3673         if (rc) {
3674                 CERROR("Can't init MDT lprocfs, rc %d\n", rc);
3675                 GOTO(err_fini_proc, rc);
3676         }
3677
3678         /* set server index */
3679         LASSERT(num);
3680         s->ls_node_id = simple_strtol(num, NULL, 10);
3681
3682         /* init the stack */
3683         rc = mdt_stack_init(env, m, cfg);
3684         if (rc) {
3685                 CERROR("Can't init device stack, rc %d\n", rc);
3686                 GOTO(err_fini_proc, rc);
3687         }
3688
3689         rc = mdt_fld_init(env, obd->obd_name, m);
3690         if (rc)
3691                 GOTO(err_fini_stack, rc);
3692
3693         rc = mdt_seq_init(env, obd->obd_name, m);
3694         if (rc)
3695                 GOTO(err_fini_fld, rc);
3696
3697         snprintf(info->mti_u.ns_name, sizeof info->mti_u.ns_name,
3698                  LUSTRE_MDT_NAME"-%p", m);
3699         m->mdt_namespace = ldlm_namespace_new(info->mti_u.ns_name,
3700                                               LDLM_NAMESPACE_SERVER);
3701         if (m->mdt_namespace == NULL)
3702                 GOTO(err_fini_seq, rc = -ENOMEM);
3703
3704         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
3705         /* set obd_namespace for compatibility with old code */
3706         obd->obd_namespace = m->mdt_namespace;
3707
3708         m->mdt_identity_cache = upcall_cache_init(obd->obd_name,
3709                                                   "NONE",
3710                                                   &mdt_identity_upcall_cache_ops);
3711         if (IS_ERR(m->mdt_identity_cache)) {
3712                 rc = PTR_ERR(m->mdt_identity_cache);
3713                 m->mdt_identity_cache = NULL;
3714                 GOTO(err_free_ns, rc);
3715         }
3716
3717         m->mdt_rmtacl_cache = upcall_cache_init(obd->obd_name,
3718                                                 MDT_RMTACL_UPCALL_PATH,
3719                                                 &mdt_rmtacl_upcall_cache_ops);
3720         if (IS_ERR(m->mdt_rmtacl_cache)) {
3721                 rc = PTR_ERR(m->mdt_rmtacl_cache);
3722                 m->mdt_rmtacl_cache = NULL;
3723                 GOTO(err_free_ns, rc);
3724         }
3725
3726         m->mdt_ck_timer.function = mdt_ck_timer_callback;
3727         m->mdt_ck_timer.data = (unsigned long)m;
3728         init_timer(&m->mdt_ck_timer);
3729         rc = mdt_ck_thread_start(m);
3730         if (rc)
3731                 GOTO(err_free_ns, rc);
3732
3733         rc = mdt_start_ptlrpc_service(m);
3734         if (rc)
3735                 GOTO(err_capa, rc);
3736
3737         ping_evictor_start();
3738
3739         rc = mdt_fs_setup(env, m, obd);
3740         if (rc)
3741                 GOTO(err_stop_service, rc);
3742
3743         rc = lu_site_init_finish(s);
3744         if (rc)
3745                 GOTO(err_fs_cleanup, rc);
3746
3747         if (obd->obd_recovering == 0)
3748                 mdt_postrecov(env, m);
3749
3750         mdt_init_capa_ctxt(env, m);
3751         RETURN(0);
3752
3753 err_fs_cleanup:
3754         mdt_fs_cleanup(env, m);
3755 err_stop_service:
3756         mdt_stop_ptlrpc_service(m);
3757 err_capa:
3758         del_timer(&m->mdt_ck_timer);
3759         mdt_ck_thread_stop(m);
3760 err_free_ns:
3761         upcall_cache_cleanup(m->mdt_rmtacl_cache);
3762         m->mdt_rmtacl_cache = NULL;
3763         upcall_cache_cleanup(m->mdt_identity_cache);
3764         m->mdt_identity_cache = NULL;
3765         ldlm_namespace_free(m->mdt_namespace, 0);
3766         obd->obd_namespace = m->mdt_namespace = NULL;
3767 err_fini_seq:
3768         mdt_seq_fini(env, m);
3769 err_fini_fld:
3770         mdt_fld_fini(env, m);
3771 err_fini_stack:
3772         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
3773 err_fini_proc:
3774         mdt_procfs_fini(m);
3775         lprocfs_obd_cleanup(obd);
3776 err_fini_site:
3777         lu_site_fini(s);
3778 err_free_site:
3779         OBD_FREE_PTR(s);
3780
3781         md_device_fini(&m->mdt_md_dev);
3782         return (rc);
3783 }
3784
3785 /* used by MGS to process specific configurations */
3786 static int mdt_process_config(const struct lu_env *env,
3787                               struct lu_device *d, struct lustre_cfg *cfg)
3788 {
3789         struct mdt_device *m = mdt_dev(d);
3790         struct md_device *md_next = m->mdt_child;
3791         struct lu_device *next = md2lu_dev(md_next);
3792         int rc = 0;
3793         ENTRY;
3794
3795         switch (cfg->lcfg_command) {
3796         case LCFG_PARAM: {
3797                 struct lprocfs_static_vars lvars;
3798                 struct obd_device *obd = d->ld_obd;
3799
3800                 lprocfs_init_vars(mdt, &lvars);
3801                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars, cfg, obd);
3802                 if (rc)
3803                         /* others are passed further */
3804                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
3805                 break;
3806         }
3807         case LCFG_ADD_MDC:
3808                 /*
3809                  * Add mdc hook to get first MDT uuid and connect it to
3810                  * ls->controller to use for seq manager.
3811                  */
3812                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3813                 if (rc)
3814                         CERROR("Can't add mdc, rc %d\n", rc);
3815                 else
3816                         rc = mdt_seq_init_cli(env, mdt_dev(d), cfg);
3817                 break;
3818         default:
3819                 /* others are passed further */
3820                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
3821                 break;
3822         }
3823         RETURN(rc);
3824 }
3825
3826 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
3827                                           const struct lu_object_header *hdr,
3828                                           struct lu_device *d)
3829 {
3830         struct mdt_object *mo;
3831
3832         ENTRY;
3833
3834         OBD_ALLOC_PTR(mo);
3835         if (mo != NULL) {
3836                 struct lu_object *o;
3837                 struct lu_object_header *h;
3838
3839                 o = &mo->mot_obj.mo_lu;
3840                 h = &mo->mot_header;
3841                 lu_object_header_init(h);
3842                 lu_object_init(o, h, d);
3843                 lu_object_add_top(h, o);
3844                 o->lo_ops = &mdt_obj_ops;
3845                 RETURN(o);
3846         } else
3847                 RETURN(NULL);
3848 }
3849
3850 static int mdt_object_init(const struct lu_env *env, struct lu_object *o)
3851 {
3852         struct mdt_device *d = mdt_dev(o->lo_dev);
3853         struct lu_device  *under;
3854         struct lu_object  *below;
3855         int                rc = 0;
3856         ENTRY;
3857
3858         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
3859                PFID(lu_object_fid(o)));
3860
3861         under = &d->mdt_child->md_lu_dev;
3862         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
3863         if (below != NULL) {
3864                 lu_object_add(o, below);
3865         } else
3866                 rc = -ENOMEM;
3867
3868         RETURN(rc);
3869 }
3870
3871 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
3872 {
3873         struct mdt_object *mo = mdt_obj(o);
3874         struct lu_object_header *h;
3875         ENTRY;
3876
3877         h = o->lo_header;
3878         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
3879                PFID(lu_object_fid(o)));
3880
3881         lu_object_fini(o);
3882         lu_object_header_fini(h);
3883         OBD_FREE_PTR(mo);
3884         EXIT;
3885 }
3886
3887 static int mdt_object_print(const struct lu_env *env, void *cookie,
3888                             lu_printer_t p, const struct lu_object *o)
3889 {
3890         return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p", o);
3891 }
3892
3893 static struct lu_device_operations mdt_lu_ops = {
3894         .ldo_object_alloc   = mdt_object_alloc,
3895         .ldo_process_config = mdt_process_config
3896 };
3897
3898 static struct lu_object_operations mdt_obj_ops = {
3899         .loo_object_init    = mdt_object_init,
3900         .loo_object_free    = mdt_object_free,
3901         .loo_object_print   = mdt_object_print
3902 };
3903
3904 /* mds_connect_internal */
3905 static int mdt_connect_internal(struct obd_export *exp,
3906                                 struct mdt_device *mdt,
3907                                 struct obd_connect_data *data)
3908 {
3909         __u64 flags;
3910
3911         if (data != NULL) {
3912                 data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
3913                 data->ocd_ibits_known &= MDS_INODELOCK_FULL;
3914
3915                 /* If no known bits (which should not happen, probably,
3916                    as everybody should support LOOKUP and UPDATE bits at least)
3917                    revert to compat mode with plain locks. */
3918                 if (!data->ocd_ibits_known &&
3919                     data->ocd_connect_flags & OBD_CONNECT_IBITS)
3920                         data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
3921
3922                 if (!mdt->mdt_opts.mo_acl)
3923                         data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
3924
3925                 if (!mdt->mdt_opts.mo_user_xattr)
3926                         data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
3927
3928                 if (!mdt->mdt_opts.mo_mds_capa)
3929                         data->ocd_connect_flags &= ~OBD_CONNECT_MDS_CAPA;
3930
3931                 if (!mdt->mdt_opts.mo_oss_capa)
3932                         data->ocd_connect_flags &= ~OBD_CONNECT_OSS_CAPA;
3933
3934                 exp->exp_connect_flags = data->ocd_connect_flags;
3935                 data->ocd_version = LUSTRE_VERSION_CODE;
3936                 exp->exp_mdt_data.med_ibits_known = data->ocd_ibits_known;
3937         }
3938
3939 #if 0
3940         if (mdt->mdt_opts.mo_acl &&
3941             ((exp->exp_connect_flags & OBD_CONNECT_ACL) == 0)) {
3942                 CWARN("%s: MDS requires ACL support but client does not\n",
3943                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3944                 return -EBADE;
3945         }
3946 #endif
3947
3948         flags = OBD_CONNECT_LCL_CLIENT | OBD_CONNECT_RMT_CLIENT;
3949         if ((exp->exp_connect_flags & flags) == flags) {
3950                 CWARN("%s: both local and remote client flags are set\n",
3951                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3952                 return -EBADE;
3953         }
3954
3955         if (mdt->mdt_opts.mo_mds_capa &&
3956             ((exp->exp_connect_flags & OBD_CONNECT_MDS_CAPA) == 0)) {
3957                 CWARN("%s: MDS requires capability support, but client not\n",
3958                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3959                 return -EBADE;
3960         }
3961
3962         if (mdt->mdt_opts.mo_oss_capa &&
3963             ((exp->exp_connect_flags & OBD_CONNECT_OSS_CAPA) == 0)) {
3964                 CWARN("%s: MDS requires OSS capability support, "
3965                       "but client not\n",
3966                       mdt->mdt_md_dev.md_lu_dev.ld_obd->obd_name);
3967                 return -EBADE;
3968         }
3969
3970         return 0;
3971 }
3972
3973 /* mds_connect copy */
3974 static int mdt_obd_connect(const struct lu_env *env,
3975                            struct lustre_handle *conn, struct obd_device *obd,
3976                            struct obd_uuid *cluuid,
3977                            struct obd_connect_data *data)
3978 {
3979         struct mdt_export_data *med;
3980         struct mdt_client_data *mcd;
3981         struct obd_export      *exp;
3982         struct mdt_device      *mdt;
3983         int                     rc;
3984         ENTRY;
3985
3986         LASSERT(env != NULL);
3987         if (!conn || !obd || !cluuid)
3988                 RETURN(-EINVAL);
3989
3990         mdt = mdt_dev(obd->obd_lu_dev);
3991
3992         rc = class_connect(conn, obd, cluuid);
3993         if (rc)
3994                 RETURN(rc);
3995
3996         exp = class_conn2export(conn);
3997         LASSERT(exp != NULL);
3998         med = &exp->exp_mdt_data;
3999
4000         rc = mdt_connect_internal(exp, mdt, data);
4001         if (rc == 0) {
4002                 OBD_ALLOC_PTR(mcd);
4003                 if (mcd != NULL) {
4004                         memcpy(mcd->mcd_uuid, cluuid, sizeof mcd->mcd_uuid);
4005                         med->med_mcd = mcd;
4006                         rc = mdt_client_new(env, mdt, med);
4007                         if (rc != 0) {
4008                                 OBD_FREE_PTR(mcd);
4009                                 med->med_mcd = NULL;
4010                         }
4011                 } else
4012                         rc = -ENOMEM;
4013         }
4014
4015         if (rc != 0)
4016                 class_disconnect(exp);
4017         else
4018                 class_export_put(exp);
4019
4020         RETURN(rc);
4021 }
4022
4023 static int mdt_obd_reconnect(struct obd_export *exp, struct obd_device *obd,
4024                              struct obd_uuid *cluuid,
4025                              struct obd_connect_data *data)
4026 {
4027         int rc;
4028         ENTRY;
4029
4030         if (exp == NULL || obd == NULL || cluuid == NULL)
4031                 RETURN(-EINVAL);
4032
4033         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
4034
4035         RETURN(rc);
4036 }
4037
4038 static int mdt_obd_disconnect(struct obd_export *exp)
4039 {
4040         struct mdt_device *mdt = mdt_dev(exp->exp_obd->obd_lu_dev);
4041         int rc;
4042         ENTRY;
4043
4044         LASSERT(exp);
4045         class_export_get(exp);
4046
4047         /* Disconnect early so that clients can't keep using export */
4048         rc = class_disconnect(exp);
4049         if (mdt->mdt_namespace != NULL || exp->exp_obd->obd_namespace != NULL)
4050                 ldlm_cancel_locks_for_export(exp);
4051
4052         /* complete all outstanding replies */
4053         spin_lock(&exp->exp_lock);
4054         while (!list_empty(&exp->exp_outstanding_replies)) {
4055                 struct ptlrpc_reply_state *rs =
4056                         list_entry(exp->exp_outstanding_replies.next,
4057                                    struct ptlrpc_reply_state, rs_exp_list);
4058                 struct ptlrpc_service *svc = rs->rs_service;
4059
4060                 spin_lock(&svc->srv_lock);
4061                 list_del_init(&rs->rs_exp_list);
4062                 ptlrpc_schedule_difficult_reply(rs);
4063                 spin_unlock(&svc->srv_lock);
4064         }
4065         spin_unlock(&exp->exp_lock);
4066
4067         class_export_put(exp);
4068         RETURN(rc);
4069 }
4070
4071 /* FIXME: Can we avoid using these two interfaces? */
4072 static int mdt_init_export(struct obd_export *exp)
4073 {
4074         struct mdt_export_data *med = &exp->exp_mdt_data;
4075         ENTRY;
4076
4077         INIT_LIST_HEAD(&med->med_open_head);
4078         spin_lock_init(&med->med_open_lock);
4079         exp->exp_connecting = 1;
4080         RETURN(0);
4081 }
4082
4083 static int mdt_destroy_export(struct obd_export *export)
4084 {
4085         struct mdt_export_data *med;
4086         struct obd_device      *obd = export->exp_obd;
4087         struct mdt_device      *mdt;
4088         struct mdt_thread_info *info;
4089         struct lu_env           env;
4090         struct md_attr         *ma;
4091         int lmm_size;
4092         int cookie_size;
4093         int rc = 0;
4094         ENTRY;
4095
4096         med = &export->exp_mdt_data;
4097         if (med->med_rmtclient)
4098                 mdt_cleanup_idmap(med);
4099
4100         target_destroy_export(export);
4101
4102         if (obd_uuid_equals(&export->exp_client_uuid, &obd->obd_uuid))
4103                 RETURN(0);
4104
4105         mdt = mdt_dev(obd->obd_lu_dev);
4106         LASSERT(mdt != NULL);
4107
4108         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4109         if (rc)
4110                 RETURN(rc);
4111
4112         info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
4113         LASSERT(info != NULL);
4114         memset(info, 0, sizeof *info);
4115         info->mti_env = &env;
4116         info->mti_mdt = mdt;
4117
4118         ma = &info->mti_attr;
4119         lmm_size = ma->ma_lmm_size = mdt->mdt_max_mdsize;
4120         cookie_size = ma->ma_cookie_size = mdt->mdt_max_cookiesize;
4121         OBD_ALLOC(ma->ma_lmm, lmm_size);
4122         OBD_ALLOC(ma->ma_cookie, cookie_size);
4123
4124         if (ma->ma_lmm == NULL || ma->ma_cookie == NULL)
4125                 GOTO(out, rc = -ENOMEM);
4126         ma->ma_need = MA_LOV | MA_COOKIE;
4127         ma->ma_valid = 0;
4128         /* Close any open files (which may also cause orphan unlinking). */
4129         spin_lock(&med->med_open_lock);
4130         while (!list_empty(&med->med_open_head)) {
4131                 struct list_head *tmp = med->med_open_head.next;
4132                 struct mdt_file_data *mfd =
4133                         list_entry(tmp, struct mdt_file_data, mfd_list);
4134
4135                 /* Remove mfd handle so it can't be found again.
4136                  * We are consuming the mfd_list reference here. */
4137                 class_handle_unhash(&mfd->mfd_handle);
4138                 list_del_init(&mfd->mfd_list);
4139                 spin_unlock(&med->med_open_lock);
4140                 mdt_mfd_close(info, mfd);
4141                 /* TODO: if we close the unlinked file,
4142                  * we need to remove it's objects from OST */
4143                 memset(&ma->ma_attr, 0, sizeof(ma->ma_attr));
4144                 spin_lock(&med->med_open_lock);
4145                 ma->ma_lmm_size = lmm_size;
4146                 ma->ma_cookie_size = cookie_size;
4147                 ma->ma_need = MA_LOV | MA_COOKIE;
4148                 ma->ma_valid = 0;
4149         }
4150         spin_unlock(&med->med_open_lock);
4151         info->mti_mdt = NULL;
4152         mdt_client_del(&env, mdt, med);
4153
4154         EXIT;
4155 out:
4156         if (lmm_size) {
4157                 OBD_FREE(ma->ma_lmm, lmm_size);
4158                 ma->ma_lmm = NULL;
4159         }
4160         if (cookie_size) {
4161                 OBD_FREE(ma->ma_cookie, cookie_size);
4162                 ma->ma_cookie = NULL;
4163         }
4164         lu_env_fini(&env);
4165
4166         return rc;
4167 }
4168
4169 static int mdt_upcall(const struct lu_env *env, struct md_device *md,
4170                       enum md_upcall_event ev)
4171 {
4172         struct mdt_device *m = mdt_dev(&md->md_lu_dev);
4173         struct md_device  *next  = m->mdt_child;
4174         struct mdt_thread_info *mti;
4175         int rc = 0;
4176         ENTRY;
4177
4178         switch (ev) {
4179                 case MD_LOV_SYNC:
4180                         rc = next->md_ops->mdo_maxsize_get(env, next,
4181                                         &m->mdt_max_mdsize,
4182                                         &m->mdt_max_cookiesize);
4183                         CDEBUG(D_INFO, "get max mdsize %d max cookiesize %d\n",
4184                                      m->mdt_max_mdsize, m->mdt_max_cookiesize);
4185                         break;
4186                 case MD_NO_TRANS:
4187                         mti = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
4188                         mti->mti_no_need_trans = 1;
4189                         CDEBUG(D_INFO, "disable mdt trans for this thread\n");
4190                         break;
4191                 default:
4192                         CERROR("invalid event\n");
4193                         rc = -EINVAL;
4194                         break;
4195         }
4196         RETURN(rc);
4197 }
4198
4199 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
4200                          void *karg, void *uarg)
4201 {
4202         struct lu_env      env;
4203         struct obd_device *obd= exp->exp_obd;
4204         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
4205         struct dt_device  *dt = mdt->mdt_bottom;
4206         int rc;
4207
4208         ENTRY;
4209         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
4210         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4211         if (rc)
4212                 RETURN(rc);
4213
4214         switch (cmd) {
4215         case OBD_IOC_SYNC:
4216                 rc = dt->dd_ops->dt_sync(&env, dt);
4217                 break;
4218
4219         case OBD_IOC_SET_READONLY:
4220                 rc = dt->dd_ops->dt_sync(&env, dt);
4221                 dt->dd_ops->dt_ro(&env, dt);
4222                 break;
4223
4224         case OBD_IOC_ABORT_RECOVERY:
4225                 CERROR("aborting recovery for device %s\n", obd->obd_name);
4226                 target_stop_recovery_thread(obd);
4227                 break;
4228
4229         default:
4230                 CERROR("not supported cmd = %d for device %s\n",
4231                        cmd, obd->obd_name);
4232                 rc = -EOPNOTSUPP;
4233         }
4234
4235         lu_env_fini(&env);
4236         RETURN(rc);
4237 }
4238
4239 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
4240 {
4241         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
4242         int rc;
4243         ENTRY;
4244         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
4245         RETURN(rc);
4246 }
4247
4248 int mdt_obd_postrecov(struct obd_device *obd)
4249 {
4250         struct lu_env env;
4251         int rc;
4252
4253         rc = lu_env_init(&env, NULL, LCT_MD_THREAD);
4254         if (rc)
4255                 RETURN(rc);
4256         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
4257         lu_env_fini(&env);
4258         return rc;
4259 }
4260
4261 static struct obd_ops mdt_obd_device_ops = {
4262         .o_owner          = THIS_MODULE,
4263         .o_connect        = mdt_obd_connect,
4264         .o_reconnect      = mdt_obd_reconnect,
4265         .o_disconnect     = mdt_obd_disconnect,
4266         .o_init_export    = mdt_init_export,
4267         .o_destroy_export = mdt_destroy_export,
4268         .o_iocontrol      = mdt_iocontrol,
4269         .o_postrecov      = mdt_obd_postrecov
4270
4271 };
4272
4273 static struct lu_device* mdt_device_fini(const struct lu_env *env,
4274                                          struct lu_device *d)
4275 {
4276         struct mdt_device *m = mdt_dev(d);
4277
4278         mdt_fini(env, m);
4279         RETURN(NULL);
4280 }
4281
/* ldto_device_free hook: release the mdt_device that embeds d. */
static void mdt_device_free(const struct lu_env *env, struct lu_device *d)
{
        struct mdt_device *mdt;

        mdt = mdt_dev(d);
        OBD_FREE_PTR(mdt);
}
4288
4289 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
4290                                           struct lu_device_type *t,
4291                                           struct lustre_cfg *cfg)
4292 {
4293         struct lu_device  *l;
4294         struct mdt_device *m;
4295
4296         OBD_ALLOC_PTR(m);
4297         if (m != NULL) {
4298                 int rc;
4299
4300                 l = &m->mdt_md_dev.md_lu_dev;
4301                 rc = mdt_init0(env, m, t, cfg);
4302                 if (rc != 0) {
4303                         OBD_FREE_PTR(m);
4304                         l = ERR_PTR(rc);
4305                         return l;
4306                 }
4307                 m->mdt_md_dev.md_upcall.mu_upcall = mdt_upcall;
4308         } else
4309                 l = ERR_PTR(-ENOMEM);
4310         return l;
4311 }
4312
4313 /*
4314  * context key constructor/destructor
4315  */
4316 static void *mdt_key_init(const struct lu_context *ctx,
4317                           struct lu_context_key *key)
4318 {
4319         struct mdt_thread_info *info;
4320
4321         /*
4322          * check that no high order allocations are incurred.
4323          */
4324         CLASSERT(CFS_PAGE_SIZE >= sizeof *info);
4325         OBD_ALLOC_PTR(info);
4326         if (info == NULL)
4327                 info = ERR_PTR(-ENOMEM);
4328         return info;
4329 }
4330
/* lct_fini for mdt_thread_key: free the mdt_thread_info passed as data. */
static void mdt_key_fini(const struct lu_context *ctx,
                         struct lu_context_key *key, void *data)
{
        struct mdt_thread_info *mti;

        mti = data;
        OBD_FREE_PTR(mti);
}
4337
4338 struct lu_context_key mdt_thread_key = {
4339         .lct_tags = LCT_MD_THREAD,
4340         .lct_init = mdt_key_init,
4341         .lct_fini = mdt_key_fini
4342 };
4343
4344 static void *mdt_txn_key_init(const struct lu_context *ctx,
4345                               struct lu_context_key *key)
4346 {
4347         struct mdt_txn_info *txi;
4348
4349         /*
4350          * check that no high order allocations are incurred.
4351          */
4352         CLASSERT(CFS_PAGE_SIZE >= sizeof *txi);
4353         OBD_ALLOC_PTR(txi);
4354         if (txi == NULL)
4355                 txi = ERR_PTR(-ENOMEM);
4356         return txi;
4357 }
4358
/* lct_fini for mdt_txn_key: free the mdt_txn_info passed as data. */
static void mdt_txn_key_fini(const struct lu_context *ctx,
                             struct lu_context_key *key, void *data)
{
        struct mdt_txn_info *txn;

        txn = data;
        OBD_FREE_PTR(txn);
}
4365
4366 struct lu_context_key mdt_txn_key = {
4367         .lct_tags = LCT_TX_HANDLE,
4368         .lct_init = mdt_txn_key_init,
4369         .lct_fini = mdt_txn_key_fini
4370 };
4371
4372 struct md_ucred *mdt_ucred(const struct mdt_thread_info *info)
4373 {
4374         return md_ucred(info->mti_env);
4375 }
4376
4377 static int mdt_type_init(struct lu_device_type *t)
4378 {
4379         int rc;
4380
4381         rc = lu_context_key_register(&mdt_thread_key);
4382         if (rc == 0)
4383                 rc = lu_context_key_register(&mdt_txn_key);
4384         return rc;
4385 }
4386
4387 static void mdt_type_fini(struct lu_device_type *t)
4388 {
4389         lu_context_key_degister(&mdt_thread_key);
4390         lu_context_key_degister(&mdt_txn_key);
4391 }
4392
4393 static struct lu_device_type_operations mdt_device_type_ops = {
4394         .ldto_init = mdt_type_init,
4395         .ldto_fini = mdt_type_fini,
4396
4397         .ldto_device_alloc = mdt_device_alloc,
4398         .ldto_device_free  = mdt_device_free,
4399         .ldto_device_fini  = mdt_device_fini
4400 };
4401
4402 static struct lu_device_type mdt_device_type = {
4403         .ldt_tags     = LU_DEVICE_MD,
4404         .ldt_name     = LUSTRE_MDT_NAME,
4405         .ldt_ops      = &mdt_device_type_ops,
4406         .ldt_ctx_tags = LCT_MD_THREAD
4407 };
4408
4409 static int __init mdt_mod_init(void)
4410 {
4411         struct lprocfs_static_vars lvars;
4412         int rc;
4413
4414         mdt_num_threads = MDT_NUM_THREADS;
4415         lprocfs_init_vars(mdt, &lvars);
4416         rc = class_register_type(&mdt_obd_device_ops, NULL,
4417                                  lvars.module_vars, LUSTRE_MDT_NAME,
4418                                  &mdt_device_type);
4419
4420         return rc;
4421 }
4422
4423 static void __exit mdt_mod_exit(void)
4424 {
4425         class_unregister_type(LUSTRE_MDT_NAME);
4426 }
4427
4428
/*
 * Expand to a designated initializer for one struct mdt_handler slot.
 *
 * The slot index is (prefix_opc - prefix_base); the entry records the
 * stringified opcode name, the OBD_FAIL_<prefix>_<opc><suffix> fail id,
 * the opcode itself, its flags, the handler function and request format.
 */
#define DEF_HNDL(prefix, base, suffix, flags, opc, fn, fmt)                 \
        [prefix ## _ ## opc - prefix ## _ ## base] = {                      \
                .mh_name    = #opc,                                         \
                .mh_fail_id = OBD_FAIL_ ## prefix ## _ ## opc ## suffix,    \
                .mh_opc     = prefix ## _ ## opc,                           \
                .mh_flags   = flags,                                        \
                .mh_act     = fn,                                           \
                .mh_fmt     = fmt                                           \
        }
4438
/*
 * Per-family wrappers around DEF_HNDL with an explicit request format:
 * MDS opcodes are indexed from MDS_GETATTR, SEQ and FLD opcodes from their
 * respective QUERY opcode; all use the _NET fail-id suffix.
 */
#define DEF_MDT_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, fmt)

#define DEF_SEQ_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, fmt)

#define DEF_FLD_HNDL(flags, name, fn, fmt)                              \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, fmt)
/*
 * Request with a format known in advance: the format is derived from the
 * opcode name as &RQF_<family>_<name>.
 */
#define DEF_MDT_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, &RQF_MDS_ ## name)

#define DEF_SEQ_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(SEQ, QUERY, _NET, flags, name, fn, &RQF_SEQ_ ## name)

#define DEF_FLD_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(FLD, QUERY, _NET, flags, name, fn, &RQF_FLD_ ## name)
/*
 * Request with a format we do not yet know: the handler entry carries a
 * NULL format pointer.
 */
#define DEF_MDT_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(MDS, GETATTR, _NET, flags, name, fn, NULL)
4463
4464 static struct mdt_handler mdt_mds_ops[] = {
4465 DEF_MDT_HNDL_F(0,                         CONNECT,      mdt_connect),
4466 DEF_MDT_HNDL_F(0,                         DISCONNECT,   mdt_disconnect),
4467 DEF_MDT_HNDL_F(0           |HABEO_REFERO, GETSTATUS,    mdt_getstatus),
4468 DEF_MDT_HNDL_F(HABEO_CORPUS             , GETATTR,      mdt_getattr),
4469 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, GETATTR_NAME, mdt_getattr_name),
4470 DEF_MDT_HNDL_F(HABEO_CORPUS|MUTABOR,      SETXATTR,     mdt_setxattr),
4471 DEF_MDT_HNDL_F(HABEO_CORPUS,              GETXATTR,     mdt_getxattr),
4472 DEF_MDT_HNDL_F(0           |HABEO_REFERO, STATFS,       mdt_statfs),
4473 DEF_MDT_HNDL_F(0                        |MUTABOR,
4474                                           REINT,        mdt_reint),
4475 DEF_MDT_HNDL_F(HABEO_CORPUS             , CLOSE,        mdt_close),
4476 DEF_MDT_HNDL_F(HABEO_CORPUS             , DONE_WRITING, mdt_done_writing),
4477 DEF_MDT_HNDL_F(0           |HABEO_REFERO, PIN,          mdt_pin),
4478 DEF_MDT_HNDL_0(0,                         SYNC,         mdt_sync),
4479 DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, IS_SUBDIR,    mdt_is_subdir),
4480 DEF_MDT_HNDL_0(0,                         QUOTACHECK,   mdt_quotacheck_handle),
4481 DEF_MDT_HNDL_0(0,                         QUOTACTL,     mdt_quotactl_handle)
4482 };
4483
/* OBD_* handler entry: indexed from OBD_PING, no request format. */
#define DEF_OBD_HNDL(flags, name, fn)                                   \
        DEF_HNDL(OBD, PING, _NET, flags, name, fn, NULL)
4486
4487
4488 static struct mdt_handler mdt_obd_ops[] = {
4489         DEF_OBD_HNDL(0, PING,           mdt_obd_ping),
4490         DEF_OBD_HNDL(0, LOG_CANCEL,     mdt_obd_log_cancel),
4491         DEF_OBD_HNDL(0, QC_CALLBACK,    mdt_obd_qc_callback)
4492 };
4493
/*
 * LDLM_* handler entries, indexed from LDLM_ENQUEUE.  The fail-id suffix
 * argument is deliberately empty.  _0 carries no request format, _F uses
 * &RQF_LDLM_<name>.
 */
#define DEF_DLM_HNDL_0(flags, name, fn)                                 \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, NULL)
#define DEF_DLM_HNDL_F(flags, name, fn)                                 \
        DEF_HNDL(LDLM, ENQUEUE, , flags, name, fn, &RQF_LDLM_ ## name)
4498
4499 static struct mdt_handler mdt_dlm_ops[] = {
4500         DEF_DLM_HNDL_F(HABEO_CLAVIS, ENQUEUE,        mdt_enqueue),
4501         DEF_DLM_HNDL_0(HABEO_CLAVIS, CONVERT,        mdt_convert),
4502         DEF_DLM_HNDL_0(0,            BL_CALLBACK,    mdt_bl_callback),
4503         DEF_DLM_HNDL_0(0,            CP_CALLBACK,    mdt_cp_callback)
4504 };
4505
4506 static struct mdt_handler mdt_llog_ops[] = {
4507 };
4508
/* SEC_CTX_* handler entry: indexed from SEC_CTX_INIT, flags 0, no format. */
#define DEF_SEC_CTX_HNDL(name, fn)                                      \
        DEF_HNDL(SEC_CTX, INIT, _NET, 0, name, fn, NULL)
4511
4512 static struct mdt_handler mdt_sec_ctx_ops[] = {
4513         DEF_SEC_CTX_HNDL(INIT,          mdt_sec_ctx_handle),
4514         DEF_SEC_CTX_HNDL(INIT_CONT,     mdt_sec_ctx_handle),
4515         DEF_SEC_CTX_HNDL(FINI,          mdt_sec_ctx_handle)
4516 };
4517
4518 static struct mdt_opc_slice mdt_regular_handlers[] = {
4519         {
4520                 .mos_opc_start = MDS_GETATTR,
4521                 .mos_opc_end   = MDS_LAST_OPC,
4522                 .mos_hs        = mdt_mds_ops
4523         },
4524         {
4525                 .mos_opc_start = OBD_PING,
4526                 .mos_opc_end   = OBD_LAST_OPC,
4527                 .mos_hs        = mdt_obd_ops
4528         },
4529         {
4530                 .mos_opc_start = LDLM_ENQUEUE,
4531                 .mos_opc_end   = LDLM_LAST_OPC,
4532                 .mos_hs        = mdt_dlm_ops
4533         },
4534         {
4535                 .mos_opc_start = LLOG_ORIGIN_HANDLE_CREATE,
4536                 .mos_opc_end   = LLOG_LAST_OPC,
4537                 .mos_hs        = mdt_llog_ops
4538         },
4539         {
4540                 .mos_opc_start = SEC_CTX_INIT,
4541                 .mos_opc_end   = SEC_LAST_OPC,
4542                 .mos_hs        = mdt_sec_ctx_ops
4543         },
4544         {
4545                 .mos_hs        = NULL
4546         }
4547 };
4548
4549 static struct mdt_handler mdt_readpage_ops[] = {
4550         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, READPAGE, mdt_readpage),
4551 #ifdef HAVE_SPLIT_SUPPORT
4552         DEF_MDT_HNDL_F(HABEO_CORPUS|HABEO_REFERO, WRITEPAGE, mdt_writepage),
4553 #endif
4554
4555         /*
4556          * XXX: this is ugly and should be fixed one day, see mdc_close() for
4557          * detailed comments. --umka
4558          */
4559         DEF_MDT_HNDL_F(HABEO_CORPUS,              CLOSE,    mdt_close),
4560         DEF_MDT_HNDL_F(HABEO_CORPUS,              DONE_WRITING,    mdt_done_writing),
4561 };
4562
4563 static struct mdt_opc_slice mdt_readpage_handlers[] = {
4564         {
4565                 .mos_opc_start = MDS_GETATTR,
4566                 .mos_opc_end   = MDS_LAST_OPC,
4567                 .mos_hs        = mdt_readpage_ops
4568         },
4569         {
4570                 .mos_hs        = NULL
4571         }
4572 };
4573
4574 static struct mdt_handler mdt_seq_ops[] = {
4575         DEF_SEQ_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))seq_query)
4576 };
4577
4578 static struct mdt_opc_slice mdt_seq_handlers[] = {
4579         {
4580                 .mos_opc_start = SEQ_QUERY,
4581                 .mos_opc_end   = SEQ_LAST_OPC,
4582                 .mos_hs        = mdt_seq_ops
4583         },
4584         {
4585                 .mos_hs        = NULL
4586         }
4587 };
4588
4589 static struct mdt_handler mdt_fld_ops[] = {
4590         DEF_FLD_HNDL_F(0, QUERY, (int (*)(struct mdt_thread_info *))fld_query)
4591 };
4592
4593 static struct mdt_opc_slice mdt_fld_handlers[] = {
4594         {
4595                 .mos_opc_start = FLD_QUERY,
4596                 .mos_opc_end   = FLD_LAST_OPC,
4597                 .mos_hs        = mdt_fld_ops
4598         },
4599         {
4600                 .mos_hs        = NULL
4601         }
4602 };
4603
4604 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
4605 MODULE_DESCRIPTION("Lustre Meta-data Target ("LUSTRE_MDT_NAME")");
4606 MODULE_LICENSE("GPL");
4607
4608 CFS_MODULE_PARM(mdt_num_threads, "ul", ulong, 0444,
4609                 "number of mdt service threads to start");
4610
4611 cfs_module(mdt, "0.2.0", mdt_mod_init, mdt_mod_exit);