Whamcloud - gitweb
LU-9325 mdt: replace simple_strtol() with kstrtol()
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdt/mdt_handler.c
33  *
34  * Lustre Metadata Target (mdt) request handler
35  *
36  * Author: Peter Braam <braam@clusterfs.com>
37  * Author: Andreas Dilger <adilger@clusterfs.com>
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Mike Shaver <shaver@clusterfs.com>
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  * Author: Huang Hua <huanghua@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MDS
46
47 #include <linux/module.h>
48 #include <linux/pagemap.h>
49
50 #include <dt_object.h>
51 #include <lustre_acl.h>
52 #include <lustre_export.h>
53 #include <uapi/linux/lustre/lustre_ioctl.h>
54 #include <lustre_lfsck.h>
55 #include <lustre_log.h>
56 #include <lustre_nodemap.h>
57 #include <lustre_mds.h>
58 #include <uapi/linux/lustre/lustre_param.h>
59 #include <lustre_quota.h>
60 #include <lustre_swab.h>
61 #include <lustre_lmv.h>
62 #include <obd.h>
63 #include <obd_support.h>
64 #include <lustre_barrier.h>
65 #include <obd_cksum.h>
66 #include <llog_swab.h>
67
68 #include "mdt_internal.h"
69
70 static unsigned int max_mod_rpcs_per_client = 8;
71 module_param(max_mod_rpcs_per_client, uint, 0644);
72 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
73
74 mdl_mode_t mdt_mdl_lock_modes[] = {
75         [LCK_MINMODE] = MDL_MINMODE,
76         [LCK_EX]      = MDL_EX,
77         [LCK_PW]      = MDL_PW,
78         [LCK_PR]      = MDL_PR,
79         [LCK_CW]      = MDL_CW,
80         [LCK_CR]      = MDL_CR,
81         [LCK_NL]      = MDL_NL,
82         [LCK_GROUP]   = MDL_GROUP
83 };
84
85 enum ldlm_mode mdt_dlm_lock_modes[] = {
86         [MDL_MINMODE]   = LCK_MINMODE,
87         [MDL_EX]        = LCK_EX,
88         [MDL_PW]        = LCK_PW,
89         [MDL_PR]        = LCK_PR,
90         [MDL_CW]        = LCK_CW,
91         [MDL_CR]        = LCK_CR,
92         [MDL_NL]        = LCK_NL,
93         [MDL_GROUP]     = LCK_GROUP
94 };
95
96 static struct mdt_device *mdt_dev(struct lu_device *d);
97
98 static const struct lu_object_operations mdt_obj_ops;
99
/* Slab cache for MDT objects (struct mdt_object) */
static struct kmem_cache *mdt_object_kmem;

/* Slab cache for HSM restore handles (struct cdt_restore_handle) */
struct kmem_cache *mdt_hsm_cdt_kmem;

/* Slab cache for HSM request handles (struct cdt_agent_req) */
struct kmem_cache *mdt_hsm_car_kmem;
108
109 static struct lu_kmem_descr mdt_caches[] = {
110         {
111                 .ckd_cache = &mdt_object_kmem,
112                 .ckd_name  = "mdt_obj",
113                 .ckd_size  = sizeof(struct mdt_object)
114         },
115         {
116                 .ckd_cache      = &mdt_hsm_cdt_kmem,
117                 .ckd_name       = "mdt_cdt_restore_handle",
118                 .ckd_size       = sizeof(struct cdt_restore_handle)
119         },
120         {
121                 .ckd_cache      = &mdt_hsm_car_kmem,
122                 .ckd_name       = "mdt_cdt_agent_req",
123                 .ckd_size       = sizeof(struct cdt_agent_req)
124         },
125         {
126                 .ckd_cache = NULL
127         }
128 };
129
130 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
131 {
132         if (!rep)
133                 return 0;
134         return rep->lock_policy_res1 & op_flag;
135 }
136
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138                            struct ldlm_reply *rep, __u64 op_flag)
139 {
140         if (info) {
141                 info->mti_opdata &= ~op_flag;
142                 tgt_opdata_clear(info->mti_env, op_flag);
143         }
144         if (rep)
145                 rep->lock_policy_res1 &= ~op_flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, __u64 op_flag)
150 {
151         if (info) {
152                 info->mti_opdata |= op_flag;
153                 tgt_opdata_set(info->mti_env, op_flag);
154         }
155         if (rep)
156                 rep->lock_policy_res1 |= op_flag;
157 }
158
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 {
161         lh->mlh_pdo_hash = 0;
162         lh->mlh_reg_mode = lm;
163         lh->mlh_rreg_mode = lm;
164         lh->mlh_type = MDT_REG_LOCK;
165 }
166
167 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
168                        const struct lu_name *lname)
169 {
170         lh->mlh_reg_mode = lock_mode;
171         lh->mlh_pdo_mode = LCK_MINMODE;
172         lh->mlh_rreg_mode = lock_mode;
173         lh->mlh_type = MDT_PDO_LOCK;
174
175         if (lu_name_is_valid(lname)) {
176                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
177                                                      lname->ln_namelen);
178                 /* XXX Workaround for LU-2856
179                  *
180                  * Zero is a valid return value of full_name_hash, but
181                  * several users of mlh_pdo_hash assume a non-zero
182                  * hash value. We therefore map zero onto an
183                  * arbitrary, but consistent value (1) to avoid
184                  * problems further down the road. */
185                 if (unlikely(lh->mlh_pdo_hash == 0))
186                         lh->mlh_pdo_hash = 1;
187         } else {
188                 lh->mlh_pdo_hash = 0;
189         }
190 }
191
192 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
193                               struct mdt_lock_handle *lh)
194 {
195         mdl_mode_t mode;
196         ENTRY;
197
198         /*
199          * Any dir access needs couple of locks:
200          *
201          * 1) on part of dir we gonna take lookup/modify;
202          *
203          * 2) on whole dir to protect it from concurrent splitting and/or to
204          * flush client's cache for readdir().
205          *
206          * so, for a given mode and object this routine decides what lock mode
207          * to use for lock #2:
208          *
209          * 1) if caller's gonna lookup in dir then we need to protect dir from
210          * being splitted only - LCK_CR
211          *
212          * 2) if caller's gonna modify dir then we need to protect dir from
213          * being splitted and to flush cache - LCK_CW
214          *
215          * 3) if caller's gonna modify dir and that dir seems ready for
216          * splitting then we need to protect it from any type of access
217          * (lookup/modify/split) - LCK_EX --bzzz
218          */
219
220         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
221         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
222
223         /*
224          * Ask underlaying level its opinion about preferable PDO lock mode
225          * having access type passed as regular lock mode:
226          *
227          * - MDL_MINMODE means that lower layer does not want to specify lock
228          * mode;
229          *
230          * - MDL_NL means that no PDO lock should be taken. This is used in some
231          * cases. Say, for non-splittable directories no need to use PDO locks
232          * at all.
233          */
234         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
235                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
236
237         if (mode != MDL_MINMODE) {
238                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
239         } else {
240                 /*
241                  * Lower layer does not want to specify locking mode. We do it
242                  * our selves. No special protection is needed, just flush
243                  * client's cache on modification and allow concurrent
244                  * mondification.
245                  */
246                 switch (lh->mlh_reg_mode) {
247                 case LCK_EX:
248                         lh->mlh_pdo_mode = LCK_EX;
249                         break;
250                 case LCK_PR:
251                         lh->mlh_pdo_mode = LCK_CR;
252                         break;
253                 case LCK_PW:
254                         lh->mlh_pdo_mode = LCK_CW;
255                         break;
256                 default:
257                         CERROR("Not expected lock type (0x%x)\n",
258                                (int)lh->mlh_reg_mode);
259                         LBUG();
260                 }
261         }
262
263         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
264         EXIT;
265 }
266
267 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
268                               struct lu_fid *fid)
269 {
270         struct mdt_device *mdt = info->mti_mdt;
271         struct lu_name *lname = &info->mti_name;
272         char *filename = info->mti_filename;
273         struct mdt_object *parent;
274         u32 mode;
275         int rc = 0;
276
277         LASSERT(!info->mti_cross_ref);
278
279         /*
280          * We may want to allow this to mount a completely separate
281          * fileset from the MDT in the future, but keeping it to
282          * ROOT/ only for now avoid potential security issues.
283          */
284         *fid = mdt->mdt_md_root_fid;
285
286         while (rc == 0 && fileset != NULL && *fileset != '\0') {
287                 const char *s1 = fileset;
288                 const char *s2;
289
290                 while (*++s1 == '/')
291                         ;
292                 s2 = s1;
293                 while (*s2 != '/' && *s2 != '\0')
294                         s2++;
295
296                 if (s2 == s1)
297                         break;
298
299                 fileset = s2;
300
301                 lname->ln_namelen = s2 - s1;
302                 if (lname->ln_namelen > NAME_MAX) {
303                         rc = -EINVAL;
304                         break;
305                 }
306
307                 /* reject .. as a path component */
308                 if (lname->ln_namelen == 2 &&
309                     strncmp(s1, "..", 2) == 0) {
310                         rc = -EINVAL;
311                         break;
312                 }
313
314                 strncpy(filename, s1, lname->ln_namelen);
315                 filename[lname->ln_namelen] = '\0';
316                 lname->ln_name = filename;
317
318                 parent = mdt_object_find(info->mti_env, mdt, fid);
319                 if (IS_ERR(parent)) {
320                         rc = PTR_ERR(parent);
321                         break;
322                 }
323                 /* Only got the fid of this obj by name */
324                 fid_zero(fid);
325                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
326                                 fid, &info->mti_spec);
327                 mdt_object_put(info->mti_env, parent);
328         }
329         if (!rc) {
330                 parent = mdt_object_find(info->mti_env, mdt, fid);
331                 if (IS_ERR(parent))
332                         rc = PTR_ERR(parent);
333                 else {
334                         mode = lu_object_attr(&parent->mot_obj);
335                         mdt_object_put(info->mti_env, parent);
336                         if (!S_ISDIR(mode))
337                                 rc = -ENOTDIR;
338                 }
339         }
340
341         return rc;
342 }
343
344 static int mdt_get_root(struct tgt_session_info *tsi)
345 {
346         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
347         struct mdt_device       *mdt = info->mti_mdt;
348         struct mdt_body         *repbody;
349         char                    *fileset = NULL, *buffer = NULL;
350         int                      rc;
351         struct obd_export       *exp = info->mti_exp;
352         char                    *nodemap_fileset;
353
354         ENTRY;
355
356         rc = mdt_check_ucred(info);
357         if (rc)
358                 GOTO(out, rc = err_serious(rc));
359
360         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
361                 GOTO(out, rc = err_serious(-ENOMEM));
362
363         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
364         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
365                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
366                 if (fileset == NULL)
367                         GOTO(out, rc = err_serious(-EFAULT));
368         }
369
370         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
371         if (nodemap_fileset && nodemap_fileset[0]) {
372                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
373                 if (fileset) {
374                         /* consider fileset from client as a sub-fileset
375                          * of the nodemap one */
376                         OBD_ALLOC(buffer, PATH_MAX + 1);
377                         if (buffer == NULL)
378                                 GOTO(out, rc = err_serious(-ENOMEM));
379                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
380                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
381                                 GOTO(out, rc = err_serious(-EINVAL));
382                         fileset = buffer;
383                 } else {
384                         /* enforce fileset as specified in the nodemap */
385                         fileset = nodemap_fileset;
386                 }
387         }
388
389         if (fileset) {
390                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
391                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
392                 if (rc < 0)
393                         GOTO(out, rc = err_serious(rc));
394         } else {
395                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
396         }
397         repbody->mbo_valid |= OBD_MD_FLID;
398
399         EXIT;
400 out:
401         mdt_thread_info_fini(info);
402         if (buffer)
403                 OBD_FREE(buffer, PATH_MAX+1);
404         return rc;
405 }
406
407 static int mdt_statfs(struct tgt_session_info *tsi)
408 {
409         struct ptlrpc_request *req = tgt_ses_req(tsi);
410         struct mdt_thread_info *info = tsi2mdt_info(tsi);
411         struct mdt_device *mdt = info->mti_mdt;
412         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
413         struct md_device *next = mdt->mdt_child;
414         struct ptlrpc_service_part *svcpt;
415         struct obd_statfs *osfs;
416         struct mdt_body *reqbody = NULL;
417         struct mdt_statfs_cache *msf;
418         int rc;
419
420         ENTRY;
421
422         svcpt = req->rq_rqbd->rqbd_svcpt;
423
424         /* This will trigger a watchdog timeout */
425         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
426                          (MDT_SERVICE_WATCHDOG_FACTOR *
427                           at_get(&svcpt->scp_at_estimate)) + 1);
428
429         rc = mdt_check_ucred(info);
430         if (rc)
431                 GOTO(out, rc = err_serious(rc));
432
433         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
434                 GOTO(out, rc = err_serious(-ENOMEM));
435
436         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
437         if (!osfs)
438                 GOTO(out, rc = -EPROTO);
439
440         if (mdt_is_sum_statfs_client(req->rq_export) &&
441                 lustre_packed_msg_size(req->rq_reqmsg) ==
442                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
443                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
444                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
445                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
446         }
447
448         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
449                 msf = &mdt->mdt_sum_osfs;
450         else
451                 msf = &mdt->mdt_osfs;
452
453         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
454                         /** statfs data is too old, get up-to-date one */
455                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
456                                 rc = next->md_ops->mdo_statfs(info->mti_env,
457                                                               next, osfs);
458                         else
459                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
460                                                osfs);
461                         if (rc)
462                                 GOTO(out, rc);
463                         spin_lock(&mdt->mdt_lock);
464                         msf->msf_osfs = *osfs;
465                         msf->msf_age = ktime_get_seconds();
466                         spin_unlock(&mdt->mdt_lock);
467         } else {
468                         /** use cached statfs data */
469                         spin_lock(&mdt->mdt_lock);
470                         *osfs = msf->msf_osfs;
471                         spin_unlock(&mdt->mdt_lock);
472         }
473
474         /* at least try to account for cached pages.  its still racy and
475          * might be under-reporting if clients haven't announced their
476          * caches with brw recently */
477         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
478                " pending %llu free %llu avail %llu\n",
479                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
480                tgd->tgd_tot_pending,
481                osfs->os_bfree << tgd->tgd_blockbits,
482                osfs->os_bavail << tgd->tgd_blockbits);
483
484         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
485                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
486                                    osfs->os_bsize - 1) >> tgd->tgd_blockbits));
487
488         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
489         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
490                "%llu objects: %llu free; state %x\n",
491                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
492                osfs->os_files, osfs->os_ffree, osfs->os_state);
493
494         if (!exp_grant_param_supp(tsi->tsi_exp) &&
495             tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
496                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
497                  * should not see a block size > page size, otherwise
498                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
499                  * block size which is the biggest block size known to work
500                  * with all client's page size. */
501                 osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
502                 osfs->os_bfree  <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
503                 osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
504                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
505         }
506         if (rc == 0)
507                 mdt_counter_incr(req, LPROC_MDT_STATFS);
508 out:
509         mdt_thread_info_fini(info);
510         RETURN(rc);
511 }
512
513 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
514 {
515         struct lov_comp_md_v1 *comp_v1;
516         struct lov_mds_md *v1;
517         __u32 off;
518         __u32 dom_stripesize = 0;
519         int i;
520         bool has_ost_stripes = false;
521
522         ENTRY;
523
524         if (is_dom_only)
525                 *is_dom_only = 0;
526
527         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
528                 RETURN(0);
529
530         comp_v1 = (struct lov_comp_md_v1 *)lmm;
531         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
532         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
533
534         /* Fast check for DoM entry with no mirroring, should be the first */
535         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
536             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
537                 RETURN(0);
538
539         /* check all entries otherwise */
540         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
541                 struct lov_comp_md_entry_v1 *lcme;
542
543                 lcme = &comp_v1->lcm_entries[i];
544                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
545                         continue;
546
547                 off = le32_to_cpu(lcme->lcme_offset);
548                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
549
550                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
551                     LOV_PATTERN_MDT)
552                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
553                 else
554                         has_ost_stripes = true;
555
556                 if (dom_stripesize && has_ost_stripes)
557                         RETURN(dom_stripesize);
558         }
559         /* DoM-only case exits here */
560         if (is_dom_only && dom_stripesize)
561                 *is_dom_only = 1;
562         RETURN(dom_stripesize);
563 }
564
565 /**
566  * Pack size attributes into the reply.
567  */
568 int mdt_pack_size2body(struct mdt_thread_info *info,
569                         const struct lu_fid *fid, struct lustre_handle *lh)
570 {
571         struct mdt_body *b;
572         struct md_attr *ma = &info->mti_attr;
573         __u32 dom_stripe;
574         bool dom_lock = false;
575
576         ENTRY;
577
578         LASSERT(ma->ma_attr.la_valid & LA_MODE);
579
580         if (!S_ISREG(ma->ma_attr.la_mode) ||
581             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
582                 RETURN(-ENODATA);
583
584         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
585         /* no DoM stripe, no size in reply */
586         if (!dom_stripe)
587                 RETURN(-ENOENT);
588
589         if (lustre_handle_is_used(lh)) {
590                 struct ldlm_lock *lock;
591
592                 lock = ldlm_handle2lock(lh);
593                 if (lock != NULL) {
594                         dom_lock = ldlm_has_dom(lock);
595                         LDLM_LOCK_PUT(lock);
596                 }
597         }
598
599         /* no DoM lock, no size in reply */
600         if (!dom_lock)
601                 RETURN(0);
602
603         /* Either DoM lock exists or LMM has only DoM stripe then
604          * return size on body. */
605         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
606
607         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
608         RETURN(0);
609 }
610
#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
/*
 * Read the access ACL of \a o and pack it into the reply. UIDs/GIDs are
 * mapped and filtered by \a nodemap.
 *
 * \param[in]     info		thread info object
 * \param[in,out] repbody	reply body; mbo_aclsize and OBD_MD_FLACL
 *				are updated
 * \param[in]     o		mdt object of file to examine
 * \param[in]     nodemap	nodemap of client to reply to
 *
 * \retval 0		success; also when the object has no ACL, ACLs are
 *			not supported, or the reply reserves no ACL space
 * \retval -ERANGE	the client's reply buffer cannot hold the ACL
 * \retval -ENOMEM	the large ACL buffer could not be allocated
 * \retval negative	other error getting or parsing the ACL from disk
 */
int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
		      struct mdt_object *o, struct lu_nodemap *nodemap)
{
	const struct lu_env *env = info->mti_env;
	struct mdt_device *mdt = info->mti_mdt;
	struct req_capsule *pill = info->mti_pill;
	struct md_object *next = mdt_object_child(o);
	struct lu_buf *buf = &info->mti_buf;
	int acl_buflen;
	int rc;

