Whamcloud - gitweb
LU-14565 ofd: Do not rely on tgd_blockbit
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_handler.c
32  *
33  * Lustre Metadata Target (mdt) request handler
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Mike Shaver <shaver@clusterfs.com>
39  * Author: Nikita Danilov <nikita@clusterfs.com>
40  * Author: Huang Hua <huanghua@clusterfs.com>
41  * Author: Yury Umanets <umka@clusterfs.com>
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47 #include <linux/pagemap.h>
48
49 #include <dt_object.h>
50 #include <lustre_acl.h>
51 #include <lustre_export.h>
52 #include <uapi/linux/lustre/lustre_ioctl.h>
53 #include <lustre_lfsck.h>
54 #include <lustre_log.h>
55 #include <lustre_nodemap.h>
56 #include <lustre_mds.h>
57 #include <uapi/linux/lustre/lustre_param.h>
58 #include <lustre_quota.h>
59 #include <lustre_swab.h>
60 #include <lustre_lmv.h>
61 #include <obd.h>
62 #include <obd_support.h>
63 #include <lustre_barrier.h>
64 #include <obd_cksum.h>
65 #include <llog_swab.h>
66
67 #include "mdt_internal.h"
68
69 static unsigned int max_mod_rpcs_per_client = 8;
70 module_param(max_mod_rpcs_per_client, uint, 0644);
71 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
72
73 mdl_mode_t mdt_mdl_lock_modes[] = {
74         [LCK_MINMODE] = MDL_MINMODE,
75         [LCK_EX]      = MDL_EX,
76         [LCK_PW]      = MDL_PW,
77         [LCK_PR]      = MDL_PR,
78         [LCK_CW]      = MDL_CW,
79         [LCK_CR]      = MDL_CR,
80         [LCK_NL]      = MDL_NL,
81         [LCK_GROUP]   = MDL_GROUP
82 };
83
84 enum ldlm_mode mdt_dlm_lock_modes[] = {
85         [MDL_MINMODE]   = LCK_MINMODE,
86         [MDL_EX]        = LCK_EX,
87         [MDL_PW]        = LCK_PW,
88         [MDL_PR]        = LCK_PR,
89         [MDL_CW]        = LCK_CW,
90         [MDL_CR]        = LCK_CR,
91         [MDL_NL]        = LCK_NL,
92         [MDL_GROUP]     = LCK_GROUP
93 };
94
95 static struct mdt_device *mdt_dev(struct lu_device *d);
96
97 static const struct lu_object_operations mdt_obj_ops;
98
99 /* Slab for MDT object allocation */
100 static struct kmem_cache *mdt_object_kmem;
101
102 /* For HSM restore handles */
103 struct kmem_cache *mdt_hsm_cdt_kmem;
104
105 /* For HSM request handles */
106 struct kmem_cache *mdt_hsm_car_kmem;
107
108 static struct lu_kmem_descr mdt_caches[] = {
109         {
110                 .ckd_cache = &mdt_object_kmem,
111                 .ckd_name  = "mdt_obj",
112                 .ckd_size  = sizeof(struct mdt_object)
113         },
114         {
115                 .ckd_cache      = &mdt_hsm_cdt_kmem,
116                 .ckd_name       = "mdt_cdt_restore_handle",
117                 .ckd_size       = sizeof(struct cdt_restore_handle)
118         },
119         {
120                 .ckd_cache      = &mdt_hsm_car_kmem,
121                 .ckd_name       = "mdt_cdt_agent_req",
122                 .ckd_size       = sizeof(struct cdt_agent_req)
123         },
124         {
125                 .ckd_cache = NULL
126         }
127 };
128
129 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
130 {
131         if (!rep)
132                 return 0;
133         return rep->lock_policy_res1 & op_flag;
134 }
135
136 void mdt_clear_disposition(struct mdt_thread_info *info,
137                            struct ldlm_reply *rep, __u64 op_flag)
138 {
139         if (info) {
140                 info->mti_opdata &= ~op_flag;
141                 tgt_opdata_clear(info->mti_env, op_flag);
142         }
143         if (rep)
144                 rep->lock_policy_res1 &= ~op_flag;
145 }
146
147 void mdt_set_disposition(struct mdt_thread_info *info,
148                          struct ldlm_reply *rep, __u64 op_flag)
149 {
150         if (info) {
151                 info->mti_opdata |= op_flag;
152                 tgt_opdata_set(info->mti_env, op_flag);
153         }
154         if (rep)
155                 rep->lock_policy_res1 |= op_flag;
156 }
157
158 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
159 {
160         lh->mlh_pdo_hash = 0;
161         lh->mlh_reg_mode = lm;
162         lh->mlh_rreg_mode = lm;
163         lh->mlh_type = MDT_REG_LOCK;
164 }
165
166 void mdt_lh_reg_init(struct mdt_lock_handle *lh, struct ldlm_lock *lock)
167 {
168         mdt_lock_reg_init(lh, lock->l_req_mode);
169         if (lock->l_req_mode == LCK_GROUP)
170                 lh->mlh_gid = lock->l_policy_data.l_inodebits.li_gid;
171 }
172
173 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
174                        const struct lu_name *lname)
175 {
176         lh->mlh_reg_mode = lock_mode;
177         lh->mlh_pdo_mode = LCK_MINMODE;
178         lh->mlh_rreg_mode = lock_mode;
179         lh->mlh_type = MDT_PDO_LOCK;
180
181         if (lu_name_is_valid(lname)) {
182                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
183                                                      lname->ln_namelen);
184                 /* XXX Workaround for LU-2856
185                  *
186                  * Zero is a valid return value of full_name_hash, but
187                  * several users of mlh_pdo_hash assume a non-zero
188                  * hash value. We therefore map zero onto an
189                  * arbitrary, but consistent value (1) to avoid
190                  * problems further down the road. */
191                 if (unlikely(lh->mlh_pdo_hash == 0))
192                         lh->mlh_pdo_hash = 1;
193         } else {
194                 lh->mlh_pdo_hash = 0;
195         }
196 }
197
198 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
199                               struct mdt_lock_handle *lh)
200 {
201         mdl_mode_t mode;
202         ENTRY;
203
204         /*
205          * Any dir access needs couple of locks:
206          *
207          * 1) on part of dir we gonna take lookup/modify;
208          *
209          * 2) on whole dir to protect it from concurrent splitting and/or to
210          * flush client's cache for readdir().
211          *
212          * so, for a given mode and object this routine decides what lock mode
213          * to use for lock #2:
214          *
215          * 1) if caller's gonna lookup in dir then we need to protect dir from
216          * being splitted only - LCK_CR
217          *
218          * 2) if caller's gonna modify dir then we need to protect dir from
219          * being splitted and to flush cache - LCK_CW
220          *
221          * 3) if caller's gonna modify dir and that dir seems ready for
222          * splitting then we need to protect it from any type of access
223          * (lookup/modify/split) - LCK_EX --bzzz
224          */
225
226         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
227         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
228
229         /*
230          * Ask underlaying level its opinion about preferable PDO lock mode
231          * having access type passed as regular lock mode:
232          *
233          * - MDL_MINMODE means that lower layer does not want to specify lock
234          * mode;
235          *
236          * - MDL_NL means that no PDO lock should be taken. This is used in some
237          * cases. Say, for non-splittable directories no need to use PDO locks
238          * at all.
239          */
240         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
241                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
242
243         if (mode != MDL_MINMODE) {
244                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
245         } else {
246                 /*
247                  * Lower layer does not want to specify locking mode. We do it
248                  * our selves. No special protection is needed, just flush
249                  * client's cache on modification and allow concurrent
250                  * mondification.
251                  */
252                 switch (lh->mlh_reg_mode) {
253                 case LCK_EX:
254                         lh->mlh_pdo_mode = LCK_EX;
255                         break;
256                 case LCK_PR:
257                         lh->mlh_pdo_mode = LCK_CR;
258                         break;
259                 case LCK_PW:
260                         lh->mlh_pdo_mode = LCK_CW;
261                         break;
262                 default:
263                         CERROR("Not expected lock type (0x%x)\n",
264                                (int)lh->mlh_reg_mode);
265                         LBUG();
266                 }
267         }
268
269         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
270         EXIT;
271 }
272
273 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
274                               struct lu_fid *fid)
275 {
276         struct mdt_device *mdt = info->mti_mdt;
277         struct lu_name *lname = &info->mti_name;
278         const char *start = fileset;
279         char *filename = info->mti_filename;
280         struct mdt_object *parent;
281         u32 mode;
282         int rc = 0;
283
284         LASSERT(!info->mti_cross_ref);
285
286         /*
287          * We may want to allow this to mount a completely separate
288          * fileset from the MDT in the future, but keeping it to
289          * ROOT/ only for now avoid potential security issues.
290          */
291         *fid = mdt->mdt_md_root_fid;
292
293         while (rc == 0 && start != NULL && *start != '\0') {
294                 const char *s1 = start;
295                 const char *s2;
296
297                 while (*++s1 == '/')
298                         ;
299                 s2 = s1;
300                 while (*s2 != '/' && *s2 != '\0')
301                         s2++;
302
303                 if (s2 == s1)
304                         break;
305
306                 start = s2;
307
308                 lname->ln_namelen = s2 - s1;
309                 if (lname->ln_namelen > NAME_MAX) {
310                         rc = -EINVAL;
311                         break;
312                 }
313
314                 /* reject .. as a path component */
315                 if (lname->ln_namelen == 2 &&
316                     strncmp(s1, "..", 2) == 0) {
317                         rc = -EINVAL;
318                         break;
319                 }
320
321                 strncpy(filename, s1, lname->ln_namelen);
322                 filename[lname->ln_namelen] = '\0';
323                 lname->ln_name = filename;
324
325                 parent = mdt_object_find(info->mti_env, mdt, fid);
326                 if (IS_ERR(parent)) {
327                         rc = PTR_ERR(parent);
328                         break;
329                 }
330                 /* Only got the fid of this obj by name */
331                 fid_zero(fid);
332                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
333                                 fid, &info->mti_spec);
334                 mdt_object_put(info->mti_env, parent);
335         }
336         if (!rc) {
337                 parent = mdt_object_find(info->mti_env, mdt, fid);
338                 if (IS_ERR(parent))
339                         rc = PTR_ERR(parent);
340                 else {
341                         mode = lu_object_attr(&parent->mot_obj);
342                         if (!S_ISDIR(mode)) {
343                                 rc = -ENOTDIR;
344                         } else if (mdt_is_remote_object(info, parent, parent)) {
345                                 if (!mdt->mdt_enable_remote_subdir_mount) {
346                                         rc = -EREMOTE;
347                                         LCONSOLE_WARN("%s: subdir mount '%s' refused because 'enable_remote_subdir_mount=0': rc = %d\n",
348                                                       mdt_obd_name(mdt),
349                                                       fileset, rc);
350                                 } else {
351                                         LCONSOLE_INFO("%s: subdir mount '%s' is remote and may be slow\n",
352                                                       mdt_obd_name(mdt),
353                                                       fileset);
354                                 }
355                         }
356                         mdt_object_put(info->mti_env, parent);
357                 }
358         }
359
360         return rc;
361 }
362
363 static int mdt_get_root(struct tgt_session_info *tsi)
364 {
365         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
366         struct mdt_device       *mdt = info->mti_mdt;
367         struct mdt_body         *repbody;
368         char                    *fileset = NULL, *buffer = NULL;
369         int                      rc;
370         struct obd_export       *exp = info->mti_exp;
371         char                    *nodemap_fileset;
372
373         ENTRY;
374
375         rc = mdt_check_ucred(info);
376         if (rc)
377                 GOTO(out, rc = err_serious(rc));
378
379         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
380                 GOTO(out, rc = err_serious(-ENOMEM));
381
382         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
383         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
384                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
385                 if (fileset == NULL)
386                         GOTO(out, rc = err_serious(-EFAULT));
387         }
388
389         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
390         if (nodemap_fileset && nodemap_fileset[0]) {
391                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
392                 if (fileset) {
393                         /* consider fileset from client as a sub-fileset
394                          * of the nodemap one */
395                         OBD_ALLOC(buffer, PATH_MAX + 1);
396                         if (buffer == NULL)
397                                 GOTO(out, rc = err_serious(-ENOMEM));
398                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
399                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
400                                 GOTO(out, rc = err_serious(-EINVAL));
401                         fileset = buffer;
402                 } else {
403                         /* enforce fileset as specified in the nodemap */
404                         fileset = nodemap_fileset;
405                 }
406         }
407
408         if (fileset) {
409                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
410                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
411                 if (rc < 0)
412                         GOTO(out, rc = err_serious(rc));
413         } else {
414                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
415         }
416         repbody->mbo_valid |= OBD_MD_FLID;
417
418         EXIT;
419 out:
420         mdt_thread_info_fini(info);
421         if (buffer)
422                 OBD_FREE(buffer, PATH_MAX+1);
423         return rc;
424 }
425
426 static int mdt_statfs(struct tgt_session_info *tsi)
427 {
428         struct ptlrpc_request *req = tgt_ses_req(tsi);
429         struct mdt_thread_info *info = tsi2mdt_info(tsi);
430         struct mdt_device *mdt = info->mti_mdt;
431         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
432         struct md_device *next = mdt->mdt_child;
433         struct ptlrpc_service_part *svcpt;
434         struct obd_statfs *osfs;
435         struct mdt_body *reqbody = NULL;
436         struct mdt_statfs_cache *msf;
437         ktime_t kstart = ktime_get();
438         int current_blockbits;
439         int rc;
440
441         ENTRY;
442
443         svcpt = req->rq_rqbd->rqbd_svcpt;
444
445         /* This will trigger a watchdog timeout */
446         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
447                          (MDT_SERVICE_WATCHDOG_FACTOR *
448                           at_get(&svcpt->scp_at_estimate)) + 1);
449
450         rc = mdt_check_ucred(info);
451         if (rc)
452                 GOTO(out, rc = err_serious(rc));
453
454         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
455                 GOTO(out, rc = err_serious(-ENOMEM));
456
457         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
458         if (!osfs)
459                 GOTO(out, rc = -EPROTO);
460
461         if (mdt_is_sum_statfs_client(req->rq_export) &&
462                 lustre_packed_msg_size(req->rq_reqmsg) ==
463                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
464                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
465                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
466                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
467         }
468
469         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
470                 msf = &mdt->mdt_sum_osfs;
471         else
472                 msf = &mdt->mdt_osfs;
473
474         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
475                         /** statfs data is too old, get up-to-date one */
476                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
477                                 rc = next->md_ops->mdo_statfs(info->mti_env,
478                                                               next, osfs);
479                         else
480                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
481                                                osfs);
482                         if (rc)
483                                 GOTO(out, rc);
484                         spin_lock(&mdt->mdt_lock);
485                         msf->msf_osfs = *osfs;
486                         msf->msf_age = ktime_get_seconds();
487                         spin_unlock(&mdt->mdt_lock);
488         } else {
489                         /** use cached statfs data */
490                         spin_lock(&mdt->mdt_lock);
491                         *osfs = msf->msf_osfs;
492                         spin_unlock(&mdt->mdt_lock);
493         }
494
495         /* tgd_blockbit is recordsize bits set during mkfs.
496          * This once set does not change. However, 'zfs set'
497          * can be used to change the MDT blocksize. Instead
498          * of using cached value of 'tgd_blockbit' always
499          * calculate the blocksize bits which may have
500          * changed.
501          */
502         current_blockbits = fls64(osfs->os_bsize) - 1;
503
504         /* at least try to account for cached pages.  its still racy and
505          * might be under-reporting if clients haven't announced their
506          * caches with brw recently */
507         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
508                " pending %llu free %llu avail %llu\n",
509                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
510                tgd->tgd_tot_pending,
511                osfs->os_bfree << current_blockbits,
512                osfs->os_bavail << current_blockbits);
513
514         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
515                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
516                                    osfs->os_bsize - 1) >> current_blockbits));
517
518         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
519         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
520                "%llu objects: %llu free; state %x\n",
521                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
522                osfs->os_files, osfs->os_ffree, osfs->os_state);
523
524         if (!exp_grant_param_supp(tsi->tsi_exp) &&
525             current_blockbits > COMPAT_BSIZE_SHIFT) {
526                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
527                  * should not see a block size > page size, otherwise
528                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
529                  * block size which is the biggest block size known to work
530                  * with all client's page size. */
531                 osfs->os_blocks <<= current_blockbits - COMPAT_BSIZE_SHIFT;
532                 osfs->os_bfree  <<= current_blockbits - COMPAT_BSIZE_SHIFT;
533                 osfs->os_bavail <<= current_blockbits - COMPAT_BSIZE_SHIFT;
534                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
535         }
536         if (rc == 0)
537                 mdt_counter_incr(req, LPROC_MDT_STATFS,
538                                  ktime_us_delta(ktime_get(), kstart));
539 out:
540         mdt_thread_info_fini(info);
541         RETURN(rc);
542 }
543
544 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
545 {
546         struct lov_comp_md_v1 *comp_v1;
547         struct lov_mds_md *v1;
548         __u32 off;
549         __u32 dom_stripesize = 0;
550         int i;
551         bool has_ost_stripes = false;
552
553         ENTRY;
554
555         if (is_dom_only)
556                 *is_dom_only = 0;
557
558         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
559                 RETURN(0);
560
561         comp_v1 = (struct lov_comp_md_v1 *)lmm;
562         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
563         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
564
565         /* Fast check for DoM entry with no mirroring, should be the first */
566         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
567             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
568                 RETURN(0);
569
570         /* check all entries otherwise */
571         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
572                 struct lov_comp_md_entry_v1 *lcme;
573
574                 lcme = &comp_v1->lcm_entries[i];
575                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
576                         continue;
577
578                 off = le32_to_cpu(lcme->lcme_offset);
579                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
580
581                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
582                     LOV_PATTERN_MDT)
583                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
584                 else
585                         has_ost_stripes = true;
586
587                 if (dom_stripesize && has_ost_stripes)
588                         RETURN(dom_stripesize);
589         }
590         /* DoM-only case exits here */
591         if (is_dom_only && dom_stripesize)
592                 *is_dom_only = 1;
593         RETURN(dom_stripesize);
594 }
595
596 /**
597  * Pack size attributes into the reply.
598  */
599 int mdt_pack_size2body(struct mdt_thread_info *info,
600                         const struct lu_fid *fid, struct lustre_handle *lh)
601 {
602         struct mdt_body *b;
603         struct md_attr *ma = &info->mti_attr;
604         __u32 dom_stripe;
605         bool dom_lock = false;
606
607         ENTRY;
608
609         LASSERT(ma->ma_attr.la_valid & LA_MODE);
610
611         if (!S_ISREG(ma->ma_attr.la_mode) ||
612             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
613                 RETURN(-ENODATA);
614
615         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
616         /* no DoM stripe, no size in reply */
617         if (!dom_stripe)
618                 RETURN(-ENOENT);
619
620         if (lustre_handle_is_used(lh)) {
621                 struct ldlm_lock *lock;
622
623                 lock = ldlm_handle2lock(lh);
624                 if (lock != NULL) {
625                         dom_lock = ldlm_has_dom(lock);
626                         LDLM_LOCK_PUT(lock);
627                 }
628         }
629
630         /* no DoM lock, no size in reply */
631         if (!dom_lock)
632                 RETURN(0);
633
634         /* Either DoM lock exists or LMM has only DoM stripe then
635          * return size on body. */
636         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
637
638         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
639         RETURN(0);
640 }
641
#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/*
 * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
 *
 * The access ACL xattr is first read into the reply's RMF_ACL buffer. If
 * that is too small (-ERANGE) and the export supports large ACLs, the
 * read is retried once into a per-thread buffer (info->mti_big_acl),
 * which is allocated on demand.
 *
 * \param       info    thread info object
 * \param       repbody reply to pack ACLs into
 * \param       o       mdt object of file to examine
 * \param       nodemap nodemap of client to reply to
 * \retval      0       success (including "no ACL" and "ACLs unsupported")
 * \retval      -errno  error getting or parsing ACL from disk
 */
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
                      struct mdt_object *o, struct lu_nodemap *nodemap)
{
        const struct lu_env     *env = info->mti_env;
        struct md_object        *next = mdt_object_child(o);
        struct lu_buf           *buf = &info->mti_buf;
        struct mdt_device       *mdt = info->mti_mdt;
        struct req_capsule      *pill = info->mti_pill;
        int                      rc;

