Whamcloud - gitweb
64557082d981c8b5285a87c9b36a77dbf8ccf5c6
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_handler.c
32  *
33  * Lustre Metadata Target (mdt) request handler
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  * Author: Nikita Danilov <nikita@clusterfs.com>
40  * Author: Huang Hua <huanghua@clusterfs.com>
41  * Author: Yury Umanets <umka@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
48
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
61 #include <obd.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
66
67 #include "mdt_internal.h"
68
69 static unsigned int max_mod_rpcs_per_client = 8;
70 module_param(max_mod_rpcs_per_client, uint, 0644);
71 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
72
73 mdl_mode_t mdt_mdl_lock_modes[] = {
74         [LCK_MINMODE] = MDL_MINMODE,
75         [LCK_EX]      = MDL_EX,
76         [LCK_PW]      = MDL_PW,
77         [LCK_PR]      = MDL_PR,
78         [LCK_CW]      = MDL_CW,
79         [LCK_CR]      = MDL_CR,
80         [LCK_NL]      = MDL_NL,
81         [LCK_GROUP]   = MDL_GROUP
82 };
83
84 enum ldlm_mode mdt_dlm_lock_modes[] = {
85         [MDL_MINMODE]   = LCK_MINMODE,
86         [MDL_EX]        = LCK_EX,
87         [MDL_PW]        = LCK_PW,
88         [MDL_PR]        = LCK_PR,
89         [MDL_CW]        = LCK_CW,
90         [MDL_CR]        = LCK_CR,
91         [MDL_NL]        = LCK_NL,
92         [MDL_GROUP]     = LCK_GROUP
93 };
94
95 static struct mdt_device *mdt_dev(struct lu_device *d);
96
97 static const struct lu_object_operations mdt_obj_ops;
98
99 /* Slab for MDT object allocation */
100 static struct kmem_cache *mdt_object_kmem;
101
102 /* For HSM restore handles */
103 struct kmem_cache *mdt_hsm_cdt_kmem;
104
105 /* For HSM request handles */
106 struct kmem_cache *mdt_hsm_car_kmem;
107
108 static struct lu_kmem_descr mdt_caches[] = {
109         {
110                 .ckd_cache = &mdt_object_kmem,
111                 .ckd_name  = "mdt_obj",
112                 .ckd_size  = sizeof(struct mdt_object)
113         },
114         {
115                 .ckd_cache      = &mdt_hsm_cdt_kmem,
116                 .ckd_name       = "mdt_cdt_restore_handle",
117                 .ckd_size       = sizeof(struct cdt_restore_handle)
118         },
119         {
120                 .ckd_cache      = &mdt_hsm_car_kmem,
121                 .ckd_name       = "mdt_cdt_agent_req",
122                 .ckd_size       = sizeof(struct cdt_agent_req)
123         },
124         {
125                 .ckd_cache = NULL
126         }
127 };
128
129 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
130 {
131         if (!rep)
132                 return 0;
133         return rep->lock_policy_res1 & op_flag;
134 }
135
136 void mdt_clear_disposition(struct mdt_thread_info *info,
137                            struct ldlm_reply *rep, __u64 op_flag)
138 {
139         if (info) {
140                 info->mti_opdata &= ~op_flag;
141                 tgt_opdata_clear(info->mti_env, op_flag);
142         }
143         if (rep)
144                 rep->lock_policy_res1 &= ~op_flag;
145 }
146
147 void mdt_set_disposition(struct mdt_thread_info *info,
148                          struct ldlm_reply *rep, __u64 op_flag)
149 {
150         if (info) {
151                 info->mti_opdata |= op_flag;
152                 tgt_opdata_set(info->mti_env, op_flag);
153         }
154         if (rep)
155                 rep->lock_policy_res1 |= op_flag;
156 }
157
158 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
159 {
160         lh->mlh_pdo_hash = 0;
161         lh->mlh_reg_mode = lm;
162         lh->mlh_rreg_mode = lm;
163         lh->mlh_type = MDT_REG_LOCK;
164 }
165
166 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
167 {
168         mdt_lock_reg_init(lh, lock->l_req_mode);
169         if (lock->l_req_mode == LCK_GROUP)
170                 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
171 }
172
173 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
174                        const struct lu_name *lname)
175 {
176         lh->mlh_reg_mode = lock_mode;
177         lh->mlh_pdo_mode = LCK_MINMODE;
178         lh->mlh_rreg_mode = lock_mode;
179         lh->mlh_type = MDT_PDO_LOCK;
180
181         if (lu_name_is_valid(lname)) {
182                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
183                                                      lname->ln_namelen);
184                 /* XXX Workaround for LU-2856
185                  *
186                  * Zero is a valid return value of full_name_hash, but
187                  * several users of mlh_pdo_hash assume a non-zero
188                  * hash value. We therefore map zero onto an
189                  * arbitrary, but consistent value (1) to avoid
190                  * problems further down the road. */
191                 if (unlikely(lh->mlh_pdo_hash == 0))
192                         lh->mlh_pdo_hash = 1;
193         } else {
194                 lh->mlh_pdo_hash = 0;
195         }
196 }
197
198 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
199                               struct mdt_lock_handle *lh)
200 {
201         mdl_mode_t mode;
202         ENTRY;
203
204         /*
205          * Any dir access needs couple of locks:
206          *
207          * 1) on part of dir we gonna take lookup/modify;
208          *
209          * 2) on whole dir to protect it from concurrent splitting and/or to
210          * flush client's cache for readdir().
211          *
212          * so, for a given mode and object this routine decides what lock mode
213          * to use for lock #2:
214          *
215          * 1) if caller's gonna lookup in dir then we need to protect dir from
216          * being splitted only - LCK_CR
217          *
218          * 2) if caller's gonna modify dir then we need to protect dir from
219          * being splitted and to flush cache - LCK_CW
220          *
221          * 3) if caller's gonna modify dir and that dir seems ready for
222          * splitting then we need to protect it from any type of access
223          * (lookup/modify/split) - LCK_EX --bzzz
224          */
225
226         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
227         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
228
229         /*
230          * Ask underlaying level its opinion about preferable PDO lock mode
231          * having access type passed as regular lock mode:
232          *
233          * - MDL_MINMODE means that lower layer does not want to specify lock
234          * mode;
235          *
236          * - MDL_NL means that no PDO lock should be taken. This is used in some
237          * cases. Say, for non-splittable directories no need to use PDO locks
238          * at all.
239          */
240         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
241                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
242
243         if (mode != MDL_MINMODE) {
244                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
245         } else {
246                 /*
247                  * Lower layer does not want to specify locking mode. We do it
248                  * our selves. No special protection is needed, just flush
249                  * client's cache on modification and allow concurrent
250                  * mondification.
251                  */
252                 switch (lh->mlh_reg_mode) {
253                 case LCK_EX:
254                         lh->mlh_pdo_mode = LCK_EX;
255                         break;
256                 case LCK_PR:
257                         lh->mlh_pdo_mode = LCK_CR;
258                         break;
259                 case LCK_PW:
260                         lh->mlh_pdo_mode = LCK_CW;
261                         break;
262                 default:
263                         CERROR("Not expected lock type (0x%x)\n",
264                                (int)lh->mlh_reg_mode);
265                         LBUG();
266                 }
267         }
268
269         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
270         EXIT;
271 }
272
273 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
274                               struct lu_fid *fid)
275 {
276         struct mdt_device *mdt = info->mti_mdt;
277         struct lu_name *lname = &info->mti_name;
278         const char *start = fileset;
279         char *filename = info->mti_filename;
280         struct mdt_object *parent;
281         u32 mode;
282         int rc = 0;
283
284         LASSERT(!info->mti_cross_ref);
285
286         /*
287          * We may want to allow this to mount a completely separate
288          * fileset from the MDT in the future, but keeping it to
289          * ROOT/ only for now avoid potential security issues.
290          */
291         *fid = mdt->mdt_md_root_fid;
292
293         while (rc == 0 && start != NULL && *start != '\0') {
294                 const char *s1 = start;
295                 const char *s2;
296
297                 while (*++s1 == '/')
298                         ;
299                 s2 = s1;
300                 while (*s2 != '/' && *s2 != '\0')
301                         s2++;
302
303                 if (s2 == s1)
304                         break;
305
306                 start = s2;
307
308                 lname->ln_namelen = s2 - s1;
309                 if (lname->ln_namelen > NAME_MAX) {
310                         rc = -EINVAL;
311                         break;
312                 }
313
314                 /* reject .. as a path component */
315                 if (lname->ln_namelen == 2 &&
316                     strncmp(s1, "..", 2) == 0) {
317                         rc = -EINVAL;
318                         break;
319                 }
320
321                 strncpy(filename, s1, lname->ln_namelen);
322                 filename[lname->ln_namelen] = '\0';
323                 lname->ln_name = filename;
324
325                 parent = mdt_object_find(info->mti_env, mdt, fid);
326                 if (IS_ERR(parent)) {
327                         rc = PTR_ERR(parent);
328                         break;
329                 }
330                 /* Only got the fid of this obj by name */
331                 fid_zero(fid);
332                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
333                                 fid, &info->mti_spec);
334                 mdt_object_put(info->mti_env, parent);
335         }
336         if (!rc) {
337                 parent = mdt_object_find(info->mti_env, mdt, fid);
338                 if (IS_ERR(parent))
339                         rc = PTR_ERR(parent);
340                 else {
341                         mode = lu_object_attr(&parent->mot_obj);
342                         if (!S_ISDIR(mode)) {
343                                 rc = -ENOTDIR;
344                         } else if (mdt_is_remote_object(info, parent, parent)) {
345                                 if (!mdt->mdt_enable_remote_subdir_mount) {
346                                         rc = -EREMOTE;
347                                         LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
348                                                       mdt_obd_name(mdt),
349                                                       fileset, rc);
350                                 } else {
351                                         LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
352                                                       mdt_obd_name(mdt),
353                                                       fileset);
354                                 }
355                         }
356                         mdt_object_put(info->mti_env, parent);
357                 }
358         }
359
360         return rc;
361 }
362
363 static int mdt_get_root(struct tgt_session_info *tsi)
364 {
365         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
366         struct mdt_device       *mdt = info->mti_mdt;
367         struct mdt_body         *repbody;
368         char                    *fileset = NULL, *buffer = NULL;
369         int                      rc;
370         struct obd_export       *exp = info->mti_exp;
371         char                    *nodemap_fileset;
372
373         ENTRY;
374
375         rc = mdt_check_ucred(info);
376         if (rc)
377                 GOTO(out, rc = err_serious(rc));
378
379         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
380                 GOTO(out, rc = err_serious(-ENOMEM));
381
382         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
383         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
384                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
385                 if (fileset == NULL)
386                         GOTO(out, rc = err_serious(-EFAULT));
387         }
388
389         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
390         if (nodemap_fileset && nodemap_fileset[0]) {
391                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
392                 if (fileset) {
393                         /* consider fileset from client as a sub-fileset
394                          * of the nodemap one */
395                         OBD_ALLOC(buffer, PATH_MAX + 1);
396                         if (buffer == NULL)
397                                 GOTO(out, rc = err_serious(-ENOMEM));
398                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
399                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
400                                 GOTO(out, rc = err_serious(-EINVAL));
401                         fileset = buffer;
402                 } else {
403                         /* enforce fileset as specified in the nodemap */
404                         fileset = nodemap_fileset;
405                 }
406         }
407
408         if (fileset) {
409                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
410                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
411                 if (rc < 0)
412                         GOTO(out, rc = err_serious(rc));
413         } else {
414                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
415         }
416         repbody->mbo_valid |= OBD_MD_FLID;
417
418         EXIT;
419 out:
420         mdt_thread_info_fini(info);
421         if (buffer)
422                 OBD_FREE(buffer, PATH_MAX+1);
423         return rc;
424 }
425
426 static int mdt_statfs(struct tgt_session_info *tsi)
427 {
428         struct ptlrpc_request *req = tgt_ses_req(tsi);
429         struct mdt_thread_info *info = tsi2mdt_info(tsi);
430         struct mdt_device *mdt = info->mti_mdt;
431         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
432         struct md_device *next = mdt->mdt_child;
433         struct ptlrpc_service_part *svcpt;
434         struct obd_statfs *osfs;
435         struct mdt_body *reqbody = NULL;
436         struct mdt_statfs_cache *msf;
437         ktime_t kstart = ktime_get();
438         int current_blockbits;
439         int rc;
440
441         ENTRY;
442
443         svcpt = req->rq_rqbd->rqbd_svcpt;
444
445         /* This will trigger a watchdog timeout */
446         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
447                          (MDT_SERVICE_WATCHDOG_FACTOR *
448                           at_get(&svcpt->scp_at_estimate)) + 1);
449
450         rc = mdt_check_ucred(info);
451         if (rc)
452                 GOTO(out, rc = err_serious(rc));
453
454         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
455                 GOTO(out, rc = err_serious(-ENOMEM));
456
457         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
458         if (!osfs)
459                 GOTO(out, rc = -EPROTO);
460
461         if (mdt_is_sum_statfs_client(req->rq_export) &&
462                 lustre_packed_msg_size(req->rq_reqmsg) ==
463                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
464                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
465                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
466                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
467         }
468
469         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
470                 msf = &mdt->mdt_sum_osfs;
471         else
472                 msf = &mdt->mdt_osfs;
473
474         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
475                         /** statfs data is too old, get up-to-date one */
476                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
477                                 rc = next->md_ops->mdo_statfs(info->mti_env,
478                                                               next, osfs);
479                         else
480                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
481                                                osfs);
482                         if (rc)
483                                 GOTO(out, rc);
484                         spin_lock(&mdt->mdt_lock);
485                         msf->msf_osfs = *osfs;
486                         msf->msf_age = ktime_get_seconds();
487                         spin_unlock(&mdt->mdt_lock);
488         } else {
489                         /** use cached statfs data */
490                         spin_lock(&mdt->mdt_lock);
491                         *osfs = msf->msf_osfs;
492                         spin_unlock(&mdt->mdt_lock);
493         }
494
495         /* tgd_blockbit is recordsize bits set during mkfs.
496          * This once set does not change. However, 'zfs set'
497          * can be used to change the MDT blocksize. Instead
498          * of using cached value of 'tgd_blockbit' always
499          * calculate the blocksize bits which may have
500          * changed.
501          */
502         current_blockbits = fls64(osfs->os_bsize) - 1;
503
504         /* at least try to account for cached pages.  its still racy and
505          * might be under-reporting if clients haven't announced their
506          * caches with brw recently */
507         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
508                " pending %llu free %llu avail %llu\n",
509                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
510                tgd->tgd_tot_pending,
511                osfs->os_bfree << current_blockbits,
512                osfs->os_bavail << current_blockbits);
513
514         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
515                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
516                                    osfs->os_bsize - 1) >> current_blockbits));
517
518         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
519         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
520                "%llu objects: %llu free; state %x\n",
521                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
522                osfs->os_files, osfs->os_ffree, osfs->os_state);
523
524         if (!exp_grant_param_supp(tsi->tsi_exp) &&
525             current_blockbits > COMPAT_BSIZE_SHIFT) {
526                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
527                  * should not see a block size > page size, otherwise
528                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
529                  * block size which is the biggest block size known to work
530                  * with all client's page size. */
531                 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
532                 osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
533                 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
534                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
535         }
536         if (rc == 0)
537                 mdt_counter_incr(req, LPROC_MDT_STATFS,
538                                  ktime_us_delta(ktime_get(), kstart));
539 out:
540         mdt_thread_info_fini(info);
541         RETURN(rc);
542 }
543
544 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
545 {
546         struct lov_comp_md_v1 *comp_v1;
547         struct lov_mds_md *v1;
548         __u32 off;
549         __u32 dom_stripesize = 0;
550         int i;
551         bool has_ost_stripes = false;
552
553         ENTRY;
554
555         if (is_dom_only)
556                 *is_dom_only = 0;
557
558         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
559                 RETURN(0);
560
561         comp_v1 = (struct lov_comp_md_v1 *)lmm;
562         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
563         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
564
565         /* Fast check for DoM entry with no mirroring, should be the first */
566         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
567             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
568                 RETURN(0);
569
570         /* check all entries otherwise */
571         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
572                 struct lov_comp_md_entry_v1 *lcme;
573
574                 lcme = &comp_v1->lcm_entries[i];
575                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
576                         continue;
577
578                 off = le32_to_cpu(lcme->lcme_offset);
579                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
580
581                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
582                     LOV_PATTERN_MDT)
583                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
584                 else
585                         has_ost_stripes = true;
586
587                 if (dom_stripesize && has_ost_stripes)
588                         RETURN(dom_stripesize);
589         }
590         /* DoM-only case exits here */
591         if (is_dom_only && dom_stripesize)
592                 *is_dom_only = 1;
593         RETURN(dom_stripesize);
594 }
595
596 /**
597  * Pack size attributes into the reply.
598  */
599 int mdt_pack_size2body(struct mdt_thread_info *info,
600                         const struct lu_fid *fid, struct lustre_handle *lh)
601 {
602         struct mdt_body *b;
603         struct md_attr *ma = &info->mti_attr;
604         __u32 dom_stripe;
605         bool dom_lock = false;
606
607         ENTRY;
608
609         LASSERT(ma->ma_attr.la_valid & LA_MODE);
610
611         if (!S_ISREG(ma->ma_attr.la_mode) ||
612             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
613                 RETURN(-ENODATA);
614
615         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
616         /* no DoM stripe, no size in reply */
617         if (!dom_stripe)
618                 RETURN(-ENOENT);
619
620         if (lustre_handle_is_used(lh)) {
621                 struct ldlm_lock *lock;
622
623                 lock = ldlm_handle2lock(lh);
624                 if (lock != NULL) {
625                         dom_lock = ldlm_has_dom(lock);
626                         LDLM_LOCK_PUT(lock);
627                 }
628         }
629
630         /* no DoM lock, no size in reply */
631         if (!dom_lock)
632                 RETURN(0);
633
634         /* Either DoM lock exists or LMM has only DoM stripe then
635          * return size on body. */
636         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
637
638         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
639         RETURN(0);
640 }
641
#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/*
 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
 *
 * The access ACL is first read into the reply's RMF_ACL buffer.  If that
 * buffer is too small (-ERANGE) and the client supports large ACLs, the
 * read is retried once into info->mti_big_acl, which is allocated on
 * demand.
 *
 * \param	info	thread info object
 * \param	repbody	reply to pack ACLs into
 * \param	o	mdt object of file to examine
 * \param	nodemap	nodemap of client to reply to
 * \retval	0	success (also when there is no ACL, ACLs are not
 *			supported, or the reply has no room for an ACL)
 * \retval	-E2BIG	ACL does not fit into the available buffer
 * \retval	-ENOMEM	the large ACL buffer could not be allocated
 * \retval	-errno	error getting or parsing ACL from disk
 */
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
		      struct mdt_object *o, struct lu_nodemap *nodemap)
{
	const struct lu_env *env = info->mti_env;
	struct md_object *next = mdt_object_child(o);
	struct lu_buf *buf = &info->mti_buf;
	struct mdt_device *mdt = info->mti_mdt;
	struct req_capsule *pill = info->mti_pill;
	int rc;

