Whamcloud - gitweb
8daf7e15609572668ec1f061f05a058f5477fae4
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2010, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdt/mdt_handler.c
37  *
38  * Lustre Metadata Target (mdt) request handler
39  *
40  * Author: Peter Braam <braam@clusterfs.com>
41  * Author: Andreas Dilger <adilger@clusterfs.com>
42  * Author: Phil Schwan <phil@clusterfs.com>
43  * Author: Mike Shaver <shaver@clusterfs.com>
44  * Author: Nikita Danilov <nikita@clusterfs.com>
45  * Author: Huang Hua <huanghua@clusterfs.com>
46  * Author: Yury Umanets <umka@clusterfs.com>
47  */
48
49 #define DEBUG_SUBSYSTEM S_MDS
50
51 #include <linux/module.h>
52 /*
53  * struct OBD_{ALLOC,FREE}*()
54  */
55 #include <obd_support.h>
56 /* struct ptlrpc_request */
57 #include <lustre_net.h>
58 /* struct obd_export */
59 #include <lustre_export.h>
60 /* struct obd_device */
61 #include <obd.h>
62 /* lu2dt_dev() */
63 #include <dt_object.h>
64 #include <lustre_mds.h>
65 #include <lustre_log.h>
66 #include "mdt_internal.h"
67 #include <lustre_acl.h>
68 #include <lustre_param.h>
69 #include <lustre_quota.h>
70 #include <lustre_linkea.h>
71 #include <lustre_lfsck.h>
72
73 mdl_mode_t mdt_mdl_lock_modes[] = {
74         [LCK_MINMODE] = MDL_MINMODE,
75         [LCK_EX]      = MDL_EX,
76         [LCK_PW]      = MDL_PW,
77         [LCK_PR]      = MDL_PR,
78         [LCK_CW]      = MDL_CW,
79         [LCK_CR]      = MDL_CR,
80         [LCK_NL]      = MDL_NL,
81         [LCK_GROUP]   = MDL_GROUP
82 };
83
84 ldlm_mode_t mdt_dlm_lock_modes[] = {
85         [MDL_MINMODE] = LCK_MINMODE,
86         [MDL_EX]      = LCK_EX,
87         [MDL_PW]      = LCK_PW,
88         [MDL_PR]      = LCK_PR,
89         [MDL_CW]      = LCK_CW,
90         [MDL_CR]      = LCK_CR,
91         [MDL_NL]      = LCK_NL,
92         [MDL_GROUP]   = LCK_GROUP
93 };
94
95 static struct mdt_device *mdt_dev(struct lu_device *d);
96 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags);
97
98 static const struct lu_object_operations mdt_obj_ops;
99
100 /* Slab for MDT object allocation */
101 static struct kmem_cache *mdt_object_kmem;
102
103 /* For HSM restore handles */
104 struct kmem_cache *mdt_hsm_cdt_kmem;
105
106 /* For HSM request handles */
107 struct kmem_cache *mdt_hsm_car_kmem;
108
109 static struct lu_kmem_descr mdt_caches[] = {
110         {
111                 .ckd_cache = &mdt_object_kmem,
112                 .ckd_name  = "mdt_obj",
113                 .ckd_size  = sizeof(struct mdt_object)
114         },
115         {
116                 .ckd_cache      = &mdt_hsm_cdt_kmem,
117                 .ckd_name       = "mdt_cdt_restore_handle",
118                 .ckd_size       = sizeof(struct cdt_restore_handle)
119         },
120         {
121                 .ckd_cache      = &mdt_hsm_car_kmem,
122                 .ckd_name       = "mdt_cdt_agent_req",
123                 .ckd_size       = sizeof(struct cdt_agent_req)
124         },
125         {
126                 .ckd_cache = NULL
127         }
128 };
129
130 int mdt_get_disposition(struct ldlm_reply *rep, int flag)
131 {
132         if (!rep)
133                 return 0;
134         return (rep->lock_policy_res1 & flag);
135 }
136
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138                            struct ldlm_reply *rep, int flag)
139 {
140         if (info) {
141                 info->mti_opdata &= ~flag;
142                 tgt_opdata_clear(info->mti_env, flag);
143         }
144         if (rep)
145                 rep->lock_policy_res1 &= ~flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, int flag)
150 {
151         if (info) {
152                 info->mti_opdata |= flag;
153                 tgt_opdata_set(info->mti_env, flag);
154         }
155         if (rep)
156                 rep->lock_policy_res1 |= flag;
157 }
158
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, ldlm_mode_t lm)
160 {
161         lh->mlh_pdo_hash = 0;
162         lh->mlh_reg_mode = lm;
163         lh->mlh_rreg_mode = lm;
164         lh->mlh_type = MDT_REG_LOCK;
165 }
166
167 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, ldlm_mode_t lm,
168                        const char *name, int namelen)
169 {
170         lh->mlh_reg_mode = lm;
171         lh->mlh_rreg_mode = lm;
172         lh->mlh_type = MDT_PDO_LOCK;
173
174         if (name != NULL && (name[0] != '\0')) {
175                 LASSERT(namelen > 0);
176                 lh->mlh_pdo_hash = full_name_hash(name, namelen);
177                 /* XXX Workaround for LU-2856
178                  * Zero is a valid return value of full_name_hash, but several
179                  * users of mlh_pdo_hash assume a non-zero hash value. We
180                  * therefore map zero onto an arbitrary, but consistent
181                  * value (1) to avoid problems further down the road. */
182                 if (unlikely(!lh->mlh_pdo_hash))
183                         lh->mlh_pdo_hash = 1;
184         } else {
185                 LASSERT(namelen == 0);
186                 lh->mlh_pdo_hash = 0ull;
187         }
188 }
189
190 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
191                               struct mdt_lock_handle *lh)
192 {
193         mdl_mode_t mode;
194         ENTRY;
195
196         /*
197          * Any dir access needs couple of locks:
198          *
199          * 1) on part of dir we gonna take lookup/modify;
200          *
201          * 2) on whole dir to protect it from concurrent splitting and/or to
202          * flush client's cache for readdir().
203          *
204          * so, for a given mode and object this routine decides what lock mode
205          * to use for lock #2:
206          *
207          * 1) if caller's gonna lookup in dir then we need to protect dir from
208          * being splitted only - LCK_CR
209          *
210          * 2) if caller's gonna modify dir then we need to protect dir from
211          * being splitted and to flush cache - LCK_CW
212          *
213          * 3) if caller's gonna modify dir and that dir seems ready for
214          * splitting then we need to protect it from any type of access
215          * (lookup/modify/split) - LCK_EX --bzzz
216          */
217
218         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
219         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
220
221         /*
222          * Ask underlaying level its opinion about preferable PDO lock mode
223          * having access type passed as regular lock mode:
224          *
225          * - MDL_MINMODE means that lower layer does not want to specify lock
226          * mode;
227          *
228          * - MDL_NL means that no PDO lock should be taken. This is used in some
229          * cases. Say, for non-splittable directories no need to use PDO locks
230          * at all.
231          */
232         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
233                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
234
235         if (mode != MDL_MINMODE) {
236                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
237         } else {
238                 /*
239                  * Lower layer does not want to specify locking mode. We do it
240                  * our selves. No special protection is needed, just flush
241                  * client's cache on modification and allow concurrent
242                  * mondification.
243                  */
244                 switch (lh->mlh_reg_mode) {
245                 case LCK_EX:
246                         lh->mlh_pdo_mode = LCK_EX;
247                         break;
248                 case LCK_PR:
249                         lh->mlh_pdo_mode = LCK_CR;
250                         break;
251                 case LCK_PW:
252                         lh->mlh_pdo_mode = LCK_CW;
253                         break;
254                 default:
255                         CERROR("Not expected lock type (0x%x)\n",
256                                (int)lh->mlh_reg_mode);
257                         LBUG();
258                 }
259         }
260
261         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
262         EXIT;
263 }
264
265 int mdt_getstatus(struct tgt_session_info *tsi)
266 {
267         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
268         struct mdt_device       *mdt = info->mti_mdt;
269         struct mdt_body         *repbody;
270         int                      rc;
271
272         ENTRY;
273
274         rc = mdt_check_ucred(info);
275         if (rc)
276                 GOTO(out, rc = err_serious(rc));
277
278         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETSTATUS_PACK))
279                 GOTO(out, rc = err_serious(-ENOMEM));
280
281         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
282         repbody->fid1 = mdt->mdt_md_root_fid;
283         repbody->valid |= OBD_MD_FLID;
284
285         if (tsi->tsi_tgt->lut_mds_capa &&
286             exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
287                 struct mdt_object       *root;
288                 struct lustre_capa      *capa;
289
290                 root = mdt_object_find(info->mti_env, mdt, &repbody->fid1);
291                 if (IS_ERR(root))
292                         GOTO(out, rc = PTR_ERR(root));
293
294                 capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA1);
295                 LASSERT(capa);
296                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
297                 rc = mo_capa_get(info->mti_env, mdt_object_child(root), capa,
298                                  0);
299                 mdt_object_put(info->mti_env, root);
300                 if (rc == 0)
301                         repbody->valid |= OBD_MD_FLMDSCAPA;
302         }
303         EXIT;
304 out:
305         mdt_thread_info_fini(info);
306         return rc;
307 }
308
309 int mdt_statfs(struct tgt_session_info *tsi)
310 {
311         struct ptlrpc_request           *req = tgt_ses_req(tsi);
312         struct mdt_thread_info          *info = tsi2mdt_info(tsi);
313         struct md_device                *next = info->mti_mdt->mdt_child;
314         struct ptlrpc_service_part      *svcpt;
315         struct obd_statfs               *osfs;
316         int                             rc;
317
318         ENTRY;
319
320         svcpt = req->rq_rqbd->rqbd_svcpt;
321
322         /* This will trigger a watchdog timeout */
323         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
324                          (MDT_SERVICE_WATCHDOG_FACTOR *
325                           at_get(&svcpt->scp_at_estimate)) + 1);
326
327         rc = mdt_check_ucred(info);
328         if (rc)
329                 GOTO(out, rc = err_serious(rc));
330
331         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
332                 GOTO(out, rc = err_serious(-ENOMEM));
333
334         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
335         if (!osfs)
336                 GOTO(out, rc = -EPROTO);
337
338         /** statfs information are cached in the mdt_device */
339         if (cfs_time_before_64(info->mti_mdt->mdt_osfs_age,
340                                cfs_time_shift_64(-OBD_STATFS_CACHE_SECONDS))) {
341                 /** statfs data is too old, get up-to-date one */
342                 rc = next->md_ops->mdo_statfs(info->mti_env, next, osfs);
343                 if (rc)
344                         GOTO(out, rc);
345                 spin_lock(&info->mti_mdt->mdt_osfs_lock);
346                 info->mti_mdt->mdt_osfs = *osfs;
347                 info->mti_mdt->mdt_osfs_age = cfs_time_current_64();
348                 spin_unlock(&info->mti_mdt->mdt_osfs_lock);
349         } else {
350                 /** use cached statfs data */
351                 spin_lock(&info->mti_mdt->mdt_osfs_lock);
352                 *osfs = info->mti_mdt->mdt_osfs;
353                 spin_unlock(&info->mti_mdt->mdt_osfs_lock);
354         }
355
356         if (rc == 0)
357                 mdt_counter_incr(req, LPROC_MDT_STATFS);
358 out:
359         mdt_thread_info_fini(info);
360         RETURN(rc);
361 }
362
363 /**
364  * Pack SOM attributes into the reply.
365  * Call under a DLM UPDATE lock.
366  */
367 static void mdt_pack_size2body(struct mdt_thread_info *info,
368                                struct mdt_object *mo)
369 {
370         struct mdt_body *b;
371         struct md_attr *ma = &info->mti_attr;
372
373         LASSERT(ma->ma_attr.la_valid & LA_MODE);
374         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
375
376         /* Check if Size-on-MDS is supported, if this is a regular file,
377          * if SOM is enabled on the object and if SOM cache exists and valid.
378          * Otherwise do not pack Size-on-MDS attributes to the reply. */
379         if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM) ||
380             !S_ISREG(ma->ma_attr.la_mode) ||
381             !mdt_object_is_som_enabled(mo) ||
382             !(ma->ma_valid & MA_SOM))
383                 return;
384
385         b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
386         b->size = ma->ma_som->msd_size;
387         b->blocks = ma->ma_som->msd_blocks;
388 }
389
390 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
391                         const struct lu_attr *attr, const struct lu_fid *fid)
392 {
393         struct md_attr *ma = &info->mti_attr;
394
395         LASSERT(ma->ma_valid & MA_INODE);
396
397         b->atime      = attr->la_atime;
398         b->mtime      = attr->la_mtime;
399         b->ctime      = attr->la_ctime;
400         b->mode       = attr->la_mode;
401         b->size       = attr->la_size;
402         b->blocks     = attr->la_blocks;
403         b->uid        = attr->la_uid;
404         b->gid        = attr->la_gid;
405         b->flags      = attr->la_flags;
406         b->nlink      = attr->la_nlink;
407         b->rdev       = attr->la_rdev;
408
409         /*XXX should pack the reply body according to lu_valid*/
410         b->valid |= OBD_MD_FLCTIME | OBD_MD_FLUID   |
411                     OBD_MD_FLGID   | OBD_MD_FLTYPE  |
412                     OBD_MD_FLMODE  | OBD_MD_FLNLINK | OBD_MD_FLFLAGS |
413                     OBD_MD_FLATIME | OBD_MD_FLMTIME ;
414
415         if (!S_ISREG(attr->la_mode)) {
416                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
417         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
418                 /* means no objects are allocated on osts. */
419                 LASSERT(!(ma->ma_valid & MA_LOV));
420                 /* just ignore blocks occupied by extend attributes on MDS */
421                 b->blocks = 0;
422                 /* if no object is allocated on osts, the size on mds is valid. b=22272 */
423                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
424         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL &&
425                    ma->ma_lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) {
426                 /* A released file stores its size on MDS. */
427                 /* But return 1 block for released file, unless tools like tar
428                  * will consider it fully sparse. (LU-3864)
429                  */
430                 if (unlikely(b->size == 0))
431                         b->blocks = 0;
432                 else
433                         b->blocks = 1;
434                 b->valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
435         }
436
437         if (fid) {
438                 b->fid1 = *fid;
439                 b->valid |= OBD_MD_FLID;
440                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, size="LPU64"\n",
441                                 PFID(fid), b->nlink, b->mode, b->size);
442         }
443
444         if (info)
445                 mdt_body_reverse_idmap(info, b);
446
447         if (fid != NULL && (b->valid & OBD_MD_FLSIZE))
448                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
449                        PFID(fid), (unsigned long long)b->size);
450 }
451
452 static inline int mdt_body_has_lov(const struct lu_attr *la,
453                                    const struct mdt_body *body)
454 {
455         return ((S_ISREG(la->la_mode) && (body->valid & OBD_MD_FLEASIZE)) ||
456                 (S_ISDIR(la->la_mode) && (body->valid & OBD_MD_FLDIREA )) );
457 }
458
459 void mdt_client_compatibility(struct mdt_thread_info *info)
460 {
461         struct mdt_body       *body;
462         struct ptlrpc_request *req = mdt_info_req(info);
463         struct obd_export     *exp = req->rq_export;
464         struct md_attr        *ma = &info->mti_attr;
465         struct lu_attr        *la = &ma->ma_attr;
466         ENTRY;
467
468         if (exp_connect_layout(exp))
469                 /* the client can deal with 16-bit lmm_stripe_count */
470                 RETURN_EXIT;
471
472         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
473
474         if (!mdt_body_has_lov(la, body))
475                 RETURN_EXIT;
476
477         /* now we have a reply with a lov for a client not compatible with the
478          * layout lock so we have to clean the layout generation number */
479         if (S_ISREG(la->la_mode))
480                 ma->ma_lmm->lmm_layout_gen = 0;
481         EXIT;
482 }
483
484 static int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
485                              char *name)
486 {
487         const struct lu_env *env = info->mti_env;
488         int rc;
489         ENTRY;
490
491         LASSERT(info->mti_big_lmm_used == 0);
492         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
493         if (rc < 0)
494                 RETURN(rc);
495
496         /* big_lmm may need to be grown */
497         if (info->mti_big_lmmsize < rc) {
498                 int size = size_roundup_power2(rc);
499
500                 if (info->mti_big_lmmsize > 0) {
501                         /* free old buffer */
502                         LASSERT(info->mti_big_lmm);
503                         OBD_FREE_LARGE(info->mti_big_lmm,
504                                        info->mti_big_lmmsize);
505                         info->mti_big_lmm = NULL;
506                         info->mti_big_lmmsize = 0;
507                 }
508
509                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
510                 if (info->mti_big_lmm == NULL)
511                         RETURN(-ENOMEM);
512                 info->mti_big_lmmsize = size;
513         }
514         LASSERT(info->mti_big_lmmsize >= rc);
515
516         info->mti_buf.lb_buf = info->mti_big_lmm;
517         info->mti_buf.lb_len = info->mti_big_lmmsize;
518         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
519
520         RETURN(rc);
521 }
522
523 int mdt_attr_get_lov(struct mdt_thread_info *info,
524                      struct mdt_object *o, struct md_attr *ma)
525 {
526         struct md_object *next = mdt_object_child(o);
527         struct lu_buf    *buf = &info->mti_buf;
528         int rc;
529
530         buf->lb_buf = ma->ma_lmm;
531         buf->lb_len = ma->ma_lmm_size;
532         rc = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_LOV);
533         if (rc > 0) {
534                 ma->ma_lmm_size = rc;
535                 ma->ma_valid |= MA_LOV;
536                 rc = 0;
537         } else if (rc == -ENODATA) {
538                 /* no LOV EA */
539                 rc = 0;
540         } else if (rc == -ERANGE) {
541                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LOV);
542                 if (rc > 0) {
543                         info->mti_big_lmm_used = 1;
544                         ma->ma_valid |= MA_LOV;
545                         ma->ma_lmm = info->mti_big_lmm;
546                         ma->ma_lmm_size = rc;
547                         /* update mdt_max_mdsize so all clients
548                          * will be aware about that */
549                         if (info->mti_mdt->mdt_max_mdsize < rc)
550                                 info->mti_mdt->mdt_max_mdsize = rc;
551                         rc = 0;
552                 }
553         }
554
555         return rc;
556 }
557
558 int mdt_attr_get_pfid(struct mdt_thread_info *info,
559                       struct mdt_object *o, struct lu_fid *pfid)
560 {
561         struct lu_buf           *buf = &info->mti_buf;
562         struct link_ea_header   *leh;
563         struct link_ea_entry    *lee;
564         int                      rc;
565         ENTRY;
566
567         buf->lb_buf = info->mti_big_lmm;
568         buf->lb_len = info->mti_big_lmmsize;
569         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
570                           buf, XATTR_NAME_LINK);
571         /* ignore errors, MA_PFID won't be set and it is
572          * up to the caller to treat this as an error */
573         if (rc == -ERANGE || buf->lb_len == 0) {
574                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
575                 buf->lb_buf = info->mti_big_lmm;
576                 buf->lb_len = info->mti_big_lmmsize;
577         }
578
579         if (rc < 0)
580                 RETURN(rc);
581         if (rc < sizeof(*leh)) {
582                 CERROR("short LinkEA on "DFID": rc = %d\n",
583                        PFID(mdt_object_fid(o)), rc);
584                 RETURN(-ENODATA);
585         }
586
587         leh = (struct link_ea_header *) buf->lb_buf;
588         lee = (struct link_ea_entry *)(leh + 1);
589         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
590                 leh->leh_magic = LINK_EA_MAGIC;
591                 leh->leh_reccount = __swab32(leh->leh_reccount);
592                 leh->leh_len = __swab64(leh->leh_len);
593         }
594         if (leh->leh_magic != LINK_EA_MAGIC)
595                 RETURN(-EINVAL);
596         if (leh->leh_reccount == 0)
597                 RETURN(-ENODATA);
598
599         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
600         fid_be_to_cpu(pfid, pfid);
601
602         RETURN(0);
603 }
604
605 int mdt_attr_get_complex(struct mdt_thread_info *info,
606                          struct mdt_object *o, struct md_attr *ma)
607 {
608         const struct lu_env *env = info->mti_env;
609         struct md_object    *next = mdt_object_child(o);
610         struct lu_buf       *buf = &info->mti_buf;
611         u32                  mode = lu_object_attr(&next->mo_lu);
612         int                  need = ma->ma_need;
613         int                  rc = 0, rc2;
614         ENTRY;
615
616         ma->ma_valid = 0;
617
618         if (need & MA_INODE) {
619                 ma->ma_need = MA_INODE;
620                 rc = mo_attr_get(env, next, ma);
621                 if (rc)
622                         GOTO(out, rc);
623                 ma->ma_valid |= MA_INODE;
624         }
625
626         if (need & MA_PFID) {
627                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
628                 if (rc == 0)
629                         ma->ma_valid |= MA_PFID;
630                 /* ignore this error, parent fid is not mandatory */
631                 rc = 0;
632         }
633
634         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
635                 rc = mdt_attr_get_lov(info, o, ma);
636                 if (rc)
637                         GOTO(out, rc);
638         }
639
640         if (need & MA_LMV && S_ISDIR(mode)) {
641                 buf->lb_buf = ma->ma_lmv;
642                 buf->lb_len = ma->ma_lmv_size;
643                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_LMV);
644                 if (rc2 > 0) {
645                         ma->ma_lmv_size = rc2;
646                         ma->ma_valid |= MA_LMV;
647                 } else if (rc2 == -ENODATA) {
648                         /* no LMV EA */
649                         ma->ma_lmv_size = 0;
650                 } else
651                         GOTO(out, rc = rc2);
652         }
653
654         if (need & MA_SOM && S_ISREG(mode)) {
655                 buf->lb_buf = info->mti_xattr_buf;
656                 buf->lb_len = sizeof(info->mti_xattr_buf);
657                 CLASSERT(sizeof(struct som_attrs) <=
658                          sizeof(info->mti_xattr_buf));
659                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_SOM);
660                 rc2 = lustre_buf2som(info->mti_xattr_buf, rc2, ma->ma_som);
661                 if (rc2 == 0)
662                         ma->ma_valid |= MA_SOM;
663                 else if (rc2 < 0 && rc2 != -ENODATA)
664                         GOTO(out, rc = rc2);
665         }
666
667         if (need & MA_HSM && S_ISREG(mode)) {
668                 buf->lb_buf = info->mti_xattr_buf;
669                 buf->lb_len = sizeof(info->mti_xattr_buf);
670                 CLASSERT(sizeof(struct hsm_attrs) <=
671                          sizeof(info->mti_xattr_buf));
672                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
673                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
674                 if (rc2 == 0)
675                         ma->ma_valid |= MA_HSM;
676                 else if (rc2 < 0 && rc2 != -ENODATA)
677                         GOTO(out, rc = rc2);
678         }
679
680 #ifdef CONFIG_FS_POSIX_ACL
681         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
682                 buf->lb_buf = ma->ma_acl;
683                 buf->lb_len = ma->ma_acl_size;
684                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
685                 if (rc2 > 0) {
686                         ma->ma_acl_size = rc2;
687                         ma->ma_valid |= MA_ACL_DEF;
688                 } else if (rc2 == -ENODATA) {
689                         /* no ACLs */
690                         ma->ma_acl_size = 0;
691                 } else
692                         GOTO(out, rc = rc2);
693         }
694 #endif
695 out:
696         ma->ma_need = need;
697         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = "LPX64" ma_lmm=%p\n",
698                rc, ma->ma_valid, ma->ma_lmm);
699         RETURN(rc);
700 }
701
702 static int mdt_getattr_internal(struct mdt_thread_info *info,
703                                 struct mdt_object *o, int ma_need)
704 {
705         struct md_object        *next = mdt_object_child(o);
706         const struct mdt_body   *reqbody = info->mti_body;
707         struct ptlrpc_request   *req = mdt_info_req(info);
708         struct md_attr          *ma = &info->mti_attr;
709         struct lu_attr          *la = &ma->ma_attr;
710         struct req_capsule      *pill = info->mti_pill;
711         const struct lu_env     *env = info->mti_env;
712         struct mdt_body         *repbody;
713         struct lu_buf           *buffer = &info->mti_buf;
714         int                     rc;
715         int                     is_root;
716         ENTRY;
717
718         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
719                 RETURN(err_serious(-ENOMEM));
720
721         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
722
723         ma->ma_valid = 0;
724
725         if (mdt_object_remote(o)) {
726                 /* This object is located on remote node.*/
727                 /* Return -EIO for old client */
728                 if (!mdt_is_dne_client(req->rq_export))
729                         GOTO(out, rc = -EIO);
730
731                 repbody->fid1 = *mdt_object_fid(o);
732                 repbody->valid = OBD_MD_FLID | OBD_MD_MDS;
733                 GOTO(out, rc = 0);
734         }
735
736         buffer->lb_len = reqbody->eadatasize;
737         if (buffer->lb_len > 0)
738                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
739         else
740                 buffer->lb_buf = NULL;
741
742         /* If it is dir object and client require MEA, then we got MEA */
743         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
744             reqbody->valid & OBD_MD_MEA) {
745                 /* Assumption: MDT_MD size is enough for lmv size. */
746                 ma->ma_lmv = buffer->lb_buf;
747                 ma->ma_lmv_size = buffer->lb_len;
748                 ma->ma_need = MA_LMV | MA_INODE;
749         } else {
750                 ma->ma_lmm = buffer->lb_buf;
751                 ma->ma_lmm_size = buffer->lb_len;
752                 ma->ma_need = MA_LOV | MA_INODE | MA_HSM;
753         }
754
755         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
756             reqbody->valid & OBD_MD_FLDIREA  &&
757             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
758                 /* get default stripe info for this dir. */
759                 ma->ma_need |= MA_LOV_DEF;
760         }
761         ma->ma_need |= ma_need;
762         if (ma->ma_need & MA_SOM)
763                 ma->ma_som = &info->mti_u.som.data;
764
765         rc = mdt_attr_get_complex(info, o, ma);
766         if (unlikely(rc)) {
767                 CERROR("%s: getattr error for "DFID": rc = %d\n",
768                        mdt_obd_name(info->mti_mdt),
769                        PFID(mdt_object_fid(o)), rc);
770                 RETURN(rc);
771         }
772
773         /* if file is released, check if a restore is running */
774         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_RELEASED) &&
775             mdt_hsm_restore_is_running(info, mdt_object_fid(o))) {
776                 repbody->t_state = MS_RESTORE;
777                 repbody->valid |= OBD_MD_TSTATE;
778         }
779
780         is_root = lu_fid_eq(mdt_object_fid(o), &info->mti_mdt->mdt_md_root_fid);
781
782         /* the Lustre protocol supposes to return default striping
783          * on the user-visible root if explicitly requested */
784         if ((ma->ma_valid & MA_LOV) == 0 && S_ISDIR(la->la_mode) &&
785             (ma->ma_need & MA_LOV_DEF && is_root) && ma->ma_need & MA_LOV) {
786                 struct lu_fid      rootfid;
787                 struct mdt_object *root;
788                 struct mdt_device *mdt = info->mti_mdt;
789
790                 rc = dt_root_get(env, mdt->mdt_bottom, &rootfid);
791                 if (rc)
792                         RETURN(rc);
793                 root = mdt_object_find(env, mdt, &rootfid);
794                 if (IS_ERR(root))
795                         RETURN(PTR_ERR(root));
796                 rc = mdt_attr_get_lov(info, root, ma);
797                 mdt_object_put(info->mti_env, root);
798                 if (unlikely(rc)) {
799                         CERROR("%s: getattr error for "DFID": rc = %d\n",
800                                mdt_obd_name(info->mti_mdt),
801                                PFID(mdt_object_fid(o)), rc);
802                         RETURN(rc);
803                 }
804         }
805
806         if (likely(ma->ma_valid & MA_INODE))
807                 mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
808         else
809                 RETURN(-EFAULT);
810
811         if (mdt_body_has_lov(la, reqbody)) {
812                 if (ma->ma_valid & MA_LOV) {
813                         LASSERT(ma->ma_lmm_size);
814                         mdt_dump_lmm(D_INFO, ma->ma_lmm);
815                         repbody->eadatasize = ma->ma_lmm_size;
816                         if (S_ISDIR(la->la_mode))
817                                 repbody->valid |= OBD_MD_FLDIREA;
818                         else
819                                 repbody->valid |= OBD_MD_FLEASIZE;
820                 }
821                 if (ma->ma_valid & MA_LMV) {
822                         LASSERT(S_ISDIR(la->la_mode));
823                         repbody->eadatasize = ma->ma_lmv_size;
824                         repbody->valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
825                 }
826         } else if (S_ISLNK(la->la_mode) &&
827                    reqbody->valid & OBD_MD_LINKNAME) {
828                 buffer->lb_buf = ma->ma_lmm;
829                 /* eadatasize from client includes NULL-terminator, so
830                  * there is no need to read it */
831                 buffer->lb_len = reqbody->eadatasize - 1;
832                 rc = mo_readlink(env, next, buffer);
833                 if (unlikely(rc <= 0)) {
834                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
835                                mdt_obd_name(info->mti_mdt),
836                                PFID(mdt_object_fid(o)), rc);
837                         rc = -EFAULT;
838                 } else {
839                         int print_limit = min_t(int, PAGE_CACHE_SIZE - 128, rc);
840
841                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
842                                 rc -= 2;
843                         repbody->valid |= OBD_MD_LINKNAME;
844                         /* we need to report back size with NULL-terminator
845                          * because client expects that */
846                         repbody->eadatasize = rc + 1;
847                         if (repbody->eadatasize != reqbody->eadatasize)
848                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
849                                        "on "DFID ", expected %d\n",
850                                        mdt_obd_name(info->mti_mdt),
851                                        rc, PFID(mdt_object_fid(o)),
852                                        reqbody->eadatasize - 1);
853                         /* NULL terminate */
854                         ((char *)ma->ma_lmm)[rc] = 0;
855
856                         /* If the total CDEBUG() size is larger than a page, it
857                          * will print a warning to the console, avoid this by
858                          * printing just the last part of the symlink. */
859                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
860                                print_limit < rc ? "..." : "", print_limit,
861                                (char *)ma->ma_lmm + rc - print_limit, rc);
862                         rc = 0;
863                 }
864         }
865
866         if (reqbody->valid & OBD_MD_FLMODEASIZE) {
867                 repbody->max_cookiesize = 0;
868                 repbody->max_mdsize = info->mti_mdt->mdt_max_mdsize;
869                 repbody->valid |= OBD_MD_FLMODEASIZE;
870                 CDEBUG(D_INODE, "I am going to change the MAX_MD_SIZE & "
871                        "MAX_COOKIE to : %d:%d\n", repbody->max_mdsize,
872                        repbody->max_cookiesize);
873         }
874
875         if (exp_connect_rmtclient(info->mti_exp) &&
876             reqbody->valid & OBD_MD_FLRMTPERM) {
877                 void *buf = req_capsule_server_get(pill, &RMF_ACL);
878
879                 /* mdt_getattr_lock only */
880                 rc = mdt_pack_remote_perm(info, o, buf);
881                 if (rc) {
882                         repbody->valid &= ~OBD_MD_FLRMTPERM;
883                         repbody->aclsize = 0;
884                         RETURN(rc);
885                 } else {
886                         repbody->valid |= OBD_MD_FLRMTPERM;
887                         repbody->aclsize = sizeof(struct mdt_remote_perm);
888                 }
889         }
890 #ifdef CONFIG_FS_POSIX_ACL
891         else if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
892                  (reqbody->valid & OBD_MD_FLACL)) {
893                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
894                 buffer->lb_len = req_capsule_get_size(pill,
895                                                       &RMF_ACL, RCL_SERVER);
896                 if (buffer->lb_len > 0) {
897                         rc = mo_xattr_get(env, next, buffer,
898                                           XATTR_NAME_ACL_ACCESS);
899                         if (rc < 0) {
900                                 if (rc == -ENODATA) {
901                                         repbody->aclsize = 0;
902                                         repbody->valid |= OBD_MD_FLACL;
903                                         rc = 0;
904                                 } else if (rc == -EOPNOTSUPP) {
905                                         rc = 0;
906                                 } else {
907                                         CERROR("%s: unable to read "DFID
908                                                " ACL: rc = %d\n",
909                                                mdt_obd_name(info->mti_mdt),
910                                                PFID(mdt_object_fid(o)), rc);
911                                 }
912                         } else {
913                                 repbody->aclsize = rc;
914                                 repbody->valid |= OBD_MD_FLACL;
915                                 rc = 0;
916                         }
917                 }
918         }
919 #endif
920
921         if (reqbody->valid & OBD_MD_FLMDSCAPA &&
922             info->mti_mdt->mdt_lut.lut_mds_capa &&
923             exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) {
924                 struct lustre_capa *capa;
925
926                 capa = req_capsule_server_get(pill, &RMF_CAPA1);
927                 LASSERT(capa);
928                 capa->lc_opc = CAPA_OPC_MDS_DEFAULT;
929                 rc = mo_capa_get(env, next, capa, 0);
930                 if (rc)
931                         RETURN(rc);
932                 repbody->valid |= OBD_MD_FLMDSCAPA;
933         }
934
935 out:
936         if (rc == 0)
937                 mdt_counter_incr(req, LPROC_MDT_GETATTR);
938
939         RETURN(rc);
940 }
941
942 static int mdt_renew_capa(struct mdt_thread_info *info)
943 {
944         struct mdt_object  *obj = info->mti_object;
945         struct mdt_body    *body;
946         struct lustre_capa *capa, *c;
947         int rc;
948         ENTRY;
949
950         /* if object doesn't exist, or server has disabled capability,
951          * return directly, client will find body->valid OBD_MD_FLOSSCAPA
952          * flag not set.
953          */
954         if (!obj || !info->mti_mdt->mdt_lut.lut_oss_capa ||
955             !(exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA))
956                 RETURN(0);
957
958         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
959         LASSERT(body != NULL);
960
961         c = req_capsule_client_get(info->mti_pill, &RMF_CAPA1);
962         LASSERT(c);
963
964         capa = req_capsule_server_get(info->mti_pill, &RMF_CAPA2);
965         LASSERT(capa);
966
967         *capa = *c;
968         rc = mo_capa_get(info->mti_env, mdt_object_child(obj), capa, 1);
969         if (rc == 0)
970                 body->valid |= OBD_MD_FLOSSCAPA;
971         RETURN(rc);
972 }
973
974 int mdt_getattr(struct tgt_session_info *tsi)
975 {
976         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
977         struct mdt_object       *obj = info->mti_object;
978         struct req_capsule      *pill = info->mti_pill;
979         struct mdt_body         *reqbody;
980         struct mdt_body         *repbody;
981         mode_t                   mode;
982         int rc, rc2;
983         ENTRY;
984
985         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
986         LASSERT(reqbody);
987
988         if (reqbody->valid & OBD_MD_FLOSSCAPA) {
989                 rc = req_capsule_server_pack(pill);
990                 if (unlikely(rc))
991                         RETURN(err_serious(rc));
992                 rc = mdt_renew_capa(info);
993                 GOTO(out_shrink, rc);
994         }
995
996         LASSERT(obj != NULL);
997         LASSERT(lu_object_assert_exists(&obj->mot_obj));
998
999         mode = lu_object_attr(&obj->mot_obj);
1000
1001         /* old clients may not report needed easize, use max value then */
1002         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1003                              reqbody->eadatasize == 0 ?
1004                              info->mti_mdt->mdt_max_mdsize :
1005                              reqbody->eadatasize);
1006
1007         rc = req_capsule_server_pack(pill);
1008         if (unlikely(rc != 0))
1009                 GOTO(out, rc = err_serious(rc));
1010
1011         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1012         LASSERT(repbody != NULL);
1013         repbody->eadatasize = 0;
1014         repbody->aclsize = 0;
1015
1016         if (reqbody->valid & OBD_MD_FLRMTPERM)
1017                 rc = mdt_init_ucred(info, reqbody);
1018         else
1019                 rc = mdt_check_ucred(info);
1020         if (unlikely(rc))
1021                 GOTO(out_shrink, rc);
1022
1023         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
1024
1025         /*
1026          * Don't check capability at all, because rename might getattr for
1027          * remote obj, and at that time no capability is available.
1028          */
1029         mdt_set_capainfo(info, 1, &reqbody->fid1, BYPASS_CAPA);
1030         rc = mdt_getattr_internal(info, obj, 0);
1031         if (reqbody->valid & OBD_MD_FLRMTPERM)
1032                 mdt_exit_ucred(info);
1033         EXIT;
1034 out_shrink:
1035         mdt_client_compatibility(info);
1036         rc2 = mdt_fix_reply(info);
1037         if (rc == 0)
1038                 rc = rc2;
1039 out:
1040         mdt_thread_info_fini(info);
1041         return rc;
1042 }
1043
1044 int mdt_is_subdir(struct tgt_session_info *tsi)
1045 {
1046         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1047         struct mdt_object     *o = info->mti_object;
1048         struct req_capsule    *pill = info->mti_pill;
1049         const struct mdt_body *body = info->mti_body;
1050         struct mdt_body       *repbody;
1051         int                    rc;
1052         ENTRY;
1053
1054         LASSERT(o != NULL);
1055
1056         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1057
1058         /*
1059          * We save last checked parent fid to @repbody->fid1 for remote
1060          * directory case.
1061          */
1062         LASSERT(fid_is_sane(&body->fid2));
1063         LASSERT(mdt_object_exists(o) && !mdt_object_remote(o));
1064         rc = mdo_is_subdir(info->mti_env, mdt_object_child(o),
1065                            &body->fid2, &repbody->fid1);
1066         if (rc == 0 || rc == -EREMOTE)
1067                 repbody->valid |= OBD_MD_FLID;
1068
1069         mdt_thread_info_fini(info);
1070         RETURN(rc);
1071 }
1072
1073 int mdt_swap_layouts(struct tgt_session_info *tsi)
1074 {
1075         struct mdt_thread_info  *info;
1076         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1077         struct obd_export       *exp = req->rq_export;
1078         struct mdt_object       *o1, *o2, *o;
1079         struct mdt_lock_handle  *lh1, *lh2;
1080         struct mdc_swap_layouts *msl;
1081         int                      rc;
1082         ENTRY;
1083
1084         /* client does not support layout lock, so layout swaping
1085          * is disabled.
1086          * FIXME: there is a problem for old clients which don't support
1087          * layout lock yet. If those clients have already opened the file
1088          * they won't be notified at all so that old layout may still be
1089          * used to do IO. This can be fixed after file release is landed by
1090          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1091         if (!exp_connect_layout(exp))
1092                 RETURN(-EOPNOTSUPP);
1093
1094         info = tsi2mdt_info(tsi);
1095         if (req_capsule_get_size(info->mti_pill, &RMF_CAPA1, RCL_CLIENT))
1096                 mdt_set_capainfo(info, 0, &info->mti_body->fid1,
1097                                  req_capsule_client_get(info->mti_pill,
1098                                                         &RMF_CAPA1));
1099
1100         if (req_capsule_get_size(info->mti_pill, &RMF_CAPA2, RCL_CLIENT))
1101                 mdt_set_capainfo(info, 1, &info->mti_body->fid2,
1102                                  req_capsule_client_get(info->mti_pill,
1103                                                         &RMF_CAPA2));
1104
1105         o1 = info->mti_object;
1106         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1107                                 &info->mti_body->fid2);
1108         if (IS_ERR(o))
1109                 GOTO(out, rc = PTR_ERR(o));
1110
1111         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1112                 GOTO(put, rc = -ENOENT);
1113
1114         rc = lu_fid_cmp(&info->mti_body->fid1, &info->mti_body->fid2);
1115         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1116                 GOTO(put, rc);
1117
1118         if (rc < 0)
1119                 swap(o1, o2);
1120
1121         /* permission check. Make sure the calling process having permission
1122          * to write both files. */
1123         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1124                                 MAY_WRITE);
1125         if (rc < 0)
1126                 GOTO(put, rc);
1127
1128         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1129                                 MAY_WRITE);
1130         if (rc < 0)
1131                 GOTO(put, rc);
1132
1133         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1134         if (msl == NULL)
1135                 GOTO(put, rc = -EPROTO);
1136
1137         lh1 = &info->mti_lh[MDT_LH_NEW];
1138         mdt_lock_reg_init(lh1, LCK_EX);
1139         lh2 = &info->mti_lh[MDT_LH_OLD];
1140         mdt_lock_reg_init(lh2, LCK_EX);
1141
1142         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1143                              MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
1144         if (rc < 0)
1145                 GOTO(put, rc);
1146
1147         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1148                              MDS_INODELOCK_XATTR, MDT_LOCAL_LOCK);
1149         if (rc < 0)
1150                 GOTO(unlock1, rc);
1151
1152         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1153                              mdt_object_child(o2), msl->msl_flags);
1154         GOTO(unlock2, rc);
1155 unlock2:
1156         mdt_object_unlock(info, o2, lh2, rc);
1157 unlock1:
1158         mdt_object_unlock(info, o1, lh1, rc);
1159 put:
1160         mdt_object_put(info->mti_env, o);
1161 out:
1162         mdt_thread_info_fini(info);
1163         RETURN(rc);
1164 }
1165
1166 static int mdt_raw_lookup(struct mdt_thread_info *info,
1167                           struct mdt_object *parent,
1168                           const struct lu_name *lname,
1169                           struct ldlm_reply *ldlm_rep)
1170 {
1171         struct md_object *next = mdt_object_child(info->mti_object);
1172         const struct mdt_body *reqbody = info->mti_body;
1173         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1174         struct mdt_body *repbody;
1175         int rc;
1176         ENTRY;
1177
1178         if (reqbody->valid != OBD_MD_FLID)
1179                 RETURN(0);
1180
1181         LASSERT(!info->mti_cross_ref);
1182
1183         /* Only got the fid of this obj by name */
1184         fid_zero(child_fid);
1185         rc = mdo_lookup(info->mti_env, next, lname, child_fid,
1186                         &info->mti_spec);
1187 #if 0
1188         /* XXX is raw_lookup possible as intent operation? */
1189         if (rc != 0) {
1190                 if (rc == -ENOENT)
1191                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1192                 RETURN(rc);
1193         } else
1194                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1195
1196         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1197 #endif
1198         if (rc == 0) {
1199                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1200                 repbody->fid1 = *child_fid;
1201                 repbody->valid = OBD_MD_FLID;
1202         }
1203         RETURN(1);
1204 }
1205
1206 /*
1207  * UPDATE lock should be taken against parent, and be release before exit;
1208  * child_bits lock should be taken against child, and be returned back:
1209  *            (1)normal request should release the child lock;
1210  *            (2)intent request will grant the lock to client.
1211  */
1212 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1213                                  struct mdt_lock_handle *lhc,
1214                                  __u64 child_bits,
1215                                  struct ldlm_reply *ldlm_rep)
1216 {
1217         struct ptlrpc_request  *req       = mdt_info_req(info);
1218         struct mdt_body        *reqbody   = NULL;
1219         struct mdt_object      *parent    = info->mti_object;
1220         struct mdt_object      *child;
1221         struct md_object       *next      = mdt_object_child(parent);
1222         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
1223         struct lu_name         *lname     = NULL;
1224         const char             *name      = NULL;
1225         int                     namelen   = 0;
1226         struct mdt_lock_handle *lhp       = NULL;
1227         struct ldlm_lock       *lock;
1228         struct ldlm_res_id     *res_id;
1229         int                     is_resent;
1230         int                     ma_need = 0;
1231         int                     rc;
1232
1233         ENTRY;
1234
1235         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1236         LASSERT(ergo(is_resent,
1237                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1238
1239         LASSERT(parent != NULL);
1240         name = req_capsule_client_get(info->mti_pill, &RMF_NAME);
1241         if (name == NULL)
1242                 RETURN(err_serious(-EFAULT));
1243
1244         namelen = req_capsule_get_size(info->mti_pill, &RMF_NAME,
1245                                        RCL_CLIENT) - 1;
1246         if (!info->mti_cross_ref) {
1247                 /*
1248                  * XXX: Check for "namelen == 0" is for getattr by fid
1249                  * (OBD_CONNECT_ATTRFID), otherwise do not allow empty name,
1250                  * that is the name must contain at least one character and
1251                  * the terminating '\0'
1252                  */
1253                 if (namelen == 0) {
1254                         reqbody = req_capsule_client_get(info->mti_pill,
1255                                                          &RMF_MDT_BODY);
1256                         if (unlikely(reqbody == NULL))
1257                                 RETURN(err_serious(-EFAULT));
1258
1259                         if (unlikely(!fid_is_sane(&reqbody->fid2)))
1260                                 RETURN(err_serious(-EINVAL));
1261
1262                         name = NULL;
1263                         CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1264                                "ldlm_rep = %p\n",
1265                                PFID(mdt_object_fid(parent)),
1266                                PFID(&reqbody->fid2), ldlm_rep);
1267                 } else {
1268                         lname = mdt_name(info->mti_env, (char *)name, namelen);
1269                         CDEBUG(D_INODE, "getattr with lock for "DFID"/%s, "
1270                                "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1271                                name, ldlm_rep);
1272                 }
1273         }
1274         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1275
1276         if (unlikely(!mdt_object_exists(parent)) && lname) {
1277                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1278                                 &parent->mot_obj,
1279                                 "Parent doesn't exist!\n");
1280                 RETURN(-ESTALE);
1281         } else if (!info->mti_cross_ref) {
1282                 LASSERTF(!mdt_object_remote(parent),
1283                          "Parent "DFID" is on remote server\n",
1284                          PFID(mdt_object_fid(parent)));
1285         }
1286         if (lname) {
1287                 rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
1288                 if (rc != 0) {
1289                         if (rc > 0)
1290                                 rc = 0;
1291                         RETURN(rc);
1292                 }
1293         }
1294
1295         if (info->mti_cross_ref) {
1296                 /* Only getattr on the child. Parent is on another node. */
1297                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1298                 child = parent;
1299                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1300                        "ldlm_rep=%p\n", PFID(mdt_object_fid(child)), ldlm_rep);
1301
1302                 if (is_resent) {
1303                         /* Do not take lock for resent case. */
1304                         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1305                         LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
1306                                  lhc->mlh_reg_lh.cookie);
1307                         LASSERT(fid_res_name_eq(mdt_object_fid(child),
1308                                                 &lock->l_resource->lr_name));
1309                         LDLM_LOCK_PUT(lock);
1310                         rc = 0;
1311                 } else {
1312                         mdt_lock_handle_init(lhc);
1313                         mdt_lock_reg_init(lhc, LCK_PR);
1314
1315                         /*
1316                          * Object's name is on another MDS, no lookup or layout
1317                          * lock is needed here but update lock is.
1318                          */
1319                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
1320                                         MDS_INODELOCK_LAYOUT);
1321                         child_bits |= MDS_INODELOCK_PERM | MDS_INODELOCK_UPDATE;
1322
1323                         rc = mdt_object_lock(info, child, lhc, child_bits,
1324                                              MDT_LOCAL_LOCK);
1325                 }
1326                 if (rc == 0) {
1327                         /* Finally, we can get attr for child. */
1328                         mdt_set_capainfo(info, 0, mdt_object_fid(child),
1329                                          BYPASS_CAPA);
1330                         rc = mdt_getattr_internal(info, child, 0);
1331                         if (unlikely(rc != 0))
1332                                 mdt_object_unlock(info, child, lhc, 1);
1333                 }
1334                 RETURN(rc);
1335         }
1336
1337         if (lname) {
1338                 /* step 1: lock parent only if parent is a directory */
1339                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
1340                         lhp = &info->mti_lh[MDT_LH_PARENT];
1341                         mdt_lock_pdo_init(lhp, LCK_PR, name, namelen);
1342                         rc = mdt_object_lock(info, parent, lhp,
1343                                              MDS_INODELOCK_UPDATE,
1344                                              MDT_LOCAL_LOCK);
1345                         if (unlikely(rc != 0))
1346                                 RETURN(rc);
1347                 }
1348
1349                 /* step 2: lookup child's fid by name */
1350                 fid_zero(child_fid);
1351                 rc = mdo_lookup(info->mti_env, next, lname, child_fid,
1352                                 &info->mti_spec);
1353
1354                 if (rc != 0) {
1355                         if (rc == -ENOENT)
1356                                 mdt_set_disposition(info, ldlm_rep,
1357                                                     DISP_LOOKUP_NEG);
1358                         GOTO(out_parent, rc);
1359                 } else
1360                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1361         } else {
1362                 *child_fid = reqbody->fid2;
1363                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1364         }
1365
1366         /*
1367          *step 3: find the child object by fid & lock it.
1368          *        regardless if it is local or remote.
1369          *
1370          *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
1371          *      set parent dir fid the same as child fid in getattr by fid case
1372          *      we should not lu_object_find() the object again, could lead
1373          *      to hung if there is a concurrent unlink destroyed the object.
1374          */
1375         if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
1376                 mdt_object_get(info->mti_env, parent);
1377                 child = parent;
1378         } else {
1379                 child = mdt_object_find(info->mti_env, info->mti_mdt,
1380                                         child_fid);
1381         }
1382
1383         if (unlikely(IS_ERR(child)))
1384                 GOTO(out_parent, rc = PTR_ERR(child));
1385         if (is_resent) {
1386                 /* Do not take lock for resent case. */
1387                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1388                 LASSERTF(lock != NULL, "Invalid lock handle "LPX64"\n",
1389                          lhc->mlh_reg_lh.cookie);
1390
1391                 res_id = &lock->l_resource->lr_name;
1392                 if (!fid_res_name_eq(mdt_object_fid(child),
1393                                      &lock->l_resource->lr_name)) {
1394                         LASSERTF(fid_res_name_eq(mdt_object_fid(parent),
1395                                                  &lock->l_resource->lr_name),
1396                                  "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1397                                  PLDLMRES(lock->l_resource),
1398                                  PFID(mdt_object_fid(parent)));
1399                         CWARN("Although resent, but still not get child lock"
1400                               "parent:"DFID" child:"DFID"\n",
1401                               PFID(mdt_object_fid(parent)),
1402                               PFID(mdt_object_fid(child)));
1403                         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
1404                         LDLM_LOCK_PUT(lock);
1405                         GOTO(relock, 0);
1406                 }
1407                 LDLM_LOCK_PUT(lock);
1408                 rc = 0;
1409         } else {
1410                 bool try_layout = false;
1411
1412 relock:
1413                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout*2);
1414                 mdt_lock_handle_init(lhc);
1415                 mdt_lock_reg_init(lhc, LCK_PR);
1416
1417                 if (!mdt_object_exists(child)) {
1418                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1419                                         &child->mot_obj,
1420                                         "Object doesn't exist!\n");
1421                         GOTO(out_child, rc = -ENOENT);
1422                 }
1423
1424                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
1425                       mdt_object_exists(child) && !mdt_object_remote(child)) {
1426                         struct md_attr *ma = &info->mti_attr;
1427
1428                         ma->ma_valid = 0;
1429                         ma->ma_need = MA_INODE;
1430                         rc = mdt_attr_get_complex(info, child, ma);
1431                         if (unlikely(rc != 0))
1432                                 GOTO(out_child, rc);
1433
1434                         /* If the file has not been changed for some time, we
1435                          * return not only a LOOKUP lock, but also an UPDATE
1436                          * lock and this might save us RPC on later STAT. For
1437                          * directories, it also let negative dentry cache start
1438                          * working for this dir. */
1439                         if (ma->ma_valid & MA_INODE &&
1440                             ma->ma_attr.la_valid & LA_CTIME &&
1441                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
1442                                 ma->ma_attr.la_ctime < cfs_time_current_sec())
1443                                 child_bits |= MDS_INODELOCK_UPDATE;
1444                 }
1445
1446                 /* layout lock must be granted in a best-effort way
1447                  * for IT operations */
1448                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
1449                 if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
1450                     exp_connect_layout(info->mti_exp) &&
1451                     S_ISREG(lu_object_attr(&child->mot_obj)) &&
1452                     !mdt_object_remote(child) && ldlm_rep != NULL) {
1453                         /* try to grant layout lock for regular file. */
1454                         try_layout = true;
1455                 }
1456
1457                 rc = 0;
1458                 if (try_layout) {
1459                         child_bits |= MDS_INODELOCK_LAYOUT;
1460                         /* try layout lock, it may fail to be granted due to
1461                          * contention at LOOKUP or UPDATE */
1462                         if (!mdt_object_lock_try(info, child, lhc, child_bits,
1463                                                  MDT_CROSS_LOCK)) {
1464                                 child_bits &= ~MDS_INODELOCK_LAYOUT;
1465                                 LASSERT(child_bits != 0);
1466                                 rc = mdt_object_lock(info, child, lhc,
1467                                                 child_bits, MDT_CROSS_LOCK);
1468                         } else {
1469                                 ma_need |= MA_LOV;
1470                         }
1471                 } else {
1472                         rc = mdt_object_lock(info, child, lhc, child_bits,
1473                                                 MDT_CROSS_LOCK);
1474                 }
1475                 if (unlikely(rc != 0))
1476                         GOTO(out_child, rc);
1477         }
1478
1479         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
1480         /* Get MA_SOM attributes if update lock is given. */
1481         if (lock &&
1482             lock->l_policy_data.l_inodebits.bits & MDS_INODELOCK_UPDATE &&
1483             S_ISREG(lu_object_attr(&mdt_object_child(child)->mo_lu)))
1484                 ma_need |= MA_SOM;
1485
1486         /* finally, we can get attr for child. */
1487         mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA);
1488         rc = mdt_getattr_internal(info, child, ma_need);
1489         if (unlikely(rc != 0)) {
1490                 mdt_object_unlock(info, child, lhc, 1);
1491         } else if (lock) {
1492                 /* Debugging code. */
1493                 res_id = &lock->l_resource->lr_name;
1494                 LDLM_DEBUG(lock, "Returning lock to client");
1495                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
1496                                          &lock->l_resource->lr_name),
1497                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
1498                          PLDLMRES(lock->l_resource),
1499                          PFID(mdt_object_fid(child)));
1500                 if (mdt_object_exists(child) && !mdt_object_remote(child))
1501                         mdt_pack_size2body(info, child);
1502         }
1503         if (lock)
1504                 LDLM_LOCK_PUT(lock);
1505
1506         EXIT;
1507 out_child:
1508         mdt_object_put(info->mti_env, child);
1509 out_parent:
1510         if (lhp)
1511                 mdt_object_unlock(info, parent, lhp, 1);
1512         return rc;
1513 }
1514
1515 /* normal handler: should release the child lock */
1516 int mdt_getattr_name(struct tgt_session_info *tsi)
1517 {
1518         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1519         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
1520         struct mdt_body        *reqbody;
1521         struct mdt_body        *repbody;
1522         int rc, rc2;
1523         ENTRY;
1524
1525         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
1526         LASSERT(reqbody != NULL);
1527         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1528         LASSERT(repbody != NULL);
1529
1530         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
1531         repbody->eadatasize = 0;
1532         repbody->aclsize = 0;
1533
1534         rc = mdt_init_ucred(info, reqbody);
1535         if (unlikely(rc))
1536                 GOTO(out_shrink, rc);
1537
1538         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
1539         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
1540                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
1541                 lhc->mlh_reg_lh.cookie = 0;
1542         }
1543         mdt_exit_ucred(info);
1544         EXIT;
1545 out_shrink:
1546         mdt_client_compatibility(info);
1547         rc2 = mdt_fix_reply(info);
1548         if (rc == 0)
1549                 rc = rc2;
1550         mdt_thread_info_fini(info);
1551         return rc;
1552 }
1553
1554 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
1555                          void *karg, void *uarg);
1556
1557 int mdt_set_info(struct tgt_session_info *tsi)
1558 {
1559         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1560         char                    *key;
1561         void                    *val;
1562         int                      keylen, vallen, rc = 0;
1563
1564         ENTRY;
1565
1566         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
1567         if (key == NULL) {
1568                 DEBUG_REQ(D_HA, req, "no set_info key");
1569                 RETURN(err_serious(-EFAULT));
1570         }
1571
1572         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
1573                                       RCL_CLIENT);
1574
1575         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
1576         if (val == NULL) {
1577                 DEBUG_REQ(D_HA, req, "no set_info val");
1578                 RETURN(err_serious(-EFAULT));
1579         }
1580
1581         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
1582                                       RCL_CLIENT);
1583
1584         /* Swab any part of val you need to here */
1585         if (KEY_IS(KEY_READ_ONLY)) {
1586                 spin_lock(&req->rq_export->exp_lock);
1587                 if (*(__u32 *)val)
1588                         *exp_connect_flags_ptr(req->rq_export) |=
1589                                 OBD_CONNECT_RDONLY;
1590                 else
1591                         *exp_connect_flags_ptr(req->rq_export) &=
1592                                 ~OBD_CONNECT_RDONLY;
1593                 spin_unlock(&req->rq_export->exp_lock);
1594         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
1595                 struct changelog_setinfo *cs = val;
1596
1597                 if (vallen != sizeof(*cs)) {
1598                         CERROR("%s: bad changelog_clear setinfo size %d\n",
1599                                tgt_name(tsi->tsi_tgt), vallen);
1600                         RETURN(-EINVAL);
1601                 }
1602                 if (ptlrpc_req_need_swab(req)) {
1603                         __swab64s(&cs->cs_recno);
1604                         __swab32s(&cs->cs_id);
1605                 }
1606
1607                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
1608                                    vallen, val, NULL);
1609         } else {
1610                 RETURN(-EINVAL);
1611         }
1612         RETURN(rc);
1613 }
1614
1615 int mdt_readpage(struct tgt_session_info *tsi)
1616 {
1617         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
1618         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
1619         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
1620         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
1621         struct mdt_body         *repbody;
1622         int                      rc;
1623         int                      i;
1624
1625         ENTRY;
1626
1627         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
1628                 RETURN(err_serious(-ENOMEM));
1629
1630         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
1631         if (repbody == NULL || reqbody == NULL)
1632                 RETURN(err_serious(-EFAULT));
1633
1634         /*
1635          * prepare @rdpg before calling lower layers and transfer itself. Here
1636          * reqbody->size contains offset of where to start to read and
1637          * reqbody->nlink contains number bytes to read.
1638          */
1639         rdpg->rp_hash = reqbody->size;
1640         if (rdpg->rp_hash != reqbody->size) {
1641                 CERROR("Invalid hash: "LPX64" != "LPX64"\n",
1642                        rdpg->rp_hash, reqbody->size);
1643                 RETURN(-EFAULT);
1644         }
1645
1646         rdpg->rp_attrs = reqbody->mode;
1647         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
1648                 rdpg->rp_attrs |= LUDA_64BITHASH;
1649         rdpg->rp_count  = min_t(unsigned int, reqbody->nlink,
1650                                 exp_max_brw_size(tsi->tsi_exp));
1651         rdpg->rp_npages = (rdpg->rp_count + PAGE_CACHE_SIZE - 1) >>
1652                           PAGE_CACHE_SHIFT;
1653         OBD_ALLOC(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1654         if (rdpg->rp_pages == NULL)
1655                 RETURN(-ENOMEM);
1656
1657         for (i = 0; i < rdpg->rp_npages; ++i) {
1658                 rdpg->rp_pages[i] = alloc_page(GFP_IOFS);
1659                 if (rdpg->rp_pages[i] == NULL)
1660                         GOTO(free_rdpg, rc = -ENOMEM);
1661         }
1662
1663         /* call lower layers to fill allocated pages with directory data */
1664         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
1665         if (rc < 0)
1666                 GOTO(free_rdpg, rc);
1667
1668         /* send pages to client */
1669         rc = tgt_sendpage(tsi, rdpg, rc);
1670
1671         EXIT;
1672 free_rdpg:
1673
1674         for (i = 0; i < rdpg->rp_npages; i++)
1675                 if (rdpg->rp_pages[i] != NULL)
1676                         __free_page(rdpg->rp_pages[i]);
1677         OBD_FREE(rdpg->rp_pages, rdpg->rp_npages * sizeof rdpg->rp_pages[0]);
1678
1679         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
1680                 RETURN(0);
1681
1682         return rc;
1683 }
1684
1685 static int mdt_reint_internal(struct mdt_thread_info *info,
1686                               struct mdt_lock_handle *lhc,
1687                               __u32 op)
1688 {
1689         struct req_capsule      *pill = info->mti_pill;
1690         struct mdt_body         *repbody;
1691         int                      rc = 0, rc2;
1692
1693         ENTRY;
1694
1695         rc = mdt_reint_unpack(info, op);
1696         if (rc != 0) {
1697                 CERROR("Can't unpack reint, rc %d\n", rc);
1698                 RETURN(err_serious(rc));
1699         }
1700
1701         /* for replay (no_create) lmm is not needed, client has it already */
1702         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1703                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
1704                                      info->mti_rr.rr_eadatalen);
1705
1706         /* llog cookies are always 0, the field is kept for compatibility */
1707         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
1708                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
1709
1710         rc = req_capsule_server_pack(pill);
1711         if (rc != 0) {
1712                 CERROR("Can't pack response, rc %d\n", rc);
1713                 RETURN(err_serious(rc));
1714         }
1715
1716         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
1717                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1718                 LASSERT(repbody);
1719                 repbody->eadatasize = 0;
1720                 repbody->aclsize = 0;
1721         }
1722
1723         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
1724
1725         /* for replay no cookkie / lmm need, because client have this already */
1726         if (info->mti_spec.no_create)
1727                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
1728                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
1729
1730         rc = mdt_init_ucred_reint(info);
1731         if (rc)
1732                 GOTO(out_shrink, rc);
1733
1734         rc = mdt_fix_attr_ucred(info, op);
1735         if (rc != 0)
1736                 GOTO(out_ucred, rc = err_serious(rc));
1737
1738         if (mdt_check_resent(info, mdt_reconstruct, lhc)) {
1739                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
1740                 GOTO(out_ucred, rc);
1741         }
1742         rc = mdt_reint_rec(info, lhc);
1743         EXIT;
1744 out_ucred:
1745         mdt_exit_ucred(info);
1746 out_shrink:
1747         mdt_client_compatibility(info);
1748         rc2 = mdt_fix_reply(info);
1749         if (rc == 0)
1750                 rc = rc2;
1751         return rc;
1752 }
1753
1754 static long mdt_reint_opcode(struct ptlrpc_request *req,
1755                              const struct req_format **fmt)
1756 {
1757         struct mdt_device       *mdt;
1758         struct mdt_rec_reint    *rec;
1759         long                     opc;
1760
1761         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1762         if (rec != NULL) {
1763                 opc = rec->rr_opcode;
1764                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
1765                 if (opc < REINT_MAX && fmt[opc] != NULL)
1766                         req_capsule_extend(&req->rq_pill, fmt[opc]);
1767                 else {
1768                         mdt = mdt_exp2dev(req->rq_export);
1769                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
1770                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
1771                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
1772                         opc = err_serious(-EFAULT);
1773                 }
1774         } else {
1775                 opc = err_serious(-EFAULT);
1776         }
1777         return opc;
1778 }
1779
1780 int mdt_reint(struct tgt_session_info *tsi)
1781 {
1782         long opc;
1783         int  rc;
1784         static const struct req_format *reint_fmts[REINT_MAX] = {
1785                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
1786                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
1787                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
1788                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
1789                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
1790                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
1791                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
1792                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK
1793         };
1794
1795         ENTRY;
1796
1797         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
1798         if (opc >= 0) {
1799                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
1800                 /*
1801                  * No lock possible here from client to pass it to reint code
1802                  * path.
1803                  */
1804                 rc = mdt_reint_internal(info, NULL, opc);
1805                 mdt_thread_info_fini(info);
1806         } else {
1807                 rc = opc;
1808         }
1809
1810         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
1811         RETURN(rc);
1812 }
1813
1814 /* this should sync the whole device */
1815 static int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
1816 {
1817         struct dt_device *dt = mdt->mdt_bottom;
1818         int rc;
1819         ENTRY;
1820
1821         rc = dt->dd_ops->dt_sync(env, dt);
1822         RETURN(rc);
1823 }
1824
1825 /* this should sync this object */
1826 static int mdt_object_sync(struct mdt_thread_info *info)
1827 {
1828         struct md_object *next;
1829         int rc;
1830         ENTRY;
1831
1832         if (!mdt_object_exists(info->mti_object)) {
1833                 CWARN("Non existing object  "DFID"!\n",
1834                       PFID(mdt_object_fid(info->mti_object)));
1835                 RETURN(-ESTALE);
1836         }
1837         next = mdt_object_child(info->mti_object);
1838         rc = mo_object_sync(info->mti_env, next);
1839
1840         RETURN(rc);
1841 }
1842
1843 int mdt_sync(struct tgt_session_info *tsi)
1844 {
1845         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1846         struct req_capsule      *pill = tsi->tsi_pill;
1847         struct mdt_body         *body;
1848         int                      rc;
1849
1850         ENTRY;
1851
1852         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
1853                 RETURN(err_serious(-ENOMEM));
1854
1855         if (fid_seq(&tsi->tsi_mdt_body->fid1) == 0) {
1856                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
1857         } else {
1858                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
1859
1860                 /* sync an object */
1861                 rc = mdt_object_sync(info);
1862                 if (rc == 0) {
1863                         const struct lu_fid *fid;
1864                         struct lu_attr *la = &info->mti_attr.ma_attr;
1865
1866                         info->mti_attr.ma_need = MA_INODE;
1867                         info->mti_attr.ma_valid = 0;
1868                         rc = mdt_attr_get_complex(info, info->mti_object,
1869                                                   &info->mti_attr);
1870                         if (rc == 0) {
1871                                 body = req_capsule_server_get(pill,
1872                                                               &RMF_MDT_BODY);
1873                                 fid = mdt_object_fid(info->mti_object);
1874                                 mdt_pack_attr2body(info, body, la, fid);
1875                         }
1876                 }
1877                 mdt_thread_info_fini(info);
1878         }
1879         if (rc == 0)
1880                 mdt_counter_incr(req, LPROC_MDT_SYNC);
1881
1882         RETURN(rc);
1883 }
1884
1885 /*
1886  * Handle quota control requests to consult current usage/limit, but also
1887  * to configure quota enforcement
1888  */
1889 int mdt_quotactl(struct tgt_session_info *tsi)
1890 {
1891         struct obd_export       *exp  = tsi->tsi_exp;
1892         struct req_capsule      *pill = tsi->tsi_pill;
1893         struct obd_quotactl     *oqctl, *repoqc;
1894         int                      id, rc;
1895         struct mdt_device       *mdt = mdt_exp2dev(exp);
1896         struct lu_device        *qmt = mdt->mdt_qmt_dev;
1897         ENTRY;
1898
1899         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
1900         if (oqctl == NULL)
1901                 RETURN(err_serious(-EPROTO));
1902
1903         rc = req_capsule_server_pack(pill);
1904         if (rc)
1905                 RETURN(err_serious(rc));
1906
1907         switch (oqctl->qc_cmd) {
1908         case Q_QUOTACHECK:
1909         case LUSTRE_Q_INVALIDATE:
1910         case LUSTRE_Q_FINVALIDATE:
1911         case Q_QUOTAON:
1912         case Q_QUOTAOFF:
1913         case Q_INITQUOTA:
1914                 /* deprecated, not used any more */
1915                 RETURN(-EOPNOTSUPP);
1916                 /* master quotactl */
1917         case Q_GETINFO:
1918         case Q_SETINFO:
1919         case Q_SETQUOTA:
1920         case Q_GETQUOTA:
1921                 if (qmt == NULL)
1922                         RETURN(-EOPNOTSUPP);
1923                 /* slave quotactl */
1924         case Q_GETOINFO:
1925         case Q_GETOQUOTA:
1926                 break;
1927         default:
1928                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
1929                 RETURN(-EFAULT);
1930         }
1931
1932         /* map uid/gid for remote client */
1933         id = oqctl->qc_id;
1934         if (exp_connect_rmtclient(exp)) {
1935                 struct lustre_idmap_table *idmap;
1936
1937                 idmap = exp->exp_mdt_data.med_idmap;
1938
1939                 if (unlikely(oqctl->qc_cmd != Q_GETQUOTA &&
1940                              oqctl->qc_cmd != Q_GETINFO))
1941                         RETURN(-EPERM);
1942
1943                 if (oqctl->qc_type == USRQUOTA)
1944                         id = lustre_idmap_lookup_uid(NULL, idmap, 0,
1945                                                      oqctl->qc_id);
1946                 else if (oqctl->qc_type == GRPQUOTA)
1947                         id = lustre_idmap_lookup_gid(NULL, idmap, 0,
1948                                                      oqctl->qc_id);
1949                 else
1950                         RETURN(-EINVAL);
1951
1952                 if (id == CFS_IDMAP_NOTFOUND) {
1953                         CDEBUG(D_QUOTA, "no mapping for id %u\n", oqctl->qc_id);
1954                         RETURN(-EACCES);
1955                 }
1956         }
1957
1958         repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
1959         if (repoqc == NULL)
1960                 RETURN(err_serious(-EFAULT));
1961
1962         if (oqctl->qc_id != id)
1963                 swap(oqctl->qc_id, id);
1964
1965         switch (oqctl->qc_cmd) {
1966
1967         case Q_GETINFO:
1968         case Q_SETINFO:
1969         case Q_SETQUOTA:
1970         case Q_GETQUOTA:
1971                 /* forward quotactl request to QMT */
1972                 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
1973                 break;
1974
1975         case Q_GETOINFO:
1976         case Q_GETOQUOTA:
1977                 /* slave quotactl */
1978                 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
1979                                    oqctl);
1980                 break;
1981
1982         default:
1983                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
1984                 RETURN(-EFAULT);
1985         }
1986
1987         if (oqctl->qc_id != id)
1988                 swap(oqctl->qc_id, id);
1989
1990         *repoqc = *oqctl;
1991         RETURN(rc);
1992 }
1993
1994 /** clone llog ctxt from child (mdd)
1995  * This allows remote llog (replicator) access.
1996  * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
1997  * context was originally set up, or we can handle them directly.
1998  * I choose the latter, but that means I need any llog
1999  * contexts set up by child to be accessable by the mdt.  So we clone the
2000  * context into our context list here.
2001  */
2002 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
2003                                int idx)
2004 {
2005         struct md_device  *next = mdt->mdt_child;
2006         struct llog_ctxt *ctxt;
2007         int rc;
2008
2009         if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
2010                 return 0;
2011
2012         rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
2013         if (rc || ctxt == NULL) {
2014                 return 0;
2015         }
2016
2017         rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
2018         if (rc)
2019                 CERROR("Can't set mdt ctxt %d\n", rc);
2020
2021         return rc;
2022 }
2023
2024 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
2025                                  struct mdt_device *mdt, int idx)
2026 {
2027         struct llog_ctxt *ctxt;
2028
2029         ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
2030         if (ctxt == NULL)
2031                 return 0;
2032         /* Put once for the get we just did, and once for the clone */
2033         llog_ctxt_put(ctxt);
2034         llog_ctxt_put(ctxt);
2035         return 0;
2036 }
2037
2038 /*
2039  * sec context handlers
2040  */
2041 int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
2042 {
2043         int rc;
2044
2045         rc = mdt_handle_idmap(tsi);
2046         if (unlikely(rc)) {
2047                 struct ptlrpc_request   *req = tgt_ses_req(tsi);
2048                 __u32                    opc;
2049
2050                 opc = lustre_msg_get_opc(req->rq_reqmsg);
2051                 if (opc == SEC_CTX_INIT || opc == SEC_CTX_INIT_CONT)
2052                         sptlrpc_svc_ctx_invalidate(req);
2053         }
2054
2055         CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
2056
2057         return rc;
2058 }
2059
2060 /*
2061  * quota request handlers
2062  */
2063 int mdt_quota_dqacq(struct tgt_session_info *tsi)
2064 {
2065         struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
2066         struct lu_device        *qmt = mdt->mdt_qmt_dev;
2067         int                      rc;
2068         ENTRY;
2069
2070         if (qmt == NULL)
2071                 RETURN(err_serious(-EOPNOTSUPP));
2072
2073         rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
2074         RETURN(rc);
2075 }
2076
2077 struct mdt_object *mdt_object_new(const struct lu_env *env,
2078                                   struct mdt_device *d,
2079                                   const struct lu_fid *f)
2080 {
2081         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
2082         struct lu_object *o;
2083         struct mdt_object *m;
2084         ENTRY;
2085
2086         CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
2087         o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
2088         if (unlikely(IS_ERR(o)))
2089                 m = (struct mdt_object *)o;
2090         else
2091                 m = mdt_obj(o);
2092         RETURN(m);
2093 }
2094
2095 struct mdt_object *mdt_object_find(const struct lu_env *env,
2096                                    struct mdt_device *d,
2097                                    const struct lu_fid *f)
2098 {
2099         struct lu_object *o;
2100         struct mdt_object *m;
2101         ENTRY;
2102
2103         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
2104         o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
2105         if (unlikely(IS_ERR(o)))
2106                 m = (struct mdt_object *)o;
2107         else
2108                 m = mdt_obj(o);
2109
2110         RETURN(m);
2111 }
2112
2113 /**
2114  * Asyncronous commit for mdt device.
2115  *
2116  * Pass asynchonous commit call down the MDS stack.
2117  *
2118  * \param env environment
2119  * \param mdt the mdt device
2120  */
2121 static void mdt_device_commit_async(const struct lu_env *env,
2122                                     struct mdt_device *mdt)
2123 {
2124         struct dt_device *dt = mdt->mdt_bottom;
2125         int rc;
2126
2127         rc = dt->dd_ops->dt_commit_async(env, dt);
2128         if (unlikely(rc != 0))
2129                 CWARN("async commit start failed with rc = %d", rc);
2130 }
2131
2132 /**
2133  * Mark the lock as "synchonous".
2134  *
2135  * Mark the lock to deffer transaction commit to the unlock time.
2136  *
2137  * \param lock the lock to mark as "synchonous"
2138  *
2139  * \see mdt_is_lock_sync
2140  * \see mdt_save_lock
2141  */
2142 static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
2143 {
2144         lock->l_ast_data = (void*)1;
2145 }
2146
2147 /**
2148  * Check whehter the lock "synchonous" or not.
2149  *
2150  * \param lock the lock to check
2151  * \retval 1 the lock is "synchonous"
2152  * \retval 0 the lock isn't "synchronous"
2153  *
2154  * \see mdt_set_lock_sync
2155  * \see mdt_save_lock
2156  */
2157 static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
2158 {
2159         return lock->l_ast_data != NULL;
2160 }
2161
2162 /**
2163  * Blocking AST for mdt locks.
2164  *
2165  * Starts transaction commit if in case of COS lock conflict or
2166  * deffers such a commit to the mdt_save_lock.
2167  *
2168  * \param lock the lock which blocks a request or cancelling lock
2169  * \param desc unused
2170  * \param data unused
2171  * \param flag indicates whether this cancelling or blocking callback
2172  * \retval 0
2173  * \see ldlm_blocking_ast_nocheck
2174  */
2175 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
2176                      void *data, int flag)
2177 {
2178         struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
2179         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2180         int rc;
2181         ENTRY;
2182
2183         if (flag == LDLM_CB_CANCELING)
2184                 RETURN(0);
2185         lock_res_and_lock(lock);
2186         if (lock->l_blocking_ast != mdt_blocking_ast) {
2187                 unlock_res_and_lock(lock);
2188                 RETURN(0);
2189         }
2190         if (mdt_cos_is_enabled(mdt) &&
2191             lock->l_req_mode & (LCK_PW | LCK_EX) &&
2192             lock->l_blocking_lock != NULL &&
2193             lock->l_client_cookie != lock->l_blocking_lock->l_client_cookie) {
2194                 mdt_set_lock_sync(lock);
2195         }
2196         rc = ldlm_blocking_ast_nocheck(lock);
2197
2198         /* There is no lock conflict if l_blocking_lock == NULL,
2199          * it indicates a blocking ast sent from ldlm_lock_decref_internal
2200          * when the last reference to a local lock was released */
2201         if (lock->l_req_mode == LCK_COS && lock->l_blocking_lock != NULL) {
2202                 struct lu_env env;
2203
2204                 rc = lu_env_init(&env, LCT_LOCAL);
2205                 if (unlikely(rc != 0))
2206                         CWARN("lu_env initialization failed with rc = %d,"
2207                               "cannot start asynchronous commit\n", rc);
2208                 else
2209                         mdt_device_commit_async(&env, mdt);
2210                 lu_env_fini(&env);
2211         }
2212         RETURN(rc);
2213 }
2214
2215 int mdt_md_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
2216                         void *data, int flag)
2217 {
2218         struct lustre_handle lockh;
2219         int               rc;
2220
2221         switch (flag) {
2222         case LDLM_CB_BLOCKING:
2223                 ldlm_lock2handle(lock, &lockh);
2224                 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
2225                 if (rc < 0) {
2226                         CDEBUG(D_INODE, "ldlm_cli_cancel: %d\n", rc);
2227                         RETURN(rc);
2228                 }
2229                 break;
2230         case LDLM_CB_CANCELING:
2231                 LDLM_DEBUG(lock, "Revoke remote lock\n");
2232                 break;
2233         default:
2234                 LBUG();
2235         }
2236         RETURN(0);
2237 }
2238
2239 int mdt_remote_object_lock(struct mdt_thread_info *mti,
2240                            struct mdt_object *o, struct lustre_handle *lh,
2241                            ldlm_mode_t mode, __u64 ibits)
2242 {
2243         struct ldlm_enqueue_info *einfo = &mti->mti_einfo;
2244         ldlm_policy_data_t *policy = &mti->mti_policy;
2245         int rc = 0;
2246         ENTRY;
2247
2248         LASSERT(mdt_object_remote(o));
2249
2250         LASSERT(ibits & MDS_INODELOCK_UPDATE);
2251
2252         memset(einfo, 0, sizeof(*einfo));
2253         einfo->ei_type = LDLM_IBITS;
2254         einfo->ei_mode = mode;
2255         einfo->ei_cb_bl = mdt_md_blocking_ast;
2256         einfo->ei_cb_cp = ldlm_completion_ast;
2257
2258         memset(policy, 0, sizeof(*policy));
2259         policy->l_inodebits.bits = ibits;
2260
2261         rc = mo_object_lock(mti->mti_env, mdt_object_child(o), lh, einfo,
2262                             policy);
2263         RETURN(rc);
2264 }
2265
2266 static int mdt_object_lock0(struct mdt_thread_info *info, struct mdt_object *o,
2267                             struct mdt_lock_handle *lh, __u64 ibits,
2268                             bool nonblock, int locality)
2269 {
2270         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
2271         ldlm_policy_data_t *policy = &info->mti_policy;
2272         struct ldlm_res_id *res_id = &info->mti_res_id;
2273         __u64 dlmflags;
2274         int rc;
2275         ENTRY;
2276
2277         LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
2278         LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
2279         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
2280         LASSERT(lh->mlh_type != MDT_NUL_LOCK);
2281
2282         if (mdt_object_remote(o)) {
2283                 if (locality == MDT_CROSS_LOCK) {
2284                         ibits &= ~(MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
2285                                    MDS_INODELOCK_LAYOUT);
2286                         ibits |= MDS_INODELOCK_LOOKUP;
2287                 } else {
2288                         LASSERTF(!(ibits &
2289                                  (MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM |
2290                                   MDS_INODELOCK_LAYOUT)),
2291                                 "%s: wrong bit "LPX64" for remote obj "DFID"\n",
2292                                 mdt_obd_name(info->mti_mdt), ibits,
2293                                 PFID(mdt_object_fid(o)));
2294                         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
2295                 }
2296                 /* No PDO lock on remote object */
2297                 LASSERT(lh->mlh_type != MDT_PDO_LOCK);
2298         }
2299
2300         if (lh->mlh_type == MDT_PDO_LOCK) {
2301                 /* check for exists after object is locked */
2302                 if (mdt_object_exists(o) == 0) {
2303                         /* Non-existent object shouldn't have PDO lock */
2304                         RETURN(-ESTALE);
2305                 } else {
2306                         /* Non-dir object shouldn't have PDO lock */
2307                         if (!S_ISDIR(lu_object_attr(&o->mot_obj)))
2308                                 RETURN(-ENOTDIR);
2309                 }
2310         }
2311
2312         memset(policy, 0, sizeof(*policy));
2313         fid_build_reg_res_name(mdt_object_fid(o), res_id);
2314
2315         dlmflags = LDLM_FL_ATOMIC_CB;
2316         if (nonblock)
2317                 dlmflags |= LDLM_FL_BLOCK_NOWAIT;
2318
2319         /*
2320          * Take PDO lock on whole directory and build correct @res_id for lock
2321          * on part of directory.
2322          */
2323         if (lh->mlh_pdo_hash != 0) {
2324                 LASSERT(lh->mlh_type == MDT_PDO_LOCK);
2325                 mdt_lock_pdo_mode(info, o, lh);
2326                 if (lh->mlh_pdo_mode != LCK_NL) {
2327                         /*
2328                          * Do not use LDLM_FL_LOCAL_ONLY for parallel lock, it
2329                          * is never going to be sent to client and we do not
2330                          * want it slowed down due to possible cancels.
2331                          */
2332                         policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
2333                         rc = mdt_fid_lock(ns, &lh->mlh_pdo_lh, lh->mlh_pdo_mode,
2334                                           policy, res_id, dlmflags,
2335                                           &info->mti_exp->exp_handle.h_cookie);
2336                         if (unlikely(rc))
2337                                 RETURN(rc);
2338                 }
2339
2340                 /*
2341                  * Finish res_id initializing by name hash marking part of
2342                  * directory which is taking modification.
2343                  */
2344                 res_id->name[LUSTRE_RES_ID_HSH_OFF] = lh->mlh_pdo_hash;
2345         }
2346
2347         policy->l_inodebits.bits = ibits;
2348
2349         /*
2350          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
2351          * going to be sent to client. If it is - mdt_intent_policy() path will
2352          * fix it up and turn FL_LOCAL flag off.
2353          */
2354         rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy,
2355                           res_id, LDLM_FL_LOCAL_ONLY | dlmflags,
2356                           &info->mti_exp->exp_handle.h_cookie);
2357         if (rc)
2358                 mdt_object_unlock(info, o, lh, 1);
2359         else if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)) &&
2360                  lh->mlh_pdo_hash != 0 &&
2361                  (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX)) {
2362                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK, 15);
2363         }
2364
2365         RETURN(rc);
2366 }
2367
2368 int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o,
2369                     struct mdt_lock_handle *lh, __u64 ibits, int locality)
2370 {
2371         return mdt_object_lock0(info, o, lh, ibits, false, locality);
2372 }
2373
2374 int mdt_object_lock_try(struct mdt_thread_info *info, struct mdt_object *o,
2375                         struct mdt_lock_handle *lh, __u64 ibits, int locality)
2376 {
2377         struct mdt_lock_handle tmp = *lh;
2378         int rc;
2379
2380         rc = mdt_object_lock0(info, o, &tmp, ibits, true, locality);
2381         if (rc == 0)
2382                 *lh = tmp;
2383
2384         return rc == 0;
2385 }
2386
2387 /**
2388  * Save a lock within request object.
2389  *
2390  * Keep the lock referenced until whether client ACK or transaction
2391  * commit happens or release the lock immediately depending on input
2392  * parameters. If COS is ON, a write lock is converted to COS lock
2393  * before saving.
2394  *
2395  * \param info thead info object
2396  * \param h lock handle
2397  * \param mode lock mode
2398  * \param decref force immediate lock releasing
2399  */
2400 static
2401 void mdt_save_lock(struct mdt_thread_info *info, struct lustre_handle *h,
2402                    ldlm_mode_t mode, int decref)
2403 {
2404         ENTRY;
2405
2406         if (lustre_handle_is_used(h)) {
2407                 if (decref || !info->mti_has_trans ||
2408                     !(mode & (LCK_PW | LCK_EX))){
2409                         mdt_fid_unlock(h, mode);
2410                 } else {
2411                         struct mdt_device *mdt = info->mti_mdt;
2412                         struct ldlm_lock *lock = ldlm_handle2lock(h);
2413                         struct ptlrpc_request *req = mdt_info_req(info);
2414                         int no_ack = 0;
2415
2416                         LASSERTF(lock != NULL, "no lock for cookie "LPX64"\n",
2417                                  h->cookie);
2418                         /* there is no request if mdt_object_unlock() is called
2419                          * from mdt_export_cleanup()->mdt_add_dirty_flag() */
2420                         if (likely(req != NULL)) {
2421                                 CDEBUG(D_HA, "request = %p reply state = %p"
2422                                        " transno = "LPD64"\n", req,
2423                                        req->rq_reply_state, req->rq_transno);
2424                                 if (mdt_cos_is_enabled(mdt)) {
2425                                         no_ack = 1;
2426                                         ldlm_lock_downgrade(lock, LCK_COS);
2427                                         mode = LCK_COS;
2428                                 }
2429                                 ptlrpc_save_lock(req, h, mode, no_ack);
2430                         } else {
2431                                 ldlm_lock_decref(h, mode);
2432                         }
2433                         if (mdt_is_lock_sync(lock)) {
2434                                 CDEBUG(D_HA, "found sync-lock,"
2435                                        " async commit started\n");
2436                                 mdt_device_commit_async(info->mti_env,
2437                                                         mdt);
2438                         }
2439                         LDLM_LOCK_PUT(lock);
2440                 }
2441                 h->cookie = 0ull;
2442         }
2443
2444         EXIT;
2445 }
2446
2447 /**
2448  * Unlock mdt object.
2449  *
2450  * Immeditely release the regular lock and the PDO lock or save the
2451  * lock in reqeuest and keep them referenced until client ACK or
2452  * transaction commit.
2453  *
2454  * \param info thread info object
2455  * \param o mdt object
2456  * \param lh mdt lock handle referencing regular and PDO locks
2457  * \param decref force immediate lock releasing
2458  */
2459 void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o,
2460                        struct mdt_lock_handle *lh, int decref)
2461 {
2462         ENTRY;
2463
2464         mdt_save_lock(info, &lh->mlh_pdo_lh, lh->mlh_pdo_mode, decref);
2465         mdt_save_lock(info, &lh->mlh_reg_lh, lh->mlh_reg_mode, decref);
2466
2467         if (lustre_handle_is_used(&lh->mlh_rreg_lh))
2468                 ldlm_lock_decref(&lh->mlh_rreg_lh, lh->mlh_rreg_mode);
2469
2470         EXIT;
2471 }
2472
2473 struct mdt_object *mdt_object_find_lock(struct mdt_thread_info *info,
2474                                         const struct lu_fid *f,
2475                                         struct mdt_lock_handle *lh,
2476                                         __u64 ibits)
2477 {
2478         struct mdt_object *o;
2479
2480         o = mdt_object_find(info->mti_env, info->mti_mdt, f);
2481         if (!IS_ERR(o)) {
2482                 int rc;
2483
2484                 rc = mdt_object_lock(info, o, lh, ibits,
2485                                      MDT_LOCAL_LOCK);
2486                 if (rc != 0) {
2487                         mdt_object_put(info->mti_env, o);
2488                         o = ERR_PTR(rc);
2489                 }
2490         }
2491         return o;
2492 }
2493
2494 void mdt_object_unlock_put(struct mdt_thread_info * info,
2495                            struct mdt_object * o,
2496                            struct mdt_lock_handle *lh,
2497                            int decref)
2498 {
2499         mdt_object_unlock(info, o, lh, decref);
2500         mdt_object_put(info->mti_env, o);
2501 }
2502
2503 /*
2504  * Generic code handling requests that have struct mdt_body passed in:
2505  *
2506  *  - extract mdt_body from request and save it in @info, if present;
2507  *
2508  *  - create lu_object, corresponding to the fid in mdt_body, and save it in
2509  *  @info;
2510  *
2511  *  - if HABEO_CORPUS flag is set for this request type check whether object
2512  *  actually exists on storage (lu_object_exists()).
2513  *
2514  */
2515 static int mdt_body_unpack(struct mdt_thread_info *info, __u32 flags)
2516 {
2517         const struct mdt_body    *body;
2518         struct mdt_object        *obj;
2519         const struct lu_env      *env;
2520         struct req_capsule       *pill;
2521         int                       rc;
2522         ENTRY;
2523
2524         env = info->mti_env;
2525         pill = info->mti_pill;
2526
2527         body = info->mti_body = req_capsule_client_get(pill, &RMF_MDT_BODY);
2528         if (body == NULL)
2529                 RETURN(-EFAULT);
2530
2531         if (!(body->valid & OBD_MD_FLID))
2532                 RETURN(0);
2533
2534         if (!fid_is_sane(&body->fid1)) {
2535                 CERROR("Invalid fid: "DFID"\n", PFID(&body->fid1));
2536                 RETURN(-EINVAL);
2537         }
2538
2539         /*
2540          * Do not get size or any capa fields before we check that request
2541          * contains capa actually. There are some requests which do not, for
2542          * instance MDS_IS_SUBDIR.
2543          */
2544         if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
2545             req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT))
2546                 mdt_set_capainfo(info, 0, &body->fid1,
2547                                  req_capsule_client_get(pill, &RMF_CAPA1));
2548
2549         obj = mdt_object_find(env, info->mti_mdt, &body->fid1);
2550         if (!IS_ERR(obj)) {
2551                 if ((flags & HABEO_CORPUS) &&
2552                     !mdt_object_exists(obj)) {
2553                         mdt_object_put(env, obj);
2554                         /* for capability renew ENOENT will be handled in
2555                          * mdt_renew_capa */
2556                         if (body->valid & OBD_MD_FLOSSCAPA)
2557                                 rc = 0;
2558                         else
2559                                 rc = -ENOENT;
2560                 } else {
2561                         info->mti_object = obj;
2562                         rc = 0;
2563                 }
2564         } else
2565                 rc = PTR_ERR(obj);
2566
2567         RETURN(rc);
2568 }
2569
2570 static int mdt_unpack_req_pack_rep(struct mdt_thread_info *info, __u32 flags)
2571 {
2572         struct req_capsule *pill = info->mti_pill;
2573         int rc;
2574         ENTRY;
2575
2576         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_CLIENT))
2577                 rc = mdt_body_unpack(info, flags);
2578         else
2579                 rc = 0;
2580
2581         if (rc == 0 && (flags & HABEO_REFERO)) {
2582                 /* Pack reply. */
2583                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2584                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2585                                              info->mti_body->eadatasize);
2586                 if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2587                         req_capsule_set_size(pill, &RMF_LOGCOOKIES,
2588                                              RCL_SERVER, 0);
2589
2590                 rc = req_capsule_server_pack(pill);
2591         }
2592         RETURN(rc);
2593 }
2594
2595 static int mdt_init_capa_ctxt(const struct lu_env *env, struct mdt_device *m)
2596 {
2597         struct md_device *next = m->mdt_child;
2598
2599         return next->md_ops->mdo_init_capa_ctxt(env, next,
2600                                                 m->mdt_lut.lut_mds_capa,
2601                                                 m->mdt_capa_timeout,
2602                                                 m->mdt_capa_alg,
2603                                                 m->mdt_capa_keys);
2604 }
2605
2606 void mdt_lock_handle_init(struct mdt_lock_handle *lh)
2607 {
2608         lh->mlh_type = MDT_NUL_LOCK;
2609         lh->mlh_reg_lh.cookie = 0ull;
2610         lh->mlh_reg_mode = LCK_MINMODE;
2611         lh->mlh_pdo_lh.cookie = 0ull;
2612         lh->mlh_pdo_mode = LCK_MINMODE;
2613         lh->mlh_rreg_lh.cookie = 0ull;
2614         lh->mlh_rreg_mode = LCK_MINMODE;
2615 }
2616
/*
 * Finalize @lh: nothing is released here, the function only asserts that
 * the regular and the PDO handles no longer reference a lock.
 * The remote regular handle (mlh_rreg_lh) is not checked.
 */
void mdt_lock_handle_fini(struct mdt_lock_handle *lh)
{
        LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh));
        LASSERT(!lustre_handle_is_used(&lh->mlh_pdo_lh));
}
2622
2623 /*
2624  * Initialize fields of struct mdt_thread_info. Other fields are left in
2625  * uninitialized state, because it's too expensive to zero out whole
2626  * mdt_thread_info (> 1K) on each request arrival.
2627  */
2628 void mdt_thread_info_init(struct ptlrpc_request *req,
2629                           struct mdt_thread_info *info)
2630 {
2631         int i;
2632
2633         info->mti_pill = &req->rq_pill;
2634
2635         /* lock handle */
2636         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2637                 mdt_lock_handle_init(&info->mti_lh[i]);
2638
2639         /* mdt device: it can be NULL while CONNECT */
2640         if (req->rq_export) {
2641                 info->mti_mdt = mdt_dev(req->rq_export->exp_obd->obd_lu_dev);
2642                 info->mti_exp = req->rq_export;
2643         } else
2644                 info->mti_mdt = NULL;
2645         info->mti_env = req->rq_svc_thread->t_env;
2646         info->mti_transno = lustre_msg_get_transno(req->rq_reqmsg);
2647
2648         memset(&info->mti_attr, 0, sizeof(info->mti_attr));
2649         info->mti_big_buf = LU_BUF_NULL;
2650         info->mti_body = NULL;
2651         info->mti_object = NULL;
2652         info->mti_dlm_req = NULL;
2653         info->mti_has_trans = 0;
2654         info->mti_cross_ref = 0;
2655         info->mti_opdata = 0;
2656         info->mti_big_lmm_used = 0;
2657
2658         /* To not check for split by default. */
2659         info->mti_spec.no_create = 0;
2660         info->mti_spec.sp_rm_entry = 0;
2661 }
2662
2663 void mdt_thread_info_fini(struct mdt_thread_info *info)
2664 {
2665         int i;
2666
2667         if (info->mti_object != NULL) {
2668                 mdt_object_put(info->mti_env, info->mti_object);
2669                 info->mti_object = NULL;
2670         }
2671
2672         for (i = 0; i < ARRAY_SIZE(info->mti_lh); i++)
2673                 mdt_lock_handle_fini(&info->mti_lh[i]);
2674         info->mti_env = NULL;
2675         info->mti_pill = NULL;
2676         info->mti_exp = NULL;
2677
2678         if (unlikely(info->mti_big_buf.lb_buf != NULL))
2679                 lu_buf_free(&info->mti_big_buf);
2680 }
2681
2682 struct mdt_thread_info *tsi2mdt_info(struct tgt_session_info *tsi)
2683 {
2684         struct mdt_thread_info  *mti;
2685         struct lustre_capa      *lc;
2686
2687         mti = mdt_th_info(tsi->tsi_env);
2688         LASSERT(mti != NULL);
2689
2690         mdt_thread_info_init(tgt_ses_req(tsi), mti);
2691         if (tsi->tsi_corpus != NULL) {
2692                 struct req_capsule *pill = tsi->tsi_pill;
2693
2694                 mti->mti_object = mdt_obj(tsi->tsi_corpus);
2695                 lu_object_get(tsi->tsi_corpus);
2696
2697                 /*
2698                  * XXX: must be part of tgt_mdt_body_unpack but moved here
2699                  * due to mdt_set_capainfo().
2700                  */
2701                 if (req_capsule_has_field(pill, &RMF_CAPA1, RCL_CLIENT) &&
2702                     req_capsule_get_size(pill, &RMF_CAPA1, RCL_CLIENT) > 0) {
2703                         lc = req_capsule_client_get(pill, &RMF_CAPA1);
2704                         mdt_set_capainfo(mti, 0, &tsi->tsi_mdt_body->fid1, lc);
2705                 }
2706         }
2707         mti->mti_body = tsi->tsi_mdt_body;
2708         mti->mti_dlm_req = tsi->tsi_dlm_req;
2709
2710         return mti;
2711 }
2712
2713 int mdt_tgt_connect(struct tgt_session_info *tsi)
2714 {
2715         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2716         int                      rc;
2717
2718         ENTRY;
2719
2720         rc = tgt_connect(tsi);
2721         if (rc != 0)
2722                 RETURN(rc);
2723
2724         rc = mdt_init_idmap(tsi);
2725         if (rc != 0)
2726                 GOTO(err, rc);
2727         RETURN(0);
2728 err:
2729         obd_disconnect(class_export_get(req->rq_export));
2730         return rc;
2731 }
2732
/*
 * Intent codes as used inside the MDT. The values are used as designated
 * indices into mdt_it_flavor[] below and are passed to the it_act handlers
 * as "opcode".
 */