	ENTRY;

	buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
	buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
	if (buf->lb_len == 0)
		RETURN(0);

again:
	rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
	if (rc == -ENODATA) {
		/* no ACL on the object: report an empty, valid ACL */
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid |= OBD_MD_FLACL;
		RETURN(0);
	}
	if (rc == -EOPNOTSUPP)
		RETURN(0);

	if (rc < 0) {
		/* retry once with the big buffer if the client allows it */
		bool can_grow = rc == -ERANGE &&
				exp_connect_large_acl(info->mti_exp) &&
				buf->lb_buf != info->mti_big_acl;

		if (!can_grow) {
			CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
			       mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
			RETURN(rc);
		}

		if (info->mti_big_acl == NULL) {
			info->mti_big_aclsize = min_t(unsigned int,
						      mdt->mdt_max_ea_size,
						      XATTR_SIZE_MAX);
			OBD_ALLOC_LARGE(info->mti_big_acl,
					info->mti_big_aclsize);
			if (info->mti_big_acl == NULL) {
				info->mti_big_aclsize = 0;
				CERROR("%s: unable to grow "
				       DFID" ACL buffer\n",
				       mdt_obd_name(mdt),
				       PFID(mdt_object_fid(o)));
				RETURN(-ENOMEM);
			}
		}

		CDEBUG(D_INODE, "%s: grow the "DFID
		       " ACL buffer to size %d\n",
		       mdt_obd_name(mdt),
		       PFID(mdt_object_fid(o)),
		       info->mti_big_aclsize);

		buf->lb_buf = info->mti_big_acl;
		buf->lb_len = info->mti_big_aclsize;

		goto again;
	}