        ENTRY;

        buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
        buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
        if (buf->lb_len == 0)
                RETURN(0);

        LASSERT(!info->mti_big_acl_used);
again:
        rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
        if (rc >= 0) {
                /* rc is the ACL size read; map ids for the client */
                rc = nodemap_map_acl(nodemap, buf->lb_buf,
                                     rc, NODEMAP_FS_TO_CLIENT);
                /* if all ACLs mapped out, rc is still >= 0 */
                if (rc >= 0) {
                        repbody->mbo_aclsize = rc;
                        repbody->mbo_valid |= OBD_MD_FLACL;
                        rc = 0;
                } else {
                        CERROR("%s: nodemap_map_acl unable to parse "DFID
                               " ACL: rc = %d\n", mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)), rc);
                        repbody->mbo_aclsize = 0;
                        repbody->mbo_valid &= ~OBD_MD_FLACL;
                }
                RETURN(rc);
        }

        switch (rc) {
        case -ENODATA:
                /* no ACL on the object: report an empty, valid ACL */
                repbody->mbo_aclsize = 0;
                repbody->mbo_valid |= OBD_MD_FLACL;
                rc = 0;
                break;
        case -EOPNOTSUPP:
                rc = 0;
                break;
        case -ERANGE:
                if (exp_connect_large_acl(info->mti_exp) &&
                    !info->mti_big_acl_used) {
                        if (info->mti_big_acl == NULL) {
                                info->mti_big_aclsize =
                                                min_t(unsigned int,
                                                      mdt->mdt_max_ea_size,
                                                      XATTR_SIZE_MAX);
                                OBD_ALLOC_LARGE(info->mti_big_acl,
                                                info->mti_big_aclsize);
                                if (info->mti_big_acl == NULL) {
                                        info->mti_big_aclsize = 0;
                                        CERROR("%s: unable to grow "
                                               DFID" ACL buffer\n",
                                               mdt_obd_name(mdt),
                                               PFID(mdt_object_fid(o)));
                                        RETURN(-ENOMEM);
                                }
                        }

                        CDEBUG(D_INODE, "%s: grow the "DFID
                               " ACL buffer to size %d\n",
                               mdt_obd_name(mdt),
                               PFID(mdt_object_fid(o)),
                               info->mti_big_aclsize);

                        /* retry once with the big buffer */
                        buf->lb_buf = info->mti_big_acl;
                        buf->lb_len = info->mti_big_aclsize;
                        info->mti_big_acl_used = 1;
                        goto again;
                }
                /* FS has ACL bigger that our limits */
                CDEBUG(D_INODE, "%s: "DFID" ACL can't fit into %d\n",
                       mdt_obd_name(mdt), PFID(mdt_object_fid(o)),
                       info->mti_big_aclsize);
                rc = -E2BIG;
                break;
        default:
                CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
                       mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
                break;
        }

        RETURN(rc);
}
#endif
740
741 /* XXX Look into layout in MDT layer. */
742 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
743 {
744         struct lov_comp_md_v1   *comp_v1;
745         struct lov_mds_md       *v1;
746         int                      i;
747
748         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
749                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
750
751                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
752                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
753                                 comp_v1->lcm_entries[i].lcme_offset);
754                         /* We don't support partial release for now */
755                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
756                                 return false;
757                 }
758                 return true;
759         } else {
760                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
761                         true : false;
762         }
763 }
764
765 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
766                         const struct lu_attr *attr, const struct lu_fid *fid)
767 {
768         struct md_attr *ma = &info->mti_attr;
769         struct obd_export *exp = info->mti_exp;
770         struct lu_nodemap *nodemap = NULL;
771
772         LASSERT(ma->ma_valid & MA_INODE);
773
774         if (attr->la_valid & LA_ATIME) {
775                 b->mbo_atime = attr->la_atime;
776                 b->mbo_valid |= OBD_MD_FLATIME;
777         }
778         if (attr->la_valid & LA_MTIME) {
779                 b->mbo_mtime = attr->la_mtime;
780                 b->mbo_valid |= OBD_MD_FLMTIME;
781         }
782         if (attr->la_valid & LA_CTIME) {
783                 b->mbo_ctime = attr->la_ctime;
784                 b->mbo_valid |= OBD_MD_FLCTIME;
785         }
786         if (attr->la_valid & LA_BTIME) {
787                 b->mbo_btime = attr->la_btime;
788                 b->mbo_valid |= OBD_MD_FLBTIME;
789         }
790         if (attr->la_valid & LA_FLAGS) {
791                 b->mbo_flags = attr->la_flags;
792                 b->mbo_valid |= OBD_MD_FLFLAGS;
793         }
794         if (attr->la_valid & LA_NLINK) {
795                 b->mbo_nlink = attr->la_nlink;
796                 b->mbo_valid |= OBD_MD_FLNLINK;
797         }
798         if (attr->la_valid & (LA_UID|LA_GID)) {
799                 nodemap = nodemap_get_from_exp(exp);
800                 if (IS_ERR(nodemap))
801                         goto out;
802         }
803         if (attr->la_valid & LA_UID) {
804                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
805                                             NODEMAP_FS_TO_CLIENT,
806                                             attr->la_uid);
807                 b->mbo_valid |= OBD_MD_FLUID;
808         }
809         if (attr->la_valid & LA_GID) {
810                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
811                                             NODEMAP_FS_TO_CLIENT,
812                                             attr->la_gid);
813                 b->mbo_valid |= OBD_MD_FLGID;
814         }
815
816         if (attr->la_valid & LA_PROJID) {
817                 /* TODO, nodemap for project id */
818                 b->mbo_projid = attr->la_projid;
819                 b->mbo_valid |= OBD_MD_FLPROJID;
820         }
821
822         b->mbo_mode = attr->la_mode;
823         if (attr->la_valid & LA_MODE)
824                 b->mbo_valid |= OBD_MD_FLMODE;
825         if (attr->la_valid & LA_TYPE)
826                 b->mbo_valid |= OBD_MD_FLTYPE;
827
828         if (fid != NULL) {
829                 b->mbo_fid1 = *fid;
830                 b->mbo_valid |= OBD_MD_FLID;
831                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
832                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
833         }
834
835         if (!(attr->la_valid & LA_TYPE))
836                 return;
837
838         b->mbo_rdev   = attr->la_rdev;
839         b->mbo_size   = attr->la_size;
840         b->mbo_blocks = attr->la_blocks;
841
842         if (!S_ISREG(attr->la_mode)) {
843                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
844         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
845                 /* means no objects are allocated on osts. */
846                 LASSERT(!(ma->ma_valid & MA_LOV));
847                 /* just ignore blocks occupied by extend attributes on MDS */
848                 b->mbo_blocks = 0;
849                 /* if no object is allocated on osts, the size on mds is valid.
850                  * b=22272 */
851                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
852         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
853                 if (mdt_hsm_is_released(ma->ma_lmm)) {
854                         /* A released file stores its size on MDS. */
855                         /* But return 1 block for released file, unless tools
856                          * like tar will consider it fully sparse. (LU-3864)
857                          */
858                         if (unlikely(b->mbo_size == 0))
859                                 b->mbo_blocks = 0;
860                         else
861                                 b->mbo_blocks = 1;
862                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
863                 } else if (info->mti_som_valid) { /* som is valid */
864                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
865                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
866                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
867                         b->mbo_size = ma->ma_som.ms_size;
868                         b->mbo_blocks = ma->ma_som.ms_blocks;
869                 }
870         }
871
872         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
873                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
874                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
875                        PFID(fid), (unsigned long long)b->mbo_size);
876
877 out:
878         if (!IS_ERR_OR_NULL(nodemap))
879                 nodemap_putref(nodemap);
880 }
881
882 static inline int mdt_body_has_lov(const struct lu_attr *la,
883                                    const struct mdt_body *body)
884 {
885         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
886                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
887 }
888
889 void mdt_client_compatibility(struct mdt_thread_info *info)
890 {
891         struct mdt_body       *body;
892         struct ptlrpc_request *req = mdt_info_req(info);
893         struct obd_export     *exp = req->rq_export;
894         struct md_attr        *ma = &info->mti_attr;
895         struct lu_attr        *la = &ma->ma_attr;
896         ENTRY;
897
898         if (exp_connect_layout(exp))
899                 /* the client can deal with 16-bit lmm_stripe_count */
900                 RETURN_EXIT;
901
902         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
903
904         if (!mdt_body_has_lov(la, body))
905                 RETURN_EXIT;
906
907         /* now we have a reply with a lov for a client not compatible with the
908          * layout lock so we have to clean the layout generation number */
909         if (S_ISREG(la->la_mode))
910                 ma->ma_lmm->lmm_layout_gen = 0;
911         EXIT;
912 }
913
914 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
915                                    struct mdt_object *o)
916 {
917         const struct lu_env *env = info->mti_env;
918         int rc, rc2;
919
920         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
921                           XATTR_NAME_LOV);
922
923         if (rc == -ENODATA)
924                 rc = 0;
925
926         if (rc < 0)
927                 goto out;
928
929         /* Is it a directory? Let's check for the LMV as well */
930         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
931                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
932                                    XATTR_NAME_LMV);
933
934                 if (rc2 == -ENODATA)
935                         rc2 = mo_xattr_get(env, mdt_object_child(o),
936                                            &LU_BUF_NULL,
937                                            XATTR_NAME_DEFAULT_LMV);
938
939                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
940                         rc = rc2;
941         }
942
943 out:
944         return rc;
945 }
946
947 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
948                       const char *name)
949 {
950         const struct lu_env *env = info->mti_env;
951         int rc;
952         ENTRY;
953
954         LASSERT(info->mti_big_lmm_used == 0);
955         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
956         if (rc < 0)
957                 RETURN(rc);
958
959         /* big_lmm may need to be grown */
960         if (info->mti_big_lmmsize < rc) {
961                 int size = size_roundup_power2(rc);
962
963                 if (info->mti_big_lmmsize > 0) {
964                         /* free old buffer */
965                         LASSERT(info->mti_big_lmm);
966                         OBD_FREE_LARGE(info->mti_big_lmm,
967                                        info->mti_big_lmmsize);
968                         info->mti_big_lmm = NULL;
969                         info->mti_big_lmmsize = 0;
970                 }
971
972                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
973                 if (info->mti_big_lmm == NULL)
974                         RETURN(-ENOMEM);
975                 info->mti_big_lmmsize = size;
976         }
977         LASSERT(info->mti_big_lmmsize >= rc);
978
979         info->mti_buf.lb_buf = info->mti_big_lmm;
980         info->mti_buf.lb_len = info->mti_big_lmmsize;
981         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
982
983         RETURN(rc);
984 }
985
986 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
987                      struct md_attr *ma, const char *name)
988 {
989         struct md_object *next = mdt_object_child(o);
990         struct lu_buf    *buf = &info->mti_buf;
991         int rc;
992
993         if (strcmp(name, XATTR_NAME_LOV) == 0) {
994                 buf->lb_buf = ma->ma_lmm;
995                 buf->lb_len = ma->ma_lmm_size;
996                 LASSERT(!(ma->ma_valid & MA_LOV));
997         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
998                 buf->lb_buf = ma->ma_lmv;
999                 buf->lb_len = ma->ma_lmv_size;
1000                 LASSERT(!(ma->ma_valid & MA_LMV));
1001         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1002                 buf->lb_buf = ma->ma_default_lmv;
1003                 buf->lb_len = ma->ma_default_lmv_size;
1004                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1005         } else {
1006                 return -EINVAL;
1007         }
1008
1009         LASSERT(buf->lb_buf);
1010
1011         rc = mo_xattr_get(info->mti_env, next, buf, name);
1012         if (rc > 0) {
1013
1014 got:
1015                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1016                         if (info->mti_big_lmm_used)
1017                                 ma->ma_lmm = info->mti_big_lmm;
1018
1019                         /* NOT return LOV EA with hole to old client. */
1020                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1021                                      LOV_PATTERN_F_HOLE) &&
1022                             !(exp_connect_flags(info->mti_exp) &
1023                               OBD_CONNECT_LFSCK)) {
1024                                 return -EIO;
1025                         } else {
1026                                 ma->ma_lmm_size = rc;
1027                                 ma->ma_valid |= MA_LOV;
1028                         }
1029                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1030                         if (info->mti_big_lmm_used)
1031                                 ma->ma_lmv = info->mti_big_lmm;
1032
1033                         ma->ma_lmv_size = rc;
1034                         ma->ma_valid |= MA_LMV;
1035                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1036                         ma->ma_default_lmv_size = rc;
1037                         ma->ma_valid |= MA_LMV_DEF;
1038                 }
1039
1040                 /* Update mdt_max_mdsize so all clients will be aware that */
1041                 if (info->mti_mdt->mdt_max_mdsize < rc)
1042                         info->mti_mdt->mdt_max_mdsize = rc;
1043
1044                 rc = 0;
1045         } else if (rc == -ENODATA) {
1046                 /* no LOV EA */
1047                 rc = 0;
1048         } else if (rc == -ERANGE) {
1049                 /* Default LMV has fixed size, so it must be able to fit
1050                  * in the original buffer */
1051                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1052                         return rc;
1053                 rc = mdt_big_xattr_get(info, o, name);
1054                 if (rc > 0) {
1055                         info->mti_big_lmm_used = 1;
1056                         goto got;
1057                 }
1058         }
1059
1060         return rc;
1061 }
1062
1063 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1064                    struct md_attr *ma, const char *name)
1065 {
1066         int rc;
1067
1068         if (!info->mti_big_lmm) {
1069                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1070                 if (!info->mti_big_lmm)
1071                         return -ENOMEM;
1072                 info->mti_big_lmmsize = PAGE_SIZE;
1073         }
1074
1075         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1076                 ma->ma_lmm = info->mti_big_lmm;
1077                 ma->ma_lmm_size = info->mti_big_lmmsize;
1078                 ma->ma_valid &= ~MA_LOV;
1079         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1080                 ma->ma_lmv = info->mti_big_lmm;
1081                 ma->ma_lmv_size = info->mti_big_lmmsize;
1082                 ma->ma_valid &= ~MA_LMV;
1083         } else {
1084                 LBUG();
1085         }
1086
1087         LASSERT(!info->mti_big_lmm_used);
1088         rc = __mdt_stripe_get(info, o, ma, name);
1089         /* since big_lmm is always used here, clear 'used' flag to avoid
1090          * assertion in mdt_big_xattr_get().
1091          */
1092         info->mti_big_lmm_used = 0;
1093
1094         return rc;
1095 }
1096
1097 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1098                       struct lu_fid *pfid)
1099 {
1100         struct lu_buf           *buf = &info->mti_buf;
1101         struct link_ea_header   *leh;
1102         struct link_ea_entry    *lee;
1103         int                      rc;
1104         ENTRY;
1105
1106         buf->lb_buf = info->mti_big_lmm;
1107         buf->lb_len = info->mti_big_lmmsize;
1108         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1109                           buf, XATTR_NAME_LINK);
1110         /* ignore errors, MA_PFID won't be set and it is
1111          * up to the caller to treat this as an error */
1112         if (rc == -ERANGE || buf->lb_len == 0) {
1113                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1114                 buf->lb_buf = info->mti_big_lmm;
1115                 buf->lb_len = info->mti_big_lmmsize;
1116         }
1117
1118         if (rc < 0)
1119                 RETURN(rc);
1120         if (rc < sizeof(*leh)) {
1121                 CERROR("short LinkEA on "DFID": rc = %d\n",
1122                        PFID(mdt_object_fid(o)), rc);
1123                 RETURN(-ENODATA);
1124         }
1125
1126         leh = (struct link_ea_header *) buf->lb_buf;
1127         lee = (struct link_ea_entry *)(leh + 1);
1128         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1129                 leh->leh_magic = LINK_EA_MAGIC;
1130                 leh->leh_reccount = __swab32(leh->leh_reccount);
1131                 leh->leh_len = __swab64(leh->leh_len);
1132         }
1133         if (leh->leh_magic != LINK_EA_MAGIC)
1134                 RETURN(-EINVAL);
1135         if (leh->leh_reccount == 0)
1136                 RETURN(-ENODATA);
1137
1138         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1139         fid_be_to_cpu(pfid, pfid);
1140
1141         RETURN(0);
1142 }
1143
1144 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1145                            struct lu_fid *pfid, struct lu_name *lname)
1146 {
1147         struct lu_buf *buf = &info->mti_buf;
1148         struct link_ea_header *leh;
1149         struct link_ea_entry *lee;
1150         int reclen;
1151         int rc;
1152
1153         buf->lb_buf = info->mti_xattr_buf;
1154         buf->lb_len = sizeof(info->mti_xattr_buf);
1155         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1156                           XATTR_NAME_LINK);
1157         if (rc == -ERANGE) {
1158                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1159                 buf->lb_buf = info->mti_big_lmm;
1160                 buf->lb_len = info->mti_big_lmmsize;
1161         }
1162         if (rc < 0)
1163                 return rc;
1164
1165         if (rc < sizeof(*leh)) {
1166                 CERROR("short LinkEA on "DFID": rc = %d\n",
1167                        PFID(mdt_object_fid(o)), rc);
1168                 return -ENODATA;
1169         }
1170
1171         leh = (struct link_ea_header *)buf->lb_buf;
1172         lee = (struct link_ea_entry *)(leh + 1);
1173         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1174                 leh->leh_magic = LINK_EA_MAGIC;
1175                 leh->leh_reccount = __swab32(leh->leh_reccount);
1176                 leh->leh_len = __swab64(leh->leh_len);
1177         }
1178         if (leh->leh_magic != LINK_EA_MAGIC)
1179                 return -EINVAL;
1180
1181         if (leh->leh_reccount == 0)
1182                 return -ENODATA;
1183
1184         linkea_entry_unpack(lee, &reclen, lname, pfid);
1185
1186         return 0;
1187 }
1188
1189 int mdt_attr_get_complex(struct mdt_thread_info *info,
1190                          struct mdt_object *o, struct md_attr *ma)
1191 {
1192         const struct lu_env *env = info->mti_env;
1193         struct md_object    *next = mdt_object_child(o);
1194         struct lu_buf       *buf = &info->mti_buf;
1195         int                  need = ma->ma_need;
1196         int                  rc = 0, rc2;
1197         u32                  mode;
1198         ENTRY;
1199
1200         ma->ma_valid = 0;
1201
1202         if (mdt_object_exists(o) == 0)
1203                 GOTO(out, rc = -ENOENT);
1204         mode = lu_object_attr(&next->mo_lu);
1205
1206         if (need & MA_INODE) {
1207                 ma->ma_need = MA_INODE;
1208                 rc = mo_attr_get(env, next, ma);
1209                 if (rc)
1210                         GOTO(out, rc);
1211
1212                 if (S_ISREG(mode))
1213                         (void) mdt_get_som(info, o, ma);
1214                 ma->ma_valid |= MA_INODE;
1215         }
1216
1217         if (need & MA_PFID) {
1218                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1219                 if (rc == 0)
1220                         ma->ma_valid |= MA_PFID;
1221                 /* ignore this error, parent fid is not mandatory */
1222                 rc = 0;
1223         }
1224
1225         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1226                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1227                 if (rc)
1228                         GOTO(out, rc);
1229         }
1230
1231         if (need & MA_LMV && S_ISDIR(mode)) {
1232                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1233                 if (rc != 0)
1234                         GOTO(out, rc);
1235         }
1236
1237         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1238                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1239                 if (rc != 0)
1240                         GOTO(out, rc);
1241         }
1242
1243         /*
1244          * In the handle of MA_INODE, we may already get the SOM attr.
1245          */
1246         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1247                 rc = mdt_get_som(info, o, ma);
1248                 if (rc != 0)
1249                         GOTO(out, rc);
1250         }
1251
1252         if (need & MA_HSM && S_ISREG(mode)) {
1253                 buf->lb_buf = info->mti_xattr_buf;
1254                 buf->lb_len = sizeof(info->mti_xattr_buf);
1255                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1256                              sizeof(info->mti_xattr_buf));
1257                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1258                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1259                 if (rc2 == 0)
1260                         ma->ma_valid |= MA_HSM;
1261                 else if (rc2 < 0 && rc2 != -ENODATA)
1262                         GOTO(out, rc = rc2);
1263         }
1264
1265 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1266         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1267                 buf->lb_buf = ma->ma_acl;
1268                 buf->lb_len = ma->ma_acl_size;
1269                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1270                 if (rc2 > 0) {
1271                         ma->ma_acl_size = rc2;
1272                         ma->ma_valid |= MA_ACL_DEF;
1273                 } else if (rc2 == -ENODATA) {
1274                         /* no ACLs */
1275                         ma->ma_acl_size = 0;
1276                 } else
1277                         GOTO(out, rc = rc2);
1278         }
1279 #endif
1280 out:
1281         ma->ma_need = need;
1282         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1283                rc, ma->ma_valid, ma->ma_lmm);
1284         RETURN(rc);
1285 }
1286
1287 static int mdt_getattr_internal(struct mdt_thread_info *info,
1288                                 struct mdt_object *o, int ma_need)
1289 {
1290         struct mdt_device *mdt = info->mti_mdt;
1291         struct md_object *next = mdt_object_child(o);
1292         const struct mdt_body *reqbody = info->mti_body;
1293         struct ptlrpc_request *req = mdt_info_req(info);
1294         struct md_attr *ma = &info->mti_attr;
1295         struct lu_attr *la = &ma->ma_attr;
1296         struct req_capsule *pill = info->mti_pill;
1297         const struct lu_env *env = info->mti_env;
1298         struct mdt_body *repbody;
1299         struct lu_buf *buffer = &info->mti_buf;
1300         struct obd_export *exp = info->mti_exp;
1301         ktime_t kstart = ktime_get();
1302         int rc;
1303
1304         ENTRY;
1305
1306         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1307                 RETURN(err_serious(-ENOMEM));
1308
1309         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1310
1311         ma->ma_valid = 0;
1312
1313         if (mdt_object_remote(o)) {
1314                 /* This object is located on remote node.*/
1315                 /* Return -ENOTSUPP for old client */
1316                 if (!mdt_is_dne_client(req->rq_export))
1317                         GOTO(out, rc = -ENOTSUPP);
1318
1319                 repbody->mbo_fid1 = *mdt_object_fid(o);
1320                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1321                 GOTO(out, rc = 0);
1322         }
1323
1324         if (reqbody->mbo_eadatasize > 0) {
1325                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1326                 if (buffer->lb_buf == NULL)
1327                         GOTO(out, rc = -EPROTO);
1328                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1329                                                       RCL_SERVER);
1330         } else {
1331                 buffer->lb_buf = NULL;
1332                 buffer->lb_len = 0;
1333                 ma_need &= ~(MA_LOV | MA_LMV);
1334                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1335                        mdt_obd_name(info->mti_mdt),
1336                        req->rq_export->exp_client_uuid.uuid);
1337         }
1338
1339         /* from 2.12.58 intent_getattr pack default LMV in reply */
1340         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1341             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1342                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1343             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1344                                   RCL_SERVER)) {
1345                 ma->ma_lmv = buffer->lb_buf;
1346                 ma->ma_lmv_size = buffer->lb_len;
1347                 ma->ma_default_lmv = req_capsule_server_get(pill,
1348                                                 &RMF_DEFAULT_MDT_MD);
1349                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1350                                                 &RMF_DEFAULT_MDT_MD,
1351                                                 RCL_SERVER);
1352                 ma->ma_need = MA_INODE;
1353                 if (ma->ma_lmv_size > 0)
1354                         ma->ma_need |= MA_LMV;
1355                 if (ma->ma_default_lmv_size > 0)
1356                         ma->ma_need |= MA_LMV_DEF;
1357         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1358                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1359                 /* If it is dir and client require MEA, then we got MEA */
1360                 /* Assumption: MDT_MD size is enough for lmv size. */
1361                 ma->ma_lmv = buffer->lb_buf;
1362                 ma->ma_lmv_size = buffer->lb_len;
1363                 ma->ma_need = MA_INODE;
1364                 if (ma->ma_lmv_size > 0) {
1365                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1366                                 ma->ma_need |= MA_LMV;
1367                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1368                                 ma->ma_need |= MA_LMV_DEF;
1369                                 ma->ma_default_lmv = buffer->lb_buf;
1370                                 ma->ma_lmv = NULL;
1371                                 ma->ma_default_lmv_size = buffer->lb_len;
1372                                 ma->ma_lmv_size = 0;
1373                         }
1374                 }
1375         } else {
1376                 ma->ma_lmm = buffer->lb_buf;
1377                 ma->ma_lmm_size = buffer->lb_len;
1378                 ma->ma_need = MA_INODE | MA_HSM;
1379                 if (ma->ma_lmm_size > 0) {
1380                         ma->ma_need |= MA_LOV;
1381                         /* Older clients may crash if they getattr overstriped
1382                          * files
1383                          */
1384                         if (!exp_connect_overstriping(exp) &&
1385                             mdt_lmm_is_overstriping(ma->ma_lmm))
1386                                 RETURN(-EOPNOTSUPP);
1387                 }
1388         }
1389
1390         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1391             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1392             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1393                 /* get default stripe info for this dir. */
1394                 ma->ma_need |= MA_LOV_DEF;
1395         }
1396         ma->ma_need |= ma_need;
1397
1398         rc = mdt_attr_get_complex(info, o, ma);
1399         if (unlikely(rc)) {
1400                 CDEBUG_LIMIT(rc == -ENOENT ? D_OTHER : D_ERROR,
1401                              "%s: getattr error for "DFID": rc = %d\n",
1402                              mdt_obd_name(info->mti_mdt),
1403                              PFID(mdt_object_fid(o)), rc);
1404                 RETURN(rc);
1405         }
1406
1407         /* if file is released, check if a restore is running */
1408         if (ma->ma_valid & MA_HSM) {
1409                 repbody->mbo_valid |= OBD_MD_TSTATE;
1410                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1411                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1412                         repbody->mbo_t_state = MS_RESTORE;
1413         }
1414
1415         if (unlikely(!(ma->ma_valid & MA_INODE)))
1416                 RETURN(-EFAULT);
1417
1418         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1419
1420         if (mdt_body_has_lov(la, reqbody)) {
1421                 u32 stripe_count = 1;
1422
1423                 if (ma->ma_valid & MA_LOV) {
1424                         LASSERT(ma->ma_lmm_size);
1425                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1426                         if (S_ISDIR(la->la_mode))
1427                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1428                         else
1429                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1430                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1431                 }
1432                 if (ma->ma_valid & MA_LMV) {
1433                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1434                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1435
1436                         /* Return -ENOTSUPP for old client */
1437                         if (!mdt_is_striped_client(req->rq_export))
1438                                 RETURN(-ENOTSUPP);
1439
1440                         LASSERT(S_ISDIR(la->la_mode));
1441                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1442                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1443                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1444
1445                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1446                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1447                                 mdt_restripe_migrate_add(info, o);
1448                         else if (magic == LMV_MAGIC_V1 &&
1449                                  lmv_is_restriping(lmv))
1450                                 mdt_restripe_update_add(info, o);
1451                 }
1452                 if (ma->ma_valid & MA_LMV_DEF) {
1453                         /* Return -ENOTSUPP for old client */
1454                         if (!mdt_is_striped_client(req->rq_export))
1455                                 RETURN(-ENOTSUPP);
1456                         LASSERT(S_ISDIR(la->la_mode));
1457                         /*
1458                          * when ll_dir_getstripe() gets default LMV, it
1459                          * checks mbo_eadatasize.
1460                          */
1461                         if (!(ma->ma_valid & MA_LMV))
1462                                 repbody->mbo_eadatasize =
1463                                         ma->ma_default_lmv_size;
1464                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1465                                                OBD_MD_DEFAULT_MEA);
1466                 }
1467                 CDEBUG(D_VFSTRACE,
1468                        "dirent count %llu stripe count %u MDT count %d\n",
1469                        ma->ma_attr.la_dirent_count, stripe_count,
1470                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
1471                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1472                     ma->ma_attr.la_dirent_count >
1473                         mdt->mdt_restriper.mdr_dir_split_count &&
1474                     !fid_is_root(mdt_object_fid(o)) &&
1475                     mdt->mdt_enable_dir_auto_split &&
1476                     !o->mot_restriping &&
1477                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
1478                         mdt_auto_split_add(info, o);
1479         } else if (S_ISLNK(la->la_mode) &&
1480                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
1481                 buffer->lb_buf = ma->ma_lmm;
1482                 /* eadatasize from client includes NULL-terminator, so
1483                  * there is no need to read it */
1484                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1485                 rc = mo_readlink(env, next, buffer);
1486                 if (unlikely(rc <= 0)) {
1487                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1488                                mdt_obd_name(info->mti_mdt),
1489                                PFID(mdt_object_fid(o)), rc);
1490                         rc = -EFAULT;
1491                 } else {
1492                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1493
1494                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1495                                 rc -= 2;
1496                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1497                         /* we need to report back size with NULL-terminator
1498                          * because client expects that */
1499                         repbody->mbo_eadatasize = rc + 1;
1500                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1501                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1502                                        "on "DFID ", expected %d\n",
1503                                        mdt_obd_name(info->mti_mdt),
1504                                        rc, PFID(mdt_object_fid(o)),
1505                                        reqbody->mbo_eadatasize - 1);
1506                         /* NULL terminate */
1507                         ((char *)ma->ma_lmm)[rc] = 0;
1508
1509                         /* If the total CDEBUG() size is larger than a page, it
1510                          * will print a warning to the console, avoid this by
1511                          * printing just the last part of the symlink. */
1512                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1513                                print_limit < rc ? "..." : "", print_limit,
1514                                (char *)ma->ma_lmm + rc - print_limit, rc);
1515                         rc = 0;
1516                 }
1517         }
1518
1519         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1520                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1521                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1522                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1523                        repbody->mbo_max_mdsize);
1524         }
1525
1526 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1527         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1528                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1529                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1530                 if (IS_ERR(nodemap))
1531                         RETURN(PTR_ERR(nodemap));
1532
1533                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1534                 nodemap_putref(nodemap);
1535         }
1536 #endif
1537
1538 out:
1539         if (rc == 0)
1540                 mdt_counter_incr(req, LPROC_MDT_GETATTR,
1541                                  ktime_us_delta(ktime_get(), kstart));
1542
1543         RETURN(rc);
1544 }
1545
1546 static int mdt_getattr(struct tgt_session_info *tsi)
1547 {
1548         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1549         struct mdt_object       *obj = info->mti_object;
1550         struct req_capsule      *pill = info->mti_pill;
1551         struct mdt_body         *reqbody;
1552         struct mdt_body         *repbody;
1553         int rc, rc2;
1554         ENTRY;
1555
1556         if (unlikely(info->mti_object == NULL))
1557                 RETURN(-EPROTO);
1558
1559         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1560         LASSERT(reqbody);
1561         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1562
1563         /* Special case for Data-on-MDT files to get data version */
1564         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1565                 rc = mdt_data_version_get(tsi);
1566                 GOTO(out, rc);
1567         }
1568
1569         /* Unlike intent case where we need to pre-fill out buffers early on
1570          * in intent policy for ldlm reasons, here we can have a much better
1571          * guess at EA size by just reading it from disk.
1572          * Exceptions are readdir and (missing) directory striping */
1573         /* Readlink */
1574         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1575                 /* No easy way to know how long is the symlink, but it cannot
1576                  * be more than PATH_MAX, so we allocate +1 */
1577                 rc = PATH_MAX + 1;
1578         /* A special case for fs ROOT: getattr there might fetch
1579          * default EA for entire fs, not just for this dir!
1580          */
1581         } else if (lu_fid_eq(mdt_object_fid(obj),
1582                              &info->mti_mdt->mdt_md_root_fid) &&
1583                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1584                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1585                                                                  MDS_GETATTR)) {
1586                 /* Should the default strping be bigger, mdt_fix_reply
1587                  * will reallocate */
1588                 rc = DEF_REP_MD_SIZE;
1589         } else {
1590                 /* Read the actual EA size from disk */
1591                 rc = mdt_attr_get_eabuf_size(info, obj);
1592         }
1593
1594         if (rc < 0)
1595                 GOTO(out, rc = err_serious(rc));
1596
1597         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1598
1599         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1600          * by default. If the target object has more ACL entries, then
1601          * enlarge the buffer when necessary. */
1602         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1603                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1604
1605         rc = req_capsule_server_pack(pill);
1606         if (unlikely(rc != 0))
1607                 GOTO(out, rc = err_serious(rc));
1608
1609         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1610         LASSERT(repbody != NULL);
1611         repbody->mbo_eadatasize = 0;
1612         repbody->mbo_aclsize = 0;
1613
1614         rc = mdt_check_ucred(info);
1615         if (unlikely(rc))
1616                 GOTO(out_shrink, rc);
1617
1618         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1619
1620         rc = mdt_getattr_internal(info, obj, 0);
1621         EXIT;
1622 out_shrink:
1623         mdt_client_compatibility(info);
1624         rc2 = mdt_fix_reply(info);
1625         if (rc == 0)
1626                 rc = rc2;
1627 out:
1628         mdt_thread_info_fini(info);
1629         return rc;
1630 }
1631
1632 /**
1633  * Handler of layout intent RPC requiring the layout modification
1634  *
1635  * \param[in]  info     thread environment
1636  * \param[in]  obj      object
1637  * \param[out] lhc      object ldlm lock handle
1638  * \param[in]  layout   layout change descriptor
1639  *
1640  * \retval 0    on success
1641  * \retval < 0  error code
1642  */
1643 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1644                       struct mdt_lock_handle *lhc,
1645                       struct md_layout_change *layout)
1646 {
1647         int rc;
1648
1649         ENTRY;
1650
1651         if (!mdt_object_exists(obj))
1652                 RETURN(-ENOENT);
1653
1654         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1655                 RETURN(-EINVAL);
1656
1657         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1658                            MAY_WRITE);
1659         if (rc)
1660                 RETURN(rc);
1661
1662         rc = mdt_check_resent_lock(info, obj, lhc);
1663         if (rc < 0)
1664                 RETURN(rc);
1665
1666         if (rc > 0) {
1667                 /* not resent */
1668                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1669
1670                 /* take layout lock to prepare layout change */
1671                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1672                         lockpart |= MDS_INODELOCK_UPDATE;
1673
1674                 mdt_lock_handle_init(lhc);
1675                 mdt_lock_reg_init(lhc, LCK_EX);
1676                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1677                 if (rc)
1678                         RETURN(rc);
1679         }
1680
1681         mutex_lock(&obj->mot_som_mutex);
1682         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1683         mutex_unlock(&obj->mot_som_mutex);
1684
1685         if (rc)
1686                 mdt_object_unlock(info, obj, lhc, 1);
1687
1688         RETURN(rc);
1689 }
1690
1691 /**
1692  * Exchange MOF_LOV_CREATED flags between two objects after a
1693  * layout swap. No assumption is made on whether o1 or o2 have
1694  * created objects or not.
1695  *
1696  * \param[in,out] o1    First swap layout object
1697  * \param[in,out] o2    Second swap layout object
1698  */
1699 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1700 {
1701         unsigned int o1_lov_created = o1->mot_lov_created;
1702
1703         mutex_lock(&o1->mot_lov_mutex);
1704         mutex_lock(&o2->mot_lov_mutex);
1705
1706         o1->mot_lov_created = o2->mot_lov_created;
1707         o2->mot_lov_created = o1_lov_created;
1708
1709         mutex_unlock(&o2->mot_lov_mutex);
1710         mutex_unlock(&o1->mot_lov_mutex);
1711 }
1712
1713 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1714 {
1715         struct mdt_thread_info  *info;
1716         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1717         struct obd_export       *exp = req->rq_export;
1718         struct mdt_object       *o1, *o2, *o;
1719         struct mdt_lock_handle  *lh1, *lh2;
1720         struct mdc_swap_layouts *msl;
1721         int                      rc;
1722         ENTRY;
1723
1724         /* client does not support layout lock, so layout swaping
1725          * is disabled.
1726          * FIXME: there is a problem for old clients which don't support
1727          * layout lock yet. If those clients have already opened the file
1728          * they won't be notified at all so that old layout may still be
1729          * used to do IO. This can be fixed after file release is landed by
1730          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1731         if (!exp_connect_layout(exp))
1732                 RETURN(-EOPNOTSUPP);
1733
1734         info = tsi2mdt_info(tsi);
1735         if (unlikely(info->mti_object == NULL))
1736                 RETURN(-EPROTO);
1737
1738         if (info->mti_dlm_req != NULL)
1739                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1740
1741         o1 = info->mti_object;
1742         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1743                                 &info->mti_body->mbo_fid2);
1744         if (IS_ERR(o))
1745                 GOTO(out, rc = PTR_ERR(o));
1746
1747         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1748                 GOTO(put, rc = -ENOENT);
1749
1750         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1751         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1752                 GOTO(put, rc);
1753
1754         if (rc < 0)
1755                 swap(o1, o2);
1756
1757         /* permission check. Make sure the calling process having permission
1758          * to write both files. */
1759         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1760                            MAY_WRITE);
1761         if (rc < 0)
1762                 GOTO(put, rc);
1763
1764         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1765                            MAY_WRITE);
1766         if (rc < 0)
1767                 GOTO(put, rc);
1768
1769         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1770         if (msl == NULL)
1771                 GOTO(put, rc = -EPROTO);
1772
1773         lh1 = &info->mti_lh[MDT_LH_NEW];
1774         mdt_lock_reg_init(lh1, LCK_EX);
1775         lh2 = &info->mti_lh[MDT_LH_OLD];
1776         mdt_lock_reg_init(lh2, LCK_EX);
1777
1778         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1779                              MDS_INODELOCK_XATTR);
1780         if (rc < 0)
1781                 GOTO(put, rc);
1782
1783         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1784                              MDS_INODELOCK_XATTR);
1785         if (rc < 0)
1786                 GOTO(unlock1, rc);
1787
1788         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1789                              mdt_object_child(o2), msl->msl_flags);
1790         if (rc < 0)
1791                 GOTO(unlock2, rc);
1792
1793         mdt_swap_lov_flag(o1, o2);
1794
1795 unlock2:
1796         mdt_object_unlock(info, o2, lh2, rc);
1797 unlock1:
1798         mdt_object_unlock(info, o1, lh1, rc);
1799 put:
1800         mdt_object_put(info->mti_env, o);
1801 out:
1802         mdt_thread_info_fini(info);
1803         RETURN(rc);
1804 }
1805
1806 static int mdt_raw_lookup(struct mdt_thread_info *info,
1807                           struct mdt_object *parent,
1808                           const struct lu_name *lname,
1809                           struct ldlm_reply *ldlm_rep)
1810 {
1811         struct lu_fid   *child_fid = &info->mti_tmp_fid1;
1812         int              rc;
1813         ENTRY;
1814
1815         LASSERT(!info->mti_cross_ref);
1816
1817         /* Only got the fid of this obj by name */
1818         fid_zero(child_fid);
1819         rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
1820                         lname, child_fid, &info->mti_spec);
1821         if (rc == 0) {
1822                 struct mdt_body *repbody;
1823
1824                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1825                 repbody->mbo_fid1 = *child_fid;
1826                 repbody->mbo_valid = OBD_MD_FLID;
1827                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1828         } else if (rc == -ENOENT) {
1829                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1830         }
1831
1832         RETURN(rc);
1833 }
1834
1835 /*
1836  * UPDATE lock should be taken against parent, and be released before exit;
1837  * child_bits lock should be taken against child, and be returned back:
1838  *            (1)normal request should release the child lock;
1839  *            (2)intent request will grant the lock to client.
      *
      * The child is found in one of three ways: it is the request object itself
      * (cross-ref request), it is looked up by name under the parent, or it is
      * named directly by mbo_fid2 when no valid name was sent.
      *
      * \param[in]     info        thread environment
      * \param[in,out] lhc         child lock handle; a handle that is already in
      *                            use on entry is only expected on a resent
      *                            request
      * \param[in]     child_bits  inodebits wanted on the child; adjusted locally
      *                            before the lock is enqueued
      * \param[in]     ldlm_rep    ldlm reply used for the dispositions; may be
      *                            NULL (mdt_getattr_name() passes NULL)
      *
      * \retval 0    on success
      * \retval < 0  error code
1840  */
1841 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1842                                  struct mdt_lock_handle *lhc,
1843                                  __u64 child_bits,
1844                                  struct ldlm_reply *ldlm_rep)
1845 {
1846         struct ptlrpc_request *req = mdt_info_req(info);
1847         struct mdt_body *reqbody = NULL;
1848         struct mdt_object *parent = info->mti_object;
1849         struct mdt_object *child = NULL;
1850         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1851         struct lu_name *lname = NULL;
1852         struct mdt_lock_handle *lhp = NULL;
1853         struct ldlm_lock *lock;
1854         struct req_capsule *pill = info->mti_pill;
1855         __u64 try_bits = 0;
1856         bool is_resent;
1857         int ma_need = 0;
1858         int rc;
1859
1860         ENTRY;
1861
             /* a child handle already in use must come with MSG_RESENT */
1862         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1863         LASSERT(ergo(is_resent,
1864                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1865
1866         if (parent == NULL)
1867                 RETURN(-ENOENT);
1868
1869         if (info->mti_cross_ref) {
1870                 /* Only getattr on the child. Parent is on another node. */
1871                 mdt_set_disposition(info, ldlm_rep,
1872                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1873                 child = parent;
1874                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1875                        "ldlm_rep = %p\n",
1876                        PFID(mdt_object_fid(child)), ldlm_rep);
1877
1878                 rc = mdt_check_resent_lock(info, child, lhc);
1879                 if (rc < 0) {
1880                         RETURN(rc);
1881                 } else if (rc > 0) {
1882                         mdt_lock_handle_init(lhc);
1883                         mdt_lock_reg_init(lhc, LCK_PR);
1884
1885                         /*
1886                          * Object's name entry is on another MDS, it will
1887                          * request PERM lock only because LOOKUP lock is owned
1888                          * by the MDS where name entry resides.
1889                          *
1890                          * TODO: it should try layout lock too. - Jinshan
1891                          */
1892                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
1893                                         MDS_INODELOCK_LAYOUT);
1894                         child_bits |= MDS_INODELOCK_PERM;
1895
1896                         rc = mdt_object_lock(info, child, lhc, child_bits);
1897                         if (rc < 0)
1898                                 RETURN(rc);
1899                 }
1900
1901                 /* Finally, we can get attr for child. */
1902                 if (!mdt_object_exists(child)) {
1903                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1904                                         &child->mot_obj,
1905                                         "remote object doesn't exist.");
1906                         mdt_object_unlock(info, child, lhc, 1);
1907                         RETURN(-ENOENT);
1908                 }
1909
                     /* from here on every failure drops the child lock */
1910                 rc = mdt_getattr_internal(info, child, 0);
1911                 if (unlikely(rc != 0)) {
1912                         mdt_object_unlock(info, child, lhc, 1);
1913                         RETURN(rc);
1914                 }
1915
1916                 rc = mdt_pack_secctx_in_reply(info, child);
1917                 if (unlikely(rc)) {
1918                         mdt_object_unlock(info, child, lhc, 1);
1919                         RETURN(rc);
1920                 }
1921
1922                 rc = mdt_pack_encctx_in_reply(info, child);
1923                 if (unlikely(rc))
1924                         mdt_object_unlock(info, child, lhc, 1);
1925                 RETURN(rc);
1926         }
1927
             /* Not a cross-ref request: the child is named either by the
              * RMF_NAME field or, when that name is not valid, by mbo_fid2
              * (else branch below). */
1928         lname = &info->mti_name;
1929         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
1930
1931         if (lu_name_is_valid(lname)) {
1932                 if (mdt_object_remote(parent)) {
1933                         CERROR("%s: parent "DFID" is on remote target\n",
1934                                mdt_obd_name(info->mti_mdt),
1935                                PFID(mdt_object_fid(parent)));
1936                         RETURN(-EPROTO);
1937                 }
1938
1939                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
1940                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1941                        PNAME(lname), ldlm_rep);
1942         } else {
1943                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1944                 if (unlikely(reqbody == NULL))
1945                         RETURN(err_serious(-EPROTO));
1946
1947                 *child_fid = reqbody->mbo_fid2;
1948                 if (unlikely(!fid_is_sane(child_fid)))
1949                         RETURN(err_serious(-EINVAL));
1950
                     /* either way a reference on child is held from here;
                      * it is dropped at out_child */
1951                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
1952                         mdt_object_get(info->mti_env, parent);
1953                         child = parent;
1954                 } else {
1955                         child = mdt_object_find(info->mti_env, info->mti_mdt,
1956                                                 child_fid);
1957                         if (IS_ERR(child))
1958                                 RETURN(PTR_ERR(child));
1959                 }
1960
1961                 if (mdt_object_remote(child)) {
1962                         CERROR("%s: child "DFID" is on remote target\n",
1963                                mdt_obd_name(info->mti_mdt),
1964                                PFID(mdt_object_fid(child)));
1965                         GOTO(out_child, rc = -EPROTO);
1966                 }
1967
1968                 /* don't fetch LOOKUP lock if it's remote object */
1969                 rc = mdt_is_remote_object(info, parent, child);
1970                 if (rc < 0)
1971                         GOTO(out_child, rc);
1972                 if (rc)
1973                         child_bits &= ~MDS_INODELOCK_LOOKUP;
1974
1975                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1976                        "ldlm_rep = %p\n",
1977                        PFID(mdt_object_fid(parent)),
1978                        PFID(&reqbody->mbo_fid2), ldlm_rep);
1979         }
1980
1981         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1982
1983         if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
1984                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1985                                 &parent->mot_obj,
1986                                 "Parent doesn't exist!");
1987                 GOTO(out_child, rc = -ESTALE);
1988         }
1989
1990         if (lu_name_is_valid(lname)) {
1991                 /* Always allow to lookup ".." */
1992                 if (unlikely(lname->ln_namelen == 2 &&
1993                              lname->ln_name[0] == '.' &&
1994                              lname->ln_name[1] == '.'))
1995                         info->mti_spec.sp_permitted = 1;
1996
                     /* only the FID is asked for: resolve the name and
                      * return, no parent or child lock is taken here */
1997                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
1998                         rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
1999
2000                         RETURN(rc);
2001                 }
2002
2003                 /* step 1: lock parent only if parent is a directory */
2004                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
2005                         lhp = &info->mti_lh[MDT_LH_PARENT];
2006                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
2007                         rc = mdt_object_lock(info, parent, lhp,
2008                                              MDS_INODELOCK_UPDATE);
2009                         if (unlikely(rc != 0))
2010                                 RETURN(rc);
2011                 }
2012
2013                 /* step 2: lookup child's fid by name */
2014                 fid_zero(child_fid);
2015                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2016                                 child_fid, &info->mti_spec);
2017                 if (rc == -ENOENT)
2018                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2019
2020                 if (rc != 0)
2021                         GOTO(unlock_parent, rc);
2022
2023                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2024                                         child_fid);
2025                 if (unlikely(IS_ERR(child)))
2026                         GOTO(unlock_parent, rc = PTR_ERR(child));
2027         }
2028
2029         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2030
2031         /* step 3: lock child regardless if it is local or remote. */
2032         LASSERT(child);
2033
2034         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2035         if (!mdt_object_exists(child)) {
2036                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2037                                 &child->mot_obj,
2038                                 "Object doesn't exist!");
2039                 GOTO(out_child, rc = -ENOENT);
2040         }
2041
             /* rc > 0: no lock is held yet for this request, enqueue one;
              * rc == 0 skips the enqueue below */