enum mdt_it_code {
        MDT_IT_OPEN,
        MDT_IT_OCREAT,
        MDT_IT_CREATE,
        MDT_IT_GETATTR,
        MDT_IT_READDIR,
        MDT_IT_LOOKUP,
        MDT_IT_UNLINK,
        MDT_IT_TRUNC,
        MDT_IT_GETXATTR,
        MDT_IT_LAYOUT,
        /* NOTE(review): no mdt_it_flavor[] entry is defined for this code;
         * presumably it is handled before the table lookup - confirm */
        MDT_IT_QUOTA,
        /* number of codes above; keep last */
        MDT_IT_NR
};
2747
/*
 * Forward declarations of the intent handlers referenced from the
 * mdt_it_flavor[] table. All of them share the it_act signature:
 * (intent code, thread info, in/out lock pointer, flags).
 */
static int mdt_intent_getattr(enum mdt_it_code opcode,
                              struct mdt_thread_info *info,
                              struct ldlm_lock **,
                              __u64);

static int mdt_intent_getxattr(enum mdt_it_code opcode,
                                struct mdt_thread_info *info,
                                struct ldlm_lock **lockp,
                                __u64 flags);

static int mdt_intent_layout(enum mdt_it_code opcode,
                             struct mdt_thread_info *info,
                             struct ldlm_lock **,
                             __u64);