	/* rc now holds the ACL size in bytes */
	acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
	if (acl_buflen < rc) {
		int lmm_buflen = 0;
		int lmmsize = 0;
		int client;
		int server;
		int needed;

		/* If LOV/LMA EA is small, we can reuse part of their buffer */
		client = ptlrpc_req_get_repsize(pill->rc_req);
		server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
		if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
			lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
							  RCL_SERVER);
			lmmsize = repbody->mbo_eadatasize;
		}

		needed = server - acl_buflen - lmm_buflen + rc + lmmsize;
		if (client < needed) {
			CDEBUG(D_INODE, "%s: client prepared buffer size %d "
			       "is not big enough with the ACL size %d (%d)\n",
			       mdt_obd_name(mdt), client, rc, needed);
			repbody->mbo_aclsize = 0;
			repbody->mbo_valid &= ~OBD_MD_FLACL;
			RETURN(-ERANGE);
		}
	}

	if (buf->lb_buf == info->mti_big_acl)
		info->mti_big_acl_used = 1;

	rc = nodemap_map_acl(nodemap, buf->lb_buf,
			     rc, NODEMAP_FS_TO_CLIENT);
	/* if all ACLs mapped out, rc is still >= 0 */
	if (rc < 0) {
		CERROR("%s: nodemap_map_acl unable to parse "DFID
		       " ACL: rc = %d\n", mdt_obd_name(mdt),
		       PFID(mdt_object_fid(o)), rc);
		repbody->mbo_aclsize = 0;
		repbody->mbo_valid &= ~OBD_MD_FLACL;
		RETURN(rc);
	}

	repbody->mbo_aclsize = rc;
	repbody->mbo_valid |= OBD_MD_FLACL;
	RETURN(0);
}
#endif
737
738 /* XXX Look into layout in MDT layer. */
739 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
740 {
741         struct lov_comp_md_v1   *comp_v1;
742         struct lov_mds_md       *v1;
743         int                      i;
744
745         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
746                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
747
748                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
749                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
750                                 comp_v1->lcm_entries[i].lcme_offset);
751                         /* We don't support partial release for now */
752                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
753                                 return false;
754                 }
755                 return true;
756         } else {
757                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
758                         true : false;
759         }
760 }
761
762 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
763                         const struct lu_attr *attr, const struct lu_fid *fid)
764 {
765         struct md_attr *ma = &info->mti_attr;
766         struct obd_export *exp = info->mti_exp;
767         struct lu_nodemap *nodemap = NULL;
768
769         LASSERT(ma->ma_valid & MA_INODE);
770
771         if (attr->la_valid & LA_ATIME) {
772                 b->mbo_atime = attr->la_atime;
773                 b->mbo_valid |= OBD_MD_FLATIME;
774         }
775         if (attr->la_valid & LA_MTIME) {
776                 b->mbo_mtime = attr->la_mtime;
777                 b->mbo_valid |= OBD_MD_FLMTIME;
778         }
779         if (attr->la_valid & LA_CTIME) {
780                 b->mbo_ctime = attr->la_ctime;
781                 b->mbo_valid |= OBD_MD_FLCTIME;
782         }
783         if (attr->la_valid & LA_BTIME) {
784                 b->mbo_btime = attr->la_btime;
785                 b->mbo_valid |= OBD_MD_FLBTIME;
786         }
787         if (attr->la_valid & LA_FLAGS) {
788                 b->mbo_flags = attr->la_flags;
789                 b->mbo_valid |= OBD_MD_FLFLAGS;
790         }
791         if (attr->la_valid & LA_NLINK) {
792                 b->mbo_nlink = attr->la_nlink;
793                 b->mbo_valid |= OBD_MD_FLNLINK;
794         }
795         if (attr->la_valid & (LA_UID|LA_GID)) {
796                 nodemap = nodemap_get_from_exp(exp);
797                 if (IS_ERR(nodemap))
798                         goto out;
799         }
800         if (attr->la_valid & LA_UID) {
801                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
802                                             NODEMAP_FS_TO_CLIENT,
803                                             attr->la_uid);
804                 b->mbo_valid |= OBD_MD_FLUID;
805         }
806         if (attr->la_valid & LA_GID) {
807                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
808                                             NODEMAP_FS_TO_CLIENT,
809                                             attr->la_gid);
810                 b->mbo_valid |= OBD_MD_FLGID;
811         }
812
813         if (attr->la_valid & LA_PROJID) {
814                 /* TODO, nodemap for project id */
815                 b->mbo_projid = attr->la_projid;
816                 b->mbo_valid |= OBD_MD_FLPROJID;
817         }
818
819         b->mbo_mode = attr->la_mode;
820         if (attr->la_valid & LA_MODE)
821                 b->mbo_valid |= OBD_MD_FLMODE;
822         if (attr->la_valid & LA_TYPE)
823                 b->mbo_valid |= OBD_MD_FLTYPE;
824
825         if (fid != NULL) {
826                 b->mbo_fid1 = *fid;
827                 b->mbo_valid |= OBD_MD_FLID;
828                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
829                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
830         }
831
832         if (!(attr->la_valid & LA_TYPE))
833                 return;
834
835         b->mbo_rdev   = attr->la_rdev;
836         b->mbo_size   = attr->la_size;
837         b->mbo_blocks = attr->la_blocks;
838
839         if (!S_ISREG(attr->la_mode)) {
840                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
841         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
842                 /* means no objects are allocated on osts. */
843                 LASSERT(!(ma->ma_valid & MA_LOV));
844                 /* just ignore blocks occupied by extend attributes on MDS */
845                 b->mbo_blocks = 0;
846                 /* if no object is allocated on osts, the size on mds is valid.
847                  * b=22272 */
848                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
849         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
850                 if (mdt_hsm_is_released(ma->ma_lmm)) {
851                         /* A released file stores its size on MDS. */
852                         /* But return 1 block for released file, unless tools
853                          * like tar will consider it fully sparse. (LU-3864)
854                          */
855                         if (unlikely(b->mbo_size == 0))
856                                 b->mbo_blocks = 0;
857                         else
858                                 b->mbo_blocks = 1;
859                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
860                 } else if (info->mti_som_valid) { /* som is valid */
861                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
862                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
863                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
864                         b->mbo_size = ma->ma_som.ms_size;
865                         b->mbo_blocks = ma->ma_som.ms_blocks;
866                 }
867         }
868
869         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
870                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
871                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
872                        PFID(fid), (unsigned long long)b->mbo_size);
873
874 out:
875         if (!IS_ERR_OR_NULL(nodemap))
876                 nodemap_putref(nodemap);
877 }
878
879 static inline int mdt_body_has_lov(const struct lu_attr *la,
880                                    const struct mdt_body *body)
881 {
882         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
883                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
884 }
885
886 void mdt_client_compatibility(struct mdt_thread_info *info)
887 {
888         struct mdt_body       *body;
889         struct ptlrpc_request *req = mdt_info_req(info);
890         struct obd_export     *exp = req->rq_export;
891         struct md_attr        *ma = &info->mti_attr;
892         struct lu_attr        *la = &ma->ma_attr;
893         ENTRY;
894
895         if (exp_connect_layout(exp))
896                 /* the client can deal with 16-bit lmm_stripe_count */
897                 RETURN_EXIT;
898
899         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
900
901         if (!mdt_body_has_lov(la, body))
902                 RETURN_EXIT;
903
904         /* now we have a reply with a lov for a client not compatible with the
905          * layout lock so we have to clean the layout generation number */
906         if (S_ISREG(la->la_mode))
907                 ma->ma_lmm->lmm_layout_gen = 0;
908         EXIT;
909 }
910
911 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
912                                    struct mdt_object *o)
913 {
914         const struct lu_env *env = info->mti_env;
915         int rc, rc2;
916
917         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
918                           XATTR_NAME_LOV);
919
920         if (rc == -ENODATA)
921                 rc = 0;
922
923         if (rc < 0)
924                 goto out;
925
926         /* Is it a directory? Let's check for the LMV as well */
927         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
928                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
929                                    XATTR_NAME_LMV);
930
931                 if (rc2 == -ENODATA)
932                         rc2 = mo_xattr_get(env, mdt_object_child(o),
933                                            &LU_BUF_NULL,
934                                            XATTR_NAME_DEFAULT_LMV);
935
936                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
937                         rc = rc2;
938         }
939
940 out:
941         return rc;
942 }
943
944 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
945                       const char *name)
946 {
947         const struct lu_env *env = info->mti_env;
948         int rc;
949         ENTRY;
950
951         LASSERT(info->mti_big_lmm_used == 0);
952         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
953         if (rc < 0)
954                 RETURN(rc);
955
956         /* big_lmm may need to be grown */
957         if (info->mti_big_lmmsize < rc) {
958                 int size = size_roundup_power2(rc);
959
960                 if (info->mti_big_lmmsize > 0) {
961                         /* free old buffer */
962                         LASSERT(info->mti_big_lmm);
963                         OBD_FREE_LARGE(info->mti_big_lmm,
964                                        info->mti_big_lmmsize);
965                         info->mti_big_lmm = NULL;
966                         info->mti_big_lmmsize = 0;
967                 }
968
969                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
970                 if (info->mti_big_lmm == NULL)
971                         RETURN(-ENOMEM);
972                 info->mti_big_lmmsize = size;
973         }
974         LASSERT(info->mti_big_lmmsize >= rc);
975
976         info->mti_buf.lb_buf = info->mti_big_lmm;
977         info->mti_buf.lb_len = info->mti_big_lmmsize;
978         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
979
980         RETURN(rc);
981 }
982
983 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
984                      struct md_attr *ma, const char *name)
985 {
986         struct md_object *next = mdt_object_child(o);
987         struct lu_buf    *buf = &info->mti_buf;
988         int rc;
989
990         if (strcmp(name, XATTR_NAME_LOV) == 0) {
991                 buf->lb_buf = ma->ma_lmm;
992                 buf->lb_len = ma->ma_lmm_size;
993                 LASSERT(!(ma->ma_valid & MA_LOV));
994         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
995                 buf->lb_buf = ma->ma_lmv;
996                 buf->lb_len = ma->ma_lmv_size;
997                 LASSERT(!(ma->ma_valid & MA_LMV));
998         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
999                 buf->lb_buf = ma->ma_default_lmv;
1000                 buf->lb_len = ma->ma_default_lmv_size;
1001                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1002         } else {
1003                 return -EINVAL;
1004         }
1005
1006         LASSERT(buf->lb_buf);
1007
1008         rc = mo_xattr_get(info->mti_env, next, buf, name);
1009         if (rc > 0) {
1010
1011 got:
1012                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1013                         if (info->mti_big_lmm_used)
1014                                 ma->ma_lmm = info->mti_big_lmm;
1015
1016                         /* NOT return LOV EA with hole to old client. */
1017                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1018                                      LOV_PATTERN_F_HOLE) &&
1019                             !(exp_connect_flags(info->mti_exp) &
1020                               OBD_CONNECT_LFSCK)) {
1021                                 return -EIO;
1022                         } else {
1023                                 ma->ma_lmm_size = rc;
1024                                 ma->ma_valid |= MA_LOV;
1025                         }
1026                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1027                         if (info->mti_big_lmm_used)
1028                                 ma->ma_lmv = info->mti_big_lmm;
1029
1030                         ma->ma_lmv_size = rc;
1031                         ma->ma_valid |= MA_LMV;
1032                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1033                         ma->ma_default_lmv_size = rc;
1034                         ma->ma_valid |= MA_LMV_DEF;
1035                 }
1036
1037                 /* Update mdt_max_mdsize so all clients will be aware that */
1038                 if (info->mti_mdt->mdt_max_mdsize < rc)
1039                         info->mti_mdt->mdt_max_mdsize = rc;
1040
1041                 rc = 0;
1042         } else if (rc == -ENODATA) {
1043                 /* no LOV EA */
1044                 rc = 0;
1045         } else if (rc == -ERANGE) {
1046                 /* Default LMV has fixed size, so it must be able to fit
1047                  * in the original buffer */
1048                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1049                         return rc;
1050                 rc = mdt_big_xattr_get(info, o, name);
1051                 if (rc > 0) {
1052                         info->mti_big_lmm_used = 1;
1053                         goto got;
1054                 }
1055         }
1056
1057         return rc;
1058 }
1059
1060 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1061                    struct md_attr *ma, const char *name)
1062 {
1063         int rc;
1064
1065         if (!info->mti_big_lmm) {
1066                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1067                 if (!info->mti_big_lmm)
1068                         return -ENOMEM;
1069                 info->mti_big_lmmsize = PAGE_SIZE;
1070         }
1071
1072         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1073                 ma->ma_lmm = info->mti_big_lmm;
1074                 ma->ma_lmm_size = info->mti_big_lmmsize;
1075                 ma->ma_valid &= ~MA_LOV;
1076         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1077                 ma->ma_lmv = info->mti_big_lmm;
1078                 ma->ma_lmv_size = info->mti_big_lmmsize;
1079                 ma->ma_valid &= ~MA_LMV;
1080         } else {
1081                 LBUG();
1082         }
1083
1084         LASSERT(!info->mti_big_lmm_used);
1085         rc = __mdt_stripe_get(info, o, ma, name);
1086         /* since big_lmm is always used here, clear 'used' flag to avoid
1087          * assertion in mdt_big_xattr_get().
1088          */
1089         info->mti_big_lmm_used = 0;
1090
1091         return rc;
1092 }
1093
1094 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1095                       struct lu_fid *pfid)
1096 {
1097         struct lu_buf           *buf = &info->mti_buf;
1098         struct link_ea_header   *leh;
1099         struct link_ea_entry    *lee;
1100         int                      rc;
1101         ENTRY;
1102
1103         buf->lb_buf = info->mti_big_lmm;
1104         buf->lb_len = info->mti_big_lmmsize;
1105         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1106                           buf, XATTR_NAME_LINK);
1107         /* ignore errors, MA_PFID won't be set and it is
1108          * up to the caller to treat this as an error */
1109         if (rc == -ERANGE || buf->lb_len == 0) {
1110                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1111                 buf->lb_buf = info->mti_big_lmm;
1112                 buf->lb_len = info->mti_big_lmmsize;
1113         }
1114
1115         if (rc < 0)
1116                 RETURN(rc);
1117         if (rc < sizeof(*leh)) {
1118                 CERROR("short LinkEA on "DFID": rc = %d\n",
1119                        PFID(mdt_object_fid(o)), rc);
1120                 RETURN(-ENODATA);
1121         }
1122
1123         leh = (struct link_ea_header *) buf->lb_buf;
1124         lee = (struct link_ea_entry *)(leh + 1);
1125         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1126                 leh->leh_magic = LINK_EA_MAGIC;
1127                 leh->leh_reccount = __swab32(leh->leh_reccount);
1128                 leh->leh_len = __swab64(leh->leh_len);
1129         }
1130         if (leh->leh_magic != LINK_EA_MAGIC)
1131                 RETURN(-EINVAL);
1132         if (leh->leh_reccount == 0)
1133                 RETURN(-ENODATA);
1134
1135         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1136         fid_be_to_cpu(pfid, pfid);
1137
1138         RETURN(0);
1139 }
1140
1141 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1142                            struct lu_fid *pfid, struct lu_name *lname)
1143 {
1144         struct lu_buf *buf = &info->mti_buf;
1145         struct link_ea_header *leh;
1146         struct link_ea_entry *lee;
1147         int reclen;
1148         int rc;
1149
1150         buf->lb_buf = info->mti_xattr_buf;
1151         buf->lb_len = sizeof(info->mti_xattr_buf);
1152         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1153                           XATTR_NAME_LINK);
1154         if (rc == -ERANGE) {
1155                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1156                 buf->lb_buf = info->mti_big_lmm;
1157                 buf->lb_len = info->mti_big_lmmsize;
1158         }
1159         if (rc < 0)
1160                 return rc;
1161
1162         if (rc < sizeof(*leh)) {
1163                 CERROR("short LinkEA on "DFID": rc = %d\n",
1164                        PFID(mdt_object_fid(o)), rc);
1165                 return -ENODATA;
1166         }
1167
1168         leh = (struct link_ea_header *)buf->lb_buf;
1169         lee = (struct link_ea_entry *)(leh + 1);
1170         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1171                 leh->leh_magic = LINK_EA_MAGIC;
1172                 leh->leh_reccount = __swab32(leh->leh_reccount);
1173                 leh->leh_len = __swab64(leh->leh_len);
1174         }
1175         if (leh->leh_magic != LINK_EA_MAGIC)
1176                 return -EINVAL;
1177
1178         if (leh->leh_reccount == 0)
1179                 return -ENODATA;
1180
1181         linkea_entry_unpack(lee, &reclen, lname, pfid);
1182
1183         return 0;
1184 }
1185
1186 int mdt_attr_get_complex(struct mdt_thread_info *info,
1187                          struct mdt_object *o, struct md_attr *ma)
1188 {
1189         const struct lu_env *env = info->mti_env;
1190         struct md_object    *next = mdt_object_child(o);
1191         struct lu_buf       *buf = &info->mti_buf;
1192         int                  need = ma->ma_need;
1193         int                  rc = 0, rc2;
1194         u32                  mode;
1195         ENTRY;
1196
1197         ma->ma_valid = 0;
1198
1199         if (mdt_object_exists(o) == 0)
1200                 GOTO(out, rc = -ENOENT);
1201         mode = lu_object_attr(&next->mo_lu);
1202
1203         if (need & MA_INODE) {
1204                 ma->ma_need = MA_INODE;
1205                 rc = mo_attr_get(env, next, ma);
1206                 if (rc)
1207                         GOTO(out, rc);
1208
1209                 if (S_ISREG(mode))
1210                         (void) mdt_get_som(info, o, ma);
1211                 ma->ma_valid |= MA_INODE;
1212         }
1213
1214         if (need & MA_PFID) {
1215                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1216                 if (rc == 0)
1217                         ma->ma_valid |= MA_PFID;
1218                 /* ignore this error, parent fid is not mandatory */
1219                 rc = 0;
1220         }
1221
1222         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1223                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1224                 if (rc)
1225                         GOTO(out, rc);
1226         }
1227
1228         if (need & MA_LMV && S_ISDIR(mode)) {
1229                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1230                 if (rc != 0)
1231                         GOTO(out, rc);
1232         }
1233
1234         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1235                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1236                 if (rc != 0)
1237                         GOTO(out, rc);
1238         }
1239
1240         /*
1241          * In the handle of MA_INODE, we may already get the SOM attr.
1242          */
1243         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1244                 rc = mdt_get_som(info, o, ma);
1245                 if (rc != 0)
1246                         GOTO(out, rc);
1247         }
1248
1249         if (need & MA_HSM && S_ISREG(mode)) {
1250                 buf->lb_buf = info->mti_xattr_buf;
1251                 buf->lb_len = sizeof(info->mti_xattr_buf);
1252                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1253                              sizeof(info->mti_xattr_buf));
1254                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1255                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1256                 if (rc2 == 0)
1257                         ma->ma_valid |= MA_HSM;
1258                 else if (rc2 < 0 && rc2 != -ENODATA)
1259                         GOTO(out, rc = rc2);
1260         }
1261
1262 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1263         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1264                 buf->lb_buf = ma->ma_acl;
1265                 buf->lb_len = ma->ma_acl_size;
1266                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1267                 if (rc2 > 0) {
1268                         ma->ma_acl_size = rc2;
1269                         ma->ma_valid |= MA_ACL_DEF;
1270                 } else if (rc2 == -ENODATA) {
1271                         /* no ACLs */
1272                         ma->ma_acl_size = 0;
1273                 } else
1274                         GOTO(out, rc = rc2);
1275         }
1276 #endif
1277 out:
1278         ma->ma_need = need;
1279         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1280                rc, ma->ma_valid, ma->ma_lmm);
1281         RETURN(rc);
1282 }
1283
     /*
      * Build the getattr reply for object @o.
      *
      * The attributes are fetched with mdt_attr_get_complex() and packed into
      * the RMF_MDT_BODY reply buffer.  Depending on the bits the client set in
      * the request body's mbo_valid, the reply may also carry the LOV EA, the
      * LMV / default LMV EA, the symlink target, the maximum MD size and the
      * ACL.
      *
      * @ma_need holds extra MA_* bits that are OR'ed into the set derived from
      * the request before the attributes are fetched.
      *
      * A remote object is answered with its FID and OBD_MD_FLID | OBD_MD_MDS
      * only (-ENOTSUPP if the client is not DNE capable).
      *
      * Returns 0 on success, in which case LPROC_MDT_GETATTR is incremented,
      * or a negative errno.
      */
