Whamcloud - gitweb
LU-14999 mdt: Deadlock on parent during resend
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_handler.c
32  *
33  * Lustre Metadata Target (mdt) request handler
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  * Author: Nikita Danilov <nikita@clusterfs.com>
40  * Author: Huang Hua <huanghua@clusterfs.com>
41  * Author: Yury Umanets <umka@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
48
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
61 #include <obd.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
66 #include <lustre_crypto.h>
67
68 #include "mdt_internal.h"
69
70 static unsigned int max_mod_rpcs_per_client = 8;
71 module_param(max_mod_rpcs_per_client, uint, 0644);
72 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
73
74 mdl_mode_t mdt_mdl_lock_modes[] = {
75         [LCK_MINMODE] = MDL_MINMODE,
76         [LCK_EX]      = MDL_EX,
77         [LCK_PW]      = MDL_PW,
78         [LCK_PR]      = MDL_PR,
79         [LCK_CW]      = MDL_CW,
80         [LCK_CR]      = MDL_CR,
81         [LCK_NL]      = MDL_NL,
82         [LCK_GROUP]   = MDL_GROUP
83 };
84
85 enum ldlm_mode mdt_dlm_lock_modes[] = {
86         [MDL_MINMODE]   = LCK_MINMODE,
87         [MDL_EX]        = LCK_EX,
88         [MDL_PW]        = LCK_PW,
89         [MDL_PR]        = LCK_PR,
90         [MDL_CW]        = LCK_CW,
91         [MDL_CR]        = LCK_CR,
92         [MDL_NL]        = LCK_NL,
93         [MDL_GROUP]     = LCK_GROUP
94 };
95
96 static struct mdt_device *mdt_dev(struct lu_device *d);
97
98 static const struct lu_object_operations mdt_obj_ops;
99
/* Slab cache backing struct mdt_object allocations (file-local). */
static struct kmem_cache *mdt_object_kmem;

/* Slab cache for HSM restore handles. */
struct kmem_cache *mdt_hsm_cdt_kmem;

/* Slab cache for HSM request handles. */
struct kmem_cache *mdt_hsm_car_kmem;
108
109 static struct lu_kmem_descr mdt_caches[] = {
110         {
111                 .ckd_cache = &mdt_object_kmem,
112                 .ckd_name  = "mdt_obj",
113                 .ckd_size  = sizeof(struct mdt_object)
114         },
115         {
116                 .ckd_cache      = &mdt_hsm_cdt_kmem,
117                 .ckd_name       = "mdt_cdt_restore_handle",
118                 .ckd_size       = sizeof(struct cdt_restore_handle)
119         },
120         {
121                 .ckd_cache      = &mdt_hsm_car_kmem,
122                 .ckd_name       = "mdt_cdt_agent_req",
123                 .ckd_size       = sizeof(struct cdt_agent_req)
124         },
125         {
126                 .ckd_cache = NULL
127         }
128 };
129
130 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
131 {
132         if (!rep)
133                 return 0;
134         return rep->lock_policy_res1 & op_flag;
135 }
136
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138                            struct ldlm_reply *rep, __u64 op_flag)
139 {
140         if (info) {
141                 info->mti_opdata &= ~op_flag;
142                 tgt_opdata_clear(info->mti_env, op_flag);
143         }
144         if (rep)
145                 rep->lock_policy_res1 &= ~op_flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, __u64 op_flag)
150 {
151         if (info) {
152                 info->mti_opdata |= op_flag;
153                 tgt_opdata_set(info->mti_env, op_flag);
154         }
155         if (rep)
156                 rep->lock_policy_res1 |= op_flag;
157 }
158
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 {
161         lh->mlh_pdo_hash = 0;
162         lh->mlh_reg_mode = lm;
163         lh->mlh_rreg_mode = lm;
164         lh->mlh_type = MDT_REG_LOCK;
165 }
166
167 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
168 {
169         mdt_lock_reg_init(lh, lock->l_req_mode);
170         if (lock->l_req_mode == LCK_GROUP)
171                 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
172 }
173
174 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
175                        const struct lu_name *lname)
176 {
177         lh->mlh_reg_mode = lock_mode;
178         lh->mlh_pdo_mode = LCK_MINMODE;
179         lh->mlh_rreg_mode = lock_mode;
180         lh->mlh_type = MDT_PDO_LOCK;
181
182         if (lu_name_is_valid(lname)) {
183                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
184                                                      lname->ln_namelen);
185                 /* XXX Workaround for LU-2856
186                  *
187                  * Zero is a valid return value of full_name_hash, but
188                  * several users of mlh_pdo_hash assume a non-zero
189                  * hash value. We therefore map zero onto an
190                  * arbitrary, but consistent value (1) to avoid
191                  * problems further down the road. */
192                 if (unlikely(lh->mlh_pdo_hash == 0))
193                         lh->mlh_pdo_hash = 1;
194         } else {
195                 lh->mlh_pdo_hash = 0;
196         }
197 }
198
199 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
200                               struct mdt_lock_handle *lh)
201 {
202         mdl_mode_t mode;
203         ENTRY;
204
205         /*
206          * Any dir access needs couple of locks:
207          *
208          * 1) on part of dir we gonna take lookup/modify;
209          *
210          * 2) on whole dir to protect it from concurrent splitting and/or to
211          * flush client's cache for readdir().
212          *
213          * so, for a given mode and object this routine decides what lock mode
214          * to use for lock #2:
215          *
216          * 1) if caller's gonna lookup in dir then we need to protect dir from
217          * being splitted only - LCK_CR
218          *
219          * 2) if caller's gonna modify dir then we need to protect dir from
220          * being splitted and to flush cache - LCK_CW
221          *
222          * 3) if caller's gonna modify dir and that dir seems ready for
223          * splitting then we need to protect it from any type of access
224          * (lookup/modify/split) - LCK_EX --bzzz
225          */
226
227         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
228         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
229
230         /*
231          * Ask underlaying level its opinion about preferable PDO lock mode
232          * having access type passed as regular lock mode:
233          *
234          * - MDL_MINMODE means that lower layer does not want to specify lock
235          * mode;
236          *
237          * - MDL_NL means that no PDO lock should be taken. This is used in some
238          * cases. Say, for non-splittable directories no need to use PDO locks
239          * at all.
240          */
241         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
242                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
243
244         if (mode != MDL_MINMODE) {
245                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
246         } else {
247                 /*
248                  * Lower layer does not want to specify locking mode. We do it
249                  * our selves. No special protection is needed, just flush
250                  * client's cache on modification and allow concurrent
251                  * mondification.
252                  */
253                 switch (lh->mlh_reg_mode) {
254                 case LCK_EX:
255                         lh->mlh_pdo_mode = LCK_EX;
256                         break;
257                 case LCK_PR:
258                         lh->mlh_pdo_mode = LCK_CR;
259                         break;
260                 case LCK_PW:
261                         lh->mlh_pdo_mode = LCK_CW;
262                         break;
263                 default:
264                         CERROR("Not expected lock type (0x%x)\n",
265                                (int)lh->mlh_reg_mode);
266                         LBUG();
267                 }
268         }
269
270         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
271         EXIT;
272 }
273
274 /**
275  * Check whether \a o is directory stripe object.
276  *
277  * \param[in]  info     thread environment
278  * \param[in]  o        MDT object
279  *
280  * \retval 1    is directory stripe.
281  * \retval 0    isn't directory stripe.
282  * \retval < 1  error code
283  */
284 static int mdt_is_dir_stripe(struct mdt_thread_info *info,
285                                 struct mdt_object *o)
286 {
287         struct md_attr *ma = &info->mti_attr;
288         struct lmv_mds_md_v1 *lmv;
289         int rc;
290
291         rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
292         if (rc < 0)
293                 return rc;
294
295         if (!(ma->ma_valid & MA_LMV))
296                 return 0;
297
298         lmv = &ma->ma_lmv->lmv_md_v1;
299
300         if (!lmv_is_sane2(lmv))
301                 return -EBADF;
302
303         if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
304                 return 1;
305
306         return 0;
307 }
308
309 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
310                               struct lu_fid *fid)
311 {
312         struct mdt_device *mdt = info->mti_mdt;
313         struct lu_name *lname = &info->mti_name;
314         const char *start = fileset;
315         char *filename = info->mti_filename;
316         struct mdt_object *parent;
317         u32 mode;
318         int rc = 0;
319
320         LASSERT(!info->mti_cross_ref);
321
322         /*
323          * We may want to allow this to mount a completely separate
324          * fileset from the MDT in the future, but keeping it to
325          * ROOT/ only for now avoid potential security issues.
326          */
327         *fid = mdt->mdt_md_root_fid;
328
329         while (rc == 0 && start != NULL && *start != '\0') {
330                 const char *s1 = start;
331                 const char *s2;
332
333                 while (*++s1 == '/')
334                         ;
335                 s2 = s1;
336                 while (*s2 != '/' && *s2 != '\0')
337                         s2++;
338
339                 if (s2 == s1)
340                         break;
341
342                 start = s2;
343
344                 lname->ln_namelen = s2 - s1;
345                 if (lname->ln_namelen > NAME_MAX) {
346                         rc = -EINVAL;
347                         break;
348                 }
349
350                 /* reject .. as a path component */
351                 if (lname->ln_namelen == 2 &&
352                     strncmp(s1, "..", 2) == 0) {
353                         rc = -EINVAL;
354                         break;
355                 }
356
357                 strncpy(filename, s1, lname->ln_namelen);
358                 filename[lname->ln_namelen] = '\0';
359                 lname->ln_name = filename;
360
361                 parent = mdt_object_find(info->mti_env, mdt, fid);
362                 if (IS_ERR(parent)) {
363                         rc = PTR_ERR(parent);
364                         break;
365                 }
366                 /* Only got the fid of this obj by name */
367                 fid_zero(fid);
368                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
369                                 fid, &info->mti_spec);
370                 mdt_object_put(info->mti_env, parent);
371         }
372         if (!rc) {
373                 parent = mdt_object_find(info->mti_env, mdt, fid);
374                 if (IS_ERR(parent))
375                         rc = PTR_ERR(parent);
376                 else {
377                         mode = lu_object_attr(&parent->mot_obj);
378                         if (!S_ISDIR(mode)) {
379                                 rc = -ENOTDIR;
380                         } else if (mdt_is_remote_object(info, parent, parent)) {
381                                 if (!mdt->mdt_enable_remote_subdir_mount) {
382                                         rc = -EREMOTE;
383                                         LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
384                                                       mdt_obd_name(mdt),
385                                                       fileset, rc);
386                                 } else {
387                                         LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
388                                                       mdt_obd_name(mdt),
389                                                       fileset);
390                                 }
391                         }
392                         mdt_object_put(info->mti_env, parent);
393                 }
394         }
395
396         return rc;
397 }
398
399 static int mdt_get_root(struct tgt_session_info *tsi)
400 {
401         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
402         struct mdt_device       *mdt = info->mti_mdt;
403         struct mdt_body         *repbody;
404         char                    *fileset = NULL, *buffer = NULL;
405         int                      rc;
406         struct obd_export       *exp = info->mti_exp;
407         char                    *nodemap_fileset;
408
409         ENTRY;
410
411         rc = mdt_check_ucred(info);
412         if (rc)
413                 GOTO(out, rc = err_serious(rc));
414
415         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
416                 GOTO(out, rc = err_serious(-ENOMEM));
417
418         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
419         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
420                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
421                 if (fileset == NULL)
422                         GOTO(out, rc = err_serious(-EFAULT));
423         }
424
425         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
426         if (nodemap_fileset && nodemap_fileset[0]) {
427                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
428                 if (fileset) {
429                         /* consider fileset from client as a sub-fileset
430                          * of the nodemap one */
431                         OBD_ALLOC(buffer, PATH_MAX + 1);
432                         if (buffer == NULL)
433                                 GOTO(out, rc = err_serious(-ENOMEM));
434                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
435                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
436                                 GOTO(out, rc = err_serious(-EINVAL));
437                         fileset = buffer;
438                 } else {
439                         /* enforce fileset as specified in the nodemap */
440                         fileset = nodemap_fileset;
441                 }
442         }
443
444         if (fileset) {
445                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
446                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
447                 if (rc < 0)
448                         GOTO(out, rc = err_serious(rc));
449         } else {
450                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
451         }
452         repbody->mbo_valid |= OBD_MD_FLID;
453
454         EXIT;
455 out:
456         mdt_thread_info_fini(info);
457         if (buffer)
458                 OBD_FREE(buffer, PATH_MAX+1);
459         return rc;
460 }
461
462 static int mdt_statfs(struct tgt_session_info *tsi)
463 {
464         struct ptlrpc_request *req = tgt_ses_req(tsi);
465         struct mdt_thread_info *info = tsi2mdt_info(tsi);
466         struct mdt_device *mdt = info->mti_mdt;
467         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
468         struct md_device *next = mdt->mdt_child;
469         struct ptlrpc_service_part *svcpt;
470         struct obd_statfs *osfs;
471         struct mdt_body *reqbody = NULL;
472         struct mdt_statfs_cache *msf;
473         ktime_t kstart = ktime_get();
474         int current_blockbits;
475         int rc;
476
477         ENTRY;
478
479         svcpt = req->rq_rqbd->rqbd_svcpt;
480
481         /* This will trigger a watchdog timeout */
482         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
483                          (MDT_SERVICE_WATCHDOG_FACTOR *
484                           at_get(&svcpt->scp_at_estimate)) + 1);
485
486         rc = mdt_check_ucred(info);
487         if (rc)
488                 GOTO(out, rc = err_serious(rc));
489
490         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
491                 GOTO(out, rc = err_serious(-ENOMEM));
492
493         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
494         if (!osfs)
495                 GOTO(out, rc = -EPROTO);
496
497         if (mdt_is_sum_statfs_client(req->rq_export) &&
498                 lustre_packed_msg_size(req->rq_reqmsg) ==
499                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
500                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
501                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
502                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
503         }
504
505         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
506                 msf = &mdt->mdt_sum_osfs;
507         else
508                 msf = &mdt->mdt_osfs;
509
510         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
511                         /** statfs data is too old, get up-to-date one */
512                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
513                                 rc = next->md_ops->mdo_statfs(info->mti_env,
514                                                               next, osfs);
515                         else
516                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
517                                                osfs);
518                         if (rc)
519                                 GOTO(out, rc);
520                         spin_lock(&mdt->mdt_lock);
521                         msf->msf_osfs = *osfs;
522                         msf->msf_age = ktime_get_seconds();
523                         spin_unlock(&mdt->mdt_lock);
524         } else {
525                         /** use cached statfs data */
526                         spin_lock(&mdt->mdt_lock);
527                         *osfs = msf->msf_osfs;
528                         spin_unlock(&mdt->mdt_lock);
529         }
530
531         /* tgd_blockbit is recordsize bits set during mkfs.
532          * This once set does not change. However, 'zfs set'
533          * can be used to change the MDT blocksize. Instead
534          * of using cached value of 'tgd_blockbit' always
535          * calculate the blocksize bits which may have
536          * changed.
537          */
538         current_blockbits = fls64(osfs->os_bsize) - 1;
539
540         /* at least try to account for cached pages.  its still racy and
541          * might be under-reporting if clients haven't announced their
542          * caches with brw recently */
543         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
544                " pending %llu free %llu avail %llu\n",
545                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
546                tgd->tgd_tot_pending,
547                osfs->os_bfree << current_blockbits,
548                osfs->os_bavail << current_blockbits);
549
550         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
551                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
552                                    osfs->os_bsize - 1) >> current_blockbits));
553
554         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
555         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
556                "%llu objects: %llu free; state %x\n",
557                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
558                osfs->os_files, osfs->os_ffree, osfs->os_state);
559
560         if (!exp_grant_param_supp(tsi->tsi_exp) &&
561             current_blockbits > COMPAT_BSIZE_SHIFT) {
562                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
563                  * should not see a block size > page size, otherwise
564                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
565                  * block size which is the biggest block size known to work
566                  * with all client's page size. */
567                 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
568                 osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
569                 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
570                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
571         }
572         if (rc == 0)
573                 mdt_counter_incr(req, LPROC_MDT_STATFS,
574                                  ktime_us_delta(ktime_get(), kstart));
575 out:
576         mdt_thread_info_fini(info);
577         RETURN(rc);
578 }
579
580 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
581 {
582         struct lov_comp_md_v1 *comp_v1;
583         struct lov_mds_md *v1;
584         __u32 off;
585         __u32 dom_stripesize = 0;
586         int i;
587         bool has_ost_stripes = false;
588
589         ENTRY;
590
591         if (is_dom_only)
592                 *is_dom_only = 0;
593
594         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
595                 RETURN(0);
596
597         comp_v1 = (struct lov_comp_md_v1 *)lmm;
598         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
599         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
600
601         /* Fast check for DoM entry with no mirroring, should be the first */
602         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
603             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
604                 RETURN(0);
605
606         /* check all entries otherwise */
607         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
608                 struct lov_comp_md_entry_v1 *lcme;
609
610                 lcme = &comp_v1->lcm_entries[i];
611                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
612                         continue;
613
614                 off = le32_to_cpu(lcme->lcme_offset);
615                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
616
617                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
618                     LOV_PATTERN_MDT)
619                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
620                 else
621                         has_ost_stripes = true;
622
623                 if (dom_stripesize && has_ost_stripes)
624                         RETURN(dom_stripesize);
625         }
626         /* DoM-only case exits here */
627         if (is_dom_only && dom_stripesize)
628                 *is_dom_only = 1;
629         RETURN(dom_stripesize);
630 }
631
632 /**
633  * Pack size attributes into the reply.
634  */
635 int mdt_pack_size2body(struct mdt_thread_info *info,
636                         const struct lu_fid *fid, struct lustre_handle *lh)
637 {
638         struct mdt_body *b;
639         struct md_attr *ma = &info->mti_attr;
640         __u32 dom_stripe;
641         bool dom_lock = false;
642
643         ENTRY;
644
645         LASSERT(ma->ma_attr.la_valid & LA_MODE);
646
647         if (!S_ISREG(ma->ma_attr.la_mode) ||
648             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
649                 RETURN(-ENODATA);
650
651         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
652         /* no DoM stripe, no size in reply */
653         if (!dom_stripe)
654                 RETURN(-ENOENT);
655
656         if (lustre_handle_is_used(lh)) {
657                 struct ldlm_lock *lock;
658
659                 lock = ldlm_handle2lock(lh);
660                 if (lock != NULL) {
661                         dom_lock = ldlm_has_dom(lock);
662                         LDLM_LOCK_PUT(lock);
663                 }
664         }
665
666         /* no DoM lock, no size in reply */
667         if (!dom_lock)
668                 RETURN(0);
669
670         /* Either DoM lock exists or LMM has only DoM stripe then
671          * return size on body. */
672         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
673
674         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
675         RETURN(0);
676 }
677
#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/*
 * Read the access ACL of \a o into the reply and map it through \a nodemap.
 *
 * The ACL is first read into the reply's RMF_ACL buffer.  If that buffer is
 * too small (-ERANGE) and the client supports large ACLs, one retry is made
 * with the per-thread big ACL buffer (allocated on demand).
 *
 * \param	info	thread info object
 * \param	repbody	reply to pack ACLs into
 * \param	o	mdt object of file to examine
 * \param	nodemap	nodemap of client to reply to
 * \retval	0	success (including "no ACL" and "ACLs unsupported")
 * \retval	-errno	error getting or parsing ACL from disk
 */
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
		      struct mdt_object *o, struct lu_nodemap *nodemap)
{
	const struct lu_env *env = info->mti_env;
	struct md_object *child = mdt_object_child(o);
	struct lu_buf *acl_buf = &info->mti_buf;
	struct mdt_device *mdt = info->mti_mdt;
	struct req_capsule *pill = info->mti_pill;
	int rc;