static int mdt_intent_reint(enum mdt_it_code opcode,
                            struct mdt_thread_info *info,
                            struct ldlm_lock **,
                            __u64);
2766
/*
 * Per-intent description table, indexed by enum mdt_it_code.
 *
 * The last designated entry is MDT_IT_LAYOUT, so the array has no slot for
 * MDT_IT_QUOTA; callers must not index it with that code.
 */
static struct mdt_it_flavor {
        /* request format of the intent; NULL for READDIR and TRUNC */
        const struct req_format *it_fmt;
        /* handler flags (HABEO_REFERO, HABEO_CORPUS, MUTABOR) or 0 */
        __u32                    it_flags;
        /* handler for the intent; NULL for READDIR, UNLINK and TRUNC */
        int                    (*it_act)(enum mdt_it_code ,
                                         struct mdt_thread_info *,
                                         struct ldlm_lock **,
                                         __u64);
        /* REINT_* operation for the reint-based intents; left
         * zero-initialized for the others */
        long                     it_reint;
} mdt_it_flavor[] = {
        [MDT_IT_OPEN]     = {
                .it_fmt   = &RQF_LDLM_INTENT,
                /*.it_flags = HABEO_REFERO,*/
                .it_flags = 0,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_OCREAT]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                /*
                 * OCREAT is not a MUTABOR request as if the file
                 * already exists.
                 * We do the extra check of OBD_CONNECT_RDONLY in
                 * mdt_reint_open() when we really need to create
                 * the object.
                 */
                .it_flags = 0,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_OPEN
        },
        [MDT_IT_CREATE]   = {
                .it_fmt   = &RQF_LDLM_INTENT,
                .it_flags = MUTABOR,
                .it_act   = mdt_intent_reint,
                .it_reint = REINT_CREATE
        },
        [MDT_IT_GETATTR]  = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        /* no format and no handler */
        [MDT_IT_READDIR]  = {
                .it_fmt   = NULL,
                .it_flags = 0,
                .it_act   = NULL
        },
        /* shares format and handler with MDT_IT_GETATTR */
        [MDT_IT_LOOKUP]   = {
                .it_fmt   = &RQF_LDLM_INTENT_GETATTR,
                .it_flags = HABEO_REFERO,
                .it_act   = mdt_intent_getattr
        },
        /* has a format and a reint code, but no handler */
        [MDT_IT_UNLINK]   = {
                .it_fmt   = &RQF_LDLM_INTENT_UNLINK,
                .it_flags = MUTABOR,
                .it_act   = NULL,
                .it_reint = REINT_UNLINK
        },
        /* no format and no handler */
        [MDT_IT_TRUNC]    = {
                .it_fmt   = NULL,
                .it_flags = MUTABOR,
                .it_act   = NULL
        },
        [MDT_IT_GETXATTR] = {
                .it_fmt   = &RQF_LDLM_INTENT_GETXATTR,
                .it_flags = HABEO_CORPUS,
                .it_act   = mdt_intent_getxattr
        },
        [MDT_IT_LAYOUT] = {
                .it_fmt   = &RQF_LDLM_INTENT_LAYOUT,
                .it_flags = 0,
                .it_act   = mdt_intent_layout
        }
};
2839
2840 int mdt_intent_lock_replace(struct mdt_thread_info *info,
2841                             struct ldlm_lock **lockp,
2842                             struct ldlm_lock *new_lock,
2843                             struct mdt_lock_handle *lh,
2844                             __u64 flags)
2845 {
2846         struct ptlrpc_request  *req = mdt_info_req(info);
2847         struct ldlm_lock       *lock = *lockp;
2848
2849         /*
2850          * Get new lock only for cases when possible resent did not find any
2851          * lock.
2852          */
2853         if (new_lock == NULL)
2854                 new_lock = ldlm_handle2lock_long(&lh->mlh_reg_lh, 0);
2855
2856         if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) {
2857                 lh->mlh_reg_lh.cookie = 0;
2858                 RETURN(0);
2859         }
2860
2861         LASSERTF(new_lock != NULL,
2862                  "lockh "LPX64"\n", lh->mlh_reg_lh.cookie);
2863
2864         /*
2865          * If we've already given this lock to a client once, then we should
2866          * have no readers or writers.  Otherwise, we should have one reader
2867          * _or_ writer ref (which will be zeroed below) before returning the
2868          * lock to a client.
2869          */
2870         if (new_lock->l_export == req->rq_export) {
2871                 LASSERT(new_lock->l_readers + new_lock->l_writers == 0);
2872         } else {
2873                 LASSERT(new_lock->l_export == NULL);
2874                 LASSERT(new_lock->l_readers + new_lock->l_writers == 1);
2875         }
2876
2877         *lockp = new_lock;
2878
2879         if (new_lock->l_export == req->rq_export) {
2880                 /*
2881                  * Already gave this to the client, which means that we
2882                  * reconstructed a reply.
2883                  */
2884                 LASSERT(lustre_msg_get_flags(req->rq_reqmsg) &
2885                         MSG_RESENT);
2886                 lh->mlh_reg_lh.cookie = 0;
2887                 RETURN(ELDLM_LOCK_REPLACED);
2888         }
2889
2890         /*
2891          * Fixup the lock to be given to the client.
2892          */
2893         lock_res_and_lock(new_lock);
2894         /* Zero new_lock->l_readers and new_lock->l_writers without triggering
2895          * possible blocking AST. */
2896         while (new_lock->l_readers > 0) {
2897                 lu_ref_del(&new_lock->l_reference, "reader", new_lock);
2898                 lu_ref_del(&new_lock->l_reference, "user", new_lock);
2899                 new_lock->l_readers--;
2900         }
2901         while (new_lock->l_writers > 0) {
2902                 lu_ref_del(&new_lock->l_reference, "writer", new_lock);
2903                 lu_ref_del(&new_lock->l_reference, "user", new_lock);
2904                 new_lock->l_writers--;
2905         }
2906
2907         new_lock->l_export = class_export_lock_get(req->rq_export, new_lock);
2908         new_lock->l_blocking_ast = lock->l_blocking_ast;
2909         new_lock->l_completion_ast = lock->l_completion_ast;
2910         new_lock->l_remote_handle = lock->l_remote_handle;
2911         new_lock->l_flags &= ~LDLM_FL_LOCAL;
2912
2913         unlock_res_and_lock(new_lock);
2914
2915         cfs_hash_add(new_lock->l_export->exp_lock_hash,
2916                      &new_lock->l_remote_handle,
2917                      &new_lock->l_exp_hash);
2918
2919         LDLM_LOCK_RELEASE(new_lock);
2920         lh->mlh_reg_lh.cookie = 0;
2921
2922         RETURN(ELDLM_LOCK_REPLACED);
2923 }
2924
2925 static void mdt_intent_fixup_resent(struct mdt_thread_info *info,
2926                                     struct ldlm_lock *new_lock,
2927                                     struct ldlm_lock **old_lock,
2928                                     struct mdt_lock_handle *lh)
2929 {
2930         struct ptlrpc_request  *req = mdt_info_req(info);
2931         struct obd_export      *exp = req->rq_export;
2932         struct lustre_handle    remote_hdl;
2933         struct ldlm_request    *dlmreq;
2934         struct ldlm_lock       *lock;
2935
2936         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2937                 return;
2938
2939         dlmreq = req_capsule_client_get(info->mti_pill, &RMF_DLM_REQ);
2940         remote_hdl = dlmreq->lock_handle[0];
2941
2942         /* In the function below, .hs_keycmp resolves to
2943          * ldlm_export_lock_keycmp() */
2944         /* coverity[overrun-buffer-val] */
2945         lock = cfs_hash_lookup(exp->exp_lock_hash, &remote_hdl);
2946         if (lock) {
2947                 if (lock != new_lock) {
2948                         lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie;
2949                         lh->mlh_reg_mode = lock->l_granted_mode;
2950
2951                         LDLM_DEBUG(lock, "Restoring lock cookie");
2952                         DEBUG_REQ(D_DLMTRACE, req,
2953                                   "restoring lock cookie "LPX64,
2954                                   lh->mlh_reg_lh.cookie);
2955                         if (old_lock)
2956                                 *old_lock = LDLM_LOCK_GET(lock);
2957                         cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash);
2958                         return;
2959                 }
2960
2961                 cfs_hash_put(exp->exp_lock_hash, &lock->l_exp_hash);
2962         }
2963
2964         /*
2965          * If the xid matches, then we know this is a resent request, and allow
2966          * it. (It's probably an OPEN, for which we don't send a lock.
2967          */
2968         if (req_xid_is_last(req))
2969                 return;
2970
2971         /*
2972          * This remote handle isn't enqueued, so we never received or processed
2973          * this request.  Clear MSG_RESENT, because it can be handled like any
2974          * normal request now.
2975          */
2976         lustre_msg_clear_flags(req->rq_reqmsg, MSG_RESENT);
2977
2978         DEBUG_REQ(D_DLMTRACE, req, "no existing lock with rhandle "LPX64,
2979                   remote_hdl.cookie);
2980 }
2981
2982 static int mdt_intent_getxattr(enum mdt_it_code opcode,
2983                                 struct mdt_thread_info *info,
2984                                 struct ldlm_lock **lockp,
2985                                 __u64 flags)
2986 {
2987         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
2988         struct ldlm_reply      *ldlm_rep = NULL;
2989         int rc, grc;
2990
2991         /*
2992          * Initialize lhc->mlh_reg_lh either from a previously granted lock
2993          * (for the resend case) or a new lock. Below we will use it to
2994          * replace the original lock.
2995          */
2996         mdt_intent_fixup_resent(info, *lockp, NULL, lhc);
2997         if (!lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2998                 mdt_lock_reg_init(lhc, (*lockp)->l_req_mode);
2999                 rc = mdt_object_lock(info, info->mti_object, lhc,
3000                                         MDS_INODELOCK_XATTR,
3001                                         MDT_LOCAL_LOCK);
3002                 if (rc)
3003                         return rc;
3004         }
3005
3006         grc = mdt_getxattr(info);
3007
3008         rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
3009
3010         if (mdt_info_req(info)->rq_repmsg != NULL)
3011                 ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
3012         if (ldlm_rep == NULL)
3013                 RETURN(err_serious(-EFAULT));
3014
3015         ldlm_rep->lock_policy_res2 = grc;
3016
3017         return rc;
3018 }
3019
3020 static int mdt_intent_getattr(enum mdt_it_code opcode,
3021                               struct mdt_thread_info *info,
3022                               struct ldlm_lock **lockp,
3023                               __u64 flags)
3024 {
3025         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
3026         struct ldlm_lock       *new_lock = NULL;
3027         __u64                   child_bits;
3028         struct ldlm_reply      *ldlm_rep;
3029         struct ptlrpc_request  *req;
3030         struct mdt_body        *reqbody;
3031         struct mdt_body        *repbody;
3032         int                     rc, rc2;
3033         ENTRY;
3034
3035         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
3036         LASSERT(reqbody);
3037
3038         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3039         LASSERT(repbody);
3040
3041         info->mti_cross_ref = !!(reqbody->valid & OBD_MD_FLCROSSREF);
3042         repbody->eadatasize = 0;
3043         repbody->aclsize = 0;
3044
3045         switch (opcode) {
3046         case MDT_IT_LOOKUP:
3047                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
3048                 break;
3049         case MDT_IT_GETATTR:
3050                 child_bits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE |
3051                              MDS_INODELOCK_PERM;
3052                 break;
3053         default:
3054                 CERROR("Unsupported intent (%d)\n", opcode);
3055                 GOTO(out_shrink, rc = -EINVAL);
3056         }
3057
3058         rc = mdt_init_ucred(info, reqbody);
3059         if (rc)
3060                 GOTO(out_shrink, rc);
3061
3062         req = info->mti_pill->rc_req;
3063         ldlm_rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
3064         mdt_set_disposition(info, ldlm_rep, DISP_IT_EXECD);
3065
3066         /* Get lock from request for possible resent case. */
3067         mdt_intent_fixup_resent(info, *lockp, &new_lock, lhc);
3068
3069         rc = mdt_getattr_name_lock(info, lhc, child_bits, ldlm_rep);
3070         ldlm_rep->lock_policy_res2 = clear_serious(rc);
3071
3072         if (mdt_get_disposition(ldlm_rep, DISP_LOOKUP_NEG))
3073                 ldlm_rep->lock_policy_res2 = 0;
3074         if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) ||
3075             ldlm_rep->lock_policy_res2) {
3076                 lhc->mlh_reg_lh.cookie = 0ull;
3077                 GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED);
3078         }
3079
3080         rc = mdt_intent_lock_replace(info, lockp, new_lock, lhc, flags);
3081         EXIT;
3082 out_ucred:
3083         mdt_exit_ucred(info);
3084 out_shrink:
3085         mdt_client_compatibility(info);
3086         rc2 = mdt_fix_reply(info);
3087         if (rc == 0)
3088                 rc = rc2;
3089         return rc;
3090 }
3091
3092 static int mdt_intent_layout(enum mdt_it_code opcode,
3093                              struct mdt_thread_info *info,
3094                              struct ldlm_lock **lockp,
3095                              __u64 flags)
3096 {
3097         struct layout_intent *layout;
3098         struct lu_fid *fid;
3099         struct mdt_object *obj = NULL;
3100         struct md_object *child = NULL;
3101         int rc;
3102         ENTRY;
3103
3104         if (opcode != MDT_IT_LAYOUT) {
3105                 CERROR("%s: Unknown intent (%d)\n", mdt_obd_name(info->mti_mdt),
3106                         opcode);
3107                 RETURN(-EINVAL);
3108         }
3109
3110         fid = &info->mti_tmp_fid2;
3111         fid_extract_from_res_name(fid, &(*lockp)->l_resource->lr_name);
3112
3113         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
3114         if (IS_ERR(obj))
3115                 RETURN(PTR_ERR(obj));
3116
3117         if (mdt_object_exists(obj) && !mdt_object_remote(obj)) {
3118                 child = mdt_object_child(obj);
3119
3120                 /* get the length of lsm */
3121                 rc = mo_xattr_get(info->mti_env, child, &LU_BUF_NULL,
3122                                   XATTR_NAME_LOV);
3123
3124                 if (rc > info->mti_mdt->mdt_max_mdsize)
3125                         info->mti_mdt->mdt_max_mdsize = rc;
3126         }
3127
3128         mdt_object_put(info->mti_env, obj);
3129
3130         (*lockp)->l_lvb_type = LVB_T_LAYOUT;
3131         req_capsule_set_size(info->mti_pill, &RMF_DLM_LVB, RCL_SERVER,
3132                         ldlm_lvbo_size(*lockp));
3133         rc = req_capsule_server_pack(info->mti_pill);
3134         if (rc != 0)
3135                 RETURN(-EINVAL);
3136
3137         layout = req_capsule_client_get(info->mti_pill, &RMF_LAYOUT_INTENT);
3138         LASSERT(layout != NULL);
3139         if (layout->li_opc == LAYOUT_INTENT_ACCESS)
3140                 /* return to normal ldlm handling */
3141                 RETURN(0);
3142
3143         CERROR("%s: Unsupported layout intent (%d)\n",
3144                 mdt_obd_name(info->mti_mdt), layout->li_opc);
3145         RETURN(-EINVAL);
3146 }
3147
3148 static int mdt_intent_reint(enum mdt_it_code opcode,
3149                             struct mdt_thread_info *info,
3150                             struct ldlm_lock **lockp,
3151                             __u64 flags)
3152 {
3153         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_RMT];
3154         struct ldlm_reply      *rep = NULL;
3155         long                    opc;
3156         int                     rc;
3157
3158         static const struct req_format *intent_fmts[REINT_MAX] = {
3159                 [REINT_CREATE]  = &RQF_LDLM_INTENT_CREATE,
3160                 [REINT_OPEN]    = &RQF_LDLM_INTENT_OPEN
3161         };
3162
3163         ENTRY;
3164
3165         opc = mdt_reint_opcode(mdt_info_req(info), intent_fmts);
3166         if (opc < 0)
3167                 RETURN(opc);
3168
3169         if (mdt_it_flavor[opcode].it_reint != opc) {
3170                 CERROR("Reint code %ld doesn't match intent: %d\n",
3171                        opc, opcode);
3172                 RETURN(err_serious(-EPROTO));
3173         }
3174
3175         /* Get lock from request for possible resent case. */
3176         mdt_intent_fixup_resent(info, *lockp, NULL, lhc);
3177
3178         rc = mdt_reint_internal(info, lhc, opc);
3179
3180         /* Check whether the reply has been packed successfully. */
3181         if (mdt_info_req(info)->rq_repmsg != NULL)
3182                 rep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
3183         if (rep == NULL)
3184                 RETURN(err_serious(-EFAULT));
3185
3186         /* MDC expects this in any case */
3187         if (rc != 0)
3188                 mdt_set_disposition(info, rep, DISP_LOOKUP_EXECD);
3189
3190         /* the open lock or the lock for cross-ref object should be
3191          * returned to the client */
3192         if (rc == -EREMOTE || mdt_get_disposition(rep, DISP_OPEN_LOCK)) {
3193                 LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh));
3194                 rep->lock_policy_res2 = 0;
3195                 rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
3196                 RETURN(rc);
3197         }
3198
3199         rep->lock_policy_res2 = clear_serious(rc);
3200
3201         if (rep->lock_policy_res2 == -ENOENT &&
3202             mdt_get_disposition(rep, DISP_LOOKUP_NEG))
3203                 rep->lock_policy_res2 = 0;
3204
3205         if (rc == -ENOTCONN || rc == -ENODEV ||
3206             rc == -EOVERFLOW) { /**< if VBR failure then return error */
3207                 /*
3208                  * If it is the disconnect error (ENODEV & ENOCONN), the error
3209                  * will be returned by rq_status, and client at ptlrpc layer
3210                  * will detect this, then disconnect, reconnect the import
3211                  * immediately, instead of impacting the following the rpc.
3212                  */
3213                 lhc->mlh_reg_lh.cookie = 0ull;
3214                 RETURN(rc);
3215         } else {
3216                 /*
3217                  * For other cases, the error will be returned by intent.
3218                  * and client will retrieve the result from intent.
3219                  */
3220                  /*
3221                   * FIXME: when open lock is finished, that should be
3222                   * checked here.
3223                   */
3224                 if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
3225                         LASSERTF(rc == 0, "Error occurred but lock handle "
3226                                  "is still in use, rc = %d\n", rc);
3227                         rep->lock_policy_res2 = 0;
3228                         rc = mdt_intent_lock_replace(info, lockp, NULL, lhc, flags);
3229                         RETURN(rc);
3230                 } else {
3231                         lhc->mlh_reg_lh.cookie = 0ull;
3232                         RETURN(ELDLM_LOCK_ABORTED);
3233                 }
3234         }
3235 }
3236
3237 static int mdt_intent_code(long itcode)
3238 {
3239         int rc;
3240
3241         switch(itcode) {
3242         case IT_OPEN:
3243                 rc = MDT_IT_OPEN;
3244                 break;
3245         case IT_OPEN|IT_CREAT:
3246                 rc = MDT_IT_OCREAT;
3247                 break;
3248         case IT_CREAT:
3249                 rc = MDT_IT_CREATE;
3250                 break;
3251         case IT_READDIR:
3252                 rc = MDT_IT_READDIR;
3253                 break;
3254         case IT_GETATTR:
3255                 rc = MDT_IT_GETATTR;
3256                 break;
3257         case IT_LOOKUP:
3258                 rc = MDT_IT_LOOKUP;
3259                 break;
3260         case IT_UNLINK:
3261                 rc = MDT_IT_UNLINK;
3262                 break;
3263         case IT_TRUNC:
3264                 rc = MDT_IT_TRUNC;
3265                 break;
3266         case IT_GETXATTR:
3267                 rc = MDT_IT_GETXATTR;
3268                 break;
3269         case IT_LAYOUT:
3270                 rc = MDT_IT_LAYOUT;
3271                 break;
3272         case IT_QUOTA_DQACQ:
3273         case IT_QUOTA_CONN:
3274                 rc = MDT_IT_QUOTA;
3275                 break;
3276         default:
3277                 CERROR("Unknown intent opcode: %ld\n", itcode);
3278                 rc = -EINVAL;
3279                 break;
3280         }
3281         return rc;
3282 }
3283
3284 static int mdt_intent_opc(long itopc, struct mdt_thread_info *info,
3285                           struct ldlm_lock **lockp, __u64 flags)
3286 {
3287         struct req_capsule   *pill;
3288         struct mdt_it_flavor *flv;
3289         int opc;
3290         int rc;
3291         ENTRY;
3292
3293         opc = mdt_intent_code(itopc);
3294         if (opc < 0)
3295                 RETURN(-EINVAL);
3296
3297         pill = info->mti_pill;
3298
3299         if (opc == MDT_IT_QUOTA) {
3300                 struct lu_device *qmt = info->mti_mdt->mdt_qmt_dev;
3301
3302                 if (qmt == NULL)
3303                         RETURN(-EOPNOTSUPP);
3304
3305                 (*lockp)->l_lvb_type = LVB_T_LQUOTA;
3306                 /* pass the request to quota master */
3307                 rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
3308                                                  mdt_info_req(info), lockp,
3309                                                  flags);
3310                 RETURN(rc);
3311         }
3312
3313         flv  = &mdt_it_flavor[opc];
3314         if (flv->it_fmt != NULL)
3315                 req_capsule_extend(pill, flv->it_fmt);
3316
3317         rc = mdt_unpack_req_pack_rep(info, flv->it_flags);
3318         if (rc == 0) {
3319                 struct ptlrpc_request *req = mdt_info_req(info);
3320                 if (flv->it_flags & MUTABOR &&
3321                     exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
3322                         RETURN(-EROFS);
3323         }
3324         if (rc == 0 && flv->it_act != NULL) {
3325                 struct ldlm_reply *rep;
3326
3327                 /* execute policy */
3328                 rc = flv->it_act(opc, info, lockp, flags);
3329
3330                 /* Check whether the reply has been packed successfully. */
3331                 if (mdt_info_req(info)->rq_repmsg != NULL) {
3332                         rep = req_capsule_server_get(info->mti_pill,
3333                                                      &RMF_DLM_REP);
3334                         rep->lock_policy_res2 =
3335                                 ptlrpc_status_hton(rep->lock_policy_res2);
3336                 }
3337         } else {
3338                 rc = -EPROTO;
3339         }
3340         RETURN(rc);
3341 }
3342
3343 static int mdt_intent_policy(struct ldlm_namespace *ns,
3344                              struct ldlm_lock **lockp, void *req_cookie,
3345                              ldlm_mode_t mode, __u64 flags, void *data)
3346 {
3347         struct tgt_session_info *tsi;
3348         struct mdt_thread_info  *info;
3349         struct ptlrpc_request   *req  =  req_cookie;
3350         struct ldlm_intent      *it;
3351         struct req_capsule      *pill;
3352         int rc;
3353
3354         ENTRY;
3355
3356         LASSERT(req != NULL);
3357
3358         tsi = tgt_ses_info(req->rq_svc_thread->t_env);
3359
3360         info = tsi2mdt_info(tsi);
3361         LASSERT(info != NULL);
3362         pill = info->mti_pill;
3363         LASSERT(pill->rc_req == req);
3364
3365         if (req->rq_reqmsg->lm_bufcount > DLM_INTENT_IT_OFF) {
3366                 req_capsule_extend(pill, &RQF_LDLM_INTENT_BASIC);
3367                 it = req_capsule_client_get(pill, &RMF_LDLM_INTENT);
3368                 if (it != NULL) {
3369                         rc = mdt_intent_opc(it->opc, info, lockp, flags);
3370                         if (rc == 0)
3371                                 rc = ELDLM_OK;
3372
3373                         /* Lock without inodebits makes no sense and will oops
3374                          * later in ldlm. Let's check it now to see if we have
3375                          * ibits corrupted somewhere in mdt_intent_opc().
3376                          * The case for client miss to set ibits has been
3377                          * processed by others. */
3378                         LASSERT(ergo(info->mti_dlm_req->lock_desc.l_resource.\
3379                                         lr_type == LDLM_IBITS,
3380                                      info->mti_dlm_req->lock_desc.\
3381                                         l_policy_data.l_inodebits.bits != 0));
3382                 } else
3383                         rc = err_serious(-EFAULT);
3384         } else {
3385                 /* No intent was provided */
3386                 LASSERT(pill->rc_fmt == &RQF_LDLM_ENQUEUE);
3387                 req_capsule_set_size(pill, &RMF_DLM_LVB, RCL_SERVER, 0);
3388                 rc = req_capsule_server_pack(pill);
3389                 if (rc)
3390                         rc = err_serious(rc);
3391         }
3392         mdt_thread_info_fini(info);
3393         RETURN(rc);
3394 }
3395
3396 static void mdt_deregister_seq_exp(struct mdt_device *mdt)
3397 {
3398         struct seq_server_site  *ss = mdt_seq_site(mdt);
3399
3400         if (ss->ss_node_id == 0)
3401                 return;
3402
3403         if (ss->ss_client_seq != NULL) {
3404                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
3405                 ss->ss_client_seq->lcs_exp = NULL;
3406         }
3407
3408         if (ss->ss_server_fld != NULL) {
3409                 lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
3410                 ss->ss_server_fld->lsf_control_exp = NULL;
3411         }
3412 }
3413
3414 static void mdt_seq_fini_cli(struct mdt_device *mdt)
3415 {
3416         struct seq_server_site *ss = mdt_seq_site(mdt);
3417
3418         if (ss == NULL)
3419                 return;
3420
3421         if (ss->ss_server_seq == NULL)
3422                 seq_server_set_cli(NULL, ss->ss_server_seq, NULL);
3423
3424         return;
3425 }
3426
/*
 * Tear down the sequence infrastructure of \a mdt: handle the sequence
 * client, drop the LWP export registrations, then finalize the sequence
 * site.  Returns the result of seq_site_fini().
 */