1284 static int mdt_getattr_internal(struct mdt_thread_info *info,
1285                                 struct mdt_object *o, int ma_need)
1286 {
1287         struct mdt_device *mdt = info->mti_mdt;
1288         struct md_object *next = mdt_object_child(o);
1289         const struct mdt_body *reqbody = info->mti_body;
1290         struct ptlrpc_request *req = mdt_info_req(info);
1291         struct md_attr *ma = &info->mti_attr;
1292         struct lu_attr *la = &ma->ma_attr;
1293         struct req_capsule *pill = info->mti_pill;
1294         const struct lu_env *env = info->mti_env;
1295         struct mdt_body *repbody;
1296         struct lu_buf *buffer = &info->mti_buf;
1297         struct obd_export *exp = info->mti_exp;
1298         int rc;
1299
1300         ENTRY;
1301
1302         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1303                 RETURN(err_serious(-ENOMEM));
1304
1305         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1306
1307         ma->ma_valid = 0;
1308
1309         if (mdt_object_remote(o)) {
1310                 /* This object is located on remote node.*/
1311                 /* Return -ENOTSUPP for old client */
1312                 if (!mdt_is_dne_client(req->rq_export))
1313                         GOTO(out, rc = -ENOTSUPP);
1314
1315                 repbody->mbo_fid1 = *mdt_object_fid(o);
1316                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1317                 GOTO(out, rc = 0);
1318         }
1319
             /* The client asked for EA data: use the RMF_MDT_MD reply buffer.
              * Otherwise there is no EA buffer and LOV/LMV are dropped from
              * the caller-supplied ma_need. */
1320         if (reqbody->mbo_eadatasize > 0) {
1321                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1322                 if (buffer->lb_buf == NULL)
1323                         GOTO(out, rc = -EPROTO);
1324                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1325                                                       RCL_SERVER);
1326         } else {
1327                 buffer->lb_buf = NULL;
1328                 buffer->lb_len = 0;
1329                 ma_need &= ~(MA_LOV | MA_LMV);
1330                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1331                        mdt_obd_name(info->mti_mdt),
1332                        req->rq_export->exp_client_uuid.uuid);
1333         }
1334
             /* Choose which EA the reply buffer(s) will receive:
              *  - directory, both MEA and default MEA requested, and the reply
              *    has an RMF_DEFAULT_MDT_MD field: LMV plus default LMV;
              *  - directory with either MEA bit requested: LMV, or default
              *    LMV when only OBD_MD_DEFAULT_MEA is set;
              *  - anything else: LOV, plus HSM attributes. */
1335         /* from 2.12.58 intent_getattr pack default LMV in reply */
1336         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1337             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1338                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1339             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1340                                   RCL_SERVER)) {
1341                 ma->ma_lmv = buffer->lb_buf;
1342                 ma->ma_lmv_size = buffer->lb_len;
1343                 ma->ma_default_lmv = req_capsule_server_get(pill,
1344                                                 &RMF_DEFAULT_MDT_MD);
1345                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1346                                                 &RMF_DEFAULT_MDT_MD,
1347                                                 RCL_SERVER);
1348                 ma->ma_need = MA_INODE;
1349                 if (ma->ma_lmv_size > 0)
1350                         ma->ma_need |= MA_LMV;
1351                 if (ma->ma_default_lmv_size > 0)
1352                         ma->ma_need |= MA_LMV_DEF;
1353         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1354                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1355                 /* If it is dir and client require MEA, then we got MEA */
1356                 /* Assumption: MDT_MD size is enough for lmv size. */
1357                 ma->ma_lmv = buffer->lb_buf;
1358                 ma->ma_lmv_size = buffer->lb_len;
1359                 ma->ma_need = MA_INODE;
1360                 if (ma->ma_lmv_size > 0) {
1361                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1362                                 ma->ma_need |= MA_LMV;
1363                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1364                                 ma->ma_need |= MA_LMV_DEF;
1365                                 ma->ma_default_lmv = buffer->lb_buf;
1366                                 ma->ma_lmv = NULL;
1367                                 ma->ma_default_lmv_size = buffer->lb_len;
1368                                 ma->ma_lmv_size = 0;
1369                         }
1370                 }
1371         } else {
1372                 ma->ma_lmm = buffer->lb_buf;
1373                 ma->ma_lmm_size = buffer->lb_len;
1374                 ma->ma_need = MA_INODE | MA_HSM;
1375                 if (ma->ma_lmm_size > 0) {
1376                         ma->ma_need |= MA_LOV;
1377                         /* Older clients may crash if they getattr overstriped
1378                          * files
1379                          */
                             /* NOTE(review): ma_lmm is the reply buffer here and
                              * mdt_attr_get_complex() has not run yet, so this
                              * check sees whatever the buffer already holds -
                              * confirm that the caller pre-fills it. */
1380                         if (!exp_connect_overstriping(exp) &&
1381                             mdt_lmm_is_overstriping(ma->ma_lmm))
1382                                 RETURN(-EOPNOTSUPP);
1383                 }
1384         }
1385
1386         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1387             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1388             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1389                 /* get default stripe info for this dir. */
1390                 ma->ma_need |= MA_LOV_DEF;
1391         }
             /* add what the caller asked for on top of the derived set */
1392         ma->ma_need |= ma_need;
1393
             /* Error exits from here on use RETURN() instead of going through
              * "out"; that label only bumps the counter when rc == 0. */
1394         rc = mdt_attr_get_complex(info, o, ma);
1395         if (unlikely(rc)) {
1396                 CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
1397                        "%s: getattr error for "DFID": rc = %d\n",
1398                        mdt_obd_name(info->mti_mdt),
1399                        PFID(mdt_object_fid(o)), rc);
1400                 RETURN(rc);
1401         }
1402
1403         /* if file is released, check if a restore is running */
1404         if (ma->ma_valid & MA_HSM) {
1405                 repbody->mbo_valid |= OBD_MD_TSTATE;
1406                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1407                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1408                         repbody->mbo_t_state = MS_RESTORE;
1409         }
1410
1411         if (unlikely(!(ma->ma_valid & MA_INODE)))
1412                 RETURN(-EFAULT);
1413
1414         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1415
             /* The request (not the reply) body decides whether striping
              * information is reported back. */
1416         if (mdt_body_has_lov(la, reqbody)) {
1417                 u32 stripe_count = 1;
1418
1419                 if (ma->ma_valid & MA_LOV) {
1420                         LASSERT(ma->ma_lmm_size);
1421                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1422                         if (S_ISDIR(la->la_mode))
1423                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1424                         else
1425                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1426                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1427                 }
1428                 if (ma->ma_valid & MA_LMV) {
1429                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1430                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1431
1432                         /* Return -ENOTSUPP for old client */
1433                         if (!mdt_is_striped_client(req->rq_export))
1434                                 RETURN(-ENOTSUPP);
1435
1436                         LASSERT(S_ISDIR(la->la_mode));
1437                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1438                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1439                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1440
1441                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1442                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1443                                 mdt_restripe_migrate_add(info, o);
1444                         else if (magic == LMV_MAGIC_V1 &&
1445                                  lmv_is_restriping(lmv))
1446                                 mdt_restripe_update_add(info, o);
1447                 }
1448                 if (ma->ma_valid & MA_LMV_DEF) {
1449                         /* Return -ENOTSUPP for old client */
1450                         if (!mdt_is_striped_client(req->rq_export))
1451                                 RETURN(-ENOTSUPP);
1452                         LASSERT(S_ISDIR(la->la_mode));
1453                         /*
1454                          * when ll_dir_getstripe() gets default LMV, it
1455                          * checks mbo_eadatasize.
1456                          */
1457                         if (!(ma->ma_valid & MA_LMV))
1458                                 repbody->mbo_eadatasize =
1459                                         ma->ma_default_lmv_size;
1460                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1461                                                OBD_MD_DEFAULT_MEA);
1462                 }
1463                 CDEBUG(D_VFSTRACE,
1464                        "dirent count %llu stripe count %u MDT count %d\n",
1465                        ma->ma_attr.la_dirent_count, stripe_count,
1466                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
                     /* Queue the directory for auto split when it holds more
                      * entries than mdr_dir_split_count, is not the root, is
                      * not already being restriped, auto split is enabled and
                      * it has fewer stripes than mdt_mds_mds_conns + 1. */
1467                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1468                     ma->ma_attr.la_dirent_count >
1469                         mdt->mdt_restriper.mdr_dir_split_count &&
1470                     !fid_is_root(mdt_object_fid(o)) &&
1471                     mdt->mdt_enable_dir_auto_split &&
1472                     !o->mot_restriping &&
1473                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
1474                         mdt_auto_split_add(info, o);
1475         } else if (S_ISLNK(la->la_mode) &&
1476                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
                     /* NOTE(review): when the request carried
                      * mbo_eadatasize == 0, ma_lmm was set to NULL above and
                      * the length below is computed as 0 - 1 - confirm that
                      * callers never reach this branch in that state. */
1477                 buffer->lb_buf = ma->ma_lmm;
1478                 /* eadatasize from client includes NULL-terminator, so
1479                  * there is no need to read it */
1480                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1481                 rc = mo_readlink(env, next, buffer);
1482                 if (unlikely(rc <= 0)) {
1483                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1484                                mdt_obd_name(info->mti_mdt),
1485                                PFID(mdt_object_fid(o)), rc);
1486                         rc = -EFAULT;
1487                 } else {
1488                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1489
1490                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1491                                 rc -= 2;
1492                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1493                         /* we need to report back size with NULL-terminator
1494                          * because client expects that */
1495                         repbody->mbo_eadatasize = rc + 1;
1496                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1497                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1498                                        "on "DFID ", expected %d\n",
1499                                        mdt_obd_name(info->mti_mdt),
1500                                        rc, PFID(mdt_object_fid(o)),
1501                                        reqbody->mbo_eadatasize - 1);
1502                         /* NULL terminate */
1503                         ((char *)ma->ma_lmm)[rc] = 0;
1504
1505                         /* If the total CDEBUG() size is larger than a page, it
1506                          * will print a warning to the console, avoid this by
1507                          * printing just the last part of the symlink. */
1508                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1509                                print_limit < rc ? "..." : "", print_limit,
1510                                (char *)ma->ma_lmm + rc - print_limit, rc);
1511                         rc = 0;
1512                 }
1513         }
1514
             /* report the largest MD size seen so far if the client asks */
1515         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1516                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1517                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1518                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1519                        repbody->mbo_max_mdsize);
1520         }
1521
1522 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
             /* Pack the ACL only when the export negotiated ACL support and
              * the request asks for it; this overwrites rc, including a
              * readlink failure recorded above. */
1523         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1524                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1525                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1526                 if (IS_ERR(nodemap))
1527                         RETURN(PTR_ERR(nodemap));
1528
1529                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1530                 nodemap_putref(nodemap);
1531         }
1532 #endif
1533
1534 out:
             /* only successful getattrs are counted */