2042         rc = mdt_check_resent_lock(info, child, lhc);
2043         if (rc < 0) {
2044                 GOTO(out_child, rc);
2045         } else if (rc > 0) {
2046                 mdt_lock_handle_init(lhc);
2047                 mdt_lock_reg_init(lhc, LCK_PR);
2048
2049                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2050                     !mdt_object_remote(child)) {
2051                         struct md_attr *ma = &info->mti_attr;
2052
2053                         ma->ma_valid = 0;
2054                         ma->ma_need = MA_INODE;
2055                         rc = mdt_attr_get_complex(info, child, ma);
2056                         if (unlikely(rc != 0))
2057                                 GOTO(out_child, rc);
2058
2059                         /* If the file has not been changed for some time, we
2060                          * return not only a LOOKUP lock, but also an UPDATE
2061                          * lock and this might save us RPC on later STAT. For
2062                          * directories, it also let negative dentry cache start
2063                          * working for this dir. */
2064                         if (ma->ma_valid & MA_INODE &&
2065                             ma->ma_attr.la_valid & LA_CTIME &&
2066                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2067                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2068                                 child_bits |= MDS_INODELOCK_UPDATE;
2069                 }
2070
2071                 /* layout lock must be granted in a best-effort way
2072                  * for IT operations */
2073                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2074                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2075                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2076                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2077                             exp_connect_layout(info->mti_exp)) {
2078                                 /* try to grant layout lock for regular file. */
2079                                 try_bits = MDS_INODELOCK_LAYOUT;
2080                         }
2081                         /* Acquire DOM lock in advance for data-on-mdt file */
2082                         if (child != parent)
2083                                 try_bits |= MDS_INODELOCK_DOM;
2084                 }
2085
2086                 if (try_bits != 0) {
2087                         /* try layout lock, it may fail to be granted due to
2088                          * contention at LOOKUP or UPDATE */
2089                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2090                                                  try_bits, false);
                             /* the layout EA is only requested when the
                              * LAYOUT bit ended up in child_bits */