	ENTRY;

	buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
	buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
	if (buf->lb_len == 0)
		RETURN(0);

	LASSERT(!info->mti_big_acl_used);
again:
	rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
	if (rc >= 0) {
		/* rc is the ACL size, map its ids for the client */
		rc = nodemap_map_acl(nodemap, buf->lb_buf,
				     rc, NODEMAP_FS_TO_CLIENT);
		/* if all ACLs mapped out, rc is still >= 0 */
		if (rc >= 0) {
			repbody->mbo_aclsize = rc;
			repbody->mbo_valid |= OBD_MD_FLACL;
			RETURN(0);
		}

		CERROR("%s: nodemap_map_acl unable to parse "DFID
		       " ACL: rc = %d\n", mdt_obd_name(mdt),
		       PFID(mdt_object_fid(o)), rc);
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid &= ~OBD_MD_FLACL;
		RETURN(rc);
	}

	switch (rc) {
	case -ENODATA:
		/* no ACL set: report a valid, empty ACL */
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid |= OBD_MD_FLACL;
		rc = 0;
		break;
	case -EOPNOTSUPP:
		rc = 0;
		break;
	case -ERANGE:
		if (exp_connect_large_acl(info->mti_exp) &&
		    !info->mti_big_acl_used) {
			if (info->mti_big_acl == NULL) {
				info->mti_big_aclsize =
					min_t(unsigned int,
					      mdt->mdt_max_ea_size,
					      XATTR_SIZE_MAX);
				OBD_ALLOC_LARGE(info->mti_big_acl,
						info->mti_big_aclsize);
				if (info->mti_big_acl == NULL) {
					info->mti_big_aclsize = 0;
					CERROR("%s: unable to grow "
					       DFID" ACL buffer\n",
					       mdt_obd_name(mdt),
					       PFID(mdt_object_fid(o)));
					RETURN(-ENOMEM);
				}
			}

			CDEBUG(D_INODE, "%s: grow the "DFID
			       " ACL buffer to size %d\n",
			       mdt_obd_name(mdt),
			       PFID(mdt_object_fid(o)),
			       info->mti_big_aclsize);

			/* retry exactly once with the large buffer */
			buf->lb_buf = info->mti_big_acl;
			buf->lb_len = info->mti_big_aclsize;
			info->mti_big_acl_used = 1;
			goto again;
		}
		/* FS has ACL bigger that our limits */
		CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
		       info->mti_big_aclsize);
		rc = -E2BIG;
		break;
	default:
		CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
		break;
	}

	RETURN(rc);
}
#endif
740
741 /* XXX Look into layout in MDT layer. */
742 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
743 {
744         struct lov_comp_md_v1   *comp_v1;
745         struct lov_mds_md       *v1;
746         int                      i;
747
748         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
749                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
750
751                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
752                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
753                                 comp_v1->lcm_entries[i].lcme_offset);
754                         /* We don't support partial release for now */
755                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
756                                 return false;
757                 }
758                 return true;
759         } else {
760                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
761                         true : false;
762         }
763 }
764
765 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
766                         const struct lu_attr *attr, const struct lu_fid *fid)
767 {
768         struct md_attr *ma = &info->mti_attr;
769         struct obd_export *exp = info->mti_exp;
770         struct lu_nodemap *nodemap = NULL;
771
772         LASSERT(ma->ma_valid & MA_INODE);
773
774         if (attr->la_valid & LA_ATIME) {
775                 b->mbo_atime = attr->la_atime;
776                 b->mbo_valid |= OBD_MD_FLATIME;
777         }
778         if (attr->la_valid & LA_MTIME) {
779                 b->mbo_mtime = attr->la_mtime;
780                 b->mbo_valid |= OBD_MD_FLMTIME;
781         }
782         if (attr->la_valid & LA_CTIME) {
783                 b->mbo_ctime = attr->la_ctime;
784                 b->mbo_valid |= OBD_MD_FLCTIME;
785         }
786         if (attr->la_valid & LA_BTIME) {
787                 b->mbo_btime = attr->la_btime;
788                 b->mbo_valid |= OBD_MD_FLBTIME;
789         }
790         if (attr->la_valid & LA_FLAGS) {
791                 b->mbo_flags = attr->la_flags;
792                 b->mbo_valid |= OBD_MD_FLFLAGS;
793         }
794         if (attr->la_valid & LA_NLINK) {
795                 b->mbo_nlink = attr->la_nlink;
796                 b->mbo_valid |= OBD_MD_FLNLINK;
797         }
798         if (attr->la_valid & (LA_UID|LA_GID)) {
799                 nodemap = nodemap_get_from_exp(exp);
800                 if (IS_ERR(nodemap))
801                         goto out;
802         }
803         if (attr->la_valid & LA_UID) {
804                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
805                                             NODEMAP_FS_TO_CLIENT,
806                                             attr->la_uid);
807                 b->mbo_valid |= OBD_MD_FLUID;
808         }
809         if (attr->la_valid & LA_GID) {
810                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
811                                             NODEMAP_FS_TO_CLIENT,
812                                             attr->la_gid);
813                 b->mbo_valid |= OBD_MD_FLGID;
814         }
815
816         if (attr->la_valid & LA_PROJID) {
817                 /* TODO, nodemap for project id */
818                 b->mbo_projid = attr->la_projid;
819                 b->mbo_valid |= OBD_MD_FLPROJID;
820         }
821
822         b->mbo_mode = attr->la_mode;
823         if (attr->la_valid & LA_MODE)
824                 b->mbo_valid |= OBD_MD_FLMODE;
825         if (attr->la_valid & LA_TYPE)
826                 b->mbo_valid |= OBD_MD_FLTYPE;
827
828         if (fid != NULL) {
829                 b->mbo_fid1 = *fid;
830                 b->mbo_valid |= OBD_MD_FLID;
831                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
832                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
833         }
834
835         if (!(attr->la_valid & LA_TYPE))
836                 return;
837
838         b->mbo_rdev   = attr->la_rdev;
839         b->mbo_size   = attr->la_size;
840         b->mbo_blocks = attr->la_blocks;
841
842         if (!S_ISREG(attr->la_mode)) {
843                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
844         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
845                 /* means no objects are allocated on osts. */
846                 LASSERT(!(ma->ma_valid & MA_LOV));
847                 /* just ignore blocks occupied by extend attributes on MDS */
848                 b->mbo_blocks = 0;
849                 /* if no object is allocated on osts, the size on mds is valid.
850                  * b=22272 */
851                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
852         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
853                 if (mdt_hsm_is_released(ma->ma_lmm)) {
854                         /* A released file stores its size on MDS. */
855                         /* But return 1 block for released file, unless tools
856                          * like tar will consider it fully sparse. (LU-3864)
857                          */
858                         if (unlikely(b->mbo_size == 0))
859                                 b->mbo_blocks = 0;
860                         else
861                                 b->mbo_blocks = 1;
862                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
863                 } else if (info->mti_som_valid) { /* som is valid */
864                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
865                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
866                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
867                         b->mbo_size = ma->ma_som.ms_size;
868                         b->mbo_blocks = ma->ma_som.ms_blocks;
869                 }
870         }
871
872         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
873                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
874                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
875                        PFID(fid), (unsigned long long)b->mbo_size);
876
877 out:
878         if (!IS_ERR_OR_NULL(nodemap))
879                 nodemap_putref(nodemap);
880 }
881
882 static inline int mdt_body_has_lov(const struct lu_attr *la,
883                                    const struct mdt_body *body)
884 {
885         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
886                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
887 }
888
889 void mdt_client_compatibility(struct mdt_thread_info *info)
890 {
891         struct mdt_body       *body;
892         struct ptlrpc_request *req = mdt_info_req(info);
893         struct obd_export     *exp = req->rq_export;
894         struct md_attr        *ma = &info->mti_attr;
895         struct lu_attr        *la = &ma->ma_attr;
896         ENTRY;
897
898         if (exp_connect_layout(exp))
899                 /* the client can deal with 16-bit lmm_stripe_count */
900                 RETURN_EXIT;
901
902         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
903
904         if (!mdt_body_has_lov(la, body))
905                 RETURN_EXIT;
906
907         /* now we have a reply with a lov for a client not compatible with the
908          * layout lock so we have to clean the layout generation number */
909         if (S_ISREG(la->la_mode))
910                 ma->ma_lmm->lmm_layout_gen = 0;
911         EXIT;
912 }
913
914 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
915                                    struct mdt_object *o)
916 {
917         const struct lu_env *env = info->mti_env;
918         int rc, rc2;
919
920         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
921                           XATTR_NAME_LOV);
922
923         if (rc == -ENODATA)
924                 rc = 0;
925
926         if (rc < 0)
927                 goto out;
928
929         /* Is it a directory? Let's check for the LMV as well */
930         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
931                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
932                                    XATTR_NAME_LMV);
933
934                 if (rc2 == -ENODATA)
935                         rc2 = mo_xattr_get(env, mdt_object_child(o),
936                                            &LU_BUF_NULL,
937                                            XATTR_NAME_DEFAULT_LMV);
938
939                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
940                         rc = rc2;
941         }
942
943 out:
944         return rc;
945 }
946
947 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
948                       const char *name)
949 {
950         const struct lu_env *env = info->mti_env;
951         int rc;
952         ENTRY;
953
954         LASSERT(info->mti_big_lmm_used == 0);
955         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
956         if (rc < 0)
957                 RETURN(rc);
958
959         /* big_lmm may need to be grown */
960         if (info->mti_big_lmmsize < rc) {
961                 int size = size_roundup_power2(rc);
962
963                 if (info->mti_big_lmmsize > 0) {
964                         /* free old buffer */
965                         LASSERT(info->mti_big_lmm);
966                         OBD_FREE_LARGE(info->mti_big_lmm,
967                                        info->mti_big_lmmsize);
968                         info->mti_big_lmm = NULL;
969                         info->mti_big_lmmsize = 0;
970                 }
971
972                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
973                 if (info->mti_big_lmm == NULL)
974                         RETURN(-ENOMEM);
975                 info->mti_big_lmmsize = size;
976         }
977         LASSERT(info->mti_big_lmmsize >= rc);
978
979         info->mti_buf.lb_buf = info->mti_big_lmm;
980         info->mti_buf.lb_len = info->mti_big_lmmsize;
981         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
982
983         RETURN(rc);
984 }
985
986 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
987                      struct md_attr *ma, const char *name)
988 {
989         struct md_object *next = mdt_object_child(o);
990         struct lu_buf    *buf = &info->mti_buf;
991         int rc;
992
993         if (strcmp(name, XATTR_NAME_LOV) == 0) {
994                 buf->lb_buf = ma->ma_lmm;
995                 buf->lb_len = ma->ma_lmm_size;
996                 LASSERT(!(ma->ma_valid & MA_LOV));
997         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
998                 buf->lb_buf = ma->ma_lmv;
999                 buf->lb_len = ma->ma_lmv_size;
1000                 LASSERT(!(ma->ma_valid & MA_LMV));
1001         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1002                 buf->lb_buf = ma->ma_default_lmv;
1003                 buf->lb_len = ma->ma_default_lmv_size;
1004                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1005         } else {
1006                 return -EINVAL;
1007         }
1008
1009         LASSERT(buf->lb_buf);
1010
1011         rc = mo_xattr_get(info->mti_env, next, buf, name);
1012         if (rc > 0) {
1013
1014 got:
1015                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1016                         if (info->mti_big_lmm_used)
1017                                 ma->ma_lmm = info->mti_big_lmm;
1018
1019                         /* NOT return LOV EA with hole to old client. */
1020                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1021                                      LOV_PATTERN_F_HOLE) &&
1022                             !(exp_connect_flags(info->mti_exp) &
1023                               OBD_CONNECT_LFSCK)) {
1024                                 return -EIO;
1025                         } else {
1026                                 ma->ma_lmm_size = rc;
1027                                 ma->ma_valid |= MA_LOV;
1028                         }
1029                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1030                         if (info->mti_big_lmm_used)
1031                                 ma->ma_lmv = info->mti_big_lmm;
1032
1033                         ma->ma_lmv_size = rc;
1034                         ma->ma_valid |= MA_LMV;
1035                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1036                         ma->ma_default_lmv_size = rc;
1037                         ma->ma_valid |= MA_LMV_DEF;
1038                 }
1039
1040                 /* Update mdt_max_mdsize so all clients will be aware that */
1041                 if (info->mti_mdt->mdt_max_mdsize < rc)
1042                         info->mti_mdt->mdt_max_mdsize = rc;
1043
1044                 rc = 0;
1045         } else if (rc == -ENODATA) {
1046                 /* no LOV EA */
1047                 rc = 0;
1048         } else if (rc == -ERANGE) {
1049                 /* Default LMV has fixed size, so it must be able to fit
1050                  * in the original buffer */
1051                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1052                         return rc;
1053                 rc = mdt_big_xattr_get(info, o, name);
1054                 if (rc > 0) {
1055                         info->mti_big_lmm_used = 1;
1056                         goto got;
1057                 }
1058         }
1059
1060         return rc;
1061 }
1062
1063 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1064                    struct md_attr *ma, const char *name)
1065 {
1066         int rc;
1067
1068         if (!info->mti_big_lmm) {
1069                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1070                 if (!info->mti_big_lmm)
1071                         return -ENOMEM;
1072                 info->mti_big_lmmsize = PAGE_SIZE;
1073         }
1074
1075         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1076                 ma->ma_lmm = info->mti_big_lmm;
1077                 ma->ma_lmm_size = info->mti_big_lmmsize;
1078                 ma->ma_valid &= ~MA_LOV;
1079         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1080                 ma->ma_lmv = info->mti_big_lmm;
1081                 ma->ma_lmv_size = info->mti_big_lmmsize;
1082                 ma->ma_valid &= ~MA_LMV;
1083         } else {
1084                 LBUG();
1085         }
1086
1087         LASSERT(!info->mti_big_lmm_used);
1088         rc = __mdt_stripe_get(info, o, ma, name);
1089         /* since big_lmm is always used here, clear 'used' flag to avoid
1090          * assertion in mdt_big_xattr_get().
1091          */
1092         info->mti_big_lmm_used = 0;
1093
1094         return rc;
1095 }
1096
1097 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1098                       struct lu_fid *pfid)
1099 {
1100         struct lu_buf           *buf = &info->mti_buf;
1101         struct link_ea_header   *leh;
1102         struct link_ea_entry    *lee;
1103         int                      rc;
1104         ENTRY;
1105
1106         buf->lb_buf = info->mti_big_lmm;
1107         buf->lb_len = info->mti_big_lmmsize;
1108         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1109                           buf, XATTR_NAME_LINK);
1110         /* ignore errors, MA_PFID won't be set and it is
1111          * up to the caller to treat this as an error */
1112         if (rc == -ERANGE || buf->lb_len == 0) {
1113                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1114                 buf->lb_buf = info->mti_big_lmm;
1115                 buf->lb_len = info->mti_big_lmmsize;
1116         }
1117
1118         if (rc < 0)
1119                 RETURN(rc);
1120         if (rc < sizeof(*leh)) {
1121                 CERROR("short LinkEA on "DFID": rc = %d\n",
1122                        PFID(mdt_object_fid(o)), rc);
1123                 RETURN(-ENODATA);
1124         }
1125
1126         leh = (struct link_ea_header *) buf->lb_buf;
1127         lee = (struct link_ea_entry *)(leh + 1);
1128         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1129                 leh->leh_magic = LINK_EA_MAGIC;
1130                 leh->leh_reccount = __swab32(leh->leh_reccount);
1131                 leh->leh_len = __swab64(leh->leh_len);
1132         }
1133         if (leh->leh_magic != LINK_EA_MAGIC)
1134                 RETURN(-EINVAL);
1135         if (leh->leh_reccount == 0)
1136                 RETURN(-ENODATA);
1137
1138         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1139         fid_be_to_cpu(pfid, pfid);
1140
1141         RETURN(0);
1142 }
1143
1144 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1145                            struct lu_fid *pfid, struct lu_name *lname)
1146 {
1147         struct lu_buf *buf = &info->mti_buf;
1148         struct link_ea_header *leh;
1149         struct link_ea_entry *lee;
1150         int reclen;
1151         int rc;
1152
1153         buf->lb_buf = info->mti_xattr_buf;
1154         buf->lb_len = sizeof(info->mti_xattr_buf);
1155         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1156                           XATTR_NAME_LINK);
1157         if (rc == -ERANGE) {
1158                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1159                 buf->lb_buf = info->mti_big_lmm;
1160                 buf->lb_len = info->mti_big_lmmsize;
1161         }
1162         if (rc < 0)
1163                 return rc;
1164
1165         if (rc < sizeof(*leh)) {
1166                 CERROR("short LinkEA on "DFID": rc = %d\n",
1167                        PFID(mdt_object_fid(o)), rc);
1168                 return -ENODATA;
1169         }
1170
1171         leh = (struct link_ea_header *)buf->lb_buf;
1172         lee = (struct link_ea_entry *)(leh + 1);
1173         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1174                 leh->leh_magic = LINK_EA_MAGIC;
1175                 leh->leh_reccount = __swab32(leh->leh_reccount);
1176                 leh->leh_len = __swab64(leh->leh_len);
1177         }
1178         if (leh->leh_magic != LINK_EA_MAGIC)
1179                 return -EINVAL;
1180
1181         if (leh->leh_reccount == 0)
1182                 return -ENODATA;
1183
1184         linkea_entry_unpack(lee, &reclen, lname, pfid);
1185
1186         return 0;
1187 }
1188
1189 int mdt_attr_get_complex(struct mdt_thread_info *info,
1190                          struct mdt_object *o, struct md_attr *ma)
1191 {
1192         const struct lu_env *env = info->mti_env;
1193         struct md_object    *next = mdt_object_child(o);
1194         struct lu_buf       *buf = &info->mti_buf;
1195         int                  need = ma->ma_need;
1196         int                  rc = 0, rc2;
1197         u32                  mode;
1198         ENTRY;
1199
1200         ma->ma_valid = 0;
1201
1202         if (mdt_object_exists(o) == 0)
1203                 GOTO(out, rc = -ENOENT);
1204         mode = lu_object_attr(&next->mo_lu);
1205
1206         if (need & MA_INODE) {
1207                 ma->ma_need = MA_INODE;
1208                 rc = mo_attr_get(env, next, ma);
1209                 if (rc)
1210                         GOTO(out, rc);
1211
1212                 if (S_ISREG(mode))
1213                         (void) mdt_get_som(info, o, ma);
1214                 ma->ma_valid |= MA_INODE;
1215         }
1216
1217         if (need & MA_PFID) {
1218                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1219                 if (rc == 0)
1220                         ma->ma_valid |= MA_PFID;
1221                 /* ignore this error, parent fid is not mandatory */
1222                 rc = 0;
1223         }
1224
1225         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1226                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1227                 if (rc)
1228                         GOTO(out, rc);
1229         }
1230
1231         if (need & MA_LMV && S_ISDIR(mode)) {
1232                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1233                 if (rc != 0)
1234                         GOTO(out, rc);
1235         }
1236
1237         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1238                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1239                 if (rc != 0)
1240                         GOTO(out, rc);
1241         }
1242
1243         /*
1244          * In the handle of MA_INODE, we may already get the SOM attr.
1245          */
1246         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1247                 rc = mdt_get_som(info, o, ma);
1248                 if (rc != 0)
1249                         GOTO(out, rc);
1250         }
1251
1252         if (need & MA_HSM && S_ISREG(mode)) {
1253                 buf->lb_buf = info->mti_xattr_buf;
1254                 buf->lb_len = sizeof(info->mti_xattr_buf);
1255                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1256                              sizeof(info->mti_xattr_buf));
1257                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1258                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1259                 if (rc2 == 0)
1260                         ma->ma_valid |= MA_HSM;
1261                 else if (rc2 < 0 && rc2 != -ENODATA)
1262                         GOTO(out, rc = rc2);
1263         }
1264
1265 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1266         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1267                 buf->lb_buf = ma->ma_acl;
1268                 buf->lb_len = ma->ma_acl_size;
1269                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1270                 if (rc2 > 0) {
1271                         ma->ma_acl_size = rc2;
1272                         ma->ma_valid |= MA_ACL_DEF;
1273                 } else if (rc2 == -ENODATA) {
1274                         /* no ACLs */
1275                         ma->ma_acl_size = 0;
1276                 } else
1277                         GOTO(out, rc = rc2);
1278         }
1279 #endif
1280 out:
1281         ma->ma_need = need;
1282         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1283                rc, ma->ma_valid, ma->ma_lmm);
1284         RETURN(rc);
1285 }
1286
1287 static int mdt_getattr_internal(struct mdt_thread_info *info,
1288                                 struct mdt_object *o, int ma_need)
1289 {
1290         struct mdt_device *mdt = info->mti_mdt;
1291         struct md_object *next = mdt_object_child(o);
1292         const struct mdt_body *reqbody = info->mti_body;
1293         struct ptlrpc_request *req = mdt_info_req(info);
1294         struct md_attr *ma = &info->mti_attr;
1295         struct lu_attr *la = &ma->ma_attr;
1296         struct req_capsule *pill = info->mti_pill;
1297         const struct lu_env *env = info->mti_env;
1298         struct mdt_body *repbody;
1299         struct lu_buf *buffer = &info->mti_buf;
1300         struct obd_export *exp = info->mti_exp;
1301         ktime_t kstart = ktime_get();
1302         int rc;
1303
1304         ENTRY;
1305
1306         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1307                 RETURN(err_serious(-ENOMEM));
1308
1309         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1310
1311         ma->ma_valid = 0;
1312
1313         if (mdt_object_remote(o)) {
1314                 /* This object is located on remote node.*/
1315                 /* Return -ENOTSUPP for old client */
1316                 if (!mdt_is_dne_client(req->rq_export))
1317                         GOTO(out, rc = -ENOTSUPP);
1318
1319                 repbody->mbo_fid1 = *mdt_object_fid(o);
1320                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1321                 GOTO(out, rc = 0);
1322         }
1323
1324         if (reqbody->mbo_eadatasize > 0) {
1325                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1326                 if (buffer->lb_buf == NULL)
1327                         GOTO(out, rc = -EPROTO);
1328                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1329                                                       RCL_SERVER);
1330         } else {
1331                 buffer->lb_buf = NULL;
1332                 buffer->lb_len = 0;
1333                 ma_need &= ~(MA_LOV | MA_LMV);
1334                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1335                        mdt_obd_name(info->mti_mdt),
1336                        req->rq_export->exp_client_uuid.uuid);
1337         }
1338
1339         /* from 2.12.58 intent_getattr pack default LMV in reply */
1340         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1341             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1342                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1343             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1344                                   RCL_SERVER)) {
1345                 ma->ma_lmv = buffer->lb_buf;
1346                 ma->ma_lmv_size = buffer->lb_len;
1347                 ma->ma_default_lmv = req_capsule_server_get(pill,
1348                                                 &RMF_DEFAULT_MDT_MD);
1349                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1350                                                 &RMF_DEFAULT_MDT_MD,
1351                                                 RCL_SERVER);
1352                 ma->ma_need = MA_INODE;
1353                 if (ma->ma_lmv_size > 0)
1354                         ma->ma_need |= MA_LMV;
1355                 if (ma->ma_default_lmv_size > 0)
1356                         ma->ma_need |= MA_LMV_DEF;
1357         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1358                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1359                 /* If it is dir and client require MEA, then we got MEA */
1360                 /* Assumption: MDT_MD size is enough for lmv size. */
1361                 ma->ma_lmv = buffer->lb_buf;
1362                 ma->ma_lmv_size = buffer->lb_len;
1363                 ma->ma_need = MA_INODE;
1364                 if (ma->ma_lmv_size > 0) {
1365                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1366                                 ma->ma_need |= MA_LMV;
1367                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1368                                 ma->ma_need |= MA_LMV_DEF;
1369                                 ma->ma_default_lmv = buffer->lb_buf;
1370                                 ma->ma_lmv = NULL;
1371                                 ma->ma_default_lmv_size = buffer->lb_len;
1372                                 ma->ma_lmv_size = 0;
1373                         }
1374                 }
1375         } else {
1376                 ma->ma_lmm = buffer->lb_buf;
1377                 ma->ma_lmm_size = buffer->lb_len;
1378                 ma->ma_need = MA_INODE | MA_HSM;
1379                 if (ma->ma_lmm_size > 0) {
1380                         ma->ma_need |= MA_LOV;
1381                         /* Older clients may crash if they getattr overstriped
1382                          * files
1383                          */
1384                         if (!exp_connect_overstriping(exp) &&
1385                             mdt_lmm_is_overstriping(ma->ma_lmm))
1386                                 RETURN(-EOPNOTSUPP);
1387                 }
1388         }
1389
1390         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1391             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1392             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1393                 /* get default stripe info for this dir. */
1394                 ma->ma_need |= MA_LOV_DEF;
1395         }
1396         ma->ma_need |= ma_need;
1397
1398         rc = mdt_attr_get_complex(info, o, ma);
1399         if (unlikely(rc)) {
1400                 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1401                              "%s: getattr error for "DFID": rc = %d\n",
1402                              mdt_obd_name(info->mti_mdt),
1403                              PFID(mdt_object_fid(o)), rc);
1404                 RETURN(rc);
1405         }
1406
1407         /* if file is released, check if a restore is running */
1408         if (ma->ma_valid & MA_HSM) {
1409                 repbody->mbo_valid |= OBD_MD_TSTATE;
1410                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1411                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1412                         repbody->mbo_t_state = MS_RESTORE;
1413         }
1414
1415         if (unlikely(!(ma->ma_valid & MA_INODE)))
1416                 RETURN(-EFAULT);
1417
1418         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1419
1420         if (mdt_body_has_lov(la, reqbody)) {
1421                 u32 stripe_count = 1;
1422                 bool fixed_layout = false;
1423
1424                 if (ma->ma_valid & MA_LOV) {
1425                         LASSERT(ma->ma_lmm_size);
1426                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1427                         if (S_ISDIR(la->la_mode))
1428                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1429                         else
1430                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1431                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1432                 }
1433                 if (ma->ma_valid & MA_LMV) {
1434                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1435                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1436
1437                         /* Return -ENOTSUPP for old client */
1438                         if (!mdt_is_striped_client(req->rq_export))
1439                                 RETURN(-ENOTSUPP);
1440
1441                         LASSERT(S_ISDIR(la->la_mode));
1442                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1443                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1444                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1445
1446                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1447                         fixed_layout = lmv_is_fixed(lmv);
1448                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1449                                 mdt_restripe_migrate_add(info, o);
1450                         else if (magic == LMV_MAGIC_V1 &&
1451                                  lmv_is_restriping(lmv))
1452                                 mdt_restripe_update_add(info, o);
1453                 }
1454                 if (ma->ma_valid & MA_LMV_DEF) {
1455                         /* Return -ENOTSUPP for old client */
1456                         if (!mdt_is_striped_client(req->rq_export))
1457                                 RETURN(-ENOTSUPP);
1458                         LASSERT(S_ISDIR(la->la_mode));
1459                         /*
1460                          * when ll_dir_getstripe() gets default LMV, it
1461                          * checks mbo_eadatasize.
1462                          */
1463                         if (!(ma->ma_valid & MA_LMV))
1464                                 repbody->mbo_eadatasize =
1465                                         ma->ma_default_lmv_size;
1466                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1467                                                OBD_MD_DEFAULT_MEA);
1468                 }
1469                 CDEBUG(D_VFSTRACE,
1470                        "dirent count %llu stripe count %u MDT count %d\n",
1471                        ma->ma_attr.la_dirent_count, stripe_count,
1472                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
1473                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1474                     ma->ma_attr.la_dirent_count >
1475                         mdt->mdt_restriper.mdr_dir_split_count &&
1476                     !fid_is_root(mdt_object_fid(o)) &&
1477                     mdt->mdt_enable_dir_auto_split &&
1478                     !o->mot_restriping &&
1479                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
1480                     !fixed_layout)
1481                         mdt_auto_split_add(info, o);
1482         } else if (S_ISLNK(la->la_mode) &&
1483                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
1484                 buffer->lb_buf = ma->ma_lmm;
1485                 /* eadatasize from client includes NULL-terminator, so
1486                  * there is no need to read it */
1487                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1488                 rc = mo_readlink(env, next, buffer);
1489                 if (unlikely(rc <= 0)) {
1490                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1491                                mdt_obd_name(info->mti_mdt),
1492                                PFID(mdt_object_fid(o)), rc);
1493                         rc = -EFAULT;
1494                 } else {
1495                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1496
1497                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1498                                 rc -= 2;
1499                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1500                         /* we need to report back size with NULL-terminator
1501                          * because client expects that */
1502                         repbody->mbo_eadatasize = rc + 1;
1503                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1504                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1505                                        "on "DFID ", expected %d\n",
1506                                        mdt_obd_name(info->mti_mdt),
1507                                        rc, PFID(mdt_object_fid(o)),
1508                                        reqbody->mbo_eadatasize - 1);
1509                         /* NULL terminate */
1510                         ((char *)ma->ma_lmm)[rc] = 0;
1511
1512                         /* If the total CDEBUG() size is larger than a page, it
1513                          * will print a warning to the console, avoid this by
1514                          * printing just the last part of the symlink. */
1515                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1516                                print_limit < rc ? "..." : "", print_limit,
1517                                (char *)ma->ma_lmm + rc - print_limit, rc);
1518                         rc = 0;
1519                 }
1520         }
1521
1522         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1523                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1524                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1525                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1526                        repbody->mbo_max_mdsize);
1527         }
1528
1529 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1530         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1531                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1532                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1533                 if (IS_ERR(nodemap))
1534                         RETURN(PTR_ERR(nodemap));
1535
1536                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1537                 nodemap_putref(nodemap);
1538         }
1539 #endif
1540
1541 out:
1542         if (rc == 0)
1543                 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1544                                  ktime_us_delta(ktime_get(), kstart));
1545
1546         RETURN(rc);
1547 }
1548
1549 static int mdt_getattr(struct tgt_session_info *tsi)
1550 {
1551         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1552         struct mdt_object       *obj = info->mti_object;
1553         struct req_capsule      *pill = info->mti_pill;
1554         struct mdt_body         *reqbody;
1555         struct mdt_body         *repbody;
1556         int rc, rc2;
1557         ENTRY;
1558
1559         if (unlikely(info->mti_object == NULL))
1560                 RETURN(-EPROTO);
1561
1562         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1563         LASSERT(reqbody);
1564         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1565
1566         /* Special case for Data-on-MDT files to get data version */
1567         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1568                 rc = mdt_data_version_get(tsi);
1569                 GOTO(out, rc);
1570         }
1571
1572         /* Unlike intent case where we need to pre-fill out buffers early on
1573          * in intent policy for ldlm reasons, here we can have a much better
1574          * guess at EA size by just reading it from disk.
1575          * Exceptions are readdir and (missing) directory striping */
1576         /* Readlink */
1577         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1578                 /* No easy way to know how long is the symlink, but it cannot
1579                  * be more than PATH_MAX, so we allocate +1 */
1580                 rc = PATH_MAX + 1;
1581         /* A special case for fs ROOT: getattr there might fetch
1582          * default EA for entire fs, not just for this dir!
1583          */
1584         } else if (lu_fid_eq(mdt_object_fid(obj),
1585                              &info->mti_mdt->mdt_md_root_fid) &&
1586                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1587                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1588                                                                  MDS_GETATTR)) {
1589                 /* Should the default strping be bigger, mdt_fix_reply
1590                  * will reallocate */
1591                 rc = DEF_REP_MD_SIZE;
1592         } else {
1593                 /* Read the actual EA size from disk */
1594                 rc = mdt_attr_get_eabuf_size(info, obj);
1595         }
1596
1597         if (rc < 0)
1598                 GOTO(out, rc = err_serious(rc));
1599
1600         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1601
1602         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1603          * by default. If the target object has more ACL entries, then
1604          * enlarge the buffer when necessary. */
1605         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1606                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1607
1608         rc = req_capsule_server_pack(pill);
1609         if (unlikely(rc != 0))
1610                 GOTO(out, rc = err_serious(rc));
1611
1612         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1613         LASSERT(repbody != NULL);
1614         repbody->mbo_eadatasize = 0;
1615         repbody->mbo_aclsize = 0;
1616
1617         rc = mdt_check_ucred(info);
1618         if (unlikely(rc))
1619                 GOTO(out_shrink, rc);
1620
1621         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1622
1623         rc = mdt_getattr_internal(info, obj, 0);
1624         EXIT;
1625 out_shrink:
1626         mdt_client_compatibility(info);
1627         rc2 = mdt_fix_reply(info);
1628         if (rc == 0)
1629                 rc = rc2;
1630 out:
1631         mdt_thread_info_fini(info);
1632         return rc;
1633 }
1634
1635 /**
1636  * Handler of layout intent RPC requiring the layout modification
1637  *
1638  * \param[in]  info     thread environment
1639  * \param[in]  obj      object
1640  * \param[out] lhc      object ldlm lock handle
1641  * \param[in]  layout   layout change descriptor
1642  *
1643  * \retval 0    on success
1644  * \retval < 0  error code
1645  */
1646 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1647                       struct mdt_lock_handle *lhc,
1648                       struct md_layout_change *layout)
1649 {
1650         int rc;
1651
1652         ENTRY;
1653
1654         if (!mdt_object_exists(obj))
1655                 RETURN(-ENOENT);
1656
1657         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1658                 RETURN(-EINVAL);
1659
1660         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1661                            MAY_WRITE);
1662         if (rc)
1663                 RETURN(rc);
1664
1665         rc = mdt_check_resent_lock(info, obj, lhc);
1666         if (rc < 0)
1667                 RETURN(rc);
1668
1669         if (rc > 0) {
1670                 /* not resent */
1671                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1672
1673                 /* take layout lock to prepare layout change */
1674                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1675                         lockpart |= MDS_INODELOCK_UPDATE;
1676
1677                 mdt_lock_handle_init(lhc);
1678                 mdt_lock_reg_init(lhc, LCK_EX);
1679                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1680                 if (rc)
1681                         RETURN(rc);
1682         }
1683
1684         mutex_lock(&obj->mot_som_mutex);
1685         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1686         mutex_unlock(&obj->mot_som_mutex);
1687
1688         if (rc)
1689                 mdt_object_unlock(info, obj, lhc, 1);
1690
1691         RETURN(rc);
1692 }
1693
1694 /**
1695  * Exchange MOF_LOV_CREATED flags between two objects after a
1696  * layout swap. No assumption is made on whether o1 or o2 have
1697  * created objects or not.
1698  *
1699  * \param[in,out] o1    First swap layout object
1700  * \param[in,out] o2    Second swap layout object
1701  */
1702 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1703 {
1704         unsigned int o1_lov_created = o1->mot_lov_created;
1705
1706         mutex_lock(&o1->mot_lov_mutex);
1707         mutex_lock(&o2->mot_lov_mutex);
1708
1709         o1->mot_lov_created = o2->mot_lov_created;
1710         o2->mot_lov_created = o1_lov_created;
1711
1712         mutex_unlock(&o2->mot_lov_mutex);
1713         mutex_unlock(&o1->mot_lov_mutex);
1714 }
1715
1716 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1717 {
1718         struct mdt_thread_info  *info;
1719         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1720         struct obd_export       *exp = req->rq_export;
1721         struct mdt_object       *o1, *o2, *o;
1722         struct mdt_lock_handle  *lh1, *lh2;
1723         struct mdc_swap_layouts *msl;
1724         int                      rc;
1725         ENTRY;
1726
1727         /* client does not support layout lock, so layout swaping
1728          * is disabled.
1729          * FIXME: there is a problem for old clients which don't support
1730          * layout lock yet. If those clients have already opened the file
1731          * they won't be notified at all so that old layout may still be
1732          * used to do IO. This can be fixed after file release is landed by
1733          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1734         if (!exp_connect_layout(exp))
1735                 RETURN(-EOPNOTSUPP);
1736
1737         info = tsi2mdt_info(tsi);
1738         if (unlikely(info->mti_object == NULL))
1739                 RETURN(-EPROTO);
1740
1741         if (info->mti_dlm_req != NULL)
1742                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1743
1744         o1 = info->mti_object;
1745         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1746                                 &info->mti_body->mbo_fid2);
1747         if (IS_ERR(o))
1748                 GOTO(out, rc = PTR_ERR(o));
1749
1750         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1751                 GOTO(put, rc = -ENOENT);
1752
1753         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1754         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1755                 GOTO(put, rc);
1756
1757         if (rc < 0)
1758                 swap(o1, o2);
1759
1760         /* permission check. Make sure the calling process having permission
1761          * to write both files. */
1762         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1763                            MAY_WRITE);
1764         if (rc < 0)
1765                 GOTO(put, rc);
1766
1767         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1768                            MAY_WRITE);
1769         if (rc < 0)
1770                 GOTO(put, rc);
1771
1772         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1773         if (msl == NULL)
1774                 GOTO(put, rc = -EPROTO);
1775
1776         lh1 = &info->mti_lh[MDT_LH_NEW];
1777         mdt_lock_reg_init(lh1, LCK_EX);
1778         lh2 = &info->mti_lh[MDT_LH_OLD];
1779         mdt_lock_reg_init(lh2, LCK_EX);
1780
1781         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1782                              MDS_INODELOCK_XATTR);
1783         if (rc < 0)
1784                 GOTO(put, rc);
1785
1786         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1787                              MDS_INODELOCK_XATTR);
1788         if (rc < 0)
1789                 GOTO(unlock1, rc);
1790
1791         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1792                              mdt_object_child(o2), msl->msl_flags);
1793         if (rc < 0)
1794                 GOTO(unlock2, rc);
1795
1796         mdt_swap_lov_flag(o1, o2);
1797
1798 unlock2:
1799         mdt_object_unlock(info, o2, lh2, rc);
1800 unlock1:
1801         mdt_object_unlock(info, o1, lh1, rc);
1802 put:
1803         mdt_object_put(info->mti_env, o);
1804 out:
1805         mdt_thread_info_fini(info);
1806         RETURN(rc);
1807 }
1808
1809 static int mdt_raw_lookup(struct mdt_thread_info *info,
1810                           struct mdt_object *parent,
1811                           const struct lu_name *lname,
1812                           struct ldlm_reply *ldlm_rep)
1813 {
1814         struct lu_fid   *child_fid = &info->mti_tmp_fid1;
1815         int              rc;
1816         ENTRY;
1817
1818         LASSERT(!info->mti_cross_ref);
1819
1820         /* Only got the fid of this obj by name */
1821         fid_zero(child_fid);
1822         rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
1823                         lname, child_fid, &info->mti_spec);
1824         if (rc == 0) {
1825                 struct mdt_body *repbody;
1826
1827                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1828                 repbody->mbo_fid1 = *child_fid;
1829                 repbody->mbo_valid = OBD_MD_FLID;
1830                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1831         } else if (rc == -ENOENT) {
1832                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1833         }
1834
1835         RETURN(rc);
1836 }
1837
1838 /*
1839  * UPDATE lock should be taken against parent, and be released before exit;
1840  * child_bits lock should be taken against child, and be returned back:
1841  *            (1)normal request should release the child lock;
1842  *            (2)intent request will grant the lock to client.
 *
 * The child is resolved either by name under the parent (info->mti_object)
 * or, when the request carries no valid name, by mbo_fid2 of the request
 * body. For a cross-ref request (info->mti_cross_ref) the "parent" is
 * itself the object to stat and no name lookup is done.
 *
 * \param[in]     info        thread environment; mti_object is the parent
 * \param[in,out] lhc         child lock handle; may already be in use for a
 *                            resent request, otherwise it is initialised and
 *                            the lock is taken here
 * \param[in]     child_bits  inodebits wanted on the child; adjusted locally
 *                            before the lock is enqueued
 * \param[in,out] ldlm_rep    reply receiving the lookup dispositions; NULL
 *                            when called from mdt_getattr_name()
 *
 * \retval 0    on success
 * \retval < 0  error code
1843  */
1844 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1845                                  struct mdt_lock_handle *lhc,
1846                                  __u64 child_bits,
1847                                  struct ldlm_reply *ldlm_rep)
1848 {
1849         struct ptlrpc_request *req = mdt_info_req(info);
1850         struct mdt_body *reqbody = NULL;
1851         struct mdt_object *parent = info->mti_object;
1852         struct mdt_object *child = NULL;
1853         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1854         struct lu_name *lname = NULL;
1855         struct mdt_lock_handle *lhp = NULL;
1856         struct ldlm_lock *lock;
1857         struct req_capsule *pill = info->mti_pill;
1858         __u64 try_bits = 0;
1859         bool is_resent;
1860         int ma_need = 0;
1861         int rc;
1862
1863         ENTRY;
1864
             /* a child lock handle that is already in use on entry is only
              * expected for a request flagged MSG_RESENT */
1865         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1866         LASSERT(ergo(is_resent,
1867                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1868
1869         if (parent == NULL)
1870                 RETURN(-ENOENT);
1871
1872         if (info->mti_cross_ref) {
1873                 /* Only getattr on the child. Parent is on another node. */
1874                 mdt_set_disposition(info, ldlm_rep,
1875                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1876                 child = parent;
1877                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1878                        "ldlm_rep = %p\n",
1879                        PFID(mdt_object_fid(child)), ldlm_rep);
1880
1881                 rc = mdt_check_resent_lock(info, child, lhc);
1882                 if (rc < 0) {
1883                         RETURN(rc);
1884                 } else if (rc > 0) {
1885                         mdt_lock_handle_init(lhc);
1886                         mdt_lock_reg_init(lhc, LCK_PR);
1887
1888                         /*
1889                          * Object's name entry is on another MDS, it will
1890                          * request PERM lock only because LOOKUP lock is owned
1891                          * by the MDS where name entry resides.
1892                          *
1893                          * TODO: it should try layout lock too. - Jinshan
1894                          */
1895                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
1896                                         MDS_INODELOCK_LAYOUT);
1897                         child_bits |= MDS_INODELOCK_PERM;
1898
1899                         rc = mdt_object_lock(info, child, lhc, child_bits);
1900                         if (rc < 0)
1901                                 RETURN(rc);
1902                 }
1903
1904                 /* Finally, we can get attr for child. */
1905                 if (!mdt_object_exists(child)) {
1906                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1907                                         &child->mot_obj,
1908                                         "remote object doesn't exist.");
1909                         mdt_object_unlock(info, child, lhc, 1);
1910                         RETURN(-ENOENT);
1911                 }
1912
1913                 rc = mdt_getattr_internal(info, child, 0);
1914                 if (unlikely(rc != 0)) {
1915                         mdt_object_unlock(info, child, lhc, 1);
1916                         RETURN(rc);
1917                 }
1918
1919                 rc = mdt_pack_secctx_in_reply(info, child);
1920                 if (unlikely(rc)) {
1921                         mdt_object_unlock(info, child, lhc, 1);
1922                         RETURN(rc);
1923                 }
1924
1925                 rc = mdt_pack_encctx_in_reply(info, child);
1926                 if (unlikely(rc))
1927                         mdt_object_unlock(info, child, lhc, 1);
1928                 RETURN(rc);
1929         }
1930
1931         lname = &info->mti_name;
1932         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
1933
1934         if (lu_name_is_valid(lname)) {
1935                 if (mdt_object_remote(parent)) {
1936                         CERROR("%s: parent "DFID" is on remote target\n",
1937                                mdt_obd_name(info->mti_mdt),
1938                                PFID(mdt_object_fid(parent)));
1939                         RETURN(-EPROTO);
1940                 }
1941
1942                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
1943                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1944                        PNAME(lname), ldlm_rep);
1945         } else {
                     /* no valid name in the request: the child is identified
                      * by mbo_fid2 and a reference on it is taken right here
                      * (an extra one on parent when both FIDs are equal) */
1946                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1947                 if (unlikely(reqbody == NULL))
1948                         RETURN(err_serious(-EPROTO));
1949
1950                 *child_fid = reqbody->mbo_fid2;
1951                 if (unlikely(!fid_is_sane(child_fid)))
1952                         RETURN(err_serious(-EINVAL));
1953
1954                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
1955                         mdt_object_get(info->mti_env, parent);
1956                         child = parent;
1957                 } else {
1958                         child = mdt_object_find(info->mti_env, info->mti_mdt,
1959                                                 child_fid);
1960                         if (IS_ERR(child))
1961                                 RETURN(PTR_ERR(child));
1962                 }
1963
1964                 if (mdt_object_remote(child)) {
1965                         CERROR("%s: child "DFID" is on remote target\n",
1966                                mdt_obd_name(info->mti_mdt),
1967                                PFID(mdt_object_fid(child)));
1968                         GOTO(out_child, rc = -EPROTO);
1969                 }
1970
1971                 /* don't fetch LOOKUP lock if it's remote object */
1972                 rc = mdt_is_remote_object(info, parent, child);
1973                 if (rc < 0)
1974                         GOTO(out_child, rc);
1975                 if (rc)
1976                         child_bits &= ~MDS_INODELOCK_LOOKUP;
1977
1978                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1979                        "ldlm_rep = %p\n",
1980                        PFID(mdt_object_fid(parent)),
1981                        PFID(&reqbody->mbo_fid2), ldlm_rep);
1982         }
1983
1984         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1985
1986         if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
1987                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1988                                 &parent->mot_obj,
1989                                 "Parent doesn't exist!");
1990                 GOTO(out_child, rc = -ESTALE);
1991         }
1992
1993         if (lu_name_is_valid(lname)) {
1994                 /* Always allow to lookup ".." */
1995                 if (unlikely(lname->ln_namelen == 2 &&
1996                              lname->ln_name[0] == '.' &&
1997                              lname->ln_name[1] == '.'))
1998                         info->mti_spec.sp_permitted = 1;
1999
                     /* the client asked for the FID only (mbo_valid is exactly
                      * OBD_MD_FLID): plain name lookup, no lock is taken and
                      * no attributes are packed */
2000                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
2001                         rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
2002
2003                         RETURN(rc);
2004                 }
2005
2006                 /* step 1: lock parent only if parent is a directory */
2007                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2008                         lhp = &info->mti_lh[MDT_LH_PARENT];
2009                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
2010                         rc = mdt_object_lock(info, parent, lhp,
2011                                              MDS_INODELOCK_UPDATE);
2012                         if (unlikely(rc != 0))
2013                                 RETURN(rc);
2014                 }
2015
2016                 /* step 2: lookup child's fid by name */
2017                 fid_zero(child_fid);
2018                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2019                                 child_fid, &info->mti_spec);
2020                 if (rc == -ENOENT)
2021                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2022
2023                 if (rc != 0)
2024                         GOTO(unlock_parent, rc);
2025
2026                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2027                                         child_fid);
2028                 if (unlikely(IS_ERR(child)))
2029                         GOTO(unlock_parent, rc = PTR_ERR(child));
2030         }
2031
2032         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2033
2034         /* step 3: lock child regardless if it is local or remote. */
2035         LASSERT(child);
2036
2037         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2038         if (!mdt_object_exists(child)) {
2039                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2040                                 &child->mot_obj,
2041                                 "Object doesn't exist!");
2042                 GOTO(out_child, rc = -ENOENT);
2043         }
2044
2045         rc = mdt_check_resent_lock(info, child, lhc);
2046         if (rc < 0) {
2047                 GOTO(out_child, rc);
2048         } else if (rc > 0) {
2049                 mdt_lock_handle_init(lhc);
2050                 mdt_lock_reg_init(lhc, LCK_PR);
2051
2052                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2053                     !mdt_object_remote(child)) {
2054                         struct md_attr *ma = &info->mti_attr;
2055
2056                         ma->ma_valid = 0;
2057                         ma->ma_need = MA_INODE;
2058                         rc = mdt_attr_get_complex(info, child, ma);
2059                         if (unlikely(rc != 0))
2060                                 GOTO(out_child, rc);
2061
2062                         /* If the file has not been changed for some time, we
2063                          * return not only a LOOKUP lock, but also an UPDATE
2064                          * lock and this might save us RPC on later STAT. For
2065                          * directories, it also let negative dentry cache start
2066                          * working for this dir. */
2067                         if (ma->ma_valid & MA_INODE &&
2068                             ma->ma_attr.la_valid & LA_CTIME &&
2069                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2070                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2071                                 child_bits |= MDS_INODELOCK_UPDATE;
2072                 }
2073
2074                 /* layout lock must be granted in a best-effort way
2075                  * for IT operations */
2076                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2077                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2078                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2079                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2080                             exp_connect_layout(info->mti_exp)) {
2081                                 /* try to grant layout lock for regular file. */
2082                                 try_bits = MDS_INODELOCK_LAYOUT;
2083                         }
2084                         /* Acquire DOM lock in advance for data-on-mdt file */
2085                         if (child != parent)
2086                                 try_bits |= MDS_INODELOCK_DOM;
2087                 }
2088
2089                 if (try_bits != 0) {
2090                         /* try layout lock, it may fail to be granted due to
2091                          * contention at LOOKUP or UPDATE */
2092                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2093                                                  try_bits, false);
2094                         if (child_bits & MDS_INODELOCK_LAYOUT)
2095                                 ma_need |= MA_LOV;
2096                 } else {
2097                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2098                          * client will enqueue the lock to the remote MDT */
2099                         if (mdt_object_remote(child))
2100                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2101                         rc = mdt_object_lock(info, child, lhc, child_bits);
2102                 }
2103                 if (unlikely(rc != 0))
2104                         GOTO(out_child, rc);
2105         }
2106
2107         /* finally, we can get attr for child. */
2108         rc = mdt_getattr_internal(info, child, ma_need);
2109         if (unlikely(rc != 0)) {
2110                 mdt_object_unlock(info, child, lhc, 1);
2111                 GOTO(out_child, rc);
2112         }
2113
2114         rc = mdt_pack_secctx_in_reply(info, child);
2115         if (unlikely(rc)) {
2116                 mdt_object_unlock(info, child, lhc, 1);
2117                 GOTO(out_child, rc);
2118         }
2119
2120         rc = mdt_pack_encctx_in_reply(info, child);
2121         if (unlikely(rc)) {
2122                 mdt_object_unlock(info, child, lhc, 1);
2123                 GOTO(out_child, rc);
2124         }
2125
2126         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2127         if (lock) {
2128                 /* Debugging code. */
2129                 LDLM_DEBUG(lock, "Returning lock to client");
2130                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2131                                          &lock->l_resource->lr_name),
2132                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2133                          PLDLMRES(lock->l_resource),
2134                          PFID(mdt_object_fid(child)));
2135
                     /* NOTE(review): "?:" binds looser than "+" and "-", so
                      * the fallback of 3 below replaces the whole sum only
                      * when that sum is zero, not cfs_fail_val alone --
                      * confirm this is what the fault injection intends */
2136                 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
2137                         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2138                                 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
2139                                                  req->rq_deadline -
2140                                                  req->rq_arrival_time.tv_sec +
2141                                                  cfs_fail_val ?: 3);
2142                         /* Put the lock to the waiting list and force the cancel */
2143                         ldlm_set_ast_sent(lock);
2144                 }
2145
2146                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2147                     !mdt_object_remote(child) && child != parent) {
                             /* the child reference is dropped right here; only
                              * child_fid and the lock are used afterwards, and
                              * the exit goes to unlock_parent so that out_child
                              * does not put the child a second time */
2148                         mdt_object_put(info->mti_env, child);
2149                         rc = mdt_pack_size2body(info, child_fid,
2150                                                 &lhc->mlh_reg_lh);
2151                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2152                                 /* DOM lock was taken in advance but this is
2153                                  * not DoM file. Drop the lock.
2154                                  */
2155                                 lock_res_and_lock(lock);
2156                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2157                                 unlock_res_and_lock(lock);
2158                         }
2159                         LDLM_LOCK_PUT(lock);
                             /* a mdt_pack_size2body() failure is not reported
                              * to the caller: rc is reset to 0 */
2160                         GOTO(unlock_parent, rc = 0);
2161                 }
2162                 LDLM_LOCK_PUT(lock);
2163         }
2164
2165         EXIT;
     /* out_child drops the child reference, if one is held; unlock_parent
      * releases the parent UPDATE lock when step 1 took it (lhp != NULL) */