1535         if (rc == 0)
1536                 mdt_counter_incr(req, LPROC_MDT_GETATTR);
1537
1538         RETURN(rc);
1539 }
1540
1541 static int mdt_getattr(struct tgt_session_info *tsi)
1542 {
1543         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1544         struct mdt_object       *obj = info->mti_object;
1545         struct req_capsule      *pill = info->mti_pill;
1546         struct mdt_body         *reqbody;
1547         struct mdt_body         *repbody;
1548         int rc, rc2;
1549         ENTRY;
1550
1551         if (unlikely(info->mti_object == NULL))
1552                 RETURN(-EPROTO);
1553
1554         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1555         LASSERT(reqbody);
1556         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1557
1558         /* Special case for Data-on-MDT files to get data version */
1559         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1560                 rc = mdt_data_version_get(tsi);
1561                 GOTO(out, rc);
1562         }
1563
1564         /* Unlike intent case where we need to pre-fill out buffers early on
1565          * in intent policy for ldlm reasons, here we can have a much better
1566          * guess at EA size by just reading it from disk.
1567          * Exceptions are readdir and (missing) directory striping */
1568         /* Readlink */
1569         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1570                 /* No easy way to know how long is the symlink, but it cannot
1571                  * be more than PATH_MAX, so we allocate +1 */
1572                 rc = PATH_MAX + 1;
1573         /* A special case for fs ROOT: getattr there might fetch
1574          * default EA for entire fs, not just for this dir!
1575          */
1576         } else if (lu_fid_eq(mdt_object_fid(obj),
1577                              &info->mti_mdt->mdt_md_root_fid) &&
1578                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1579                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1580                                                                  MDS_GETATTR)) {
1581                 /* Should the default strping be bigger, mdt_fix_reply
1582                  * will reallocate */
1583                 rc = DEF_REP_MD_SIZE;
1584         } else {
1585                 /* Read the actual EA size from disk */
1586                 rc = mdt_attr_get_eabuf_size(info, obj);
1587         }
1588
1589         if (rc < 0)
1590                 GOTO(out, rc = err_serious(rc));
1591
1592         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1593
1594         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1595          * by default. If the target object has more ACL entries, then
1596          * enlarge the buffer when necessary. */
1597         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1598                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1599
1600         rc = req_capsule_server_pack(pill);
1601         if (unlikely(rc != 0))
1602                 GOTO(out, rc = err_serious(rc));
1603
1604         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1605         LASSERT(repbody != NULL);
1606         repbody->mbo_eadatasize = 0;
1607         repbody->mbo_aclsize = 0;
1608
1609         rc = mdt_check_ucred(info);
1610         if (unlikely(rc))
1611                 GOTO(out_shrink, rc);
1612
1613         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1614
1615         rc = mdt_getattr_internal(info, obj, 0);
1616         EXIT;
1617 out_shrink:
1618         mdt_client_compatibility(info);
1619         rc2 = mdt_fix_reply(info);
1620         if (rc == 0)
1621                 rc = rc2;
1622 out:
1623         mdt_thread_info_fini(info);
1624         return rc;
1625 }
1626
1627 /**
1628  * Handler of layout intent RPC requiring the layout modification
1629  *
1630  * \param[in]  info     thread environment
1631  * \param[in]  obj      object
1632  * \param[out] lhc      object ldlm lock handle
1633  * \param[in]  layout   layout change descriptor
1634  *
1635  * \retval 0    on success
1636  * \retval < 0  error code
1637  */
1638 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1639                       struct mdt_lock_handle *lhc,
1640                       struct md_layout_change *layout)
1641 {
1642         int rc;
1643
1644         ENTRY;
1645
1646         if (!mdt_object_exists(obj))
1647                 RETURN(-ENOENT);
1648
1649         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1650                 RETURN(-EINVAL);
1651
1652         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1653                            MAY_WRITE);
1654         if (rc)
1655                 RETURN(rc);
1656
1657         rc = mdt_check_resent_lock(info, obj, lhc);
1658         if (rc < 0)
1659                 RETURN(rc);
1660
1661         if (rc > 0) {
1662                 /* not resent */
1663                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1664
1665                 /* take layout lock to prepare layout change */
1666                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1667                         lockpart |= MDS_INODELOCK_UPDATE;
1668
1669                 mdt_lock_handle_init(lhc);
1670                 mdt_lock_reg_init(lhc, LCK_EX);
1671                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1672                 if (rc)
1673                         RETURN(rc);
1674         }
1675
1676         mutex_lock(&obj->mot_som_mutex);
1677         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1678         mutex_unlock(&obj->mot_som_mutex);
1679
1680         if (rc)
1681                 mdt_object_unlock(info, obj, lhc, 1);
1682
1683         RETURN(rc);
1684 }
1685
1686 /**
1687  * Exchange MOF_LOV_CREATED flags between two objects after a
1688  * layout swap. No assumption is made on whether o1 or o2 have
1689  * created objects or not.
1690  *
1691  * \param[in,out] o1    First swap layout object
1692  * \param[in,out] o2    Second swap layout object
1693  */
1694 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1695 {
1696         unsigned int o1_lov_created = o1->mot_lov_created;
1697
1698         mutex_lock(&o1->mot_lov_mutex);
1699         mutex_lock(&o2->mot_lov_mutex);
1700
1701         o1->mot_lov_created = o2->mot_lov_created;
1702         o2->mot_lov_created = o1_lov_created;
1703
1704         mutex_unlock(&o2->mot_lov_mutex);
1705         mutex_unlock(&o1->mot_lov_mutex);
1706 }
1707
1708 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1709 {
1710         struct mdt_thread_info  *info;
1711         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1712         struct obd_export       *exp = req->rq_export;
1713         struct mdt_object       *o1, *o2, *o;
1714         struct mdt_lock_handle  *lh1, *lh2;
1715         struct mdc_swap_layouts *msl;
1716         int                      rc;
1717         ENTRY;
1718
1719         /* client does not support layout lock, so layout swaping
1720          * is disabled.
1721          * FIXME: there is a problem for old clients which don't support
1722          * layout lock yet. If those clients have already opened the file
1723          * they won't be notified at all so that old layout may still be
1724          * used to do IO. This can be fixed after file release is landed by
1725          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1726         if (!exp_connect_layout(exp))
1727                 RETURN(-EOPNOTSUPP);
1728
1729         info = tsi2mdt_info(tsi);
1730         if (unlikely(info->mti_object == NULL))
1731                 RETURN(-EPROTO);
1732
1733         if (info->mti_dlm_req != NULL)
1734                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1735
1736         o1 = info->mti_object;
1737         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1738                                 &info->mti_body->mbo_fid2);
1739         if (IS_ERR(o))
1740                 GOTO(out, rc = PTR_ERR(o));
1741
1742         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1743                 GOTO(put, rc = -ENOENT);
1744
1745         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1746         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1747                 GOTO(put, rc);
1748
1749         if (rc < 0)
1750                 swap(o1, o2);
1751
1752         /* permission check. Make sure the calling process having permission
1753          * to write both files. */
1754         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1755                            MAY_WRITE);
1756         if (rc < 0)
1757                 GOTO(put, rc);
1758
1759         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1760                            MAY_WRITE);
1761         if (rc < 0)
1762                 GOTO(put, rc);
1763
1764         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1765         if (msl == NULL)
1766                 GOTO(put, rc = -EPROTO);
1767
1768         lh1 = &info->mti_lh[MDT_LH_NEW];
1769         mdt_lock_reg_init(lh1, LCK_EX);
1770         lh2 = &info->mti_lh[MDT_LH_OLD];
1771         mdt_lock_reg_init(lh2, LCK_EX);
1772
1773         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1774                              MDS_INODELOCK_XATTR);
1775         if (rc < 0)
1776                 GOTO(put, rc);
1777
1778         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1779                              MDS_INODELOCK_XATTR);
1780         if (rc < 0)
1781                 GOTO(unlock1, rc);
1782
1783         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1784                              mdt_object_child(o2), msl->msl_flags);
1785         if (rc < 0)
1786                 GOTO(unlock2, rc);
1787
1788         mdt_swap_lov_flag(o1, o2);
1789
1790 unlock2:
1791         mdt_object_unlock(info, o2, lh2, rc);
1792 unlock1:
1793         mdt_object_unlock(info, o1, lh1, rc);
1794 put:
1795         mdt_object_put(info->mti_env, o);
1796 out:
1797         mdt_thread_info_fini(info);
1798         RETURN(rc);
1799 }
1800
1801 static int mdt_raw_lookup(struct mdt_thread_info *info,
1802                           struct mdt_object *parent,
1803                           const struct lu_name *lname,
1804                           struct ldlm_reply *ldlm_rep)
1805 {
1806         struct lu_fid   *child_fid = &info->mti_tmp_fid1;
1807         int              rc;
1808         ENTRY;
1809
1810         LASSERT(!info->mti_cross_ref);
1811
1812         /* Only got the fid of this obj by name */
1813         fid_zero(child_fid);
1814         rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
1815                         lname, child_fid, &info->mti_spec);
1816         if (rc == 0) {
1817                 struct mdt_body *repbody;
1818
1819                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1820                 repbody->mbo_fid1 = *child_fid;
1821                 repbody->mbo_valid = OBD_MD_FLID;
1822                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1823         } else if (rc == -ENOENT) {
1824                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1825         }
1826
1827         RETURN(rc);
1828 }
1829
1830 /*
1831  * UPDATE lock should be taken against parent, and be released before exit;
1832  * child_bits lock should be taken against child, and be returned back:
1833  *            (1)normal request should release the child lock;
1834  *            (2)intent request will grant the lock to client.
      *
      * \param[in] info         thread info; info->mti_object is used as parent
      * \param[in,out] lhc      lock handle for the child; if it is already in
      *                         use on entry the request is treated as a resend
      * \param[in] child_bits   inodebits to lock on the child (adjusted
      *                         internally before the lock is taken)
      * \param[in] ldlm_rep     reply whose disposition bits are updated; the
      *                         non-intent caller mdt_getattr_name() passes NULL
      *
      * \retval 0               on success
      * \retval < 0             error code
1835  */
1836 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1837                                  struct mdt_lock_handle *lhc,
1838                                  __u64 child_bits,
1839                                  struct ldlm_reply *ldlm_rep)
1840 {
1841         struct ptlrpc_request  *req = mdt_info_req(info);
1842         struct mdt_body        *reqbody = NULL;
1843         struct mdt_object      *parent = info->mti_object;
1844         struct mdt_object      *child;
1845         struct lu_fid          *child_fid = &info->mti_tmp_fid1;
1846         struct lu_name         *lname = NULL;
1847         struct mdt_lock_handle *lhp = NULL;
1848         struct ldlm_lock       *lock;
1849         struct req_capsule *pill = info->mti_pill;
1850         __u64 try_bits = 0;
1851         bool is_resent;
1852         int ma_need = 0;
1853         int rc;
1854
1855         ENTRY;
1856
             /* A child lock handle that is already in use marks a resent
              * request; the message flags are asserted to agree. */
1857         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1858         LASSERT(ergo(is_resent,
1859                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1860
1861         if (parent == NULL)
1862                 RETURN(-ENOENT);
1863
1864         if (info->mti_cross_ref) {
1865                 /* Only getattr on the child. Parent is on another node. */
1866                 mdt_set_disposition(info, ldlm_rep,
1867                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1868                 child = parent;
1869                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1870                        "ldlm_rep = %p\n",
1871                        PFID(mdt_object_fid(child)), ldlm_rep);
1872
1873                 rc = mdt_check_resent_lock(info, child, lhc);
1874                 if (rc < 0) {
1875                         RETURN(rc);
1876                 } else if (rc > 0) {
1877                         mdt_lock_handle_init(lhc);
1878                         mdt_lock_reg_init(lhc, LCK_PR);
1879
1880                         /*
1881                          * Object's name entry is on another MDS, it will
1882                          * request PERM lock only because LOOKUP lock is owned
1883                          * by the MDS where name entry resides.
1884                          *
1885                          * TODO: it should try layout lock too. - Jinshan
1886                          */
1887                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
1888                                         MDS_INODELOCK_LAYOUT);
1889                         child_bits |= MDS_INODELOCK_PERM;
1890
1891                         rc = mdt_object_lock(info, child, lhc, child_bits);
1892                         if (rc < 0)
1893                                 RETURN(rc);
1894                 }
1895
1896                 /* Finally, we can get attr for child. */
1897                 if (!mdt_object_exists(child)) {
1898                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1899                                         &child->mot_obj,
1900                                         "remote object doesn't exist.");
1901                         mdt_object_unlock(info, child, lhc, 1);
1902                         RETURN(-ENOENT);
1903                 }
1904
1905                 rc = mdt_getattr_internal(info, child, 0);
1906                 if (unlikely(rc != 0)) {
1907                         mdt_object_unlock(info, child, lhc, 1);
1908                         RETURN(rc);
1909                 }
1910
1911                 rc = mdt_pack_secctx_in_reply(info, child);
1912                 if (unlikely(rc))
1913                         mdt_object_unlock(info, child, lhc, 1);
1914                 RETURN(rc);
1915         }
1916
1917         lname = &info->mti_name;
1918         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
1919
             /* With a valid name the child is found by lookup below;
              * otherwise its FID is taken from mbo_fid2 of the request. */
1920         if (lu_name_is_valid(lname)) {
1921                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
1922                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1923                        PNAME(lname), ldlm_rep);
1924         } else {
1925                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1926                 if (unlikely(reqbody == NULL))
1927                         RETURN(err_serious(-EPROTO));
1928
1929                 *child_fid = reqbody->mbo_fid2;
1930
1931                 if (unlikely(!fid_is_sane(child_fid)))
1932                         RETURN(err_serious(-EINVAL));
1933
1934                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1935                        "ldlm_rep = %p\n",
1936                        PFID(mdt_object_fid(parent)),
1937                        PFID(&reqbody->mbo_fid2), ldlm_rep);
1938         }
1939
1940         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1941
1942         if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
1943                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1944                                 &parent->mot_obj,
1945                                 "Parent doesn't exist!");
1946                 RETURN(-ESTALE);
1947         }
1948
1949         if (mdt_object_remote(parent)) {
1950                 CERROR("%s: parent "DFID" is on remote target\n",
1951                        mdt_obd_name(info->mti_mdt),
1952                        PFID(mdt_object_fid(parent)));
1953                 RETURN(-EIO);
1954         }
1955
1956         if (lu_name_is_valid(lname)) {
1957                 /* Always allow to lookup ".." */
1958                 if (unlikely(lname->ln_namelen == 2 &&
1959                              lname->ln_name[0] == '.' &&
1960                              lname->ln_name[1] == '.'))
1961                         info->mti_spec.sp_permitted = 1;
1962
                     /* Only the FID was requested: do a plain lookup and
                      * return; no lock is taken on this path. */
1963                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
1964                         rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
1965
1966                         RETURN(rc);
1967                 }
1968
1969                 /* step 1: lock parent only if parent is a directory */
1970                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
1971                         lhp = &info->mti_lh[MDT_LH_PARENT];
1972                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
1973                         rc = mdt_object_lock(info, parent, lhp,
1974                                              MDS_INODELOCK_UPDATE);
1975                         if (unlikely(rc != 0))
1976                                 RETURN(rc);
1977                 }
1978
1979                 /* step 2: lookup child's fid by name */
1980                 fid_zero(child_fid);
1981                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
1982                                 child_fid, &info->mti_spec);
1983                 if (rc == -ENOENT)
1984                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1985
1986                 if (rc != 0)
1987                         GOTO(out_parent, rc);
1988         }
1989
1990         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1991
1992         /*
1993          *step 3: find the child object by fid & lock it.
1994          *        regardless if it is local or remote.
1995          *
1996          *Note: LU-3240 (commit 762f2114d282a98ebfa4dbbeea9298a8088ad24e)
1997          *      set parent dir fid the same as child fid in getattr by fid case
1998          *      we should not lu_object_find() the object again, could lead
1999          *      to hung if there is a concurrent unlink destroyed the object.
2000          */
2001         if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
2002                 mdt_object_get(info->mti_env, parent);
2003                 child = parent;
2004         } else {
2005                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2006                                         child_fid);
2007         }
2008
2009         if (unlikely(IS_ERR(child)))
2010                 GOTO(out_parent, rc = PTR_ERR(child));
2011
2012         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2013         if (!mdt_object_exists(child)) {
2014                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2015                                 &child->mot_obj,
2016                                 "Object doesn't exist!");
2017                 GOTO(out_child, rc = -ENOENT);
2018         }
2019
             /* rc < 0 is an error, rc > 0 means a lock has to be taken in
              * the block below, rc == 0 skips that block entirely. */
