Whamcloud - gitweb
7f54b59efbdd181caca55d2ec34ae84ce44d3a3d
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_handler.c
32  *
33  * Lustre Metadata Target (mdt) request handler
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  * Author: Nikita Danilov <nikita@clusterfs.com>
40  * Author: Huang Hua <huanghua@clusterfs.com>
41  * Author: Yury Umanets <umka@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
48
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
61 #include <obd.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
66 #include <lustre_crypto.h>
67
68 #include "mdt_internal.h"
69
70 static unsigned int max_mod_rpcs_per_client = 8;
71 module_param(max_mod_rpcs_per_client, uint, 0644);
72 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
73
74 mdl_mode_t mdt_mdl_lock_modes[] = {
75         [LCK_MINMODE] = MDL_MINMODE,
76         [LCK_EX]      = MDL_EX,
77         [LCK_PW]      = MDL_PW,
78         [LCK_PR]      = MDL_PR,
79         [LCK_CW]      = MDL_CW,
80         [LCK_CR]      = MDL_CR,
81         [LCK_NL]      = MDL_NL,
82         [LCK_GROUP]   = MDL_GROUP
83 };
84
85 enum ldlm_mode mdt_dlm_lock_modes[] = {
86         [MDL_MINMODE]   = LCK_MINMODE,
87         [MDL_EX]        = LCK_EX,
88         [MDL_PW]        = LCK_PW,
89         [MDL_PR]        = LCK_PR,
90         [MDL_CW]        = LCK_CW,
91         [MDL_CR]        = LCK_CR,
92         [MDL_NL]        = LCK_NL,
93         [MDL_GROUP]     = LCK_GROUP
94 };
95
96 static struct mdt_device *mdt_dev(struct lu_device *d);
97
98 static const struct lu_object_operations mdt_obj_ops;
99
100 /* Slab for MDT object allocation */
101 static struct kmem_cache *mdt_object_kmem;
102
103 /* For HSM restore handles */
104 struct kmem_cache *mdt_hsm_cdt_kmem;
105
106 /* For HSM request handles */
107 struct kmem_cache *mdt_hsm_car_kmem;
108
109 static struct lu_kmem_descr mdt_caches[] = {
110         {
111                 .ckd_cache = &mdt_object_kmem,
112                 .ckd_name  = "mdt_obj",
113                 .ckd_size  = sizeof(struct mdt_object)
114         },
115         {
116                 .ckd_cache      = &mdt_hsm_cdt_kmem,
117                 .ckd_name       = "mdt_cdt_restore_handle",
118                 .ckd_size       = sizeof(struct cdt_restore_handle)
119         },
120         {
121                 .ckd_cache      = &mdt_hsm_car_kmem,
122                 .ckd_name       = "mdt_cdt_agent_req",
123                 .ckd_size       = sizeof(struct cdt_agent_req)
124         },
125         {
126                 .ckd_cache = NULL
127         }
128 };
129
130 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
131 {
132         if (!rep)
133                 return 0;
134         return rep->lock_policy_res1 & op_flag;
135 }
136
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138                            struct ldlm_reply *rep, __u64 op_flag)
139 {
140         if (info) {
141                 info->mti_opdata &= ~op_flag;
142                 tgt_opdata_clear(info->mti_env, op_flag);
143         }
144         if (rep)
145                 rep->lock_policy_res1 &= ~op_flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, __u64 op_flag)
150 {
151         if (info) {
152                 info->mti_opdata |= op_flag;
153                 tgt_opdata_set(info->mti_env, op_flag);
154         }
155         if (rep)
156                 rep->lock_policy_res1 |= op_flag;
157 }
158
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 {
161         lh->mlh_pdo_hash = 0;
162         lh->mlh_reg_mode = lm;
163         lh->mlh_rreg_mode = lm;
164         lh->mlh_type = MDT_REG_LOCK;
165 }
166
167 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
168 {
169         mdt_lock_reg_init(lh, lock->l_req_mode);
170         if (lock->l_req_mode == LCK_GROUP)
171                 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
172 }
173
174 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
175                        const struct lu_name *lname)
176 {
177         lh->mlh_reg_mode = lock_mode;
178         lh->mlh_pdo_mode = LCK_MINMODE;
179         lh->mlh_rreg_mode = lock_mode;
180         lh->mlh_type = MDT_PDO_LOCK;
181
182         if (lu_name_is_valid(lname)) {
183                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
184                                                      lname->ln_namelen);
185                 /* XXX Workaround for LU-2856
186                  *
187                  * Zero is a valid return value of full_name_hash, but
188                  * several users of mlh_pdo_hash assume a non-zero
189                  * hash value. We therefore map zero onto an
190                  * arbitrary, but consistent value (1) to avoid
191                  * problems further down the road. */
192                 if (unlikely(lh->mlh_pdo_hash == 0))
193                         lh->mlh_pdo_hash = 1;
194         } else {
195                 lh->mlh_pdo_hash = 0;
196         }
197 }
198
199 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
200                               struct mdt_lock_handle *lh)
201 {
202         mdl_mode_t mode;
203         ENTRY;
204
205         /*
206          * Any dir access needs couple of locks:
207          *
208          * 1) on part of dir we gonna take lookup/modify;
209          *
210          * 2) on whole dir to protect it from concurrent splitting and/or to
211          * flush client's cache for readdir().
212          *
213          * so, for a given mode and object this routine decides what lock mode
214          * to use for lock #2:
215          *
216          * 1) if caller's gonna lookup in dir then we need to protect dir from
217          * being splitted only - LCK_CR
218          *
219          * 2) if caller's gonna modify dir then we need to protect dir from
220          * being splitted and to flush cache - LCK_CW
221          *
222          * 3) if caller's gonna modify dir and that dir seems ready for
223          * splitting then we need to protect it from any type of access
224          * (lookup/modify/split) - LCK_EX --bzzz
225          */
226
227         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
228         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
229
230         /*
231          * Ask underlaying level its opinion about preferable PDO lock mode
232          * having access type passed as regular lock mode:
233          *
234          * - MDL_MINMODE means that lower layer does not want to specify lock
235          * mode;
236          *
237          * - MDL_NL means that no PDO lock should be taken. This is used in some
238          * cases. Say, for non-splittable directories no need to use PDO locks
239          * at all.
240          */
241         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
242                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
243
244         if (mode != MDL_MINMODE) {
245                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
246         } else {
247                 /*
248                  * Lower layer does not want to specify locking mode. We do it
249                  * our selves. No special protection is needed, just flush
250                  * client's cache on modification and allow concurrent
251                  * mondification.
252                  */
253                 switch (lh->mlh_reg_mode) {
254                 case LCK_EX:
255                         lh->mlh_pdo_mode = LCK_EX;
256                         break;
257                 case LCK_PR:
258                         lh->mlh_pdo_mode = LCK_CR;
259                         break;
260                 case LCK_PW:
261                         lh->mlh_pdo_mode = LCK_CW;
262                         break;
263                 default:
264                         CERROR("Not expected lock type (0x%x)\n",
265                                (int)lh->mlh_reg_mode);
266                         LBUG();
267                 }
268         }
269
270         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
271         EXIT;
272 }
273
274 /**
275  * Check whether \a o is directory stripe object.
276  *
277  * \param[in]  info     thread environment
278  * \param[in]  o        MDT object
279  *
280  * \retval 1    is directory stripe.
281  * \retval 0    isn't directory stripe.
282  * \retval < 1  error code
283  */
284 static int mdt_is_dir_stripe(struct mdt_thread_info *info,
285                                 struct mdt_object *o)
286 {
287         struct md_attr *ma = &info->mti_attr;
288         struct lmv_mds_md_v1 *lmv;
289         int rc;
290
291         rc = mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
292         if (rc < 0)
293                 return rc;
294
295         if (!(ma->ma_valid & MA_LMV))
296                 return 0;
297
298         lmv = &ma->ma_lmv->lmv_md_v1;
299
300         if (!lmv_is_sane2(lmv))
301                 return -EBADF;
302
303         if (le32_to_cpu(lmv->lmv_magic) == LMV_MAGIC_STRIPE)
304                 return 1;
305
306         return 0;
307 }
308
309 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
310                               struct lu_fid *fid)
311 {
312         struct mdt_device *mdt = info->mti_mdt;
313         struct lu_name *lname = &info->mti_name;
314         const char *start = fileset;
315         char *filename = info->mti_filename;
316         struct mdt_object *parent;
317         u32 mode;
318         int rc = 0;
319
320         LASSERT(!info->mti_cross_ref);
321
322         /*
323          * We may want to allow this to mount a completely separate
324          * fileset from the MDT in the future, but keeping it to
325          * ROOT/ only for now avoid potential security issues.
326          */
327         *fid = mdt->mdt_md_root_fid;
328
329         while (rc == 0 && start != NULL && *start != '\0') {
330                 const char *s1 = start;
331                 const char *s2;
332
333                 while (*++s1 == '/')
334                         ;
335                 s2 = s1;
336                 while (*s2 != '/' && *s2 != '\0')
337                         s2++;
338
339                 if (s2 == s1)
340                         break;
341
342                 start = s2;
343
344                 lname->ln_namelen = s2 - s1;
345                 if (lname->ln_namelen > NAME_MAX) {
346                         rc = -EINVAL;
347                         break;
348                 }
349
350                 /* reject .. as a path component */
351                 if (lname->ln_namelen == 2 &&
352                     strncmp(s1, "..", 2) == 0) {
353                         rc = -EINVAL;
354                         break;
355                 }
356
357                 strncpy(filename, s1, lname->ln_namelen);
358                 filename[lname->ln_namelen] = '\0';
359                 lname->ln_name = filename;
360
361                 parent = mdt_object_find(info->mti_env, mdt, fid);
362                 if (IS_ERR(parent)) {
363                         rc = PTR_ERR(parent);
364                         break;
365                 }
366                 /* Only got the fid of this obj by name */
367                 fid_zero(fid);
368                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
369                                 fid, &info->mti_spec);
370                 mdt_object_put(info->mti_env, parent);
371         }
372         if (!rc) {
373                 parent = mdt_object_find(info->mti_env, mdt, fid);
374                 if (IS_ERR(parent))
375                         rc = PTR_ERR(parent);
376                 else {
377                         mode = lu_object_attr(&parent->mot_obj);
378                         if (!S_ISDIR(mode)) {
379                                 rc = -ENOTDIR;
380                         } else if (mdt_is_remote_object(info, parent, parent)) {
381                                 if (!mdt->mdt_enable_remote_subdir_mount) {
382                                         rc = -EREMOTE;
383                                         LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
384                                                       mdt_obd_name(mdt),
385                                                       fileset, rc);
386                                 } else {
387                                         LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
388                                                       mdt_obd_name(mdt),
389                                                       fileset);
390                                 }
391                         }
392                         mdt_object_put(info->mti_env, parent);
393                 }
394         }
395
396         return rc;
397 }
398
399 static int mdt_get_root(struct tgt_session_info *tsi)
400 {
401         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
402         struct mdt_device       *mdt = info->mti_mdt;
403         struct mdt_body         *repbody;
404         char                    *fileset = NULL, *buffer = NULL;
405         int                      rc;
406         struct obd_export       *exp = info->mti_exp;
407         char                    *nodemap_fileset;
408
409         ENTRY;
410
411         rc = mdt_check_ucred(info);
412         if (rc)
413                 GOTO(out, rc = err_serious(rc));
414
415         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
416                 GOTO(out, rc = err_serious(-ENOMEM));
417
418         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
419         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
420                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
421                 if (fileset == NULL)
422                         GOTO(out, rc = err_serious(-EFAULT));
423         }
424
425         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
426         if (nodemap_fileset && nodemap_fileset[0]) {
427                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
428                 if (fileset) {
429                         /* consider fileset from client as a sub-fileset
430                          * of the nodemap one */
431                         OBD_ALLOC(buffer, PATH_MAX + 1);
432                         if (buffer == NULL)
433                                 GOTO(out, rc = err_serious(-ENOMEM));
434                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
435                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
436                                 GOTO(out, rc = err_serious(-EINVAL));
437                         fileset = buffer;
438                 } else {
439                         /* enforce fileset as specified in the nodemap */
440                         fileset = nodemap_fileset;
441                 }
442         }
443
444         if (fileset) {
445                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
446                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
447                 if (rc < 0)
448                         GOTO(out, rc = err_serious(rc));
449         } else {
450                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
451         }
452         repbody->mbo_valid |= OBD_MD_FLID;
453
454         EXIT;
455 out:
456         mdt_thread_info_fini(info);
457         if (buffer)
458                 OBD_FREE(buffer, PATH_MAX+1);
459         return rc;
460 }
461
462 static int mdt_statfs(struct tgt_session_info *tsi)
463 {
464         struct ptlrpc_request *req = tgt_ses_req(tsi);
465         struct mdt_thread_info *info = tsi2mdt_info(tsi);
466         struct mdt_device *mdt = info->mti_mdt;
467         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
468         struct md_device *next = mdt->mdt_child;
469         struct ptlrpc_service_part *svcpt;
470         struct obd_statfs *osfs;
471         struct mdt_body *reqbody = NULL;
472         struct mdt_statfs_cache *msf;
473         ktime_t kstart = ktime_get();
474         int current_blockbits;
475         int rc;
476
477         ENTRY;
478
479         svcpt = req->rq_rqbd->rqbd_svcpt;
480
481         /* This will trigger a watchdog timeout */
482         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
483                          (MDT_SERVICE_WATCHDOG_FACTOR *
484                           at_get(&svcpt->scp_at_estimate)) + 1);
485
486         rc = mdt_check_ucred(info);
487         if (rc)
488                 GOTO(out, rc = err_serious(rc));
489
490         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
491                 GOTO(out, rc = err_serious(-ENOMEM));
492
493         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
494         if (!osfs)
495                 GOTO(out, rc = -EPROTO);
496
497         if (mdt_is_sum_statfs_client(req->rq_export) &&
498                 lustre_packed_msg_size(req->rq_reqmsg) ==
499                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
500                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
501                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
502                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
503         }
504
505         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
506                 msf = &mdt->mdt_sum_osfs;
507         else
508                 msf = &mdt->mdt_osfs;
509
510         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
511                         /** statfs data is too old, get up-to-date one */
512                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
513                                 rc = next->md_ops->mdo_statfs(info->mti_env,
514                                                               next, osfs);
515                         else
516                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
517                                                osfs);
518                         if (rc)
519                                 GOTO(out, rc);
520                         spin_lock(&mdt->mdt_lock);
521                         msf->msf_osfs = *osfs;
522                         msf->msf_age = ktime_get_seconds();
523                         spin_unlock(&mdt->mdt_lock);
524         } else {
525                         /** use cached statfs data */
526                         spin_lock(&mdt->mdt_lock);
527                         *osfs = msf->msf_osfs;
528                         spin_unlock(&mdt->mdt_lock);
529         }
530
531         /* tgd_blockbit is recordsize bits set during mkfs.
532          * This once set does not change. However, 'zfs set'
533          * can be used to change the MDT blocksize. Instead
534          * of using cached value of 'tgd_blockbit' always
535          * calculate the blocksize bits which may have
536          * changed.
537          */
538         current_blockbits = fls64(osfs->os_bsize) - 1;
539
540         /* at least try to account for cached pages.  its still racy and
541          * might be under-reporting if clients haven't announced their
542          * caches with brw recently */
543         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
544                " pending %llu free %llu avail %llu\n",
545                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
546                tgd->tgd_tot_pending,
547                osfs->os_bfree << current_blockbits,
548                osfs->os_bavail << current_blockbits);
549
550         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
551                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
552                                    osfs->os_bsize - 1) >> current_blockbits));
553
554         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
555         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
556                "%llu objects: %llu free; state %x\n",
557                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
558                osfs->os_files, osfs->os_ffree, osfs->os_state);
559
560         if (!exp_grant_param_supp(tsi->tsi_exp) &&
561             current_blockbits > COMPAT_BSIZE_SHIFT) {
562                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
563                  * should not see a block size > page size, otherwise
564                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
565                  * block size which is the biggest block size known to work
566                  * with all client's page size. */
567                 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
568                 osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
569                 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
570                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
571         }
572         if (rc == 0)
573                 mdt_counter_incr(req, LPROC_MDT_STATFS,
574                                  ktime_us_delta(ktime_get(), kstart));
575 out:
576         mdt_thread_info_fini(info);
577         RETURN(rc);
578 }
579
580 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
581 {
582         struct lov_comp_md_v1 *comp_v1;
583         struct lov_mds_md *v1;
584         __u32 off;
585         __u32 dom_stripesize = 0;
586         int i;
587         bool has_ost_stripes = false;
588
589         ENTRY;
590
591         if (is_dom_only)
592                 *is_dom_only = 0;
593
594         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
595                 RETURN(0);
596
597         comp_v1 = (struct lov_comp_md_v1 *)lmm;
598         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
599         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
600
601         /* Fast check for DoM entry with no mirroring, should be the first */
602         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
603             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
604                 RETURN(0);
605
606         /* check all entries otherwise */
607         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
608                 struct lov_comp_md_entry_v1 *lcme;
609
610                 lcme = &comp_v1->lcm_entries[i];
611                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
612                         continue;
613
614                 off = le32_to_cpu(lcme->lcme_offset);
615                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
616
617                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
618                     LOV_PATTERN_MDT)
619                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
620                 else
621                         has_ost_stripes = true;
622
623                 if (dom_stripesize && has_ost_stripes)
624                         RETURN(dom_stripesize);
625         }
626         /* DoM-only case exits here */
627         if (is_dom_only && dom_stripesize)
628                 *is_dom_only = 1;
629         RETURN(dom_stripesize);
630 }
631
632 /**
633  * Pack size attributes into the reply.
634  */
635 int mdt_pack_size2body(struct mdt_thread_info *info,
636                         const struct lu_fid *fid, struct lustre_handle *lh)
637 {
638         struct mdt_body *b;
639         struct md_attr *ma = &info->mti_attr;
640         __u32 dom_stripe;
641         bool dom_lock = false;
642
643         ENTRY;
644
645         LASSERT(ma->ma_attr.la_valid & LA_MODE);
646
647         if (!S_ISREG(ma->ma_attr.la_mode) ||
648             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
649                 RETURN(-ENODATA);
650
651         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
652         /* no DoM stripe, no size in reply */
653         if (!dom_stripe)
654                 RETURN(-ENOENT);
655
656         if (lustre_handle_is_used(lh)) {
657                 struct ldlm_lock *lock;
658
659                 lock = ldlm_handle2lock(lh);
660                 if (lock != NULL) {
661                         dom_lock = ldlm_has_dom(lock);
662                         LDLM_LOCK_PUT(lock);
663                 }
664         }
665
666         /* no DoM lock, no size in reply */
667         if (!dom_lock)
668                 RETURN(0);
669
670         /* Either DoM lock exists or LMM has only DoM stripe then
671          * return size on body. */
672         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
673
674         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
675         RETURN(0);
676 }
677
#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/*
 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
 *
 * The access ACL xattr is first read into the RMF_ACL reply buffer.  If that
 * buffer is too small (-ERANGE) and the client supports large ACLs, the read
 * is retried once into info->mti_big_acl, which is allocated on demand.
 *
 * \param	info	thread info object
 * \param	repbody	reply to pack ACLs into
 * \param	o	mdt object of file to examine
 * \param	nodemap	nodemap of client to reply to
 * \retval	0	success
 * \retval	-errno	error getting or parsing ACL from disk
 */
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
		      struct mdt_object *o, struct lu_nodemap *nodemap)
{
	const struct lu_env *env = info->mti_env;
	struct md_object *next = mdt_object_child(o);
	struct lu_buf *buf = &info->mti_buf;
	struct mdt_device *mdt = info->mti_mdt;
	struct req_capsule *pill = info->mti_pill;
	int rc;