	ENTRY;

	acl_buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
	acl_buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
	if (acl_buf->lb_len == 0)
		RETURN(0);

	LASSERT(!info->mti_big_acl_used);
again:
	rc = mo_xattr_get(env, child, acl_buf, XATTR_NAME_ACL_ACCESS);
	if (rc >= 0) {
		/* rc is the ACL length; translate ids for the client */
		rc = nodemap_map_acl(nodemap, acl_buf->lb_buf,
				     rc, NODEMAP_FS_TO_CLIENT);
		/* if all ACLs mapped out, rc is still >= 0 */
		if (rc < 0) {
			CERROR("%s: nodemap_map_acl unable to parse "DFID
			       " ACL: rc = %d\n", mdt_obd_name(mdt),
			       PFID(mdt_object_fid(o)), rc);
			repbody->mbo_aclsize = 0;
			repbody->mbo_valid &= ~OBD_MD_FLACL;
		} else {
			repbody->mbo_aclsize = rc;
			repbody->mbo_valid |= OBD_MD_FLACL;
			rc = 0;
		}
		RETURN(rc);
	}

	switch (rc) {
	case -ENODATA:
		/* object has no ACL: report an empty one */
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid |= OBD_MD_FLACL;
		rc = 0;
		break;
	case -EOPNOTSUPP:
		rc = 0;
		break;
	case -ERANGE:
		if (exp_connect_large_acl(info->mti_exp) &&
		    !info->mti_big_acl_used) {
			if (info->mti_big_acl == NULL) {
				info->mti_big_aclsize =
						min_t(unsigned int,
						      mdt->mdt_max_ea_size,
						      XATTR_SIZE_MAX);
				OBD_ALLOC_LARGE(info->mti_big_acl,
						info->mti_big_aclsize);
				if (info->mti_big_acl == NULL) {
					info->mti_big_aclsize = 0;
					CERROR("%s: unable to grow "
					       DFID" ACL buffer\n",
					       mdt_obd_name(mdt),
					       PFID(mdt_object_fid(o)));
					RETURN(-ENOMEM);
				}
			}

			CDEBUG(D_INODE, "%s: grow the "DFID
			       " ACL buffer to size %d\n",
			       mdt_obd_name(mdt),
			       PFID(mdt_object_fid(o)),
			       info->mti_big_aclsize);

			/* retry once with the big buffer */
			acl_buf->lb_buf = info->mti_big_acl;
			acl_buf->lb_len = info->mti_big_aclsize;
			info->mti_big_acl_used = 1;
			goto again;
		}
		/* FS has ACL bigger that our limits */
		CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
		       info->mti_big_aclsize);
		rc = -E2BIG;
		break;
	default:
		CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
		break;
	}

	RETURN(rc);
}
#endif
776
777 /* XXX Look into layout in MDT layer. */
778 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
779 {
780         struct lov_comp_md_v1   *comp_v1;
781         struct lov_mds_md       *v1;
782         int                      i;
783
784         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
785                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
786
787                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
788                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
789                                 comp_v1->lcm_entries[i].lcme_offset);
790                         /* We don't support partial release for now */
791                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
792                                 return false;
793                 }
794                 return true;
795         } else {
796                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
797                         true : false;
798         }
799 }
800
801 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
802                         const struct lu_attr *attr, const struct lu_fid *fid)
803 {
804         struct md_attr *ma = &info->mti_attr;
805         struct obd_export *exp = info->mti_exp;
806         struct lu_nodemap *nodemap = NULL;
807
808         LASSERT(ma->ma_valid & MA_INODE);
809
810         if (attr->la_valid & LA_ATIME) {
811                 b->mbo_atime = attr->la_atime;
812                 b->mbo_valid |= OBD_MD_FLATIME;
813         }
814         if (attr->la_valid & LA_MTIME) {
815                 b->mbo_mtime = attr->la_mtime;
816                 b->mbo_valid |= OBD_MD_FLMTIME;
817         }
818         if (attr->la_valid & LA_CTIME) {
819                 b->mbo_ctime = attr->la_ctime;
820                 b->mbo_valid |= OBD_MD_FLCTIME;
821         }
822         if (attr->la_valid & LA_BTIME) {
823                 b->mbo_btime = attr->la_btime;
824                 b->mbo_valid |= OBD_MD_FLBTIME;
825         }
826         if (attr->la_valid & LA_FLAGS) {
827                 b->mbo_flags = attr->la_flags;
828                 b->mbo_valid |= OBD_MD_FLFLAGS;
829         }
830         if (attr->la_valid & LA_NLINK) {
831                 b->mbo_nlink = attr->la_nlink;
832                 b->mbo_valid |= OBD_MD_FLNLINK;
833         }
834         if (attr->la_valid & (LA_UID|LA_GID|LA_PROJID)) {
835                 nodemap = nodemap_get_from_exp(exp);
836                 if (IS_ERR(nodemap))
837                         goto out;
838         }
839         if (attr->la_valid & LA_UID) {
840                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
841                                             NODEMAP_FS_TO_CLIENT,
842                                             attr->la_uid);
843                 b->mbo_valid |= OBD_MD_FLUID;
844         }
845         if (attr->la_valid & LA_GID) {
846                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
847                                             NODEMAP_FS_TO_CLIENT,
848                                             attr->la_gid);
849                 b->mbo_valid |= OBD_MD_FLGID;
850         }
851
852         if (attr->la_valid & LA_PROJID) {
853                 b->mbo_projid = nodemap_map_id(nodemap, NODEMAP_PROJID,
854                                                NODEMAP_FS_TO_CLIENT,
855                                                attr->la_projid);
856                 b->mbo_valid |= OBD_MD_FLPROJID;
857         }
858
859         b->mbo_mode = attr->la_mode;
860         if (attr->la_valid & LA_MODE)
861                 b->mbo_valid |= OBD_MD_FLMODE;
862         if (attr->la_valid & LA_TYPE)
863                 b->mbo_valid |= OBD_MD_FLTYPE;
864
865         if (fid != NULL) {
866                 b->mbo_fid1 = *fid;
867                 b->mbo_valid |= OBD_MD_FLID;
868                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
869                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
870         }
871
872         if (!(attr->la_valid & LA_TYPE))
873                 return;
874
875         b->mbo_rdev   = attr->la_rdev;
876         b->mbo_size   = attr->la_size;
877         b->mbo_blocks = attr->la_blocks;
878
879         if (!S_ISREG(attr->la_mode)) {
880                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
881         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
882                 /* means no objects are allocated on osts. */
883                 LASSERT(!(ma->ma_valid & MA_LOV));
884                 /* just ignore blocks occupied by extend attributes on MDS */
885                 b->mbo_blocks = 0;
886                 /* if no object is allocated on osts, the size on mds is valid.
887                  * b=22272 */
888                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
889         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
890                 if (mdt_hsm_is_released(ma->ma_lmm)) {
891                         /* A released file stores its size on MDS. */
892                         /* But return 1 block for released file, unless tools
893                          * like tar will consider it fully sparse. (LU-3864)
894                          */
895                         if (unlikely(b->mbo_size == 0))
896                                 b->mbo_blocks = 0;
897                         else
898                                 b->mbo_blocks = 1;
899                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
900                 } else if (info->mti_som_valid) { /* som is valid */
901                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
902                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
903                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
904                         b->mbo_size = ma->ma_som.ms_size;
905                         b->mbo_blocks = ma->ma_som.ms_blocks;
906                 }
907         }
908
909         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
910                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
911                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
912                        PFID(fid), (unsigned long long)b->mbo_size);
913
914 out:
915         if (!IS_ERR_OR_NULL(nodemap))
916                 nodemap_putref(nodemap);
917 }
918
919 static inline int mdt_body_has_lov(const struct lu_attr *la,
920                                    const struct mdt_body *body)
921 {
922         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
923                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
924 }
925
926 void mdt_client_compatibility(struct mdt_thread_info *info)
927 {
928         struct mdt_body       *body;
929         struct ptlrpc_request *req = mdt_info_req(info);
930         struct obd_export     *exp = req->rq_export;
931         struct md_attr        *ma = &info->mti_attr;
932         struct lu_attr        *la = &ma->ma_attr;
933         ENTRY;
934
935         if (exp_connect_layout(exp))
936                 /* the client can deal with 16-bit lmm_stripe_count */
937                 RETURN_EXIT;
938
939         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
940
941         if (!mdt_body_has_lov(la, body))
942                 RETURN_EXIT;
943
944         /* now we have a reply with a lov for a client not compatible with the
945          * layout lock so we have to clean the layout generation number */
946         if (S_ISREG(la->la_mode))
947                 ma->ma_lmm->lmm_layout_gen = 0;
948         EXIT;
949 }
950
951 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
952                                    struct mdt_object *o)
953 {
954         const struct lu_env *env = info->mti_env;
955         int rc, rc2;
956
957         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
958                           XATTR_NAME_LOV);
959
960         if (rc == -ENODATA)
961                 rc = 0;
962
963         if (rc < 0)
964                 goto out;
965
966         /* Is it a directory? Let's check for the LMV as well */
967         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
968                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
969                                    XATTR_NAME_LMV);
970
971                 if (rc2 == -ENODATA)
972                         rc2 = mo_xattr_get(env, mdt_object_child(o),
973                                            &LU_BUF_NULL,
974                                            XATTR_NAME_DEFAULT_LMV);
975
976                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
977                         rc = rc2;
978         }
979
980 out:
981         return rc;
982 }
983
984 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
985                       const char *name)
986 {
987         const struct lu_env *env = info->mti_env;
988         int rc;
989         ENTRY;
990
991         LASSERT(info->mti_big_lmm_used == 0);
992         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
993         if (rc < 0)
994                 RETURN(rc);
995
996         /* big_lmm may need to be grown */
997         if (info->mti_big_lmmsize < rc) {
998                 int size = size_roundup_power2(rc);
999
1000                 if (info->mti_big_lmmsize > 0) {
1001                         /* free old buffer */
1002                         LASSERT(info->mti_big_lmm);
1003                         OBD_FREE_LARGE(info->mti_big_lmm,
1004                                        info->mti_big_lmmsize);
1005                         info->mti_big_lmm = NULL;
1006                         info->mti_big_lmmsize = 0;
1007                 }
1008
1009                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
1010                 if (info->mti_big_lmm == NULL)
1011                         RETURN(-ENOMEM);
1012                 info->mti_big_lmmsize = size;
1013         }
1014         LASSERT(info->mti_big_lmmsize >= rc);
1015
1016         info->mti_buf.lb_buf = info->mti_big_lmm;
1017         info->mti_buf.lb_len = info->mti_big_lmmsize;
1018         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
1019
1020         RETURN(rc);
1021 }
1022
1023 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1024                      struct md_attr *ma, const char *name)
1025 {
1026         struct md_object *next = mdt_object_child(o);
1027         struct lu_buf    *buf = &info->mti_buf;
1028         int rc;
1029
1030         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1031                 buf->lb_buf = ma->ma_lmm;
1032                 buf->lb_len = ma->ma_lmm_size;
1033                 LASSERT(!(ma->ma_valid & MA_LOV));
1034         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1035                 buf->lb_buf = ma->ma_lmv;
1036                 buf->lb_len = ma->ma_lmv_size;
1037                 LASSERT(!(ma->ma_valid & MA_LMV));
1038         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1039                 buf->lb_buf = ma->ma_default_lmv;
1040                 buf->lb_len = ma->ma_default_lmv_size;
1041                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1042         } else {
1043                 return -EINVAL;
1044         }
1045
1046         LASSERT(buf->lb_buf);
1047
1048         rc = mo_xattr_get(info->mti_env, next, buf, name);
1049         if (rc > 0) {
1050
1051 got:
1052                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1053                         if (info->mti_big_lmm_used)
1054                                 ma->ma_lmm = info->mti_big_lmm;
1055
1056                         /* NOT return LOV EA with hole to old client. */
1057                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1058                                      LOV_PATTERN_F_HOLE) &&
1059                             !(exp_connect_flags(info->mti_exp) &
1060                               OBD_CONNECT_LFSCK)) {
1061                                 return -EIO;
1062                         } else {
1063                                 ma->ma_lmm_size = rc;
1064                                 ma->ma_valid |= MA_LOV;
1065                         }
1066                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1067                         if (info->mti_big_lmm_used)
1068                                 ma->ma_lmv = info->mti_big_lmm;
1069
1070                         ma->ma_lmv_size = rc;
1071                         ma->ma_valid |= MA_LMV;
1072                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1073                         ma->ma_default_lmv_size = rc;
1074                         ma->ma_valid |= MA_LMV_DEF;
1075                 }
1076
1077                 /* Update mdt_max_mdsize so all clients will be aware that */
1078                 if (info->mti_mdt->mdt_max_mdsize < rc)
1079                         info->mti_mdt->mdt_max_mdsize = rc;
1080
1081                 rc = 0;
1082         } else if (rc == -ENODATA) {
1083                 /* no LOV EA */
1084                 rc = 0;
1085         } else if (rc == -ERANGE) {
1086                 /* Default LMV has fixed size, so it must be able to fit
1087                  * in the original buffer */
1088                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1089                         return rc;
1090                 rc = mdt_big_xattr_get(info, o, name);
1091                 if (rc > 0) {
1092                         info->mti_big_lmm_used = 1;
1093                         goto got;
1094                 }
1095         }
1096
1097         return rc;
1098 }
1099
1100 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1101                    struct md_attr *ma, const char *name)
1102 {
1103         int rc;
1104
1105         if (!info->mti_big_lmm) {
1106                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1107                 if (!info->mti_big_lmm)
1108                         return -ENOMEM;
1109                 info->mti_big_lmmsize = PAGE_SIZE;
1110         }
1111
1112         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1113                 ma->ma_lmm = info->mti_big_lmm;
1114                 ma->ma_lmm_size = info->mti_big_lmmsize;
1115                 ma->ma_valid &= ~MA_LOV;
1116         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1117                 ma->ma_lmv = info->mti_big_lmm;
1118                 ma->ma_lmv_size = info->mti_big_lmmsize;
1119                 ma->ma_valid &= ~MA_LMV;
1120         } else {
1121                 LBUG();
1122         }
1123
1124         LASSERT(!info->mti_big_lmm_used);
1125         rc = __mdt_stripe_get(info, o, ma, name);
1126         /* since big_lmm is always used here, clear 'used' flag to avoid
1127          * assertion in mdt_big_xattr_get().
1128          */
1129         info->mti_big_lmm_used = 0;
1130
1131         return rc;
1132 }
1133
1134 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1135                       struct lu_fid *pfid)
1136 {
1137         struct lu_buf           *buf = &info->mti_buf;
1138         struct link_ea_header   *leh;
1139         struct link_ea_entry    *lee;
1140         int                      rc;
1141         ENTRY;
1142
1143         buf->lb_buf = info->mti_big_lmm;
1144         buf->lb_len = info->mti_big_lmmsize;
1145         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1146                           buf, XATTR_NAME_LINK);
1147         /* ignore errors, MA_PFID won't be set and it is
1148          * up to the caller to treat this as an error */
1149         if (rc == -ERANGE || buf->lb_len == 0) {
1150                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1151                 buf->lb_buf = info->mti_big_lmm;
1152                 buf->lb_len = info->mti_big_lmmsize;
1153         }
1154
1155         if (rc < 0)
1156                 RETURN(rc);
1157         if (rc < sizeof(*leh)) {
1158                 CERROR("short LinkEA on "DFID": rc = %d\n",
1159                        PFID(mdt_object_fid(o)), rc);
1160                 RETURN(-ENODATA);
1161         }
1162
1163         leh = (struct link_ea_header *) buf->lb_buf;
1164         lee = (struct link_ea_entry *)(leh + 1);
1165         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1166                 leh->leh_magic = LINK_EA_MAGIC;
1167                 leh->leh_reccount = __swab32(leh->leh_reccount);
1168                 leh->leh_len = __swab64(leh->leh_len);
1169         }
1170         if (leh->leh_magic != LINK_EA_MAGIC)
1171                 RETURN(-EINVAL);
1172         if (leh->leh_reccount == 0)
1173                 RETURN(-ENODATA);
1174
1175         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1176         fid_be_to_cpu(pfid, pfid);
1177
1178         RETURN(0);
1179 }
1180
1181 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1182                            struct lu_fid *pfid, struct lu_name *lname)
1183 {
1184         struct lu_buf *buf = &info->mti_buf;
1185         struct link_ea_header *leh;
1186         struct link_ea_entry *lee;
1187         int reclen;
1188         int rc;
1189
1190         buf->lb_buf = info->mti_xattr_buf;
1191         buf->lb_len = sizeof(info->mti_xattr_buf);
1192         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1193                           XATTR_NAME_LINK);
1194         if (rc == -ERANGE) {
1195                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1196                 buf->lb_buf = info->mti_big_lmm;
1197                 buf->lb_len = info->mti_big_lmmsize;
1198         }
1199         if (rc < 0)
1200                 return rc;
1201
1202         if (rc < sizeof(*leh)) {
1203                 CERROR("short LinkEA on "DFID": rc = %d\n",
1204                        PFID(mdt_object_fid(o)), rc);
1205                 return -ENODATA;
1206         }
1207
1208         leh = (struct link_ea_header *)buf->lb_buf;
1209         lee = (struct link_ea_entry *)(leh + 1);
1210         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1211                 leh->leh_magic = LINK_EA_MAGIC;
1212                 leh->leh_reccount = __swab32(leh->leh_reccount);
1213                 leh->leh_len = __swab64(leh->leh_len);
1214         }
1215         if (leh->leh_magic != LINK_EA_MAGIC)
1216                 return -EINVAL;
1217
1218         if (leh->leh_reccount == 0)
1219                 return -ENODATA;
1220
1221         linkea_entry_unpack(lee, &reclen, lname, pfid);
1222
1223         return 0;
1224 }
1225
1226 int mdt_attr_get_complex(struct mdt_thread_info *info,
1227                          struct mdt_object *o, struct md_attr *ma)
1228 {
1229         const struct lu_env *env = info->mti_env;
1230         struct md_object    *next = mdt_object_child(o);
1231         struct lu_buf       *buf = &info->mti_buf;
1232         int                  need = ma->ma_need;
1233         int                  rc = 0, rc2;
1234         u32                  mode;
1235         ENTRY;
1236
1237         ma->ma_valid = 0;
1238
1239         if (mdt_object_exists(o) == 0)
1240                 GOTO(out, rc = -ENOENT);
1241         mode = lu_object_attr(&next->mo_lu);
1242
1243         if (need & MA_INODE) {
1244                 ma->ma_need = MA_INODE;
1245                 rc = mo_attr_get(env, next, ma);
1246                 if (rc)
1247                         GOTO(out, rc);
1248
1249                 if (S_ISREG(mode))
1250                         (void) mdt_get_som(info, o, ma);
1251                 ma->ma_valid |= MA_INODE;
1252         }
1253
1254         if (need & MA_PFID) {
1255                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1256                 if (rc == 0)
1257                         ma->ma_valid |= MA_PFID;
1258                 /* ignore this error, parent fid is not mandatory */
1259                 rc = 0;
1260         }
1261
1262         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1263                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1264                 if (rc)
1265                         GOTO(out, rc);
1266         }
1267
1268         if (need & MA_LMV && S_ISDIR(mode)) {
1269                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1270                 if (rc != 0)
1271                         GOTO(out, rc);
1272         }
1273
1274         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1275                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1276                 if (rc != 0)
1277                         GOTO(out, rc);
1278         }
1279
1280         /*
1281          * In the handle of MA_INODE, we may already get the SOM attr.
1282          */
1283         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1284                 rc = mdt_get_som(info, o, ma);
1285                 if (rc != 0)
1286                         GOTO(out, rc);
1287         }
1288
1289         if (need & MA_HSM && S_ISREG(mode)) {
1290                 buf->lb_buf = info->mti_xattr_buf;
1291                 buf->lb_len = sizeof(info->mti_xattr_buf);
1292                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1293                              sizeof(info->mti_xattr_buf));
1294                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1295                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1296                 if (rc2 == 0)
1297                         ma->ma_valid |= MA_HSM;
1298                 else if (rc2 < 0 && rc2 != -ENODATA)
1299                         GOTO(out, rc = rc2);
1300         }
1301
1302 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1303         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1304                 buf->lb_buf = ma->ma_acl;
1305                 buf->lb_len = ma->ma_acl_size;
1306                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1307                 if (rc2 > 0) {
1308                         ma->ma_acl_size = rc2;
1309                         ma->ma_valid |= MA_ACL_DEF;
1310                 } else if (rc2 == -ENODATA) {
1311                         /* no ACLs */
1312                         ma->ma_acl_size = 0;
1313                 } else
1314                         GOTO(out, rc = rc2);
1315         }
1316 #endif
1317 out:
1318         ma->ma_need = need;
1319         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1320                rc, ma->ma_valid, ma->ma_lmm);
1321         RETURN(rc);
1322 }
1323
1324 static int mdt_getattr_internal(struct mdt_thread_info *info,
1325                                 struct mdt_object *o, int ma_need)
1326 {
1327         struct mdt_device *mdt = info->mti_mdt;
1328         struct md_object *next = mdt_object_child(o);
1329         const struct mdt_body *reqbody = info->mti_body;
1330         struct ptlrpc_request *req = mdt_info_req(info);
1331         struct md_attr *ma = &info->mti_attr;
1332         struct lu_attr *la = &ma->ma_attr;
1333         struct req_capsule *pill = info->mti_pill;
1334         const struct lu_env *env = info->mti_env;
1335         struct mdt_body *repbody;
1336         struct lu_buf *buffer = &info->mti_buf;
1337         struct obd_export *exp = info->mti_exp;
1338         ktime_t kstart = ktime_get();
1339         int rc;
1340
1341         ENTRY;
1342
1343         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1344                 RETURN(err_serious(-ENOMEM));
1345
1346         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1347
1348         ma->ma_valid = 0;
1349
1350         if (mdt_object_remote(o)) {
1351                 /* This object is located on remote node.*/
1352                 /* Return -ENOTSUPP for old client */
1353                 if (!mdt_is_dne_client(req->rq_export))
1354                         GOTO(out, rc = -ENOTSUPP);
1355
1356                 repbody->mbo_fid1 = *mdt_object_fid(o);
1357                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1358                 GOTO(out, rc = 0);
1359         }
1360
1361         if (reqbody->mbo_eadatasize > 0) {
1362                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1363                 if (buffer->lb_buf == NULL)
1364                         GOTO(out, rc = -EPROTO);
1365                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1366                                                       RCL_SERVER);
1367         } else {
1368                 buffer->lb_buf = NULL;
1369                 buffer->lb_len = 0;
1370                 ma_need &= ~(MA_LOV | MA_LMV);
1371                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1372                        mdt_obd_name(info->mti_mdt),
1373                        req->rq_export->exp_client_uuid.uuid);
1374         }
1375
1376         /* from 2.12.58 intent_getattr pack default LMV in reply */
1377         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1378             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1379                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1380             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1381                                   RCL_SERVER)) {
1382                 ma->ma_lmv = buffer->lb_buf;
1383                 ma->ma_lmv_size = buffer->lb_len;
1384                 ma->ma_default_lmv = req_capsule_server_get(pill,
1385                                                 &RMF_DEFAULT_MDT_MD);
1386                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1387                                                 &RMF_DEFAULT_MDT_MD,
1388                                                 RCL_SERVER);
1389                 ma->ma_need = MA_INODE;
1390                 if (ma->ma_lmv_size > 0)
1391                         ma->ma_need |= MA_LMV;
1392                 if (ma->ma_default_lmv_size > 0)
1393                         ma->ma_need |= MA_LMV_DEF;
1394         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1395                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1396                 /* If it is dir and client require MEA, then we got MEA */
1397                 /* Assumption: MDT_MD size is enough for lmv size. */
1398                 ma->ma_lmv = buffer->lb_buf;
1399                 ma->ma_lmv_size = buffer->lb_len;
1400                 ma->ma_need = MA_INODE;
1401                 if (ma->ma_lmv_size > 0) {
1402                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1403                                 ma->ma_need |= MA_LMV;
1404                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1405                                 ma->ma_need |= MA_LMV_DEF;
1406                                 ma->ma_default_lmv = buffer->lb_buf;
1407                                 ma->ma_lmv = NULL;
1408                                 ma->ma_default_lmv_size = buffer->lb_len;
1409                                 ma->ma_lmv_size = 0;
1410                         }
1411                 }
1412         } else {
1413                 ma->ma_lmm = buffer->lb_buf;
1414                 ma->ma_lmm_size = buffer->lb_len;
1415                 ma->ma_need = MA_INODE | MA_HSM;
1416                 if (ma->ma_lmm_size > 0) {
1417                         ma->ma_need |= MA_LOV;
1418                         /* Older clients may crash if they getattr overstriped
1419                          * files
1420                          */
1421                         if (!exp_connect_overstriping(exp) &&
1422                             mdt_lmm_is_overstriping(ma->ma_lmm))
1423                                 RETURN(-EOPNOTSUPP);
1424                 }
1425         }
1426
1427         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1428             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1429             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1430                 /* get default stripe info for this dir. */
1431                 ma->ma_need |= MA_LOV_DEF;
1432         }
1433         ma->ma_need |= ma_need;
1434
1435         rc = mdt_attr_get_complex(info, o, ma);
1436         if (unlikely(rc)) {
1437                 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1438                              "%s: getattr error for "DFID": rc = %d\n",
1439                              mdt_obd_name(info->mti_mdt),
1440                              PFID(mdt_object_fid(o)), rc);
1441                 RETURN(rc);
1442         }
1443
1444         /* if file is released, check if a restore is running */
1445         if (ma->ma_valid & MA_HSM) {
1446                 repbody->mbo_valid |= OBD_MD_TSTATE;
1447                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1448                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1449                         repbody->mbo_t_state = MS_RESTORE;
1450         }
1451
1452         if (unlikely(!(ma->ma_valid & MA_INODE)))
1453                 RETURN(-EFAULT);
1454
1455         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1456
1457         if (mdt_body_has_lov(la, reqbody)) {
1458                 u32 stripe_count = 1;
1459                 bool fixed_layout = false;
1460
1461                 if (ma->ma_valid & MA_LOV) {
1462                         LASSERT(ma->ma_lmm_size);
1463                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1464                         if (S_ISDIR(la->la_mode))
1465                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1466                         else
1467                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1468                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1469                 }
1470                 if (ma->ma_valid & MA_LMV) {
1471                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1472                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1473
1474                         /* Return -ENOTSUPP for old client */
1475                         if (!mdt_is_striped_client(req->rq_export))
1476                                 RETURN(-ENOTSUPP);
1477
1478                         LASSERT(S_ISDIR(la->la_mode));
1479                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1480                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1481                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1482
1483                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1484                         fixed_layout = lmv_is_fixed(lmv);
1485                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1486                                 mdt_restripe_migrate_add(info, o);
1487                         else if (magic == LMV_MAGIC_V1 &&
1488                                  lmv_is_restriping(lmv))
1489                                 mdt_restripe_update_add(info, o);
1490                 }
1491                 if (ma->ma_valid & MA_LMV_DEF) {
1492                         /* Return -ENOTSUPP for old client */
1493                         if (!mdt_is_striped_client(req->rq_export))
1494                                 RETURN(-ENOTSUPP);
1495                         LASSERT(S_ISDIR(la->la_mode));
1496                         /*
1497                          * when ll_dir_getstripe() gets default LMV, it
1498                          * checks mbo_eadatasize.
1499                          */
1500                         if (!(ma->ma_valid & MA_LMV))
1501                                 repbody->mbo_eadatasize =
1502                                         ma->ma_default_lmv_size;
1503                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1504                                                OBD_MD_DEFAULT_MEA);
1505                 }
1506                 CDEBUG(D_VFSTRACE,
1507                        "dirent count %llu stripe count %u MDT count %d\n",
1508                        ma->ma_attr.la_dirent_count, stripe_count,
1509                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
1510                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1511                     ma->ma_attr.la_dirent_count >
1512                         mdt->mdt_restriper.mdr_dir_split_count &&
1513                     !fid_is_root(mdt_object_fid(o)) &&
1514                     mdt->mdt_enable_dir_auto_split &&
1515                     !o->mot_restriping &&
1516                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
1517                     !fixed_layout)
1518                         mdt_auto_split_add(info, o);
1519         } else if (S_ISLNK(la->la_mode) &&
1520                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
1521                 buffer->lb_buf = ma->ma_lmm;
1522                 /* eadatasize from client includes NULL-terminator, so
1523                  * there is no need to read it */
1524                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1525                 rc = mo_readlink(env, next, buffer);
1526                 if (unlikely(rc <= 0)) {
1527                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1528                                mdt_obd_name(info->mti_mdt),
1529                                PFID(mdt_object_fid(o)), rc);
1530                         rc = -EFAULT;
1531                 } else {
1532                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1533
1534                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1535                                 rc -= 2;
1536                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1537                         /* we need to report back size with NULL-terminator
1538                          * because client expects that */
1539                         repbody->mbo_eadatasize = rc + 1;
1540                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1541                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1542                                        "on "DFID ", expected %d\n",
1543                                        mdt_obd_name(info->mti_mdt),
1544                                        rc, PFID(mdt_object_fid(o)),
1545                                        reqbody->mbo_eadatasize - 1);
1546                         /* NULL terminate */
1547                         ((char *)ma->ma_lmm)[rc] = 0;
1548
1549                         /* If the total CDEBUG() size is larger than a page, it
1550                          * will print a warning to the console, avoid this by
1551                          * printing just the last part of the symlink. */
1552                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1553                                print_limit < rc ? "..." : "", print_limit,
1554                                (char *)ma->ma_lmm + rc - print_limit, rc);
1555                         rc = 0;
1556                 }
1557         }
1558
1559         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1560                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1561                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1562                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1563                        repbody->mbo_max_mdsize);
1564         }
1565
1566 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1567         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1568                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1569                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1570                 if (IS_ERR(nodemap))
1571                         RETURN(PTR_ERR(nodemap));
1572
1573                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1574                 nodemap_putref(nodemap);
1575         }
1576 #endif
1577
1578 out:
1579         if (rc == 0)
1580                 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1581                                  ktime_us_delta(ktime_get(), kstart));
1582
1583         RETURN(rc);
1584 }
1585
1586 static int mdt_getattr(struct tgt_session_info *tsi)
1587 {
1588         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1589         struct mdt_object       *obj = info->mti_object;
1590         struct req_capsule      *pill = info->mti_pill;
1591         struct mdt_body         *reqbody;
1592         struct mdt_body         *repbody;
1593         int rc, rc2;
1594         ENTRY;
1595
1596         if (unlikely(info->mti_object == NULL))
1597                 RETURN(-EPROTO);
1598
1599         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1600         LASSERT(reqbody);
1601         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1602
1603         /* Special case for Data-on-MDT files to get data version */
1604         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1605                 rc = mdt_data_version_get(tsi);
1606                 GOTO(out, rc);
1607         }
1608
1609         /* Unlike intent case where we need to pre-fill out buffers early on
1610          * in intent policy for ldlm reasons, here we can have a much better
1611          * guess at EA size by just reading it from disk.
1612          * Exceptions are readdir and (missing) directory striping */
1613         /* Readlink */
1614         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1615                 /* No easy way to know how long is the symlink, but it cannot
1616                  * be more than PATH_MAX, so we allocate +1 */
1617                 rc = PATH_MAX + 1;
1618         /* A special case for fs ROOT: getattr there might fetch
1619          * default EA for entire fs, not just for this dir!
1620          */
1621         } else if (lu_fid_eq(mdt_object_fid(obj),
1622                              &info->mti_mdt->mdt_md_root_fid) &&
1623                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1624                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1625                                                                  MDS_GETATTR)) {
1626                 /* Should the default strping be bigger, mdt_fix_reply
1627                  * will reallocate */
1628                 rc = DEF_REP_MD_SIZE;
1629         } else {
1630                 /* Read the actual EA size from disk */
1631                 rc = mdt_attr_get_eabuf_size(info, obj);
1632         }
1633
1634         if (rc < 0)
1635                 GOTO(out, rc = err_serious(rc));
1636
1637         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1638
1639         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1640          * by default. If the target object has more ACL entries, then
1641          * enlarge the buffer when necessary. */
1642         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1643                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1644
1645         rc = req_capsule_server_pack(pill);
1646         if (unlikely(rc != 0))
1647                 GOTO(out, rc = err_serious(rc));
1648
1649         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1650         LASSERT(repbody != NULL);
1651         repbody->mbo_eadatasize = 0;
1652         repbody->mbo_aclsize = 0;
1653
1654         rc = mdt_check_ucred(info);
1655         if (unlikely(rc))
1656                 GOTO(out_shrink, rc);
1657
1658         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1659
1660         rc = mdt_getattr_internal(info, obj, 0);
1661         EXIT;
1662 out_shrink:
1663         mdt_client_compatibility(info);
1664         rc2 = mdt_fix_reply(info);
1665         if (rc == 0)
1666                 rc = rc2;
1667 out:
1668         mdt_thread_info_fini(info);
1669         return rc;
1670 }
1671
1672 /**
1673  * Handler of layout intent RPC requiring the layout modification
1674  *
1675  * \param[in]  info     thread environment
1676  * \param[in]  obj      object
1677  * \param[out] lhc      object ldlm lock handle
1678  * \param[in]  layout   layout change descriptor
1679  *
1680  * \retval 0    on success
1681  * \retval < 0  error code
1682  */
1683 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1684                       struct mdt_lock_handle *lhc,
1685                       struct md_layout_change *layout)
1686 {
1687         int rc;
1688
1689         ENTRY;
1690
1691         if (!mdt_object_exists(obj))
1692                 RETURN(-ENOENT);
1693
1694         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1695                 RETURN(-EINVAL);
1696
1697         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1698                            MAY_WRITE);
1699         if (rc)
1700                 RETURN(rc);
1701
1702         rc = mdt_check_resent_lock(info, obj, lhc);
1703         if (rc < 0)
1704                 RETURN(rc);
1705
1706         if (rc > 0) {
1707                 /* not resent */
1708                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1709
1710                 /* take layout lock to prepare layout change */
1711                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1712                         lockpart |= MDS_INODELOCK_UPDATE;
1713
1714                 mdt_lock_handle_init(lhc);
1715                 mdt_lock_reg_init(lhc, LCK_EX);
1716                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1717                 if (rc)
1718                         RETURN(rc);
1719         }
1720
1721         mutex_lock(&obj->mot_som_mutex);
1722         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1723         mutex_unlock(&obj->mot_som_mutex);
1724
1725         if (rc)
1726                 mdt_object_unlock(info, obj, lhc, 1);
1727
1728         RETURN(rc);
1729 }
1730
1731 /**
1732  * Exchange MOF_LOV_CREATED flags between two objects after a
1733  * layout swap. No assumption is made on whether o1 or o2 have
1734  * created objects or not.
1735  *
1736  * \param[in,out] o1    First swap layout object
1737  * \param[in,out] o2    Second swap layout object
1738  */
1739 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1740 {
1741         unsigned int o1_lov_created = o1->mot_lov_created;
1742
1743         mutex_lock(&o1->mot_lov_mutex);
1744         mutex_lock(&o2->mot_lov_mutex);
1745
1746         o1->mot_lov_created = o2->mot_lov_created;
1747         o2->mot_lov_created = o1_lov_created;
1748
1749         mutex_unlock(&o2->mot_lov_mutex);
1750         mutex_unlock(&o1->mot_lov_mutex);
1751 }
1752
1753 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1754 {
1755         struct mdt_thread_info  *info;
1756         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1757         struct obd_export       *exp = req->rq_export;
1758         struct mdt_object       *o1, *o2, *o;
1759         struct mdt_lock_handle  *lh1, *lh2;
1760         struct mdc_swap_layouts *msl;
1761         int                      rc;
1762         ENTRY;
1763
1764         /* client does not support layout lock, so layout swaping
1765          * is disabled.
1766          * FIXME: there is a problem for old clients which don't support
1767          * layout lock yet. If those clients have already opened the file
1768          * they won't be notified at all so that old layout may still be
1769          * used to do IO. This can be fixed after file release is landed by
1770          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1771         if (!exp_connect_layout(exp))
1772                 RETURN(-EOPNOTSUPP);
1773
1774         info = tsi2mdt_info(tsi);
1775         if (unlikely(info->mti_object == NULL))
1776                 RETURN(-EPROTO);
1777
1778         if (info->mti_dlm_req != NULL)
1779                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1780
1781         o1 = info->mti_object;
1782         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1783                                 &info->mti_body->mbo_fid2);
1784         if (IS_ERR(o))
1785                 GOTO(out, rc = PTR_ERR(o));
1786
1787         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1788                 GOTO(put, rc = -ENOENT);
1789
1790         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1791         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1792                 GOTO(put, rc);
1793
1794         if (rc < 0)
1795                 swap(o1, o2);
1796
1797         /* permission check. Make sure the calling process having permission
1798          * to write both files. */
1799         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1800                            MAY_WRITE);
1801         if (rc < 0)
1802                 GOTO(put, rc);
1803
1804         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1805                            MAY_WRITE);
1806         if (rc < 0)
1807                 GOTO(put, rc);
1808
1809         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1810         if (msl == NULL)
1811                 GOTO(put, rc = -EPROTO);
1812
1813         lh1 = &info->mti_lh[MDT_LH_NEW];
1814         mdt_lock_reg_init(lh1, LCK_EX);
1815         lh2 = &info->mti_lh[MDT_LH_OLD];
1816         mdt_lock_reg_init(lh2, LCK_EX);
1817
1818         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1819                              MDS_INODELOCK_XATTR);
1820         if (rc < 0)
1821                 GOTO(put, rc);
1822
1823         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1824                              MDS_INODELOCK_XATTR);
1825         if (rc < 0)
1826                 GOTO(unlock1, rc);
1827
1828         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1829                              mdt_object_child(o2), msl->msl_flags);
1830         if (rc < 0)
1831                 GOTO(unlock2, rc);
1832
1833         mdt_swap_lov_flag(o1, o2);
1834
1835 unlock2:
1836         mdt_object_unlock(info, o2, lh2, rc);
1837 unlock1:
1838         mdt_object_unlock(info, o1, lh1, rc);
1839 put:
1840         mdt_object_put(info->mti_env, o);
1841 out:
1842         mdt_thread_info_fini(info);
1843         RETURN(rc);
1844 }
1845
1846 static int mdt_raw_lookup(struct mdt_thread_info *info,
1847                           struct mdt_object *parent,
1848                           const struct lu_name *lname)
1849 {
1850         struct lu_fid *fid = &info->mti_tmp_fid1;
1851         struct mdt_body *repbody;
1852         bool is_dotdot = false;
1853         bool is_old_parent_stripe = false;
1854         bool is_new_parent_checked = false;
1855         int rc;
1856
1857         ENTRY;
1858
1859         LASSERT(!info->mti_cross_ref);
1860         /* Always allow to lookup ".." */
1861         if (lname->ln_namelen == 2 &&
1862             lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
1863                 info->mti_spec.sp_permitted = 1;
1864                 is_dotdot = true;
1865                 if (mdt_is_dir_stripe(info, parent) == 1)
1866                         is_old_parent_stripe = true;
1867         }
1868
1869         mdt_object_get(info->mti_env, parent);
1870 lookup:
1871         /* Only got the fid of this obj by name */
1872         fid_zero(fid);
1873         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
1874                         &info->mti_spec);
1875         mdt_object_put(info->mti_env, parent);
1876         if (rc)
1877                 RETURN(rc);
1878
1879         /* getattr_name("..") should return master object FID for striped dir */
1880         if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
1881                 parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1882                 if (IS_ERR(parent))
1883                         RETURN(PTR_ERR(parent));
1884
1885                 /* old client getattr_name("..") with stripe FID */
1886                 if (unlikely(is_old_parent_stripe)) {
1887                         is_old_parent_stripe = false;
1888                         goto lookup;
1889                 }
1890
1891                 /* ".." may be a stripe */
1892                 if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
1893                         is_new_parent_checked = true;
1894                         goto lookup;
1895                 }
1896
1897                 mdt_object_put(info->mti_env, parent);
1898         }
1899
1900         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1901         repbody->mbo_fid1 = *fid;
1902         repbody->mbo_valid = OBD_MD_FLID;
1903
1904         RETURN(rc);
1905 }
1906
1907 /**
1908  * Find name matching hash
1909  *
1910  * We search \a child LinkEA for a name whose hash matches \a lname
1911  * (it contains an encoded hash).
1912  *
1913  * \param info mdt thread info
1914  * \param lname encoded hash to find
1915  * \param parent parent object
1916  * \param child object to search with LinkEA
1917  * \param force_check true to check hash even if LinkEA has only one entry
1918  *
1919  * \retval 1 match found
1920  * \retval 0 no match found
1921  * \retval -ev negative errno upon error
1922  */
1923 int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
1924                             struct mdt_object *parent, struct mdt_object *child,
1925                             bool force_check)
1926 {
1927         /* Here, lname is an encoded hash of on-disk name, and
1928          * client is doing access without encryption key.
1929          * So we need to get LinkEA, check parent fid is correct and
1930          * compare name hash with the one in the request.
1931          */
1932         struct lu_buf *buf = &info->mti_big_buf;
1933         struct lu_name name;
1934         struct lu_fid pfid;
1935         struct linkea_data ldata = { NULL };
1936         struct link_ea_header *leh;
1937         struct link_ea_entry *lee;
1938         struct lu_buf link = { 0 };
1939         char *hash = NULL;
1940         int reclen, count, rc;
1941
1942         ENTRY;
1943
1944         if (lname->ln_namelen < LLCRYPT_FNAME_DIGEST_SIZE)
1945                 RETURN(-EINVAL);
1946
1947         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
1948         if (!buf->lb_buf)
1949                 RETURN(-ENOMEM);
1950
1951         ldata.ld_buf = buf;
1952         rc = mdt_links_read(info, child, &ldata);
1953         if (rc < 0)
1954                 RETURN(rc);
1955
1956         leh = buf->lb_buf;
1957         if (force_check || leh->leh_reccount > 1) {
1958                 hash = kmalloc(lname->ln_namelen, GFP_NOFS);
1959                 if (!hash)
1960                         RETURN(-ENOMEM);
1961                 rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
1962         }
1963         lee = (struct link_ea_entry *)(leh + 1);
1964         for (count = 0; count < leh->leh_reccount; count++) {
1965                 linkea_entry_unpack(lee, &reclen, &name, &pfid);
1966                 if (!force_check && leh->leh_reccount == 1) {
1967                         /* if there is only one rec, it has to be it */
1968                         *lname = name;
1969                         break;
1970                 }
1971                 if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
1972                         lu_buf_check_and_alloc(&link, name.ln_namelen);
1973                         if (!link.lb_buf)
1974                                 GOTO(out_match, rc = -ENOMEM);
1975                         rc = critical_decode(name.ln_name, name.ln_namelen,
1976                                              link.lb_buf);
1977
1978                         if (memcmp(LLCRYPT_FNAME_DIGEST(link.lb_buf, rc),
1979                                    hash, LLCRYPT_FNAME_DIGEST_SIZE) == 0) {
1980                                 *lname = name;
1981                                 break;
1982                         }
1983                 }
1984                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
1985         }
1986         if (count == leh->leh_reccount)
1987                 rc = 0;
1988         else
1989                 rc = 1;
1990
1991 out_match:
1992         lu_buf_free(&link);
1993         kfree(hash);
1994
1995         RETURN(rc);
1996 }
1997
1998 /*
1999  * UPDATE lock should be taken against parent, and be released before exit;
2000  * child_bits lock should be taken against child, and be returned back:
2001  *            (1)normal request should release the child lock;
2002  *            (2)intent request will grant the lock to client.
2003  */
2004 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
2005                                  struct mdt_lock_handle *lhc,
2006                                  __u64 child_bits,
2007                                  struct ldlm_reply *ldlm_rep)
2008 {
2009         struct ptlrpc_request *req = mdt_info_req(info);
2010         struct mdt_body *reqbody = NULL;
2011         struct mdt_object *parent = info->mti_object;
2012         struct mdt_object *child = NULL;
2013         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2014         struct lu_name *lname = NULL;
2015         struct mdt_lock_handle *lhp = NULL;
2016         struct ldlm_lock *lock;
2017         struct req_capsule *pill = info->mti_pill;
2018         __u64 try_bits = 0;
2019         bool is_resent;
2020         int ma_need = 0;
2021         int rc;
2022
2023         ENTRY;
2024
2025         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
2026         LASSERT(ergo(is_resent,
2027                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
2028
2029         if (parent == NULL)
2030                 RETURN(-ENOENT);
2031
2032         if (info->mti_cross_ref) {
2033                 /* Only getattr on the child. Parent is on another node. */
2034                 mdt_set_disposition(info, ldlm_rep,
2035                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
2036                 child = parent;
2037                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
2038                        "ldlm_rep = %p\n",
2039                        PFID(mdt_object_fid(child)), ldlm_rep);
2040
2041                 rc = mdt_check_resent_lock(info, child, lhc);
2042                 if (rc < 0) {
2043                         RETURN(rc);
2044                 } else if (rc > 0) {
2045                         mdt_lock_handle_init(lhc);
2046                         mdt_lock_reg_init(lhc, LCK_PR);
2047
2048                         /*
2049                          * Object's name entry is on another MDS, it will
2050                          * request PERM lock only because LOOKUP lock is owned
2051                          * by the MDS where name entry resides.
2052                          *
2053                          * TODO: it should try layout lock too. - Jinshan
2054                          */
2055                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
2056                                         MDS_INODELOCK_LAYOUT);
2057                         child_bits |= MDS_INODELOCK_PERM;
2058
2059                         rc = mdt_object_lock(info, child, lhc, child_bits);
2060                         if (rc < 0)
2061                                 RETURN(rc);
2062                 }
2063
2064                 /* Finally, we can get attr for child. */
2065                 if (!mdt_object_exists(child)) {
2066                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
2067                                         &child->mot_obj,
2068                                         "remote object doesn't exist.");
2069                         mdt_object_unlock(info, child, lhc, 1);
2070                         RETURN(-ENOENT);
2071                 }
2072
2073                 rc = mdt_getattr_internal(info, child, 0);
2074                 if (unlikely(rc != 0)) {
2075                         mdt_object_unlock(info, child, lhc, 1);
2076                         RETURN(rc);
2077                 }
2078
2079                 rc = mdt_pack_secctx_in_reply(info, child);
2080                 if (unlikely(rc)) {
2081                         mdt_object_unlock(info, child, lhc, 1);
2082                         RETURN(rc);
2083                 }
2084
2085                 rc = mdt_pack_encctx_in_reply(info, child);
2086                 if (unlikely(rc))
2087                         mdt_object_unlock(info, child, lhc, 1);
2088                 RETURN(rc);
2089         }
2090
2091         lname = &info->mti_name;
2092         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
2093
2094         if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2095                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2096                 if (unlikely(reqbody == NULL))
2097                         RETURN(err_serious(-EPROTO));
2098
2099                 *child_fid = reqbody->mbo_fid2;
2100                 if (unlikely(!fid_is_sane(child_fid)))
2101                         RETURN(err_serious(-EINVAL));
2102
2103                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2104                         mdt_object_get(info->mti_env, parent);
2105                         child = parent;
2106                 } else {
2107                         child = mdt_object_find(info->mti_env, info->mti_mdt,
2108                                                 child_fid);
2109                         if (IS_ERR(child))
2110                                 RETURN(PTR_ERR(child));
2111                 }
2112
2113                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2114                        "ldlm_rep = %p\n",
2115                        PFID(mdt_object_fid(parent)),
2116                        PFID(&reqbody->mbo_fid2), ldlm_rep);
2117         } else if (lu_name_is_valid(lname)) {
2118                 if (mdt_object_remote(parent)) {
2119                         CERROR("%s: parent "DFID" is on remote target\n",
2120                                mdt_obd_name(info->mti_mdt),
2121                                PFID(mdt_object_fid(parent)));
2122                         RETURN(-EPROTO);
2123                 }
2124
2125                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
2126                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
2127                        PNAME(lname), ldlm_rep);
2128         } else {
2129                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2130                 if (unlikely(reqbody == NULL))
2131                         RETURN(err_serious(-EPROTO));
2132
2133                 *child_fid = reqbody->mbo_fid2;
2134                 if (unlikely(!fid_is_sane(child_fid)))
2135                         RETURN(err_serious(-EINVAL));
2136
2137                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2138                         mdt_object_get(info->mti_env, parent);
2139                         child = parent;
2140                 } else {
2141                         child = mdt_object_find(info->mti_env, info->mti_mdt,
2142                                                 child_fid);
2143                         if (IS_ERR(child))
2144                                 RETURN(PTR_ERR(child));
2145                 }
2146
2147                 if (mdt_object_remote(child)) {
2148                         CERROR("%s: child "DFID" is on remote target\n",
2149                                mdt_obd_name(info->mti_mdt),
2150                                PFID(mdt_object_fid(child)));
2151                         GOTO(out_child, rc = -EPROTO);
2152                 }
2153
2154                 /* don't fetch LOOKUP lock if it's remote object */
2155                 rc = mdt_is_remote_object(info, parent, child);
2156                 if (rc < 0)
2157                         GOTO(out_child, rc);
2158                 if (rc)
2159                         child_bits &= ~MDS_INODELOCK_LOOKUP;
2160
2161                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2162                        "ldlm_rep = %p\n",
2163                        PFID(mdt_object_fid(parent)),
2164                        PFID(&reqbody->mbo_fid2), ldlm_rep);
2165         }
2166
2167         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
2168
2169         if (unlikely(!mdt_object_exists(parent)) &&
2170             !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2171             lu_name_is_valid(lname)) {
2172                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2173                                 &parent->mot_obj,
2174                                 "Parent doesn't exist!");
2175                 GOTO(out_child, rc = -ESTALE);
2176         }
2177
2178         if (!child && is_resent) {
2179                 lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2180                 if (lock == NULL) {
2181                         /* Lock is pinned by ldlm_handle_enqueue0() as it is
2182                          * a resend case, however, it could be already destroyed
2183                          * due to client eviction or a raced cancel RPC.
2184                          */
2185                         LDLM_DEBUG_NOLOCK("Invalid lock handle %#llx",
2186                                           lhc->mlh_reg_lh.cookie);
2187                         RETURN(-ESTALE);
2188                 }
2189                 fid_extract_from_res_name(child_fid,
2190                                           &lock->l_resource->lr_name);
2191                 LDLM_LOCK_PUT(lock);
2192                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2193                                         child_fid);
2194                 if (IS_ERR(child))
2195                         RETURN(PTR_ERR(child));
2196         } else if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2197             lu_name_is_valid(lname)) {
2198                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
2199                         rc = mdt_raw_lookup(info, parent, lname);
2200
2201                         RETURN(rc);
2202                 }
2203
2204                 /* step 1: lock parent only if parent is a directory */
2205                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2206                         lhp = &info->mti_lh[MDT_LH_PARENT];
2207                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
2208                         rc = mdt_object_lock(info, parent, lhp,
2209                                              MDS_INODELOCK_UPDATE);
2210                         if (unlikely(rc != 0))
2211                                 RETURN(rc);
2212                 }
2213
2214                 /* step 2: lookup child's fid by name */
2215                 fid_zero(child_fid);
2216                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2217                                 child_fid, &info->mti_spec);
2218                 if (rc == -ENOENT)
2219                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2220
2221                 if (rc != 0)
2222                         GOTO(unlock_parent, rc);
2223
2224                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2225                                         child_fid);
2226                 if (unlikely(IS_ERR(child)))
2227                         GOTO(unlock_parent, rc = PTR_ERR(child));
2228         }
2229
2230         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2231
2232         /* step 3: lock child regardless if it is local or remote. */
2233         LASSERT(child);
2234
2235         if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2236                 /* Here, lname is an encoded hash of on-disk name, and
2237                  * client is doing access without encryption key.
2238                  * So we need to compare name hash with the one in the request.
2239                  */
2240                 if (!find_name_matching_hash(info, lname, parent,
2241                                              child, true)) {
2242                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2243                         mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2244                         GOTO(out_child, rc = -ENOENT);
2245                 }
2246         }
2247
2248         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2249         if (!mdt_object_exists(child)) {
2250                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2251                                 &child->mot_obj,
2252                                 "Object doesn't exist!");
2253                 GOTO(out_child, rc = -ENOENT);
2254         }
2255
2256         rc = mdt_check_resent_lock(info, child, lhc);
2257         if (rc < 0) {
2258                 GOTO(out_child, rc);
2259         } else if (rc > 0) {
2260                 mdt_lock_handle_init(lhc);
2261                 mdt_lock_reg_init(lhc, LCK_PR);
2262
2263                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2264                     !mdt_object_remote(child)) {
2265                         struct md_attr *ma = &info->mti_attr;
2266
2267                         ma->ma_valid = 0;
2268                         ma->ma_need = MA_INODE;
2269                         rc = mdt_attr_get_complex(info, child, ma);
2270                         if (unlikely(rc != 0))
2271                                 GOTO(out_child, rc);
2272
2273                         /* If the file has not been changed for some time, we
2274                          * return not only a LOOKUP lock, but also an UPDATE
2275                          * lock and this might save us RPC on later STAT. For
2276                          * directories, it also let negative dentry cache start
2277                          * working for this dir. */
2278                         if (ma->ma_valid & MA_INODE &&
2279                             ma->ma_attr.la_valid & LA_CTIME &&
2280                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2281                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2282                                 child_bits |= MDS_INODELOCK_UPDATE;
2283                 }
2284
2285                 /* layout lock must be granted in a best-effort way
2286                  * for IT operations */
2287                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2288                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2289                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2290                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2291                             exp_connect_layout(info->mti_exp)) {
2292                                 /* try to grant layout lock for regular file. */
2293                                 try_bits = MDS_INODELOCK_LAYOUT;
2294                         }
2295                         /* Acquire DOM lock in advance for data-on-mdt file */
2296                         if (child != parent)
2297                                 try_bits |= MDS_INODELOCK_DOM;
2298                 }
2299
2300                 if (try_bits != 0) {
2301                         /* try layout lock, it may fail to be granted due to
2302                          * contention at LOOKUP or UPDATE */
2303                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2304                                                  try_bits, false);
2305                         if (child_bits & MDS_INODELOCK_LAYOUT)
2306                                 ma_need |= MA_LOV;
2307                 } else {
2308                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2309                          * client will enqueue the lock to the remote MDT */
2310                         if (mdt_object_remote(child))
2311                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2312                         rc = mdt_object_lock(info, child, lhc, child_bits);
2313                 }
2314                 if (unlikely(rc != 0))
2315                         GOTO(out_child, rc);
2316         }
2317
2318         /* finally, we can get attr for child. */
2319         rc = mdt_getattr_internal(info, child, ma_need);
2320         if (unlikely(rc != 0)) {
2321                 if (!is_resent)
2322                         mdt_object_unlock(info, child, lhc, 1);
2323                 GOTO(out_child, rc);
2324         }
2325
2326         rc = mdt_pack_secctx_in_reply(info, child);
2327         if (unlikely(rc)) {
2328                 if (!is_resent)
2329                         mdt_object_unlock(info, child, lhc, 1);
2330                 GOTO(out_child, rc);
2331         }
2332
2333         rc = mdt_pack_encctx_in_reply(info, child);
2334         if (unlikely(rc)) {
2335                 if (!is_resent)
2336                         mdt_object_unlock(info, child, lhc, 1);
2337                 GOTO(out_child, rc);
2338         }
2339
2340         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2341         if (lock) {
2342                 /* Debugging code. */
2343                 LDLM_DEBUG(lock, "Returning lock to client");
2344                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2345                                          &lock->l_resource->lr_name),
2346                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2347                          PLDLMRES(lock->l_resource),
2348                          PFID(mdt_object_fid(child)));
2349
2350                 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
2351                         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2352                                 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
2353                                                  req->rq_deadline -
2354                                                  req->rq_arrival_time.tv_sec +
2355                                                  cfs_fail_val ?: 3);
2356                         /* Put the lock to the waiting list and force the cancel */
2357                         ldlm_set_ast_sent(lock);
2358                 }
2359
2360                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2361                     !mdt_object_remote(child) && child != parent) {
2362                         mdt_object_put(info->mti_env, child);
2363                         rc = mdt_pack_size2body(info, child_fid,
2364                                                 &lhc->mlh_reg_lh);
2365                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2366                                 /* DOM lock was taken in advance but this is
2367                                  * not DoM file. Drop the lock.
2368                                  */
2369                                 lock_res_and_lock(lock);
2370                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2371                                 unlock_res_and_lock(lock);
2372                         }
2373                         LDLM_LOCK_PUT(lock);
2374                         GOTO(unlock_parent, rc = 0);
2375                 }
2376                 LDLM_LOCK_PUT(lock);
2377         }
2378
2379         EXIT;
2380 out_child:
2381         if (child)
2382                 mdt_object_put(info->mti_env, child);
2383 unlock_parent:
2384         if (lhp)
2385                 mdt_object_unlock(info, parent, lhp, 1);
2386         return rc;
2387 }
2388
2389 /* normal handler: should release the child lock */
2390 static int mdt_getattr_name(struct tgt_session_info *tsi)
2391 {
2392         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2393         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2394         struct mdt_body *reqbody;
2395         struct mdt_body *repbody;
2396         int rc, rc2;
2397
2398         ENTRY;
2399
2400         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2401         LASSERT(reqbody != NULL);
2402         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2403         LASSERT(repbody != NULL);
2404
2405         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2406         repbody->mbo_eadatasize = 0;
2407         repbody->mbo_aclsize = 0;
2408
2409         rc = mdt_init_ucred(info, reqbody);
2410         if (unlikely(rc))
2411                 GOTO(out_shrink, rc);
2412
2413         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2414         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2415                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2416                 lhc->mlh_reg_lh.cookie = 0;
2417         }
2418         mdt_exit_ucred(info);
2419         EXIT;
2420 out_shrink:
2421         mdt_client_compatibility(info);
2422         rc2 = mdt_fix_reply(info);
2423         if (rc == 0)
2424                 rc = rc2;
2425         mdt_thread_info_fini(info);
2426         return rc;
2427 }
2428
2429 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2430                             const struct lu_fid *pfid,
2431                             const struct lu_name *name,
2432                             struct mdt_object *obj, s64 ctime)
2433 {
2434         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2435         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2436         struct mdt_device *mdt = info->mti_mdt;
2437         struct md_attr *ma = &info->mti_attr;
2438         struct mdt_lock_handle *parent_lh;
2439         struct mdt_lock_handle *child_lh;
2440         struct mdt_object *pobj;
2441         bool cos_incompat = false;
2442         int rc;
2443         ENTRY;
2444
2445         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2446         if (IS_ERR(pobj))
2447                 GOTO(out, rc = PTR_ERR(pobj));
2448
2449         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2450         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2451         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2452         if (rc != 0)
2453                 GOTO(put_parent, rc);
2454
2455         if (mdt_object_remote(pobj))
2456                 cos_incompat = true;
2457
2458         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2459                         name, child_fid, &info->mti_spec);
2460         if (rc != 0)
2461                 GOTO(unlock_parent, rc);
2462
2463         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2464                 GOTO(unlock_parent, rc = -EREMCHG);
2465
2466         child_lh = &info->mti_lh[MDT_LH_CHILD];
2467         mdt_lock_reg_init(child_lh, LCK_EX);
2468         rc = mdt_reint_striped_lock(info, obj, child_lh,
2469                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2470                                     einfo, cos_incompat);
2471         if (rc != 0)
2472                 GOTO(unlock_parent, rc);
2473
2474         if (atomic_read(&obj->mot_open_count)) {
2475                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2476                        PFID(mdt_object_fid(obj)));
2477                 GOTO(unlock_child, rc = -EBUSY);
2478         }
2479
2480         ma->ma_need = 0;
2481         ma->ma_valid = MA_INODE;
2482         ma->ma_attr.la_valid = LA_CTIME;
2483         ma->ma_attr.la_ctime = ctime;
2484
2485         mutex_lock(&obj->mot_lov_mutex);
2486
2487         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2488                         mdt_object_child(obj), name, ma, 0);
2489
2490         mutex_unlock(&obj->mot_lov_mutex);
2491
2492 unlock_child:
2493         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2494 unlock_parent:
2495         mdt_object_unlock(info, pobj, parent_lh, 1);
2496 put_parent:
2497         mdt_object_put(info->mti_env, pobj);
2498 out:
2499         RETURN(rc);
2500 }
2501
2502 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2503                                         struct mdt_object *obj)
2504 {
2505         struct lu_ucred *uc = lu_ucred(info->mti_env);
2506         struct md_attr *ma = &info->mti_attr;
2507         struct lu_attr *la = &ma->ma_attr;
2508         int rc = 0;
2509         ENTRY;
2510
2511         ma->ma_need = MA_INODE;
2512         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2513         if (rc)
2514                 GOTO(out, rc);
2515
2516         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2517                         rc = -EACCES;
2518
2519         if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
2520                 RETURN(0);
2521         if (uc->uc_fsuid == la->la_uid) {
2522                 if ((la->la_mode & S_IWUSR) == 0)
2523                         rc = -EACCES;
2524         } else if (uc->uc_fsgid == la->la_gid) {
2525                 if ((la->la_mode & S_IWGRP) == 0)
2526                         rc = -EACCES;
2527         } else if ((la->la_mode & S_IWOTH) == 0) {
2528                         rc = -EACCES;
2529         }
2530
2531 out:
2532         RETURN(rc);
2533 }
2534
2535 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2536                          s64 ctime)
2537 {
2538         struct mdt_device *mdt = info->mti_mdt;
2539         struct mdt_object *obj = NULL;
2540         struct linkea_data ldata = { NULL };
2541         struct lu_buf *buf = &info->mti_big_buf;
2542         struct lu_name *name = &info->mti_name;
2543         struct lu_fid *pfid = &info->mti_tmp_fid1;
2544         struct link_ea_header *leh;
2545         struct link_ea_entry *lee;
2546         int reclen, count, rc = 0;
2547         ENTRY;
2548
2549         if (!fid_is_sane(fid))
2550                 GOTO(out, rc = -EINVAL);
2551
2552         if (!fid_is_namespace_visible(fid))
2553                 GOTO(out, rc = -EINVAL);
2554
2555         obj = mdt_object_find(info->mti_env, mdt, fid);
2556         if (IS_ERR(obj))
2557                 GOTO(out, rc = PTR_ERR(obj));
2558
2559         if (mdt_object_remote(obj))
2560                 GOTO(out, rc = -EREMOTE);
2561         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2562                 GOTO(out, rc = -ENOENT);
2563
2564         rc = mdt_rmfid_check_permission(info, obj);
2565         if (rc)
2566                 GOTO(out, rc);
2567
2568         /* take LinkEA */
2569         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2570         if (!buf->lb_buf)
2571                 GOTO(out, rc = -ENOMEM);
2572
2573         ldata.ld_buf = buf;
2574         rc = mdt_links_read(info, obj, &ldata);
2575         if (rc)
2576                 GOTO(out, rc);
2577
2578         leh = buf->lb_buf;
2579         lee = (struct link_ea_entry *)(leh + 1);
2580         for (count = 0; count < leh->leh_reccount; count++) {
2581                 /* remove every hardlink */
2582                 linkea_entry_unpack(lee, &reclen, name, pfid);
2583                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2584                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2585                 if (rc)
2586                         break;
2587         }
2588
2589 out:
2590         if (obj && !IS_ERR(obj))
2591                 mdt_object_put(info->mti_env, obj);
2592         if (info->mti_big_buf.lb_buf)
2593                 lu_buf_free(&info->mti_big_buf);
2594
2595         RETURN(rc);
2596 }
2597
2598 static int mdt_rmfid(struct tgt_session_info *tsi)
2599 {
2600         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2601         struct mdt_body *reqbody;
2602         struct lu_fid *fids, *rfids;
2603         int bufsize, rc;
2604         __u32 *rcs;
2605         int i, nr;
2606         ENTRY;
2607
2608         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2609         if (reqbody == NULL)
2610                 RETURN(-EPROTO);
2611         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2612                                        RCL_CLIENT);
2613         nr = bufsize / sizeof(struct lu_fid);
2614         if (nr * sizeof(struct lu_fid) != bufsize)
2615                 RETURN(-EINVAL);
2616         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2617                              RCL_SERVER, nr * sizeof(__u32));
2618         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2619                              RCL_SERVER, nr * sizeof(struct lu_fid));
2620         rc = req_capsule_server_pack(tsi->tsi_pill);
2621         if (rc)
2622                 GOTO(out, rc = err_serious(rc));
2623         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2624         if (fids == NULL)
2625                 RETURN(-EPROTO);
2626         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2627         LASSERT(rcs);
2628         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2629         LASSERT(rfids);
2630
2631         mdt_init_ucred(mti, reqbody);
2632         for (i = 0; i < nr; i++) {
2633                 rfids[i] = fids[i];
2634                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2635         }
2636         mdt_exit_ucred(mti);
2637
2638 out:
2639         RETURN(rc);
2640 }
2641
2642 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2643                          void *karg, void __user *uarg);
2644
2645 int mdt_io_set_info(struct tgt_session_info *tsi)
2646 {
2647         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2648         struct ost_body         *body = NULL, *repbody;
2649         void                    *key, *val = NULL;
2650         int                      keylen, vallen, rc = 0;
2651         bool                     is_grant_shrink;
2652
2653         ENTRY;
2654
2655         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2656         if (key == NULL) {
2657                 DEBUG_REQ(D_HA, req, "no set_info key");
2658                 RETURN(err_serious(-EFAULT));
2659         }
2660         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2661                                       RCL_CLIENT);
2662
2663         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2664         if (val == NULL) {
2665                 DEBUG_REQ(D_HA, req, "no set_info val");
2666                 RETURN(err_serious(-EFAULT));
2667         }
2668         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2669                                       RCL_CLIENT);
2670
2671         is_grant_shrink = KEY_IS(KEY_GRANT_SHRINK);
2672         if (is_grant_shrink)
2673                 /* In this case the value is actually an RMF_OST_BODY, so we
2674                  * transmutate the type of this PTLRPC */
2675                 req_capsule_extend(tsi->tsi_pill, &RQF_OST_SET_GRANT_INFO);
2676
2677         rc = req_capsule_server_pack(tsi->tsi_pill);
2678         if (rc < 0)
2679                 RETURN(rc);
2680
2681         if (is_grant_shrink) {
2682                 body = req_capsule_client_get(tsi->tsi_pill, &RMF_OST_BODY);
2683
2684                 repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2685                 *repbody = *body;
2686
2687                 /** handle grant shrink, similar to a read request */
2688                 tgt_grant_prepare_read(tsi->tsi_env, tsi->tsi_exp,
2689                                        &repbody->oa);
2690         } else {
2691                 CERROR("%s: Unsupported key %s\n",
2692                        tgt_name(tsi->tsi_tgt), (char *)key);
2693                 rc = -EOPNOTSUPP;
2694         }
2695
2696         RETURN(rc);
2697 }
2698
2699
2700 static int mdt_set_info(struct tgt_session_info *tsi)
2701 {
2702         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2703         char                    *key;
2704         void                    *val;
2705         int                      keylen, vallen, rc = 0;
2706
2707         ENTRY;
2708
2709         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2710         if (key == NULL) {
2711                 DEBUG_REQ(D_HA, req, "no set_info key");
2712                 RETURN(err_serious(-EFAULT));
2713         }
2714
2715         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2716                                       RCL_CLIENT);
2717
2718         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2719         if (val == NULL) {
2720                 DEBUG_REQ(D_HA, req, "no set_info val");
2721                 RETURN(err_serious(-EFAULT));
2722         }
2723
2724         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2725                                       RCL_CLIENT);
2726
2727         /* Swab any part of val you need to here */
2728         if (KEY_IS(KEY_READ_ONLY)) {
2729                 spin_lock(&req->rq_export->exp_lock);
2730                 if (*(__u32 *)val)
2731                         *exp_connect_flags_ptr(req->rq_export) |=
2732                                 OBD_CONNECT_RDONLY;
2733                 else
2734                         *exp_connect_flags_ptr(req->rq_export) &=
2735                                 ~OBD_CONNECT_RDONLY;
2736                 spin_unlock(&req->rq_export->exp_lock);
2737         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2738                 struct changelog_setinfo *cs = val;
2739
2740                 if (vallen != sizeof(*cs)) {
2741                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2742                                tgt_name(tsi->tsi_tgt), vallen);
2743                         RETURN(-EINVAL);
2744                 }
2745                 if (req_capsule_req_need_swab(&req->rq_pill)) {
2746                         __swab64s(&cs->cs_recno);
2747                         __swab32s(&cs->cs_id);
2748                 }
2749
2750                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2751                         RETURN(-EACCES);
2752                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2753                                    vallen, val, NULL);
2754         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2755                 if (vallen > 0)
2756                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2757         } else {
2758                 RETURN(-EINVAL);
2759         }
2760         RETURN(rc);
2761 }
2762
2763 static int mdt_readpage(struct tgt_session_info *tsi)
2764 {
2765         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2766         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2767         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2768         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2769         struct mdt_body         *repbody;
2770         int                      rc;
2771         int                      i;
2772
2773         ENTRY;
2774
2775         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2776                 RETURN(err_serious(-ENOMEM));
2777
2778         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2779         if (repbody == NULL || reqbody == NULL)
2780                 RETURN(err_serious(-EFAULT));
2781
2782         /*
2783          * prepare @rdpg before calling lower layers and transfer itself. Here
2784          * reqbody->size contains offset of where to start to read and
2785          * reqbody->nlink contains number bytes to read.
2786          */
2787         rdpg->rp_hash = reqbody->mbo_size;
2788         if (rdpg->rp_hash != reqbody->mbo_size) {
2789                 CERROR("Invalid hash: %#llx != %#llx\n",
2790                        rdpg->rp_hash, reqbody->mbo_size);
2791                 RETURN(-EFAULT);
2792         }
2793
2794         rdpg->rp_attrs = reqbody->mbo_mode;
2795         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2796                 rdpg->rp_attrs |= LUDA_64BITHASH;
2797         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2798                                 exp_max_brw_size(tsi->tsi_exp));
2799         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2800                           PAGE_SHIFT;
2801         OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2802         if (rdpg->rp_pages == NULL)
2803                 RETURN(-ENOMEM);
2804
2805         for (i = 0; i < rdpg->rp_npages; ++i) {
2806                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2807                 if (rdpg->rp_pages[i] == NULL)
2808                         GOTO(free_rdpg, rc = -ENOMEM);
2809         }
2810
2811         /* call lower layers to fill allocated pages with directory data */
2812         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2813         if (rc < 0)
2814                 GOTO(free_rdpg, rc);
2815
2816         /* send pages to client */
2817         rc = tgt_sendpage(tsi, rdpg, rc);
2818
2819         EXIT;
2820 free_rdpg:
2821
2822         for (i = 0; i < rdpg->rp_npages; i++)
2823                 if (rdpg->rp_pages[i] != NULL)
2824                         __free_page(rdpg->rp_pages[i]);
2825         OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2826
2827         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2828                 RETURN(0);
2829
2830         return rc;
2831 }
2832
2833 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2834 {
2835         struct lu_ucred *uc = mdt_ucred_check(info);
2836         struct lu_attr *attr = &info->mti_attr.ma_attr;
2837
2838         if (uc == NULL)
2839                 return -EINVAL;
2840
2841         if (op != REINT_SETATTR) {
2842                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2843                         attr->la_uid = uc->uc_fsuid;
2844                 /* for S_ISGID, inherit gid from his parent, such work will be
2845                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2846                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2847                         attr->la_gid = uc->uc_fsgid;
2848         }
2849
2850         return 0;
2851 }
2852
2853 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2854 {
2855         return op == REINT_OPEN &&
2856              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2857 }
2858
2859 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2860 {
2861         struct req_capsule *pill = info->mti_pill;
2862
2863         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2864                                   RCL_SERVER) &&
2865             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2866                                   RCL_CLIENT)) {
2867                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2868                                          RCL_CLIENT) != 0)
2869                         /* pre-set size in server part with max size */
2870                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2871                                              RCL_SERVER,
2872                                              OBD_MAX_DEFAULT_EA_SIZE);
2873                 else
2874                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2875                                              RCL_SERVER, 0);
2876         }
2877 }
2878
2879 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2880 {
2881         struct req_capsule *pill = info->mti_pill;
2882
2883         if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2884                                   RCL_SERVER))
2885                 /* pre-set size in server part with max size */
2886                 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2887                                      RCL_SERVER,
2888                                      info->mti_mdt->mdt_max_mdsize);
2889 }
2890
2891 static int mdt_reint_internal(struct mdt_thread_info *info,
2892                               struct mdt_lock_handle *lhc,
2893                               __u32 op)
2894 {
2895         struct req_capsule      *pill = info->mti_pill;
2896         struct mdt_body         *repbody;
2897         int                      rc = 0, rc2;
2898
2899         ENTRY;
2900
2901         rc = mdt_reint_unpack(info, op);
2902         if (rc != 0) {
2903                 CERROR("Can't unpack reint, rc %d\n", rc);
2904                 RETURN(err_serious(rc));
2905         }
2906
2907
2908         /* check if the file system is set to readonly. O_RDONLY open
2909          * is still allowed even the file system is set to readonly mode */
2910         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2911                 RETURN(err_serious(-EROFS));
2912
2913         /* for replay (no_create) lmm is not needed, client has it already */
2914         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2915                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2916                                      DEF_REP_MD_SIZE);
2917
2918         /* llog cookies are always 0, the field is kept for compatibility */
2919         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2920                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2921
2922         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2923          * by default. If the target object has more ACL entries, then
2924          * enlarge the buffer when necessary. */
2925         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2926                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2927                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2928
2929         mdt_preset_secctx_size(info);
2930         mdt_preset_encctx_size(info);
2931
2932         rc = req_capsule_server_pack(pill);
2933         if (rc != 0) {
2934                 CERROR("Can't pack response, rc %d\n", rc);
2935                 RETURN(err_serious(rc));
2936         }
2937
2938         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2939                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2940                 LASSERT(repbody);
2941                 repbody->mbo_eadatasize = 0;
2942                 repbody->mbo_aclsize = 0;
2943         }
2944
2945         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2946
2947         /* for replay no cookkie / lmm need, because client have this already */
2948         if (info->mti_spec.no_create)
2949                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2950                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2951
2952         rc = mdt_init_ucred_reint(info);
2953         if (rc)
2954                 GOTO(out_shrink, rc);
2955
2956         rc = mdt_fix_attr_ucred(info, op);
2957         if (rc != 0)
2958                 GOTO(out_ucred, rc = err_serious(rc));
2959
2960         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2961         if (rc < 0) {
2962                 GOTO(out_ucred, rc);
2963         } else if (rc == 1) {
2964                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2965                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2966                 GOTO(out_ucred, rc);
2967         }
2968         rc = mdt_reint_rec(info, lhc);
2969         EXIT;
2970 out_ucred:
2971         mdt_exit_ucred(info);
2972 out_shrink:
2973         mdt_client_compatibility(info);
2974
2975         rc2 = mdt_fix_reply(info);
2976         if (rc == 0)
2977                 rc = rc2;
2978
2979         /*
2980          * Data-on-MDT optimization - read data along with OPEN and return it
2981          * in reply when possible.
2982          */
2983         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2984                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2985                                           &lhc->mlh_reg_lh);
2986
2987         return rc;
2988 }
2989
2990 static long mdt_reint_opcode(struct ptlrpc_request *req,
2991                              const struct req_format **fmt)
2992 {
2993         struct mdt_device       *mdt;
2994         struct mdt_rec_reint    *rec;
2995         long                     opc;
2996
2997         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2998         if (rec != NULL) {
2999                 opc = rec->rr_opcode;
3000                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
3001                 if (opc < REINT_MAX && fmt[opc] != NULL)
3002                         req_capsule_extend(&req->rq_pill, fmt[opc]);
3003                 else {
3004                         mdt = mdt_exp2dev(req->rq_export);
3005                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
3006                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
3007                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
3008                         opc = err_serious(-EFAULT);
3009                 }
3010         } else {
3011                 opc = err_serious(-EFAULT);
3012         }
3013         return opc;
3014 }
3015
3016 static int mdt_reint(struct tgt_session_info *tsi)
3017 {
3018         long opc;
3019         int  rc;
3020         static const struct req_format *reint_fmts[REINT_MAX] = {
3021                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
3022                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
3023                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
3024                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
3025                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
3026                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
3027                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
3028                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
3029                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
3030                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
3031         };
3032
3033         ENTRY;
3034
3035         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
3036         if (opc >= 0) {
3037                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
3038                 /*
3039                  * No lock possible here from client to pass it to reint code
3040                  * path.
3041                  */
3042                 rc = mdt_reint_internal(info, NULL, opc);
3043                 mdt_thread_info_fini(info);
3044         } else {
3045                 rc = opc;
3046         }
3047
3048         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
3049         RETURN(rc);
3050 }
3051
3052 /* this should sync the whole device */
3053 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
3054 {
3055         struct dt_device *dt = mdt->mdt_bottom;
3056         int rc;
3057         ENTRY;
3058
3059         rc = dt->dd_ops->dt_sync(env, dt);
3060         RETURN(rc);
3061 }
3062
3063 /* this should sync this object */
3064 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
3065                            struct mdt_object *mo)
3066 {
3067         int rc = 0;
3068
3069         ENTRY;
3070
3071         if (!mdt_object_exists(mo)) {
3072                 CWARN("%s: non existing object "DFID": rc = %d\n",
3073                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
3074                       -ESTALE);
3075                 RETURN(-ESTALE);
3076         }
3077
3078         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
3079                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
3080                 dt_obj_version_t version;
3081
3082                 version = dt_version_get(env, mdt_obj2dt(mo));
3083                 if (version > tgt->lut_obd->obd_last_committed)
3084                         rc = mo_object_sync(env, mdt_object_child(mo));
3085         } else {
3086                 rc = mo_object_sync(env, mdt_object_child(mo));
3087         }
3088
3089         RETURN(rc);
3090 }
3091
3092 static int mdt_sync(struct tgt_session_info *tsi)
3093 {
3094         struct ptlrpc_request   *req = tgt_ses_req(tsi);
3095         struct req_capsule      *pill = tsi->tsi_pill;
3096         struct mdt_body         *body;
3097         ktime_t                  kstart = ktime_get();
3098         int                      rc;
3099
3100         ENTRY;
3101
3102         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
3103                 RETURN(err_serious(-ENOMEM));
3104
3105         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
3106                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
3107         } else {
3108                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
3109
3110                 if (unlikely(info->mti_object == NULL))
3111                         RETURN(-EPROTO);
3112
3113                 /* sync an object */
3114                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
3115                                      info->mti_object);
3116                 if (rc == 0) {
3117                         const struct lu_fid *fid;
3118                         struct lu_attr *la = &info->mti_attr.ma_attr;
3119
3120                         info->mti_attr.ma_need = MA_INODE;
3121                         info->mti_attr.ma_valid = 0;
3122                         rc = mdt_attr_get_complex(info, info->mti_object,
3123                                                   &info->mti_attr);
3124                         if (rc == 0) {
3125                                 body = req_capsule_server_get(pill,
3126                                                               &RMF_MDT_BODY);
3127                                 fid = mdt_object_fid(info->mti_object);
3128                                 mdt_pack_attr2body(info, body, la, fid);
3129                         }
3130                 }
3131                 mdt_thread_info_fini(info);
3132         }