2020         rc = mdt_check_resent_lock(info, child, lhc);
2021         if (rc < 0) {
2022                 GOTO(out_child, rc);
2023         } else if (rc > 0) {
2024                 mdt_lock_handle_init(lhc);
2025                 mdt_lock_reg_init(lhc, LCK_PR);
2026
2027                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2028                     !mdt_object_remote(child)) {
2029                         struct md_attr *ma = &info->mti_attr;
2030
2031                         ma->ma_valid = 0;
2032                         ma->ma_need = MA_INODE;
2033                         rc = mdt_attr_get_complex(info, child, ma);
2034                         if (unlikely(rc != 0))
2035                                 GOTO(out_child, rc);
2036
2037                         /* If the file has not been changed for some time, we
2038                          * return not only a LOOKUP lock, but also an UPDATE
2039                          * lock and this might save us RPC on later STAT. For
2040                          * directories, it also let negative dentry cache start
2041                          * working for this dir. */
2042                         if (ma->ma_valid & MA_INODE &&
2043                             ma->ma_attr.la_valid & LA_CTIME &&
2044                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2045                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2046                                 child_bits |= MDS_INODELOCK_UPDATE;
2047                 }
2048
2049                 /* layout lock must be granted in a best-effort way
2050                  * for IT operations */
2051                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2052                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2053                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2054                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2055                             exp_connect_layout(info->mti_exp)) {
2056                                 /* try to grant layout lock for regular file. */
2057                                 try_bits = MDS_INODELOCK_LAYOUT;
2058                         }
2059                         /* Acquire DOM lock in advance for data-on-mdt file */
2060                         if (child != parent)
2061                                 try_bits |= MDS_INODELOCK_DOM;
2062                 }
2063
2064                 if (try_bits != 0) {
2065                         /* try layout lock, it may fail to be granted due to
2066                          * contention at LOOKUP or UPDATE */
2067                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2068                                                  try_bits, false);
                             /* LOV attributes are requested from
                              * mdt_getattr_internal() only if the LAYOUT bit
                              * ended up in child_bits. */
2069                         if (child_bits & MDS_INODELOCK_LAYOUT)
2070                                 ma_need |= MA_LOV;
2071                 } else {
2072                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2073                          * client will enqueue the lock to the remote MDT */
2074                         if (mdt_object_remote(child))
2075                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2076                         rc = mdt_object_lock(info, child, lhc, child_bits);
2077                 }
2078                 if (unlikely(rc != 0))
2079                         GOTO(out_child, rc);
2080         }
2081
2082         /* finally, we can get attr for child. */
2083         rc = mdt_getattr_internal(info, child, ma_need);
2084         if (unlikely(rc != 0)) {
2085                 mdt_object_unlock(info, child, lhc, 1);
2086                 GOTO(out_child, rc);
2087         }
2088
2089         rc = mdt_pack_secctx_in_reply(info, child);
2090         if (unlikely(rc)) {
2091                 mdt_object_unlock(info, child, lhc, 1);
2092                 GOTO(out_child, rc);
2093         }
2094
2095         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2096         if (lock) {
2097                 /* Debugging code. */
2098                 LDLM_DEBUG(lock, "Returning lock to client");
2099                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2100                                          &lock->l_resource->lr_name),
2101                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2102                          PLDLMRES(lock->l_resource),
2103                          PFID(mdt_object_fid(child)));
2104
2105                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2106                     !mdt_object_remote(child) && child != parent) {
                             /* Drop the child reference here: this branch
                              * leaves through out_parent, which bypasses the
                              * put at out_child. */
2107                         mdt_object_put(info->mti_env, child);
2108                         rc = mdt_pack_size2body(info, child_fid,
2109                                                 &lhc->mlh_reg_lh);
2110                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2111                                 /* DOM lock was taken in advance but this is
2112                                  * not DoM file. Drop the lock.
2113                                  */
2114                                 lock_res_and_lock(lock);
2115                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2116                                 unlock_res_and_lock(lock);
2117                         }
2118                         LDLM_LOCK_PUT(lock);
                             /* A failure of mdt_pack_size2body() is not
                              * reported to the caller. */
2119                         GOTO(out_parent, rc = 0);
2120                 }
2121                 LDLM_LOCK_PUT(lock);
2122         }
2123
2124         EXIT;
2125 out_child:
2126         mdt_object_put(info->mti_env, child);
2127 out_parent:
             /* lhp is only set when the parent was locked in step 1 */