static int mdt_seq_fini(const struct lu_env *env, struct mdt_device *mdt)
{
        int rc;

        mdt_seq_fini_cli(mdt);
        mdt_deregister_seq_exp(mdt);

        rc = seq_site_fini(env, mdt_seq_site(mdt));
        return rc;
}
3434
3435 /**
3436  * It will retrieve its FLDB entries from MDT0, and it only happens
3437  * when upgrading existent FS to 2.6 or when local FLDB is corrupted,
3438  * and it needs to refresh FLDB from the MDT0.
3439  **/
3440 static int mdt_register_lwp_callback(void *data)
3441 {
3442         struct lu_env           env;
3443         struct mdt_device       *mdt = data;
3444         struct lu_server_fld    *fld = mdt_seq_site(mdt)->ss_server_fld;
3445         int                     rc;
3446         ENTRY;
3447
3448         LASSERT(mdt_seq_site(mdt)->ss_node_id != 0);
3449
3450         if (!likely(fld->lsf_new))
3451                 RETURN(0);
3452
3453         rc = lu_env_init(&env, LCT_MD_THREAD);
3454         if (rc) {
3455                 CERROR("%s: cannot init env: rc = %d\n", mdt_obd_name(mdt), rc);
3456                 RETURN(rc);
3457         }
3458
3459         rc = fld_update_from_controller(&env, fld);
3460         if (rc != 0) {
3461                 CERROR("%s: cannot update controller: rc = %d\n",
3462                        mdt_obd_name(mdt), rc);
3463                 GOTO(out, rc);
3464         }
3465 out:
3466         lu_env_fini(&env);
3467         RETURN(rc);
3468 }
3469
3470 static int mdt_register_seq_exp(struct mdt_device *mdt)
3471 {
3472         struct seq_server_site  *ss = mdt_seq_site(mdt);
3473         char                    *lwp_name = NULL;
3474         int                     rc;
3475
3476         if (ss->ss_node_id == 0)
3477                 return 0;
3478
3479         OBD_ALLOC(lwp_name, MAX_OBD_NAME);
3480         if (lwp_name == NULL)
3481                 GOTO(out_free, rc = -ENOMEM);
3482
3483         rc = tgt_name2lwpname(mdt_obd_name(mdt), lwp_name);
3484         if (rc != 0)
3485                 GOTO(out_free, rc);
3486
3487         rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
3488                                       NULL, NULL);
3489         if (rc != 0)
3490                 GOTO(out_free, rc);
3491
3492         rc = lustre_register_lwp_item(lwp_name,
3493                                       &ss->ss_server_fld->lsf_control_exp,
3494                                       mdt_register_lwp_callback, mdt);
3495         if (rc != 0) {
3496                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
3497                 ss->ss_client_seq->lcs_exp = NULL;
3498                 GOTO(out_free, rc);
3499         }
3500 out_free:
3501         if (lwp_name != NULL)
3502                 OBD_FREE(lwp_name, MAX_OBD_NAME);
3503
3504         return rc;
3505 }
3506
3507 /*
3508  * Init client sequence manager which is used by local MDS to talk to sequence
3509  * controller on remote node.
3510  */
3511 static int mdt_seq_init_cli(const struct lu_env *env, struct mdt_device *mdt)
3512 {
3513         struct seq_server_site  *ss = mdt_seq_site(mdt);
3514         int                     rc;
3515         char                    *prefix;
3516         ENTRY;
3517
3518         /* check if this is adding the first MDC and controller is not yet
3519          * initialized. */
3520         OBD_ALLOC_PTR(ss->ss_client_seq);
3521         if (ss->ss_client_seq == NULL)
3522                 RETURN(-ENOMEM);
3523
3524         OBD_ALLOC(prefix, MAX_OBD_NAME + 5);
3525         if (prefix == NULL) {
3526                 OBD_FREE_PTR(ss->ss_client_seq);
3527                 ss->ss_client_seq = NULL;
3528                 RETURN(-ENOMEM);
3529         }
3530
3531         /* Note: seq_client_fini will be called in seq_site_fini */
3532         snprintf(prefix, MAX_OBD_NAME + 5, "ctl-%s", mdt_obd_name(mdt));
3533         rc = seq_client_init(ss->ss_client_seq, NULL, LUSTRE_SEQ_METADATA,
3534                              prefix, ss->ss_node_id == 0 ?  ss->ss_control_seq :
3535                                                             NULL);
3536         OBD_FREE(prefix, MAX_OBD_NAME + 5);
3537         if (rc != 0) {
3538                 OBD_FREE_PTR(ss->ss_client_seq);
3539                 ss->ss_client_seq = NULL;
3540                 RETURN(rc);
3541         }
3542
3543         rc = seq_server_set_cli(env, ss->ss_server_seq, ss->ss_client_seq);
3544
3545         RETURN(rc);
3546 }
3547
3548 static int mdt_seq_init(const struct lu_env *env, struct mdt_device *mdt)
3549 {
3550         struct seq_server_site  *ss;
3551         int                     rc;
3552         ENTRY;
3553
3554         ss = mdt_seq_site(mdt);
3555         /* init sequence controller server(MDT0) */
3556         if (ss->ss_node_id == 0) {
3557                 OBD_ALLOC_PTR(ss->ss_control_seq);
3558                 if (ss->ss_control_seq == NULL)
3559                         RETURN(-ENOMEM);
3560
3561                 rc = seq_server_init(env, ss->ss_control_seq, mdt->mdt_bottom,
3562                                      mdt_obd_name(mdt), LUSTRE_SEQ_CONTROLLER,
3563                                      ss);
3564                 if (rc)
3565                         GOTO(out_seq_fini, rc);
3566         }
3567
3568         /* Init normal sequence server */
3569         OBD_ALLOC_PTR(ss->ss_server_seq);
3570         if (ss->ss_server_seq == NULL)
3571                 GOTO(out_seq_fini, rc = -ENOMEM);
3572
3573         rc = seq_server_init(env, ss->ss_server_seq, mdt->mdt_bottom,
3574                              mdt_obd_name(mdt), LUSTRE_SEQ_SERVER, ss);
3575         if (rc)
3576                 GOTO(out_seq_fini, rc);
3577
3578         /* init seq client for seq server to talk to seq controller(MDT0) */
3579         rc = mdt_seq_init_cli(env, mdt);
3580         if (rc != 0)
3581                 GOTO(out_seq_fini, rc);
3582
3583         if (ss->ss_node_id != 0)
3584                 /* register controler export through lwp */
3585                 rc = mdt_register_seq_exp(mdt);
3586
3587         EXIT;
3588 out_seq_fini:
3589         if (rc)
3590                 mdt_seq_fini(env, mdt);
3591
3592         return rc;
3593 }
3594
3595 /*
3596  * FLD wrappers
3597  */
3598 static int mdt_fld_fini(const struct lu_env *env,
3599                         struct mdt_device *m)
3600 {
3601         struct seq_server_site *ss = mdt_seq_site(m);
3602         ENTRY;
3603
3604         if (ss && ss->ss_server_fld) {
3605                 fld_server_fini(env, ss->ss_server_fld);
3606                 OBD_FREE_PTR(ss->ss_server_fld);
3607                 ss->ss_server_fld = NULL;
3608         }
3609
3610         RETURN(0);
3611 }
3612
3613 static int mdt_fld_init(const struct lu_env *env,
3614                         const char *uuid,
3615                         struct mdt_device *m)
3616 {
3617         struct seq_server_site *ss;
3618         int rc;
3619         ENTRY;
3620
3621         ss = mdt_seq_site(m);
3622
3623         OBD_ALLOC_PTR(ss->ss_server_fld);
3624         if (ss->ss_server_fld == NULL)
3625                 RETURN(rc = -ENOMEM);
3626
3627         rc = fld_server_init(env, ss->ss_server_fld, m->mdt_bottom, uuid,
3628                              LU_SEQ_RANGE_MDT);
3629         if (rc) {
3630                 OBD_FREE_PTR(ss->ss_server_fld);
3631                 ss->ss_server_fld = NULL;
3632                 RETURN(rc);
3633         }
3634
3635         RETURN(0);
3636 }
3637
3638 static void mdt_stack_pre_fini(const struct lu_env *env,
3639                            struct mdt_device *m, struct lu_device *top)
3640 {
3641         struct obd_device       *obd;
3642         struct lustre_cfg_bufs  *bufs;
3643         struct lustre_cfg       *lcfg;
3644         struct mdt_thread_info  *info;
3645         ENTRY;
3646
3647         LASSERT(top);
3648
3649         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3650         LASSERT(info != NULL);
3651
3652         bufs = &info->mti_u.bufs;
3653
3654         LASSERT(m->mdt_child_exp);
3655         LASSERT(m->mdt_child_exp->exp_obd);
3656         obd = m->mdt_child_exp->exp_obd;
3657
3658         /* process cleanup, pass mdt obd name to get obd umount flags */
3659         /* XXX: this is needed because all layers are referenced by
3660          * objects (some of them are pinned by osd, for example *
3661          * the proper solution should be a model where object used
3662          * by osd only doesn't have mdt/mdd slices -bzzz */
3663         lustre_cfg_bufs_reset(bufs, mdt_obd_name(m));
3664         lustre_cfg_bufs_set_string(bufs, 1, NULL);
3665         lcfg = lustre_cfg_new(LCFG_PRE_CLEANUP, bufs);
3666         if (!lcfg) {
3667                 CERROR("%s:Cannot alloc lcfg!\n", mdt_obd_name(m));
3668                 return;
3669         }
3670         top->ld_ops->ldo_process_config(env, top, lcfg);
3671         lustre_cfg_free(lcfg);
3672         EXIT;
3673 }
3674
3675 static void mdt_stack_fini(const struct lu_env *env,
3676                            struct mdt_device *m, struct lu_device *top)
3677 {
3678         struct obd_device       *obd = mdt2obd_dev(m);
3679         struct lustre_cfg_bufs  *bufs;
3680         struct lustre_cfg       *lcfg;
3681         struct mdt_thread_info  *info;
3682         char                     flags[3] = "";
3683         ENTRY;
3684
3685         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
3686         LASSERT(info != NULL);
3687
3688         lu_dev_del_linkage(top->ld_site, top);
3689
3690         lu_site_purge(env, top->ld_site, -1);
3691
3692         bufs = &info->mti_u.bufs;
3693         /* process cleanup, pass mdt obd name to get obd umount flags */
3694         /* another purpose is to let all layers to release their objects */
3695         lustre_cfg_bufs_reset(bufs, mdt_obd_name(m));
3696         if (obd->obd_force)
3697                 strcat(flags, "F");
3698         if (obd->obd_fail)
3699                 strcat(flags, "A");
3700         lustre_cfg_bufs_set_string(bufs, 1, flags);
3701         lcfg = lustre_cfg_new(LCFG_CLEANUP, bufs);
3702         if (!lcfg) {
3703                 CERROR("Cannot alloc lcfg!\n");
3704                 return;
3705         }
3706         LASSERT(top);
3707         top->ld_ops->ldo_process_config(env, top, lcfg);
3708         lustre_cfg_free(lcfg);
3709
3710         lu_site_purge(env, top->ld_site, -1);
3711
3712         m->mdt_child = NULL;
3713         m->mdt_bottom = NULL;
3714
3715         obd_disconnect(m->mdt_child_exp);
3716         m->mdt_child_exp = NULL;
3717
3718         obd_disconnect(m->mdt_bottom_exp);
3719         m->mdt_child_exp = NULL;
3720 }
3721
3722 static int mdt_connect_to_next(const struct lu_env *env, struct mdt_device *m,
3723                                const char *next, struct obd_export **exp)
3724 {
3725         struct obd_connect_data *data = NULL;
3726         struct obd_device       *obd;
3727         int                      rc;
3728         ENTRY;
3729
3730         OBD_ALLOC_PTR(data);
3731         if (data == NULL)
3732                 GOTO(out, rc = -ENOMEM);
3733
3734         obd = class_name2obd(next);
3735         if (obd == NULL) {
3736                 CERROR("%s: can't locate next device: %s\n",
3737                        mdt_obd_name(m), next);
3738                 GOTO(out, rc = -ENOTCONN);
3739         }
3740
3741         data->ocd_connect_flags = OBD_CONNECT_VERSION;
3742         data->ocd_version = LUSTRE_VERSION_CODE;
3743
3744         rc = obd_connect(NULL, exp, obd, &obd->obd_uuid, data, NULL);
3745         if (rc) {
3746                 CERROR("%s: cannot connect to next dev %s (%d)\n",
3747                        mdt_obd_name(m), next, rc);
3748                 GOTO(out, rc);
3749         }
3750
3751 out:
3752         if (data)
3753                 OBD_FREE_PTR(data);
3754         RETURN(rc);
3755 }
3756
3757 static int mdt_stack_init(const struct lu_env *env, struct mdt_device *mdt,
3758                           struct lustre_cfg *cfg)
3759 {
3760         char                   *dev = lustre_cfg_string(cfg, 0);
3761         int                     rc, name_size, uuid_size;
3762         char                   *name, *uuid, *p;
3763         struct lustre_cfg_bufs *bufs;
3764         struct lustre_cfg      *lcfg;
3765         struct obd_device      *obd;
3766         struct lustre_profile  *lprof;
3767         struct lu_site         *site;
3768         ENTRY;
3769
3770         /* in 1.8 we had the only device in the stack - MDS.
3771          * 2.0 introduces MDT, MDD, OSD; MDT starts others internally.
3772          * in 2.3 OSD is instantiated by obd_mount.c, so we need
3773          * to generate names and setup MDT, MDD. MDT will be using
3774          * generated name to connect to MDD. for MDD the next device
3775          * will be LOD with name taken from so called "profile" which
3776          * is generated by mount_option line
3777          *
3778          * 1.8 MGS generates config. commands like this:
3779          *   #06 (104)mount_option 0:  1:lustre-MDT0000  2:lustre-mdtlov
3780          *   #08 (120)setup   0:lustre-MDT0000  1:dev 2:type 3:lustre-MDT0000
3781          * 2.0 MGS generates config. commands like this:
3782          *   #07 (112)mount_option 0:  1:lustre-MDT0000  2:lustre-MDT0000-mdtlov
3783          *   #08 (160)setup   0:lustre-MDT0000  1:lustre-MDT0000_UUID  2:0
3784          *                    3:lustre-MDT0000-mdtlov  4:f
3785          *
3786          * we generate MDD name from MDT one, just replacing T with D
3787          *
3788          * after all the preparations, the logical equivalent will be
3789          *   #01 (160)setup   0:lustre-MDD0000  1:lustre-MDD0000_UUID  2:0
3790          *                    3:lustre-MDT0000-mdtlov  4:f
3791          *   #02 (160)setup   0:lustre-MDT0000  1:lustre-MDT0000_UUID  2:0
3792          *                    3:lustre-MDD0000  4:f
3793          *
3794          *  notice we build the stack from down to top: MDD first, then MDT */
3795
3796         name_size = MAX_OBD_NAME;
3797         uuid_size = MAX_OBD_NAME;
3798
3799         OBD_ALLOC(name, name_size);
3800         OBD_ALLOC(uuid, uuid_size);
3801         if (name == NULL || uuid == NULL)
3802                 GOTO(cleanup_mem, rc = -ENOMEM);
3803
3804         OBD_ALLOC_PTR(bufs);
3805         if (!bufs)
3806                 GOTO(cleanup_mem, rc = -ENOMEM);
3807
3808         strcpy(name, dev);
3809         p = strstr(name, "-MDT");
3810         if (p == NULL)
3811                 GOTO(free_bufs, rc = -ENOMEM);
3812         p[3] = 'D';
3813
3814         snprintf(uuid, MAX_OBD_NAME, "%s_UUID", name);
3815
3816         lprof = class_get_profile(lustre_cfg_string(cfg, 0));
3817         if (lprof == NULL || lprof->lp_dt == NULL) {
3818                 CERROR("can't find the profile: %s\n",
3819                        lustre_cfg_string(cfg, 0));
3820                 GOTO(free_bufs, rc = -EINVAL);
3821         }
3822
3823         lustre_cfg_bufs_reset(bufs, name);
3824         lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_MDD_NAME);
3825         lustre_cfg_bufs_set_string(bufs, 2, uuid);
3826         lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
3827
3828         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
3829         if (!lcfg)
3830                 GOTO(free_bufs, rc = -ENOMEM);
3831
3832         rc = class_attach(lcfg);
3833         if (rc)
3834                 GOTO(lcfg_cleanup, rc);
3835
3836         obd = class_name2obd(name);
3837         if (!obd) {
3838                 CERROR("Can not find obd %s (%s in config)\n",
3839                        MDD_OBD_NAME, lustre_cfg_string(cfg, 0));
3840                 GOTO(class_detach, rc = -EINVAL);
3841         }
3842
3843         lustre_cfg_free(lcfg);
3844
3845         lustre_cfg_bufs_reset(bufs, name);
3846         lustre_cfg_bufs_set_string(bufs, 1, uuid);
3847         lustre_cfg_bufs_set_string(bufs, 2, dev);
3848         lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
3849
3850         lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
3851
3852         rc = class_setup(obd, lcfg);
3853         if (rc)
3854                 GOTO(class_detach, rc);
3855
3856         /* connect to MDD we just setup */
3857         rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_child_exp);
3858         if (rc)
3859                 GOTO(class_detach, rc);
3860
3861         site = mdt->mdt_child_exp->exp_obd->obd_lu_dev->ld_site;
3862         LASSERT(site);
3863         LASSERT(mdt_lu_site(mdt) == NULL);
3864         mdt->mdt_lu_dev.ld_site = site;
3865         site->ls_top_dev = &mdt->mdt_lu_dev;
3866         mdt->mdt_child = lu2md_dev(mdt->mdt_child_exp->exp_obd->obd_lu_dev);
3867
3868
3869         /* now connect to bottom OSD */
3870         snprintf(name, MAX_OBD_NAME, "%s-osd", dev);
3871         rc = mdt_connect_to_next(env, mdt, name, &mdt->mdt_bottom_exp);
3872         if (rc)
3873                 GOTO(class_detach, rc);
3874         mdt->mdt_bottom =
3875                 lu2dt_dev(mdt->mdt_bottom_exp->exp_obd->obd_lu_dev);
3876
3877
3878         rc = lu_env_refill((struct lu_env *)env);
3879         if (rc != 0)
3880                 CERROR("Failure to refill session: '%d'\n", rc);
3881
3882         lu_dev_add_linkage(site, &mdt->mdt_lu_dev);
3883
3884         EXIT;
3885 class_detach:
3886         if (rc)
3887                 class_detach(obd, lcfg);
3888 lcfg_cleanup:
3889         lustre_cfg_free(lcfg);
3890 free_bufs:
3891         OBD_FREE_PTR(bufs);
3892 cleanup_mem:
3893         if (name)
3894                 OBD_FREE(name, name_size);
3895         if (uuid)
3896                 OBD_FREE(uuid, uuid_size);
3897         RETURN(rc);
3898 }
3899
3900 /* setup quota master target on MDT0 */
3901 static int mdt_quota_init(const struct lu_env *env, struct mdt_device *mdt,
3902                           struct lustre_cfg *cfg)
3903 {
3904         struct obd_device       *obd;
3905         char                    *dev = lustre_cfg_string(cfg, 0);
3906         char                    *qmtname, *uuid, *p;
3907         struct lustre_cfg_bufs  *bufs;
3908         struct lustre_cfg       *lcfg;
3909         struct lustre_profile   *lprof;
3910         struct obd_connect_data *data;
3911         int                      rc;
3912         ENTRY;
3913
3914         LASSERT(mdt->mdt_qmt_exp == NULL);
3915         LASSERT(mdt->mdt_qmt_dev == NULL);
3916
3917         /* quota master is on MDT0 only for now */
3918         if (mdt->mdt_seq_site.ss_node_id != 0)
3919                 RETURN(0);
3920
3921         /* MGS generates config commands which look as follows:
3922          *   #01 (160)setup   0:lustre-MDT0000  1:lustre-MDT0000_UUID  2:0
3923          *                    3:lustre-MDT0000-mdtlov  4:f
3924          *
3925          * We generate the QMT name from the MDT one, just replacing MD with QM
3926          * after all the preparations, the logical equivalent will be:
3927          *   #01 (160)setup   0:lustre-QMT0000  1:lustre-QMT0000_UUID  2:0
3928          *                    3:lustre-MDT0000-osd  4:f */
3929         OBD_ALLOC(qmtname, MAX_OBD_NAME);
3930         OBD_ALLOC(uuid, UUID_MAX);
3931         OBD_ALLOC_PTR(bufs);
3932         OBD_ALLOC_PTR(data);
3933         if (qmtname == NULL || uuid == NULL || bufs == NULL || data == NULL)
3934                 GOTO(cleanup_mem, rc = -ENOMEM);
3935
3936         strcpy(qmtname, dev);
3937         p = strstr(qmtname, "-MDT");
3938         if (p == NULL)
3939                 GOTO(cleanup_mem, rc = -ENOMEM);
3940         /* replace MD with QM */
3941         p[1] = 'Q';
3942         p[2] = 'M';
3943
3944         snprintf(uuid, UUID_MAX, "%s_UUID", qmtname);
3945
3946         lprof = class_get_profile(lustre_cfg_string(cfg, 0));
3947         if (lprof == NULL || lprof->lp_dt == NULL) {
3948                 CERROR("can't find profile for %s\n",
3949                        lustre_cfg_string(cfg, 0));
3950                 GOTO(cleanup_mem, rc = -EINVAL);
3951         }
3952
3953         lustre_cfg_bufs_reset(bufs, qmtname);
3954         lustre_cfg_bufs_set_string(bufs, 1, LUSTRE_QMT_NAME);
3955         lustre_cfg_bufs_set_string(bufs, 2, uuid);
3956         lustre_cfg_bufs_set_string(bufs, 3, lprof->lp_dt);
3957
3958         lcfg = lustre_cfg_new(LCFG_ATTACH, bufs);
3959         if (!lcfg)
3960                 GOTO(cleanup_mem, rc = -ENOMEM);
3961
3962         rc = class_attach(lcfg);
3963         if (rc)
3964                 GOTO(lcfg_cleanup, rc);
3965
3966         obd = class_name2obd(qmtname);
3967         if (!obd) {
3968                 CERROR("Can not find obd %s (%s in config)\n", qmtname,
3969                        lustre_cfg_string(cfg, 0));
3970                 GOTO(class_detach, rc = -EINVAL);
3971         }
3972
3973         lustre_cfg_free(lcfg);
3974
3975         lustre_cfg_bufs_reset(bufs, qmtname);
3976         lustre_cfg_bufs_set_string(bufs, 1, uuid);
3977         lustre_cfg_bufs_set_string(bufs, 2, dev);
3978
3979         /* for quota, the next device should be the OSD device */
3980         lustre_cfg_bufs_set_string(bufs, 3,
3981                                    mdt->mdt_bottom->dd_lu_dev.ld_obd->obd_name);
3982
3983         lcfg = lustre_cfg_new(LCFG_SETUP, bufs);
3984
3985         rc = class_setup(obd, lcfg);
3986         if (rc)
3987                 GOTO(class_detach, rc);
3988
3989         mdt->mdt_qmt_dev = obd->obd_lu_dev;
3990
3991         /* configure local quota objects */
3992         rc = mdt->mdt_qmt_dev->ld_ops->ldo_prepare(env,
3993                                                    &mdt->mdt_lu_dev,
3994                                                    mdt->mdt_qmt_dev);
3995         if (rc)
3996                 GOTO(class_cleanup, rc);
3997
3998         /* connect to quota master target */
3999         data->ocd_connect_flags = OBD_CONNECT_VERSION;
4000         data->ocd_version = LUSTRE_VERSION_CODE;
4001         rc = obd_connect(NULL, &mdt->mdt_qmt_exp, obd, &obd->obd_uuid,
4002                          data, NULL);
4003         if (rc) {
4004                 CERROR("cannot connect to quota master device %s (%d)\n",
4005                        qmtname, rc);
4006                 GOTO(class_cleanup, rc);
4007         }
4008
4009         EXIT;
4010 class_cleanup:
4011         if (rc) {
4012                 class_manual_cleanup(obd);
4013                 mdt->mdt_qmt_dev = NULL;
4014         }
4015 class_detach:
4016         if (rc)
4017                 class_detach(obd, lcfg);
4018 lcfg_cleanup:
4019         lustre_cfg_free(lcfg);
4020 cleanup_mem:
4021         if (bufs)
4022                 OBD_FREE_PTR(bufs);
4023         if (qmtname)
4024                 OBD_FREE(qmtname, MAX_OBD_NAME);
4025         if (uuid)
4026                 OBD_FREE(uuid, UUID_MAX);
4027         if (data)
4028                 OBD_FREE_PTR(data);
4029         return rc;
4030 }
4031
4032 /* Shutdown quota master target associated with mdt */
4033 static void mdt_quota_fini(const struct lu_env *env, struct mdt_device *mdt)
4034 {
4035         ENTRY;
4036
4037         if (mdt->mdt_qmt_exp == NULL)
4038                 RETURN_EXIT;
4039         LASSERT(mdt->mdt_qmt_dev != NULL);
4040
4041         /* the qmt automatically shuts down when the mdt disconnects */
4042         obd_disconnect(mdt->mdt_qmt_exp);
4043         mdt->mdt_qmt_exp = NULL;
4044         mdt->mdt_qmt_dev = NULL;
4045         EXIT;
4046 }
4047
/*
 * Temporary tgt_session_info entry point for MDS_GETXATTR.
 *
 * mdt_getxattr() is shared with mdt_intent_getxattr() and still takes an
 * mdt_thread_info, so convert the session info, call it, and release the
 * thread info again.  To be dropped once the remaining MDT code has been
 * converted to tgt_session_info.
 */