	ENTRY;

	buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
	buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
	/* no room reserved for an ACL in the reply */
	if (buf->lb_len == 0)
		RETURN(0);

	LASSERT(!info->mti_big_acl_used);
again:
	rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
	if (rc >= 0) {
		/* rc is the ACL size read; map ids for the client */
		rc = nodemap_map_acl(nodemap, buf->lb_buf,
				     rc, NODEMAP_FS_TO_CLIENT);
		/* if all ACLs mapped out, rc is still >= 0 */
		if (rc >= 0) {
			repbody->mbo_aclsize = rc;
			repbody->mbo_valid |= OBD_MD_FLACL;
			RETURN(0);
		}

		CERROR("%s: nodemap_map_acl unable to parse "DFID
		       " ACL: rc = %d\n", mdt_obd_name(mdt),
		       PFID(mdt_object_fid(o)), rc);
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid &= ~OBD_MD_FLACL;
		RETURN(rc);
	}

	switch (rc) {
	case -ENODATA:
		/* no ACL stored: reply with a valid, empty ACL */
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid |= OBD_MD_FLACL;
		rc = 0;
		break;
	case -EOPNOTSUPP:
		rc = 0;
		break;
	case -ERANGE:
		if (exp_connect_large_acl(info->mti_exp) &&
		    !info->mti_big_acl_used) {
			if (info->mti_big_acl == NULL) {
				info->mti_big_aclsize =
					min_t(unsigned int,
					      mdt->mdt_max_ea_size,
					      XATTR_SIZE_MAX);
				OBD_ALLOC_LARGE(info->mti_big_acl,
						info->mti_big_aclsize);
				if (info->mti_big_acl == NULL) {
					info->mti_big_aclsize = 0;
					CERROR("%s: unable to grow "
					       DFID" ACL buffer\n",
					       mdt_obd_name(mdt),
					       PFID(mdt_object_fid(o)));
					RETURN(-ENOMEM);
				}
			}

			CDEBUG(D_INODE, "%s: grow the "DFID
			       " ACL buffer to size %d\n",
			       mdt_obd_name(mdt),
			       PFID(mdt_object_fid(o)),
			       info->mti_big_aclsize);

			/* retry once with the big buffer */
			buf->lb_buf = info->mti_big_acl;
			buf->lb_len = info->mti_big_aclsize;
			info->mti_big_acl_used = 1;
			goto again;
		}
		/* FS has ACL bigger that our limits */
		CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
		       info->mti_big_aclsize);
		rc = -E2BIG;
		break;
	default:
		CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
		       mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
		break;
	}

	RETURN(rc);
}
#endif
776
777 /* XXX Look into layout in MDT layer. */
778 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
779 {
780         struct lov_comp_md_v1   *comp_v1;
781         struct lov_mds_md       *v1;
782         int                      i;
783
784         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
785                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
786
787                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
788                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
789                                 comp_v1->lcm_entries[i].lcme_offset);
790                         /* We don't support partial release for now */
791                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
792                                 return false;
793                 }
794                 return true;
795         } else {
796                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
797                         true : false;
798         }
799 }
800
801 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
802                         const struct lu_attr *attr, const struct lu_fid *fid)
803 {
804         struct md_attr *ma = &info->mti_attr;
805         struct obd_export *exp = info->mti_exp;
806         struct lu_nodemap *nodemap = NULL;
807
808         LASSERT(ma->ma_valid & MA_INODE);
809
810         if (attr->la_valid & LA_ATIME) {
811                 b->mbo_atime = attr->la_atime;
812                 b->mbo_valid |= OBD_MD_FLATIME;
813         }
814         if (attr->la_valid & LA_MTIME) {
815                 b->mbo_mtime = attr->la_mtime;
816                 b->mbo_valid |= OBD_MD_FLMTIME;
817         }
818         if (attr->la_valid & LA_CTIME) {
819                 b->mbo_ctime = attr->la_ctime;
820                 b->mbo_valid |= OBD_MD_FLCTIME;
821         }
822         if (attr->la_valid & LA_BTIME) {
823                 b->mbo_btime = attr->la_btime;
824                 b->mbo_valid |= OBD_MD_FLBTIME;
825         }
826         if (attr->la_valid & LA_FLAGS) {
827                 b->mbo_flags = attr->la_flags;
828                 b->mbo_valid |= OBD_MD_FLFLAGS;
829         }
830         if (attr->la_valid & LA_NLINK) {
831                 b->mbo_nlink = attr->la_nlink;
832                 b->mbo_valid |= OBD_MD_FLNLINK;
833         }
834         if (attr->la_valid & (LA_UID|LA_GID)) {
835                 nodemap = nodemap_get_from_exp(exp);
836                 if (IS_ERR(nodemap))
837                         goto out;
838         }
839         if (attr->la_valid & LA_UID) {
840                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
841                                             NODEMAP_FS_TO_CLIENT,
842                                             attr->la_uid);
843                 b->mbo_valid |= OBD_MD_FLUID;
844         }
845         if (attr->la_valid & LA_GID) {
846                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
847                                             NODEMAP_FS_TO_CLIENT,
848                                             attr->la_gid);
849                 b->mbo_valid |= OBD_MD_FLGID;
850         }
851
852         if (attr->la_valid & LA_PROJID) {
853                 /* TODO, nodemap for project id */
854                 b->mbo_projid = attr->la_projid;
855                 b->mbo_valid |= OBD_MD_FLPROJID;
856         }
857
858         b->mbo_mode = attr->la_mode;
859         if (attr->la_valid & LA_MODE)
860                 b->mbo_valid |= OBD_MD_FLMODE;
861         if (attr->la_valid & LA_TYPE)
862                 b->mbo_valid |= OBD_MD_FLTYPE;
863
864         if (fid != NULL) {
865                 b->mbo_fid1 = *fid;
866                 b->mbo_valid |= OBD_MD_FLID;
867                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
868                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
869         }
870
871         if (!(attr->la_valid & LA_TYPE))
872                 return;
873
874         b->mbo_rdev   = attr->la_rdev;
875         b->mbo_size   = attr->la_size;
876         b->mbo_blocks = attr->la_blocks;
877
878         if (!S_ISREG(attr->la_mode)) {
879                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
880         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
881                 /* means no objects are allocated on osts. */
882                 LASSERT(!(ma->ma_valid & MA_LOV));
883                 /* just ignore blocks occupied by extend attributes on MDS */
884                 b->mbo_blocks = 0;
885                 /* if no object is allocated on osts, the size on mds is valid.
886                  * b=22272 */
887                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
888         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
889                 if (mdt_hsm_is_released(ma->ma_lmm)) {
890                         /* A released file stores its size on MDS. */
891                         /* But return 1 block for released file, unless tools
892                          * like tar will consider it fully sparse. (LU-3864)
893                          */
894                         if (unlikely(b->mbo_size == 0))
895                                 b->mbo_blocks = 0;
896                         else
897                                 b->mbo_blocks = 1;
898                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
899                 } else if (info->mti_som_valid) { /* som is valid */
900                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
901                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
902                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
903                         b->mbo_size = ma->ma_som.ms_size;
904                         b->mbo_blocks = ma->ma_som.ms_blocks;
905                 }
906         }
907
908         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
909                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
910                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
911                        PFID(fid), (unsigned long long)b->mbo_size);
912
913 out:
914         if (!IS_ERR_OR_NULL(nodemap))
915                 nodemap_putref(nodemap);
916 }
917
918 static inline int mdt_body_has_lov(const struct lu_attr *la,
919                                    const struct mdt_body *body)
920 {
921         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
922                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
923 }
924
925 void mdt_client_compatibility(struct mdt_thread_info *info)
926 {
927         struct mdt_body       *body;
928         struct ptlrpc_request *req = mdt_info_req(info);
929         struct obd_export     *exp = req->rq_export;
930         struct md_attr        *ma = &info->mti_attr;
931         struct lu_attr        *la = &ma->ma_attr;
932         ENTRY;
933
934         if (exp_connect_layout(exp))
935                 /* the client can deal with 16-bit lmm_stripe_count */
936                 RETURN_EXIT;
937
938         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
939
940         if (!mdt_body_has_lov(la, body))
941                 RETURN_EXIT;
942
943         /* now we have a reply with a lov for a client not compatible with the
944          * layout lock so we have to clean the layout generation number */
945         if (S_ISREG(la->la_mode))
946                 ma->ma_lmm->lmm_layout_gen = 0;
947         EXIT;
948 }
949
950 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
951                                    struct mdt_object *o)
952 {
953         const struct lu_env *env = info->mti_env;
954         int rc, rc2;
955
956         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
957                           XATTR_NAME_LOV);
958
959         if (rc == -ENODATA)
960                 rc = 0;
961
962         if (rc < 0)
963                 goto out;
964
965         /* Is it a directory? Let's check for the LMV as well */
966         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
967                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
968                                    XATTR_NAME_LMV);
969
970                 if (rc2 == -ENODATA)
971                         rc2 = mo_xattr_get(env, mdt_object_child(o),
972                                            &LU_BUF_NULL,
973                                            XATTR_NAME_DEFAULT_LMV);
974
975                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
976                         rc = rc2;
977         }
978
979 out:
980         return rc;
981 }
982
983 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
984                       const char *name)
985 {
986         const struct lu_env *env = info->mti_env;
987         int rc;
988         ENTRY;
989
990         LASSERT(info->mti_big_lmm_used == 0);
991         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
992         if (rc < 0)
993                 RETURN(rc);
994
995         /* big_lmm may need to be grown */
996         if (info->mti_big_lmmsize < rc) {
997                 int size = size_roundup_power2(rc);
998
999                 if (info->mti_big_lmmsize > 0) {
1000                         /* free old buffer */
1001                         LASSERT(info->mti_big_lmm);
1002                         OBD_FREE_LARGE(info->mti_big_lmm,
1003                                        info->mti_big_lmmsize);
1004                         info->mti_big_lmm = NULL;
1005                         info->mti_big_lmmsize = 0;
1006                 }
1007
1008                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
1009                 if (info->mti_big_lmm == NULL)
1010                         RETURN(-ENOMEM);
1011                 info->mti_big_lmmsize = size;
1012         }
1013         LASSERT(info->mti_big_lmmsize >= rc);
1014
1015         info->mti_buf.lb_buf = info->mti_big_lmm;
1016         info->mti_buf.lb_len = info->mti_big_lmmsize;
1017         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
1018
1019         RETURN(rc);
1020 }
1021
1022 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1023                      struct md_attr *ma, const char *name)
1024 {
1025         struct md_object *next = mdt_object_child(o);
1026         struct lu_buf    *buf = &info->mti_buf;
1027         int rc;
1028
1029         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1030                 buf->lb_buf = ma->ma_lmm;
1031                 buf->lb_len = ma->ma_lmm_size;
1032                 LASSERT(!(ma->ma_valid & MA_LOV));
1033         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1034                 buf->lb_buf = ma->ma_lmv;
1035                 buf->lb_len = ma->ma_lmv_size;
1036                 LASSERT(!(ma->ma_valid & MA_LMV));
1037         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1038                 buf->lb_buf = ma->ma_default_lmv;
1039                 buf->lb_len = ma->ma_default_lmv_size;
1040                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1041         } else {
1042                 return -EINVAL;
1043         }
1044
1045         LASSERT(buf->lb_buf);
1046
1047         rc = mo_xattr_get(info->mti_env, next, buf, name);
1048         if (rc > 0) {
1049
1050 got:
1051                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1052                         if (info->mti_big_lmm_used)
1053                                 ma->ma_lmm = info->mti_big_lmm;
1054
1055                         /* NOT return LOV EA with hole to old client. */
1056                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1057                                      LOV_PATTERN_F_HOLE) &&
1058                             !(exp_connect_flags(info->mti_exp) &
1059                               OBD_CONNECT_LFSCK)) {
1060                                 return -EIO;
1061                         } else {
1062                                 ma->ma_lmm_size = rc;
1063                                 ma->ma_valid |= MA_LOV;
1064                         }
1065                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1066                         if (info->mti_big_lmm_used)
1067                                 ma->ma_lmv = info->mti_big_lmm;
1068
1069                         ma->ma_lmv_size = rc;
1070                         ma->ma_valid |= MA_LMV;
1071                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1072                         ma->ma_default_lmv_size = rc;
1073                         ma->ma_valid |= MA_LMV_DEF;
1074                 }
1075
1076                 /* Update mdt_max_mdsize so all clients will be aware that */
1077                 if (info->mti_mdt->mdt_max_mdsize < rc)
1078                         info->mti_mdt->mdt_max_mdsize = rc;
1079
1080                 rc = 0;
1081         } else if (rc == -ENODATA) {
1082                 /* no LOV EA */
1083                 rc = 0;
1084         } else if (rc == -ERANGE) {
1085                 /* Default LMV has fixed size, so it must be able to fit
1086                  * in the original buffer */
1087                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1088                         return rc;
1089                 rc = mdt_big_xattr_get(info, o, name);
1090                 if (rc > 0) {
1091                         info->mti_big_lmm_used = 1;
1092                         goto got;
1093                 }
1094         }
1095
1096         return rc;
1097 }
1098
1099 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1100                    struct md_attr *ma, const char *name)
1101 {
1102         int rc;
1103
1104         if (!info->mti_big_lmm) {
1105                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1106                 if (!info->mti_big_lmm)
1107                         return -ENOMEM;
1108                 info->mti_big_lmmsize = PAGE_SIZE;
1109         }
1110
1111         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1112                 ma->ma_lmm = info->mti_big_lmm;
1113                 ma->ma_lmm_size = info->mti_big_lmmsize;
1114                 ma->ma_valid &= ~MA_LOV;
1115         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1116                 ma->ma_lmv = info->mti_big_lmm;
1117                 ma->ma_lmv_size = info->mti_big_lmmsize;
1118                 ma->ma_valid &= ~MA_LMV;
1119         } else {
1120                 LBUG();
1121         }
1122
1123         LASSERT(!info->mti_big_lmm_used);
1124         rc = __mdt_stripe_get(info, o, ma, name);
1125         /* since big_lmm is always used here, clear 'used' flag to avoid
1126          * assertion in mdt_big_xattr_get().
1127          */
1128         info->mti_big_lmm_used = 0;
1129
1130         return rc;
1131 }
1132
1133 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1134                       struct lu_fid *pfid)
1135 {
1136         struct lu_buf           *buf = &info->mti_buf;
1137         struct link_ea_header   *leh;
1138         struct link_ea_entry    *lee;
1139         int                      rc;
1140         ENTRY;
1141
1142         buf->lb_buf = info->mti_big_lmm;
1143         buf->lb_len = info->mti_big_lmmsize;
1144         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1145                           buf, XATTR_NAME_LINK);
1146         /* ignore errors, MA_PFID won't be set and it is
1147          * up to the caller to treat this as an error */
1148         if (rc == -ERANGE || buf->lb_len == 0) {
1149                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1150                 buf->lb_buf = info->mti_big_lmm;
1151                 buf->lb_len = info->mti_big_lmmsize;
1152         }
1153
1154         if (rc < 0)
1155                 RETURN(rc);
1156         if (rc < sizeof(*leh)) {
1157                 CERROR("short LinkEA on "DFID": rc = %d\n",
1158                        PFID(mdt_object_fid(o)), rc);
1159                 RETURN(-ENODATA);
1160         }
1161
1162         leh = (struct link_ea_header *) buf->lb_buf;
1163         lee = (struct link_ea_entry *)(leh + 1);
1164         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1165                 leh->leh_magic = LINK_EA_MAGIC;
1166                 leh->leh_reccount = __swab32(leh->leh_reccount);
1167                 leh->leh_len = __swab64(leh->leh_len);
1168         }
1169         if (leh->leh_magic != LINK_EA_MAGIC)
1170                 RETURN(-EINVAL);
1171         if (leh->leh_reccount == 0)
1172                 RETURN(-ENODATA);
1173
1174         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1175         fid_be_to_cpu(pfid, pfid);
1176
1177         RETURN(0);
1178 }
1179
1180 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1181                            struct lu_fid *pfid, struct lu_name *lname)
1182 {
1183         struct lu_buf *buf = &info->mti_buf;
1184         struct link_ea_header *leh;
1185         struct link_ea_entry *lee;
1186         int reclen;
1187         int rc;
1188
1189         buf->lb_buf = info->mti_xattr_buf;
1190         buf->lb_len = sizeof(info->mti_xattr_buf);
1191         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1192                           XATTR_NAME_LINK);
1193         if (rc == -ERANGE) {
1194                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1195                 buf->lb_buf = info->mti_big_lmm;
1196                 buf->lb_len = info->mti_big_lmmsize;
1197         }
1198         if (rc < 0)
1199                 return rc;
1200
1201         if (rc < sizeof(*leh)) {
1202                 CERROR("short LinkEA on "DFID": rc = %d\n",
1203                        PFID(mdt_object_fid(o)), rc);
1204                 return -ENODATA;
1205         }
1206
1207         leh = (struct link_ea_header *)buf->lb_buf;
1208         lee = (struct link_ea_entry *)(leh + 1);
1209         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1210                 leh->leh_magic = LINK_EA_MAGIC;
1211                 leh->leh_reccount = __swab32(leh->leh_reccount);
1212                 leh->leh_len = __swab64(leh->leh_len);
1213         }
1214         if (leh->leh_magic != LINK_EA_MAGIC)
1215                 return -EINVAL;
1216
1217         if (leh->leh_reccount == 0)
1218                 return -ENODATA;
1219
1220         linkea_entry_unpack(lee, &reclen, lname, pfid);
1221
1222         return 0;
1223 }
1224
1225 int mdt_attr_get_complex(struct mdt_thread_info *info,
1226                          struct mdt_object *o, struct md_attr *ma)
1227 {
1228         const struct lu_env *env = info->mti_env;
1229         struct md_object    *next = mdt_object_child(o);
1230         struct lu_buf       *buf = &info->mti_buf;
1231         int                  need = ma->ma_need;
1232         int                  rc = 0, rc2;
1233         u32                  mode;
1234         ENTRY;
1235
1236         ma->ma_valid = 0;
1237
1238         if (mdt_object_exists(o) == 0)
1239                 GOTO(out, rc = -ENOENT);
1240         mode = lu_object_attr(&next->mo_lu);
1241
1242         if (need & MA_INODE) {
1243                 ma->ma_need = MA_INODE;
1244                 rc = mo_attr_get(env, next, ma);
1245                 if (rc)
1246                         GOTO(out, rc);
1247
1248                 if (S_ISREG(mode))
1249                         (void) mdt_get_som(info, o, ma);
1250                 ma->ma_valid |= MA_INODE;
1251         }
1252
1253         if (need & MA_PFID) {
1254                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1255                 if (rc == 0)
1256                         ma->ma_valid |= MA_PFID;
1257                 /* ignore this error, parent fid is not mandatory */
1258                 rc = 0;
1259         }
1260
1261         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1262                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1263                 if (rc)
1264                         GOTO(out, rc);
1265         }
1266
1267         if (need & MA_LMV && S_ISDIR(mode)) {
1268                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1269                 if (rc != 0)
1270                         GOTO(out, rc);
1271         }
1272
1273         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1274                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1275                 if (rc != 0)
1276                         GOTO(out, rc);
1277         }
1278
1279         /*
1280          * In the handle of MA_INODE, we may already get the SOM attr.
1281          */
1282         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1283                 rc = mdt_get_som(info, o, ma);
1284                 if (rc != 0)
1285                         GOTO(out, rc);
1286         }
1287
1288         if (need & MA_HSM && S_ISREG(mode)) {
1289                 buf->lb_buf = info->mti_xattr_buf;
1290                 buf->lb_len = sizeof(info->mti_xattr_buf);
1291                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1292                              sizeof(info->mti_xattr_buf));
1293                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1294                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1295                 if (rc2 == 0)
1296                         ma->ma_valid |= MA_HSM;
1297                 else if (rc2 < 0 && rc2 != -ENODATA)
1298                         GOTO(out, rc = rc2);
1299         }
1300
1301 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1302         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1303                 buf->lb_buf = ma->ma_acl;
1304                 buf->lb_len = ma->ma_acl_size;
1305                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1306                 if (rc2 > 0) {
1307                         ma->ma_acl_size = rc2;
1308                         ma->ma_valid |= MA_ACL_DEF;
1309                 } else if (rc2 == -ENODATA) {
1310                         /* no ACLs */
1311                         ma->ma_acl_size = 0;
1312                 } else
1313                         GOTO(out, rc = rc2);
1314         }
1315 #endif
1316 out:
1317         ma->ma_need = need;
1318         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1319                rc, ma->ma_valid, ma->ma_lmm);
1320         RETURN(rc);
1321 }
1322
1323 static int mdt_getattr_internal(struct mdt_thread_info *info,
1324                                 struct mdt_object *o, int ma_need)
1325 {
1326         struct mdt_device *mdt = info->mti_mdt;
1327         struct md_object *next = mdt_object_child(o);
1328         const struct mdt_body *reqbody = info->mti_body;
1329         struct ptlrpc_request *req = mdt_info_req(info);
1330         struct md_attr *ma = &info->mti_attr;
1331         struct lu_attr *la = &ma->ma_attr;
1332         struct req_capsule *pill = info->mti_pill;
1333         const struct lu_env *env = info->mti_env;
1334         struct mdt_body *repbody;
1335         struct lu_buf *buffer = &info->mti_buf;
1336         struct obd_export *exp = info->mti_exp;
1337         ktime_t kstart = ktime_get();
1338         int rc;
1339
1340         ENTRY;
1341
1342         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1343                 RETURN(err_serious(-ENOMEM));
1344
1345         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1346
1347         ma->ma_valid = 0;
1348
1349         if (mdt_object_remote(o)) {
1350                 /* This object is located on remote node.*/
1351                 /* Return -ENOTSUPP for old client */
1352                 if (!mdt_is_dne_client(req->rq_export))
1353                         GOTO(out, rc = -ENOTSUPP);
1354
1355                 repbody->mbo_fid1 = *mdt_object_fid(o);
1356                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1357                 GOTO(out, rc = 0);
1358         }
1359
1360         if (reqbody->mbo_eadatasize > 0) {
1361                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1362                 if (buffer->lb_buf == NULL)
1363                         GOTO(out, rc = -EPROTO);
1364                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1365                                                       RCL_SERVER);
1366         } else {
1367                 buffer->lb_buf = NULL;
1368                 buffer->lb_len = 0;
1369                 ma_need &= ~(MA_LOV | MA_LMV);
1370                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1371                        mdt_obd_name(info->mti_mdt),
1372                        req->rq_export->exp_client_uuid.uuid);
1373         }
1374
1375         /* from 2.12.58 intent_getattr pack default LMV in reply */
1376         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1377             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1378                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1379             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1380                                   RCL_SERVER)) {
1381                 ma->ma_lmv = buffer->lb_buf;
1382                 ma->ma_lmv_size = buffer->lb_len;
1383                 ma->ma_default_lmv = req_capsule_server_get(pill,
1384                                                 &RMF_DEFAULT_MDT_MD);
1385                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1386                                                 &RMF_DEFAULT_MDT_MD,
1387                                                 RCL_SERVER);
1388                 ma->ma_need = MA_INODE;
1389                 if (ma->ma_lmv_size > 0)
1390                         ma->ma_need |= MA_LMV;
1391                 if (ma->ma_default_lmv_size > 0)
1392                         ma->ma_need |= MA_LMV_DEF;
1393         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1394                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1395                 /* If it is dir and client require MEA, then we got MEA */
1396                 /* Assumption: MDT_MD size is enough for lmv size. */
1397                 ma->ma_lmv = buffer->lb_buf;
1398                 ma->ma_lmv_size = buffer->lb_len;
1399                 ma->ma_need = MA_INODE;
1400                 if (ma->ma_lmv_size > 0) {
1401                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1402                                 ma->ma_need |= MA_LMV;
1403                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1404                                 ma->ma_need |= MA_LMV_DEF;
1405                                 ma->ma_default_lmv = buffer->lb_buf;
1406                                 ma->ma_lmv = NULL;
1407                                 ma->ma_default_lmv_size = buffer->lb_len;
1408                                 ma->ma_lmv_size = 0;
1409                         }
1410                 }
1411         } else {
1412                 ma->ma_lmm = buffer->lb_buf;
1413                 ma->ma_lmm_size = buffer->lb_len;
1414                 ma->ma_need = MA_INODE | MA_HSM;
1415                 if (ma->ma_lmm_size > 0) {
1416                         ma->ma_need |= MA_LOV;
1417                         /* Older clients may crash if they getattr overstriped
1418                          * files
1419                          */
1420                         if (!exp_connect_overstriping(exp) &&
1421                             mdt_lmm_is_overstriping(ma->ma_lmm))
1422                                 RETURN(-EOPNOTSUPP);
1423                 }
1424         }
1425
1426         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1427             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1428             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1429                 /* get default stripe info for this dir. */
1430                 ma->ma_need |= MA_LOV_DEF;
1431         }
1432         ma->ma_need |= ma_need;
1433
1434         rc = mdt_attr_get_complex(info, o, ma);
1435         if (unlikely(rc)) {
1436                 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1437                              "%s: getattr error for "DFID": rc = %d\n",
1438                              mdt_obd_name(info->mti_mdt),
1439                              PFID(mdt_object_fid(o)), rc);
1440                 RETURN(rc);
1441         }
1442
1443         /* if file is released, check if a restore is running */
1444         if (ma->ma_valid & MA_HSM) {
1445                 repbody->mbo_valid |= OBD_MD_TSTATE;
1446                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1447                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1448                         repbody->mbo_t_state = MS_RESTORE;
1449         }
1450
1451         if (unlikely(!(ma->ma_valid & MA_INODE)))
1452                 RETURN(-EFAULT);
1453
1454         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1455
1456         if (mdt_body_has_lov(la, reqbody)) {
1457                 u32 stripe_count = 1;
1458                 bool fixed_layout = false;
1459
1460                 if (ma->ma_valid & MA_LOV) {
1461                         LASSERT(ma->ma_lmm_size);
1462                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1463                         if (S_ISDIR(la->la_mode))
1464                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1465                         else
1466                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1467                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1468                 }
1469                 if (ma->ma_valid & MA_LMV) {
1470                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1471                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1472
1473                         /* Return -ENOTSUPP for old client */
1474                         if (!mdt_is_striped_client(req->rq_export))
1475                                 RETURN(-ENOTSUPP);
1476
1477                         LASSERT(S_ISDIR(la->la_mode));
1478                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1479                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1480                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1481
1482                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1483                         fixed_layout = lmv_is_fixed(lmv);
1484                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1485                                 mdt_restripe_migrate_add(info, o);
1486                         else if (magic == LMV_MAGIC_V1 &&
1487                                  lmv_is_restriping(lmv))
1488                                 mdt_restripe_update_add(info, o);
1489                 }
1490                 if (ma->ma_valid & MA_LMV_DEF) {
1491                         /* Return -ENOTSUPP for old client */
1492                         if (!mdt_is_striped_client(req->rq_export))
1493                                 RETURN(-ENOTSUPP);
1494                         LASSERT(S_ISDIR(la->la_mode));
1495                         /*
1496                          * when ll_dir_getstripe() gets default LMV, it
1497                          * checks mbo_eadatasize.
1498                          */
1499                         if (!(ma->ma_valid & MA_LMV))
1500                                 repbody->mbo_eadatasize =
1501                                         ma->ma_default_lmv_size;
1502                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1503                                                OBD_MD_DEFAULT_MEA);
1504                 }
1505                 CDEBUG(D_VFSTRACE,
1506                        "dirent count %llu stripe count %u MDT count %d\n",
1507                        ma->ma_attr.la_dirent_count, stripe_count,
1508                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
1509                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1510                     ma->ma_attr.la_dirent_count >
1511                         mdt->mdt_restriper.mdr_dir_split_count &&
1512                     !fid_is_root(mdt_object_fid(o)) &&
1513                     mdt->mdt_enable_dir_auto_split &&
1514                     !o->mot_restriping &&
1515                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1 &&
1516                     !fixed_layout)
1517                         mdt_auto_split_add(info, o);
1518         } else if (S_ISLNK(la->la_mode) &&
1519                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
1520                 buffer->lb_buf = ma->ma_lmm;
1521                 /* eadatasize from client includes NULL-terminator, so
1522                  * there is no need to read it */
1523                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1524                 rc = mo_readlink(env, next, buffer);
1525                 if (unlikely(rc <= 0)) {
1526                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1527                                mdt_obd_name(info->mti_mdt),
1528                                PFID(mdt_object_fid(o)), rc);
1529                         rc = -EFAULT;
1530                 } else {
1531                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1532
1533                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1534                                 rc -= 2;
1535                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1536                         /* we need to report back size with NULL-terminator
1537                          * because client expects that */
1538                         repbody->mbo_eadatasize = rc + 1;
1539                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1540                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1541                                        "on "DFID ", expected %d\n",
1542                                        mdt_obd_name(info->mti_mdt),
1543                                        rc, PFID(mdt_object_fid(o)),
1544                                        reqbody->mbo_eadatasize - 1);
1545                         /* NULL terminate */
1546                         ((char *)ma->ma_lmm)[rc] = 0;
1547
1548                         /* If the total CDEBUG() size is larger than a page, it
1549                          * will print a warning to the console, avoid this by
1550                          * printing just the last part of the symlink. */
1551                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1552                                print_limit < rc ? "..." : "", print_limit,
1553                                (char *)ma->ma_lmm + rc - print_limit, rc);
1554                         rc = 0;
1555                 }
1556         }
1557
1558         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1559                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1560                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1561                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1562                        repbody->mbo_max_mdsize);
1563         }
1564
1565 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1566         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1567                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1568                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1569                 if (IS_ERR(nodemap))
1570                         RETURN(PTR_ERR(nodemap));
1571
1572                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1573                 nodemap_putref(nodemap);
1574         }
1575 #endif
1576
1577 out:
1578         if (rc == 0)
1579                 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1580                                  ktime_us_delta(ktime_get(), kstart));
1581
1582         RETURN(rc);
1583 }
1584
1585 static int mdt_getattr(struct tgt_session_info *tsi)
1586 {
1587         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1588         struct mdt_object       *obj = info->mti_object;
1589         struct req_capsule      *pill = info->mti_pill;
1590         struct mdt_body         *reqbody;
1591         struct mdt_body         *repbody;
1592         int rc, rc2;
1593         ENTRY;
1594
1595         if (unlikely(info->mti_object == NULL))
1596                 RETURN(-EPROTO);
1597
1598         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1599         LASSERT(reqbody);
1600         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1601
1602         /* Special case for Data-on-MDT files to get data version */
1603         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1604                 rc = mdt_data_version_get(tsi);
1605                 GOTO(out, rc);
1606         }
1607
1608         /* Unlike intent case where we need to pre-fill out buffers early on
1609          * in intent policy for ldlm reasons, here we can have a much better
1610          * guess at EA size by just reading it from disk.
1611          * Exceptions are readdir and (missing) directory striping */
1612         /* Readlink */
1613         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1614                 /* No easy way to know how long is the symlink, but it cannot
1615                  * be more than PATH_MAX, so we allocate +1 */
1616                 rc = PATH_MAX + 1;
1617         /* A special case for fs ROOT: getattr there might fetch
1618          * default EA for entire fs, not just for this dir!
1619          */
1620         } else if (lu_fid_eq(mdt_object_fid(obj),
1621                              &info->mti_mdt->mdt_md_root_fid) &&
1622                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1623                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1624                                                                  MDS_GETATTR)) {
1625                 /* Should the default strping be bigger, mdt_fix_reply
1626                  * will reallocate */
1627                 rc = DEF_REP_MD_SIZE;
1628         } else {
1629                 /* Read the actual EA size from disk */
1630                 rc = mdt_attr_get_eabuf_size(info, obj);
1631         }
1632
1633         if (rc < 0)
1634                 GOTO(out, rc = err_serious(rc));
1635
1636         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1637
1638         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1639          * by default. If the target object has more ACL entries, then
1640          * enlarge the buffer when necessary. */
1641         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1642                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1643
1644         rc = req_capsule_server_pack(pill);
1645         if (unlikely(rc != 0))
1646                 GOTO(out, rc = err_serious(rc));
1647
1648         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1649         LASSERT(repbody != NULL);
1650         repbody->mbo_eadatasize = 0;
1651         repbody->mbo_aclsize = 0;
1652
1653         rc = mdt_check_ucred(info);
1654         if (unlikely(rc))
1655                 GOTO(out_shrink, rc);
1656
1657         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1658
1659         rc = mdt_getattr_internal(info, obj, 0);
1660         EXIT;
1661 out_shrink:
1662         mdt_client_compatibility(info);
1663         rc2 = mdt_fix_reply(info);
1664         if (rc == 0)
1665                 rc = rc2;
1666 out:
1667         mdt_thread_info_fini(info);
1668         return rc;
1669 }
1670
1671 /**
1672  * Handler of layout intent RPC requiring the layout modification
1673  *
1674  * \param[in]  info     thread environment
1675  * \param[in]  obj      object
1676  * \param[out] lhc      object ldlm lock handle
1677  * \param[in]  layout   layout change descriptor
1678  *
1679  * \retval 0    on success
1680  * \retval < 0  error code
1681  */
1682 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1683                       struct mdt_lock_handle *lhc,
1684                       struct md_layout_change *layout)
1685 {
1686         int rc;
1687
1688         ENTRY;
1689
1690         if (!mdt_object_exists(obj))
1691                 RETURN(-ENOENT);
1692
1693         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1694                 RETURN(-EINVAL);
1695
1696         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1697                            MAY_WRITE);
1698         if (rc)
1699                 RETURN(rc);
1700
1701         rc = mdt_check_resent_lock(info, obj, lhc);
1702         if (rc < 0)
1703                 RETURN(rc);
1704
1705         if (rc > 0) {
1706                 /* not resent */
1707                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1708
1709                 /* take layout lock to prepare layout change */
1710                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1711                         lockpart |= MDS_INODELOCK_UPDATE;
1712
1713                 mdt_lock_handle_init(lhc);
1714                 mdt_lock_reg_init(lhc, LCK_EX);
1715                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1716                 if (rc)
1717                         RETURN(rc);
1718         }
1719
1720         mutex_lock(&obj->mot_som_mutex);
1721         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1722         mutex_unlock(&obj->mot_som_mutex);
1723
1724         if (rc)
1725                 mdt_object_unlock(info, obj, lhc, 1);
1726
1727         RETURN(rc);
1728 }
1729
1730 /**
1731  * Exchange MOF_LOV_CREATED flags between two objects after a
1732  * layout swap. No assumption is made on whether o1 or o2 have
1733  * created objects or not.
1734  *
1735  * \param[in,out] o1    First swap layout object
1736  * \param[in,out] o2    Second swap layout object
1737  */
1738 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1739 {
1740         unsigned int o1_lov_created = o1->mot_lov_created;
1741
1742         mutex_lock(&o1->mot_lov_mutex);
1743         mutex_lock(&o2->mot_lov_mutex);
1744
1745         o1->mot_lov_created = o2->mot_lov_created;
1746         o2->mot_lov_created = o1_lov_created;
1747
1748         mutex_unlock(&o2->mot_lov_mutex);
1749         mutex_unlock(&o1->mot_lov_mutex);
1750 }
1751
1752 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1753 {
1754         struct mdt_thread_info  *info;
1755         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1756         struct obd_export       *exp = req->rq_export;
1757         struct mdt_object       *o1, *o2, *o;
1758         struct mdt_lock_handle  *lh1, *lh2;
1759         struct mdc_swap_layouts *msl;
1760         int                      rc;
1761         ENTRY;
1762
1763         /* client does not support layout lock, so layout swaping
1764          * is disabled.
1765          * FIXME: there is a problem for old clients which don't support
1766          * layout lock yet. If those clients have already opened the file
1767          * they won't be notified at all so that old layout may still be
1768          * used to do IO. This can be fixed after file release is landed by
1769          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1770         if (!exp_connect_layout(exp))
1771                 RETURN(-EOPNOTSUPP);
1772
1773         info = tsi2mdt_info(tsi);
1774         if (unlikely(info->mti_object == NULL))
1775                 RETURN(-EPROTO);
1776
1777         if (info->mti_dlm_req != NULL)
1778                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1779
1780         o1 = info->mti_object;
1781         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1782                                 &info->mti_body->mbo_fid2);
1783         if (IS_ERR(o))
1784                 GOTO(out, rc = PTR_ERR(o));
1785
1786         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1787                 GOTO(put, rc = -ENOENT);
1788
1789         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1790         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1791                 GOTO(put, rc);
1792
1793         if (rc < 0)
1794                 swap(o1, o2);
1795
1796         /* permission check. Make sure the calling process having permission
1797          * to write both files. */
1798         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1799                            MAY_WRITE);
1800         if (rc < 0)
1801                 GOTO(put, rc);
1802
1803         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1804                            MAY_WRITE);
1805         if (rc < 0)
1806                 GOTO(put, rc);
1807
1808         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1809         if (msl == NULL)
1810                 GOTO(put, rc = -EPROTO);
1811
1812         lh1 = &info->mti_lh[MDT_LH_NEW];
1813         mdt_lock_reg_init(lh1, LCK_EX);
1814         lh2 = &info->mti_lh[MDT_LH_OLD];
1815         mdt_lock_reg_init(lh2, LCK_EX);
1816
1817         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1818                              MDS_INODELOCK_XATTR);
1819         if (rc < 0)
1820                 GOTO(put, rc);
1821
1822         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1823                              MDS_INODELOCK_XATTR);
1824         if (rc < 0)
1825                 GOTO(unlock1, rc);
1826
1827         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1828                              mdt_object_child(o2), msl->msl_flags);
1829         if (rc < 0)
1830                 GOTO(unlock2, rc);
1831
1832         mdt_swap_lov_flag(o1, o2);
1833
1834 unlock2:
1835         mdt_object_unlock(info, o2, lh2, rc);
1836 unlock1:
1837         mdt_object_unlock(info, o1, lh1, rc);
1838 put:
1839         mdt_object_put(info->mti_env, o);
1840 out:
1841         mdt_thread_info_fini(info);
1842         RETURN(rc);
1843 }
1844
1845 static int mdt_raw_lookup(struct mdt_thread_info *info,
1846                           struct mdt_object *parent,
1847                           const struct lu_name *lname)
1848 {
1849         struct lu_fid *fid = &info->mti_tmp_fid1;
1850         struct mdt_body *repbody;
1851         bool is_dotdot = false;
1852         bool is_old_parent_stripe = false;
1853         bool is_new_parent_checked = false;
1854         int rc;
1855
1856         ENTRY;
1857
1858         LASSERT(!info->mti_cross_ref);
1859         /* Always allow to lookup ".." */
1860         if (lname->ln_namelen == 2 &&
1861             lname->ln_name[0] == '.' && lname->ln_name[1] == '.') {
1862                 info->mti_spec.sp_permitted = 1;
1863                 is_dotdot = true;
1864                 if (mdt_is_dir_stripe(info, parent) == 1)
1865                         is_old_parent_stripe = true;
1866         }
1867
1868         mdt_object_get(info->mti_env, parent);
1869 lookup:
1870         /* Only got the fid of this obj by name */
1871         fid_zero(fid);
1872         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname, fid,
1873                         &info->mti_spec);
1874         mdt_object_put(info->mti_env, parent);
1875         if (rc)
1876                 RETURN(rc);
1877
1878         /* getattr_name("..") should return master object FID for striped dir */
1879         if (is_dotdot && (is_old_parent_stripe || !is_new_parent_checked)) {
1880                 parent = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1881                 if (IS_ERR(parent))
1882                         RETURN(PTR_ERR(parent));
1883
1884                 /* old client getattr_name("..") with stripe FID */
1885                 if (unlikely(is_old_parent_stripe)) {
1886                         is_old_parent_stripe = false;
1887                         goto lookup;
1888                 }
1889
1890                 /* ".." may be a stripe */
1891                 if (unlikely(mdt_is_dir_stripe(info, parent) == 1)) {
1892                         is_new_parent_checked = true;
1893                         goto lookup;
1894                 }
1895
1896                 mdt_object_put(info->mti_env, parent);
1897         }
1898
1899         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1900         repbody->mbo_fid1 = *fid;
1901         repbody->mbo_valid = OBD_MD_FLID;
1902
1903         RETURN(rc);
1904 }
1905
1906 /**
1907  * Find name matching hash
1908  *
1909  * We search \a child LinkEA for a name whose hash matches \a lname
1910  * (it contains an encoded hash).
1911  *
1912  * \param info mdt thread info
1913  * \param lname encoded hash to find
1914  * \param parent parent object
1915  * \param child object to search with LinkEA
1916  * \param force_check true to check hash even if LinkEA has only one entry
1917  *
1918  * \retval 1 match found
1919  * \retval 0 no match found
1920  * \retval -ev negative errno upon error
1921  */
1922 int find_name_matching_hash(struct mdt_thread_info *info, struct lu_name *lname,
1923                             struct mdt_object *parent, struct mdt_object *child,
1924                             bool force_check)
1925 {
1926         /* Here, lname is an encoded hash of on-disk name, and
1927          * client is doing access without encryption key.
1928          * So we need to get LinkEA, check parent fid is correct and
1929          * compare name hash with the one in the request.
1930          */
1931         struct lu_buf *buf = &info->mti_big_buf;
1932         struct lu_name name;
1933         struct lu_fid pfid;
1934         struct linkea_data ldata = { NULL };
1935         struct link_ea_header *leh;
1936         struct link_ea_entry *lee;
1937         struct lu_buf link = { 0 };
1938         char *hash = NULL;
1939         int reclen, count, rc;
1940
1941         ENTRY;
1942
1943         if (lname->ln_namelen < LLCRYPT_FNAME_DIGEST_SIZE)
1944                 RETURN(-EINVAL);
1945
1946         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
1947         if (!buf->lb_buf)
1948                 RETURN(-ENOMEM);
1949
1950         ldata.ld_buf = buf;
1951         rc = mdt_links_read(info, child, &ldata);
1952         if (rc < 0)
1953                 RETURN(rc);
1954
1955         leh = buf->lb_buf;
1956         if (force_check || leh->leh_reccount > 1) {
1957                 hash = kmalloc(lname->ln_namelen, GFP_NOFS);
1958                 if (!hash)
1959                         RETURN(-ENOMEM);
1960                 rc = critical_decode(lname->ln_name, lname->ln_namelen, hash);
1961         }
1962         lee = (struct link_ea_entry *)(leh + 1);
1963         for (count = 0; count < leh->leh_reccount; count++) {
1964                 linkea_entry_unpack(lee, &reclen, &name, &pfid);
1965                 if (!force_check && leh->leh_reccount == 1) {
1966                         /* if there is only one rec, it has to be it */
1967                         *lname = name;
1968                         break;
1969                 }
1970                 if (!parent || lu_fid_eq(&pfid, mdt_object_fid(parent))) {
1971                         lu_buf_check_and_alloc(&link, name.ln_namelen);
1972                         if (!link.lb_buf)
1973                                 GOTO(out_match, rc = -ENOMEM);
1974                         rc = critical_decode(name.ln_name, name.ln_namelen,
1975                                              link.lb_buf);
1976
1977                         if (memcmp(LLCRYPT_FNAME_DIGEST(link.lb_buf, rc),
1978                                    hash, LLCRYPT_FNAME_DIGEST_SIZE) == 0) {
1979                                 *lname = name;
1980                                 break;
1981                         }
1982                 }
1983                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
1984         }
1985         if (count == leh->leh_reccount)
1986                 rc = 0;
1987         else
1988                 rc = 1;
1989
1990 out_match:
1991         lu_buf_free(&link);
1992         kfree(hash);
1993
1994         RETURN(rc);
1995 }
1996
1997 /*
1998  * UPDATE lock should be taken against parent, and be released before exit;
1999  * child_bits lock should be taken against child, and be returned back:
2000  *            (1)normal request should release the child lock;
2001  *            (2)intent request will grant the lock to client.
2002  */
2003 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
2004                                  struct mdt_lock_handle *lhc,
2005                                  __u64 child_bits,
2006                                  struct ldlm_reply *ldlm_rep)
2007 {
2008         struct ptlrpc_request *req = mdt_info_req(info);
2009         struct mdt_body *reqbody = NULL;
2010         struct mdt_object *parent = info->mti_object;
2011         struct mdt_object *child = NULL;
2012         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2013         struct lu_name *lname = NULL;
2014         struct mdt_lock_handle *lhp = NULL;
2015         struct ldlm_lock *lock;
2016         struct req_capsule *pill = info->mti_pill;
2017         __u64 try_bits = 0;
2018         bool is_resent;
2019         int ma_need = 0;
2020         int rc;
2021
2022         ENTRY;
2023
2024         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
2025         LASSERT(ergo(is_resent,
2026                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
2027
2028         if (parent == NULL)
2029                 RETURN(-ENOENT);
2030
2031         if (info->mti_cross_ref) {
2032                 /* Only getattr on the child. Parent is on another node. */
2033                 mdt_set_disposition(info, ldlm_rep,
2034                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
2035                 child = parent;
2036                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
2037                        "ldlm_rep = %p\n",
2038                        PFID(mdt_object_fid(child)), ldlm_rep);
2039
2040                 rc = mdt_check_resent_lock(info, child, lhc);
2041                 if (rc < 0) {
2042                         RETURN(rc);
2043                 } else if (rc > 0) {
2044                         mdt_lock_handle_init(lhc);
2045                         mdt_lock_reg_init(lhc, LCK_PR);
2046
2047                         /*
2048                          * Object's name entry is on another MDS, it will
2049                          * request PERM lock only because LOOKUP lock is owned
2050                          * by the MDS where name entry resides.
2051                          *
2052                          * TODO: it should try layout lock too. - Jinshan
2053                          */
2054                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
2055                                         MDS_INODELOCK_LAYOUT);
2056                         child_bits |= MDS_INODELOCK_PERM;
2057
2058                         rc = mdt_object_lock(info, child, lhc, child_bits);
2059                         if (rc < 0)
2060                                 RETURN(rc);
2061                 }
2062
2063                 /* Finally, we can get attr for child. */
2064                 if (!mdt_object_exists(child)) {
2065                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
2066                                         &child->mot_obj,
2067                                         "remote object doesn't exist.");
2068                         mdt_object_unlock(info, child, lhc, 1);
2069                         RETURN(-ENOENT);
2070                 }
2071
2072                 rc = mdt_getattr_internal(info, child, 0);
2073                 if (unlikely(rc != 0)) {
2074                         mdt_object_unlock(info, child, lhc, 1);
2075                         RETURN(rc);
2076                 }
2077
2078                 rc = mdt_pack_secctx_in_reply(info, child);
2079                 if (unlikely(rc)) {
2080                         mdt_object_unlock(info, child, lhc, 1);
2081                         RETURN(rc);
2082                 }
2083
2084                 rc = mdt_pack_encctx_in_reply(info, child);
2085                 if (unlikely(rc))
2086                         mdt_object_unlock(info, child, lhc, 1);
2087                 RETURN(rc);
2088         }
2089
2090         lname = &info->mti_name;
2091         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
2092
2093         if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2094                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2095                 if (unlikely(reqbody == NULL))
2096                         RETURN(err_serious(-EPROTO));
2097
2098                 *child_fid = reqbody->mbo_fid2;
2099                 if (unlikely(!fid_is_sane(child_fid)))
2100                         RETURN(err_serious(-EINVAL));
2101
2102                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2103                         mdt_object_get(info->mti_env, parent);
2104                         child = parent;
2105                 } else {
2106                         child = mdt_object_find(info->mti_env, info->mti_mdt,
2107                                                 child_fid);
2108                         if (IS_ERR(child))
2109                                 RETURN(PTR_ERR(child));
2110                 }
2111
2112                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2113                        "ldlm_rep = %p\n",
2114                        PFID(mdt_object_fid(parent)),
2115                        PFID(&reqbody->mbo_fid2), ldlm_rep);
2116         } else if (lu_name_is_valid(lname)) {
2117                 if (mdt_object_remote(parent)) {
2118                         CERROR("%s: parent "DFID" is on remote target\n",
2119                                mdt_obd_name(info->mti_mdt),
2120                                PFID(mdt_object_fid(parent)));
2121                         RETURN(-EPROTO);
2122                 }
2123
2124                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
2125                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
2126                        PNAME(lname), ldlm_rep);
2127         } else {
2128                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
2129                 if (unlikely(reqbody == NULL))
2130                         RETURN(err_serious(-EPROTO));
2131
2132                 *child_fid = reqbody->mbo_fid2;
2133                 if (unlikely(!fid_is_sane(child_fid)))
2134                         RETURN(err_serious(-EINVAL));
2135
2136                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2137                         mdt_object_get(info->mti_env, parent);
2138                         child = parent;
2139                 } else {
2140                         child = mdt_object_find(info->mti_env, info->mti_mdt,
2141                                                 child_fid);
2142                         if (IS_ERR(child))
2143                                 RETURN(PTR_ERR(child));
2144                 }
2145
2146                 if (mdt_object_remote(child)) {
2147                         CERROR("%s: child "DFID" is on remote target\n",
2148                                mdt_obd_name(info->mti_mdt),
2149                                PFID(mdt_object_fid(child)));
2150                         GOTO(out_child, rc = -EPROTO);
2151                 }
2152
2153                 /* don't fetch LOOKUP lock if it's remote object */
2154                 rc = mdt_is_remote_object(info, parent, child);
2155                 if (rc < 0)
2156                         GOTO(out_child, rc);
2157                 if (rc)
2158                         child_bits &= ~MDS_INODELOCK_LOOKUP;
2159
2160                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
2161                        "ldlm_rep = %p\n",
2162                        PFID(mdt_object_fid(parent)),
2163                        PFID(&reqbody->mbo_fid2), ldlm_rep);
2164         }
2165
2166         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
2167
2168         if (unlikely(!mdt_object_exists(parent)) &&
2169             !(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2170             lu_name_is_valid(lname)) {
2171                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2172                                 &parent->mot_obj,
2173                                 "Parent doesn't exist!");
2174                 GOTO(out_child, rc = -ESTALE);
2175         }
2176
2177         if (!(info->mti_body->mbo_valid & OBD_MD_NAMEHASH) &&
2178             lu_name_is_valid(lname)) {
2179                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
2180                         rc = mdt_raw_lookup(info, parent, lname);
2181
2182                         RETURN(rc);
2183                 }
2184
2185                 /* step 1: lock parent only if parent is a directory */
2186                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2187                         lhp = &info->mti_lh[MDT_LH_PARENT];
2188                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
2189                         rc = mdt_object_lock(info, parent, lhp,
2190                                              MDS_INODELOCK_UPDATE);
2191                         if (unlikely(rc != 0))
2192                                 RETURN(rc);
2193                 }
2194
2195                 /* step 2: lookup child's fid by name */
2196                 fid_zero(child_fid);
2197                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2198                                 child_fid, &info->mti_spec);
2199                 if (rc == -ENOENT)
2200                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2201
2202                 if (rc != 0)
2203                         GOTO(unlock_parent, rc);
2204
2205                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2206                                         child_fid);
2207                 if (unlikely(IS_ERR(child)))
2208                         GOTO(unlock_parent, rc = PTR_ERR(child));
2209         }
2210
2211         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2212
2213         /* step 3: lock child regardless if it is local or remote. */
2214         LASSERT(child);
2215
2216         if (info->mti_body->mbo_valid & OBD_MD_NAMEHASH) {
2217                 /* Here, lname is an encoded hash of on-disk name, and
2218                  * client is doing access without encryption key.
2219                  * So we need to compare name hash with the one in the request.
2220                  */
2221                 if (!find_name_matching_hash(info, lname, parent,
2222                                              child, true)) {
2223                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2224                         mdt_clear_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2225                         GOTO(out_child, rc = -ENOENT);
2226                 }
2227         }
2228
2229         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2230         if (!mdt_object_exists(child)) {
2231                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2232                                 &child->mot_obj,
2233                                 "Object doesn't exist!");
2234                 GOTO(out_child, rc = -ENOENT);
2235         }
2236
2237         rc = mdt_check_resent_lock(info, child, lhc);
2238         if (rc < 0) {
2239                 GOTO(out_child, rc);
2240         } else if (rc > 0) {
2241                 mdt_lock_handle_init(lhc);
2242                 mdt_lock_reg_init(lhc, LCK_PR);
2243
2244                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2245                     !mdt_object_remote(child)) {
2246                         struct md_attr *ma = &info->mti_attr;
2247
2248                         ma->ma_valid = 0;
2249                         ma->ma_need = MA_INODE;
2250                         rc = mdt_attr_get_complex(info, child, ma);
2251                         if (unlikely(rc != 0))
2252                                 GOTO(out_child, rc);
2253
2254                         /* If the file has not been changed for some time, we
2255                          * return not only a LOOKUP lock, but also an UPDATE
2256                          * lock and this might save us RPC on later STAT. For
2257                          * directories, it also let negative dentry cache start
2258                          * working for this dir. */
2259                         if (ma->ma_valid & MA_INODE &&
2260                             ma->ma_attr.la_valid & LA_CTIME &&
2261                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2262                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2263                                 child_bits |= MDS_INODELOCK_UPDATE;
2264                 }
2265
2266                 /* layout lock must be granted in a best-effort way
2267                  * for IT operations */
2268                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2269                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2270                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2271                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2272                             exp_connect_layout(info->mti_exp)) {
2273                                 /* try to grant layout lock for regular file. */
2274                                 try_bits = MDS_INODELOCK_LAYOUT;
2275                         }
2276                         /* Acquire DOM lock in advance for data-on-mdt file */
2277                         if (child != parent)
2278                                 try_bits |= MDS_INODELOCK_DOM;
2279                 }
2280
2281                 if (try_bits != 0) {
2282                         /* try layout lock, it may fail to be granted due to
2283                          * contention at LOOKUP or UPDATE */
2284                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2285                                                  try_bits, false);
2286                         if (child_bits & MDS_INODELOCK_LAYOUT)
2287                                 ma_need |= MA_LOV;
2288                 } else {
2289                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2290                          * client will enqueue the lock to the remote MDT */
2291                         if (mdt_object_remote(child))
2292                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2293                         rc = mdt_object_lock(info, child, lhc, child_bits);
2294                 }
2295                 if (unlikely(rc != 0))
2296                         GOTO(out_child, rc);
2297         }
2298
2299         /* finally, we can get attr for child. */
2300         rc = mdt_getattr_internal(info, child, ma_need);
2301         if (unlikely(rc != 0)) {
2302                 mdt_object_unlock(info, child, lhc, 1);
2303                 GOTO(out_child, rc);
2304         }
2305
2306         rc = mdt_pack_secctx_in_reply(info, child);
2307         if (unlikely(rc)) {
2308                 mdt_object_unlock(info, child, lhc, 1);
2309                 GOTO(out_child, rc);
2310         }
2311
2312         rc = mdt_pack_encctx_in_reply(info, child);
2313         if (unlikely(rc)) {
2314                 mdt_object_unlock(info, child, lhc, 1);
2315                 GOTO(out_child, rc);
2316         }
2317
2318         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2319         if (lock) {
2320                 /* Debugging code. */
2321                 LDLM_DEBUG(lock, "Returning lock to client");
2322                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2323                                          &lock->l_resource->lr_name),
2324                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2325                          PLDLMRES(lock->l_resource),
2326                          PFID(mdt_object_fid(child)));
2327
2328                 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND))) {
2329                         if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT))
2330                                 OBD_FAIL_TIMEOUT(OBD_FAIL_PTLRPC_ENQ_RESEND,
2331                                                  req->rq_deadline -
2332                                                  req->rq_arrival_time.tv_sec +
2333                                                  cfs_fail_val ?: 3);
2334                         /* Put the lock to the waiting list and force the cancel */
2335                         ldlm_set_ast_sent(lock);
2336                 }
2337
2338                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2339                     !mdt_object_remote(child) && child != parent) {
2340                         mdt_object_put(info->mti_env, child);
2341                         rc = mdt_pack_size2body(info, child_fid,
2342                                                 &lhc->mlh_reg_lh);
2343                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2344                                 /* DOM lock was taken in advance but this is
2345                                  * not DoM file. Drop the lock.
2346                                  */
2347                                 lock_res_and_lock(lock);
2348                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2349                                 unlock_res_and_lock(lock);
2350                         }
2351                         LDLM_LOCK_PUT(lock);
2352                         GOTO(unlock_parent, rc = 0);
2353                 }
2354                 LDLM_LOCK_PUT(lock);
2355         }
2356
2357         EXIT;
2358 out_child:
2359         if (child)
2360                 mdt_object_put(info->mti_env, child);
2361 unlock_parent:
2362         if (lhp)
2363                 mdt_object_unlock(info, parent, lhp, 1);
2364         return rc;
2365 }
2366
2367 /* normal handler: should release the child lock */
2368 static int mdt_getattr_name(struct tgt_session_info *tsi)
2369 {
2370         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2371         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2372         struct mdt_body *reqbody;
2373         struct mdt_body *repbody;
2374         int rc, rc2;
2375
2376         ENTRY;
2377
2378         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2379         LASSERT(reqbody != NULL);
2380         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2381         LASSERT(repbody != NULL);
2382
2383         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2384         repbody->mbo_eadatasize = 0;
2385         repbody->mbo_aclsize = 0;
2386
2387         rc = mdt_init_ucred(info, reqbody);
2388         if (unlikely(rc))
2389                 GOTO(out_shrink, rc);
2390
2391         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2392         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2393                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2394                 lhc->mlh_reg_lh.cookie = 0;
2395         }
2396         mdt_exit_ucred(info);
2397         EXIT;
2398 out_shrink:
2399         mdt_client_compatibility(info);
2400         rc2 = mdt_fix_reply(info);
2401         if (rc == 0)
2402                 rc = rc2;
2403         mdt_thread_info_fini(info);
2404         return rc;
2405 }
2406
2407 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2408                             const struct lu_fid *pfid,
2409                             const struct lu_name *name,
2410                             struct mdt_object *obj, s64 ctime)
2411 {
2412         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2413         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2414         struct mdt_device *mdt = info->mti_mdt;
2415         struct md_attr *ma = &info->mti_attr;
2416         struct mdt_lock_handle *parent_lh;
2417         struct mdt_lock_handle *child_lh;
2418         struct mdt_object *pobj;
2419         bool cos_incompat = false;
2420         int rc;
2421         ENTRY;
2422
2423         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2424         if (IS_ERR(pobj))
2425                 GOTO(out, rc = PTR_ERR(pobj));
2426
2427         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2428         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2429         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2430         if (rc != 0)
2431                 GOTO(put_parent, rc);
2432
2433         if (mdt_object_remote(pobj))
2434                 cos_incompat = true;
2435
2436         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2437                         name, child_fid, &info->mti_spec);
2438         if (rc != 0)
2439                 GOTO(unlock_parent, rc);
2440
2441         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2442                 GOTO(unlock_parent, rc = -EREMCHG);
2443
2444         child_lh = &info->mti_lh[MDT_LH_CHILD];
2445         mdt_lock_reg_init(child_lh, LCK_EX);
2446         rc = mdt_reint_striped_lock(info, obj, child_lh,
2447                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2448                                     einfo, cos_incompat);
2449         if (rc != 0)
2450                 GOTO(unlock_parent, rc);
2451
2452         if (atomic_read(&obj->mot_open_count)) {
2453                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2454                        PFID(mdt_object_fid(obj)));
2455                 GOTO(unlock_child, rc = -EBUSY);
2456         }
2457
2458         ma->ma_need = 0;
2459         ma->ma_valid = MA_INODE;
2460         ma->ma_attr.la_valid = LA_CTIME;
2461         ma->ma_attr.la_ctime = ctime;
2462
2463         mutex_lock(&obj->mot_lov_mutex);
2464
2465         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2466                         mdt_object_child(obj), name, ma, 0);
2467
2468         mutex_unlock(&obj->mot_lov_mutex);
2469
2470 unlock_child:
2471         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2472 unlock_parent:
2473         mdt_object_unlock(info, pobj, parent_lh, 1);
2474 put_parent:
2475         mdt_object_put(info->mti_env, pobj);
2476 out:
2477         RETURN(rc);
2478 }
2479
2480 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2481                                         struct mdt_object *obj)
2482 {
2483         struct lu_ucred *uc = lu_ucred(info->mti_env);
2484         struct md_attr *ma = &info->mti_attr;
2485         struct lu_attr *la = &ma->ma_attr;
2486         int rc = 0;
2487         ENTRY;
2488
2489         ma->ma_need = MA_INODE;
2490         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2491         if (rc)
2492                 GOTO(out, rc);
2493
2494         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2495                         rc = -EACCES;
2496
2497         if (cap_raised(uc->uc_cap, CAP_DAC_OVERRIDE))
2498                 RETURN(0);
2499         if (uc->uc_fsuid == la->la_uid) {
2500                 if ((la->la_mode & S_IWUSR) == 0)
2501                         rc = -EACCES;
2502         } else if (uc->uc_fsgid == la->la_gid) {
2503                 if ((la->la_mode & S_IWGRP) == 0)
2504                         rc = -EACCES;
2505         } else if ((la->la_mode & S_IWOTH) == 0) {
2506                         rc = -EACCES;
2507         }
2508
2509 out:
2510         RETURN(rc);
2511 }
2512
2513 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2514                          s64 ctime)
2515 {
2516         struct mdt_device *mdt = info->mti_mdt;
2517         struct mdt_object *obj = NULL;
2518         struct linkea_data ldata = { NULL };
2519         struct lu_buf *buf = &info->mti_big_buf;
2520         struct lu_name *name = &info->mti_name;
2521         struct lu_fid *pfid = &info->mti_tmp_fid1;
2522         struct link_ea_header *leh;
2523         struct link_ea_entry *lee;
2524         int reclen, count, rc = 0;
2525         ENTRY;
2526
2527         if (!fid_is_sane(fid))
2528                 GOTO(out, rc = -EINVAL);
2529
2530         if (!fid_is_namespace_visible(fid))
2531                 GOTO(out, rc = -EINVAL);
2532
2533         obj = mdt_object_find(info->mti_env, mdt, fid);
2534         if (IS_ERR(obj))
2535                 GOTO(out, rc = PTR_ERR(obj));
2536
2537         if (mdt_object_remote(obj))
2538                 GOTO(out, rc = -EREMOTE);
2539         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2540                 GOTO(out, rc = -ENOENT);
2541
2542         rc = mdt_rmfid_check_permission(info, obj);
2543         if (rc)
2544                 GOTO(out, rc);
2545
2546         /* take LinkEA */
2547         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2548         if (!buf->lb_buf)
2549                 GOTO(out, rc = -ENOMEM);
2550
2551         ldata.ld_buf = buf;
2552         rc = mdt_links_read(info, obj, &ldata);
2553         if (rc)
2554                 GOTO(out, rc);
2555
2556         leh = buf->lb_buf;
2557         lee = (struct link_ea_entry *)(leh + 1);
2558         for (count = 0; count < leh->leh_reccount; count++) {
2559                 /* remove every hardlink */
2560                 linkea_entry_unpack(lee, &reclen, name, pfid);
2561                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2562                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2563                 if (rc)
2564                         break;
2565         }
2566
2567 out:
2568         if (obj && !IS_ERR(obj))
2569                 mdt_object_put(info->mti_env, obj);
2570         if (info->mti_big_buf.lb_buf)
2571                 lu_buf_free(&info->mti_big_buf);
2572
2573         RETURN(rc);
2574 }
2575
2576 static int mdt_rmfid(struct tgt_session_info *tsi)
2577 {
2578         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2579         struct mdt_body *reqbody;
2580         struct lu_fid *fids, *rfids;
2581         int bufsize, rc;
2582         __u32 *rcs;
2583         int i, nr;
2584         ENTRY;
2585
2586         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2587         if (reqbody == NULL)
2588                 RETURN(-EPROTO);
2589         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2590                                        RCL_CLIENT);
2591         nr = bufsize / sizeof(struct lu_fid);
2592         if (nr * sizeof(struct lu_fid) != bufsize)
2593                 RETURN(-EINVAL);
2594         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2595                              RCL_SERVER, nr * sizeof(__u32));
2596         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2597                              RCL_SERVER, nr * sizeof(struct lu_fid));
2598         rc = req_capsule_server_pack(tsi->tsi_pill);
2599         if (rc)
2600                 GOTO(out, rc = err_serious(rc));
2601         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2602         if (fids == NULL)
2603                 RETURN(-EPROTO);
2604         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2605         LASSERT(rcs);
2606         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2607         LASSERT(rfids);
2608
2609         mdt_init_ucred(mti, reqbody);
2610         for (i = 0; i < nr; i++) {
2611                 rfids[i] = fids[i];
2612                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2613         }
2614         mdt_exit_ucred(mti);
2615
2616 out:
2617         RETURN(rc);
2618 }
2619
2620 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2621                          void *karg, void __user *uarg);
2622
2623 static int mdt_set_info(struct tgt_session_info *tsi)
2624 {
2625         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2626         char                    *key;
2627         void                    *val;
2628         int                      keylen, vallen, rc = 0;
2629
2630         ENTRY;
2631
2632         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2633         if (key == NULL) {
2634                 DEBUG_REQ(D_HA, req, "no set_info key");
2635                 RETURN(err_serious(-EFAULT));
2636         }
2637
2638         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2639                                       RCL_CLIENT);
2640
2641         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2642         if (val == NULL) {
2643                 DEBUG_REQ(D_HA, req, "no set_info val");
2644                 RETURN(err_serious(-EFAULT));
2645         }
2646
2647         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2648                                       RCL_CLIENT);
2649
2650         /* Swab any part of val you need to here */
2651         if (KEY_IS(KEY_READ_ONLY)) {
2652                 spin_lock(&req->rq_export->exp_lock);
2653                 if (*(__u32 *)val)
2654                         *exp_connect_flags_ptr(req->rq_export) |=
2655                                 OBD_CONNECT_RDONLY;
2656                 else
2657                         *exp_connect_flags_ptr(req->rq_export) &=
2658                                 ~OBD_CONNECT_RDONLY;
2659                 spin_unlock(&req->rq_export->exp_lock);
2660         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2661                 struct changelog_setinfo *cs = val;
2662
2663                 if (vallen != sizeof(*cs)) {
2664                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2665                                tgt_name(tsi->tsi_tgt), vallen);
2666                         RETURN(-EINVAL);
2667                 }
2668                 if (req_capsule_req_need_swab(&req->rq_pill)) {
2669                         __swab64s(&cs->cs_recno);
2670                         __swab32s(&cs->cs_id);
2671                 }
2672
2673                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2674                         RETURN(-EACCES);
2675                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2676                                    vallen, val, NULL);
2677         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2678                 if (vallen > 0)
2679                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2680         } else {
2681                 RETURN(-EINVAL);
2682         }
2683         RETURN(rc);
2684 }
2685
2686 static int mdt_readpage(struct tgt_session_info *tsi)
2687 {
2688         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2689         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2690         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2691         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2692         struct mdt_body         *repbody;
2693         int                      rc;
2694         int                      i;
2695
2696         ENTRY;
2697
2698         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2699                 RETURN(err_serious(-ENOMEM));
2700
2701         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2702         if (repbody == NULL || reqbody == NULL)
2703                 RETURN(err_serious(-EFAULT));
2704
2705         /*
2706          * prepare @rdpg before calling lower layers and transfer itself. Here
2707          * reqbody->size contains offset of where to start to read and
2708          * reqbody->nlink contains number bytes to read.
2709          */
2710         rdpg->rp_hash = reqbody->mbo_size;
2711         if (rdpg->rp_hash != reqbody->mbo_size) {
2712                 CERROR("Invalid hash: %#llx != %#llx\n",
2713                        rdpg->rp_hash, reqbody->mbo_size);
2714                 RETURN(-EFAULT);
2715         }
2716
2717         rdpg->rp_attrs = reqbody->mbo_mode;
2718         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2719                 rdpg->rp_attrs |= LUDA_64BITHASH;
2720         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2721                                 exp_max_brw_size(tsi->tsi_exp));
2722         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2723                           PAGE_SHIFT;
2724         OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2725         if (rdpg->rp_pages == NULL)
2726                 RETURN(-ENOMEM);
2727
2728         for (i = 0; i < rdpg->rp_npages; ++i) {
2729                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2730                 if (rdpg->rp_pages[i] == NULL)
2731                         GOTO(free_rdpg, rc = -ENOMEM);
2732         }
2733
2734         /* call lower layers to fill allocated pages with directory data */
2735         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2736         if (rc < 0)
2737                 GOTO(free_rdpg, rc);
2738
2739         /* send pages to client */
2740         rc = tgt_sendpage(tsi, rdpg, rc);
2741
2742         EXIT;
2743 free_rdpg:
2744
2745         for (i = 0; i < rdpg->rp_npages; i++)
2746                 if (rdpg->rp_pages[i] != NULL)
2747                         __free_page(rdpg->rp_pages[i]);
2748         OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2749
2750         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2751                 RETURN(0);
2752
2753         return rc;
2754 }
2755
2756 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2757 {
2758         struct lu_ucred *uc = mdt_ucred_check(info);
2759         struct lu_attr *attr = &info->mti_attr.ma_attr;
2760
2761         if (uc == NULL)
2762                 return -EINVAL;
2763
2764         if (op != REINT_SETATTR) {
2765                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2766                         attr->la_uid = uc->uc_fsuid;
2767                 /* for S_ISGID, inherit gid from his parent, such work will be
2768                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2769                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2770                         attr->la_gid = uc->uc_fsgid;
2771         }
2772
2773         return 0;
2774 }
2775
2776 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2777 {
2778         return op == REINT_OPEN &&
2779              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2780 }
2781
2782 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2783 {
2784         struct req_capsule *pill = info->mti_pill;
2785
2786         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2787                                   RCL_SERVER) &&
2788             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2789                                   RCL_CLIENT)) {
2790                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2791                                          RCL_CLIENT) != 0)
2792                         /* pre-set size in server part with max size */
2793                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2794                                              RCL_SERVER,
2795                                              OBD_MAX_DEFAULT_EA_SIZE);
2796                 else
2797                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2798                                              RCL_SERVER, 0);
2799         }
2800 }
2801
2802 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2803 {
2804         struct req_capsule *pill = info->mti_pill;
2805
2806         if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2807                                   RCL_SERVER))
2808                 /* pre-set size in server part with max size */
2809                 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2810                                      RCL_SERVER,
2811                                      info->mti_mdt->mdt_max_mdsize);
2812 }
2813
2814 static int mdt_reint_internal(struct mdt_thread_info *info,
2815                               struct mdt_lock_handle *lhc,
2816                               __u32 op)
2817 {
2818         struct req_capsule      *pill = info->mti_pill;
2819         struct mdt_body         *repbody;
2820         int                      rc = 0, rc2;
2821
2822         ENTRY;
2823
2824         rc = mdt_reint_unpack(info, op);
2825         if (rc != 0) {
2826                 CERROR("Can't unpack reint, rc %d\n", rc);
2827                 RETURN(err_serious(rc));
2828         }
2829
2830
2831         /* check if the file system is set to readonly. O_RDONLY open
2832          * is still allowed even the file system is set to readonly mode */
2833         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2834                 RETURN(err_serious(-EROFS));
2835
2836         /* for replay (no_create) lmm is not needed, client has it already */
2837         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2838                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2839                                      DEF_REP_MD_SIZE);
2840
2841         /* llog cookies are always 0, the field is kept for compatibility */
2842         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2843                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2844
2845         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2846          * by default. If the target object has more ACL entries, then
2847          * enlarge the buffer when necessary. */
2848         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2849                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2850                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2851
2852         mdt_preset_secctx_size(info);
2853         mdt_preset_encctx_size(info);
2854
2855         rc = req_capsule_server_pack(pill);
2856         if (rc != 0) {
2857                 CERROR("Can't pack response, rc %d\n", rc);
2858                 RETURN(err_serious(rc));
2859         }
2860
2861         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2862                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2863                 LASSERT(repbody);
2864                 repbody->mbo_eadatasize = 0;
2865                 repbody->mbo_aclsize = 0;
2866         }
2867
2868         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2869
2870         /* for replay no cookkie / lmm need, because client have this already */
2871         if (info->mti_spec.no_create)
2872                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2873                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2874
2875         rc = mdt_init_ucred_reint(info);
2876         if (rc)
2877                 GOTO(out_shrink, rc);
2878
2879         rc = mdt_fix_attr_ucred(info, op);
2880         if (rc != 0)
2881                 GOTO(out_ucred, rc = err_serious(rc));
2882
2883         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2884         if (rc < 0) {
2885                 GOTO(out_ucred, rc);
2886         } else if (rc == 1) {
2887                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2888                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2889                 GOTO(out_ucred, rc);
2890         }
2891         rc = mdt_reint_rec(info, lhc);
2892         EXIT;
2893 out_ucred:
2894         mdt_exit_ucred(info);
2895 out_shrink:
2896         mdt_client_compatibility(info);
2897
2898         rc2 = mdt_fix_reply(info);
2899         if (rc == 0)
2900                 rc = rc2;
2901
2902         /*
2903          * Data-on-MDT optimization - read data along with OPEN and return it
2904          * in reply when possible.
2905          */
2906         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2907                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2908                                           &lhc->mlh_reg_lh);
2909
2910         return rc;
2911 }
2912
2913 static long mdt_reint_opcode(struct ptlrpc_request *req,
2914                              const struct req_format **fmt)
2915 {
2916         struct mdt_device       *mdt;
2917         struct mdt_rec_reint    *rec;
2918         long                     opc;
2919
2920         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2921         if (rec != NULL) {
2922                 opc = rec->rr_opcode;
2923                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2924                 if (opc < REINT_MAX && fmt[opc] != NULL)
2925                         req_capsule_extend(&req->rq_pill, fmt[opc]);
2926                 else {
2927                         mdt = mdt_exp2dev(req->rq_export);
2928                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2929                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
2930                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2931                         opc = err_serious(-EFAULT);
2932                 }
2933         } else {
2934                 opc = err_serious(-EFAULT);
2935         }
2936         return opc;
2937 }
2938
2939 static int mdt_reint(struct tgt_session_info *tsi)
2940 {
2941         long opc;
2942         int  rc;
2943         static const struct req_format *reint_fmts[REINT_MAX] = {
2944                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
2945                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
2946                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
2947                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
2948                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
2949                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
2950                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2951                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
2952                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
2953                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
2954         };
2955
2956         ENTRY;
2957
2958         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2959         if (opc >= 0) {
2960                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2961                 /*
2962                  * No lock possible here from client to pass it to reint code
2963                  * path.
2964                  */
2965                 rc = mdt_reint_internal(info, NULL, opc);
2966                 mdt_thread_info_fini(info);
2967         } else {
2968                 rc = opc;
2969         }
2970
2971         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2972         RETURN(rc);
2973 }
2974
2975 /* this should sync the whole device */
2976 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2977 {
2978         struct dt_device *dt = mdt->mdt_bottom;
2979         int rc;
2980         ENTRY;
2981
2982         rc = dt->dd_ops->dt_sync(env, dt);
2983         RETURN(rc);
2984 }
2985
2986 /* this should sync this object */
2987 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2988                            struct mdt_object *mo)
2989 {
2990         int rc = 0;
2991
2992         ENTRY;
2993
2994         if (!mdt_object_exists(mo)) {
2995                 CWARN("%s: non existing object "DFID": rc = %d\n",
2996                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2997                       -ESTALE);
2998                 RETURN(-ESTALE);
2999         }
3000
3001         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
3002                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
3003                 dt_obj_version_t version;
3004
3005                 version = dt_version_get(env, mdt_obj2dt(mo));
3006                 if (version > tgt->lut_obd->obd_last_committed)
3007                         rc = mo_object_sync(env, mdt_object_child(mo));
3008         } else {
3009                 rc = mo_object_sync(env, mdt_object_child(mo));
3010         }
3011
3012         RETURN(rc);
3013 }
3014
3015 static int mdt_sync(struct tgt_session_info *tsi)
3016 {
3017         struct ptlrpc_request   *req = tgt_ses_req(tsi);
3018         struct req_capsule      *pill = tsi->tsi_pill;
3019         struct mdt_body         *body;
3020         ktime_t                  kstart = ktime_get();
3021         int                      rc;
3022
3023         ENTRY;
3024
3025         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
3026                 RETURN(err_serious(-ENOMEM));
3027
3028         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
3029                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
3030         } else {
3031                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
3032
3033                 if (unlikely(info->mti_object == NULL))
3034                         RETURN(-EPROTO);
3035
3036                 /* sync an object */
3037                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
3038                                      info->mti_object);
3039                 if (rc == 0) {
3040                         const struct lu_fid *fid;
3041                         struct lu_attr *la = &info->mti_attr.ma_attr;
3042
3043                         info->mti_attr.ma_need = MA_INODE;
3044                         info->mti_attr.ma_valid = 0;
3045                         rc = mdt_attr_get_complex(info, info->mti_object,
3046                                                   &info->mti_attr);
3047                         if (rc == 0) {
3048                                 body = req_capsule_server_get(pill,
3049                                                               &RMF_MDT_BODY);
3050                                 fid = mdt_object_fid(info->mti_object);
3051                                 mdt_pack_attr2body(info, body, la, fid);
3052                         }
3053                 }
3054                 mdt_thread_info_fini(info);
3055         }
3056         if (rc == 0)
3057                 mdt_counter_incr(req, LPROC_MDT_SYNC,
3058                                  ktime_us_delta(ktime_get(), kstart));
3059
3060         RETURN(rc);
3061 }
3062
3063 static int mdt_data_sync(struct tgt_session_info *tsi)
3064 {
3065         struct mdt_thread_info *info;
3066         struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
3067         struct ost_body *body = tsi->tsi_ost_body;
3068         struct ost_body *repbody;
3069         struct mdt_object *mo = NULL;
3070         struct md_attr *ma;
3071         int rc = 0;
3072
3073         ENTRY;
3074
3075         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
3076
3077         /* if no fid is specified then do nothing,
3078          * device sync is done via MDS_SYNC */
3079         if (fid_is_zero(&tsi->tsi_fid))
3080                 RETURN(0);
3081
3082         mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
3083         if (IS_ERR(mo))
3084                 RETURN(PTR_ERR(mo));
3085
3086         rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
3087         if (rc)
3088                 GOTO(put, rc);
3089
3090         repbody->oa.o_oi = body->oa.o_oi;
3091         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
3092
3093         info = tsi2mdt_info(tsi);
3094         ma = &info->mti_attr;
3095         ma->ma_need = MA_INODE;
3096         ma->ma_valid = 0;
3097         rc = mdt_attr_get_complex(info, mo, ma);
3098         if (rc == 0)
3099                 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
3100         else
3101                 rc = 0;
3102         mdt_thread_info_fini(info);
3103
3104         EXIT;
3105 put:
3106         if (mo != NULL)
3107                 mdt_object_put(tsi->tsi_env, mo);
3108         return rc;
3109 }
3110
3111 /*
3112  * Handle quota control requests to consult current usage/limit, but also
3113  * to configure quota enforcement
3114  */
3115 static int mdt_quotactl(struct tgt_session_info *tsi)
3116 {
3117         struct obd_export *exp  = tsi->tsi_exp;
3118         struct req_capsule *pill = tsi->tsi_pill;
3119         struct obd_quotactl *oqctl, *repoqc;
3120         int id, rc;
3121         struct mdt_device *mdt = mdt_exp2dev(exp);
3122         struct lu_device *qmt = mdt->mdt_qmt_dev;
3123         struct lu_nodemap *nodemap;
3124         ENTRY;
3125
3126         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
3127         if (!oqctl)
3128                 RETURN(err_serious(-EPROTO));
3129
3130         rc = req_capsule_server_pack(pill);
3131         if (rc)
3132                 RETURN(err_serious(rc));
3133
3134         nodemap = nodemap_get_from_exp(exp);
3135         if (IS_ERR(nodemap))
3136                 RETURN(PTR_ERR(nodemap));
3137
3138         switch (oqctl->qc_cmd) {
3139                 /* master quotactl */
3140         case Q_SETINFO:
3141         case Q_SETQUOTA:
3142         case LUSTRE_Q_SETDEFAULT:
3143         case LUSTRE_Q_SETQUOTAPOOL:
3144         case LUSTRE_Q_SETINFOPOOL:
3145         case LUSTRE_Q_SETDEFAULT_POOL:
3146                 if (!nodemap_can_setquota(nodemap))
3147                         GOTO(out_nodemap, rc = -EPERM);
3148                 /* fallthrough */
3149         case Q_GETINFO:
3150         case Q_GETQUOTA: