Whamcloud - gitweb
LU-13437 mdt: don't fetch LOOKUP lock for remote object
[fs/lustre-release.git] / lustre / mdt / mdt_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2010, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/mdt/mdt_handler.c
33  *
34  * Lustre Metadata Target (mdt) request handler
35  *
36  * Author: Peter Braam <braam@clusterfs.com>
37  * Author: Andreas Dilger <adilger@clusterfs.com>
38  * Author: Phil Schwan <phil@clusterfs.com>
39  * Author: Mike Shaver <shaver@clusterfs.com>
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  * Author: Huang Hua <huanghua@clusterfs.com>
42  * Author: Yury Umanets <umka@clusterfs.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_MDS
46
47 #include <linux/module.h>
48 #include <linux/pagemap.h>
49
50 #include <dt_object.h>
51 #include <lustre_acl.h>
52 #include <lustre_export.h>
53 #include <uapi/linux/lustre/lustre_ioctl.h>
54 #include <lustre_lfsck.h>
55 #include <lustre_log.h>
56 #include <lustre_nodemap.h>
57 #include <lustre_mds.h>
58 #include <uapi/linux/lustre/lustre_param.h>
59 #include <lustre_quota.h>
60 #include <lustre_swab.h>
61 #include <lustre_lmv.h>
62 #include <obd.h>
63 #include <obd_support.h>
64 #include <lustre_barrier.h>
65 #include <obd_cksum.h>
66 #include <llog_swab.h>
67
68 #include "mdt_internal.h"
69
70 static unsigned int max_mod_rpcs_per_client = 8;
71 module_param(max_mod_rpcs_per_client, uint, 0644);
72 MODULE_PARM_DESC(max_mod_rpcs_per_client, "maximum number of modify RPCs in flight allowed per client");
73
74 mdl_mode_t mdt_mdl_lock_modes[] = {
75         [LCK_MINMODE] = MDL_MINMODE,
76         [LCK_EX]      = MDL_EX,
77         [LCK_PW]      = MDL_PW,
78         [LCK_PR]      = MDL_PR,
79         [LCK_CW]      = MDL_CW,
80         [LCK_CR]      = MDL_CR,
81         [LCK_NL]      = MDL_NL,
82         [LCK_GROUP]   = MDL_GROUP
83 };
84
85 enum ldlm_mode mdt_dlm_lock_modes[] = {
86         [MDL_MINMODE]   = LCK_MINMODE,
87         [MDL_EX]        = LCK_EX,
88         [MDL_PW]        = LCK_PW,
89         [MDL_PR]        = LCK_PR,
90         [MDL_CW]        = LCK_CW,
91         [MDL_CR]        = LCK_CR,
92         [MDL_NL]        = LCK_NL,
93         [MDL_GROUP]     = LCK_GROUP
94 };
95
96 static struct mdt_device *mdt_dev(struct lu_device *d);
97
98 static const struct lu_object_operations mdt_obj_ops;
99
100 /* Slab for MDT object allocation */
101 static struct kmem_cache *mdt_object_kmem;
102
103 /* For HSM restore handles */
104 struct kmem_cache *mdt_hsm_cdt_kmem;
105
106 /* For HSM request handles */
107 struct kmem_cache *mdt_hsm_car_kmem;
108
109 static struct lu_kmem_descr mdt_caches[] = {
110         {
111                 .ckd_cache = &mdt_object_kmem,
112                 .ckd_name  = "mdt_obj",
113                 .ckd_size  = sizeof(struct mdt_object)
114         },
115         {
116                 .ckd_cache      = &mdt_hsm_cdt_kmem,
117                 .ckd_name       = "mdt_cdt_restore_handle",
118                 .ckd_size       = sizeof(struct cdt_restore_handle)
119         },
120         {
121                 .ckd_cache      = &mdt_hsm_car_kmem,
122                 .ckd_name       = "mdt_cdt_agent_req",
123                 .ckd_size       = sizeof(struct cdt_agent_req)
124         },
125         {
126                 .ckd_cache = NULL
127         }
128 };
129
130 __u64 mdt_get_disposition(struct ldlm_reply *rep, __u64 op_flag)
131 {
132         if (!rep)
133                 return 0;
134         return rep->lock_policy_res1 & op_flag;
135 }
136
137 void mdt_clear_disposition(struct mdt_thread_info *info,
138                            struct ldlm_reply *rep, __u64 op_flag)
139 {
140         if (info) {
141                 info->mti_opdata &= ~op_flag;
142                 tgt_opdata_clear(info->mti_env, op_flag);
143         }
144         if (rep)
145                 rep->lock_policy_res1 &= ~op_flag;
146 }
147
148 void mdt_set_disposition(struct mdt_thread_info *info,
149                          struct ldlm_reply *rep, __u64 op_flag)
150 {
151         if (info) {
152                 info->mti_opdata |= op_flag;
153                 tgt_opdata_set(info->mti_env, op_flag);
154         }
155         if (rep)
156                 rep->lock_policy_res1 |= op_flag;
157 }
158
159 void mdt_lock_reg_init(struct mdt_lock_handle *lh, enum ldlm_mode lm)
160 {
161         lh->mlh_pdo_hash = 0;
162         lh->mlh_reg_mode = lm;
163         lh->mlh_rreg_mode = lm;
164         lh->mlh_type = MDT_REG_LOCK;
165 }
166
167 void mdt_lock_pdo_init(struct mdt_lock_handle *lh, enum ldlm_mode lock_mode,
168                        const struct lu_name *lname)
169 {
170         lh->mlh_reg_mode = lock_mode;
171         lh->mlh_pdo_mode = LCK_MINMODE;
172         lh->mlh_rreg_mode = lock_mode;
173         lh->mlh_type = MDT_PDO_LOCK;
174
175         if (lu_name_is_valid(lname)) {
176                 lh->mlh_pdo_hash = ll_full_name_hash(NULL, lname->ln_name,
177                                                      lname->ln_namelen);
178                 /* XXX Workaround for LU-2856
179                  *
180                  * Zero is a valid return value of full_name_hash, but
181                  * several users of mlh_pdo_hash assume a non-zero
182                  * hash value. We therefore map zero onto an
183                  * arbitrary, but consistent value (1) to avoid
184                  * problems further down the road. */
185                 if (unlikely(lh->mlh_pdo_hash == 0))
186                         lh->mlh_pdo_hash = 1;
187         } else {
188                 lh->mlh_pdo_hash = 0;
189         }
190 }
191
192 static void mdt_lock_pdo_mode(struct mdt_thread_info *info, struct mdt_object *o,
193                               struct mdt_lock_handle *lh)
194 {
195         mdl_mode_t mode;
196         ENTRY;
197
198         /*
199          * Any dir access needs couple of locks:
200          *
201          * 1) on part of dir we gonna take lookup/modify;
202          *
203          * 2) on whole dir to protect it from concurrent splitting and/or to
204          * flush client's cache for readdir().
205          *
206          * so, for a given mode and object this routine decides what lock mode
207          * to use for lock #2:
208          *
209          * 1) if caller's gonna lookup in dir then we need to protect dir from
210          * being splitted only - LCK_CR
211          *
212          * 2) if caller's gonna modify dir then we need to protect dir from
213          * being splitted and to flush cache - LCK_CW
214          *
215          * 3) if caller's gonna modify dir and that dir seems ready for
216          * splitting then we need to protect it from any type of access
217          * (lookup/modify/split) - LCK_EX --bzzz
218          */
219
220         LASSERT(lh->mlh_reg_mode != LCK_MINMODE);
221         LASSERT(lh->mlh_pdo_mode == LCK_MINMODE);
222
223         /*
224          * Ask underlaying level its opinion about preferable PDO lock mode
225          * having access type passed as regular lock mode:
226          *
227          * - MDL_MINMODE means that lower layer does not want to specify lock
228          * mode;
229          *
230          * - MDL_NL means that no PDO lock should be taken. This is used in some
231          * cases. Say, for non-splittable directories no need to use PDO locks
232          * at all.
233          */
234         mode = mdo_lock_mode(info->mti_env, mdt_object_child(o),
235                              mdt_dlm_mode2mdl_mode(lh->mlh_reg_mode));
236
237         if (mode != MDL_MINMODE) {
238                 lh->mlh_pdo_mode = mdt_mdl_mode2dlm_mode(mode);
239         } else {
240                 /*
241                  * Lower layer does not want to specify locking mode. We do it
242                  * our selves. No special protection is needed, just flush
243                  * client's cache on modification and allow concurrent
244                  * mondification.
245                  */
246                 switch (lh->mlh_reg_mode) {
247                 case LCK_EX:
248                         lh->mlh_pdo_mode = LCK_EX;
249                         break;
250                 case LCK_PR:
251                         lh->mlh_pdo_mode = LCK_CR;
252                         break;
253                 case LCK_PW:
254                         lh->mlh_pdo_mode = LCK_CW;
255                         break;
256                 default:
257                         CERROR("Not expected lock type (0x%x)\n",
258                                (int)lh->mlh_reg_mode);
259                         LBUG();
260                 }
261         }
262
263         LASSERT(lh->mlh_pdo_mode != LCK_MINMODE);
264         EXIT;
265 }
266
267 static int mdt_lookup_fileset(struct mdt_thread_info *info, const char *fileset,
268                               struct lu_fid *fid)
269 {
270         struct mdt_device *mdt = info->mti_mdt;
271         struct lu_name *lname = &info->mti_name;
272         char *filename = info->mti_filename;
273         struct mdt_object *parent;
274         u32 mode;
275         int rc = 0;
276
277         LASSERT(!info->mti_cross_ref);
278
279         /*
280          * We may want to allow this to mount a completely separate
281          * fileset from the MDT in the future, but keeping it to
282          * ROOT/ only for now avoid potential security issues.
283          */
284         *fid = mdt->mdt_md_root_fid;
285
286         while (rc == 0 && fileset != NULL && *fileset != '\0') {
287                 const char *s1 = fileset;
288                 const char *s2;
289
290                 while (*++s1 == '/')
291                         ;
292                 s2 = s1;
293                 while (*s2 != '/' && *s2 != '\0')
294                         s2++;
295
296                 if (s2 == s1)
297                         break;
298
299                 fileset = s2;
300
301                 lname->ln_namelen = s2 - s1;
302                 if (lname->ln_namelen > NAME_MAX) {
303                         rc = -EINVAL;
304                         break;
305                 }
306
307                 /* reject .. as a path component */
308                 if (lname->ln_namelen == 2 &&
309                     strncmp(s1, "..", 2) == 0) {
310                         rc = -EINVAL;
311                         break;
312                 }
313
314                 strncpy(filename, s1, lname->ln_namelen);
315                 filename[lname->ln_namelen] = '\0';
316                 lname->ln_name = filename;
317
318                 parent = mdt_object_find(info->mti_env, mdt, fid);
319                 if (IS_ERR(parent)) {
320                         rc = PTR_ERR(parent);
321                         break;
322                 }
323                 /* Only got the fid of this obj by name */
324                 fid_zero(fid);
325                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
326                                 fid, &info->mti_spec);
327                 mdt_object_put(info->mti_env, parent);
328         }
329         if (!rc) {
330                 parent = mdt_object_find(info->mti_env, mdt, fid);
331                 if (IS_ERR(parent))
332                         rc = PTR_ERR(parent);
333                 else {
334                         mode = lu_object_attr(&parent->mot_obj);
335                         mdt_object_put(info->mti_env, parent);
336                         if (!S_ISDIR(mode))
337                                 rc = -ENOTDIR;
338                 }
339         }
340
341         return rc;
342 }
343
344 static int mdt_get_root(struct tgt_session_info *tsi)
345 {
346         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
347         struct mdt_device       *mdt = info->mti_mdt;
348         struct mdt_body         *repbody;
349         char                    *fileset = NULL, *buffer = NULL;
350         int                      rc;
351         struct obd_export       *exp = info->mti_exp;
352         char                    *nodemap_fileset;
353
354         ENTRY;
355
356         rc = mdt_check_ucred(info);
357         if (rc)
358                 GOTO(out, rc = err_serious(rc));
359
360         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GET_ROOT_PACK))
361                 GOTO(out, rc = err_serious(-ENOMEM));
362
363         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
364         if (req_capsule_get_size(info->mti_pill, &RMF_NAME, RCL_CLIENT) > 0) {
365                 fileset = req_capsule_client_get(info->mti_pill, &RMF_NAME);
366                 if (fileset == NULL)
367                         GOTO(out, rc = err_serious(-EFAULT));
368         }
369
370         nodemap_fileset = nodemap_get_fileset(exp->exp_target_data.ted_nodemap);
371         if (nodemap_fileset && nodemap_fileset[0]) {
372                 CDEBUG(D_INFO, "nodemap fileset is %s\n", nodemap_fileset);
373                 if (fileset) {
374                         /* consider fileset from client as a sub-fileset
375                          * of the nodemap one */
376                         OBD_ALLOC(buffer, PATH_MAX + 1);
377                         if (buffer == NULL)
378                                 GOTO(out, rc = err_serious(-ENOMEM));
379                         if (snprintf(buffer, PATH_MAX + 1, "%s/%s",
380                                      nodemap_fileset, fileset) >= PATH_MAX + 1)
381                                 GOTO(out, rc = err_serious(-EINVAL));
382                         fileset = buffer;
383                 } else {
384                         /* enforce fileset as specified in the nodemap */
385                         fileset = nodemap_fileset;
386                 }
387         }
388
389         if (fileset) {
390                 CDEBUG(D_INFO, "Getting fileset %s\n", fileset);
391                 rc = mdt_lookup_fileset(info, fileset, &repbody->mbo_fid1);
392                 if (rc < 0)
393                         GOTO(out, rc = err_serious(rc));
394         } else {
395                 repbody->mbo_fid1 = mdt->mdt_md_root_fid;
396         }
397         repbody->mbo_valid |= OBD_MD_FLID;
398
399         EXIT;
400 out:
401         mdt_thread_info_fini(info);
402         if (buffer)
403                 OBD_FREE(buffer, PATH_MAX+1);
404         return rc;
405 }
406
407 static int mdt_statfs(struct tgt_session_info *tsi)
408 {
409         struct ptlrpc_request *req = tgt_ses_req(tsi);
410         struct mdt_thread_info *info = tsi2mdt_info(tsi);
411         struct mdt_device *mdt = info->mti_mdt;
412         struct tg_grants_data *tgd = &mdt->mdt_lut.lut_tgd;
413         struct md_device *next = mdt->mdt_child;
414         struct ptlrpc_service_part *svcpt;
415         struct obd_statfs *osfs;
416         struct mdt_body *reqbody = NULL;
417         struct mdt_statfs_cache *msf;
418         int rc;
419
420         ENTRY;
421
422         svcpt = req->rq_rqbd->rqbd_svcpt;
423
424         /* This will trigger a watchdog timeout */
425         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_STATFS_LCW_SLEEP,
426                          (MDT_SERVICE_WATCHDOG_FACTOR *
427                           at_get(&svcpt->scp_at_estimate)) + 1);
428
429         rc = mdt_check_ucred(info);
430         if (rc)
431                 GOTO(out, rc = err_serious(rc));
432
433         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_STATFS_PACK))
434                 GOTO(out, rc = err_serious(-ENOMEM));
435
436         osfs = req_capsule_server_get(info->mti_pill, &RMF_OBD_STATFS);
437         if (!osfs)
438                 GOTO(out, rc = -EPROTO);
439
440         if (mdt_is_sum_statfs_client(req->rq_export) &&
441                 lustre_packed_msg_size(req->rq_reqmsg) ==
442                 req_capsule_fmt_size(req->rq_reqmsg->lm_magic,
443                                      &RQF_MDS_STATFS_NEW, RCL_CLIENT)) {
444                 req_capsule_extend(info->mti_pill, &RQF_MDS_STATFS_NEW);
445                 reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
446         }
447
448         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
449                 msf = &mdt->mdt_sum_osfs;
450         else
451                 msf = &mdt->mdt_osfs;
452
453         if (msf->msf_age + OBD_STATFS_CACHE_SECONDS <= ktime_get_seconds()) {
454                         /** statfs data is too old, get up-to-date one */
455                         if (reqbody && reqbody->mbo_valid & OBD_MD_FLAGSTATFS)
456                                 rc = next->md_ops->mdo_statfs(info->mti_env,
457                                                               next, osfs);
458                         else
459                                 rc = dt_statfs(info->mti_env, mdt->mdt_bottom,
460                                                osfs);
461                         if (rc)
462                                 GOTO(out, rc);
463                         spin_lock(&mdt->mdt_lock);
464                         msf->msf_osfs = *osfs;
465                         msf->msf_age = ktime_get_seconds();
466                         spin_unlock(&mdt->mdt_lock);
467         } else {
468                         /** use cached statfs data */
469                         spin_lock(&mdt->mdt_lock);
470                         *osfs = msf->msf_osfs;
471                         spin_unlock(&mdt->mdt_lock);
472         }
473
474         /* at least try to account for cached pages.  its still racy and
475          * might be under-reporting if clients haven't announced their
476          * caches with brw recently */
477         CDEBUG(D_SUPER | D_CACHE, "blocks cached %llu granted %llu"
478                " pending %llu free %llu avail %llu\n",
479                tgd->tgd_tot_dirty, tgd->tgd_tot_granted,
480                tgd->tgd_tot_pending,
481                osfs->os_bfree << tgd->tgd_blockbits,
482                osfs->os_bavail << tgd->tgd_blockbits);
483
484         osfs->os_bavail -= min_t(u64, osfs->os_bavail,
485                                  ((tgd->tgd_tot_dirty + tgd->tgd_tot_pending +
486                                    osfs->os_bsize - 1) >> tgd->tgd_blockbits));
487
488         tgt_grant_sanity_check(mdt->mdt_lu_dev.ld_obd, __func__);
489         CDEBUG(D_CACHE, "%llu blocks: %llu free, %llu avail; "
490                "%llu objects: %llu free; state %x\n",
491                osfs->os_blocks, osfs->os_bfree, osfs->os_bavail,
492                osfs->os_files, osfs->os_ffree, osfs->os_state);
493
494         if (!exp_grant_param_supp(tsi->tsi_exp) &&
495             tgd->tgd_blockbits > COMPAT_BSIZE_SHIFT) {
496                 /* clients which don't support OBD_CONNECT_GRANT_PARAM
497                  * should not see a block size > page size, otherwise
498                  * cl_lost_grant goes mad. Therefore, we emulate a 4KB (=2^12)
499                  * block size which is the biggest block size known to work
500                  * with all client's page size. */
501                 osfs->os_blocks <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
502                 osfs->os_bfree  <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
503                 osfs->os_bavail <<= tgd->tgd_blockbits - COMPAT_BSIZE_SHIFT;
504                 osfs->os_bsize = 1 << COMPAT_BSIZE_SHIFT;
505         }
506         if (rc == 0)
507                 mdt_counter_incr(req, LPROC_MDT_STATFS);
508 out:
509         mdt_thread_info_fini(info);
510         RETURN(rc);
511 }
512
513 __u32 mdt_lmm_dom_entry_check(struct lov_mds_md *lmm, int *is_dom_only)
514 {
515         struct lov_comp_md_v1 *comp_v1;
516         struct lov_mds_md *v1;
517         __u32 off;
518         __u32 dom_stripesize = 0;
519         int i;
520         bool has_ost_stripes = false;
521
522         ENTRY;
523
524         if (is_dom_only)
525                 *is_dom_only = 0;
526
527         if (le32_to_cpu(lmm->lmm_magic) != LOV_MAGIC_COMP_V1)
528                 RETURN(0);
529
530         comp_v1 = (struct lov_comp_md_v1 *)lmm;
531         off = le32_to_cpu(comp_v1->lcm_entries[0].lcme_offset);
532         v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
533
534         /* Fast check for DoM entry with no mirroring, should be the first */
535         if (le16_to_cpu(comp_v1->lcm_mirror_count) == 0 &&
536             lov_pattern(le32_to_cpu(v1->lmm_pattern)) != LOV_PATTERN_MDT)
537                 RETURN(0);
538
539         /* check all entries otherwise */
540         for (i = 0; i < le16_to_cpu(comp_v1->lcm_entry_count); i++) {
541                 struct lov_comp_md_entry_v1 *lcme;
542
543                 lcme = &comp_v1->lcm_entries[i];
544                 if (!(le32_to_cpu(lcme->lcme_flags) & LCME_FL_INIT))
545                         continue;
546
547                 off = le32_to_cpu(lcme->lcme_offset);
548                 v1 = (struct lov_mds_md *)((char *)comp_v1 + off);
549
550                 if (lov_pattern(le32_to_cpu(v1->lmm_pattern)) ==
551                     LOV_PATTERN_MDT)
552                         dom_stripesize = le32_to_cpu(v1->lmm_stripe_size);
553                 else
554                         has_ost_stripes = true;
555
556                 if (dom_stripesize && has_ost_stripes)
557                         RETURN(dom_stripesize);
558         }
559         /* DoM-only case exits here */
560         if (is_dom_only && dom_stripesize)
561                 *is_dom_only = 1;
562         RETURN(dom_stripesize);
563 }
564
565 /**
566  * Pack size attributes into the reply.
567  */
568 int mdt_pack_size2body(struct mdt_thread_info *info,
569                         const struct lu_fid *fid, struct lustre_handle *lh)
570 {
571         struct mdt_body *b;
572         struct md_attr *ma = &info->mti_attr;
573         __u32 dom_stripe;
574         bool dom_lock = false;
575
576         ENTRY;
577
578         LASSERT(ma->ma_attr.la_valid & LA_MODE);
579
580         if (!S_ISREG(ma->ma_attr.la_mode) ||
581             !(ma->ma_valid & MA_LOV && ma->ma_lmm != NULL))
582                 RETURN(-ENODATA);
583
584         dom_stripe = mdt_lmm_dom_stripesize(ma->ma_lmm);
585         /* no DoM stripe, no size in reply */
586         if (!dom_stripe)
587                 RETURN(-ENOENT);
588
589         if (lustre_handle_is_used(lh)) {
590                 struct ldlm_lock *lock;
591
592                 lock = ldlm_handle2lock(lh);
593                 if (lock != NULL) {
594                         dom_lock = ldlm_has_dom(lock);
595                         LDLM_LOCK_PUT(lock);
596                 }
597         }
598
599         /* no DoM lock, no size in reply */
600         if (!dom_lock)
601                 RETURN(0);
602
603         /* Either DoM lock exists or LMM has only DoM stripe then
604          * return size on body. */
605         b = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
606
607         mdt_dom_object_size(info->mti_env, info->mti_mdt, fid, b, dom_lock);
608         RETURN(0);
609 }
610
611 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
612 /*
613  * Pack ACL data into the reply. UIDs/GIDs are mapped and filtered by nodemap.
614  *
615  * \param       info    thread info object
616  * \param       repbody reply to pack ACLs into
617  * \param       o       mdt object of file to examine
618  * \param       nodemap nodemap of client to reply to
619  * \retval      0       success
620  * \retval      -errno  error getting or parsing ACL from disk
621  */
622 int mdt_pack_acl2body(struct mdt_thread_info *info, struct mdt_body *repbody,
623                       struct mdt_object *o, struct lu_nodemap *nodemap)
624 {
625         const struct lu_env     *env = info->mti_env;
626         struct md_object        *next = mdt_object_child(o);
627         struct lu_buf           *buf = &info->mti_buf;
628         struct mdt_device       *mdt = info->mti_mdt;
629         struct req_capsule *pill = info->mti_pill;
630         int rc;
631
632         ENTRY;
633
634         buf->lb_buf = req_capsule_server_get(pill, &RMF_ACL);
635         buf->lb_len = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
636         if (buf->lb_len == 0)
637                 RETURN(0);
638
639 again:
640         rc = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_ACCESS);
641         if (rc < 0) {
642                 if (rc == -ENODATA) {
643                         repbody->mbo_aclsize = 0;
644                         repbody->mbo_valid |= OBD_MD_FLACL;
645                         rc = 0;
646                 } else if (rc == -EOPNOTSUPP) {
647                         rc = 0;
648                 } else {
649                         if (rc == -ERANGE &&
650                             exp_connect_large_acl(info->mti_exp) &&
651                             buf->lb_buf != info->mti_big_acl) {
652                                 if (info->mti_big_acl == NULL) {
653                                         info->mti_big_aclsize =
654                                                         min_t(unsigned int,
655                                                               mdt->mdt_max_ea_size,
656                                                               XATTR_SIZE_MAX);
657                                         OBD_ALLOC_LARGE(info->mti_big_acl,
658                                                         info->mti_big_aclsize);
659                                         if (info->mti_big_acl == NULL) {
660                                                 info->mti_big_aclsize = 0;
661                                                 CERROR("%s: unable to grow "
662                                                        DFID" ACL buffer\n",
663                                                        mdt_obd_name(mdt),
664                                                        PFID(mdt_object_fid(o)));
665                                                 RETURN(-ENOMEM);
666                                         }
667                                 }
668
669                                 CDEBUG(D_INODE, "%s: grow the "DFID
670                                        " ACL buffer to size %d\n",
671                                        mdt_obd_name(mdt),
672                                        PFID(mdt_object_fid(o)),
673                                        info->mti_big_aclsize);
674
675                                 buf->lb_buf = info->mti_big_acl;
676                                 buf->lb_len = info->mti_big_aclsize;
677
678                                 goto again;
679                         }
680
681                         CERROR("%s: unable to read "DFID" ACL: rc = %d\n",
682                                mdt_obd_name(mdt), PFID(mdt_object_fid(o)), rc);
683                 }
684         } else {
685                 int client;
686                 int server;
687                 int acl_buflen;
688                 int lmm_buflen = 0;
689                 int lmmsize = 0;
690
691                 acl_buflen = req_capsule_get_size(pill, &RMF_ACL, RCL_SERVER);
692                 if (acl_buflen >= rc)
693                         goto map;
694
695                 /* If LOV/LMA EA is small, we can reuse part of their buffer */
696                 client = ptlrpc_req_get_repsize(pill->rc_req);
697                 server = lustre_packed_msg_size(pill->rc_req->rq_repmsg);
698                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER)) {
699                         lmm_buflen = req_capsule_get_size(pill, &RMF_MDT_MD,
700                                                           RCL_SERVER);
701                         lmmsize = repbody->mbo_eadatasize;
702                 }
703
704                 if (client < server - acl_buflen - lmm_buflen + rc + lmmsize) {
705                         CDEBUG(D_INODE, "%s: client prepared buffer size %d "
706                                "is not big enough with the ACL size %d (%d)\n",
707                                mdt_obd_name(mdt), client, rc,
708                                server - acl_buflen - lmm_buflen + rc + lmmsize);
709                         repbody->mbo_aclsize = 0;
710                         repbody->mbo_valid &= ~OBD_MD_FLACL;
711                         RETURN(-ERANGE);
712                 }
713
714 map:
715                 if (buf->lb_buf == info->mti_big_acl)
716                         info->mti_big_acl_used = 1;
717
718                 rc = nodemap_map_acl(nodemap, buf->lb_buf,
719                                      rc, NODEMAP_FS_TO_CLIENT);
720                 /* if all ACLs mapped out, rc is still >= 0 */
721                 if (rc < 0) {
722                         CERROR("%s: nodemap_map_acl unable to parse "DFID
723                                " ACL: rc = %d\n", mdt_obd_name(mdt),
724                                PFID(mdt_object_fid(o)), rc);
725                         repbody->mbo_aclsize = 0;
726                         repbody->mbo_valid &= ~OBD_MD_FLACL;
727                 } else {
728                         repbody->mbo_aclsize = rc;
729                         repbody->mbo_valid |= OBD_MD_FLACL;
730                         rc = 0;
731                 }
732         }
733
734         RETURN(rc);
735 }
736 #endif
737
738 /* XXX Look into layout in MDT layer. */
739 static inline bool mdt_hsm_is_released(struct lov_mds_md *lmm)
740 {
741         struct lov_comp_md_v1   *comp_v1;
742         struct lov_mds_md       *v1;
743         int                      i;
744
745         if (lmm->lmm_magic == LOV_MAGIC_COMP_V1) {
746                 comp_v1 = (struct lov_comp_md_v1 *)lmm;
747
748                 for (i = 0; i < comp_v1->lcm_entry_count; i++) {
749                         v1 = (struct lov_mds_md *)((char *)comp_v1 +
750                                 comp_v1->lcm_entries[i].lcme_offset);
751                         /* We don't support partial release for now */
752                         if (!(v1->lmm_pattern & LOV_PATTERN_F_RELEASED))
753                                 return false;
754                 }
755                 return true;
756         } else {
757                 return (lmm->lmm_pattern & LOV_PATTERN_F_RELEASED) ?
758                         true : false;
759         }
760 }
761
762 void mdt_pack_attr2body(struct mdt_thread_info *info, struct mdt_body *b,
763                         const struct lu_attr *attr, const struct lu_fid *fid)
764 {
765         struct md_attr *ma = &info->mti_attr;
766         struct obd_export *exp = info->mti_exp;
767         struct lu_nodemap *nodemap = NULL;
768
769         LASSERT(ma->ma_valid & MA_INODE);
770
771         if (attr->la_valid & LA_ATIME) {
772                 b->mbo_atime = attr->la_atime;
773                 b->mbo_valid |= OBD_MD_FLATIME;
774         }
775         if (attr->la_valid & LA_MTIME) {
776                 b->mbo_mtime = attr->la_mtime;
777                 b->mbo_valid |= OBD_MD_FLMTIME;
778         }
779         if (attr->la_valid & LA_CTIME) {
780                 b->mbo_ctime = attr->la_ctime;
781                 b->mbo_valid |= OBD_MD_FLCTIME;
782         }
783         if (attr->la_valid & LA_BTIME) {
784                 b->mbo_btime = attr->la_btime;
785                 b->mbo_valid |= OBD_MD_FLBTIME;
786         }
787         if (attr->la_valid & LA_FLAGS) {
788                 b->mbo_flags = attr->la_flags;
789                 b->mbo_valid |= OBD_MD_FLFLAGS;
790         }
791         if (attr->la_valid & LA_NLINK) {
792                 b->mbo_nlink = attr->la_nlink;
793                 b->mbo_valid |= OBD_MD_FLNLINK;
794         }
795         if (attr->la_valid & (LA_UID|LA_GID)) {
796                 nodemap = nodemap_get_from_exp(exp);
797                 if (IS_ERR(nodemap))
798                         goto out;
799         }
800         if (attr->la_valid & LA_UID) {
801                 b->mbo_uid = nodemap_map_id(nodemap, NODEMAP_UID,
802                                             NODEMAP_FS_TO_CLIENT,
803                                             attr->la_uid);
804                 b->mbo_valid |= OBD_MD_FLUID;
805         }
806         if (attr->la_valid & LA_GID) {
807                 b->mbo_gid = nodemap_map_id(nodemap, NODEMAP_GID,
808                                             NODEMAP_FS_TO_CLIENT,
809                                             attr->la_gid);
810                 b->mbo_valid |= OBD_MD_FLGID;
811         }
812
813         if (attr->la_valid & LA_PROJID) {
814                 /* TODO, nodemap for project id */
815                 b->mbo_projid = attr->la_projid;
816                 b->mbo_valid |= OBD_MD_FLPROJID;
817         }
818
819         b->mbo_mode = attr->la_mode;
820         if (attr->la_valid & LA_MODE)
821                 b->mbo_valid |= OBD_MD_FLMODE;
822         if (attr->la_valid & LA_TYPE)
823                 b->mbo_valid |= OBD_MD_FLTYPE;
824
825         if (fid != NULL) {
826                 b->mbo_fid1 = *fid;
827                 b->mbo_valid |= OBD_MD_FLID;
828                 CDEBUG(D_INODE, DFID": nlink=%d, mode=%o, valid=%#llx\n",
829                        PFID(fid), b->mbo_nlink, b->mbo_mode, b->mbo_valid);
830         }
831
832         if (!(attr->la_valid & LA_TYPE))
833                 return;
834
835         b->mbo_rdev   = attr->la_rdev;
836         b->mbo_size   = attr->la_size;
837         b->mbo_blocks = attr->la_blocks;
838
839         if (!S_ISREG(attr->la_mode)) {
840                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS | OBD_MD_FLRDEV;
841         } else if (ma->ma_need & MA_LOV && !(ma->ma_valid & MA_LOV)) {
842                 /* means no objects are allocated on osts. */
843                 LASSERT(!(ma->ma_valid & MA_LOV));
844                 /* just ignore blocks occupied by extend attributes on MDS */
845                 b->mbo_blocks = 0;
846                 /* if no object is allocated on osts, the size on mds is valid.
847                  * b=22272 */
848                 b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
849         } else if ((ma->ma_valid & MA_LOV) && ma->ma_lmm != NULL) {
850                 if (mdt_hsm_is_released(ma->ma_lmm)) {
851                         /* A released file stores its size on MDS. */
852                         /* But return 1 block for released file, unless tools
853                          * like tar will consider it fully sparse. (LU-3864)
854                          */
855                         if (unlikely(b->mbo_size == 0))
856                                 b->mbo_blocks = 0;
857                         else
858                                 b->mbo_blocks = 1;
859                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
860                 } else if (info->mti_som_valid) { /* som is valid */
861                         b->mbo_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
862                 } else if (ma->ma_valid & MA_SOM) { /* lsom is valid */
863                         b->mbo_valid |= OBD_MD_FLLAZYSIZE | OBD_MD_FLLAZYBLOCKS;
864                         b->mbo_size = ma->ma_som.ms_size;
865                         b->mbo_blocks = ma->ma_som.ms_blocks;
866                 }
867         }
868
869         if (fid != NULL && (b->mbo_valid & OBD_MD_FLSIZE ||
870                             b->mbo_valid & OBD_MD_FLLAZYSIZE))
871                 CDEBUG(D_VFSTRACE, DFID": returning size %llu\n",
872                        PFID(fid), (unsigned long long)b->mbo_size);
873
874 out:
875         if (!IS_ERR_OR_NULL(nodemap))
876                 nodemap_putref(nodemap);
877 }
878
879 static inline int mdt_body_has_lov(const struct lu_attr *la,
880                                    const struct mdt_body *body)
881 {
882         return (S_ISREG(la->la_mode) && (body->mbo_valid & OBD_MD_FLEASIZE)) ||
883                (S_ISDIR(la->la_mode) && (body->mbo_valid & OBD_MD_FLDIREA));
884 }
885
886 void mdt_client_compatibility(struct mdt_thread_info *info)
887 {
888         struct mdt_body       *body;
889         struct ptlrpc_request *req = mdt_info_req(info);
890         struct obd_export     *exp = req->rq_export;
891         struct md_attr        *ma = &info->mti_attr;
892         struct lu_attr        *la = &ma->ma_attr;
893         ENTRY;
894
895         if (exp_connect_layout(exp))
896                 /* the client can deal with 16-bit lmm_stripe_count */
897                 RETURN_EXIT;
898
899         body = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
900
901         if (!mdt_body_has_lov(la, body))
902                 RETURN_EXIT;
903
904         /* now we have a reply with a lov for a client not compatible with the
905          * layout lock so we have to clean the layout generation number */
906         if (S_ISREG(la->la_mode))
907                 ma->ma_lmm->lmm_layout_gen = 0;
908         EXIT;
909 }
910
911 static int mdt_attr_get_eabuf_size(struct mdt_thread_info *info,
912                                    struct mdt_object *o)
913 {
914         const struct lu_env *env = info->mti_env;
915         int rc, rc2;
916
917         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
918                           XATTR_NAME_LOV);
919
920         if (rc == -ENODATA)
921                 rc = 0;
922
923         if (rc < 0)
924                 goto out;
925
926         /* Is it a directory? Let's check for the LMV as well */
927         if (S_ISDIR(lu_object_attr(&mdt_object_child(o)->mo_lu))) {
928                 rc2 = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL,
929                                    XATTR_NAME_LMV);
930
931                 if (rc2 == -ENODATA)
932                         rc2 = mo_xattr_get(env, mdt_object_child(o),
933                                            &LU_BUF_NULL,
934                                            XATTR_NAME_DEFAULT_LMV);
935
936                 if ((rc2 < 0 && rc2 != -ENODATA) || (rc2 > rc))
937                         rc = rc2;
938         }
939
940 out:
941         return rc;
942 }
943
944 int mdt_big_xattr_get(struct mdt_thread_info *info, struct mdt_object *o,
945                       const char *name)
946 {
947         const struct lu_env *env = info->mti_env;
948         int rc;
949         ENTRY;
950
951         LASSERT(info->mti_big_lmm_used == 0);
952         rc = mo_xattr_get(env, mdt_object_child(o), &LU_BUF_NULL, name);
953         if (rc < 0)
954                 RETURN(rc);
955
956         /* big_lmm may need to be grown */
957         if (info->mti_big_lmmsize < rc) {
958                 int size = size_roundup_power2(rc);
959
960                 if (info->mti_big_lmmsize > 0) {
961                         /* free old buffer */
962                         LASSERT(info->mti_big_lmm);
963                         OBD_FREE_LARGE(info->mti_big_lmm,
964                                        info->mti_big_lmmsize);
965                         info->mti_big_lmm = NULL;
966                         info->mti_big_lmmsize = 0;
967                 }
968
969                 OBD_ALLOC_LARGE(info->mti_big_lmm, size);
970                 if (info->mti_big_lmm == NULL)
971                         RETURN(-ENOMEM);
972                 info->mti_big_lmmsize = size;
973         }
974         LASSERT(info->mti_big_lmmsize >= rc);
975
976         info->mti_buf.lb_buf = info->mti_big_lmm;
977         info->mti_buf.lb_len = info->mti_big_lmmsize;
978         rc = mo_xattr_get(env, mdt_object_child(o), &info->mti_buf, name);
979
980         RETURN(rc);
981 }
982
983 int __mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
984                      struct md_attr *ma, const char *name)
985 {
986         struct md_object *next = mdt_object_child(o);
987         struct lu_buf    *buf = &info->mti_buf;
988         int rc;
989
990         if (strcmp(name, XATTR_NAME_LOV) == 0) {
991                 buf->lb_buf = ma->ma_lmm;
992                 buf->lb_len = ma->ma_lmm_size;
993                 LASSERT(!(ma->ma_valid & MA_LOV));
994         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
995                 buf->lb_buf = ma->ma_lmv;
996                 buf->lb_len = ma->ma_lmv_size;
997                 LASSERT(!(ma->ma_valid & MA_LMV));
998         } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
999                 buf->lb_buf = ma->ma_default_lmv;
1000                 buf->lb_len = ma->ma_default_lmv_size;
1001                 LASSERT(!(ma->ma_valid & MA_LMV_DEF));
1002         } else {
1003                 return -EINVAL;
1004         }
1005
1006         LASSERT(buf->lb_buf);
1007
1008         rc = mo_xattr_get(info->mti_env, next, buf, name);
1009         if (rc > 0) {
1010
1011 got:
1012                 if (strcmp(name, XATTR_NAME_LOV) == 0) {
1013                         if (info->mti_big_lmm_used)
1014                                 ma->ma_lmm = info->mti_big_lmm;
1015
1016                         /* NOT return LOV EA with hole to old client. */
1017                         if (unlikely(le32_to_cpu(ma->ma_lmm->lmm_pattern) &
1018                                      LOV_PATTERN_F_HOLE) &&
1019                             !(exp_connect_flags(info->mti_exp) &
1020                               OBD_CONNECT_LFSCK)) {
1021                                 return -EIO;
1022                         } else {
1023                                 ma->ma_lmm_size = rc;
1024                                 ma->ma_valid |= MA_LOV;
1025                         }
1026                 } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1027                         if (info->mti_big_lmm_used)
1028                                 ma->ma_lmv = info->mti_big_lmm;
1029
1030                         ma->ma_lmv_size = rc;
1031                         ma->ma_valid |= MA_LMV;
1032                 } else if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0) {
1033                         ma->ma_default_lmv_size = rc;
1034                         ma->ma_valid |= MA_LMV_DEF;
1035                 }
1036
1037                 /* Update mdt_max_mdsize so all clients will be aware that */
1038                 if (info->mti_mdt->mdt_max_mdsize < rc)
1039                         info->mti_mdt->mdt_max_mdsize = rc;
1040
1041                 rc = 0;
1042         } else if (rc == -ENODATA) {
1043                 /* no LOV EA */
1044                 rc = 0;
1045         } else if (rc == -ERANGE) {
1046                 /* Default LMV has fixed size, so it must be able to fit
1047                  * in the original buffer */
1048                 if (strcmp(name, XATTR_NAME_DEFAULT_LMV) == 0)
1049                         return rc;
1050                 rc = mdt_big_xattr_get(info, o, name);
1051                 if (rc > 0) {
1052                         info->mti_big_lmm_used = 1;
1053                         goto got;
1054                 }
1055         }
1056
1057         return rc;
1058 }
1059
1060 int mdt_stripe_get(struct mdt_thread_info *info, struct mdt_object *o,
1061                    struct md_attr *ma, const char *name)
1062 {
1063         int rc;
1064
1065         if (!info->mti_big_lmm) {
1066                 OBD_ALLOC(info->mti_big_lmm, PAGE_SIZE);
1067                 if (!info->mti_big_lmm)
1068                         return -ENOMEM;
1069                 info->mti_big_lmmsize = PAGE_SIZE;
1070         }
1071
1072         if (strcmp(name, XATTR_NAME_LOV) == 0) {
1073                 ma->ma_lmm = info->mti_big_lmm;
1074                 ma->ma_lmm_size = info->mti_big_lmmsize;
1075                 ma->ma_valid &= ~MA_LOV;
1076         } else if (strcmp(name, XATTR_NAME_LMV) == 0) {
1077                 ma->ma_lmv = info->mti_big_lmm;
1078                 ma->ma_lmv_size = info->mti_big_lmmsize;
1079                 ma->ma_valid &= ~MA_LMV;
1080         } else {
1081                 LBUG();
1082         }
1083
1084         LASSERT(!info->mti_big_lmm_used);
1085         rc = __mdt_stripe_get(info, o, ma, name);
1086         /* since big_lmm is always used here, clear 'used' flag to avoid
1087          * assertion in mdt_big_xattr_get().
1088          */
1089         info->mti_big_lmm_used = 0;
1090
1091         return rc;
1092 }
1093
1094 int mdt_attr_get_pfid(struct mdt_thread_info *info, struct mdt_object *o,
1095                       struct lu_fid *pfid)
1096 {
1097         struct lu_buf           *buf = &info->mti_buf;
1098         struct link_ea_header   *leh;
1099         struct link_ea_entry    *lee;
1100         int                      rc;
1101         ENTRY;
1102
1103         buf->lb_buf = info->mti_big_lmm;
1104         buf->lb_len = info->mti_big_lmmsize;
1105         rc = mo_xattr_get(info->mti_env, mdt_object_child(o),
1106                           buf, XATTR_NAME_LINK);
1107         /* ignore errors, MA_PFID won't be set and it is
1108          * up to the caller to treat this as an error */
1109         if (rc == -ERANGE || buf->lb_len == 0) {
1110                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1111                 buf->lb_buf = info->mti_big_lmm;
1112                 buf->lb_len = info->mti_big_lmmsize;
1113         }
1114
1115         if (rc < 0)
1116                 RETURN(rc);
1117         if (rc < sizeof(*leh)) {
1118                 CERROR("short LinkEA on "DFID": rc = %d\n",
1119                        PFID(mdt_object_fid(o)), rc);
1120                 RETURN(-ENODATA);
1121         }
1122
1123         leh = (struct link_ea_header *) buf->lb_buf;
1124         lee = (struct link_ea_entry *)(leh + 1);
1125         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1126                 leh->leh_magic = LINK_EA_MAGIC;
1127                 leh->leh_reccount = __swab32(leh->leh_reccount);
1128                 leh->leh_len = __swab64(leh->leh_len);
1129         }
1130         if (leh->leh_magic != LINK_EA_MAGIC)
1131                 RETURN(-EINVAL);
1132         if (leh->leh_reccount == 0)
1133                 RETURN(-ENODATA);
1134
1135         memcpy(pfid, &lee->lee_parent_fid, sizeof(*pfid));
1136         fid_be_to_cpu(pfid, pfid);
1137
1138         RETURN(0);
1139 }
1140
1141 int mdt_attr_get_pfid_name(struct mdt_thread_info *info, struct mdt_object *o,
1142                            struct lu_fid *pfid, struct lu_name *lname)
1143 {
1144         struct lu_buf *buf = &info->mti_buf;
1145         struct link_ea_header *leh;
1146         struct link_ea_entry *lee;
1147         int reclen;
1148         int rc;
1149
1150         buf->lb_buf = info->mti_xattr_buf;
1151         buf->lb_len = sizeof(info->mti_xattr_buf);
1152         rc = mo_xattr_get(info->mti_env, mdt_object_child(o), buf,
1153                           XATTR_NAME_LINK);
1154         if (rc == -ERANGE) {
1155                 rc = mdt_big_xattr_get(info, o, XATTR_NAME_LINK);
1156                 buf->lb_buf = info->mti_big_lmm;
1157                 buf->lb_len = info->mti_big_lmmsize;
1158         }
1159         if (rc < 0)
1160                 return rc;
1161
1162         if (rc < sizeof(*leh)) {
1163                 CERROR("short LinkEA on "DFID": rc = %d\n",
1164                        PFID(mdt_object_fid(o)), rc);
1165                 return -ENODATA;
1166         }
1167
1168         leh = (struct link_ea_header *)buf->lb_buf;
1169         lee = (struct link_ea_entry *)(leh + 1);
1170         if (leh->leh_magic == __swab32(LINK_EA_MAGIC)) {
1171                 leh->leh_magic = LINK_EA_MAGIC;
1172                 leh->leh_reccount = __swab32(leh->leh_reccount);
1173                 leh->leh_len = __swab64(leh->leh_len);
1174         }
1175         if (leh->leh_magic != LINK_EA_MAGIC)
1176                 return -EINVAL;
1177
1178         if (leh->leh_reccount == 0)
1179                 return -ENODATA;
1180
1181         linkea_entry_unpack(lee, &reclen, lname, pfid);
1182
1183         return 0;
1184 }
1185
1186 int mdt_attr_get_complex(struct mdt_thread_info *info,
1187                          struct mdt_object *o, struct md_attr *ma)
1188 {
1189         const struct lu_env *env = info->mti_env;
1190         struct md_object    *next = mdt_object_child(o);
1191         struct lu_buf       *buf = &info->mti_buf;
1192         int                  need = ma->ma_need;
1193         int                  rc = 0, rc2;
1194         u32                  mode;
1195         ENTRY;
1196
1197         ma->ma_valid = 0;
1198
1199         if (mdt_object_exists(o) == 0)
1200                 GOTO(out, rc = -ENOENT);
1201         mode = lu_object_attr(&next->mo_lu);
1202
1203         if (need & MA_INODE) {
1204                 ma->ma_need = MA_INODE;
1205                 rc = mo_attr_get(env, next, ma);
1206                 if (rc)
1207                         GOTO(out, rc);
1208
1209                 if (S_ISREG(mode))
1210                         (void) mdt_get_som(info, o, ma);
1211                 ma->ma_valid |= MA_INODE;
1212         }
1213
1214         if (need & MA_PFID) {
1215                 rc = mdt_attr_get_pfid(info, o, &ma->ma_pfid);
1216                 if (rc == 0)
1217                         ma->ma_valid |= MA_PFID;
1218                 /* ignore this error, parent fid is not mandatory */
1219                 rc = 0;
1220         }
1221
1222         if (need & MA_LOV && (S_ISREG(mode) || S_ISDIR(mode))) {
1223                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LOV);
1224                 if (rc)
1225                         GOTO(out, rc);
1226         }
1227
1228         if (need & MA_LMV && S_ISDIR(mode)) {
1229                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_LMV);
1230                 if (rc != 0)
1231                         GOTO(out, rc);
1232         }
1233
1234         if (need & MA_LMV_DEF && S_ISDIR(mode)) {
1235                 rc = __mdt_stripe_get(info, o, ma, XATTR_NAME_DEFAULT_LMV);
1236                 if (rc != 0)
1237                         GOTO(out, rc);
1238         }
1239
1240         /*
1241          * In the handle of MA_INODE, we may already get the SOM attr.
1242          */
1243         if (need & MA_SOM && S_ISREG(mode) && !(ma->ma_valid & MA_SOM)) {
1244                 rc = mdt_get_som(info, o, ma);
1245                 if (rc != 0)
1246                         GOTO(out, rc);
1247         }
1248
1249         if (need & MA_HSM && S_ISREG(mode)) {
1250                 buf->lb_buf = info->mti_xattr_buf;
1251                 buf->lb_len = sizeof(info->mti_xattr_buf);
1252                 BUILD_BUG_ON(sizeof(struct hsm_attrs) >
1253                              sizeof(info->mti_xattr_buf));
1254                 rc2 = mo_xattr_get(info->mti_env, next, buf, XATTR_NAME_HSM);
1255                 rc2 = lustre_buf2hsm(info->mti_xattr_buf, rc2, &ma->ma_hsm);
1256                 if (rc2 == 0)
1257                         ma->ma_valid |= MA_HSM;
1258                 else if (rc2 < 0 && rc2 != -ENODATA)
1259                         GOTO(out, rc = rc2);
1260         }
1261
1262 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1263         if (need & MA_ACL_DEF && S_ISDIR(mode)) {
1264                 buf->lb_buf = ma->ma_acl;
1265                 buf->lb_len = ma->ma_acl_size;
1266                 rc2 = mo_xattr_get(env, next, buf, XATTR_NAME_ACL_DEFAULT);
1267                 if (rc2 > 0) {
1268                         ma->ma_acl_size = rc2;
1269                         ma->ma_valid |= MA_ACL_DEF;
1270                 } else if (rc2 == -ENODATA) {
1271                         /* no ACLs */
1272                         ma->ma_acl_size = 0;
1273                 } else
1274                         GOTO(out, rc = rc2);
1275         }
1276 #endif
1277 out:
1278         ma->ma_need = need;
1279         CDEBUG(D_INODE, "after getattr rc = %d, ma_valid = %#llx ma_lmm=%p\n",
1280                rc, ma->ma_valid, ma->ma_lmm);
1281         RETURN(rc);
1282 }
1283
1284 static int mdt_getattr_internal(struct mdt_thread_info *info,
1285                                 struct mdt_object *o, int ma_need)
1286 {
1287         struct mdt_device *mdt = info->mti_mdt;
1288         struct md_object *next = mdt_object_child(o);
1289         const struct mdt_body *reqbody = info->mti_body;
1290         struct ptlrpc_request *req = mdt_info_req(info);
1291         struct md_attr *ma = &info->mti_attr;
1292         struct lu_attr *la = &ma->ma_attr;
1293         struct req_capsule *pill = info->mti_pill;
1294         const struct lu_env *env = info->mti_env;
1295         struct mdt_body *repbody;
1296         struct lu_buf *buffer = &info->mti_buf;
1297         struct obd_export *exp = info->mti_exp;
1298         int rc;
1299
1300         ENTRY;
1301
1302         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_GETATTR_PACK))
1303                 RETURN(err_serious(-ENOMEM));
1304
1305         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1306
1307         ma->ma_valid = 0;
1308
1309         if (mdt_object_remote(o)) {
1310                 /* This object is located on remote node.*/
1311                 /* Return -ENOTSUPP for old client */
1312                 if (!mdt_is_dne_client(req->rq_export))
1313                         GOTO(out, rc = -ENOTSUPP);
1314
1315                 repbody->mbo_fid1 = *mdt_object_fid(o);
1316                 repbody->mbo_valid = OBD_MD_FLID | OBD_MD_MDS;
1317                 GOTO(out, rc = 0);
1318         }
1319
1320         if (reqbody->mbo_eadatasize > 0) {
1321                 buffer->lb_buf = req_capsule_server_get(pill, &RMF_MDT_MD);
1322                 if (buffer->lb_buf == NULL)
1323                         GOTO(out, rc = -EPROTO);
1324                 buffer->lb_len = req_capsule_get_size(pill, &RMF_MDT_MD,
1325                                                       RCL_SERVER);
1326         } else {
1327                 buffer->lb_buf = NULL;
1328                 buffer->lb_len = 0;
1329                 ma_need &= ~(MA_LOV | MA_LMV);
1330                 CDEBUG(D_INFO, "%s: RPC from %s: does not need LOVEA.\n",
1331                        mdt_obd_name(info->mti_mdt),
1332                        req->rq_export->exp_client_uuid.uuid);
1333         }
1334
1335         /* from 2.12.58 intent_getattr pack default LMV in reply */
1336         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1337             ((reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) ==
1338                     (OBD_MD_MEA | OBD_MD_DEFAULT_MEA)) &&
1339             req_capsule_has_field(&req->rq_pill, &RMF_DEFAULT_MDT_MD,
1340                                   RCL_SERVER)) {
1341                 ma->ma_lmv = buffer->lb_buf;
1342                 ma->ma_lmv_size = buffer->lb_len;
1343                 ma->ma_default_lmv = req_capsule_server_get(pill,
1344                                                 &RMF_DEFAULT_MDT_MD);
1345                 ma->ma_default_lmv_size = req_capsule_get_size(pill,
1346                                                 &RMF_DEFAULT_MDT_MD,
1347                                                 RCL_SERVER);
1348                 ma->ma_need = MA_INODE;
1349                 if (ma->ma_lmv_size > 0)
1350                         ma->ma_need |= MA_LMV;
1351                 if (ma->ma_default_lmv_size > 0)
1352                         ma->ma_need |= MA_LMV_DEF;
1353         } else if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1354                    (reqbody->mbo_valid & (OBD_MD_MEA | OBD_MD_DEFAULT_MEA))) {
1355                 /* If it is dir and client require MEA, then we got MEA */
1356                 /* Assumption: MDT_MD size is enough for lmv size. */
1357                 ma->ma_lmv = buffer->lb_buf;
1358                 ma->ma_lmv_size = buffer->lb_len;
1359                 ma->ma_need = MA_INODE;
1360                 if (ma->ma_lmv_size > 0) {
1361                         if (reqbody->mbo_valid & OBD_MD_MEA) {
1362                                 ma->ma_need |= MA_LMV;
1363                         } else if (reqbody->mbo_valid & OBD_MD_DEFAULT_MEA) {
1364                                 ma->ma_need |= MA_LMV_DEF;
1365                                 ma->ma_default_lmv = buffer->lb_buf;
1366                                 ma->ma_lmv = NULL;
1367                                 ma->ma_default_lmv_size = buffer->lb_len;
1368                                 ma->ma_lmv_size = 0;
1369                         }
1370                 }
1371         } else {
1372                 ma->ma_lmm = buffer->lb_buf;
1373                 ma->ma_lmm_size = buffer->lb_len;
1374                 ma->ma_need = MA_INODE | MA_HSM;
1375                 if (ma->ma_lmm_size > 0) {
1376                         ma->ma_need |= MA_LOV;
1377                         /* Older clients may crash if they getattr overstriped
1378                          * files
1379                          */
1380                         if (!exp_connect_overstriping(exp) &&
1381                             mdt_lmm_is_overstriping(ma->ma_lmm))
1382                                 RETURN(-EOPNOTSUPP);
1383                 }
1384         }
1385
1386         if (S_ISDIR(lu_object_attr(&next->mo_lu)) &&
1387             reqbody->mbo_valid & OBD_MD_FLDIREA  &&
1388             lustre_msg_get_opc(req->rq_reqmsg) == MDS_GETATTR) {
1389                 /* get default stripe info for this dir. */
1390                 ma->ma_need |= MA_LOV_DEF;
1391         }
1392         ma->ma_need |= ma_need;
1393
1394         rc = mdt_attr_get_complex(info, o, ma);
1395         if (unlikely(rc)) {
1396                 CDEBUG(rc == -ENOENT ? D_OTHER : D_ERROR,
1397                        "%s: getattr error for "DFID": rc = %d\n",
1398                        mdt_obd_name(info->mti_mdt),
1399                        PFID(mdt_object_fid(o)), rc);
1400                 RETURN(rc);
1401         }
1402
1403         /* if file is released, check if a restore is running */
1404         if (ma->ma_valid & MA_HSM) {
1405                 repbody->mbo_valid |= OBD_MD_TSTATE;
1406                 if ((ma->ma_hsm.mh_flags & HS_RELEASED) &&
1407                     mdt_hsm_restore_is_running(info, mdt_object_fid(o)))
1408                         repbody->mbo_t_state = MS_RESTORE;
1409         }
1410
1411         if (unlikely(!(ma->ma_valid & MA_INODE)))
1412                 RETURN(-EFAULT);
1413
1414         mdt_pack_attr2body(info, repbody, la, mdt_object_fid(o));
1415
1416         if (mdt_body_has_lov(la, reqbody)) {
1417                 u32 stripe_count = 1;
1418
1419                 if (ma->ma_valid & MA_LOV) {
1420                         LASSERT(ma->ma_lmm_size);
1421                         repbody->mbo_eadatasize = ma->ma_lmm_size;
1422                         if (S_ISDIR(la->la_mode))
1423                                 repbody->mbo_valid |= OBD_MD_FLDIREA;
1424                         else
1425                                 repbody->mbo_valid |= OBD_MD_FLEASIZE;
1426                         mdt_dump_lmm(D_INFO, ma->ma_lmm, repbody->mbo_valid);
1427                 }
1428                 if (ma->ma_valid & MA_LMV) {
1429                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1430                         u32 magic = le32_to_cpu(lmv->lmv_magic);
1431
1432                         /* Return -ENOTSUPP for old client */
1433                         if (!mdt_is_striped_client(req->rq_export))
1434                                 RETURN(-ENOTSUPP);
1435
1436                         LASSERT(S_ISDIR(la->la_mode));
1437                         mdt_dump_lmv(D_INFO, ma->ma_lmv);
1438                         repbody->mbo_eadatasize = ma->ma_lmv_size;
1439                         repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
1440
1441                         stripe_count = le32_to_cpu(lmv->lmv_stripe_count);
1442                         if (magic == LMV_MAGIC_STRIPE && lmv_is_restriping(lmv))
1443                                 mdt_restripe_migrate_add(info, o);
1444                         else if (magic == LMV_MAGIC_V1 &&
1445                                  lmv_is_restriping(lmv))
1446                                 mdt_restripe_update_add(info, o);
1447                 }
1448                 if (ma->ma_valid & MA_LMV_DEF) {
1449                         /* Return -ENOTSUPP for old client */
1450                         if (!mdt_is_striped_client(req->rq_export))
1451                                 RETURN(-ENOTSUPP);
1452                         LASSERT(S_ISDIR(la->la_mode));
1453                         /*
1454                          * when ll_dir_getstripe() gets default LMV, it
1455                          * checks mbo_eadatasize.
1456                          */
1457                         if (!(ma->ma_valid & MA_LMV))
1458                                 repbody->mbo_eadatasize =
1459                                         ma->ma_default_lmv_size;
1460                         repbody->mbo_valid |= (OBD_MD_FLDIREA |
1461                                                OBD_MD_DEFAULT_MEA);
1462                 }
1463                 CDEBUG(D_VFSTRACE,
1464                        "dirent count %llu stripe count %u MDT count %d\n",
1465                        ma->ma_attr.la_dirent_count, stripe_count,
1466                        atomic_read(&mdt->mdt_mds_mds_conns) + 1);
1467                 if (ma->ma_attr.la_dirent_count != LU_DIRENT_COUNT_UNSET &&
1468                     ma->ma_attr.la_dirent_count >
1469                         mdt->mdt_restriper.mdr_dir_split_count &&
1470                     !fid_is_root(mdt_object_fid(o)) &&
1471                     mdt->mdt_enable_dir_auto_split &&
1472                     !o->mot_restriping &&
1473                     stripe_count < atomic_read(&mdt->mdt_mds_mds_conns) + 1)
1474                         mdt_auto_split_add(info, o);
1475         } else if (S_ISLNK(la->la_mode) &&
1476                    reqbody->mbo_valid & OBD_MD_LINKNAME) {
1477                 buffer->lb_buf = ma->ma_lmm;
1478                 /* eadatasize from client includes NULL-terminator, so
1479                  * there is no need to read it */
1480                 buffer->lb_len = reqbody->mbo_eadatasize - 1;
1481                 rc = mo_readlink(env, next, buffer);
1482                 if (unlikely(rc <= 0)) {
1483                         CERROR("%s: readlink failed for "DFID": rc = %d\n",
1484                                mdt_obd_name(info->mti_mdt),
1485                                PFID(mdt_object_fid(o)), rc);
1486                         rc = -EFAULT;
1487                 } else {
1488                         int print_limit = min_t(int, PAGE_SIZE - 128, rc);
1489
1490                         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READLINK_EPROTO))
1491                                 rc -= 2;
1492                         repbody->mbo_valid |= OBD_MD_LINKNAME;
1493                         /* we need to report back size with NULL-terminator
1494                          * because client expects that */
1495                         repbody->mbo_eadatasize = rc + 1;
1496                         if (repbody->mbo_eadatasize != reqbody->mbo_eadatasize)
1497                                 CDEBUG(D_INODE, "%s: Read shorter symlink %d "
1498                                        "on "DFID ", expected %d\n",
1499                                        mdt_obd_name(info->mti_mdt),
1500                                        rc, PFID(mdt_object_fid(o)),
1501                                        reqbody->mbo_eadatasize - 1);
1502                         /* NULL terminate */
1503                         ((char *)ma->ma_lmm)[rc] = 0;
1504
1505                         /* If the total CDEBUG() size is larger than a page, it
1506                          * will print a warning to the console, avoid this by
1507                          * printing just the last part of the symlink. */
1508                         CDEBUG(D_INODE, "symlink dest %s%.*s, len = %d\n",
1509                                print_limit < rc ? "..." : "", print_limit,
1510                                (char *)ma->ma_lmm + rc - print_limit, rc);
1511                         rc = 0;
1512                 }
1513         }
1514
1515         if (reqbody->mbo_valid & OBD_MD_FLMODEASIZE) {
1516                 repbody->mbo_max_mdsize = info->mti_mdt->mdt_max_mdsize;
1517                 repbody->mbo_valid |= OBD_MD_FLMODEASIZE;
1518                 CDEBUG(D_INODE, "changing the max MD size to %u\n",
1519                        repbody->mbo_max_mdsize);
1520         }
1521
1522 #ifdef CONFIG_LUSTRE_FS_POSIX_ACL
1523         if ((exp_connect_flags(req->rq_export) & OBD_CONNECT_ACL) &&
1524                  (reqbody->mbo_valid & OBD_MD_FLACL)) {
1525                 struct lu_nodemap *nodemap = nodemap_get_from_exp(exp);
1526                 if (IS_ERR(nodemap))
1527                         RETURN(PTR_ERR(nodemap));
1528
1529                 rc = mdt_pack_acl2body(info, repbody, o, nodemap);
1530                 nodemap_putref(nodemap);
1531         }
1532 #endif
1533
1534 out:
1535         if (rc == 0)
1536                 mdt_counter_incr(req, LPROC_MDT_GETATTR);
1537
1538         RETURN(rc);
1539 }
1540
1541 static int mdt_getattr(struct tgt_session_info *tsi)
1542 {
1543         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
1544         struct mdt_object       *obj = info->mti_object;
1545         struct req_capsule      *pill = info->mti_pill;
1546         struct mdt_body         *reqbody;
1547         struct mdt_body         *repbody;
1548         int rc, rc2;
1549         ENTRY;
1550
1551         if (unlikely(info->mti_object == NULL))
1552                 RETURN(-EPROTO);
1553
1554         reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1555         LASSERT(reqbody);
1556         LASSERT(lu_object_assert_exists(&obj->mot_obj));
1557
1558         /* Special case for Data-on-MDT files to get data version */
1559         if (unlikely(reqbody->mbo_valid & OBD_MD_FLDATAVERSION)) {
1560                 rc = mdt_data_version_get(tsi);
1561                 GOTO(out, rc);
1562         }
1563
1564         /* Unlike intent case where we need to pre-fill out buffers early on
1565          * in intent policy for ldlm reasons, here we can have a much better
1566          * guess at EA size by just reading it from disk.
1567          * Exceptions are readdir and (missing) directory striping */
1568         /* Readlink */
1569         if (reqbody->mbo_valid & OBD_MD_LINKNAME) {
1570                 /* No easy way to know how long is the symlink, but it cannot
1571                  * be more than PATH_MAX, so we allocate +1 */
1572                 rc = PATH_MAX + 1;
1573         /* A special case for fs ROOT: getattr there might fetch
1574          * default EA for entire fs, not just for this dir!
1575          */
1576         } else if (lu_fid_eq(mdt_object_fid(obj),
1577                              &info->mti_mdt->mdt_md_root_fid) &&
1578                    (reqbody->mbo_valid & OBD_MD_FLDIREA) &&
1579                    (lustre_msg_get_opc(mdt_info_req(info)->rq_reqmsg) ==
1580                                                                  MDS_GETATTR)) {
1581                 /* Should the default strping be bigger, mdt_fix_reply
1582                  * will reallocate */
1583                 rc = DEF_REP_MD_SIZE;
1584         } else {
1585                 /* Read the actual EA size from disk */
1586                 rc = mdt_attr_get_eabuf_size(info, obj);
1587         }
1588
1589         if (rc < 0)
1590                 GOTO(out, rc = err_serious(rc));
1591
1592         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, rc);
1593
1594         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
1595          * by default. If the target object has more ACL entries, then
1596          * enlarge the buffer when necessary. */
1597         req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
1598                              LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
1599
1600         rc = req_capsule_server_pack(pill);
1601         if (unlikely(rc != 0))
1602                 GOTO(out, rc = err_serious(rc));
1603
1604         repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
1605         LASSERT(repbody != NULL);
1606         repbody->mbo_eadatasize = 0;
1607         repbody->mbo_aclsize = 0;
1608
1609         rc = mdt_check_ucred(info);
1610         if (unlikely(rc))
1611                 GOTO(out_shrink, rc);
1612
1613         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
1614
1615         rc = mdt_getattr_internal(info, obj, 0);
1616         EXIT;
1617 out_shrink:
1618         mdt_client_compatibility(info);
1619         rc2 = mdt_fix_reply(info);
1620         if (rc == 0)
1621                 rc = rc2;
1622 out:
1623         mdt_thread_info_fini(info);
1624         return rc;
1625 }
1626
1627 /**
1628  * Handler of layout intent RPC requiring the layout modification
1629  *
1630  * \param[in]  info     thread environment
1631  * \param[in]  obj      object
1632  * \param[out] lhc      object ldlm lock handle
1633  * \param[in]  layout   layout change descriptor
1634  *
1635  * \retval 0    on success
1636  * \retval < 0  error code
1637  */
1638 int mdt_layout_change(struct mdt_thread_info *info, struct mdt_object *obj,
1639                       struct mdt_lock_handle *lhc,
1640                       struct md_layout_change *layout)
1641 {
1642         int rc;
1643
1644         ENTRY;
1645
1646         if (!mdt_object_exists(obj))
1647                 RETURN(-ENOENT);
1648
1649         if (!S_ISREG(lu_object_attr(&obj->mot_obj)))
1650                 RETURN(-EINVAL);
1651
1652         rc = mo_permission(info->mti_env, NULL, mdt_object_child(obj), NULL,
1653                            MAY_WRITE);
1654         if (rc)
1655                 RETURN(rc);
1656
1657         rc = mdt_check_resent_lock(info, obj, lhc);
1658         if (rc < 0)
1659                 RETURN(rc);
1660
1661         if (rc > 0) {
1662                 /* not resent */
1663                 __u64 lockpart = MDS_INODELOCK_LAYOUT;
1664
1665                 /* take layout lock to prepare layout change */
1666                 if (layout->mlc_opc == MD_LAYOUT_WRITE)
1667                         lockpart |= MDS_INODELOCK_UPDATE;
1668
1669                 mdt_lock_handle_init(lhc);
1670                 mdt_lock_reg_init(lhc, LCK_EX);
1671                 rc = mdt_reint_object_lock(info, obj, lhc, lockpart, false);
1672                 if (rc)
1673                         RETURN(rc);
1674         }
1675
1676         mutex_lock(&obj->mot_som_mutex);
1677         rc = mo_layout_change(info->mti_env, mdt_object_child(obj), layout);
1678         mutex_unlock(&obj->mot_som_mutex);
1679
1680         if (rc)
1681                 mdt_object_unlock(info, obj, lhc, 1);
1682
1683         RETURN(rc);
1684 }
1685
1686 /**
1687  * Exchange MOF_LOV_CREATED flags between two objects after a
1688  * layout swap. No assumption is made on whether o1 or o2 have
1689  * created objects or not.
1690  *
1691  * \param[in,out] o1    First swap layout object
1692  * \param[in,out] o2    Second swap layout object
1693  */
1694 static void mdt_swap_lov_flag(struct mdt_object *o1, struct mdt_object *o2)
1695 {
1696         unsigned int o1_lov_created = o1->mot_lov_created;
1697
1698         mutex_lock(&o1->mot_lov_mutex);
1699         mutex_lock(&o2->mot_lov_mutex);
1700
1701         o1->mot_lov_created = o2->mot_lov_created;
1702         o2->mot_lov_created = o1_lov_created;
1703
1704         mutex_unlock(&o2->mot_lov_mutex);
1705         mutex_unlock(&o1->mot_lov_mutex);
1706 }
1707
1708 static int mdt_swap_layouts(struct tgt_session_info *tsi)
1709 {
1710         struct mdt_thread_info  *info;
1711         struct ptlrpc_request   *req = tgt_ses_req(tsi);
1712         struct obd_export       *exp = req->rq_export;
1713         struct mdt_object       *o1, *o2, *o;
1714         struct mdt_lock_handle  *lh1, *lh2;
1715         struct mdc_swap_layouts *msl;
1716         int                      rc;
1717         ENTRY;
1718
1719         /* client does not support layout lock, so layout swaping
1720          * is disabled.
1721          * FIXME: there is a problem for old clients which don't support
1722          * layout lock yet. If those clients have already opened the file
1723          * they won't be notified at all so that old layout may still be
1724          * used to do IO. This can be fixed after file release is landed by
1725          * doing exclusive open and taking full EX ibits lock. - Jinshan */
1726         if (!exp_connect_layout(exp))
1727                 RETURN(-EOPNOTSUPP);
1728
1729         info = tsi2mdt_info(tsi);
1730         if (unlikely(info->mti_object == NULL))
1731                 RETURN(-EPROTO);
1732
1733         if (info->mti_dlm_req != NULL)
1734                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1735
1736         o1 = info->mti_object;
1737         o = o2 = mdt_object_find(info->mti_env, info->mti_mdt,
1738                                 &info->mti_body->mbo_fid2);
1739         if (IS_ERR(o))
1740                 GOTO(out, rc = PTR_ERR(o));
1741
1742         if (mdt_object_remote(o) || !mdt_object_exists(o)) /* remote object */
1743                 GOTO(put, rc = -ENOENT);
1744
1745         rc = lu_fid_cmp(&info->mti_body->mbo_fid1, &info->mti_body->mbo_fid2);
1746         if (unlikely(rc == 0)) /* same file, you kidding me? no-op. */
1747                 GOTO(put, rc);
1748
1749         if (rc < 0)
1750                 swap(o1, o2);
1751
1752         /* permission check. Make sure the calling process having permission
1753          * to write both files. */
1754         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o1), NULL,
1755                            MAY_WRITE);
1756         if (rc < 0)
1757                 GOTO(put, rc);
1758
1759         rc = mo_permission(info->mti_env, NULL, mdt_object_child(o2), NULL,
1760                            MAY_WRITE);
1761         if (rc < 0)
1762                 GOTO(put, rc);
1763
1764         msl = req_capsule_client_get(info->mti_pill, &RMF_SWAP_LAYOUTS);
1765         if (msl == NULL)
1766                 GOTO(put, rc = -EPROTO);
1767
1768         lh1 = &info->mti_lh[MDT_LH_NEW];
1769         mdt_lock_reg_init(lh1, LCK_EX);
1770         lh2 = &info->mti_lh[MDT_LH_OLD];
1771         mdt_lock_reg_init(lh2, LCK_EX);
1772
1773         rc = mdt_object_lock(info, o1, lh1, MDS_INODELOCK_LAYOUT |
1774                              MDS_INODELOCK_XATTR);
1775         if (rc < 0)
1776                 GOTO(put, rc);
1777
1778         rc = mdt_object_lock(info, o2, lh2, MDS_INODELOCK_LAYOUT |
1779                              MDS_INODELOCK_XATTR);
1780         if (rc < 0)
1781                 GOTO(unlock1, rc);
1782
1783         rc = mo_swap_layouts(info->mti_env, mdt_object_child(o1),
1784                              mdt_object_child(o2), msl->msl_flags);
1785         if (rc < 0)
1786                 GOTO(unlock2, rc);
1787
1788         mdt_swap_lov_flag(o1, o2);
1789
1790 unlock2:
1791         mdt_object_unlock(info, o2, lh2, rc);
1792 unlock1:
1793         mdt_object_unlock(info, o1, lh1, rc);
1794 put:
1795         mdt_object_put(info->mti_env, o);
1796 out:
1797         mdt_thread_info_fini(info);
1798         RETURN(rc);
1799 }
1800
1801 static int mdt_raw_lookup(struct mdt_thread_info *info,
1802                           struct mdt_object *parent,
1803                           const struct lu_name *lname,
1804                           struct ldlm_reply *ldlm_rep)
1805 {
1806         struct lu_fid   *child_fid = &info->mti_tmp_fid1;
1807         int              rc;
1808         ENTRY;
1809
1810         LASSERT(!info->mti_cross_ref);
1811
1812         /* Only got the fid of this obj by name */
1813         fid_zero(child_fid);
1814         rc = mdo_lookup(info->mti_env, mdt_object_child(info->mti_object),
1815                         lname, child_fid, &info->mti_spec);
1816         if (rc == 0) {
1817                 struct mdt_body *repbody;
1818
1819                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1820                 repbody->mbo_fid1 = *child_fid;
1821                 repbody->mbo_valid = OBD_MD_FLID;
1822                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
1823         } else if (rc == -ENOENT) {
1824                 mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
1825         }
1826
1827         RETURN(rc);
1828 }
1829
1830 /*
1831  * UPDATE lock should be taken against parent, and be released before exit;
1832  * child_bits lock should be taken against child, and be returned back:
1833  *            (1)normal request should release the child lock;
1834  *            (2)intent request will grant the lock to client.
1835  */
1836 static int mdt_getattr_name_lock(struct mdt_thread_info *info,
1837                                  struct mdt_lock_handle *lhc,
1838                                  __u64 child_bits,
1839                                  struct ldlm_reply *ldlm_rep)
1840 {
1841         struct ptlrpc_request *req = mdt_info_req(info);
1842         struct mdt_body *reqbody = NULL;
1843         struct mdt_object *parent = info->mti_object;
1844         struct mdt_object *child = NULL;
1845         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1846         struct lu_name *lname = NULL;
1847         struct mdt_lock_handle *lhp = NULL;
1848         struct ldlm_lock *lock;
1849         struct req_capsule *pill = info->mti_pill;
1850         __u64 try_bits = 0;
1851         bool is_resent;
1852         int ma_need = 0;
1853         int rc;
1854
1855         ENTRY;
1856
1857         is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh);
1858         LASSERT(ergo(is_resent,
1859                      lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT));
1860
1861         if (parent == NULL)
1862                 RETURN(-ENOENT);
1863
1864         if (info->mti_cross_ref) {
1865                 /* Only getattr on the child. Parent is on another node. */
1866                 mdt_set_disposition(info, ldlm_rep,
1867                                     DISP_LOOKUP_EXECD | DISP_LOOKUP_POS);
1868                 child = parent;
1869                 CDEBUG(D_INODE, "partial getattr_name child_fid = "DFID", "
1870                        "ldlm_rep = %p\n",
1871                        PFID(mdt_object_fid(child)), ldlm_rep);
1872
1873                 rc = mdt_check_resent_lock(info, child, lhc);
1874                 if (rc < 0) {
1875                         RETURN(rc);
1876                 } else if (rc > 0) {
1877                         mdt_lock_handle_init(lhc);
1878                         mdt_lock_reg_init(lhc, LCK_PR);
1879
1880                         /*
1881                          * Object's name entry is on another MDS, it will
1882                          * request PERM lock only because LOOKUP lock is owned
1883                          * by the MDS where name entry resides.
1884                          *
1885                          * TODO: it should try layout lock too. - Jinshan
1886                          */
1887                         child_bits &= ~(MDS_INODELOCK_LOOKUP |
1888                                         MDS_INODELOCK_LAYOUT);
1889                         child_bits |= MDS_INODELOCK_PERM;
1890
1891                         rc = mdt_object_lock(info, child, lhc, child_bits);
1892                         if (rc < 0)
1893                                 RETURN(rc);
1894                 }
1895
1896                 /* Finally, we can get attr for child. */
1897                 if (!mdt_object_exists(child)) {
1898                         LU_OBJECT_DEBUG(D_INFO, info->mti_env,
1899                                         &child->mot_obj,
1900                                         "remote object doesn't exist.");
1901                         mdt_object_unlock(info, child, lhc, 1);
1902                         RETURN(-ENOENT);
1903                 }
1904
1905                 rc = mdt_getattr_internal(info, child, 0);
1906                 if (unlikely(rc != 0)) {
1907                         mdt_object_unlock(info, child, lhc, 1);
1908                         RETURN(rc);
1909                 }
1910
1911                 rc = mdt_pack_secctx_in_reply(info, child);
1912                 if (unlikely(rc))
1913                         mdt_object_unlock(info, child, lhc, 1);
1914                 RETURN(rc);
1915         }
1916
1917         lname = &info->mti_name;
1918         mdt_name_unpack(pill, &RMF_NAME, lname, MNF_FIX_ANON);
1919
1920         if (lu_name_is_valid(lname)) {
1921                 if (mdt_object_remote(parent)) {
1922                         CERROR("%s: parent "DFID" is on remote target\n",
1923                                mdt_obd_name(info->mti_mdt),
1924                                PFID(mdt_object_fid(parent)));
1925                         RETURN(-EPROTO);
1926                 }
1927
1928                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DNAME", "
1929                        "ldlm_rep = %p\n", PFID(mdt_object_fid(parent)),
1930                        PNAME(lname), ldlm_rep);
1931         } else {
1932                 reqbody = req_capsule_client_get(pill, &RMF_MDT_BODY);
1933                 if (unlikely(reqbody == NULL))
1934                         RETURN(err_serious(-EPROTO));
1935
1936                 *child_fid = reqbody->mbo_fid2;
1937                 if (unlikely(!fid_is_sane(child_fid)))
1938                         RETURN(err_serious(-EINVAL));
1939
1940                 if (lu_fid_eq(mdt_object_fid(parent), child_fid)) {
1941                         mdt_object_get(info->mti_env, parent);
1942                         child = parent;
1943                 } else {
1944                         child = mdt_object_find(info->mti_env, info->mti_mdt,
1945                                                 child_fid);
1946                         if (IS_ERR(child))
1947                                 RETURN(PTR_ERR(child));
1948                 }
1949
1950                 if (mdt_object_remote(child)) {
1951                         CERROR("%s: child "DFID" is on remote target\n",
1952                                mdt_obd_name(info->mti_mdt),
1953                                PFID(mdt_object_fid(child)));
1954                         GOTO(out_child, rc = -EPROTO);
1955                 }
1956
1957                 /* don't fetch LOOKUP lock if it's remote object */
1958                 rc = mdt_is_remote_object(info, parent, child);
1959                 if (rc < 0)
1960                         GOTO(out_child, rc);
1961                 if (rc)
1962                         child_bits &= ~MDS_INODELOCK_LOOKUP;
1963
1964                 CDEBUG(D_INODE, "getattr with lock for "DFID"/"DFID", "
1965                        "ldlm_rep = %p\n",
1966                        PFID(mdt_object_fid(parent)),
1967                        PFID(&reqbody->mbo_fid2), ldlm_rep);
1968         }
1969
1970         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_EXECD);
1971
1972         if (unlikely(!mdt_object_exists(parent)) && lu_name_is_valid(lname)) {
1973                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
1974                                 &parent->mot_obj,
1975                                 "Parent doesn't exist!");
1976                 GOTO(out_child, rc = -ESTALE);
1977         }
1978
1979         if (lu_name_is_valid(lname)) {
1980                 /* Always allow to lookup ".." */
1981                 if (unlikely(lname->ln_namelen == 2 &&
1982                              lname->ln_name[0] == '.' &&
1983                              lname->ln_name[1] == '.'))
1984                         info->mti_spec.sp_permitted = 1;
1985
1986                 if (info->mti_body->mbo_valid == OBD_MD_FLID) {
1987                         rc = mdt_raw_lookup(info, parent, lname, ldlm_rep);
1988
1989                         RETURN(rc);
1990                 }
1991
1992                 /* step 1: lock parent only if parent is a directory */
1993                 if (S_ISDIR(lu_object_attr(&parent->mot_obj))) {
1994                         lhp = &info->mti_lh[MDT_LH_PARENT];
1995                         mdt_lock_pdo_init(lhp, LCK_PR, lname);
1996                         rc = mdt_object_lock(info, parent, lhp,
1997                                              MDS_INODELOCK_UPDATE);
1998                         if (unlikely(rc != 0))
1999                                 RETURN(rc);
2000                 }
2001
2002                 /* step 2: lookup child's fid by name */
2003                 fid_zero(child_fid);
2004                 rc = mdo_lookup(info->mti_env, mdt_object_child(parent), lname,
2005                                 child_fid, &info->mti_spec);
2006                 if (rc == -ENOENT)
2007                         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_NEG);
2008
2009                 if (rc != 0)
2010                         GOTO(unlock_parent, rc);
2011
2012                 child = mdt_object_find(info->mti_env, info->mti_mdt,
2013                                         child_fid);
2014                 if (unlikely(IS_ERR(child)))
2015                         GOTO(unlock_parent, rc = PTR_ERR(child));
2016         }
2017
2018         mdt_set_disposition(info, ldlm_rep, DISP_LOOKUP_POS);
2019
2020         /* step 3: lock child regardless if it is local or remote. */
2021         LASSERT(child);
2022
2023         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RESEND, obd_timeout * 2);
2024         if (!mdt_object_exists(child)) {
2025                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2026                                 &child->mot_obj,
2027                                 "Object doesn't exist!");
2028                 GOTO(out_child, rc = -ENOENT);
2029         }
2030
2031         rc = mdt_check_resent_lock(info, child, lhc);
2032         if (rc < 0) {
2033                 GOTO(out_child, rc);
2034         } else if (rc > 0) {
2035                 mdt_lock_handle_init(lhc);
2036                 mdt_lock_reg_init(lhc, LCK_PR);
2037
2038                 if (!(child_bits & MDS_INODELOCK_UPDATE) &&
2039                     !mdt_object_remote(child)) {
2040                         struct md_attr *ma = &info->mti_attr;
2041
2042                         ma->ma_valid = 0;
2043                         ma->ma_need = MA_INODE;
2044                         rc = mdt_attr_get_complex(info, child, ma);
2045                         if (unlikely(rc != 0))
2046                                 GOTO(out_child, rc);
2047
2048                         /* If the file has not been changed for some time, we
2049                          * return not only a LOOKUP lock, but also an UPDATE
2050                          * lock and this might save us RPC on later STAT. For
2051                          * directories, it also let negative dentry cache start
2052                          * working for this dir. */
2053                         if (ma->ma_valid & MA_INODE &&
2054                             ma->ma_attr.la_valid & LA_CTIME &&
2055                             info->mti_mdt->mdt_namespace->ns_ctime_age_limit +
2056                             ma->ma_attr.la_ctime < ktime_get_real_seconds())
2057                                 child_bits |= MDS_INODELOCK_UPDATE;
2058                 }
2059
2060                 /* layout lock must be granted in a best-effort way
2061                  * for IT operations */
2062                 LASSERT(!(child_bits & MDS_INODELOCK_LAYOUT));
2063                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2064                     !mdt_object_remote(child) && ldlm_rep != NULL) {
2065                         if (!OBD_FAIL_CHECK(OBD_FAIL_MDS_NO_LL_GETATTR) &&
2066                             exp_connect_layout(info->mti_exp)) {
2067                                 /* try to grant layout lock for regular file. */
2068                                 try_bits = MDS_INODELOCK_LAYOUT;
2069                         }
2070                         /* Acquire DOM lock in advance for data-on-mdt file */
2071                         if (child != parent)
2072                                 try_bits |= MDS_INODELOCK_DOM;
2073                 }
2074
2075                 if (try_bits != 0) {
2076                         /* try layout lock, it may fail to be granted due to
2077                          * contention at LOOKUP or UPDATE */
2078                         rc = mdt_object_lock_try(info, child, lhc, &child_bits,
2079                                                  try_bits, false);
2080                         if (child_bits & MDS_INODELOCK_LAYOUT)
2081                                 ma_need |= MA_LOV;
2082                 } else {
2083                         /* Do not enqueue the UPDATE lock from MDT(cross-MDT),
2084                          * client will enqueue the lock to the remote MDT */
2085                         if (mdt_object_remote(child))
2086                                 child_bits &= ~MDS_INODELOCK_UPDATE;
2087                         rc = mdt_object_lock(info, child, lhc, child_bits);
2088                 }
2089                 if (unlikely(rc != 0))
2090                         GOTO(out_child, rc);
2091         }
2092
2093         /* finally, we can get attr for child. */
2094         rc = mdt_getattr_internal(info, child, ma_need);
2095         if (unlikely(rc != 0)) {
2096                 mdt_object_unlock(info, child, lhc, 1);
2097                 GOTO(out_child, rc);
2098         }
2099
2100         rc = mdt_pack_secctx_in_reply(info, child);
2101         if (unlikely(rc)) {
2102                 mdt_object_unlock(info, child, lhc, 1);
2103                 GOTO(out_child, rc);
2104         }
2105
2106         lock = ldlm_handle2lock(&lhc->mlh_reg_lh);
2107         if (lock) {
2108                 /* Debugging code. */
2109                 LDLM_DEBUG(lock, "Returning lock to client");
2110                 LASSERTF(fid_res_name_eq(mdt_object_fid(child),
2111                                          &lock->l_resource->lr_name),
2112                          "Lock res_id: "DLDLMRES", fid: "DFID"\n",
2113                          PLDLMRES(lock->l_resource),
2114                          PFID(mdt_object_fid(child)));
2115
2116                 if (S_ISREG(lu_object_attr(&child->mot_obj)) &&
2117                     !mdt_object_remote(child) && child != parent) {
2118                         mdt_object_put(info->mti_env, child);
2119                         rc = mdt_pack_size2body(info, child_fid,
2120                                                 &lhc->mlh_reg_lh);
2121                         if (rc != 0 && child_bits & MDS_INODELOCK_DOM) {
2122                                 /* DOM lock was taken in advance but this is
2123                                  * not DoM file. Drop the lock.
2124                                  */
2125                                 lock_res_and_lock(lock);
2126                                 ldlm_inodebits_drop(lock, MDS_INODELOCK_DOM);
2127                                 unlock_res_and_lock(lock);
2128                         }
2129                         LDLM_LOCK_PUT(lock);
2130                         GOTO(unlock_parent, rc = 0);
2131                 }
2132                 LDLM_LOCK_PUT(lock);
2133         }
2134
2135         EXIT;
2136 out_child:
2137         if (child)
2138                 mdt_object_put(info->mti_env, child);
2139 unlock_parent:
2140         if (lhp)
2141                 mdt_object_unlock(info, parent, lhp, 1);
2142         return rc;
2143 }
2144
2145 /* normal handler: should release the child lock */
2146 static int mdt_getattr_name(struct tgt_session_info *tsi)
2147 {
2148         struct mdt_thread_info  *info = tsi2mdt_info(tsi);
2149         struct mdt_lock_handle *lhc = &info->mti_lh[MDT_LH_CHILD];
2150         struct mdt_body        *reqbody;
2151         struct mdt_body        *repbody;
2152         int rc, rc2;
2153         ENTRY;
2154
2155         reqbody = req_capsule_client_get(info->mti_pill, &RMF_MDT_BODY);
2156         LASSERT(reqbody != NULL);
2157         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2158         LASSERT(repbody != NULL);
2159
2160         info->mti_cross_ref = !!(reqbody->mbo_valid & OBD_MD_FLCROSSREF);
2161         repbody->mbo_eadatasize = 0;
2162         repbody->mbo_aclsize = 0;
2163
2164         rc = mdt_init_ucred_intent_getattr(info, reqbody);
2165         if (unlikely(rc))
2166                 GOTO(out_shrink, rc);
2167
2168         rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL);
2169         if (lustre_handle_is_used(&lhc->mlh_reg_lh)) {
2170                 ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode);
2171                 lhc->mlh_reg_lh.cookie = 0;
2172         }
2173         mdt_exit_ucred(info);
2174         EXIT;
2175 out_shrink:
2176         mdt_client_compatibility(info);
2177         rc2 = mdt_fix_reply(info);
2178         if (rc == 0)
2179                 rc = rc2;
2180         mdt_thread_info_fini(info);
2181         return rc;
2182 }
2183
2184 static int mdt_rmfid_unlink(struct mdt_thread_info *info,
2185                             const struct lu_fid *pfid,
2186                             const struct lu_name *name,
2187                             struct mdt_object *obj, s64 ctime)
2188 {
2189         struct lu_fid *child_fid = &info->mti_tmp_fid1;
2190         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
2191         struct mdt_device *mdt = info->mti_mdt;
2192         struct md_attr *ma = &info->mti_attr;
2193         struct mdt_lock_handle *parent_lh;
2194         struct mdt_lock_handle *child_lh;
2195         struct mdt_object *pobj;
2196         bool cos_incompat = false;
2197         int rc;
2198         ENTRY;
2199
2200         pobj = mdt_object_find(info->mti_env, mdt, pfid);
2201         if (IS_ERR(pobj))
2202                 GOTO(out, rc = PTR_ERR(pobj));
2203
2204         parent_lh = &info->mti_lh[MDT_LH_PARENT];
2205         mdt_lock_pdo_init(parent_lh, LCK_PW, name);
2206         rc = mdt_object_lock(info, pobj, parent_lh, MDS_INODELOCK_UPDATE);
2207         if (rc != 0)
2208                 GOTO(put_parent, rc);
2209
2210         if (mdt_object_remote(pobj))
2211                 cos_incompat = true;
2212
2213         rc = mdo_lookup(info->mti_env, mdt_object_child(pobj),
2214                         name, child_fid, &info->mti_spec);
2215         if (rc != 0)
2216                 GOTO(unlock_parent, rc);
2217
2218         if (!lu_fid_eq(child_fid, mdt_object_fid(obj)))
2219                 GOTO(unlock_parent, rc = -EREMCHG);
2220
2221         child_lh = &info->mti_lh[MDT_LH_CHILD];
2222         mdt_lock_reg_init(child_lh, LCK_EX);
2223         rc = mdt_reint_striped_lock(info, obj, child_lh,
2224                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE,
2225                                     einfo, cos_incompat);
2226         if (rc != 0)
2227                 GOTO(unlock_parent, rc);
2228
2229         if (atomic_read(&obj->mot_open_count)) {
2230                 CDEBUG(D_OTHER, "object "DFID" open, skip\n",
2231                        PFID(mdt_object_fid(obj)));
2232                 GOTO(unlock_child, rc = -EBUSY);
2233         }
2234
2235         ma->ma_need = 0;
2236         ma->ma_valid = MA_INODE;
2237         ma->ma_attr.la_valid = LA_CTIME;
2238         ma->ma_attr.la_ctime = ctime;
2239
2240         mutex_lock(&obj->mot_lov_mutex);
2241
2242         rc = mdo_unlink(info->mti_env, mdt_object_child(pobj),
2243                         mdt_object_child(obj), name, ma, 0);
2244
2245         mutex_unlock(&obj->mot_lov_mutex);
2246
2247 unlock_child:
2248         mdt_reint_striped_unlock(info, obj, child_lh, einfo, 1);
2249 unlock_parent:
2250         mdt_object_unlock(info, pobj, parent_lh, 1);
2251 put_parent:
2252         mdt_object_put(info->mti_env, pobj);
2253 out:
2254         RETURN(rc);
2255 }
2256
2257 static int mdt_rmfid_check_permission(struct mdt_thread_info *info,
2258                                         struct mdt_object *obj)
2259 {
2260         struct lu_ucred *uc = lu_ucred(info->mti_env);
2261         struct md_attr *ma = &info->mti_attr;
2262         struct lu_attr *la = &ma->ma_attr;
2263         int rc = 0;
2264         ENTRY;
2265
2266         ma->ma_need = MA_INODE;
2267         rc = mo_attr_get(info->mti_env, mdt_object_child(obj), ma);
2268         if (rc)
2269                 GOTO(out, rc);
2270
2271         if (la->la_flags & LUSTRE_IMMUTABLE_FL)
2272                         rc = -EACCES;
2273
2274         if (md_capable(uc, CFS_CAP_DAC_OVERRIDE))
2275                 RETURN(0);
2276         if (uc->uc_fsuid == la->la_uid) {
2277                 if ((la->la_mode & S_IWUSR) == 0)
2278                         rc = -EACCES;
2279         } else if (uc->uc_fsgid == la->la_gid) {
2280                 if ((la->la_mode & S_IWGRP) == 0)
2281                         rc = -EACCES;
2282         } else if ((la->la_mode & S_IWOTH) == 0) {
2283                         rc = -EACCES;
2284         }
2285
2286 out:
2287         RETURN(rc);
2288 }
2289
2290 static int mdt_rmfid_one(struct mdt_thread_info *info, struct lu_fid *fid,
2291                          s64 ctime)
2292 {
2293         struct mdt_device *mdt = info->mti_mdt;
2294         struct mdt_object *obj = NULL;
2295         struct linkea_data ldata = { NULL };
2296         struct lu_buf *buf = &info->mti_big_buf;
2297         struct lu_name *name = &info->mti_name;
2298         struct lu_fid *pfid = &info->mti_tmp_fid1;
2299         struct link_ea_header *leh;
2300         struct link_ea_entry *lee;
2301         int reclen, count, rc = 0;
2302         ENTRY;
2303
2304         if (!fid_is_sane(fid))
2305                 GOTO(out, rc = -EINVAL);
2306
2307         if (!fid_is_namespace_visible(fid))
2308                 GOTO(out, rc = -EINVAL);
2309
2310         obj = mdt_object_find(info->mti_env, mdt, fid);
2311         if (IS_ERR(obj))
2312                 GOTO(out, rc = PTR_ERR(obj));
2313
2314         if (mdt_object_remote(obj))
2315                 GOTO(out, rc = -EREMOTE);
2316         if (!mdt_object_exists(obj) || lu_object_is_dying(&obj->mot_header))
2317                 GOTO(out, rc = -ENOENT);
2318
2319         rc = mdt_rmfid_check_permission(info, obj);
2320         if (rc)
2321                 GOTO(out, rc);
2322
2323         /* take LinkEA */
2324         buf = lu_buf_check_and_alloc(buf, PATH_MAX);
2325         if (!buf->lb_buf)
2326                 GOTO(out, rc = -ENOMEM);
2327
2328         ldata.ld_buf = buf;
2329         rc = mdt_links_read(info, obj, &ldata);
2330         if (rc)
2331                 GOTO(out, rc);
2332
2333         leh = buf->lb_buf;
2334         lee = (struct link_ea_entry *)(leh + 1);
2335         for (count = 0; count < leh->leh_reccount; count++) {
2336                 /* remove every hardlink */
2337                 linkea_entry_unpack(lee, &reclen, name, pfid);
2338                 lee = (struct link_ea_entry *) ((char *)lee + reclen);
2339                 rc = mdt_rmfid_unlink(info, pfid, name, obj, ctime);
2340                 if (rc)
2341                         break;
2342         }
2343
2344 out:
2345         if (obj && !IS_ERR(obj))
2346                 mdt_object_put(info->mti_env, obj);
2347         if (info->mti_big_buf.lb_buf)
2348                 lu_buf_free(&info->mti_big_buf);
2349
2350         RETURN(rc);
2351 }
2352
2353 static int mdt_rmfid(struct tgt_session_info *tsi)
2354 {
2355         struct mdt_thread_info *mti = tsi2mdt_info(tsi);
2356         struct mdt_body *reqbody;
2357         struct lu_fid *fids, *rfids;
2358         int bufsize, rc;
2359         __u32 *rcs;
2360         int i, nr;
2361         ENTRY;
2362
2363         reqbody = req_capsule_client_get(tsi->tsi_pill, &RMF_MDT_BODY);
2364         if (reqbody == NULL)
2365                 RETURN(-EPROTO);
2366         bufsize = req_capsule_get_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2367                                        RCL_CLIENT);
2368         nr = bufsize / sizeof(struct lu_fid);
2369         if (nr * sizeof(struct lu_fid) != bufsize)
2370                 RETURN(-EINVAL);
2371         req_capsule_set_size(tsi->tsi_pill, &RMF_RCS,
2372                              RCL_SERVER, nr * sizeof(__u32));
2373         req_capsule_set_size(tsi->tsi_pill, &RMF_FID_ARRAY,
2374                              RCL_SERVER, nr * sizeof(struct lu_fid));
2375         rc = req_capsule_server_pack(tsi->tsi_pill);
2376         if (rc)
2377                 GOTO(out, rc = err_serious(rc));
2378         fids = req_capsule_client_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2379         if (fids == NULL)
2380                 RETURN(-EPROTO);
2381         rcs = req_capsule_server_get(tsi->tsi_pill, &RMF_RCS);
2382         LASSERT(rcs);
2383         rfids = req_capsule_server_get(tsi->tsi_pill, &RMF_FID_ARRAY);
2384         LASSERT(rfids);
2385
2386         mdt_init_ucred(mti, reqbody);
2387         for (i = 0; i < nr; i++) {
2388                 rfids[i] = fids[i];
2389                 rcs[i] = mdt_rmfid_one(mti, fids + i, reqbody->mbo_ctime);
2390         }
2391         mdt_exit_ucred(mti);
2392
2393 out:
2394         RETURN(rc);
2395 }
2396
2397 static int mdt_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2398                          void *karg, void __user *uarg);
2399
2400 static int mdt_set_info(struct tgt_session_info *tsi)
2401 {
2402         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2403         char                    *key;
2404         void                    *val;
2405         int                      keylen, vallen, rc = 0;
2406
2407         ENTRY;
2408
2409         key = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_KEY);
2410         if (key == NULL) {
2411                 DEBUG_REQ(D_HA, req, "no set_info key");
2412                 RETURN(err_serious(-EFAULT));
2413         }
2414
2415         keylen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_KEY,
2416                                       RCL_CLIENT);
2417
2418         val = req_capsule_client_get(tsi->tsi_pill, &RMF_SETINFO_VAL);
2419         if (val == NULL) {
2420                 DEBUG_REQ(D_HA, req, "no set_info val");
2421                 RETURN(err_serious(-EFAULT));
2422         }
2423
2424         vallen = req_capsule_get_size(tsi->tsi_pill, &RMF_SETINFO_VAL,
2425                                       RCL_CLIENT);
2426
2427         /* Swab any part of val you need to here */
2428         if (KEY_IS(KEY_READ_ONLY)) {
2429                 spin_lock(&req->rq_export->exp_lock);
2430                 if (*(__u32 *)val)
2431                         *exp_connect_flags_ptr(req->rq_export) |=
2432                                 OBD_CONNECT_RDONLY;
2433                 else
2434                         *exp_connect_flags_ptr(req->rq_export) &=
2435                                 ~OBD_CONNECT_RDONLY;
2436                 spin_unlock(&req->rq_export->exp_lock);
2437         } else if (KEY_IS(KEY_CHANGELOG_CLEAR)) {
2438                 struct changelog_setinfo *cs = val;
2439
2440                 if (vallen != sizeof(*cs)) {
2441                         CERROR("%s: bad changelog_clear setinfo size %d\n",
2442                                tgt_name(tsi->tsi_tgt), vallen);
2443                         RETURN(-EINVAL);
2444                 }
2445                 if (ptlrpc_req_need_swab(req)) {
2446                         __swab64s(&cs->cs_recno);
2447                         __swab32s(&cs->cs_id);
2448                 }
2449
2450                 if (!mdt_is_rootadmin(tsi2mdt_info(tsi)))
2451                         RETURN(-EACCES);
2452                 rc = mdt_iocontrol(OBD_IOC_CHANGELOG_CLEAR, req->rq_export,
2453                                    vallen, val, NULL);
2454         } else if (KEY_IS(KEY_EVICT_BY_NID)) {
2455                 if (vallen > 0)
2456                         obd_export_evict_by_nid(req->rq_export->exp_obd, val);
2457         } else {
2458                 RETURN(-EINVAL);
2459         }
2460         RETURN(rc);
2461 }
2462
2463 static int mdt_readpage(struct tgt_session_info *tsi)
2464 {
2465         struct mdt_thread_info  *info = mdt_th_info(tsi->tsi_env);
2466         struct mdt_object       *object = mdt_obj(tsi->tsi_corpus);
2467         struct lu_rdpg          *rdpg = &info->mti_u.rdpg.mti_rdpg;
2468         const struct mdt_body   *reqbody = tsi->tsi_mdt_body;
2469         struct mdt_body         *repbody;
2470         int                      rc;
2471         int                      i;
2472
2473         ENTRY;
2474
2475         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_READPAGE_PACK))
2476                 RETURN(err_serious(-ENOMEM));
2477
2478         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_MDT_BODY);
2479         if (repbody == NULL || reqbody == NULL)
2480                 RETURN(err_serious(-EFAULT));
2481
2482         /*
2483          * prepare @rdpg before calling lower layers and transfer itself. Here
2484          * reqbody->size contains offset of where to start to read and
2485          * reqbody->nlink contains number bytes to read.
2486          */
2487         rdpg->rp_hash = reqbody->mbo_size;
2488         if (rdpg->rp_hash != reqbody->mbo_size) {
2489                 CERROR("Invalid hash: %#llx != %#llx\n",
2490                        rdpg->rp_hash, reqbody->mbo_size);
2491                 RETURN(-EFAULT);
2492         }
2493
2494         rdpg->rp_attrs = reqbody->mbo_mode;
2495         if (exp_connect_flags(tsi->tsi_exp) & OBD_CONNECT_64BITHASH)
2496                 rdpg->rp_attrs |= LUDA_64BITHASH;
2497         rdpg->rp_count  = min_t(unsigned int, reqbody->mbo_nlink,
2498                                 exp_max_brw_size(tsi->tsi_exp));
2499         rdpg->rp_npages = (rdpg->rp_count + PAGE_SIZE - 1) >>
2500                           PAGE_SHIFT;
2501         OBD_ALLOC_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
2502         if (rdpg->rp_pages == NULL)
2503                 RETURN(-ENOMEM);
2504
2505         for (i = 0; i < rdpg->rp_npages; ++i) {
2506                 rdpg->rp_pages[i] = alloc_page(GFP_NOFS);
2507                 if (rdpg->rp_pages[i] == NULL)
2508                         GOTO(free_rdpg, rc = -ENOMEM);
2509         }
2510
2511         /* call lower layers to fill allocated pages with directory data */
2512         rc = mo_readpage(tsi->tsi_env, mdt_object_child(object), rdpg);
2513         if (rc < 0)
2514                 GOTO(free_rdpg, rc);
2515
2516         /* send pages to client */
2517         rc = tgt_sendpage(tsi, rdpg, rc);
2518
2519         EXIT;
2520 free_rdpg:
2521
2522         for (i = 0; i < rdpg->rp_npages; i++)
2523                 if (rdpg->rp_pages[i] != NULL)
2524                         __free_page(rdpg->rp_pages[i]);
2525         OBD_FREE_PTR_ARRAY(rdpg->rp_pages, rdpg->rp_npages);
2526
2527         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SENDPAGE))
2528                 RETURN(0);
2529
2530         return rc;
2531 }
2532
2533 static int mdt_fix_attr_ucred(struct mdt_thread_info *info, __u32 op)
2534 {
2535         struct lu_ucred *uc = mdt_ucred_check(info);
2536         struct lu_attr *attr = &info->mti_attr.ma_attr;
2537
2538         if (uc == NULL)
2539                 return -EINVAL;
2540
2541         if (op != REINT_SETATTR) {
2542                 if ((attr->la_valid & LA_UID) && (attr->la_uid != -1))
2543                         attr->la_uid = uc->uc_fsuid;
2544                 /* for S_ISGID, inherit gid from his parent, such work will be
2545                  * done in cmm/mdd layer, here set all cases as uc->uc_fsgid. */
2546                 if ((attr->la_valid & LA_GID) && (attr->la_gid != -1))
2547                         attr->la_gid = uc->uc_fsgid;
2548         }
2549
2550         return 0;
2551 }
2552
2553 static inline bool mdt_is_readonly_open(struct mdt_thread_info *info, __u32 op)
2554 {
2555         return op == REINT_OPEN &&
2556              !(info->mti_spec.sp_cr_flags & (MDS_FMODE_WRITE | MDS_OPEN_CREAT));
2557 }
2558
2559 static void mdt_preset_secctx_size(struct mdt_thread_info *info)
2560 {
2561         struct req_capsule *pill = info->mti_pill;
2562
2563         if (req_capsule_has_field(pill, &RMF_FILE_SECCTX,
2564                                   RCL_SERVER) &&
2565             req_capsule_has_field(pill, &RMF_FILE_SECCTX_NAME,
2566                                   RCL_CLIENT)) {
2567                 if (req_capsule_get_size(pill, &RMF_FILE_SECCTX_NAME,
2568                                          RCL_CLIENT) != 0) {
2569                         /* pre-set size in server part with max size */
2570                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2571                                              RCL_SERVER,
2572                                              info->mti_mdt->mdt_max_ea_size);
2573                 } else {
2574                         req_capsule_set_size(pill, &RMF_FILE_SECCTX,
2575                                              RCL_SERVER, 0);
2576                 }
2577         }
2578
2579 }
2580
2581 static int mdt_reint_internal(struct mdt_thread_info *info,
2582                               struct mdt_lock_handle *lhc,
2583                               __u32 op)
2584 {
2585         struct req_capsule      *pill = info->mti_pill;
2586         struct mdt_body         *repbody;
2587         int                      rc = 0, rc2;
2588
2589         ENTRY;
2590
2591         rc = mdt_reint_unpack(info, op);
2592         if (rc != 0) {
2593                 CERROR("Can't unpack reint, rc %d\n", rc);
2594                 RETURN(err_serious(rc));
2595         }
2596
2597
2598         /* check if the file system is set to readonly. O_RDONLY open
2599          * is still allowed even the file system is set to readonly mode */
2600         if (mdt_rdonly(info->mti_exp) && !mdt_is_readonly_open(info, op))
2601                 RETURN(err_serious(-EROFS));
2602
2603         /* for replay (no_create) lmm is not needed, client has it already */
2604         if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2605                 req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER,
2606                                      DEF_REP_MD_SIZE);
2607
2608         /* llog cookies are always 0, the field is kept for compatibility */
2609         if (req_capsule_has_field(pill, &RMF_LOGCOOKIES, RCL_SERVER))
2610                 req_capsule_set_size(pill, &RMF_LOGCOOKIES, RCL_SERVER, 0);
2611
2612         /* Set ACL reply buffer size as LUSTRE_POSIX_ACL_MAX_SIZE_OLD
2613          * by default. If the target object has more ACL entries, then
2614          * enlarge the buffer when necessary. */
2615         if (req_capsule_has_field(pill, &RMF_ACL, RCL_SERVER))
2616                 req_capsule_set_size(pill, &RMF_ACL, RCL_SERVER,
2617                                      LUSTRE_POSIX_ACL_MAX_SIZE_OLD);
2618
2619         mdt_preset_secctx_size(info);
2620
2621         rc = req_capsule_server_pack(pill);
2622         if (rc != 0) {
2623                 CERROR("Can't pack response, rc %d\n", rc);
2624                 RETURN(err_serious(rc));
2625         }
2626
2627         if (req_capsule_has_field(pill, &RMF_MDT_BODY, RCL_SERVER)) {
2628                 repbody = req_capsule_server_get(pill, &RMF_MDT_BODY);
2629                 LASSERT(repbody);
2630                 repbody->mbo_eadatasize = 0;
2631                 repbody->mbo_aclsize = 0;
2632         }
2633
2634         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_REINT_DELAY, 10);
2635
2636         /* for replay no cookkie / lmm need, because client have this already */
2637         if (info->mti_spec.no_create)
2638                 if (req_capsule_has_field(pill, &RMF_MDT_MD, RCL_SERVER))
2639                         req_capsule_set_size(pill, &RMF_MDT_MD, RCL_SERVER, 0);
2640
2641         rc = mdt_init_ucred_reint(info);
2642         if (rc)
2643                 GOTO(out_shrink, rc);
2644
2645         rc = mdt_fix_attr_ucred(info, op);
2646         if (rc != 0)
2647                 GOTO(out_ucred, rc = err_serious(rc));
2648
2649         rc = mdt_check_resent(info, mdt_reconstruct, lhc);
2650         if (rc < 0) {
2651                 GOTO(out_ucred, rc);
2652         } else if (rc == 1) {
2653                 DEBUG_REQ(D_INODE, mdt_info_req(info), "resent opt");
2654                 rc = lustre_msg_get_status(mdt_info_req(info)->rq_repmsg);
2655                 GOTO(out_ucred, rc);
2656         }
2657         rc = mdt_reint_rec(info, lhc);
2658         EXIT;
2659 out_ucred:
2660         mdt_exit_ucred(info);
2661 out_shrink:
2662         mdt_client_compatibility(info);
2663
2664         rc2 = mdt_fix_reply(info);
2665         if (rc == 0)
2666                 rc = rc2;
2667
2668         /*
2669          * Data-on-MDT optimization - read data along with OPEN and return it
2670          * in reply when possible.
2671          */
2672         if (rc == 0 && op == REINT_OPEN && !req_is_replay(pill->rc_req))
2673                 rc = mdt_dom_read_on_open(info, info->mti_mdt,
2674                                           &lhc->mlh_reg_lh);
2675
2676         return rc;
2677 }
2678
2679 static long mdt_reint_opcode(struct ptlrpc_request *req,
2680                              const struct req_format **fmt)
2681 {
2682         struct mdt_device       *mdt;
2683         struct mdt_rec_reint    *rec;
2684         long                     opc;
2685
2686         rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
2687         if (rec != NULL) {
2688                 opc = rec->rr_opcode;
2689                 DEBUG_REQ(D_INODE, req, "reint opt = %ld", opc);
2690                 if (opc < REINT_MAX && fmt[opc] != NULL)
2691                         req_capsule_extend(&req->rq_pill, fmt[opc]);
2692                 else {
2693                         mdt = mdt_exp2dev(req->rq_export);
2694                         CERROR("%s: Unsupported opcode '%ld' from client '%s':"
2695                                " rc = %d\n", req->rq_export->exp_obd->obd_name,
2696                                opc, mdt->mdt_ldlm_client->cli_name, -EFAULT);
2697                         opc = err_serious(-EFAULT);
2698                 }
2699         } else {
2700                 opc = err_serious(-EFAULT);
2701         }
2702         return opc;
2703 }
2704
2705 static int mdt_reint(struct tgt_session_info *tsi)
2706 {
2707         long opc;
2708         int  rc;
2709         static const struct req_format *reint_fmts[REINT_MAX] = {
2710                 [REINT_SETATTR]  = &RQF_MDS_REINT_SETATTR,
2711                 [REINT_CREATE]   = &RQF_MDS_REINT_CREATE,
2712                 [REINT_LINK]     = &RQF_MDS_REINT_LINK,
2713                 [REINT_UNLINK]   = &RQF_MDS_REINT_UNLINK,
2714                 [REINT_RENAME]   = &RQF_MDS_REINT_RENAME,
2715                 [REINT_OPEN]     = &RQF_MDS_REINT_OPEN,
2716                 [REINT_SETXATTR] = &RQF_MDS_REINT_SETXATTR,
2717                 [REINT_RMENTRY]  = &RQF_MDS_REINT_UNLINK,
2718                 [REINT_MIGRATE]  = &RQF_MDS_REINT_MIGRATE,
2719                 [REINT_RESYNC]   = &RQF_MDS_REINT_RESYNC,
2720         };
2721
2722         ENTRY;
2723
2724         opc = mdt_reint_opcode(tgt_ses_req(tsi), reint_fmts);
2725         if (opc >= 0) {
2726                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2727                 /*
2728                  * No lock possible here from client to pass it to reint code
2729                  * path.
2730                  */
2731                 rc = mdt_reint_internal(info, NULL, opc);
2732                 mdt_thread_info_fini(info);
2733         } else {
2734                 rc = opc;
2735         }
2736
2737         tsi->tsi_reply_fail_id = OBD_FAIL_MDS_REINT_NET_REP;
2738         RETURN(rc);
2739 }
2740
2741 /* this should sync the whole device */
2742 int mdt_device_sync(const struct lu_env *env, struct mdt_device *mdt)
2743 {
2744         struct dt_device *dt = mdt->mdt_bottom;
2745         int rc;
2746         ENTRY;
2747
2748         rc = dt->dd_ops->dt_sync(env, dt);
2749         RETURN(rc);
2750 }
2751
2752 /* this should sync this object */
2753 static int mdt_object_sync(const struct lu_env *env, struct obd_export *exp,
2754                            struct mdt_object *mo)
2755 {
2756         int rc = 0;
2757
2758         ENTRY;
2759
2760         if (!mdt_object_exists(mo)) {
2761                 CWARN("%s: non existing object "DFID": rc = %d\n",
2762                       exp->exp_obd->obd_name, PFID(mdt_object_fid(mo)),
2763                       -ESTALE);
2764                 RETURN(-ESTALE);
2765         }
2766
2767         if (S_ISREG(lu_object_attr(&mo->mot_obj))) {
2768                 struct lu_target *tgt = tgt_ses_info(env)->tsi_tgt;
2769                 dt_obj_version_t version;
2770
2771                 version = dt_version_get(env, mdt_obj2dt(mo));
2772                 if (version > tgt->lut_obd->obd_last_committed)
2773                         rc = mo_object_sync(env, mdt_object_child(mo));
2774         } else {
2775                 rc = mo_object_sync(env, mdt_object_child(mo));
2776         }
2777
2778         RETURN(rc);
2779 }
2780
2781 static int mdt_sync(struct tgt_session_info *tsi)
2782 {
2783         struct ptlrpc_request   *req = tgt_ses_req(tsi);
2784         struct req_capsule      *pill = tsi->tsi_pill;
2785         struct mdt_body         *body;
2786         int                      rc;
2787
2788         ENTRY;
2789
2790         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_SYNC_PACK))
2791                 RETURN(err_serious(-ENOMEM));
2792
2793         if (fid_seq(&tsi->tsi_mdt_body->mbo_fid1) == 0) {
2794                 rc = mdt_device_sync(tsi->tsi_env, mdt_exp2dev(tsi->tsi_exp));
2795         } else {
2796                 struct mdt_thread_info *info = tsi2mdt_info(tsi);
2797
2798                 if (unlikely(info->mti_object == NULL))
2799                         RETURN(-EPROTO);
2800
2801                 /* sync an object */
2802                 rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp,
2803                                      info->mti_object);
2804                 if (rc == 0) {
2805                         const struct lu_fid *fid;
2806                         struct lu_attr *la = &info->mti_attr.ma_attr;
2807
2808                         info->mti_attr.ma_need = MA_INODE;
2809                         info->mti_attr.ma_valid = 0;
2810                         rc = mdt_attr_get_complex(info, info->mti_object,
2811                                                   &info->mti_attr);
2812                         if (rc == 0) {
2813                                 body = req_capsule_server_get(pill,
2814                                                               &RMF_MDT_BODY);
2815                                 fid = mdt_object_fid(info->mti_object);
2816                                 mdt_pack_attr2body(info, body, la, fid);
2817                         }
2818                 }
2819                 mdt_thread_info_fini(info);
2820         }
2821         if (rc == 0)
2822                 mdt_counter_incr(req, LPROC_MDT_SYNC);
2823
2824         RETURN(rc);
2825 }
2826
2827 static int mdt_data_sync(struct tgt_session_info *tsi)
2828 {
2829         struct mdt_thread_info *info;
2830         struct mdt_device *mdt = mdt_exp2dev(tsi->tsi_exp);
2831         struct ost_body *body = tsi->tsi_ost_body;
2832         struct ost_body *repbody;
2833         struct mdt_object *mo = NULL;
2834         struct md_attr *ma;
2835         int rc = 0;
2836
2837         ENTRY;
2838
2839         repbody = req_capsule_server_get(tsi->tsi_pill, &RMF_OST_BODY);
2840
2841         /* if no fid is specified then do nothing,
2842          * device sync is done via MDS_SYNC */
2843         if (fid_is_zero(&tsi->tsi_fid))
2844                 RETURN(0);
2845
2846         mo = mdt_object_find(tsi->tsi_env, mdt, &tsi->tsi_fid);
2847         if (IS_ERR(mo))
2848                 RETURN(PTR_ERR(mo));
2849
2850         rc = mdt_object_sync(tsi->tsi_env, tsi->tsi_exp, mo);
2851         if (rc)
2852                 GOTO(put, rc);
2853
2854         repbody->oa.o_oi = body->oa.o_oi;
2855         repbody->oa.o_valid = OBD_MD_FLID | OBD_MD_FLGROUP;
2856
2857         info = tsi2mdt_info(tsi);
2858         ma = &info->mti_attr;
2859         ma->ma_need = MA_INODE;
2860         ma->ma_valid = 0;
2861         rc = mdt_attr_get_complex(info, mo, ma);
2862         if (rc == 0)
2863                 obdo_from_la(&repbody->oa, &ma->ma_attr, VALID_FLAGS);
2864         else
2865                 rc = 0;
2866         mdt_thread_info_fini(info);
2867
2868         EXIT;
2869 put:
2870         if (mo != NULL)
2871                 mdt_object_put(tsi->tsi_env, mo);
2872         return rc;
2873 }
2874
2875 /*
2876  * Handle quota control requests to consult current usage/limit, but also
2877  * to configure quota enforcement
2878  */
2879 static int mdt_quotactl(struct tgt_session_info *tsi)
2880 {
2881         struct obd_export *exp  = tsi->tsi_exp;
2882         struct req_capsule *pill = tsi->tsi_pill;
2883         struct obd_quotactl *oqctl, *repoqc;
2884         int id, rc;
2885         struct mdt_device *mdt = mdt_exp2dev(exp);
2886         struct lu_device *qmt = mdt->mdt_qmt_dev;
2887         struct lu_nodemap *nodemap;
2888         ENTRY;
2889
2890         oqctl = req_capsule_client_get(pill, &RMF_OBD_QUOTACTL);
2891         if (!oqctl)
2892                 RETURN(err_serious(-EPROTO));
2893
2894         rc = req_capsule_server_pack(pill);
2895         if (rc)
2896                 RETURN(err_serious(rc));
2897
2898         nodemap = nodemap_get_from_exp(exp);
2899         if (IS_ERR(nodemap))
2900                 RETURN(PTR_ERR(nodemap));
2901
2902         switch (oqctl->qc_cmd) {
2903                 /* master quotactl */
2904         case Q_SETINFO:
2905         case Q_SETQUOTA:
2906         case LUSTRE_Q_SETDEFAULT:
2907         case LUSTRE_Q_SETQUOTAPOOL:
2908         case LUSTRE_Q_SETINFOPOOL:
2909                 if (!nodemap_can_setquota(nodemap))
2910                         GOTO(out_nodemap, rc = -EPERM);
2911                 /* fallthrough */
2912         case Q_GETINFO:
2913         case Q_GETQUOTA:
2914         case LUSTRE_Q_GETDEFAULT:
2915         case LUSTRE_Q_GETQUOTAPOOL:
2916         case LUSTRE_Q_GETINFOPOOL:
2917                 if (qmt == NULL)
2918                         GOTO(out_nodemap, rc = -EOPNOTSUPP);
2919                 /* slave quotactl */
2920                 /* fallthrough */
2921         case Q_GETOINFO:
2922         case Q_GETOQUOTA:
2923                 break;
2924         default:
2925                 rc = -EFAULT;
2926                 CERROR("%s: unsupported quotactl command %d: rc = %d\n",
2927                        mdt_obd_name(mdt), oqctl->qc_cmd, rc);
2928                 GOTO(out_nodemap, rc);
2929         }
2930
2931         id = oqctl->qc_id;
2932         switch (oqctl->qc_type) {
2933         case USRQUOTA:
2934                 id = nodemap_map_id(nodemap, NODEMAP_UID,
2935                                     NODEMAP_CLIENT_TO_FS, id);
2936                 break;
2937         case GRPQUOTA:
2938                 id = nodemap_map_id(nodemap, NODEMAP_GID,
2939                                     NODEMAP_CLIENT_TO_FS, id);
2940                 break;
2941         case PRJQUOTA:
2942                 /* todo: check/map project id */
2943                 id = oqctl->qc_id;
2944                 break;
2945         default:
2946                 GOTO(out_nodemap, rc = -EOPNOTSUPP);
2947         }
2948         repoqc = req_capsule_server_get(pill, &RMF_OBD_QUOTACTL);
2949         if (repoqc == NULL)
2950                 GOTO(out_nodemap, rc = err_serious(-EFAULT));
2951
2952         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA)
2953                 barrier_exit(tsi->tsi_tgt->lut_bottom);
2954
2955         if (oqctl->qc_id != id)
2956                 swap(oqctl->qc_id, id);
2957
2958         if (oqctl->qc_cmd == Q_SETINFO || oqctl->qc_cmd == Q_SETQUOTA) {
2959                 if (unlikely(!barrier_entry(tsi->tsi_tgt->lut_bottom)))
2960                         RETURN(-EINPROGRESS);
2961         }
2962
2963         switch (oqctl->qc_cmd) {
2964
2965         case Q_GETINFO:
2966         case Q_SETINFO:
2967         case Q_SETQUOTA:
2968         case Q_GETQUOTA:
2969         case LUSTRE_Q_SETDEFAULT:
2970         case LUSTRE_Q_GETDEFAULT:
2971         case LUSTRE_Q_SETQUOTAPOOL:
2972         case LUSTRE_Q_GETQUOTAPOOL:
2973         case LUSTRE_Q_SETINFOPOOL:
2974         case LUSTRE_Q_GETINFOPOOL:
2975                 /* forward quotactl request to QMT */
2976                 rc = qmt_hdls.qmth_quotactl(tsi->tsi_env, qmt, oqctl);
2977                 break;
2978
2979         case Q_GETOINFO:
2980         case Q_GETOQUOTA:
2981                 /* slave quotactl */
2982                 rc = lquotactl_slv(tsi->tsi_env, tsi->tsi_tgt->lut_bottom,
2983                                    oqctl);
2984                 break;
2985
2986         default:
2987                 CERROR("Unsupported quotactl command: %d\n", oqctl->qc_cmd);
2988                 GOTO(out_nodemap, rc = -EFAULT);
2989         }
2990
2991         if (oqctl->qc_id != id)
2992                 swap(oqctl->qc_id, id);
2993
2994         QCTL_COPY(repoqc, oqctl);
2995         EXIT;
2996
2997 out_nodemap:
2998         nodemap_putref(nodemap);
2999
3000         return rc;
3001 }
3002
3003 /** clone llog ctxt from child (mdd)
3004  * This allows remote llog (replicator) access.
3005  * We can either pass all llog RPCs (eg mdt_llog_create) on to child where the
3006  * context was originally set up, or we can handle them directly.
3007  * I choose the latter, but that means I need any llog
3008  * contexts set up by child to be accessable by the mdt.  So we clone the
3009  * context into our context list here.
3010  */
3011 static int mdt_llog_ctxt_clone(const struct lu_env *env, struct mdt_device *mdt,
3012                                int idx)
3013 {
3014         struct md_device  *next = mdt->mdt_child;
3015         struct llog_ctxt *ctxt;
3016         int rc;
3017
3018         if (!llog_ctxt_null(mdt2obd_dev(mdt), idx))
3019                 return 0;
3020
3021         rc = next->md_ops->mdo_llog_ctxt_get(env, next, idx, (void **)&ctxt);
3022         if (rc || ctxt == NULL) {
3023                 return 0;
3024         }
3025
3026         rc = llog_group_set_ctxt(&mdt2obd_dev(mdt)->obd_olg, ctxt, idx);
3027         if (rc)
3028                 CERROR("Can't set mdt ctxt %d\n", rc);
3029
3030         return rc;
3031 }
3032
3033 static int mdt_llog_ctxt_unclone(const struct lu_env *env,
3034                                  struct mdt_device *mdt, int idx)
3035 {
3036         struct llog_ctxt *ctxt;
3037
3038         ctxt = llog_get_context(mdt2obd_dev(mdt), idx);
3039         if (ctxt == NULL)
3040                 return 0;
3041         /* Put once for the get we just did, and once for the clone */
3042         llog_ctxt_put(ctxt);
3043         llog_ctxt_put(ctxt);
3044         return 0;
3045 }
3046
3047 /*
3048  * sec context handlers
3049  */
3050 static int mdt_sec_ctx_handle(struct tgt_session_info *tsi)
3051 {
3052         CFS_FAIL_TIMEOUT(OBD_FAIL_SEC_CTX_HDL_PAUSE, cfs_fail_val);
3053
3054         return 0;
3055 }
3056
3057 /*
3058  * quota request handlers
3059  */
3060 static int mdt_quota_dqacq(struct tgt_session_info *tsi)
3061 {
3062         struct mdt_device       *mdt = mdt_exp2dev(tsi->tsi_exp);
3063         struct lu_device        *qmt = mdt->mdt_qmt_dev;
3064         int                      rc;
3065         ENTRY;
3066
3067         if (qmt == NULL)
3068                 RETURN(err_serious(-EOPNOTSUPP));
3069
3070         rc = qmt_hdls.qmth_dqacq(tsi->tsi_env, qmt, tgt_ses_req(tsi));
3071         RETURN(rc);
3072 }
3073
3074 struct mdt_object *mdt_object_new(const struct lu_env *env,
3075                                   struct mdt_device *d,
3076                                   const struct lu_fid *f)
3077 {
3078         struct lu_object_conf conf = { .loc_flags = LOC_F_NEW };
3079         struct lu_object *o;
3080         struct mdt_object *m;
3081         ENTRY;
3082
3083         CDEBUG(D_INFO, "Allocate object for "DFID"\n", PFID(f));
3084         o = lu_object_find(env, &d->mdt_lu_dev, f, &conf);
3085         if (unlikely(IS_ERR(o)))
3086                 m = (struct mdt_object *)o;
3087         else
3088                 m = mdt_obj(o);
3089         RETURN(m);
3090 }
3091
3092 struct mdt_object *mdt_object_find(const struct lu_env *env,
3093                                    struct mdt_device *d,
3094                                    const struct lu_fid *f)
3095 {
3096         struct lu_object *o;
3097         struct mdt_object *m;
3098         ENTRY;
3099
3100         CDEBUG(D_INFO, "Find object for "DFID"\n", PFID(f));
3101         o = lu_object_find(env, &d->mdt_lu_dev, f, NULL);
3102         if (unlikely(IS_ERR(o)))
3103                 m = (struct mdt_object *)o;
3104         else
3105                 m = mdt_obj(o);
3106
3107         RETURN(m);
3108 }
3109
3110 /**
3111  * Asyncronous commit for mdt device.
3112  *
3113  * Pass asynchonous commit call down the MDS stack.
3114  *
3115  * \param env environment
3116  * \param mdt the mdt device
3117  */
3118 static void mdt_device_commit_async(const struct lu_env *env,
3119                                     struct mdt_device *mdt)
3120 {
3121         struct dt_device *dt = mdt->mdt_bottom;
3122         int rc;
3123         ENTRY;
3124
3125         rc = dt->dd_ops->dt_commit_async(env, dt);
3126         if (unlikely(rc != 0))
3127                 CWARN("%s: async commit start failed: rc = %d\n",
3128                       mdt_obd_name(mdt), rc);
3129         atomic_inc(&mdt->mdt_async_commit_count);
3130         EXIT;
3131 }
3132
3133 /**
3134  * Mark the lock as "synchonous".
3135  *
3136  * Mark the lock to deffer transaction commit to the unlock time.
3137  *
3138  * \param lock the lock to mark as "synchonous"
3139  *
3140  * \see mdt_is_lock_sync
3141  * \see mdt_save_lock
3142  */
3143 static inline void mdt_set_lock_sync(struct ldlm_lock *lock)
3144 {
3145         lock->l_ast_data = (void*)1;
3146 }
3147
3148 /**
3149  * Check whehter the lock "synchonous" or not.
3150  *
3151  * \param lock the lock to check
3152  * \retval 1 the lock is "synchonous"
3153  * \retval 0 the lock isn't "synchronous"
3154  *
3155  * \see mdt_set_lock_sync
3156  * \see mdt_save_lock
3157  */
3158 static inline int mdt_is_lock_sync(struct ldlm_lock *lock)
3159 {
3160         return lock->l_ast_data != NULL;
3161 }
3162
3163 /**
3164  * Blocking AST for mdt locks.
3165  *
3166  * Starts a transaction commit in case of a COS lock conflict, or
3167  * defers such a commit to mdt_save_lock.
3168  *
3169  * \param lock the lock which blocks a request or cancelling lock
3170  * \param desc unused
3171  * \param data unused
3172  * \param flag indicates whether this cancelling or blocking callback
3173  * \retval 0
3174  * \see ldlm_blocking_ast_nocheck
3175  */
3176 int mdt_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
3177                      void *data, int flag)
3178 {