2166 out_child:
2167         if (child)
2168                 mdt_object_put(info->mti_env, child);
2169 unlock_parent:
2170         if (lhp)
2171                 mdt_object_unlock(info, parent, lhp, 1);
2172         return rc;
2173 }
2174
2175 /* normal handler: should release the child lock */
2176 static int mdt_getattr_name(struct tgt_session_info *tsi)
2177 {
2178         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2179         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2180         struct mdt_body *reqbody;
2181         struct mdt_body *repbody;
2182         int rc, rc2;
2183
2184         ENTRY;
2185
2186         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2187         LASSERT(reqbody != NULL);
2188         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2189         LASSERT(repbody != NULL);
2190
2191         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2192         repbody->mbo_eadatasize = 0;
2193         repbody->mbo_aclsize = 0;
2194
2195         rc = mdt_init_ucred(info, reqbody);
2196         if (unlikely(rc))
2197                 GOTO(out_shrink, rc);
2198
2199         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2200         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2201                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2202                 lhc->mlh_reg_lh.cookie = 0;
2203         }
2204         mdt_exit_ucred(info);
2205         EXIT;
2206 out_shrink:
2207         mdt_client_compatibility(info);
2208         rc2 = mdt_fix_reply(info);
2209         if (rc == 0)
2210                 rc = rc2;
2211         mdt_thread_info_fini(info);
2212         return rc;
2213 }
2214
2215 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2216                             const struct lu_fid *pfid,
2217                             const struct lu_name *name,
2218                             struct mdt_object *obj, s64 ctime)
2219 {
2220         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2221         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2222         struct mdt_device *mdt = info->mti_mdt;
2223         struct md_attr *ma = &info->mti_attr;
2224         struct mdt_lock_handle *parent_lh;
2225         struct mdt_lock_handle *child_lh;
2226         struct mdt_object *pobj;
2227         bool cos_incompat = false;
2228         int rc;
2229         ENTRY;
2230
2231         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2232         if (IS_ERR(pobj))
2233                 GOTO(out, rc = PTR_ERR(pobj));
2234
2235         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2236         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2237         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2238         if (rc != 0)
2239                 GOTO(put_parent, rc);
2240
2241         if (mdt_object_remote(pobj))
2242                 cos_incompat = true;
2243
2244         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2245                         name, child_fid, &info->mti_spec);
2246         if (rc != 0)
2247                 GOTO(unlock_parent, rc);
2248
2249         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2250                 GOTO(unlock_parent, rc = -EREMCHG);
2251
2252         child_lh = &info->mti_lh[MDT_LH_CHILD];
2253         mdt_lock_reg_init(child_lh, LCK_EX);
2254         rc = mdt_reint_striped_lock(info, obj, child_lh,
2255                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2256                                     einfo, cos_incompat);
2257         if (rc != 0)
2258                 GOTO(unlock_parent, rc);
2259
2260         if (atomic_read(&obj->mot_open_count)) {
2261                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2262                        PFID(mdt_object_fid(obj)));
2263                 GOTO(unlock_child, rc = -EBUSY);
2264         }
2265
2266         ma->ma_need = 0;
2267         ma->ma_valid = MA_INODE;
2268         ma->ma_attr.la_valid = LA_CTIME;
2269         ma->ma_attr.la_ctime = ctime;
2270
2271         mutex_lock(&obj->mot_lov_mutex);
2272
2273         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2274                         mdt_object_child(obj), name, ma, 0);
2275
2276         mutex_unlock(&obj->mot_lov_mutex);
2277
2278 unlock_child:
2279         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2280 unlock_parent:
2281         mdt_object_unlock(info, pobj, parent_lh, 1);
2282 put_parent:
2283         mdt_object_put(info->mti_env, pobj);
2284 out:
2285         RETURN(rc);
2286 }
2287
2288 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2289                                         struct mdt_object *obj)
2290 {
2291         struct lu_ucred *uc = lu_ucred(info->mti_env);
2292         struct md_attr *ma = &info->mti_attr;
2293         struct lu_attr *la = &ma->ma_attr;
2294         int rc = 0;
2295         ENTRY;
2296
2297         ma->ma_need = MA_INODE;
2298         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2299         if (rc)
2300                 GOTO(out, rc);
2301
2302         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2303                         rc = -EACCES;
2304
2305         if (md_capable(uc, CAP_DAC_OVERRIDE))
2306                 RETURN(0);
2307         if (uc->uc_fsuid == la->la_uid) {
2308                 if ((la->la_mode & S_IWUSR) == 0)
2309                         rc = -EACCES;
2310         } else if (uc->uc_fsgid == la->la_gid) {
2311                 if ((la->la_mode & S_IWGRP) == 0)
2312                         rc = -EACCES;
2313         } else if ((la->la_mode & S_IWOTH) == 0) {
2314                         rc = -EACCES;
2315         }
2316
2317 out:
2318         RETURN(rc);
2319 }
2320
2321 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2322                          s64 ctime)
2323 {
2324         struct mdt_device *mdt = info->mti_mdt;
2325         struct mdt_object *obj = NULL;
2326         struct linkea_data ldata = { NULL };
2327         struct lu_buf *buf = &info->mti_big_buf;
2328         struct lu_name *name = &info->mti_name;
2329         struct lu_fid *pfid = &info->mti_tmp_fid1;
2330         struct link_ea_header *leh;
2331         struct link_ea_entry *lee;
2332         int reclen, count, rc = 0;
2333         ENTRY;
2334
2335         if (!fid_is_sane(fid))
2336                 GOTO(out, rc = -EINVAL);
2337
2338         if (!fid_is_namespace_visible(fid))
2339                 GOTO(out, rc = -EINVAL);
2340
2341         obj = mdt_object_find(info->mti_env, mdt, fid);
2342         if (IS_ERR(obj))
2343                 GOTO(out, rc = PTR_ERR(obj));
2344
2345         if (mdt_object_remote(obj))
2346                 GOTO(out, rc = -EREMOTE);
2347         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2348                 GOTO(out, rc = -ENOENT);
2349
2350         rc = mdt_rmfid_check_permission(info, obj);
2351         if (rc)
2352                 GOTO(out, rc);
2353
2354         /* take LinkEA */
2355         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2356         if (!buf->lb_buf)
2357                 GOTO(out, rc = -ENOMEM);
2358
2359         ldata.ld_buf = buf;
2360         rc = mdt_links_read(info, obj, &ldata);
2361         if (rc)
2362                 GOTO(out, rc);
2363
2364         leh = buf->lb_buf;
2365         lee = (struct link_ea_entry *)(leh + 1);
2366         for (count = 0; count < leh->leh_reccount; count++) {
2367                 /* remove every hardlink */
2368                 linkea_entry_unpack(lee, &reclen, name, pfid);
2369                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2370                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2371                 if (rc)
2372                         break;
2373         }
2374
2375 out:
2376         if (obj && !IS_ERR(obj))
2377                 mdt_object_put(info->mti_env, obj);
2378         if (info->mti_big_buf.lb_buf)
2379                 lu_buf_free(&info->mti_big_buf);
2380
2381         RETURN(rc);
2382 }
2383
2384 static int mdt_rmfid(struct tgt_session_info *tsi)
2385 {
2386         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2387         struct mdt_body *reqbody;
2388         struct lu_fid *fids, *rfids;
2389         int bufsize, rc;
2390         __u32 *rcs;
2391         int i, nr;
2392         ENTRY;
2393
2394         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2395         if (reqbody == NULL)
2396                 RETURN(-EPROTO);
2397         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2398                                        RCL_CLIENT);
2399         nr = bufsize / sizeof(struct lu_fid);
2400         if (nr * sizeof(struct lu_fid) != bufsize)
2401                 RETURN(-EINVAL);
2402         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2403                              RCL_SERVER, nr * sizeof(__u32));
2404         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2405                              RCL_SERVER, nr * sizeof(struct lu_fid));
2406         rc = req_capsule_server_pack(tsi->tsi_pill);
2407         if (rc)
2408                 GOTO(out, rc = err_serious(rc));
2409         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2410         if (fids == NULL)
2411                 RETURN(-EPROTO);
2412         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2413         LASSERT(rcs);
2414         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2415         LASSERT(rfids);
2416
2417         mdt_init_ucred(mti, reqbody);
2418         for (i = 0; i < nr; i++) {
2419                 rfids[i] = fids[i];
2420                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2421         }
2422         mdt_exit_ucred(mti);
2423
2424 out:
2425         RETURN(rc);
2426 }
2427
2428 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2429                          void *karg, void __user *uarg);
2430
2431 static int mdt_set_info(struct tgt_session_info *tsi)
2432 {
2433         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2434         char                    *key;
2435         void                    *val;
2436         int                      keylen, vallen, rc = 0;
2437
2438         ENTRY;
2439
2440         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2441         if (key == NULL) {
2442                 DEBUG_REQ(D_HA, req, "no set_info key");
2443                 RETURN(err_serious(-EFAULT));
2444         }
2445
2446         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2447                                       RCL_CLIENT);
2448
2449         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2450         if (val == NULL) {
2451                 DEBUG_REQ(D_HA, req, "no set_info val");
2452                 RETURN(err_serious(-EFAULT));
2453         }
2454
2455         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2456                                       RCL_CLIENT);
2457
2458         /* Swab any part of val you need to here */
2459         if (KEY_IS(KEY_READ_ONLY)) {
2460                 spin_lock(&req->rq_export->exp_lock);
2461                 if (*(__u32 *)val)
2462                         *exp_connect_flags_ptr(req->rq_export) |=
2463                                 OBD_CONNECT_RDONLY;
2464                 else
2465                         *exp_connect_flags_ptr(req->rq_export) &=
2466                                 ~OBD_CONNECT_RDONLY;
2467                 spin_unlock(&req->rq_export->exp_lock);
2468         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2469                 struct changelog_setinfo *cs = val;
2470
2471                 if (vallen != sizeof(*cs)) {
2472                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2473                                tgt_name(tsi->tsi_tgt), vallen);
2474                         RETURN(-EINVAL);
2475                 }
2476                 if (req_capsule_req_need_swab(&req->rq_pill)) {
2477                         __swab64s(&cs->cs_recno);
2478                         __swab32s(&cs->cs_id);
2479                 }
2480
2481                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2482                         RETURN(-EACCES);
2483                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2484                                    vallen, val, NULL);
2485         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2486                 if (vallen > 0)
2487                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2488         } else {
2489                 RETURN(-EINVAL);
2490         }
2491         RETURN(rc);
2492 }
2493
2494 static int mdt_readpage(struct tgt_session_info *tsi)
2495 {
2496         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2497         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2498         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2499         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2500         struct mdt_body         *repbody;
2501         int                      rc;
2502         int                      i;
2503
2504         ENTRY;
2505
2506         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2507                 RETURN(err_serious(-ENOMEM));
2508
2509         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2510         if (repbody == NULL || reqbody == NULL)
2511                 RETURN(err_serious(-EFAULT));
2512
2513         /*
2514          * prepare @rdpg before calling lower layers and transfer itself. Here
2515          * reqbody->size contains offset of where to start to read and
2516          * reqbody->nlink contains number bytes to read.
2517          */
2518         rdpg->rp_hash = reqbody->mbo_size;
2519         if (rdpg->rp_hash != reqbody->mbo_size) {
2520                 CERROR("Invalid hash: %#llx != %#llx\n",
2521                        rdpg->rp_hash, reqbody->mbo_size);
2522                 RETURN(-EFAULT);
2523         }
2524
2525         rdpg->rp_attrs = reqbody->mbo_mode;
2526         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2527                 rdpg->rp_attrs |= LUDA_64BITHASH;
2528         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2529                                 exp_max_brw_size(tsi->tsi_exp));
2530         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2531                           PAGE_SHIFT;
2532         OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2533         if (rdpg->rp_pages == NULL)
2534                 RETURN(-ENOMEM);
2535
2536         for (i = 0; i < rdpg->rp_npages; ++i) {
2537                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2538                 if (rdpg->rp_pages[i] == NULL)
2539                         GOTO(free_rdpg, rc = -ENOMEM);
2540         }
2541
2542         /* call lower layers to fill allocated pages with directory data */
2543         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2544         if (rc < 0)
2545                 GOTO(free_rdpg, rc);
2546
2547         /* send pages to client */
2548         rc = tgt_sendpage(tsi, rdpg, rc);
2549
2550         EXIT;
2551 free_rdpg:
2552
2553         for (i = 0; i < rdpg->rp_npages; i++)
2554                 if (rdpg->rp_pages[i] != NULL)
2555                         __free_page(rdpg->rp_pages[i]);
2556         OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2557
2558         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2559                 RETURN(0);
2560
2561         return rc;
2562 }
2563
2564 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2565 {
2566         struct lu_ucred *uc = mdt_ucred_check(info);
2567         struct lu_attr *attr = &info->mti_attr.ma_attr;
2568
2569         if (uc == NULL)
2570                 return -EINVAL;
2571
2572         if (op != REINT_SETATTR) {
2573                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2574                         attr->la_uid = uc->uc_fsuid;
2575                 /* for S_ISGID, inherit gid from his parent, such work will be
2576                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2577                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2578                         attr->la_gid = uc->uc_fsgid;
2579         }
2580
2581         return 0;
2582 }
2583
2584 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2585 {
2586         return op == REINT_OPEN &&
2587              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2588 }
2589
2590 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2591 {
2592         struct req_capsule *pill = info->mti_pill;
2593
2594         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2595                                   RCL_SERVER) &&
2596             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2597                                   RCL_CLIENT)) {
2598                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2599                                          RCL_CLIENT) != 0)
2600                         /* pre-set size in server part with max size */
2601                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2602                                              RCL_SERVER,
2603                                              OBD_MAX_DEFAULT_EA_SIZE);
2604                 else
2605                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2606                                              RCL_SERVER, 0);
2607         }
2608 }
2609
2610 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2611 {
2612         struct req_capsule *pill = info->mti_pill;
2613
2614         if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2615                                   RCL_SERVER))
2616                 /* pre-set size in server part with max size */
2617                 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2618                                      RCL_SERVER,
2619                                      info->mti_mdt->mdt_max_mdsize);
2620 }
2621
2622 static int mdt_reint_internal(struct mdt_thread_info *info,
2623                               struct mdt_lock_handle *lhc,
2624                               __u32 op)
2625 {
2626         struct req_capsule      *pill = info->mti_pill;
2627         struct mdt_body         *repbody;
2628         int                      rc = 0, rc2;
2629
2630         ENTRY;
2631
2632         rc = mdt_reint_unpack(info, op);
2633         if (rc != 0) {
2634                 CERROR("Can't unpack reint, rc %d\n", rc);
2635                 RETURN(err_serious(rc));
2636         }
2637
2638
2639         /* check if the file system is set to readonly. O_RDONLY open
2640          * is still allowed even the file system is set to readonly mode */
2641         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2642                 RETURN(err_serious(-EROFS));
2643
2644         /* for replay (no_create) lmm is not needed, client has it already */
2645         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2646                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2647                                      DEF_REP_MD_SIZE);
2648
2649         /* llog cookies are always 0, the field is kept for compatibility */
2650         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2651                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2652
2653         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2654          * by default. If the target object has more ACL entries, then
2655          * enlarge the buffer when necessary. */
2656         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2657                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2658                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2659
2660         mdt_preset_secctx_size(info);
2661         mdt_preset_encctx_size(info);
2662
2663         rc = req_capsule_server_pack(pill);
2664         if (rc != 0) {
2665                 CERROR("Can't pack response, rc %d\n", rc);
2666                 RETURN(err_serious(rc));
2667         }
2668
2669         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2670                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2671                 LASSERT(repbody);
2672                 repbody->mbo_eadatasize = 0;
2673                 repbody->mbo_aclsize = 0;
2674         }
2675
2676         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2677
2678         /* for replay no cookkie / lmm need, because client have this already */
2679         if (info->mti_spec.no_create)
2680                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2681                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2682
2683         rc = mdt_init_ucred_reint(info);
2684         if (rc)
2685                 GOTO(out_shrink, rc);
2686
2687         rc = mdt_fix_attr_ucred(info, op);
2688         if (rc != 0)
2689                 GOTO(out_ucred, rc = err_serious(rc));
2690
2691         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2692         if (rc < 0) {
2693                 GOTO(out_ucred, rc);
2694         } else if (rc == 1) {
2695                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2696                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2697                 GOTO(out_ucred, rc);
2698         }
2699         rc = mdt_reint_rec(info, lhc);
2700         EXIT;
2701 out_ucred:
2702         mdt_exit_ucred(info);
2703 out_shrink:
2704         mdt_client_compatibility(info);
2705
2706         rc2 = mdt_fix_reply(info);
2707         if (rc == 0)
2708                 rc = rc2;
2709
2710         /*
2711          * Data-on-MDT optimization - read data along with OPEN and return it
2712          * in reply when possible.
2713          */
2714         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2715                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2716                                           &lhc->mlh_reg_lh);
2717
2718         return rc;
2719 }
2720
2721 static long mdt_reint_opcode(struct ptlrpc_request *req,
2722                              const struct req_format **fmt)
2723 {
2724         struct mdt_device       *mdt;
2725         struct mdt_rec_reint    *rec;
2726         long                     opc;
2727
2728         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2729         if (rec != NULL) {
2730                 opc = rec->rr_opcode;
2731                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2732                 if (opc < REINT_MAX && fmt[opc] != NULL)
2733                         req_capsule_extend(&req->rq_pill, fmt[opc]);
2734                 else {
2735                         mdt = mdt_exp2dev(req->rq_export);
2736                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2737                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
2738                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2739                         opc = err_serious(-EFAULT);
2740                 }
2741         } else {
2742                 opc = err_serious(-EFAULT);
2743         }
2744         return opc;
2745 }
2746
2747 static int mdt_reint(struct tgt_session_info *tsi)
2748 {
2749         long opc;
2750         int  rc;
2751         static const struct req_format *reint_fmts[REINT_MAX] = {
2752                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
2753                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
2754                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
2755                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
2756                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
2757                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
2758                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2759                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
2760                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
2761                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
2762         };
2763
2764         ENTRY;
2765
2766         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2767         if (opc >= 0) {
2768                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2769                 /*
2770                  * No lock possible here from client to pass it to reint code
2771                  * path.
2772                  */
2773                 rc = mdt_reint_internal(info, NULL, opc);
2774                 mdt_thread_info_fini(info);
2775         } else {
2776                 rc = opc;
2777         }
2778
2779         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2780         RETURN(rc);
2781 }
2782
2783 /* this should sync the whole device */
2784 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2785 {
2786         struct dt_device *dt = mdt->mdt_bottom;
2787         int rc;
2788         ENTRY;
2789
2790         rc = dt->dd_ops->dt_sync(env, dt);
2791         RETURN(rc);
2792 }
2793
2794 /* this should sync this object */
2795 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2796                            struct mdt_object *mo)
2797 {
2798         int rc = 0;
2799
2800         ENTRY;
2801
2802         if (!mdt_object_exists(mo)) {
2803                 CWARN("%s: non existing object "DFID": rc = %d\n",
2804                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2805                       -ESTALE);
2806                 RETURN(-ESTALE);
2807         }
2808
2809         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2810                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2811                 dt_obj_version_t version;
2812
2813                 version = dt_version_get(env, mdt_obj2dt(mo));
2814                 if (version > tgt->lut_obd->obd_last_committed)
2815                         rc = mo_object_sync(env, mdt_object_child(mo));
2816         } else {
2817                 rc = mo_object_sync(env, mdt_object_child(mo));
2818         }
2819
2820         RETURN(rc);
2821 }
2822
2823 static int mdt_sync(struct tgt_session_info *tsi)
2824 {
2825         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2826         struct req_capsule      *pill = tsi->tsi_pill;
2827         struct mdt_body         *body;
2828         ktime_t                  kstart = ktime_get();
2829         int                      rc;
2830
2831         ENTRY;
2832
2833         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2834                 RETURN(err_serious(-ENOMEM));
2835
2836         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2837                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2838         } else {
2839                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2840
2841                 if (unlikely(info->mti_object == NULL))
2842                         RETURN(-EPROTO);
2843
2844                 /* sync an object */
2845                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2846                                      info->mti_object);
2847                 if (rc == 0) {
2848                         const struct lu_fid *fid;
2849                         struct lu_attr *la = &info->mti_attr.ma_attr;
2850
2851                         info->mti_attr.ma_need = MA_INODE;
2852                         info->mti_attr.ma_valid = 0;
2853                         rc = mdt_attr_get_complex(info, info->mti_object,
2854                                                   &info->mti_attr);
2855                         if (rc == 0) {
2856                                 body = req_capsule_server_get(pill,
2857                                                               &RMF_MDT_BODY);
2858                                 fid = mdt_object_fid(info->mti_object);
2859                                 mdt_pack_attr2body(info, body, la, fid);
2860                         }
2861                 }
2862                 mdt_thread_info_fini(info);
2863         }
2864         if (rc == 0)
2865                 mdt_counter_incr(req, LPROC_MDT_SYNC,
2866                                  ktime_us_delta(ktime_get(), kstart));
2867
2868         RETURN(rc);
2869 }
2870
2871 static int mdt_data_sync(struct tgt_session_info *tsi)
2872 {
2873         struct mdt_thread_info *info;
2874         struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2875         struct ost_body *body = tsi->tsi_ost_body;
2876         struct ost_body *repbody;
2877         struct mdt_object *mo = NULL;
2878         struct md_attr *ma;
2879         int rc = 0;
2880
2881         ENTRY;
2882
2883         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2884
2885         /* if no fid is specified then do nothing,
2886          * device sync is done via MDS_SYNC */
2887         if (fid_is_zero(&tsi->tsi_fid))
2888                 RETURN(0);
2889
2890         mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2891         if (IS_ERR(mo))
2892                 RETURN(PTR_ERR(mo));
2893
2894         rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
2895         if (rc)
2896                 GOTO(put, rc);
2897
2898         repbody->oa.o_oi = body->oa.o_oi;
2899         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2900
2901         info = tsi2mdt_info(tsi);
2902         ma = &info->mti_attr;
2903         ma->ma_need = MA_INODE;
2904         ma->ma_valid = 0;
2905         rc = mdt_attr_get_complex(info, mo, ma);
2906         if (rc == 0)
2907                 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2908         else
2909                 rc = 0;
2910         mdt_thread_info_fini(info);
2911
2912         EXIT;
2913 put:
2914         if (mo != NULL)
2915                 mdt_object_put(tsi->tsi_env, mo);
2916         return rc;
2917 }
2918
2919 /*
2920  * Handle quota control requests to consult current usage/limit, but also
2921  * to configure quota enforcement
2922  */
2923 static int mdt_quotactl(struct tgt_session_info *tsi)
2924 {
2925         struct obd_export *exp  = tsi->tsi_exp;
2926         struct req_capsule *pill = tsi->tsi_pill;
2927         struct obd_quotactl *oqctl, *repoqc;
2928         int id, rc;
2929         struct mdt_device *mdt = mdt_exp2dev(exp);
2930         struct lu_device *qmt = mdt->mdt_qmt_dev;
2931         struct lu_nodemap *nodemap;
2932         ENTRY;
2933
2934         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2935         if (!oqctl)
2936                 RETURN(err_serious(-EPROTO));
2937
2938         rc = req_capsule_server_pack(pill);
2939         if (rc)
2940                 RETURN(err_serious(rc));
2941
2942         nodemap = nodemap_get_from_exp(exp);
2943         if (IS_ERR(nodemap))
2944                 RETURN(PTR_ERR(nodemap));
2945
2946         switch (oqctl->qc_cmd) {
2947                 /* master quotactl */
2948         case Q_SETINFO:
2949         case Q_SETQUOTA:
2950         case LUSTRE_Q_SETDEFAULT:
2951         case LUSTRE_Q_SETQUOTAPOOL:
2952         case LUSTRE_Q_SETINFOPOOL:
2953         case LUSTRE_Q_SETDEFAULT_POOL:
2954                 if (!nodemap_can_setquota(nodemap))
2955                         GOTO(out_nodemap, rc = -EPERM);
2956                 /* fallthrough */
2957         case Q_GETINFO:
2958         case Q_GETQUOTA:
2959         case LUSTRE_Q_GETDEFAULT:
2960         case LUSTRE_Q_GETQUOTAPOOL:
2961         case LUSTRE_Q_GETINFOPOOL:
2962         case LUSTRE_Q_GETDEFAULT_POOL:
2963                 if (qmt == NULL)
2964                         GOTO(out_nodemap, rc = -EOPNOTSUPP);
2965                 /* slave quotactl */
2966                 /* fallthrough */
2967         case Q_GETOINFO:
2968         case Q_GETOQUOTA:
2969                 break;
2970         default:
2971                 rc = -EFAULT;
2972                 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
2973                        mdt_obd_name(mdt), oqctl->qc_cmd, rc);
2974                 GOTO(out_nodemap, rc);
2975         }
2976
2977         id = oqctl->qc_id;
2978         switch (oqctl->qc_type) {
2979         case USRQUOTA:
2980                 id = nodemap_map_id(nodemap, NODEMAP_UID,
2981                                     NODEMAP_CLIENT_TO_FS, id);
2982                 break;
2983         case GRPQUOTA:
2984                 id = nodemap_map_id(nodemap, NODEMAP_GID,
2985                                     NODEMAP_CLIENT_TO_FS, id);
2986                 break;
2987         case PRJQUOTA:
2988                 /* todo: check/map project id */
2989                 id = oqctl->qc_id;
2990                 break;
2991         default:
2992                 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2993         }
2994         repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
2995         if (repoqc == NULL)
2996                 GOTO(out_nodemap, rc = err_serious(-EFAULT));
2997
2998         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
2999                 barrier_exit(tsi->tsi_tgt->lut_bottom);
3000
3001         if (oqctl->qc_id != id)
3002                 swap(oqctl->qc_id, id);
3003
3004         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
3005                 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
3006                         RETURN(-EINPROGRESS);
3007         }
3008
3009         switch (oqctl->qc_cmd) {
3010
3011         case Q_GETINFO:
3012         case Q_SETINFO:
3013         case Q_SETQUOTA:
3014         case Q_GETQUOTA:
3015         case LUSTRE_Q_SETDEFAULT:
3016         case LUSTRE_Q_GETDEFAULT:
3017         case LUSTRE_Q_SETQUOTAPOOL:
3018         case LUSTRE_Q_GETQUOTAPOOL:
3019         case LUSTRE_Q_SETINFOPOOL:
3020         case LUSTRE_Q_GETINFOPOOL:
3021         case LUSTRE_Q_SETDEFAULT_POOL:
3022         case LUSTRE_Q_GETDEFAULT_POOL:
3023                 /* forward quotactl request to QMT */
3024                 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
3025                 break;
3026
3027         case Q_GETOINFO:
3028         case Q_GETOQUOTA:
3029                 /* slave quotactl */
3030                 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
3031                                    oqctl);
3032                 break;
3033
3034         default:
3035                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
3036                 GOTO(out_nodemap, rc = -EFAULT);
3037         }
3038
3039         if (oqctl->qc_id != id)
3040                 swap(oqctl->qc_id, id);
3041
3042         QCTL_COPY(repoqc, oqctl);
3043         EXIT;
3044
3045 out_nodemap:
3046         nodemap_putref(nodemap);
3047
3048         return rc;
3049 }
3050
3051 /** clone llog ctxt from child (mdd)
3052  * This allows remote llog (replicator) access.
3053  * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
3054  * context was originally set up, or we can handle them directly.
3055  * I choose the latter, but that means I need any llog
3056  * contexts set up by child to be accessable by the mdt.  So we clone the
3057  * context into our context list here.
3058  */
3059 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3060                                int idx)
3061 {
3062         struct md_device  *next = mdt->mdt_child;
3063         struct llog_ctxt *ctxt;
3064         int rc;
3065
3066         if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
3067                 return 0;
3068
3069         rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3070         if (rc || ctxt == NULL) {
3071                 return 0;
3072         }
3073
3074         rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3075         if (rc)
3076                 CERROR("Can't set mdt ctxt %d\n", rc);
3077
3078         return rc;
3079 }
3080
3081 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3082                                  struct mdt_device *mdt, int idx)
3083 {
3084         struct llog_ctxt *ctxt;
3085
3086         ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3087         if (ctxt == NULL)
3088                 return 0;
3089         /* Put once for the get we just did, and once for the clone */
3090         llog_ctxt_put(ctxt);
3091         llog_ctxt_put(ctxt);
3092         return 0;
3093 }
3094
3095 /*
3096  * sec context handlers
3097  */
3098 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
3099 {
3100         CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3101
3102         return 0;
3103 }
3104
3105 /*
3106  * quota request handlers
3107  */
3108 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3109 {
3110         struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
3111         struct lu_device        *qmt = mdt->mdt_qmt_dev;
3112         int                      rc;
3113         ENTRY;
3114
3115         if (qmt == NULL)
3116                 RETURN(err_serious(-EOPNOTSUPP));
3117
3118         rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
3119         RETURN(rc);
3120 }
3121
3122 struct mdt_object *mdt_object_new(const struct lu_env *env,
3123                                   struct mdt_device *d,
3124                                   const struct lu_fid *f)
3125 {
3126         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3127         struct lu_object *o;
3128         struct mdt_object *m;
3129         ENTRY;
3130
3131         CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3132         o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3133         if (unlikely(IS_ERR(o)))
3134                 m = (struct mdt_object *)o;
3135         else
3136                 m = mdt_obj(o);
3137         RETURN(m);
3138 }
3139
3140 struct mdt_object *mdt_object_find(const struct lu_env *env,
3141                                    struct mdt_device *d,
3142                                    const struct lu_fid *f)
3143 {
3144         struct lu_object *o;
3145         struct mdt_object *m;
3146         ENTRY;
3147
3148         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3149         o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3150         if (unlikely(IS_ERR(o)))
3151                 m = (struct mdt_object *)o;
3152         else
3153                 m = mdt_obj(o);
3154
3155         RETURN(m);
3156 }
3157
3158 /**
3159  * Asynchronous commit for mdt device.
3160  *
3161  * Pass asynchronous commit call down the MDS stack.
3162  *
3163  * \param env environment