int mdt_tgt_getxattr(struct tgt_session_info *tsi)
{
        struct mdt_thread_info *mti = tsi2mdt_info(tsi);
        int                     result = mdt_getxattr(mti);

        mdt_thread_info_fini(mti);

        return result;
}
4061
4062 static struct tgt_handler mdt_tgt_handlers[] = {
4063 TGT_RPC_HANDLER(MDS_FIRST_OPC,
4064                 0,                      MDS_CONNECT,    mdt_tgt_connect,
4065                 &RQF_CONNECT, LUSTRE_OBD_VERSION),
4066 TGT_RPC_HANDLER(MDS_FIRST_OPC,
4067                 0,                      MDS_DISCONNECT, tgt_disconnect,
4068                 &RQF_MDS_DISCONNECT, LUSTRE_OBD_VERSION),
4069 TGT_RPC_HANDLER(MDS_FIRST_OPC,
4070                 HABEO_REFERO,           MDS_SET_INFO,   mdt_set_info,
4071                 &RQF_OBD_SET_INFO, LUSTRE_MDS_VERSION),
4072 TGT_MDT_HDL(0,                          MDS_GET_INFO,   mdt_get_info),
4073 TGT_MDT_HDL(0           | HABEO_REFERO, MDS_GETSTATUS,  mdt_getstatus),
4074 TGT_MDT_HDL(HABEO_CORPUS,               MDS_GETATTR,    mdt_getattr),
4075 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_GETATTR_NAME,
4076                                                         mdt_getattr_name),
4077 TGT_MDT_HDL(HABEO_CORPUS,               MDS_GETXATTR,   mdt_tgt_getxattr),
4078 TGT_MDT_HDL(0           | HABEO_REFERO, MDS_STATFS,     mdt_statfs),
4079 TGT_MDT_HDL(0           | MUTABOR,      MDS_REINT,      mdt_reint),
4080 TGT_MDT_HDL(HABEO_CORPUS,               MDS_CLOSE,      mdt_close),
4081 TGT_MDT_HDL(HABEO_CORPUS,               MDS_DONE_WRITING,
4082                                                         mdt_done_writing),
4083 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_READPAGE,   mdt_readpage),
4084 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_SYNC,       mdt_sync),
4085 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_IS_SUBDIR,  mdt_is_subdir),
4086 TGT_MDT_HDL(0,                          MDS_QUOTACTL,   mdt_quotactl),
4087 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_PROGRESS,
4088                                                         mdt_hsm_progress),
4089 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_REGISTER,
4090                                                         mdt_hsm_ct_register),
4091 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_CT_UNREGISTER,
4092                                                         mdt_hsm_ct_unregister),
4093 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_STATE_GET,
4094                                                         mdt_hsm_state_get),
4095 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO | MUTABOR, MDS_HSM_STATE_SET,
4096                                                         mdt_hsm_state_set),
4097 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_ACTION, mdt_hsm_action),
4098 TGT_MDT_HDL(HABEO_CORPUS| HABEO_REFERO, MDS_HSM_REQUEST,
4099                                                         mdt_hsm_request),
4100 TGT_MDT_HDL(HABEO_CORPUS|HABEO_REFERO | MUTABOR, MDS_SWAP_LAYOUTS,
4101                                                         mdt_swap_layouts)
4102 };
4103
4104 static struct tgt_handler mdt_sec_ctx_ops[] = {
4105 TGT_SEC_HDL_VAR(0,                      SEC_CTX_INIT,     mdt_sec_ctx_handle),
4106 TGT_SEC_HDL_VAR(0,                      SEC_CTX_INIT_CONT,mdt_sec_ctx_handle),
4107 TGT_SEC_HDL_VAR(0,                      SEC_CTX_FINI,     mdt_sec_ctx_handle)
4108 };
4109
4110 static struct tgt_handler mdt_quota_ops[] = {
4111 TGT_QUOTA_HDL(HABEO_REFERO,             QUOTA_DQACQ,      mdt_quota_dqacq),
4112 };
4113
4114 static struct tgt_opc_slice mdt_common_slice[] = {
4115         {
4116                 .tos_opc_start  = MDS_FIRST_OPC,
4117                 .tos_opc_end    = MDS_LAST_OPC,
4118                 .tos_hs         = mdt_tgt_handlers
4119         },
4120         {
4121                 .tos_opc_start  = OBD_FIRST_OPC,
4122                 .tos_opc_end    = OBD_LAST_OPC,
4123                 .tos_hs         = tgt_obd_handlers
4124         },
4125         {
4126                 .tos_opc_start  = LDLM_FIRST_OPC,
4127                 .tos_opc_end    = LDLM_LAST_OPC,
4128                 .tos_hs         = tgt_dlm_handlers
4129         },
4130         {
4131                 .tos_opc_start  = SEC_FIRST_OPC,
4132                 .tos_opc_end    = SEC_LAST_OPC,
4133                 .tos_hs         = mdt_sec_ctx_ops
4134         },
4135         {
4136                 .tos_opc_start  = UPDATE_OBJ,
4137                 .tos_opc_end    = UPDATE_LAST_OPC,
4138                 .tos_hs         = tgt_out_handlers
4139         },
4140         {
4141                 .tos_opc_start  = FLD_FIRST_OPC,
4142                 .tos_opc_end    = FLD_LAST_OPC,
4143                 .tos_hs         = fld_handlers
4144         },
4145         {
4146                 .tos_opc_start  = SEQ_FIRST_OPC,
4147                 .tos_opc_end    = SEQ_LAST_OPC,
4148                 .tos_hs         = seq_handlers
4149         },
4150         {
4151                 .tos_opc_start  = QUOTA_DQACQ,
4152                 .tos_opc_end    = QUOTA_LAST_OPC,
4153                 .tos_hs         = mdt_quota_ops
4154         },
4155         {
4156                 .tos_opc_start  = LLOG_FIRST_OPC,
4157                 .tos_opc_end    = LLOG_LAST_OPC,
4158                 .tos_hs         = tgt_llog_handlers
4159         },
4160
4161         {
4162                 .tos_hs         = NULL
4163         }
4164 };
4165
4166 static void mdt_fini(const struct lu_env *env, struct mdt_device *m)
4167 {
4168         struct md_device  *next = m->mdt_child;
4169         struct lu_device  *d    = &m->mdt_lu_dev;
4170         struct obd_device *obd = mdt2obd_dev(m);
4171         ENTRY;
4172
4173         target_recovery_fini(obd);
4174
4175         ping_evictor_stop();
4176
4177         mdt_stack_pre_fini(env, m, md2lu_dev(m->mdt_child));
4178
4179         if (m->mdt_opts.mo_coordinator)
4180                 mdt_hsm_cdt_stop(m);
4181
4182         mdt_hsm_cdt_fini(m);
4183
4184         mdt_llog_ctxt_unclone(env, m, LLOG_AGENT_ORIG_CTXT);
4185         mdt_llog_ctxt_unclone(env, m, LLOG_CHANGELOG_ORIG_CTXT);
4186         obd_exports_barrier(obd);
4187         obd_zombie_barrier();
4188
4189         mdt_procfs_fini(m);
4190
4191         tgt_fini(env, &m->mdt_lut);
4192         mdt_fs_cleanup(env, m);
4193         upcall_cache_cleanup(m->mdt_identity_cache);
4194         m->mdt_identity_cache = NULL;
4195
4196         if (m->mdt_namespace != NULL) {
4197                 ldlm_namespace_free(m->mdt_namespace, NULL,
4198                                     d->ld_obd->obd_force);
4199                 d->ld_obd->obd_namespace = m->mdt_namespace = NULL;
4200         }
4201
4202         mdt_quota_fini(env, m);
4203
4204         cfs_free_nidlist(&m->mdt_nosquash_nids);
4205         if (m->mdt_nosquash_str) {
4206                 OBD_FREE(m->mdt_nosquash_str, m->mdt_nosquash_strlen);
4207                 m->mdt_nosquash_str = NULL;
4208                 m->mdt_nosquash_strlen = 0;
4209         }
4210
4211         next->md_ops->mdo_iocontrol(env, next, OBD_IOC_PAUSE_LFSCK, 0, NULL);
4212
4213         mdt_seq_fini(env, m);
4214         mdt_fld_fini(env, m);
4215
4216         next->md_ops->mdo_init_capa_ctxt(env, next, 0, 0, 0, NULL);
4217         cfs_timer_disarm(&m->mdt_ck_timer);
4218         mdt_ck_thread_stop(m);
4219
4220         /*
4221          * Finish the stack
4222          */
4223         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
4224
4225         LASSERT(cfs_atomic_read(&d->ld_ref) == 0);
4226
4227         server_put_mount(mdt_obd_name(m));
4228
4229         EXIT;
4230 }
4231
4232 int mdt_postrecov(const struct lu_env *, struct mdt_device *);
4233
4234 static int mdt_init0(const struct lu_env *env, struct mdt_device *m,
4235                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
4236 {
4237         struct mdt_thread_info    *info;
4238         struct obd_device         *obd;
4239         const char                *dev = lustre_cfg_string(cfg, 0);
4240         const char                *num = lustre_cfg_string(cfg, 2);
4241         struct lustre_mount_info  *lmi = NULL;
4242         struct lustre_sb_info     *lsi;
4243         struct lu_site            *s;
4244         struct seq_server_site    *ss_site;
4245         const char                *identity_upcall = "NONE";
4246         struct md_device          *next;
4247         int                        rc;
4248         long                       node_id;
4249         mntopt_t                   mntopts;
4250         ENTRY;
4251
4252         lu_device_init(&m->mdt_lu_dev, ldt);
4253         /*
4254          * Environment (env) might be missing mdt_thread_key values at that
4255          * point, if device is allocated when mdt_thread_key is in QUIESCENT
4256          * mode.
4257          *
4258          * Usually device allocation path doesn't use module key values, but
4259          * mdt has to do a lot of work here, so allocate key value.
4260          */
4261         rc = lu_env_refill((struct lu_env *)env);
4262         if (rc != 0)
4263                 RETURN(rc);
4264
4265         info = lu_context_key_get(&env->le_ctx, &mdt_thread_key);
4266         LASSERT(info != NULL);
4267
4268         obd = class_name2obd(dev);
4269         LASSERT(obd != NULL);
4270
4271         m->mdt_max_mdsize = MAX_MD_SIZE; /* 4 stripes */
4272
4273         m->mdt_som_conf = 0;
4274
4275         m->mdt_opts.mo_cos = MDT_COS_DEFAULT;
4276
4277         /* default is coordinator off, it is started through conf_param
4278          * or /proc */
4279         m->mdt_opts.mo_coordinator = 0;
4280
4281         lmi = server_get_mount(dev);
4282         if (lmi == NULL) {
4283                 CERROR("Cannot get mount info for %s!\n", dev);
4284                 RETURN(-EFAULT);
4285         } else {
4286                 lsi = s2lsi(lmi->lmi_sb);
4287                 /* CMD is supported only in IAM mode */
4288                 LASSERT(num);
4289                 node_id = simple_strtol(num, NULL, 10);
4290                 obd->u.obt.obt_magic = OBT_MAGIC;
4291         }
4292
4293         spin_lock_init(&m->mdt_ioepoch_lock);
4294         m->mdt_capa_timeout = CAPA_TIMEOUT;
4295         m->mdt_capa_alg = CAPA_HMAC_ALG_SHA1;
4296         m->mdt_ck_timeout = CAPA_KEY_TIMEOUT;
4297         m->mdt_squash_uid = 0;
4298         m->mdt_squash_gid = 0;
4299         CFS_INIT_LIST_HEAD(&m->mdt_nosquash_nids);
4300         m->mdt_nosquash_str = NULL;
4301         m->mdt_nosquash_strlen = 0;
4302         init_rwsem(&m->mdt_squash_sem);
4303         spin_lock_init(&m->mdt_osfs_lock);
4304         m->mdt_osfs_age = cfs_time_shift_64(-1000);
4305         m->mdt_enable_remote_dir = 0;
4306         m->mdt_enable_remote_dir_gid = 0;
4307
4308         m->mdt_lu_dev.ld_ops = &mdt_lu_ops;
4309         m->mdt_lu_dev.ld_obd = obd;
4310         /* Set this lu_device to obd for error handling purposes. */
4311         obd->obd_lu_dev = &m->mdt_lu_dev;
4312
4313         /* init the stack */
4314         rc = mdt_stack_init((struct lu_env *)env, m, cfg);
4315         if (rc) {
4316                 CERROR("%s: Can't init device stack, rc %d\n",
4317                        mdt_obd_name(m), rc);
4318                 GOTO(err_lmi, rc);
4319         }
4320
4321         s = mdt_lu_site(m);
4322         ss_site = mdt_seq_site(m);
4323         s->ld_seq_site = ss_site;
4324         ss_site->ss_lu = s;
4325
4326         /* set server index */
4327         ss_site->ss_node_id = node_id;
4328
4329         /* failover is the default
4330          * FIXME: we do not failout mds0/mgs, which may cause some problems.
4331          * assumed whose ss_node_id == 0 XXX
4332          * */
4333         obd->obd_replayable = 1;
4334         /* No connection accepted until configurations will finish */
4335         obd->obd_no_conn = 1;
4336
4337         if (cfg->lcfg_bufcount > 4 && LUSTRE_CFG_BUFLEN(cfg, 4) > 0) {
4338                 char *str = lustre_cfg_string(cfg, 4);
4339                 if (strchr(str, 'n')) {
4340                         CWARN("%s: recovery disabled\n", mdt_obd_name(m));
4341                         obd->obd_replayable = 0;
4342                 }
4343         }
4344
4345         rc = mdt_fld_init(env, mdt_obd_name(m), m);
4346         if (rc)
4347                 GOTO(err_fini_stack, rc);
4348
4349         rc = mdt_seq_init(env, m);
4350         if (rc)
4351                 GOTO(err_fini_fld, rc);
4352
4353         snprintf(info->mti_u.ns_name, sizeof(info->mti_u.ns_name), "%s-%s",
4354                  LUSTRE_MDT_NAME, obd->obd_uuid.uuid);
4355         m->mdt_namespace = ldlm_namespace_new(obd, info->mti_u.ns_name,
4356                                               LDLM_NAMESPACE_SERVER,
4357                                               LDLM_NAMESPACE_GREEDY,
4358                                               LDLM_NS_TYPE_MDT);
4359         if (m->mdt_namespace == NULL)
4360                 GOTO(err_fini_seq, rc = -ENOMEM);
4361
4362         m->mdt_namespace->ns_lvbp = m;
4363         m->mdt_namespace->ns_lvbo = &mdt_lvbo;
4364
4365         ldlm_register_intent(m->mdt_namespace, mdt_intent_policy);
4366         /* set obd_namespace for compatibility with old code */
4367         obd->obd_namespace = m->mdt_namespace;
4368
4369         cfs_timer_init(&m->mdt_ck_timer, mdt_ck_timer_callback, m);
4370
4371         rc = mdt_hsm_cdt_init(m);
4372         if (rc != 0) {
4373                 CERROR("%s: error initializing coordinator, rc %d\n",
4374                        mdt_obd_name(m), rc);
4375                 GOTO(err_free_ns, rc);
4376         }
4377
4378         rc = mdt_ck_thread_start(m);
4379         if (rc)
4380                 GOTO(err_free_hsm, rc);
4381
4382         rc = tgt_init(env, &m->mdt_lut, obd, m->mdt_bottom, mdt_common_slice,
4383                       OBD_FAIL_MDS_ALL_REQUEST_NET,
4384                       OBD_FAIL_MDS_ALL_REPLY_NET);
4385         if (rc)
4386                 GOTO(err_capa, rc);
4387
4388         rc = mdt_fs_setup(env, m, obd, lsi);
4389         if (rc)
4390                 GOTO(err_tgt, rc);
4391
4392         tgt_adapt_sptlrpc_conf(&m->mdt_lut, 1);
4393
4394         next = m->mdt_child;
4395         rc = next->md_ops->mdo_iocontrol(env, next, OBD_IOC_GET_MNTOPT, 0,
4396                                          &mntopts);
4397         if (rc)
4398                 GOTO(err_fs_cleanup, rc);
4399
4400         if (mntopts & MNTOPT_USERXATTR)
4401                 m->mdt_opts.mo_user_xattr = 1;
4402         else
4403                 m->mdt_opts.mo_user_xattr = 0;
4404
4405         rc = next->md_ops->mdo_maxeasize_get(env, next, &m->mdt_max_ea_size);
4406         if (rc)
4407                 GOTO(err_fs_cleanup, rc);
4408
4409         if (mntopts & MNTOPT_ACL)
4410                 m->mdt_opts.mo_acl = 1;
4411         else
4412                 m->mdt_opts.mo_acl = 0;
4413
4414         /* XXX: to support suppgid for ACL, we enable identity_upcall
4415          * by default, otherwise, maybe got unexpected -EACCESS. */
4416         if (m->mdt_opts.mo_acl)
4417                 identity_upcall = MDT_IDENTITY_UPCALL_PATH;
4418
4419         m->mdt_identity_cache = upcall_cache_init(mdt_obd_name(m),
4420                                                 identity_upcall,
4421                                                 &mdt_identity_upcall_cache_ops);
4422         if (IS_ERR(m->mdt_identity_cache)) {
4423                 rc = PTR_ERR(m->mdt_identity_cache);
4424                 m->mdt_identity_cache = NULL;
4425                 GOTO(err_fs_cleanup, rc);
4426         }
4427
4428         rc = mdt_procfs_init(m, dev);
4429         if (rc) {
4430                 CERROR("Can't init MDT lprocfs, rc %d\n", rc);
4431                 GOTO(err_recovery, rc);
4432         }
4433
4434         rc = mdt_quota_init(env, m, cfg);
4435         if (rc)
4436                 GOTO(err_procfs, rc);
4437
4438         m->mdt_ldlm_client = &mdt2obd_dev(m)->obd_ldlm_client;
4439         ptlrpc_init_client(LDLM_CB_REQUEST_PORTAL, LDLM_CB_REPLY_PORTAL,
4440                            "mdt_ldlm_client", m->mdt_ldlm_client);
4441
4442         ping_evictor_start();
4443
4444         /* recovery will be started upon mdt_prepare()
4445          * when the whole stack is complete and ready
4446          * to serve the requests */
4447
4448         mdt_init_capa_ctxt(env, m);
4449
4450         /* Reduce the initial timeout on an MDS because it doesn't need such
4451          * a long timeout as an OST does. Adaptive timeouts will adjust this
4452          * value appropriately. */
4453         if (ldlm_timeout == LDLM_TIMEOUT_DEFAULT)
4454                 ldlm_timeout = MDS_LDLM_TIMEOUT_DEFAULT;
4455
4456         RETURN(0);
4457 err_procfs:
4458         mdt_procfs_fini(m);
4459 err_recovery:
4460         target_recovery_fini(obd);
4461         upcall_cache_cleanup(m->mdt_identity_cache);
4462         m->mdt_identity_cache = NULL;
4463 err_fs_cleanup:
4464         mdt_fs_cleanup(env, m);
4465 err_tgt:
4466         tgt_fini(env, &m->mdt_lut);
4467 err_capa:
4468         cfs_timer_disarm(&m->mdt_ck_timer);
4469         mdt_ck_thread_stop(m);
4470 err_free_hsm:
4471         mdt_hsm_cdt_fini(m);
4472 err_free_ns:
4473         ldlm_namespace_free(m->mdt_namespace, NULL, 0);
4474         obd->obd_namespace = m->mdt_namespace = NULL;
4475 err_fini_seq:
4476         mdt_seq_fini(env, m);
4477 err_fini_fld:
4478         mdt_fld_fini(env, m);
4479 err_fini_stack:
4480         mdt_stack_fini(env, m, md2lu_dev(m->mdt_child));
4481 err_lmi:
4482         if (lmi)
4483                 server_put_mount(dev);
4484         return(rc);
4485 }
4486
4487 /* For interoperability, the left element is old parameter, the right one
4488  * is the new version of the parameter, if some parameter is deprecated,
4489  * the new version should be set as NULL. */
4490 static struct cfg_interop_param mdt_interop_param[] = {
4491         { "mdt.group_upcall",   NULL },
4492         { "mdt.quota_type",     NULL },
4493         { "mdd.quota_type",     NULL },
4494         { "mdt.rootsquash",     "mdt.root_squash" },
4495         { "mdt.nosquash_nid",   "mdt.nosquash_nids" },
4496         { NULL }
4497 };
4498
4499 /* used by MGS to process specific configurations */
4500 static int mdt_process_config(const struct lu_env *env,
4501                               struct lu_device *d, struct lustre_cfg *cfg)
4502 {
4503         struct mdt_device *m = mdt_dev(d);
4504         struct md_device *md_next = m->mdt_child;
4505         struct lu_device *next = md2lu_dev(md_next);
4506         int rc;
4507         ENTRY;
4508
4509         switch (cfg->lcfg_command) {
4510         case LCFG_PARAM: {
4511                 struct lprocfs_static_vars  lvars;
4512                 struct obd_device          *obd = d->ld_obd;
4513
4514                 /* For interoperability */
4515                 struct cfg_interop_param   *ptr = NULL;
4516                 struct lustre_cfg          *old_cfg = NULL;
4517                 char                       *param = NULL;
4518
4519                 param = lustre_cfg_string(cfg, 1);
4520                 if (param == NULL) {
4521                         CERROR("param is empty\n");
4522                         rc = -EINVAL;
4523                         break;
4524                 }
4525
4526                 ptr = class_find_old_param(param, mdt_interop_param);
4527                 if (ptr != NULL) {
4528                         if (ptr->new_param == NULL) {
4529                                 rc = 0;
4530                                 CWARN("For interoperability, skip this %s."
4531                                       " It is obsolete.\n", ptr->old_param);
4532                                 break;
4533                         }
4534
4535                         CWARN("Found old param %s, changed it to %s.\n",
4536                               ptr->old_param, ptr->new_param);
4537
4538                         old_cfg = cfg;
4539                         cfg = lustre_cfg_rename(old_cfg, ptr->new_param);
4540                         if (IS_ERR(cfg)) {
4541                                 rc = PTR_ERR(cfg);
4542                                 break;
4543                         }
4544                 }
4545
4546                 lprocfs_mdt_init_vars(&lvars);
4547                 rc = class_process_proc_param(PARAM_MDT, lvars.obd_vars,
4548                                               cfg, obd);
4549                 if (rc > 0 || rc == -ENOSYS) {
4550                         /* is it an HSM var ? */
4551                         rc = class_process_proc_param(PARAM_HSM,
4552                                                       hsm_cdt_get_proc_vars(),
4553                                                       cfg, obd);
4554                         if (rc > 0 || rc == -ENOSYS)
4555                                 /* we don't understand; pass it on */
4556                                 rc = next->ld_ops->ldo_process_config(env, next,
4557                                                                       cfg);
4558                 }
4559
4560                 if (old_cfg != NULL)
4561                         lustre_cfg_free(cfg);
4562
4563                 break;
4564         }
4565         default:
4566                 /* others are passed further */
4567                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
4568                 break;
4569         }
4570         RETURN(rc);
4571 }
4572
4573 static struct lu_object *mdt_object_alloc(const struct lu_env *env,
4574                                           const struct lu_object_header *hdr,
4575                                           struct lu_device *d)
4576 {
4577         struct mdt_object *mo;
4578
4579         ENTRY;
4580
4581         OBD_SLAB_ALLOC_PTR_GFP(mo, mdt_object_kmem, __GFP_IO);
4582         if (mo != NULL) {
4583                 struct lu_object *o;
4584                 struct lu_object_header *h;
4585
4586                 o = &mo->mot_obj;
4587                 h = &mo->mot_header;
4588                 lu_object_header_init(h);
4589                 lu_object_init(o, h, d);
4590                 lu_object_add_top(h, o);
4591                 o->lo_ops = &mdt_obj_ops;
4592                 mutex_init(&mo->mot_ioepoch_mutex);
4593                 mutex_init(&mo->mot_lov_mutex);
4594                 init_rwsem(&mo->mot_open_sem);
4595                 RETURN(o);
4596         }
4597         RETURN(NULL);
4598 }
4599
4600 static int mdt_object_init(const struct lu_env *env, struct lu_object *o,
4601                            const struct lu_object_conf *unused)
4602 {
4603         struct mdt_device *d = mdt_dev(o->lo_dev);
4604         struct lu_device  *under;
4605         struct lu_object  *below;
4606         int                rc = 0;
4607         ENTRY;
4608
4609         CDEBUG(D_INFO, "object init, fid = "DFID"\n",
4610                PFID(lu_object_fid(o)));
4611
4612         under = &d->mdt_child->md_lu_dev;
4613         below = under->ld_ops->ldo_object_alloc(env, o->lo_header, under);
4614         if (below != NULL) {
4615                 lu_object_add(o, below);
4616         } else
4617                 rc = -ENOMEM;
4618
4619         RETURN(rc);
4620 }
4621
4622 static void mdt_object_free(const struct lu_env *env, struct lu_object *o)
4623 {
4624         struct mdt_object *mo = mdt_obj(o);
4625         struct lu_object_header *h;
4626         ENTRY;
4627
4628         h = o->lo_header;
4629         CDEBUG(D_INFO, "object free, fid = "DFID"\n",
4630                PFID(lu_object_fid(o)));
4631
4632         LASSERT(atomic_read(&mo->mot_open_count) == 0);
4633         LASSERT(atomic_read(&mo->mot_lease_count) == 0);
4634
4635         lu_object_fini(o);
4636         lu_object_header_fini(h);
4637         OBD_SLAB_FREE_PTR(mo, mdt_object_kmem);
4638
4639         EXIT;
4640 }
4641
4642 static int mdt_object_print(const struct lu_env *env, void *cookie,
4643                             lu_printer_t p, const struct lu_object *o)
4644 {
4645         struct mdt_object *mdto = mdt_obj((struct lu_object *)o);
4646         return (*p)(env, cookie, LUSTRE_MDT_NAME"-object@%p(ioepoch="LPU64" "
4647                     "flags="LPX64", epochcount=%d, writecount=%d)",
4648                     mdto, mdto->mot_ioepoch, mdto->mot_flags,
4649                     mdto->mot_ioepoch_count, mdto->mot_writecount);
4650 }
4651
4652 static int mdt_prepare(const struct lu_env *env,
4653                 struct lu_device *pdev,
4654                 struct lu_device *cdev)
4655 {
4656         struct mdt_device *mdt = mdt_dev(cdev);
4657         struct lu_device *next = &mdt->mdt_child->md_lu_dev;
4658         struct obd_device *obd = cdev->ld_obd;
4659         struct lfsck_start_param lsp;
4660         int rc;
4661
4662         ENTRY;
4663
4664         LASSERT(obd);
4665
4666         rc = next->ld_ops->ldo_prepare(env, cdev, next);
4667         if (rc)
4668                 RETURN(rc);
4669
4670         rc = mdt_llog_ctxt_clone(env, mdt, LLOG_CHANGELOG_ORIG_CTXT);
4671         if (rc)
4672                 RETURN(rc);
4673
4674         rc = mdt_llog_ctxt_clone(env, mdt, LLOG_AGENT_ORIG_CTXT);
4675         if (rc)
4676                 RETURN(rc);
4677
4678         lsp.lsp_start = NULL;
4679         lsp.lsp_namespace = mdt->mdt_namespace;
4680         rc = mdt->mdt_child->md_ops->mdo_iocontrol(env, mdt->mdt_child,
4681                                                    OBD_IOC_START_LFSCK,
4682                                                    0, &lsp);
4683         if (rc != 0) {
4684                 CWARN("%s: auto trigger paused LFSCK failed: rc = %d\n",
4685                       mdt_obd_name(mdt), rc);
4686                 rc = 0;
4687         }
4688
4689         if (mdt->mdt_seq_site.ss_node_id == 0) {
4690                 rc = mdt->mdt_child->md_ops->mdo_root_get(env, mdt->mdt_child,
4691                                                          &mdt->mdt_md_root_fid);
4692                 if (rc)
4693                         RETURN(rc);
4694         }
4695
4696         LASSERT(!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state));
4697         target_recovery_init(&mdt->mdt_lut, tgt_request_handle);
4698         set_bit(MDT_FL_CFGLOG, &mdt->mdt_state);
4699         LASSERT(obd->obd_no_conn);
4700         spin_lock(&obd->obd_dev_lock);
4701         obd->obd_no_conn = 0;
4702         spin_unlock(&obd->obd_dev_lock);
4703
4704         if (obd->obd_recovering == 0)
4705                 mdt_postrecov(env, mdt);
4706
4707         RETURN(rc);
4708 }
4709
4710 const struct lu_device_operations mdt_lu_ops = {
4711         .ldo_object_alloc   = mdt_object_alloc,
4712         .ldo_process_config = mdt_process_config,
4713         .ldo_prepare        = mdt_prepare,
4714 };
4715
4716 static const struct lu_object_operations mdt_obj_ops = {
4717         .loo_object_init    = mdt_object_init,
4718         .loo_object_free    = mdt_object_free,
4719         .loo_object_print   = mdt_object_print
4720 };
4721
4722 static int mdt_obd_set_info_async(const struct lu_env *env,
4723                                   struct obd_export *exp,
4724                                   __u32 keylen, void *key,
4725                                   __u32 vallen, void *val,
4726                                   struct ptlrpc_request_set *set)
4727 {
4728         int rc;
4729
4730         ENTRY;
4731
4732         if (KEY_IS(KEY_SPTLRPC_CONF)) {
4733                 rc = tgt_adapt_sptlrpc_conf(class_exp2tgt(exp), 0);
4734                 RETURN(rc);
4735         }
4736
4737         RETURN(0);
4738 }
4739
4740 /**
4741  * Match client and server connection feature flags.
4742  *
4743  * Compute the compatibility flags for a connection request based on
4744  * features mutually supported by client and server.
4745  *
4746  * The obd_export::exp_connect_data.ocd_connect_flags field in \a exp
4747  * must not be updated here, otherwise a partially initialized value may
4748  * be exposed. After the connection request is successfully processed,
4749  * the top-level MDT connect request handler atomically updates the export
4750  * connect flags from the obd_connect_data::ocd_connect_flags field of the
4751  * reply. \see mdt_connect().
4752  *
4753  * \param exp   the obd_export associated with this client/target pair
4754  * \param mdt   the target device for the connection
4755  * \param data  stores data for this connect request
4756  *
4757  * \retval 0       success
4758  * \retval -EPROTO \a data unexpectedly has zero obd_connect_data::ocd_brw_size
4759  * \retval -EBADE  client and server feature requirements are incompatible
4760  */
4761 static int mdt_connect_internal(struct obd_export *exp,
4762                                 struct mdt_device *mdt,
4763                                 struct obd_connect_data *data)
4764 {
4765         LASSERT(data != NULL);
4766
4767         data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
4768         data->ocd_ibits_known &= MDS_INODELOCK_FULL;
4769
4770         /* If no known bits (which should not happen, probably,
4771            as everybody should support LOOKUP and UPDATE bits at least)
4772            revert to compat mode with plain locks. */
4773         if (!data->ocd_ibits_known &&
4774             data->ocd_connect_flags & OBD_CONNECT_IBITS)
4775                 data->ocd_connect_flags &= ~OBD_CONNECT_IBITS;
4776
4777         if (!mdt->mdt_opts.mo_acl)
4778                 data->ocd_connect_flags &= ~OBD_CONNECT_ACL;
4779
4780         if (!mdt->mdt_opts.mo_user_xattr)
4781                 data->ocd_connect_flags &= ~OBD_CONNECT_XATTR;
4782
4783         if (!mdt->mdt_som_conf)
4784                 data->ocd_connect_flags &= ~OBD_CONNECT_SOM;
4785
4786         if (data->ocd_connect_flags & OBD_CONNECT_BRW_SIZE) {
4787                 data->ocd_brw_size = min(data->ocd_brw_size,
4788                                          (__u32)MD_MAX_BRW_SIZE);
4789                 if (data->ocd_brw_size == 0) {
4790                         CERROR("%s: cli %s/%p ocd_connect_flags: "LPX64
4791                                " ocd_version: %x ocd_grant: %d "
4792                                "ocd_index: %u ocd_brw_size is "
4793                                "unexpectedly zero, network data "
4794                                "corruption? Refusing connection of this"
4795                                " client\n",
4796                                mdt_obd_name(mdt),
4797                                exp->exp_client_uuid.uuid,
4798                                exp, data->ocd_connect_flags, data->ocd_version,
4799                                data->ocd_grant, data->ocd_index);
4800                         return -EPROTO;
4801                 }
4802         }
4803
4804         /* NB: Disregard the rule against updating
4805          * exp_connect_data.ocd_connect_flags in this case, since
4806          * tgt_client_new() needs to know if this is a lightweight
4807          * connection, and it is safe to expose this flag before
4808          * connection processing completes. */
4809         if (data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT) {
4810                 spin_lock(&exp->exp_lock);
4811                 *exp_connect_flags_ptr(exp) |= OBD_CONNECT_LIGHTWEIGHT;
4812                 spin_unlock(&exp->exp_lock);
4813         }
4814
4815         data->ocd_version = LUSTRE_VERSION_CODE;
4816
4817         if ((data->ocd_connect_flags & OBD_CONNECT_FID) == 0) {
4818                 CWARN("%s: MDS requires FID support, but client not\n",
4819                       mdt_obd_name(mdt));
4820                 return -EBADE;
4821         }
4822
4823         if (mdt->mdt_som_conf &&
4824             !(data->ocd_connect_flags & (OBD_CONNECT_LIGHTWEIGHT |
4825                                          OBD_CONNECT_MDS_MDS |
4826                                          OBD_CONNECT_SOM))) {
4827                 CWARN("%s: MDS has SOM enabled, but client does not support "
4828                       "it\n", mdt_obd_name(mdt));
4829                 return -EBADE;
4830         }
4831
4832         if (OCD_HAS_FLAG(data, PINGLESS)) {
4833                 if (ptlrpc_pinger_suppress_pings()) {
4834                         spin_lock(&exp->exp_obd->obd_dev_lock);
4835                         list_del_init(&exp->exp_obd_chain_timed);
4836                         spin_unlock(&exp->exp_obd->obd_dev_lock);
4837                 } else {
4838                         data->ocd_connect_flags &= ~OBD_CONNECT_PINGLESS;
4839                 }
4840         }
4841
4842         data->ocd_max_easize = mdt->mdt_max_ea_size;
4843
4844         return 0;
4845 }
4846
4847 /* mds_connect copy */
4848 static int mdt_obd_connect(const struct lu_env *env,
4849                            struct obd_export **exp, struct obd_device *obd,
4850                            struct obd_uuid *cluuid,
4851                            struct obd_connect_data *data,
4852                            void *localdata)
4853 {
4854         struct obd_export      *lexp;
4855         struct lustre_handle    conn = { 0 };
4856         struct mdt_device      *mdt;
4857         int                     rc;
4858         ENTRY;
4859
4860         LASSERT(env != NULL);
4861         if (!exp || !obd || !cluuid)
4862                 RETURN(-EINVAL);
4863
4864         mdt = mdt_dev(obd->obd_lu_dev);
4865
4866         /*
4867          * first, check whether the stack is ready to handle requests
4868          * XXX: probably not very appropriate method is used now
4869          *      at some point we should find a better one
4870          */
4871         if (!test_bit(MDT_FL_SYNCED, &mdt->mdt_state) && data != NULL &&
4872             !(data->ocd_connect_flags & OBD_CONNECT_LIGHTWEIGHT)) {
4873                 rc = obd_get_info(env, mdt->mdt_child_exp,
4874                                   sizeof(KEY_OSP_CONNECTED),
4875                                   KEY_OSP_CONNECTED, NULL, NULL, NULL);
4876                 if (rc)
4877                         RETURN(-EAGAIN);
4878                 set_bit(MDT_FL_SYNCED, &mdt->mdt_state);
4879         }
4880
4881         rc = class_connect(&conn, obd, cluuid);
4882         if (rc)
4883                 RETURN(rc);
4884
4885         lexp = class_conn2export(&conn);
4886         LASSERT(lexp != NULL);
4887
4888         rc = mdt_connect_internal(lexp, mdt, data);
4889         if (rc == 0) {
4890                 struct lsd_client_data *lcd = lexp->exp_target_data.ted_lcd;
4891
4892                 LASSERT(lcd);
4893                 memcpy(lcd->lcd_uuid, cluuid, sizeof lcd->lcd_uuid);
4894                 rc = tgt_client_new(env, lexp);
4895                 if (rc == 0)
4896                         mdt_export_stats_init(obd, lexp, localdata);
4897         }
4898
4899         if (rc != 0) {
4900                 class_disconnect(lexp);
4901                 *exp = NULL;
4902         } else {
4903                 *exp = lexp;
4904         }
4905
4906         RETURN(rc);
4907 }
4908
4909 static int mdt_obd_reconnect(const struct lu_env *env,
4910                              struct obd_export *exp, struct obd_device *obd,
4911                              struct obd_uuid *cluuid,
4912                              struct obd_connect_data *data,
4913                              void *localdata)
4914 {
4915         int                     rc;
4916         ENTRY;
4917
4918         if (exp == NULL || obd == NULL || cluuid == NULL)
4919                 RETURN(-EINVAL);
4920
4921         rc = mdt_connect_internal(exp, mdt_dev(obd->obd_lu_dev), data);
4922         if (rc == 0)
4923                 mdt_export_stats_init(obd, exp, localdata);
4924
4925         RETURN(rc);
4926 }
4927
4928 static int mdt_ctxt_add_dirty_flag(struct lu_env *env,
4929                                    struct mdt_thread_info *info,
4930                                    struct mdt_file_data *mfd)
4931 {
4932         struct lu_context ses;
4933         int rc;
4934         ENTRY;
4935
4936         rc = lu_context_init(&ses, LCT_SERVER_SESSION);
4937         if (rc)
4938                 RETURN(rc);
4939
4940         env->le_ses = &ses;
4941         lu_context_enter(&ses);
4942
4943         mdt_ucred(info)->uc_valid = UCRED_OLD;
4944         rc = mdt_add_dirty_flag(info, mfd->mfd_object, &info->mti_attr);
4945
4946         lu_context_exit(&ses);
4947         lu_context_fini(&ses);
4948         env->le_ses = NULL;
4949
4950         RETURN(rc);
4951 }
4952
4953 static int mdt_export_cleanup(struct obd_export *exp)
4954 {
4955         struct mdt_export_data *med = &exp->exp_mdt_data;
4956         struct obd_device      *obd = exp->exp_obd;
4957         struct mdt_device      *mdt;
4958         struct mdt_thread_info *info;
4959         struct lu_env           env;
4960         CFS_LIST_HEAD(closing_list);
4961         struct mdt_file_data *mfd, *n;
4962         int rc = 0;
4963         ENTRY;
4964
4965         spin_lock(&med->med_open_lock);
4966         while (!cfs_list_empty(&med->med_open_head)) {
4967                 cfs_list_t *tmp = med->med_open_head.next;
4968                 mfd = cfs_list_entry(tmp, struct mdt_file_data, mfd_list);
4969
4970                 /* Remove mfd handle so it can't be found again.
4971                  * We are consuming the mfd_list reference here. */
4972                 class_handle_unhash(&mfd->mfd_handle);
4973                 cfs_list_move_tail(&mfd->mfd_list, &closing_list);
4974         }
4975         spin_unlock(&med->med_open_lock);
4976         mdt = mdt_dev(obd->obd_lu_dev);
4977         LASSERT(mdt != NULL);
4978
4979         rc = lu_env_init(&env, LCT_MD_THREAD);
4980         if (rc)
4981                 RETURN(rc);
4982
4983         info = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
4984         LASSERT(info != NULL);
4985         memset(info, 0, sizeof *info);
4986         info->mti_env = &env;
4987         info->mti_mdt = mdt;
4988         info->mti_exp = exp;
4989
4990         if (!cfs_list_empty(&closing_list)) {
4991                 struct md_attr *ma = &info->mti_attr;
4992
4993                 /* Close any open files (which may also cause orphan unlinking). */
4994                 cfs_list_for_each_entry_safe(mfd, n, &closing_list, mfd_list) {
4995                         cfs_list_del_init(&mfd->mfd_list);
4996                         ma->ma_need = ma->ma_valid = 0;
4997
4998                         /* This file is being closed due to an eviction, it
4999                          * could have been modified and now dirty regarding to
5000                          * HSM archive, check this!
5001                          * The logic here is to mark a file dirty if there's a
5002                          * chance it was dirtied before the client was evicted,
5003                          * so that we don't have to wait for a release attempt
5004                          * before finding out the file was actually dirty and
5005                          * fail the release. Aggressively marking it dirty here
5006                          * will cause the policy engine to attempt to
5007                          * re-archive it; when rearchiving, we can compare the
5008                          * current version to the HSM data_version and make the
5009                          * archive request into a noop if it's not actually
5010                          * dirty.
5011                          */
5012                         if (mfd->mfd_mode & (FMODE_WRITE|MDS_FMODE_TRUNC))
5013                                 rc = mdt_ctxt_add_dirty_flag(&env, info, mfd);
5014
5015                         /* Don't unlink orphan on failover umount, LU-184 */
5016                         if (exp->exp_flags & OBD_OPT_FAILOVER) {
5017                                 ma->ma_valid = MA_FLAGS;
5018                                 ma->ma_attr_flags |= MDS_KEEP_ORPHAN;
5019                         }
5020                         mdt_mfd_close(info, mfd);
5021                 }
5022         }
5023         info->mti_mdt = NULL;
5024         /* cleanup client slot early */
5025         /* Do not erase record for recoverable client. */
5026         if (!(exp->exp_flags & OBD_OPT_FAILOVER) || exp->exp_failed)
5027                 tgt_client_del(&env, exp);
5028         lu_env_fini(&env);
5029
5030         RETURN(rc);
5031 }
5032
5033 static int mdt_obd_disconnect(struct obd_export *exp)
5034 {
5035         int rc;
5036         ENTRY;
5037
5038         LASSERT(exp);
5039         class_export_get(exp);
5040
5041         rc = server_disconnect_export(exp);
5042         if (rc != 0)
5043                 CDEBUG(D_IOCTL, "server disconnect error: %d\n", rc);
5044
5045         rc = mdt_export_cleanup(exp);
5046         class_export_put(exp);
5047         RETURN(rc);
5048 }
5049
5050 /* FIXME: Can we avoid using these two interfaces? */
5051 static int mdt_init_export(struct obd_export *exp)
5052 {
5053         struct mdt_export_data *med = &exp->exp_mdt_data;
5054         int                     rc;
5055         ENTRY;
5056
5057         CFS_INIT_LIST_HEAD(&med->med_open_head);
5058         spin_lock_init(&med->med_open_lock);
5059         mutex_init(&med->med_idmap_mutex);
5060         med->med_idmap = NULL;
5061         spin_lock(&exp->exp_lock);
5062         exp->exp_connecting = 1;
5063         spin_unlock(&exp->exp_lock);
5064
5065         /* self-export doesn't need client data and ldlm initialization */
5066         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
5067                                      &exp->exp_client_uuid)))
5068                 RETURN(0);
5069
5070         rc = tgt_client_alloc(exp);
5071         if (rc)
5072                 GOTO(err, rc);
5073
5074         rc = ldlm_init_export(exp);
5075         if (rc)
5076                 GOTO(err_free, rc);
5077
5078         RETURN(rc);
5079
5080 err_free:
5081         tgt_client_free(exp);
5082 err:
5083         CERROR("%s: Failed to initialize export: rc = %d\n",
5084                exp->exp_obd->obd_name, rc);
5085         return rc;
5086 }
5087
5088 static int mdt_destroy_export(struct obd_export *exp)
5089 {
5090         ENTRY;
5091
5092         if (exp_connect_rmtclient(exp))
5093                 mdt_cleanup_idmap(&exp->exp_mdt_data);
5094
5095         target_destroy_export(exp);
5096         /* destroy can be called from failed obd_setup, so
5097          * checking uuid is safer than obd_self_export */
5098         if (unlikely(obd_uuid_equals(&exp->exp_obd->obd_uuid,
5099                                      &exp->exp_client_uuid)))
5100                 RETURN(0);
5101
5102         ldlm_destroy_export(exp);
5103         tgt_client_free(exp);
5104
5105         LASSERT(cfs_list_empty(&exp->exp_outstanding_replies));
5106         LASSERT(cfs_list_empty(&exp->exp_mdt_data.med_open_head));
5107
5108         RETURN(0);
5109 }
5110
/**
 * Upper bound on the number of path components fid2path() will walk.
 * The bound exists only because the fids met along the way are stored in
 * a fixed-size array for historical path lookup purposes.
 */