2091                         if (child_bits & MDS_INODELOCK_LAYOUT)
2092                                 ma_need |= MA_LOV;
2093                 } else {
2094                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2095                          * client will enqueue the lock to the remote MDT */
2096                         if (mdt_object_remote(child))
2097                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2098                         rc = mdt_object_lock(info, child, lhc, child_bits);
2099                 }
2100                 if (unlikely(rc != 0))
2101                         GOTO(out_child, rc);
2102         }
2103
2104         /* finally, we can get attr for child. */
2105         rc = mdt_getattr_internal(info, child, ma_need);
2106         if (unlikely(rc != 0)) {
2107                 mdt_object_unlock(info, child, lhc, 1);
2108                 GOTO(out_child, rc);
2109         }
2110
2111         rc = mdt_pack_secctx_in_reply(info, child);
2112         if (unlikely(rc)) {
2113                 mdt_object_unlock(info, child, lhc, 1);
2114                 GOTO(out_child, rc);
2115         }
2116
2117         rc = mdt_pack_encctx_in_reply(info, child);
2118         if (unlikely(rc)) {
2119                 mdt_object_unlock(info, child, lhc, 1);
2120                 GOTO(out_child, rc);
2121         }
2122
2123         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2124         if (lock) {
2125                 /* Debugging code. */
2126                 LDLM_DEBUG(lock, "Returning lock to client");
2127                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2128                                          &lock->l_resource->lr_name),
2129                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2130                          PLDLMRES(lock->l_resource),
2131                          PFID(mdt_object_fid(child)));
2132
2133                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2134                     !mdt_object_remote(child) && child != parent) {
                             /* the child reference is dropped here because
                              * this path leaves through unlock_parent and so
                              * bypasses the put at out_child */
2135                         mdt_object_put(info->mti_env, child);
2136                         rc = mdt_pack_size2body(info, child_fid,
2137                                                 &lhc->mlh_reg_lh);
2138                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2139                                 /* DOM lock was taken in advance but this is
2140                                  * not DoM file. Drop the lock.
2141                                  */
2142                                 lock_res_and_lock(lock);
2143                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2144                                 unlock_res_and_lock(lock);
2145                         }
2146                         LDLM_LOCK_PUT(lock);
                             /* a mdt_pack_size2body() failure is not
                              * reported to the caller */