2128         if (lhp)
2129                 mdt_object_unlock(info, parent, lhp, 1);
2130         return rc;
2131 }
2132
2133 /* normal handler: should release the child lock */
2134 static int mdt_getattr_name(struct tgt_session_info *tsi)
2135 {
2136         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2137         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2138         struct mdt_body        *reqbody;
2139         struct mdt_body        *repbody;
2140         int rc, rc2;
2141         ENTRY;
2142
2143         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2144         LASSERT(reqbody != NULL);
2145         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2146         LASSERT(repbody != NULL);
2147
2148         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2149         repbody->mbo_eadatasize = 0;
2150         repbody->mbo_aclsize = 0;
2151
2152         rc = mdt_init_ucred_intent_getattr(info, reqbody);
2153         if (unlikely(rc))
2154                 GOTO(out_shrink, rc);
2155
2156         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2157         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2158                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2159                 lhc->mlh_reg_lh.cookie = 0;
2160         }
2161         mdt_exit_ucred(info);
2162         EXIT;
2163 out_shrink:
2164         mdt_client_compatibility(info);
2165         rc2 = mdt_fix_reply(info);
2166         if (rc == 0)
2167                 rc = rc2;
2168         mdt_thread_info_fini(info);
2169         return rc;
2170 }
2171
2172 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2173                             const struct lu_fid *pfid,
2174                             const struct lu_name *name,
2175                             struct mdt_object *obj, s64 ctime)
2176 {
2177         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2178         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2179         struct mdt_device *mdt = info->mti_mdt;
2180         struct md_attr *ma = &info->mti_attr;
2181         struct mdt_lock_handle *parent_lh;
2182         struct mdt_lock_handle *child_lh;
2183         struct mdt_object *pobj;
2184         bool cos_incompat = false;
2185         int rc;
2186         ENTRY;
2187
2188         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2189         if (IS_ERR(pobj))
2190                 GOTO(out, rc = PTR_ERR(pobj));
2191
2192         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2193         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2194         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2195         if (rc != 0)
2196                 GOTO(put_parent, rc);
2197
2198         if (mdt_object_remote(pobj))
2199                 cos_incompat = true;
2200
2201         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2202                         name, child_fid, &info->mti_spec);
2203         if (rc != 0)
2204                 GOTO(unlock_parent, rc);
2205
2206         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2207                 GOTO(unlock_parent, rc = -EREMCHG);
2208
2209         child_lh = &info->mti_lh[MDT_LH_CHILD];
2210         mdt_lock_reg_init(child_lh, LCK_EX);
2211         rc = mdt_reint_striped_lock(info, obj, child_lh,
2212                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2213                                     einfo, cos_incompat);
2214         if (rc != 0)
2215                 GOTO(unlock_parent, rc);
2216
2217         if (atomic_read(&obj->mot_open_count)) {
2218                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2219                        PFID(mdt_object_fid(obj)));
2220                 GOTO(unlock_child, rc = -EBUSY);
2221         }
2222
2223         ma->ma_need = 0;
2224         ma->ma_valid = MA_INODE;
2225         ma->ma_attr.la_valid = LA_CTIME;
2226         ma->ma_attr.la_ctime = ctime;
2227
2228         mutex_lock(&obj->mot_lov_mutex);
2229
2230         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2231                         mdt_object_child(obj), name, ma, 0);
2232
2233         mutex_unlock(&obj->mot_lov_mutex);
2234
2235 unlock_child:
2236         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2237 unlock_parent:
2238         mdt_object_unlock(info, pobj, parent_lh, 1);
2239 put_parent:
2240         mdt_object_put(info->mti_env, pobj);
2241 out:
2242         RETURN(rc);
2243 }
2244
2245 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2246                                         struct mdt_object *obj)
2247 {
2248         struct lu_ucred *uc = lu_ucred(info->mti_env);
2249         struct md_attr *ma = &info->mti_attr;
2250         struct lu_attr *la = &ma->ma_attr;
2251         int rc = 0;
2252         ENTRY;
2253
2254         ma->ma_need = MA_INODE;
2255         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2256         if (rc)
2257                 GOTO(out, rc);
2258
2259         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2260                         rc = -EACCES;
2261
2262         if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
2263                 RETURN(0);
2264         if (uc->uc_fsuid == la->la_uid) {
2265                 if ((la->la_mode & S_IWUSR) == 0)
2266                         rc = -EACCES;
2267         } else if (uc->uc_fsgid == la->la_gid) {
2268                 if ((la->la_mode & S_IWGRP) == 0)
2269                         rc = -EACCES;
2270         } else if ((la->la_mode & S_IWOTH) == 0) {
2271                         rc = -EACCES;
2272         }
2273
2274 out:
2275         RETURN(rc);
2276 }
2277
2278 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2279                          s64 ctime)
2280 {
2281         struct mdt_device *mdt = info->mti_mdt;
2282         struct mdt_object *obj = NULL;
2283         struct linkea_data ldata = { NULL };
2284         struct lu_buf *buf = &info->mti_big_buf;
2285         struct lu_name *name = &info->mti_name;
2286         struct lu_fid *pfid = &info->mti_tmp_fid1;
2287         struct link_ea_header *leh;
2288         struct link_ea_entry *lee;
2289         int reclen, count, rc = 0;
2290         ENTRY;
2291
2292         if (!fid_is_sane(fid))
2293                 GOTO(out, rc = -EINVAL);
2294
2295         if (!fid_is_namespace_visible(fid))
2296                 GOTO(out, rc = -EINVAL);
2297
2298         obj = mdt_object_find(info->mti_env, mdt, fid);
2299         if (IS_ERR(obj))
2300                 GOTO(out, rc = PTR_ERR(obj));
2301
2302         if (mdt_object_remote(obj))
2303                 GOTO(out, rc = -EREMOTE);
2304         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2305                 GOTO(out, rc = -ENOENT);
2306
2307         rc = mdt_rmfid_check_permission(info, obj);
2308         if (rc)
2309                 GOTO(out, rc);
2310
2311         /* take LinkEA */
2312         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2313         if (!buf->lb_buf)
2314                 GOTO(out, rc = -ENOMEM);
2315
2316         ldata.ld_buf = buf;
2317         rc = mdt_links_read(info, obj, &ldata);
2318         if (rc)
2319                 GOTO(out, rc);
2320
2321         leh = buf->lb_buf;
2322         lee = (struct link_ea_entry *)(leh + 1);
2323         for (count = 0; count < leh->leh_reccount; count++) {
2324                 /* remove every hardlink */
2325                 linkea_entry_unpack(lee, &reclen, name, pfid);
2326                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2327                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2328                 if (rc)
2329                         break;
2330         }
2331
2332 out:
2333         if (obj && !IS_ERR(obj))
2334                 mdt_object_put(info->mti_env, obj);
2335         if (info->mti_big_buf.lb_buf)
2336                 lu_buf_free(&info->mti_big_buf);
2337
2338         RETURN(rc);
2339 }
2340
2341 static int mdt_rmfid(struct tgt_session_info *tsi)
2342 {
2343         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2344         struct mdt_body *reqbody;
2345         struct lu_fid *fids, *rfids;
2346         int bufsize, rc;
2347         __u32 *rcs;
2348         int i, nr;
2349         ENTRY;
2350
2351         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2352         if (reqbody == NULL)
2353                 RETURN(-EPROTO);
2354         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2355                                        RCL_CLIENT);
2356         nr = bufsize / sizeof(struct lu_fid);
2357         if (nr * sizeof(struct lu_fid) != bufsize)
2358                 RETURN(-EINVAL);
2359         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2360                              RCL_SERVER, nr * sizeof(__u32));
2361         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2362                              RCL_SERVER, nr * sizeof(struct lu_fid));
2363         rc = req_capsule_server_pack(tsi->tsi_pill);
2364         if (rc)
2365                 GOTO(out, rc = err_serious(rc));
2366         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2367         if (fids == NULL)
2368                 RETURN(-EPROTO);
2369         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2370         LASSERT(rcs);
2371         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2372         LASSERT(rfids);
2373
2374         mdt_init_ucred(mti, reqbody);
2375         for (i = 0; i < nr; i++) {
2376                 rfids[i] = fids[i];
2377                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2378         }
2379         mdt_exit_ucred(mti);
2380
2381 out:
2382         RETURN(rc);
2383 }
2384
2385 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2386                          void *karg, void __user *uarg);
2387
2388 static int mdt_set_info(struct tgt_session_info *tsi)
2389 {
2390         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2391         char                    *key;
2392         void                    *val;
2393         int                      keylen, vallen, rc = 0;
2394
2395         ENTRY;
2396
2397         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2398         if (key == NULL) {
2399                 DEBUG_REQ(D_HA, req, "no set_info key");
2400                 RETURN(err_serious(-EFAULT));
2401         }
2402
2403         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2404                                       RCL_CLIENT);
2405
2406         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2407         if (val == NULL) {
2408                 DEBUG_REQ(D_HA, req, "no set_info val");
2409                 RETURN(err_serious(-EFAULT));
2410         }
2411
2412         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2413                                       RCL_CLIENT);
2414
2415         /* Swab any part of val you need to here */
2416         if (KEY_IS(KEY_READ_ONLY)) {
2417                 spin_lock(&req->rq_export->exp_lock);
2418                 if (*(__u32 *)val)
2419                         *exp_connect_flags_ptr(req->rq_export) |=
2420                                 OBD_CONNECT_RDONLY;
2421                 else
2422                         *exp_connect_flags_ptr(req->rq_export) &=
2423                                 ~OBD_CONNECT_RDONLY;
2424                 spin_unlock(&req->rq_export->exp_lock);
2425         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2426                 struct changelog_setinfo *cs = val;
2427
2428                 if (vallen != sizeof(*cs)) {
2429                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2430                                tgt_name(tsi->tsi_tgt), vallen);
2431                         RETURN(-EINVAL);
2432                 }
2433                 if (ptlrpc_req_need_swab(req)) {
2434                         __swab64s(&cs->cs_recno);
2435                         __swab32s(&cs->cs_id);
2436                 }
2437
2438                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2439                         RETURN(-EACCES);
2440                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2441                                    vallen, val, NULL);
2442         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2443                 if (vallen > 0)
2444                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2445         } else {
2446                 RETURN(-EINVAL);
2447         }
2448         RETURN(rc);
2449 }
2450
2451 static int mdt_readpage(struct tgt_session_info *tsi)
2452 {
2453         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2454         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2455         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2456         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2457         struct mdt_body         *repbody;
2458         int                      rc;
2459         int                      i;
2460
2461         ENTRY;
2462
2463         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2464                 RETURN(err_serious(-ENOMEM));
2465
2466         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2467         if (repbody == NULL || reqbody == NULL)
2468                 RETURN(err_serious(-EFAULT));
2469
2470         /*
2471          * prepare @rdpg before calling lower layers and transfer itself. Here
2472          * reqbody->size contains offset of where to start to read and
2473          * reqbody->nlink contains number bytes to read.
2474          */
2475         rdpg->rp_hash = reqbody->mbo_size;
2476         if (rdpg->rp_hash != reqbody->mbo_size) {
2477                 CERROR("Invalid hash: %#llx != %#llx\n",
2478                        rdpg->rp_hash, reqbody->mbo_size);
2479                 RETURN(-EFAULT);
2480         }
2481
2482         rdpg->rp_attrs = reqbody->mbo_mode;
2483         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2484                 rdpg->rp_attrs |= LUDA_64BITHASH;
2485         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2486                                 exp_max_brw_size(tsi->tsi_exp));
2487         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2488                           PAGE_SHIFT;
2489         OBD_ALLOC_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
2490         if (rdpg->rp_pages == NULL)
2491                 RETURN(-ENOMEM);
2492
2493         for (i = 0; i < rdpg->rp_npages; ++i) {
2494                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2495                 if (rdpg->rp_pages[i] == NULL)
2496                         GOTO(free_rdpg, rc = -ENOMEM);
2497         }
2498
2499         /* call lower layers to fill allocated pages with directory data */
2500         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2501         if (rc < 0)
2502                 GOTO(free_rdpg, rc);
2503
2504         /* send pages to client */
2505         rc = tgt_sendpage(tsi, rdpg, rc);
2506
2507         EXIT;
2508 free_rdpg:
2509
2510         for (i = 0; i < rdpg->rp_npages; i++)
2511                 if (rdpg->rp_pages[i] != NULL)
2512                         __free_page(rdpg->rp_pages[i]);
2513         OBD_FREE_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
2514
2515         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2516                 RETURN(0);
2517
2518         return rc;
2519 }
2520
2521 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2522 {
2523         struct lu_ucred *uc = mdt_ucred_check(info);
2524         struct lu_attr *attr = &info->mti_attr.ma_attr;
2525
2526         if (uc == NULL)
2527                 return -EINVAL;
2528
2529         if (op != REINT_SETATTR) {
2530                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2531                         attr->la_uid = uc->uc_fsuid;
2532                 /* for S_ISGID, inherit gid from his parent, such work will be
2533                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2534                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2535                         attr->la_gid = uc->uc_fsgid;
2536         }
2537
2538         return 0;
2539 }
2540
2541 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2542 {
2543         return op == REINT_OPEN &&
2544              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2545 }
2546
2547 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2548 {
2549         struct req_capsule *pill = info->mti_pill;
2550
2551         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2552                                   RCL_SERVER) &&
2553             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2554                                   RCL_CLIENT)) {
2555                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2556                                          RCL_CLIENT) != 0) {
2557                         /* pre-set size in server part with max size */
2558                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2559                                              RCL_SERVER,
2560                                              info->mti_mdt->mdt_max_ea_size);
2561                 } else {
2562                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2563                                              RCL_SERVER, 0);
2564                 }
2565         }
2566
2567 }
2568
2569 static int mdt_reint_internal(struct mdt_thread_info *info,
2570                               struct mdt_lock_handle *lhc,
2571                               __u32 op)
2572 {
2573         struct req_capsule      *pill = info->mti_pill;
2574         struct mdt_body         *repbody;
2575         int                      rc = 0, rc2;
2576
2577         ENTRY;
2578
2579         rc = mdt_reint_unpack(info, op);
2580         if (rc != 0) {
2581                 CERROR("Can't unpack reint, rc %d\n", rc);
2582                 RETURN(err_serious(rc));
2583         }
2584
2585
2586         /* check if the file system is set to readonly. O_RDONLY open
2587          * is still allowed even the file system is set to readonly mode */
2588         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2589                 RETURN(err_serious(-EROFS));
2590
2591         /* for replay (no_create) lmm is not needed, client has it already */
2592         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2593                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2594                                      DEF_REP_MD_SIZE);
2595
2596         /* llog cookies are always 0, the field is kept for compatibility */
2597         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2598                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2599
2600         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2601          * by default. If the target object has more ACL entries, then
2602          * enlarge the buffer when necessary. */
2603         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2604                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2605                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2606
2607         mdt_preset_secctx_size(info);
2608
2609         rc = req_capsule_server_pack(pill);
2610         if (rc != 0) {
2611                 CERROR("Can't pack response, rc %d\n", rc);
2612                 RETURN(err_serious(rc));
2613         }
2614
2615         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2616                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2617                 LASSERT(repbody);
2618                 repbody->mbo_eadatasize = 0;
2619                 repbody->mbo_aclsize = 0;
2620         }
2621
2622         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2623
2624         /* for replay no cookkie / lmm need, because client have this already */
2625         if (info->mti_spec.no_create)
2626                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2627                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2628
2629         rc = mdt_init_ucred_reint(info);
2630         if (rc)
2631                 GOTO(out_shrink, rc);
2632
2633         rc = mdt_fix_attr_ucred(info, op);
2634         if (rc != 0)
2635                 GOTO(out_ucred, rc = err_serious(rc));
2636
2637         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2638         if (rc < 0) {
2639                 GOTO(out_ucred, rc);
2640         } else if (rc == 1) {
2641                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2642                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2643                 GOTO(out_ucred, rc);
2644         }
2645         rc = mdt_reint_rec(info, lhc);
2646         EXIT;
2647 out_ucred:
2648         mdt_exit_ucred(info);
2649 out_shrink:
2650         mdt_client_compatibility(info);
2651
2652         rc2 = mdt_fix_reply(info);
2653         if (rc == 0)
2654                 rc = rc2;
2655
2656         /*
2657          * Data-on-MDT optimization - read data along with OPEN and return it
2658          * in reply when possible.
2659          */
2660         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2661                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2662                                           &lhc->mlh_reg_lh);
2663
2664         return rc;
2665 }
2666
2667 static long mdt_reint_opcode(struct ptlrpc_request *req,
2668                              const struct req_format **fmt)
2669 {
2670         struct mdt_device       *mdt;
2671         struct mdt_rec_reint    *rec;
2672         long                     opc;
2673
2674         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2675         if (rec != NULL) {
2676                 opc = rec->rr_opcode;
2677                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2678                 if (opc < REINT_MAX && fmt[opc] != NULL)
2679                         req_capsule_extend(&req->rq_pill, fmt[opc]);
2680                 else {
2681                         mdt = mdt_exp2dev(req->rq_export);
2682                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2683                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
2684                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2685                         opc = err_serious(-EFAULT);
2686                 }
2687         } else {
2688                 opc = err_serious(-EFAULT);
2689         }
2690         return opc;
2691 }
2692
2693 static int mdt_reint(struct tgt_session_info *tsi)
2694 {
2695         long opc;
2696         int  rc;
2697         static const struct req_format *reint_fmts[REINT_MAX] = {
2698                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
2699                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
2700                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
2701                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
2702                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
2703                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
2704                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2705                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
2706                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
2707                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
2708         };
2709
2710         ENTRY;
2711
2712         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2713         if (opc >= 0) {
2714                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2715                 /*
2716                  * No lock possible here from client to pass it to reint code
2717                  * path.
2718                  */
2719                 rc = mdt_reint_internal(info, NULL, opc);
2720                 mdt_thread_info_fini(info);
2721         } else {
2722                 rc = opc;
2723         }
2724
2725         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2726         RETURN(rc);
2727 }
2728
2729 /* this should sync the whole device */
2730 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2731 {
2732         struct dt_device *dt = mdt->mdt_bottom;
2733         int rc;
2734         ENTRY;
2735
2736         rc = dt->dd_ops->dt_sync(env, dt);
2737         RETURN(rc);
2738 }
2739
2740 /* this should sync this object */
2741 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2742                            struct mdt_object *mo)
2743 {
2744         int rc = 0;
2745
2746         ENTRY;
2747
2748         if (!mdt_object_exists(mo)) {
2749                 CWARN("%s: non existing object "DFID": rc = %d\n",
2750                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2751                       -ESTALE);
2752                 RETURN(-ESTALE);
2753         }
2754
2755         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2756                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2757                 dt_obj_version_t version;
2758
2759                 version = dt_version_get(env, mdt_obj2dt(mo));
2760                 if (version > tgt->lut_obd->obd_last_committed)
2761                         rc = mo_object_sync(env, mdt_object_child(mo));
2762         } else {
2763                 rc = mo_object_sync(env, mdt_object_child(mo));
2764         }
2765
2766         RETURN(rc);
2767 }
2768
2769 static int mdt_sync(struct tgt_session_info *tsi)
2770 {
2771         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2772         struct req_capsule      *pill = tsi->tsi_pill;
2773         struct mdt_body         *body;
2774         int                      rc;
2775
2776         ENTRY;
2777
2778         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2779                 RETURN(err_serious(-ENOMEM));
2780
2781         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2782                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2783         } else {
2784                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2785
2786                 if (unlikely(info->mti_object == NULL))
2787                         RETURN(-EPROTO);
2788
2789                 /* sync an object */
2790                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2791                                      info->mti_object);
2792                 if (rc == 0) {
2793                         const struct lu_fid *fid;
2794                         struct lu_attr *la = &info->mti_attr.ma_attr;
2795
2796                         info->mti_attr.ma_need = MA_INODE;
2797                         info->mti_attr.ma_valid = 0;
2798                         rc = mdt_attr_get_complex(info, info->mti_object,
2799                                                   &info->mti_attr);
2800                         if (rc == 0) {
2801                                 body = req_capsule_server_get(pill,
2802                                                               &RMF_MDT_BODY);
2803                                 fid = mdt_object_fid(info->mti_object);
2804                                 mdt_pack_attr2body(info, body, la, fid);
2805                         }
2806                 }
2807                 mdt_thread_info_fini(info);
2808         }
2809         if (rc == 0)
2810                 mdt_counter_incr(req, LPROC_MDT_SYNC);
2811
2812         RETURN(rc);
2813 }
2814
2815 static int mdt_data_sync(struct tgt_session_info *tsi)
2816 {
2817         struct mdt_thread_info *info;
2818         struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2819         struct ost_body *body = tsi->tsi_ost_body;
2820         struct ost_body *repbody;
2821         struct mdt_object *mo = NULL;
2822         struct md_attr *ma;
2823         int rc = 0;
2824
2825         ENTRY;
2826
2827         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2828
2829         /* if no fid is specified then do nothing,
2830          * device sync is done via MDS_SYNC */
2831         if (fid_is_zero(&tsi->tsi_fid))
2832                 RETURN(0);
2833
2834         mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2835         if (IS_ERR(mo))
2836                 RETURN(PTR_ERR(mo));
2837
2838         rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
2839         if (rc)
2840                 GOTO(put, rc);
2841
2842         repbody->oa.o_oi = body->oa.o_oi;
2843         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2844
2845         info = tsi2mdt_info(tsi);
2846         ma = &info->mti_attr;
2847         ma->ma_need = MA_INODE;
2848         ma->ma_valid = 0;
2849         rc = mdt_attr_get_complex(info, mo, ma);
2850         if (rc == 0)
2851                 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2852         else
2853                 rc = 0;
2854         mdt_thread_info_fini(info);
2855
2856         EXIT;
2857 put:
2858         if (mo != NULL)
2859                 mdt_object_put(tsi->tsi_env, mo);
2860         return rc;
2861 }
2862
2863 /*
2864  * Handle quota control requests to consult current usage/limit, but also
2865  * to configure quota enforcement
2866  */
2867 static int mdt_quotactl(struct tgt_session_info *tsi)
2868 {
2869         struct obd_export *exp  = tsi->tsi_exp;
2870         struct req_capsule *pill = tsi->tsi_pill;
2871         struct obd_quotactl *oqctl, *repoqc;
2872         int id, rc;
2873         struct mdt_device *mdt = mdt_exp2dev(exp);
2874         struct lu_device *qmt = mdt->mdt_qmt_dev;
2875         struct lu_nodemap *nodemap;
2876         ENTRY;
2877
2878         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2879         if (!oqctl)
2880                 RETURN(err_serious(-EPROTO));
2881
2882         rc = req_capsule_server_pack(pill);
2883         if (rc)
2884                 RETURN(err_serious(rc));
2885
2886         nodemap = nodemap_get_from_exp(exp);
2887         if (IS_ERR(nodemap))
2888                 RETURN(PTR_ERR(nodemap));
2889
2890         switch (oqctl->qc_cmd) {
2891                 /* master quotactl */
2892         case Q_SETINFO:
2893         case Q_SETQUOTA:
2894         case LUSTRE_Q_SETDEFAULT:
2895         case LUSTRE_Q_SETQUOTAPOOL:
2896         case LUSTRE_Q_SETINFOPOOL:
2897                 if (!nodemap_can_setquota(nodemap))
2898                         GOTO(out_nodemap, rc = -EPERM);
2899                 /* fallthrough */
2900         case Q_GETINFO:
2901         case Q_GETQUOTA:
2902         case LUSTRE_Q_GETDEFAULT:
2903         case LUSTRE_Q_GETQUOTAPOOL:
2904         case LUSTRE_Q_GETINFOPOOL:
2905                 if (qmt == NULL)
2906                         GOTO(out_nodemap, rc = -EOPNOTSUPP);
2907                 /* slave quotactl */
2908                 /* fallthrough */
2909         case Q_GETOINFO:
2910         case Q_GETOQUOTA:
2911                 break;
2912         default:
2913                 rc = -EFAULT;
2914                 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
2915                        mdt_obd_name(mdt), oqctl->qc_cmd, rc);
2916                 GOTO(out_nodemap, rc);
2917         }
2918
2919         id = oqctl->qc_id;
2920         switch (oqctl->qc_type) {
2921         case USRQUOTA:
2922                 id = nodemap_map_id(nodemap, NODEMAP_UID,
2923                                     NODEMAP_CLIENT_TO_FS, id);
2924                 break;
2925         case GRPQUOTA:
2926                 id = nodemap_map_id(nodemap, NODEMAP_GID,
2927                                     NODEMAP_CLIENT_TO_FS, id);
2928                 break;
2929         case PRJQUOTA:
2930                 /* todo: check/map project id */
2931                 id = oqctl->qc_id;
2932                 break;
2933         default:
2934                 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2935         }
2936         repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
2937         if (repoqc == NULL)
2938                 GOTO(out_nodemap, rc = err_serious(-EFAULT));
2939
2940         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
2941                 barrier_exit(tsi->tsi_tgt->lut_bottom);
2942
2943         if (oqctl->qc_id != id)
2944                 swap(oqctl->qc_id, id);
2945
2946         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
2947                 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
2948                         RETURN(-EINPROGRESS);
2949         }
2950
2951         switch (oqctl->qc_cmd) {
2952
2953         case Q_GETINFO:
2954         case Q_SETINFO:
2955         case Q_SETQUOTA:
2956         case Q_GETQUOTA:
2957         case LUSTRE_Q_SETDEFAULT:
2958         case LUSTRE_Q_GETDEFAULT:
2959         case LUSTRE_Q_SETQUOTAPOOL:
2960         case LUSTRE_Q_GETQUOTAPOOL:
2961         case LUSTRE_Q_SETINFOPOOL:
2962         case LUSTRE_Q_GETINFOPOOL:
2963                 /* forward quotactl request to QMT */
2964                 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
2965                 break;
2966
2967         case Q_GETOINFO:
2968         case Q_GETOQUOTA:
2969                 /* slave quotactl */
2970                 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
2971                                    oqctl);
2972                 break;
2973
2974         default:
2975                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
2976                 GOTO(out_nodemap, rc = -EFAULT);
2977         }
2978
2979         if (oqctl->qc_id != id)
2980                 swap(oqctl->qc_id, id);
2981
2982         QCTL_COPY(repoqc, oqctl);
2983         EXIT;
2984
2985 out_nodemap:
2986         nodemap_putref(nodemap);
2987
2988         return rc;
2989 }
2990
2991 /** clone llog ctxt from child (mdd)
2992  * This allows remote llog (replicator) access.
2993  * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
2994  * context was originally set up, or we can handle them directly.
2995  * I choose the latter, but that means I need any llog
2996  * contexts set up by child to be accessable by the mdt.  So we clone the
2997  * context into our context list here.
2998  */
2999 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3000                                int idx)
3001 {
3002         struct md_device  *next = mdt->mdt_child;
3003         struct llog_ctxt *ctxt;
3004         int rc;
3005
3006         if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
3007                 return 0;
3008
3009         rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3010         if (rc || ctxt == NULL) {
3011                 return 0;
3012         }
3013
3014         rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3015         if (rc)
3016                 CERROR("Can't set mdt ctxt %d\n", rc);
3017
3018         return rc;
3019 }
3020
3021 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3022                                  struct mdt_device *mdt, int idx)
3023 {
3024         struct llog_ctxt *ctxt;
3025
3026         ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3027         if (ctxt == NULL)
3028                 return 0;
3029         /* Put once for the get we just did, and once for the clone */
3030         llog_ctxt_put(ctxt);
3031         llog_ctxt_put(ctxt);
3032         return 0;
3033 }
3034
3035 /*
3036  * sec context handlers
3037  */
3038 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
3039 {
3040         CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3041
3042         return 0;
3043 }
3044
3045 /*
3046  * quota request handlers
3047  */
3048 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3049 {
3050         struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
3051         struct lu_device        *qmt = mdt->mdt_qmt_dev;
3052         int                      rc;
3053         ENTRY;
3054
3055         if (qmt == NULL)
3056                 RETURN(err_serious(-EOPNOTSUPP));
3057
3058         rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
3059         RETURN(rc);
3060 }
3061
3062 struct mdt_object *mdt_object_new(const struct lu_env *env,
3063                                   struct mdt_device *d,
3064                                   const struct lu_fid *f)
3065 {
3066         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3067         struct lu_object *o;
3068         struct mdt_object *m;
3069         ENTRY;
3070
3071         CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3072         o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3073         if (unlikely(IS_ERR(o)))
3074                 m = (struct mdt_object *)o;
3075         else
3076                 m = mdt_obj(o);
3077         RETURN(m);
3078 }
3079
3080 struct mdt_object *mdt_object_find(const struct lu_env *env,
3081                                    struct mdt_device *d,
3082                                    const struct lu_fid *f)
3083 {
3084         struct lu_object *o;
3085         struct mdt_object *m;
3086         ENTRY;
3087
3088         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3089         o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3090         if (unlikely(IS_ERR(o)))
3091                 m = (struct mdt_object *)o;
3092         else
3093                 m = mdt_obj(o);
3094
3095         RETURN(m);
3096 }
3097
3098 /**
3099  * Asyncronous commit for mdt device.
3100  *
3101  * Pass asynchonous commit call down the MDS stack.
3102  *
3103  * \param env environment
3104  * \param mdt the mdt device
3105  */
3106 static void mdt_device_commit_async(const struct lu_env *env,
3107                                     struct mdt_device *mdt)
3108 {
3109         struct dt_device *dt = mdt->mdt_bottom;
3110         int rc;
3111         ENTRY;
3112
3113         rc = dt->dd_ops->dt_commit_async(env, dt);
3114         if (unlikely(rc != 0))
3115                 CWARN("%s: async commit start failed: rc = %d\n",
3116                       mdt_obd_name(mdt), rc);
3117         atomic_inc(&mdt->mdt_async_commit_count);
3118         EXIT;
3119 }
3120
3121 /**
3122  * Mark the lock as "synchonous".
3123  *
3124  * Mark the lock to deffer transaction commit to the unlock time.
3125  *
3126  * \param lock the lock to mark as "synchonous"
3127  *
3128  * \see mdt_is_lock_sync
3129  * \see mdt_save_lock
3130  */
3131 static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
3132 {
3133         lock->l_ast_data = (void*)1;
3134 }
3135
3136 /**
3137  * Check whehter the lock "synchonous" or not.
3138  *
3139  * \param lock the lock to check
3140  * \retval 1 the lock is "synchonous"
3141  * \retval 0 the lock isn't "synchronous"
3142  *
3143  * \see mdt_set_lock_sync
3144  * \see mdt_save_lock
3145  */
3146 static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
3147 {
3148         return lock->l_ast_data != NULL;
3149 }
3150
3151 /**
3152  * Blocking AST for mdt locks.
3153  *
3154  * Starts transaction commit in case of a COS lock conflict or
3155  * defers such a commit to mdt_save_lock.
3156  *
3157  * \param lock the lock which blocks a request or cancelling lock
3158  * \param desc unused
3159  * \param data unused
3160  * \param flag indicates whether this cancelling or blocking callback
3161  * \retval 0
3162  * \see ldlm_blocking_ast_nocheck
3163  */
3164 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
3165                      void *data, int flag)
3166 {
3167         struct obd_device *obd = ldlm_lock_to_ns(lock)->ns_obd;
3168         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
3169         struct ldlm_cb_set_arg *arg = data;
3170         bool commit_async = false;
3171         int rc;
3172         ENTRY;
3173
3174         if (flag == LDLM_CB_CANCELING)
3175                 RETURN(0);
3176
3177         lock_res_and_lock(lock);
3178         if (lock->l_blocking_ast != mdt_blocking_ast) {
3179                 unlock_res_and_lock(lock);
3180                 RETURN(0);
3181         }
3182
3183         /* A blocking ast may be sent from ldlm_lock_decref_internal