#define MAX_PATH_DEPTH 100
5116
5117 /** mdt_path() lookup structure. */
5118 struct path_lookup_info {
5119         __u64                   pli_recno;      /**< history point */
5120         __u64                   pli_currec;     /**< current record */
5121         struct lu_fid           pli_fid;
5122         struct lu_fid           pli_fids[MAX_PATH_DEPTH]; /**< path, in fids */
5123         struct mdt_object       *pli_mdt_obj;
5124         char                    *pli_path;      /**< full path */
5125         int                     pli_pathlen;
5126         int                     pli_linkno;     /**< which hardlink to follow */
5127         int                     pli_fidcount;   /**< number of \a pli_fids */
5128 };
5129
5130 static int mdt_links_read(struct mdt_thread_info *info,
5131                           struct mdt_object *mdt_obj, struct linkea_data *ldata)
5132 {
5133         int rc;
5134
5135         LASSERT(ldata->ld_buf->lb_buf != NULL);
5136
5137         if (!mdt_object_exists(mdt_obj))
5138                 return -ENODATA;
5139
5140         rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
5141                           ldata->ld_buf, XATTR_NAME_LINK);
5142         if (rc == -ERANGE) {
5143                 /* Buf was too small, figure out what we need. */
5144                 lu_buf_free(ldata->ld_buf);
5145                 rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
5146                                   ldata->ld_buf, XATTR_NAME_LINK);
5147                 if (rc < 0)
5148                         return rc;
5149                 ldata->ld_buf = lu_buf_check_and_alloc(ldata->ld_buf, rc);
5150                 if (ldata->ld_buf->lb_buf == NULL)
5151                         return -ENOMEM;
5152                 rc = mo_xattr_get(info->mti_env, mdt_object_child(mdt_obj),
5153                                   ldata->ld_buf, XATTR_NAME_LINK);
5154         }
5155         if (rc < 0)
5156                 return rc;
5157
5158         return linkea_init(ldata);
5159 }
5160
5161 static int mdt_path_current(struct mdt_thread_info *info,
5162                             struct path_lookup_info *pli)
5163 {
5164         struct mdt_device       *mdt = info->mti_mdt;
5165         struct mdt_object       *mdt_obj;
5166         struct link_ea_header   *leh;
5167         struct link_ea_entry    *lee;
5168         struct lu_name          *tmpname = &info->mti_name;
5169         struct lu_fid           *tmpfid = &info->mti_tmp_fid1;
5170         struct lu_buf           *buf = &info->mti_big_buf;
5171         char                    *ptr;
5172         int                     reclen;
5173         struct linkea_data      ldata = { 0 };
5174         int                     rc = 0;
5175         ENTRY;
5176
5177         /* temp buffer for path element, the buffer will be finally freed
5178          * in mdt_thread_info_fini */
5179         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
5180         if (buf->lb_buf == NULL)
5181                 RETURN(-ENOMEM);
5182
5183         ldata.ld_buf = buf;
5184         ptr = pli->pli_path + pli->pli_pathlen - 1;
5185         *ptr = 0;
5186         --ptr;
5187         pli->pli_fidcount = 0;
5188         pli->pli_fids[0] = *(struct lu_fid *)mdt_object_fid(pli->pli_mdt_obj);
5189
5190         /* root FID only exists on MDT0, and fid2path should also ends at MDT0,
5191          * so checking root_fid can only happen on MDT0. */
5192         while (!lu_fid_eq(&mdt->mdt_md_root_fid,
5193                           &pli->pli_fids[pli->pli_fidcount])) {
5194                 mdt_obj = mdt_object_find(info->mti_env, mdt,
5195                                           &pli->pli_fids[pli->pli_fidcount]);
5196                 if (IS_ERR(mdt_obj))
5197                         GOTO(out, rc = PTR_ERR(mdt_obj));
5198                 if (mdt_object_remote(mdt_obj)) {
5199                         mdt_object_put(info->mti_env, mdt_obj);
5200                         GOTO(remote_out, rc = -EREMOTE);
5201                 }
5202                 if (!mdt_object_exists(mdt_obj)) {
5203                         mdt_object_put(info->mti_env, mdt_obj);
5204                         GOTO(out, rc = -ENOENT);
5205                 }
5206
5207                 rc = mdt_links_read(info, mdt_obj, &ldata);
5208                 mdt_object_put(info->mti_env, mdt_obj);
5209                 if (rc != 0)
5210                         GOTO(out, rc);
5211
5212                 leh = buf->lb_buf;
5213                 lee = (struct link_ea_entry *)(leh + 1); /* link #0 */
5214                 linkea_entry_unpack(lee, &reclen, tmpname, tmpfid);
5215                 /* If set, use link #linkno for path lookup, otherwise use
5216                    link #0.  Only do this for the final path element. */
5217                 if (pli->pli_fidcount == 0 &&
5218                     pli->pli_linkno < leh->leh_reccount) {
5219                         int count;
5220                         for (count = 0; count < pli->pli_linkno; count++) {
5221                                 lee = (struct link_ea_entry *)
5222                                      ((char *)lee + reclen);
5223                                 linkea_entry_unpack(lee, &reclen, tmpname,
5224                                                     tmpfid);
5225                         }
5226                         if (pli->pli_linkno < leh->leh_reccount - 1)
5227                                 /* indicate to user there are more links */
5228                                 pli->pli_linkno++;
5229                 }
5230
5231                 /* Pack the name in the end of the buffer */
5232                 ptr -= tmpname->ln_namelen;
5233                 if (ptr - 1 <= pli->pli_path)
5234                         GOTO(out, rc = -EOVERFLOW);
5235                 strncpy(ptr, tmpname->ln_name, tmpname->ln_namelen);
5236                 *(--ptr) = '/';
5237
5238                 /* Store the parent fid for historic lookup */
5239                 if (++pli->pli_fidcount >= MAX_PATH_DEPTH)
5240                         GOTO(out, rc = -EOVERFLOW);
5241                 pli->pli_fids[pli->pli_fidcount] = *tmpfid;
5242         }
5243
5244 remote_out:
5245         ptr++; /* skip leading / */
5246         memmove(pli->pli_path, ptr, pli->pli_path + pli->pli_pathlen - ptr);
5247
5248         EXIT;
5249 out:
5250         return rc;
5251 }
5252
5253 /* Returns the full path to this fid, as of changelog record recno. */
5254 static int mdt_path(struct mdt_thread_info *info, struct mdt_object *obj,
5255                     char *path, int pathlen, __u64 *recno, int *linkno,
5256                     struct lu_fid *fid)
5257 {
5258         struct mdt_device       *mdt = info->mti_mdt;
5259         struct path_lookup_info *pli;
5260         int                     tries = 3;
5261         int                     rc = -EAGAIN;
5262         ENTRY;
5263
5264         if (pathlen < 3)
5265                 RETURN(-EOVERFLOW);
5266
5267         if (lu_fid_eq(&mdt->mdt_md_root_fid, mdt_object_fid(obj))) {
5268                 path[0] = '\0';
5269                 RETURN(0);
5270         }
5271
5272         OBD_ALLOC_PTR(pli);
5273         if (pli == NULL)
5274                 RETURN(-ENOMEM);
5275
5276         pli->pli_mdt_obj = obj;
5277         pli->pli_recno = *recno;
5278         pli->pli_path = path;
5279         pli->pli_pathlen = pathlen;
5280         pli->pli_linkno = *linkno;
5281
5282         /* Retry multiple times in case file is being moved */
5283         while (tries-- && rc == -EAGAIN)
5284                 rc = mdt_path_current(info, pli);
5285
5286         /* return the last resolved fids to the client, so the client will
5287          * build the left path on another MDT for remote object */
5288         *fid = pli->pli_fids[pli->pli_fidcount];
5289
5290         *recno = pli->pli_currec;
5291         /* Return next link index to caller */
5292         *linkno = pli->pli_linkno;
5293
5294         OBD_FREE_PTR(pli);
5295
5296         RETURN(rc);
5297 }
5298
5299 static int mdt_fid2path(struct mdt_thread_info *info,
5300                         struct getinfo_fid2path *fp)
5301 {
5302         struct mdt_device *mdt = info->mti_mdt;
5303         struct mdt_object *obj;
5304         int    rc;
5305         ENTRY;
5306
5307         CDEBUG(D_IOCTL, "path get "DFID" from "LPU64" #%d\n",
5308                 PFID(&fp->gf_fid), fp->gf_recno, fp->gf_linkno);
5309
5310         if (!fid_is_sane(&fp->gf_fid))
5311                 RETURN(-EINVAL);
5312
5313         if (!fid_is_namespace_visible(&fp->gf_fid)) {
5314                 CWARN("%s: "DFID" is invalid, sequence should be "
5315                       ">= "LPX64"\n", mdt_obd_name(mdt),
5316                       PFID(&fp->gf_fid), (__u64)FID_SEQ_NORMAL);
5317                 RETURN(-EINVAL);
5318         }
5319
5320         obj = mdt_object_find(info->mti_env, mdt, &fp->gf_fid);
5321         if (obj == NULL || IS_ERR(obj)) {
5322                 CDEBUG(D_IOCTL, "no object "DFID": %ld\n", PFID(&fp->gf_fid),
5323                        PTR_ERR(obj));
5324                 RETURN(-EINVAL);
5325         }
5326
5327         if (mdt_object_remote(obj))
5328                 rc = -EREMOTE;
5329         else if (!mdt_object_exists(obj))
5330                 rc = -ENOENT;
5331         else
5332                 rc = 0;
5333
5334         if (rc < 0) {
5335                 mdt_object_put(info->mti_env, obj);
5336                 CDEBUG(D_IOCTL, "nonlocal object "DFID": %d\n",
5337                        PFID(&fp->gf_fid), rc);
5338                 RETURN(rc);
5339         }
5340
5341         rc = mdt_path(info, obj, fp->gf_path, fp->gf_pathlen, &fp->gf_recno,
5342                       &fp->gf_linkno, &fp->gf_fid);
5343
5344         CDEBUG(D_INFO, "fid "DFID", path %s recno "LPX64" linkno %u\n",
5345                PFID(&fp->gf_fid), fp->gf_path, fp->gf_recno, fp->gf_linkno);
5346
5347         mdt_object_put(info->mti_env, obj);
5348
5349         RETURN(rc);
5350 }
5351
5352 static int mdt_rpc_fid2path(struct mdt_thread_info *info, void *key,
5353                             void *val, int vallen)
5354 {
5355         struct getinfo_fid2path *fpout, *fpin;
5356         int rc = 0;
5357
5358         fpin = key + cfs_size_round(sizeof(KEY_FID2PATH));
5359         fpout = val;
5360
5361         if (ptlrpc_req_need_swab(info->mti_pill->rc_req))
5362                 lustre_swab_fid2path(fpin);
5363
5364         memcpy(fpout, fpin, sizeof(*fpin));
5365         if (fpout->gf_pathlen != vallen - sizeof(*fpin))
5366                 RETURN(-EINVAL);
5367
5368         rc = mdt_fid2path(info, fpout);
5369         RETURN(rc);
5370 }
5371
5372 int mdt_get_info(struct tgt_session_info *tsi)
5373 {
5374         char    *key;
5375         int      keylen;
5376         __u32   *vallen;
5377         void    *valout;
5378         int      rc;
5379
5380         ENTRY;
5381
5382         key = req_capsule_client_get(tsi->tsi_pill, &RMF_GETINFO_KEY);
5383         if (key == NULL) {
5384                 CDEBUG(D_IOCTL, "No GETINFO key");
5385                 RETURN(err_serious(-EFAULT));
5386         }
5387         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_GETINFO_KEY,
5388                                       RCL_CLIENT);
5389
5390         vallen = req_capsule_client_get(tsi->tsi_pill, &RMF_GETINFO_VALLEN);
5391         if (vallen == NULL) {
5392                 CDEBUG(D_IOCTL, "Unable to get RMF_GETINFO_VALLEN buffer");
5393                 RETURN(err_serious(-EFAULT));
5394         }
5395
5396         req_capsule_set_size(tsi->tsi_pill, &RMF_GETINFO_VAL, RCL_SERVER,
5397                              *vallen);
5398         rc = req_capsule_server_pack(tsi->tsi_pill);
5399         if (rc)
5400                 RETURN(err_serious(rc));
5401
5402         valout = req_capsule_server_get(tsi->tsi_pill, &RMF_GETINFO_VAL);
5403         if (valout == NULL) {
5404                 CDEBUG(D_IOCTL, "Unable to get get-info RPC out buffer");
5405                 RETURN(err_serious(-EFAULT));
5406         }
5407
5408         if (KEY_IS(KEY_FID2PATH)) {
5409                 struct mdt_thread_info  *info = tsi2mdt_info(tsi);
5410
5411                 rc = mdt_rpc_fid2path(info, key, valout, *vallen);
5412                 mdt_thread_info_fini(info);
5413         } else {
5414                 rc = -EINVAL;
5415         }
5416         RETURN(rc);
5417 }
5418
5419 /* Pass the ioc down */
5420 static int mdt_ioc_child(struct lu_env *env, struct mdt_device *mdt,
5421                          unsigned int cmd, int len, void *data)
5422 {
5423         struct lu_context ioctl_session;
5424         struct md_device *next = mdt->mdt_child;
5425         int rc;
5426         ENTRY;
5427
5428         rc = lu_context_init(&ioctl_session, LCT_SERVER_SESSION);
5429         if (rc)
5430                 RETURN(rc);
5431         ioctl_session.lc_thread = (struct ptlrpc_thread *)current;
5432         lu_context_enter(&ioctl_session);
5433         env->le_ses = &ioctl_session;
5434
5435         LASSERT(next->md_ops->mdo_iocontrol);
5436         rc = next->md_ops->mdo_iocontrol(env, next, cmd, len, data);
5437
5438         lu_context_exit(&ioctl_session);
5439         lu_context_fini(&ioctl_session);
5440         RETURN(rc);
5441 }
5442
5443 static int mdt_ioc_version_get(struct mdt_thread_info *mti, void *karg)
5444 {
5445         struct obd_ioctl_data *data = karg;
5446         struct lu_fid *fid;
5447         __u64 version;
5448         struct mdt_object *obj;
5449         struct mdt_lock_handle  *lh;
5450         int rc;
5451         ENTRY;
5452
5453         if (data->ioc_inlbuf1 == NULL || data->ioc_inllen1 != sizeof(*fid) ||
5454             data->ioc_inlbuf2 == NULL || data->ioc_inllen2 != sizeof(version))
5455                 RETURN(-EINVAL);
5456
5457         fid = (struct lu_fid *)data->ioc_inlbuf1;
5458
5459         if (!fid_is_sane(fid))
5460                 RETURN(-EINVAL);
5461
5462         CDEBUG(D_IOCTL, "getting version for "DFID"\n", PFID(fid));
5463
5464         lh = &mti->mti_lh[MDT_LH_PARENT];
5465         mdt_lock_reg_init(lh, LCK_CR);
5466
5467         obj = mdt_object_find_lock(mti, fid, lh, MDS_INODELOCK_UPDATE);
5468         if (IS_ERR(obj))
5469                 RETURN(PTR_ERR(obj));
5470
5471         if (mdt_object_remote(obj)) {
5472                 rc = -EREMOTE;
5473                 /**
5474                  * before calling version get the correct MDS should be
5475                  * fid, this is error to find remote object here
5476                  */
5477                 CERROR("nonlocal object "DFID"\n", PFID(fid));
5478         } else if (!mdt_object_exists(obj)) {
5479                 *(__u64 *)data->ioc_inlbuf2 = ENOENT_VERSION;
5480                 rc = -ENOENT;
5481         } else {
5482                 version = dt_version_get(mti->mti_env, mdt_obj2dt(obj));
5483                *(__u64 *)data->ioc_inlbuf2 = version;
5484                 rc = 0;
5485         }
5486         mdt_object_unlock_put(mti, obj, lh, 1);
5487         RETURN(rc);
5488 }
5489
5490 /* ioctls on obd dev */
5491 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
5492                          void *karg, void *uarg)
5493 {
5494         struct lu_env      env;
5495         struct obd_device *obd = exp->exp_obd;
5496         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
5497         struct dt_device  *dt = mdt->mdt_bottom;
5498         int rc;
5499
5500         ENTRY;
5501         CDEBUG(D_IOCTL, "handling ioctl cmd %#x\n", cmd);
5502         rc = lu_env_init(&env, LCT_MD_THREAD);
5503         if (rc)
5504                 RETURN(rc);
5505
5506         switch (cmd) {
5507         case OBD_IOC_SYNC:
5508                 rc = mdt_device_sync(&env, mdt);
5509                 break;
5510         case OBD_IOC_SET_READONLY:
5511                 rc = dt->dd_ops->dt_ro(&env, dt);
5512                 break;
5513         case OBD_IOC_ABORT_RECOVERY:
5514                 CERROR("%s: Aborting recovery for device\n", mdt_obd_name(mdt));
5515                 target_stop_recovery_thread(obd);
5516                 rc = 0;
5517                 break;
5518         case OBD_IOC_CHANGELOG_REG:
5519         case OBD_IOC_CHANGELOG_DEREG:
5520         case OBD_IOC_CHANGELOG_CLEAR:
5521                 rc = mdt_ioc_child(&env, mdt, cmd, len, karg);
5522                 break;
5523         case OBD_IOC_START_LFSCK: {
5524                 struct md_device *next = mdt->mdt_child;
5525                 struct obd_ioctl_data *data = karg;
5526                 struct lfsck_start_param lsp;
5527
5528                 if (unlikely(data == NULL)) {
5529                         rc = -EINVAL;
5530                         break;
5531                 }
5532
5533                 lsp.lsp_start = (struct lfsck_start *)(data->ioc_inlbuf1);
5534                 lsp.lsp_namespace = mdt->mdt_namespace;
5535                 rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, &lsp);
5536                 break;
5537         }
5538         case OBD_IOC_STOP_LFSCK: {
5539                 struct md_device *next = mdt->mdt_child;
5540
5541                 rc = next->md_ops->mdo_iocontrol(&env, next, cmd, 0, NULL);
5542                 break;
5543         }
5544         case OBD_IOC_GET_OBJ_VERSION: {
5545                 struct mdt_thread_info *mti;
5546                 mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
5547                 memset(mti, 0, sizeof *mti);
5548                 mti->mti_env = &env;
5549                 mti->mti_mdt = mdt;
5550                 mti->mti_exp = exp;
5551
5552                 rc = mdt_ioc_version_get(mti, karg);
5553                 break;
5554         }
5555         case OBD_IOC_CATLOGLIST:
5556                 rc = llog_catalog_list(&env, mdt->mdt_bottom, 0, karg);
5557                 break;
5558         default:
5559                 rc = -EOPNOTSUPP;
5560                 CERROR("%s: Not supported cmd = %d, rc = %d\n",
5561                         mdt_obd_name(mdt), cmd, rc);
5562         }
5563
5564         lu_env_fini(&env);
5565         RETURN(rc);
5566 }
5567
5568 int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
5569 {
5570         struct lu_device *ld = md2lu_dev(mdt->mdt_child);
5571         int rc;
5572         ENTRY;
5573
5574         rc = ld->ld_ops->ldo_recovery_complete(env, ld);
5575         RETURN(rc);
5576 }
5577
5578 int mdt_obd_postrecov(struct obd_device *obd)
5579 {
5580         struct lu_env env;
5581         int rc;
5582
5583         rc = lu_env_init(&env, LCT_MD_THREAD);
5584         if (rc)
5585                 RETURN(rc);
5586         rc = mdt_postrecov(&env, mdt_dev(obd->obd_lu_dev));
5587         lu_env_fini(&env);
5588         return rc;
5589 }
5590
5591 static struct obd_ops mdt_obd_device_ops = {
5592         .o_owner          = THIS_MODULE,
5593         .o_set_info_async = mdt_obd_set_info_async,
5594         .o_connect        = mdt_obd_connect,
5595         .o_reconnect      = mdt_obd_reconnect,
5596         .o_disconnect     = mdt_obd_disconnect,
5597         .o_init_export    = mdt_init_export,
5598         .o_destroy_export = mdt_destroy_export,
5599         .o_iocontrol      = mdt_iocontrol,
5600         .o_postrecov      = mdt_obd_postrecov,
5601 };
5602
5603 static struct lu_device* mdt_device_fini(const struct lu_env *env,
5604                                          struct lu_device *d)
5605 {
5606         struct mdt_device *m = mdt_dev(d);
5607         ENTRY;
5608
5609         mdt_fini(env, m);
5610         RETURN(NULL);
5611 }
5612
5613 static struct lu_device *mdt_device_free(const struct lu_env *env,
5614                                          struct lu_device *d)
5615 {
5616         struct mdt_device *m = mdt_dev(d);
5617         ENTRY;
5618
5619         lu_device_fini(&m->mdt_lu_dev);
5620         OBD_FREE_PTR(m);
5621
5622         RETURN(NULL);
5623 }
5624
5625 static struct lu_device *mdt_device_alloc(const struct lu_env *env,
5626                                           struct lu_device_type *t,
5627                                           struct lustre_cfg *cfg)
5628 {
5629         struct lu_device  *l;
5630         struct mdt_device *m;
5631
5632         OBD_ALLOC_PTR(m);
5633         if (m != NULL) {
5634                 int rc;
5635
5636                 l = &m->mdt_lu_dev;
5637                 rc = mdt_init0(env, m, t, cfg);
5638                 if (rc != 0) {
5639                         mdt_device_free(env, l);
5640                         l = ERR_PTR(rc);
5641                         return l;
5642                 }
5643         } else
5644                 l = ERR_PTR(-ENOMEM);
5645         return l;
5646 }
5647
5648 /* context key constructor/destructor: mdt_key_init, mdt_key_fini */
5649 LU_KEY_INIT(mdt, struct mdt_thread_info);
5650
5651 static void mdt_key_fini(const struct lu_context *ctx,
5652                          struct lu_context_key *key, void* data)
5653 {
5654         struct mdt_thread_info *info = data;
5655
5656         if (info->mti_big_lmm) {
5657                 OBD_FREE_LARGE(info->mti_big_lmm, info->mti_big_lmmsize);
5658                 info->mti_big_lmm = NULL;
5659                 info->mti_big_lmmsize = 0;
5660         }
5661         OBD_FREE_PTR(info);
5662 }
5663
5664 /* context key: mdt_thread_key */
5665 LU_CONTEXT_KEY_DEFINE(mdt, LCT_MD_THREAD);
5666
5667 struct lu_ucred *mdt_ucred(const struct mdt_thread_info *info)
5668 {
5669         return lu_ucred(info->mti_env);
5670 }
5671
5672 struct lu_ucred *mdt_ucred_check(const struct mdt_thread_info *info)
5673 {
5674         return lu_ucred_check(info->mti_env);
5675 }
5676
5677 /**
5678  * Enable/disable COS (Commit On Sharing).
5679  *
5680  * Set/Clear the COS flag in mdt options.
5681  *
5682  * \param mdt mdt device
5683  * \param val 0 disables COS, other values enable COS
5684  */
5685 void mdt_enable_cos(struct mdt_device *mdt, int val)
5686 {
5687         struct lu_env env;
5688         int rc;
5689
5690         mdt->mdt_opts.mo_cos = !!val;
5691         rc = lu_env_init(&env, LCT_LOCAL);
5692         if (unlikely(rc != 0)) {
5693                 CWARN("lu_env initialization failed with rc = %d,"
5694                       "cannot sync\n", rc);
5695                 return;
5696         }
5697         mdt_device_sync(&env, mdt);
5698         lu_env_fini(&env);
5699 }
5700
5701 /**
5702  * Check COS (Commit On Sharing) status.
5703  *
5704  * Return COS flag status.
5705  *
5706  * \param mdt mdt device
5707  */
5708 int mdt_cos_is_enabled(struct mdt_device *mdt)
5709 {
5710         return mdt->mdt_opts.mo_cos != 0;
5711 }
5712
5713 static struct lu_device_type_operations mdt_device_type_ops = {
5714         .ldto_device_alloc = mdt_device_alloc,
5715         .ldto_device_free  = mdt_device_free,
5716         .ldto_device_fini  = mdt_device_fini
5717 };
5718
5719 static struct lu_device_type mdt_device_type = {
5720         .ldt_tags     = LU_DEVICE_MD,
5721         .ldt_name     = LUSTRE_MDT_NAME,
5722         .ldt_ops      = &mdt_device_type_ops,
5723         .ldt_ctx_tags = LCT_MD_THREAD
5724 };
5725
5726 static int __init mdt_mod_init(void)
5727 {
5728         struct lprocfs_static_vars lvars;
5729         int rc;
5730
5731         CLASSERT(sizeof("0x0123456789ABCDEF:0x01234567:0x01234567") ==
5732                  FID_NOBRACE_LEN + 1);
5733         CLASSERT(sizeof("[0x0123456789ABCDEF:0x01234567:0x01234567]") ==
5734                  FID_LEN + 1);
5735         rc = lu_kmem_init(mdt_caches);
5736         if (rc)
5737                 return rc;
5738
5739         rc = mds_mod_init();
5740         if (rc)
5741                 GOTO(lu_fini, rc);
5742
5743         lprocfs_mdt_init_vars(&lvars);
5744         rc = class_register_type(&mdt_obd_device_ops, NULL, NULL,
5745 #ifndef HAVE_ONLY_PROCFS_SEQ
5746                                 lvars.module_vars,
5747 #endif
5748                                 LUSTRE_MDT_NAME, &mdt_device_type);
5749         if (rc)
5750                 GOTO(mds_fini, rc);
5751 lu_fini:
5752         if (rc)
5753                 lu_kmem_fini(mdt_caches);
5754 mds_fini:
5755         if (rc)
5756                 mds_mod_exit();
5757         return rc;
5758 }
5759
5760 static void __exit mdt_mod_exit(void)
5761 {
5762         class_unregister_type(LUSTRE_MDT_NAME);
5763         mds_mod_exit();
5764         lu_kmem_fini(mdt_caches);
5765 }
5766
5767 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
5768 MODULE_DESCRIPTION("Lustre Metadata Target ("LUSTRE_MDT_NAME")");
5769 MODULE_LICENSE("GPL");
5770
5771 cfs_module(mdt, LUSTRE_VERSION_STRING, mdt_mod_init, mdt_mod_exit);