2147                         GOTO(unlock_parent, rc = 0);
2148                 }
2149                 LDLM_LOCK_PUT(lock);
2150         }
2151
2152         EXIT;
2153 out_child:
2154         if (child)
2155                 mdt_object_put(info->mti_env, child);
2156 unlock_parent:
             /* lhp is only set when step 1 locked the parent */
2157         if (lhp)
2158                 mdt_object_unlock(info, parent, lhp, 1);
2159         return rc;
2160 }
2161
2161
2162 /* normal handler: should release the child lock */
2163 static int mdt_getattr_name(struct tgt_session_info *tsi)
2164 {
2165         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2166         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2167         struct mdt_body *reqbody;
2168         struct mdt_body *repbody;
2169         int rc, rc2;
2170
2171         ENTRY;
2172
2173         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2174         LASSERT(reqbody != NULL);
2175         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2176         LASSERT(repbody != NULL);
2177
2178         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2179         repbody->mbo_eadatasize = 0;
2180         repbody->mbo_aclsize = 0;
2181
2182         rc = mdt_init_ucred(info, reqbody);
2183         if (unlikely(rc))
2184                 GOTO(out_shrink, rc);
2185
2186         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2187         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2188                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2189                 lhc->mlh_reg_lh.cookie = 0;
2190         }
2191         mdt_exit_ucred(info);
2192         EXIT;
2193 out_shrink:
2194         mdt_client_compatibility(info);
2195         rc2 = mdt_fix_reply(info);
2196         if (rc == 0)
2197                 rc = rc2;
2198         mdt_thread_info_fini(info);
2199         return rc;
2200 }
2201
2202 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2203                             const struct lu_fid *pfid,
2204                             const struct lu_name *name,
2205                             struct mdt_object *obj, s64 ctime)
2206 {
2207         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2208         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2209         struct mdt_device *mdt = info->mti_mdt;
2210         struct md_attr *ma = &info->mti_attr;
2211         struct mdt_lock_handle *parent_lh;
2212         struct mdt_lock_handle *child_lh;
2213         struct mdt_object *pobj;
2214         bool cos_incompat = false;
2215         int rc;
2216         ENTRY;
2217
2218         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2219         if (IS_ERR(pobj))
2220                 GOTO(out, rc = PTR_ERR(pobj));
2221
2222         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2223         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2224         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2225         if (rc != 0)
2226                 GOTO(put_parent, rc);
2227
2228         if (mdt_object_remote(pobj))
2229                 cos_incompat = true;
2230
2231         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2232                         name, child_fid, &info->mti_spec);
2233         if (rc != 0)
2234                 GOTO(unlock_parent, rc);
2235
2236         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2237                 GOTO(unlock_parent, rc = -EREMCHG);
2238
2239         child_lh = &info->mti_lh[MDT_LH_CHILD];
2240         mdt_lock_reg_init(child_lh, LCK_EX);
2241         rc = mdt_reint_striped_lock(info, obj, child_lh,
2242                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2243                                     einfo, cos_incompat);
2244         if (rc != 0)
2245                 GOTO(unlock_parent, rc);
2246
2247         if (atomic_read(&obj->mot_open_count)) {
2248                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2249                        PFID(mdt_object_fid(obj)));
2250                 GOTO(unlock_child, rc = -EBUSY);
2251         }
2252
2253         ma->ma_need = 0;
2254         ma->ma_valid = MA_INODE;
2255         ma->ma_attr.la_valid = LA_CTIME;
2256         ma->ma_attr.la_ctime = ctime;
2257
2258         mutex_lock(&obj->mot_lov_mutex);
2259
2260         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2261                         mdt_object_child(obj), name, ma, 0);
2262
2263         mutex_unlock(&obj->mot_lov_mutex);
2264
2265 unlock_child:
2266         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2267 unlock_parent:
2268         mdt_object_unlock(info, pobj, parent_lh, 1);
2269 put_parent:
2270         mdt_object_put(info->mti_env, pobj);
2271 out:
2272         RETURN(rc);
2273 }
2274
2275 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2276                                         struct mdt_object *obj)
2277 {
2278         struct lu_ucred *uc = lu_ucred(info->mti_env);
2279         struct md_attr *ma = &info->mti_attr;
2280         struct lu_attr *la = &ma->ma_attr;
2281         int rc = 0;
2282         ENTRY;
2283
2284         ma->ma_need = MA_INODE;
2285         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2286         if (rc)
2287                 GOTO(out, rc);
2288
2289         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2290                         rc = -EACCES;
2291
2292         if (md_capable(uc, CAP_DAC_OVERRIDE))
2293                 RETURN(0);
2294         if (uc->uc_fsuid == la->la_uid) {
2295                 if ((la->la_mode & S_IWUSR) == 0)
2296                         rc = -EACCES;
2297         } else if (uc->uc_fsgid == la->la_gid) {
2298                 if ((la->la_mode & S_IWGRP) == 0)
2299                         rc = -EACCES;
2300         } else if ((la->la_mode & S_IWOTH) == 0) {
2301                         rc = -EACCES;
2302         }
2303
2304 out:
2305         RETURN(rc);
2306 }
2307
2308 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2309                          s64 ctime)
2310 {
2311         struct mdt_device *mdt = info->mti_mdt;
2312         struct mdt_object *obj = NULL;
2313         struct linkea_data ldata = { NULL };
2314         struct lu_buf *buf = &info->mti_big_buf;
2315         struct lu_name *name = &info->mti_name;
2316         struct lu_fid *pfid = &info->mti_tmp_fid1;
2317         struct link_ea_header *leh;
2318         struct link_ea_entry *lee;
2319         int reclen, count, rc = 0;
2320         ENTRY;
2321
2322         if (!fid_is_sane(fid))
2323                 GOTO(out, rc = -EINVAL);
2324
2325         if (!fid_is_namespace_visible(fid))
2326                 GOTO(out, rc = -EINVAL);
2327
2328         obj = mdt_object_find(info->mti_env, mdt, fid);
2329         if (IS_ERR(obj))
2330                 GOTO(out, rc = PTR_ERR(obj));
2331
2332         if (mdt_object_remote(obj))
2333                 GOTO(out, rc = -EREMOTE);
2334         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2335                 GOTO(out, rc = -ENOENT);
2336
2337         rc = mdt_rmfid_check_permission(info, obj);
2338         if (rc)
2339                 GOTO(out, rc);
2340
2341         /* take LinkEA */
2342         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2343         if (!buf->lb_buf)
2344                 GOTO(out, rc = -ENOMEM);
2345
2346         ldata.ld_buf = buf;
2347         rc = mdt_links_read(info, obj, &ldata);
2348         if (rc)
2349                 GOTO(out, rc);
2350
2351         leh = buf->lb_buf;
2352         lee = (struct link_ea_entry *)(leh + 1);
2353         for (count = 0; count < leh->leh_reccount; count++) {
2354                 /* remove every hardlink */
2355                 linkea_entry_unpack(lee, &reclen, name, pfid);
2356                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2357                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2358                 if (rc)
2359                         break;
2360         }
2361
2362 out:
2363         if (obj && !IS_ERR(obj))
2364                 mdt_object_put(info->mti_env, obj);
2365         if (info->mti_big_buf.lb_buf)
2366                 lu_buf_free(&info->mti_big_buf);
2367
2368         RETURN(rc);
2369 }
2370
2371 static int mdt_rmfid(struct tgt_session_info *tsi)
2372 {
2373         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2374         struct mdt_body *reqbody;
2375         struct lu_fid *fids, *rfids;
2376         int bufsize, rc;
2377         __u32 *rcs;
2378         int i, nr;
2379         ENTRY;
2380
2381         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2382         if (reqbody == NULL)
2383                 RETURN(-EPROTO);
2384         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2385                                        RCL_CLIENT);
2386         nr = bufsize / sizeof(struct lu_fid);
2387         if (nr * sizeof(struct lu_fid) != bufsize)
2388                 RETURN(-EINVAL);
2389         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2390                              RCL_SERVER, nr * sizeof(__u32));
2391         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2392                              RCL_SERVER, nr * sizeof(struct lu_fid));
2393         rc = req_capsule_server_pack(tsi->tsi_pill);
2394         if (rc)
2395                 GOTO(out, rc = err_serious(rc));
2396         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2397         if (fids == NULL)
2398                 RETURN(-EPROTO);
2399         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2400         LASSERT(rcs);
2401         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2402         LASSERT(rfids);
2403
2404         mdt_init_ucred(mti, reqbody);
2405         for (i = 0; i < nr; i++) {
2406                 rfids[i] = fids[i];
2407                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2408         }
2409         mdt_exit_ucred(mti);
2410
2411 out:
2412         RETURN(rc);
2413 }
2414
2415 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2416                          void *karg, void __user *uarg);
2417
2418 static int mdt_set_info(struct tgt_session_info *tsi)
2419 {
2420         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2421         char                    *key;
2422         void                    *val;
2423         int                      keylen, vallen, rc = 0;
2424
2425         ENTRY;
2426
2427         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2428         if (key == NULL) {
2429                 DEBUG_REQ(D_HA, req, "no set_info key");
2430                 RETURN(err_serious(-EFAULT));
2431         }
2432
2433         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2434                                       RCL_CLIENT);
2435
2436         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2437         if (val == NULL) {
2438                 DEBUG_REQ(D_HA, req, "no set_info val");
2439                 RETURN(err_serious(-EFAULT));
2440         }
2441
2442         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2443                                       RCL_CLIENT);
2444
2445         /* Swab any part of val you need to here */
2446         if (KEY_IS(KEY_READ_ONLY)) {
2447                 spin_lock(&req->rq_export->exp_lock);
2448                 if (*(__u32 *)val)
2449                         *exp_connect_flags_ptr(req->rq_export) |=
2450                                 OBD_CONNECT_RDONLY;
2451                 else
2452                         *exp_connect_flags_ptr(req->rq_export) &=
2453                                 ~OBD_CONNECT_RDONLY;
2454                 spin_unlock(&req->rq_export->exp_lock);
2455         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2456                 struct changelog_setinfo *cs = val;
2457
2458                 if (vallen != sizeof(*cs)) {
2459                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2460                                tgt_name(tsi->tsi_tgt), vallen);
2461                         RETURN(-EINVAL);
2462                 }
2463                 if (ptlrpc_req_need_swab(req)) {
2464                         __swab64s(&cs->cs_recno);
2465                         __swab32s(&cs->cs_id);
2466                 }
2467
2468                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2469                         RETURN(-EACCES);
2470                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2471                                    vallen, val, NULL);
2472         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2473                 if (vallen > 0)
2474                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2475         } else {
2476                 RETURN(-EINVAL);
2477         }
2478         RETURN(rc);
2479 }
2480
2481 static int mdt_readpage(struct tgt_session_info *tsi)
2482 {
2483         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2484         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2485         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2486         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2487         struct mdt_body         *repbody;
2488         int                      rc;
2489         int                      i;
2490
2491         ENTRY;
2492
2493         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2494                 RETURN(err_serious(-ENOMEM));
2495
2496         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2497         if (repbody == NULL || reqbody == NULL)
2498                 RETURN(err_serious(-EFAULT));
2499
2500         /*
2501          * prepare @rdpg before calling lower layers and transfer itself. Here
2502          * reqbody->size contains offset of where to start to read and
2503          * reqbody->nlink contains number bytes to read.
2504          */
2505         rdpg->rp_hash = reqbody->mbo_size;
2506         if (rdpg->rp_hash != reqbody->mbo_size) {
2507                 CERROR("Invalid hash: %#llx != %#llx\n",
2508                        rdpg->rp_hash, reqbody->mbo_size);
2509                 RETURN(-EFAULT);
2510         }
2511
2512         rdpg->rp_attrs = reqbody->mbo_mode;
2513         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2514                 rdpg->rp_attrs |= LUDA_64BITHASH;
2515         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2516                                 exp_max_brw_size(tsi->tsi_exp));
2517         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2518                           PAGE_SHIFT;
2519         OBD_ALLOC_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2520         if (rdpg->rp_pages == NULL)
2521                 RETURN(-ENOMEM);
2522
2523         for (i = 0; i < rdpg->rp_npages; ++i) {
2524                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2525                 if (rdpg->rp_pages[i] == NULL)
2526                         GOTO(free_rdpg, rc = -ENOMEM);
2527         }
2528
2529         /* call lower layers to fill allocated pages with directory data */
2530         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2531         if (rc < 0)
2532                 GOTO(free_rdpg, rc);
2533
2534         /* send pages to client */
2535         rc = tgt_sendpage(tsi, rdpg, rc);
2536
2537         EXIT;
2538 free_rdpg:
2539
2540         for (i = 0; i < rdpg->rp_npages; i++)
2541                 if (rdpg->rp_pages[i] != NULL)
2542                         __free_page(rdpg->rp_pages[i]);
2543         OBD_FREE_PTR_ARRAY_LARGE(rdpg->rp_pages, rdpg->rp_npages);
2544
2545         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2546                 RETURN(0);
2547
2548         return rc;
2549 }
2550
2551 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2552 {
2553         struct lu_ucred *uc = mdt_ucred_check(info);
2554         struct lu_attr *attr = &info->mti_attr.ma_attr;
2555
2556         if (uc == NULL)
2557                 return -EINVAL;
2558
2559         if (op != REINT_SETATTR) {
2560                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2561                         attr->la_uid = uc->uc_fsuid;
2562                 /* for S_ISGID, inherit gid from his parent, such work will be
2563                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2564                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2565                         attr->la_gid = uc->uc_fsgid;
2566         }
2567
2568         return 0;
2569 }
2570
2571 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2572 {
2573         return op == REINT_OPEN &&
2574              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2575 }
2576
2577 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2578 {
2579         struct req_capsule *pill = info->mti_pill;
2580
2581         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2582                                   RCL_SERVER) &&
2583             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2584                                   RCL_CLIENT)) {
2585                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2586                                          RCL_CLIENT) != 0)
2587                         /* pre-set size in server part with max size */
2588                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2589                                              RCL_SERVER,
2590                                              OBD_MAX_DEFAULT_EA_SIZE);
2591                 else
2592                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2593                                              RCL_SERVER, 0);
2594         }
2595 }
2596
2597 static void mdt_preset_encctx_size(struct mdt_thread_info *info)
2598 {
2599         struct req_capsule *pill = info->mti_pill;
2600
2601         if (req_capsule_has_field(pill, &RMF_FILE_ENCCTX,
2602                                   RCL_SERVER))
2603                 /* pre-set size in server part with max size */
2604                 req_capsule_set_size(pill, &RMF_FILE_ENCCTX,
2605                                      RCL_SERVER,
2606                                      info->mti_mdt->mdt_max_mdsize);
2607 }
2608
2609 static int mdt_reint_internal(struct mdt_thread_info *info,
2610                               struct mdt_lock_handle *lhc,
2611                               __u32 op)
2612 {
2613         struct req_capsule      *pill = info->mti_pill;
2614         struct mdt_body         *repbody;
2615         int                      rc = 0, rc2;
2616
2617         ENTRY;
2618
2619         rc = mdt_reint_unpack(info, op);
2620         if (rc != 0) {
2621                 CERROR("Can't unpack reint, rc %d\n", rc);
2622                 RETURN(err_serious(rc));
2623         }
2624
2625
2626         /* check if the file system is set to readonly. O_RDONLY open
2627          * is still allowed even the file system is set to readonly mode */
2628         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2629                 RETURN(err_serious(-EROFS));
2630
2631         /* for replay (no_create) lmm is not needed, client has it already */
2632         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2633                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2634                                      DEF_REP_MD_SIZE);
2635
2636         /* llog cookies are always 0, the field is kept for compatibility */
2637         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2638                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2639
2640         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2641          * by default. If the target object has more ACL entries, then
2642          * enlarge the buffer when necessary. */
2643         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2644                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2645                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2646
2647         mdt_preset_secctx_size(info);
2648         mdt_preset_encctx_size(info);
2649
2650         rc = req_capsule_server_pack(pill);
2651         if (rc != 0) {
2652                 CERROR("Can't pack response, rc %d\n", rc);
2653                 RETURN(err_serious(rc));
2654         }
2655
2656         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2657                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2658                 LASSERT(repbody);
2659                 repbody->mbo_eadatasize = 0;
2660                 repbody->mbo_aclsize = 0;
2661         }
2662
2663         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2664
2665         /* for replay no cookkie / lmm need, because client have this already */
2666         if (info->mti_spec.no_create)
2667                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2668                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2669
2670         rc = mdt_init_ucred_reint(info);
2671         if (rc)
2672                 GOTO(out_shrink, rc);
2673
2674         rc = mdt_fix_attr_ucred(info, op);
2675         if (rc != 0)
2676                 GOTO(out_ucred, rc = err_serious(rc));
2677
2678         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2679         if (rc < 0) {
2680                 GOTO(out_ucred, rc);
2681         } else if (rc == 1) {
2682                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2683                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2684                 GOTO(out_ucred, rc);
2685         }
2686         rc = mdt_reint_rec(info, lhc);
2687         EXIT;
2688 out_ucred:
2689         mdt_exit_ucred(info);
2690 out_shrink:
2691         mdt_client_compatibility(info);
2692
2693         rc2 = mdt_fix_reply(info);
2694         if (rc == 0)
2695                 rc = rc2;
2696
2697         /*
2698          * Data-on-MDT optimization - read data along with OPEN and return it
2699          * in reply when possible.
2700          */
2701         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2702                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2703                                           &lhc->mlh_reg_lh);
2704
2705         return rc;
2706 }
2707
2708 static long mdt_reint_opcode(struct ptlrpc_request *req,
2709                              const struct req_format **fmt)
2710 {
2711         struct mdt_device       *mdt;
2712         struct mdt_rec_reint    *rec;
2713         long                     opc;
2714
2715         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2716         if (rec != NULL) {
2717                 opc = rec->rr_opcode;
2718                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2719                 if (opc < REINT_MAX && fmt[opc] != NULL)
2720                         req_capsule_extend(&req->rq_pill, fmt[opc]);
2721                 else {
2722                         mdt = mdt_exp2dev(req->rq_export);
2723                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2724                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
2725                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2726                         opc = err_serious(-EFAULT);
2727                 }
2728         } else {
2729                 opc = err_serious(-EFAULT);
2730         }
2731         return opc;
2732 }
2733
2734 static int mdt_reint(struct tgt_session_info *tsi)
2735 {
2736         long opc;
2737         int  rc;
2738         static const struct req_format *reint_fmts[REINT_MAX] = {
2739                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
2740                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
2741                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
2742                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
2743                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
2744                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
2745                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2746                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
2747                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
2748                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
2749         };
2750
2751         ENTRY;
2752
2753         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2754         if (opc >= 0) {
2755                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2756                 /*
2757                  * No lock possible here from client to pass it to reint code
2758                  * path.
2759                  */
2760                 rc = mdt_reint_internal(info, NULL, opc);
2761                 mdt_thread_info_fini(info);
2762         } else {
2763                 rc = opc;
2764         }
2765
2766         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2767         RETURN(rc);
2768 }
2769
2770 /* this should sync the whole device */
2771 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2772 {
2773         struct dt_device *dt = mdt->mdt_bottom;
2774         int rc;
2775         ENTRY;
2776
2777         rc = dt->dd_ops->dt_sync(env, dt);
2778         RETURN(rc);
2779 }
2780
2781 /* this should sync this object */
2782 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2783                            struct mdt_object *mo)
2784 {
2785         int rc = 0;
2786
2787         ENTRY;
2788
2789         if (!mdt_object_exists(mo)) {
2790                 CWARN("%s: non existing object "DFID": rc = %d\n",
2791                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2792                       -ESTALE);
2793                 RETURN(-ESTALE);
2794         }
2795
2796         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2797                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2798                 dt_obj_version_t version;
2799
2800                 version = dt_version_get(env, mdt_obj2dt(mo));
2801                 if (version > tgt->lut_obd->obd_last_committed)
2802                         rc = mo_object_sync(env, mdt_object_child(mo));
2803         } else {
2804                 rc = mo_object_sync(env, mdt_object_child(mo));
2805         }
2806
2807         RETURN(rc);
2808 }
2809
2810 static int mdt_sync(struct tgt_session_info *tsi)
2811 {
2812         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2813         struct req_capsule      *pill = tsi->tsi_pill;
2814         struct mdt_body         *body;
2815         ktime_t                  kstart = ktime_get();
2816         int                      rc;
2817
2818         ENTRY;
2819
2820         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2821                 RETURN(err_serious(-ENOMEM));
2822
2823         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2824                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2825         } else {
2826                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2827
2828                 if (unlikely(info->mti_object == NULL))
2829                         RETURN(-EPROTO);
2830
2831                 /* sync an object */
2832                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2833                                      info->mti_object);
2834                 if (rc == 0) {
2835                         const struct lu_fid *fid;
2836                         struct lu_attr *la = &info->mti_attr.ma_attr;
2837
2838                         info->mti_attr.ma_need = MA_INODE;
2839                         info->mti_attr.ma_valid = 0;
2840                         rc = mdt_attr_get_complex(info, info->mti_object,
2841                                                   &info->mti_attr);
2842                         if (rc == 0) {
2843                                 body = req_capsule_server_get(pill,
2844                                                               &RMF_MDT_BODY);
2845                                 fid = mdt_object_fid(info->mti_object);
2846                                 mdt_pack_attr2body(info, body, la, fid);
2847                         }
2848                 }
2849                 mdt_thread_info_fini(info);
2850         }
2851         if (rc == 0)
2852                 mdt_counter_incr(req, LPROC_MDT_SYNC,
2853                                  ktime_us_delta(ktime_get(), kstart));
2854
2855         RETURN(rc);
2856 }
2857
2858 static int mdt_data_sync(struct tgt_session_info *tsi)
2859 {
2860         struct mdt_thread_info *info;
2861         struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2862         struct ost_body *body = tsi->tsi_ost_body;
2863         struct ost_body *repbody;
2864         struct mdt_object *mo = NULL;
2865         struct md_attr *ma;
2866         int rc = 0;
2867
2868         ENTRY;
2869
2870         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2871
2872         /* if no fid is specified then do nothing,
2873          * device sync is done via MDS_SYNC */
2874         if (fid_is_zero(&tsi->tsi_fid))
2875                 RETURN(0);
2876
2877         mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2878         if (IS_ERR(mo))
2879                 RETURN(PTR_ERR(mo));
2880
2881         rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
2882         if (rc)
2883                 GOTO(put, rc);
2884
2885         repbody->oa.o_oi = body->oa.o_oi;
2886         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2887
2888         info = tsi2mdt_info(tsi);
2889         ma = &info->mti_attr;
2890         ma->ma_need = MA_INODE;
2891         ma->ma_valid = 0;
2892         rc = mdt_attr_get_complex(info, mo, ma);
2893         if (rc == 0)
2894                 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2895         else
2896                 rc = 0;
2897         mdt_thread_info_fini(info);
2898
2899         EXIT;
2900 put:
2901         if (mo != NULL)
2902                 mdt_object_put(tsi->tsi_env, mo);
2903         return rc;
2904 }
2905
2906 /*
2907  * Handle quota control requests to consult current usage/limit, but also
2908  * to configure quota enforcement
2909  */
2910 static int mdt_quotactl(struct tgt_session_info *tsi)
2911 {
2912         struct obd_export *exp  = tsi->tsi_exp;
2913         struct req_capsule *pill = tsi->tsi_pill;
2914         struct obd_quotactl *oqctl, *repoqc;
2915         int id, rc;
2916         struct mdt_device *mdt = mdt_exp2dev(exp);
2917         struct lu_device *qmt = mdt->mdt_qmt_dev;
2918         struct lu_nodemap *nodemap;
2919         ENTRY;
2920
2921         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2922         if (!oqctl)
2923                 RETURN(err_serious(-EPROTO));
2924
2925         rc = req_capsule_server_pack(pill);
2926         if (rc)
2927                 RETURN(err_serious(rc));
2928
2929         nodemap = nodemap_get_from_exp(exp);
2930         if (IS_ERR(nodemap))
2931                 RETURN(PTR_ERR(nodemap));
2932
2933         switch (oqctl->qc_cmd) {
2934                 /* master quotactl */
2935         case Q_SETINFO:
2936         case Q_SETQUOTA:
2937         case LUSTRE_Q_SETDEFAULT:
2938         case LUSTRE_Q_SETQUOTAPOOL:
2939         case LUSTRE_Q_SETINFOPOOL:
2940                 if (!nodemap_can_setquota(nodemap))
2941                         GOTO(out_nodemap, rc = -EPERM);
2942                 /* fallthrough */
2943         case Q_GETINFO:
2944         case Q_GETQUOTA:
2945         case LUSTRE_Q_GETDEFAULT:
2946         case LUSTRE_Q_GETQUOTAPOOL:
2947         case LUSTRE_Q_GETINFOPOOL:
2948                 if (qmt == NULL)
2949                         GOTO(out_nodemap, rc = -EOPNOTSUPP);
2950                 /* slave quotactl */
2951                 /* fallthrough */
2952         case Q_GETOINFO:
2953         case Q_GETOQUOTA:
2954                 break;
2955         default:
2956                 rc = -EFAULT;
2957                 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
2958                        mdt_obd_name(mdt), oqctl->qc_cmd, rc);
2959                 GOTO(out_nodemap, rc);
2960         }
2961
2962         id = oqctl->qc_id;
2963         switch (oqctl->qc_type) {
2964         case USRQUOTA:
2965                 id = nodemap_map_id(nodemap, NODEMAP_UID,
2966                                     NODEMAP_CLIENT_TO_FS, id);
2967                 break;
2968         case GRPQUOTA:
2969                 id = nodemap_map_id(nodemap, NODEMAP_GID,
2970                                     NODEMAP_CLIENT_TO_FS, id);
2971                 break;
2972         case PRJQUOTA:
2973                 /* todo: check/map project id */
2974                 id = oqctl->qc_id;
2975                 break;
2976         default:
2977                 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2978         }
2979         repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
2980         if (repoqc == NULL)
2981                 GOTO(out_nodemap, rc = err_serious(-EFAULT));
2982
2983         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
2984                 barrier_exit(tsi->tsi_tgt->lut_bottom);
2985
2986         if (oqctl->qc_id != id)
2987                 swap(oqctl->qc_id, id);
2988
2989         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
2990                 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
2991                         RETURN(-EINPROGRESS);
2992         }
2993
2994         switch (oqctl->qc_cmd) {
2995
2996         case Q_GETINFO:
2997         case Q_SETINFO:
2998         case Q_SETQUOTA:
2999         case Q_GETQUOTA:
3000         case LUSTRE_Q_SETDEFAULT:
3001         case LUSTRE_Q_GETDEFAULT:
3002         case LUSTRE_Q_SETQUOTAPOOL:
3003         case LUSTRE_Q_GETQUOTAPOOL:
3004         case LUSTRE_Q_SETINFOPOOL:
3005         case LUSTRE_Q_GETINFOPOOL:
3006                 /* forward quotactl request to QMT */
3007                 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
3008                 break;
3009
3010         case Q_GETOINFO:
3011         case Q_GETOQUOTA:
3012                 /* slave quotactl */
3013                 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
3014                                    oqctl);
3015                 break;
3016
3017         default:
3018                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
3019                 GOTO(out_nodemap, rc = -EFAULT);
3020         }
3021
3022         if (oqctl->qc_id != id)
3023                 swap(oqctl->qc_id, id);
3024
3025         QCTL_COPY(repoqc, oqctl);
3026         EXIT;
3027
3028 out_nodemap:
3029         nodemap_putref(nodemap);
3030
3031         return rc;
3032 }
3033
3034 /** clone llog ctxt from child (mdd)
3035  * This allows remote llog (replicator) access.
3036  * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
3037  * context was originally set up, or we can handle them directly.
3038  * I choose the latter, but that means I need any llog
3039  * contexts set up by child to be accessable by the mdt.  So we clone the
3040  * context into our context list here.
3041  */
3042 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3043                                int idx)
3044 {
3045         struct md_device  *next = mdt->mdt_child;
3046         struct llog_ctxt *ctxt;
3047         int rc;
3048
3049         if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
3050                 return 0;
3051
3052         rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3053         if (rc || ctxt == NULL) {
3054                 return 0;
3055         }
3056
3057         rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3058         if (rc)
3059                 CERROR("Can't set mdt ctxt %d\n", rc);
3060
3061         return rc;
3062 }
3063
3064 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3065                                  struct mdt_device *mdt, int idx)
3066 {
3067         struct llog_ctxt *ctxt;
3068
3069         ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3070         if (ctxt == NULL)
3071                 return 0;
3072         /* Put once for the get we just did, and once for the clone */
3073         llog_ctxt_put(ctxt);
3074         llog_ctxt_put(ctxt);
3075         return 0;
3076 }
3077
3078 /*
3079  * sec context handlers
3080  */
3081 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
3082 {
3083         CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3084
3085         return 0;
3086 }
3087
3088 /*
3089  * quota request handlers
3090  */
3091 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3092 {
3093         struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
3094         struct lu_device        *qmt = mdt->mdt_qmt_dev;
3095         int                      rc;
3096         ENTRY;
3097
3098         if (qmt == NULL)
3099                 RETURN(err_serious(-EOPNOTSUPP));
3100
3101         rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
3102         RETURN(rc);
3103 }
3104
3105 struct mdt_object *mdt_object_new(const struct lu_env *env,
3106                                   struct mdt_device *d,
3107                                   const struct lu_fid *f)
3108 {
3109         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3110         struct lu_object *o;
3111         struct mdt_object *m;
3112         ENTRY;
3113
3114         CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3115         o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3116         if (unlikely(IS_ERR(o)))
3117                 m = (struct mdt_object *)o;
3118         else
3119                 m = mdt_obj(o);
3120         RETURN(m);
3121 }
3122
3123 struct mdt_object *mdt_object_find(const struct lu_env *env,
3124                                    struct mdt_device *d,
3125                                    const struct lu_fid *f)
3126 {
3127         struct lu_object *o;
3128         struct mdt_object *m;
3129         ENTRY;
3130
3131         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3132         o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3133         if (unlikely(IS_ERR(o)))
3134                 m = (struct mdt_object *)o;
3135         else
3136                 m = mdt_obj(o);
3137
3138         RETURN(m);
3139 }
3140
3141 /**
3142  * Asyncronous commit for mdt device.
3143  *
3144  * Pass asynchonous commit call down the MDS stack.
3145  *
3146  * \param env environment
3147  * \param mdt the mdt device
3148  */
3149 static void mdt_device_commit_async(const struct lu_env *env,
3150                                     struct mdt_device *mdt)
3151 {
3152         struct dt_device *dt = mdt->mdt_bottom;
3153         int rc;
3154         ENTRY;
3155
3156         rc = dt->dd_ops->dt_commit_async(env, dt);
3157         if (unlikely(rc != 0))
3158                 CWARN("%s: async commit start failed: rc = %d\n",
3159                       mdt_obd_name(mdt), rc);
3160         atomic_inc(&mdt->mdt_async_commit_count);
3161         EXIT;
3162 }
3163
3164 /**
3165  * Mark the lock as "synchronous".