Whamcloud - gitweb
LU-15527 dne: refactor commit-on-sharing for DNE
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 /**
251  * Lock slave stripes if necessary, the lock handles of slave stripes
252  * will be stored in einfo->ei_cbdata.
253  **/
254 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
255                             enum ldlm_mode mode, __u64 ibits,
256                             struct ldlm_enqueue_info *einfo)
257 {
258         union ldlm_policy_data *policy = &mti->mti_policy;
259
260         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
261         einfo->ei_type = LDLM_IBITS;
262         einfo->ei_mode = mode;
263         einfo->ei_cb_bl = mdt_remote_blocking_ast;
264         einfo->ei_cb_local_bl = mdt_blocking_ast;
265         einfo->ei_cb_cp = ldlm_completion_ast;
266         einfo->ei_enq_slave = 1;
267         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
268         einfo->ei_inodebits = ibits;
269         einfo->ei_req_slot = 1;
270         memset(policy, 0, sizeof(*policy));
271         policy->l_inodebits.bits = ibits;
272         policy->l_inodebits.li_initiator_id = mdt_node_id(mti->mti_mdt);
273
274         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
275                               policy);
276 }
277
278 /** lock object, and stripes if it's a striped directory
279  *
280  * object should be local, this is called in operations which modify both object
281  * and stripes.
282  *
283  * \param info          struct mdt_thread_info
284  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
285  * \param child         child object
286  * \param lh            lock handle
287  * \param einfo         struct ldlm_enqueue_info
288  * \param ibits         MDS inode lock bits
289  * \param mode          lock mode
290  *
291  * \retval              0 on success, -ev on error.
292  */
293 int mdt_object_stripes_lock(struct mdt_thread_info *info,
294                             struct mdt_object *parent,
295                             struct mdt_object *child,
296                             struct mdt_lock_handle *lh,
297                             struct ldlm_enqueue_info *einfo, __u64 ibits,
298                             enum ldlm_mode mode)
299 {
300         int rc;
301
302         ENTRY;
303         /* according to the protocol, child should be local, is request sent to
304          * wrong MDT?
305          */
306         if (mdt_object_remote(child)) {
307                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
308                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
309                        -EREMOTE);
310                 RETURN(-EREMOTE);
311         }
312
313         memset(einfo, 0, sizeof(*einfo));
314         if (ibits & MDS_INODELOCK_LOOKUP) {
315                 LASSERT(parent);
316                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
317                                            mode);
318         } else {
319                 rc = mdt_object_lock(info, child, lh, ibits, mode);
320         }
321         if (rc)
322                 RETURN(rc);
323
324         if (!S_ISDIR(child->mot_header.loh_attr))
325                 RETURN(0);
326
327         /* lock stripes for striped directory */
328         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
329         if (rc == -EIO && CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
330                 rc = 0;
331         if (rc)
332                 mdt_object_unlock(info, child, lh, rc);
333
334         RETURN(rc);
335 }
336
337 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *obj,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_stripes_unlock(info, obj, einfo, decref);
344         mdt_object_unlock(info, obj, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lu_ucred *uc = mdt_ucred(info);
359         struct lmv_mds_md_v1 *lmv;
360         struct mdt_object *child;
361         struct mdt_lock_handle *lhp;
362         struct mdt_lock_handle *lhc;
363         struct mdt_body *repbody;
364         int rc;
365
366         ENTRY;
367
368         /* we want rbac roles to have precedence over any other
369          * permission or capability checks
370          */
371         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
372                 RETURN(-EPERM);
373
374         LASSERT(lum);
375         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
376
377         rc = mdt_version_get_check_save(info, parent, 0);
378         if (rc)
379                 RETURN(rc);
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW);
383         if (rc)
384                 RETURN(rc);
385
386         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
387         if (rc)
388                 GOTO(unlock_parent, rc);
389
390         if (ma->ma_valid & MA_LMV) {
391                 /* don't allow restripe if parent dir layout is changing */
392                 lmv = &ma->ma_lmv->lmv_md_v1;
393                 if (!lmv_is_sane2(lmv))
394                         GOTO(unlock_parent, rc = -EBADF);
395
396                 if (lmv_is_layout_changing(lmv))
397                         GOTO(unlock_parent, rc = -EBUSY);
398         }
399
400         fid_zero(fid);
401         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
402         if (rc)
403                 GOTO(unlock_parent, rc);
404
405         child = mdt_object_find(info->mti_env, mdt, fid);
406         if (IS_ERR(child))
407                 GOTO(unlock_parent, rc = PTR_ERR(child));
408
409         if (!mdt_object_exists(child))
410                 GOTO(out_child, rc = -ENOENT);
411
412         if (mdt_object_remote(child)) {
413                 struct mdt_body *repbody;
414
415                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
416                 if (!repbody)
417                         GOTO(out_child, rc = -EPROTO);
418
419                 repbody->mbo_fid1 = *fid;
420                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
421                 GOTO(out_child, rc = -EREMOTE);
422         }
423
424         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
425                 GOTO(out_child, rc = -ENOTDIR);
426
427         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
428         if (rc)
429                 GOTO(out_child, rc);
430
431         /* race with migrate? */
432         if ((ma->ma_valid & MA_LMV) &&
433              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
434                 GOTO(out_child, rc = -EBUSY);
435
436         /* lock object */
437         lhc = &info->mti_lh[MDT_LH_CHILD];
438         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
439                                      MDS_INODELOCK_FULL, LCK_PW);
440         if (rc)
441                 GOTO(unlock_child, rc);
442
443         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
444         rc = mdt_version_get_check_save(info, child, 1);
445         if (rc)
446                 GOTO(unlock_child, rc);
447
448         spin_lock(&mdt->mdt_restriper.mdr_lock);
449         if (child->mot_restriping) {
450                 /* race? */
451                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
452                 GOTO(unlock_child, rc = -EBUSY);
453         }
454         child->mot_restriping = 1;
455         spin_unlock(&mdt->mdt_restriper.mdr_lock);
456
457         *fid = *tfid;
458         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
459         if (rc)
460                 GOTO(restriping_clear, rc);
461
462         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
463         if (!repbody)
464                 GOTO(restriping_clear, rc = -EPROTO);
465
466         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
467         EXIT;
468
469 restriping_clear:
470         child->mot_restriping = 0;
471 unlock_child:
472         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
473 out_child:
474         mdt_object_put(info->mti_env, child);
475 unlock_parent:
476         mdt_object_unlock(info, parent, lhp, rc);
477
478         return rc;
479 }
480
/*
 * Handle a create request: create rr_name with FID rr_fid2 under the
 * parent rr_fid1, or restripe an already existing directory of that name
 * when the request qualifies for restripe.
 *
 * VBR: we save three versions in reply:
 * 0 - parent. Check that parent version is the same during replay.
 * 1 - name. Version of 'name' if file exists with the same name or
 * ENOENT_VERSION, it is needed because file may appear due to missed replays.
 * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
 * check.
 *
 * Returns 0 on success or a negative errno; -EEXIST if the name already
 * exists and restripe does not apply.
 */
static int mdt_create(struct mdt_thread_info *info)
{
        struct mdt_device *mdt = info->mti_mdt;
        struct mdt_object *parent;
        struct mdt_object *child;
        struct mdt_lock_handle *lh;
        struct mdt_body *repbody;
        struct md_attr *ma = &info->mti_attr;
        struct mdt_reint_record *rr = &info->mti_rr;
        struct md_op_spec *spec = &info->mti_spec;
        struct lu_ucred *uc = mdt_ucred(info);
        bool restripe = false;
        int rc;

        ENTRY;
        DEBUG_REQ(D_INODE, mdt_info_req(info),
                  "Create ("DNAME"->"DFID") in "DFID,
                  PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));

        if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);

        /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
         * if client maintains inherited default LMV
         */
        if (S_ISDIR(ma->ma_attr.la_mode) &&
            spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
            !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
                /* mkdir with an explicit LMV: validate client support,
                 * server tunables and permissions before doing any work
                 */
                const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
                struct obd_export *exp = mdt_info_req(info)->rq_export;

                /* Only new clients can create remote dir( >= 2.4) and
                 * striped dir(>= 2.6), old client will return -ENOTSUPP
                 */
                if (!mdt_is_dne_client(exp))
                        RETURN(-ENOTSUPP);

                if (le32_to_cpu(lum->lum_stripe_count) > 1) {
                        if (!mdt_is_striped_client(exp))
                                RETURN(-ENOTSUPP);

                        if (!mdt->mdt_enable_striped_dir)
                                RETURN(-EPERM);
                } else if (!mdt->mdt_enable_remote_dir) {
                        RETURN(-EPERM);
                }

                /* clients without OBD_CONNECT2_CRUSH may not request hash
                 * types from LMV_HASH_TYPE_CRUSH upwards
                 */
                if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
                    (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
                    LMV_HASH_TYPE_CRUSH)
                        RETURN(-EPROTO);

                /* we want rbac roles to have precedence over any other
                 * permission or capability checks
                 */
                if (!uc->uc_rbac_dne_ops ||
                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
                     mdt->mdt_enable_remote_dir_gid != -1))
                        RETURN(-EPERM);

                /* restripe if later found dir exists, MDS_OPEN_CREAT means
                 * this is create only, don't try restripe.
                 */
                if (mdt->mdt_enable_dir_restripe &&
                    le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
                    !(spec->sp_cr_flags & MDS_OPEN_CREAT))
                        restripe = true;
        }

        /* NOTE(review): repbody is not NULL-checked here, unlike in
         * mdt_restripe() - presumably the reply was packed earlier; confirm.
         */
        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);

        parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(parent))
                RETURN(PTR_ERR(parent));

        if (!mdt_object_exists(parent))
                GOTO(put_parent, rc = -ENOENT);

        rc = mdt_check_enc(info, parent);
        if (rc)
                GOTO(put_parent, rc);

        /* creating under a LOHA_FSCRYPT_MD parent requires the
         * fscrypt_admin rbac role
         */
        if (!uc->uc_rbac_fscrypt_admin &&
            parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
                GOTO(put_parent, rc = -EPERM);

        /*
         * LU-10235: check if name exists locklessly first to avoid massive
         * lock recalls on existing directories.
         */
        rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
                                      &info->mti_tmp_fid1, 1);
        if (rc == 0) {
                if (!restripe)
                        GOTO(put_parent, rc = -EEXIST);

                rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
                                  ma);
        }

        /* -ENOENT is expected here */
        if (rc != -ENOENT)
                GOTO(put_parent, rc);

        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);

        /* save version of file name for replay, it must be ENOENT here */
        mdt_enoent_version_save(info, 1);

        CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);

        lh = &info->mti_lh[MDT_LH_PARENT];
        rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW);
        if (rc)
                GOTO(put_parent, rc);

        /* the parent version (slot 0) is only handled for a local parent */
        if (!mdt_object_remote(parent)) {
                rc = mdt_version_get_check_save(info, parent, 0);
                if (rc)
                        GOTO(unlock_parent, rc);
        }

        /*
         * now repeat the lookup having a LDLM lock on the parent dir,
         * as another thread could create the same name. notice this
         * lookup is supposed to hit cache in OSD and be cheap if the
         * directory is not being modified concurrently.
         */
        rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
                        &info->mti_tmp_fid1, &info->mti_spec);
        if (unlikely(rc == 0))
                GOTO(unlock_parent, rc = -EEXIST);

        child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
        if (unlikely(IS_ERR(child)))
                GOTO(unlock_parent, rc = PTR_ERR(child));

        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;

        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_CREATE_WRITE);

        /* Version of child will be updated on disk. */
        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
        rc = mdt_version_get_check_save(info, child, 2);
        if (rc)
                GOTO(put_child, rc);

        /* mark the child LOHA_FSCRYPT_MD if the parent is marked or if the
         * new name is exactly dot_fscrypt_name
         */
        if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
            (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
             strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
                     rr->rr_name.ln_namelen) == 0))
                child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;

        /*
         * Do not perform lookup sanity check. We know that name does
         * not exist.  The check is kept only when the parent is remote.
         */
        info->mti_spec.sp_cr_lookup = 0;
        if (mdt_object_remote(parent))
                info->mti_spec.sp_cr_lookup = 1;
        info->mti_spec.sp_feat = &dt_directory_features;

        rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
                        mdt_object_child(child), &info->mti_spec, ma);
        if (rc == 0)
                rc = mdt_attr_get_complex(info, child, ma);

        if (rc < 0)
                GOTO(put_child, rc);

        /* save child locks to eliminate dependency between 'mkdir a' and
         * 'mkdir a/b' if b is a remote directory
         */
        if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
                struct mdt_lock_handle *lhc;
                struct ldlm_enqueue_info *einfo = &info->mti_einfo;

                lhc = &info->mti_lh[MDT_LH_CHILD];
                rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
                                             MDS_INODELOCK_UPDATE, LCK_PW);
                if (rc)
                        GOTO(put_child, rc);

                /* the lock is released right away (rc is 0 here) */
                mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
        }

        /* Return fid & attr to client. */
        if (ma->ma_valid & MA_INODE)
                mdt_pack_attr2body(info, repbody, &ma->ma_attr,
                                   mdt_object_fid(child));
        EXIT;
        /* cleanup: each label undoes one acquisition, in reverse order */
put_child:
        mdt_object_put(info->mti_env, child);
unlock_parent:
        mdt_object_unlock(info, parent, lh, rc);
put_parent:
        mdt_object_put(info->mti_env, parent);
        return rc;
}
691
692 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
693                         struct md_attr *ma)
694 {
695         struct mdt_lock_handle  *lh;
696         int do_vbr = ma->ma_attr.la_valid &
697                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
698         __u64 lockpart = MDS_INODELOCK_UPDATE;
699         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
700         int rc;
701
702         ENTRY;
703         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
704                 lockpart |= MDS_INODELOCK_PERM;
705         /* Clear xattr cache on clients, so the virtual project ID xattr
706          * can get the new project ID
707          */
708         if (ma->ma_attr.la_valid & LA_PROJID)
709                 lockpart |= MDS_INODELOCK_XATTR;
710
711         lh = &info->mti_lh[MDT_LH_PARENT];
712         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
713                                      LCK_PW);
714         if (rc != 0)
715                 RETURN(rc);
716
717         /* all attrs are packed into mti_attr in unpack_setattr */
718         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
719                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
720
721         /* VBR: update version if attr changed are important for recovery */
722         if (do_vbr) {
723                 /* update on-disk version of changed object */
724                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
725                 rc = mdt_version_get_check_save(info, mo, 0);
726                 if (rc)
727                         GOTO(out_unlock, rc);
728         }
729
730         /* Ensure constant striping during chown(). See LU-2789. */
731         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
732                 mutex_lock(&mo->mot_lov_mutex);
733
734         /* all attrs are packed into mti_attr in unpack_setattr */
735         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
736
737         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
738                 mutex_unlock(&mo->mot_lov_mutex);
739
740         if (rc != 0)
741                 GOTO(out_unlock, rc);
742         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
743         EXIT;
744 out_unlock:
745         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
746         return rc;
747 }
748
749 /**
750  * Check HSM flags and add HS_DIRTY flag if relevant.
751  *
752  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
753  * and is not RELEASED.
754  */
755 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
756                         struct md_attr *ma)
757 {
758         struct lu_ucred *uc = mdt_ucred(info);
759         kernel_cap_t cap_saved;
760         int rc;
761
762         ENTRY;
763         /* If the file was modified, add the dirty flag */
764         ma->ma_need = MA_HSM;
765         rc = mdt_attr_get_complex(info, mo, ma);
766         if (rc) {
767                 CERROR("file attribute read error for "DFID": %d.\n",
768                         PFID(mdt_object_fid(mo)), rc);
769                 RETURN(rc);
770         }
771
772         /* If an up2date copy exists in the backend, add dirty flag */
773         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
774             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
775                 ma->ma_hsm.mh_flags |= HS_DIRTY;
776
777                 /* Bump cap so that closes from non-owner writers can
778                  * set the HSM state to dirty.
779                  */
780                 cap_saved = uc->uc_cap;
781                 cap_raise(uc->uc_cap, CAP_FOWNER);
782                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
783                 uc->uc_cap = cap_saved;
784                 if (rc)
785                         CERROR("file attribute change error for "DFID": %d\n",
786                                 PFID(mdt_object_fid(mo)), rc);
787         }
788
789         RETURN(rc);
790 }
791
/*
 * Handle a setattr reint request for the object named by rr_fid1.
 *
 * Two kinds of update are served:
 *  - inode attributes (ma_attr.la_valid != 0), applied with mdt_attr_set();
 *  - a LOV or default-LMV xattr on a directory (MA_LOV / MA_LMV set and no
 *    inode attribute bits), stored with mo_xattr_set() under a PW lock.
 * Anything else is rejected with -EPROTO.
 *
 * On success the object attributes are re-read and packed into the
 * RMF_MDT_BODY reply buffer.
 *
 * \param[in] info      thread info carrying the unpacked request
 * \param[in] lhc       lock handle passed by the caller; it is only
 *                      reassigned locally for the lease revocation below
 *
 * \retval      0 on success
 * \retval      -ev negative errno upon error
 */
static int mdt_reint_setattr(struct mdt_thread_info *info,
                             struct mdt_lock_handle *lhc)
{
        struct mdt_device *mdt = info->mti_mdt;
        struct md_attr *ma = &info->mti_attr;
        struct mdt_reint_record *rr = &info->mti_rr;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct mdt_object *mo;
        struct mdt_body *repbody;
        ktime_t kstart = ktime_get();
        int rc;

        ENTRY;
        DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
                  (unsigned int)ma->ma_attr.la_valid);

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);

        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
        mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
        if (IS_ERR(mo))
                GOTO(out, rc = PTR_ERR(mo));

        /* setattr is only served for an existing, local object */
        if (!mdt_object_exists(mo))
                GOTO(out_put, rc = -ENOENT);

        if (mdt_object_remote(mo))
                GOTO(out_put, rc = -EREMOTE);

        ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
        /* revoke lease lock if size is going to be changed */
        if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
                     !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
                     atomic_read(&mo->mot_lease_count) > 0)) {
                down_read(&mo->mot_open_sem);

                /* re-check the lease count now that mot_open_sem is held */
                if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
                        lhc = &info->mti_lh[MDT_LH_LOCAL];
                        rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
                                             LCK_CW);
                        if (rc != 0) {
                                up_read(&mo->mot_open_sem);
                                GOTO(out_put, rc);
                        }

                        /* revoke lease lock: the lock is dropped right away,
                         * taking it was enough.
                         */
                        mdt_object_unlock(info, mo, lhc, 1);
                }
                up_read(&mo->mot_open_sem);
        }

        if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
                /* Check write access for the O_TRUNC case */
                if (mdt_write_read(mo) < 0)
                        GOTO(out_put, rc = -ETXTBSY);

                /* LU-10286: compatibility check for FLR.
                 * Please check the comment in mdt_finish_open() for details
                 */
                if (!exp_connect_flr(info->mti_exp) ||
                    !exp_connect_overstriping(info->mti_exp)) {
                        /* rc > 0 below means a LOV xattr was read into
                         * info->mti_big_lmm; -ENODATA is tolerated.
                         */
                        rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
                        if (rc < 0 && rc != -ENODATA)
                                GOTO(out_put, rc);

                        if (!exp_connect_flr(info->mti_exp)) {
                                if (rc > 0 &&
                                    mdt_lmm_is_flr(info->mti_big_lmm))
                                        GOTO(out_put, rc = -EOPNOTSUPP);
                        }

                        if (!exp_connect_overstriping(info->mti_exp)) {
                                if (rc > 0 &&
                                    mdt_lmm_is_overstriping(info->mti_big_lmm))
                                        GOTO(out_put, rc = -EOPNOTSUPP);
                        }
                }

                /* For truncate, the file size sent from the client
                 * is believable, but the blocks are incorrect,
                 * which makes the block size in the LSOM attribute
                 * inconsistent with the real block size.
                 */
                rc = mdt_lsom_update(info, mo, true);
                if (rc)
                        GOTO(out_put, rc);
        }

        if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
                /* inode attribute update; a layout must not come with it */
                if (ma->ma_valid & MA_LOV)
                        GOTO(out_put, rc = -EPROTO);

                /* MDT supports FMD for regular files due to Data-on-MDT */
                if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
                    ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
                        tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
                                       req->rq_xid);

                        /* a failure to read the parent FID is not fatal:
                         * MA_PFID is simply left unset and rc is
                         * overwritten by mdt_attr_set() below.
                         */
                        if (ma->ma_attr.la_valid & LA_MTIME) {
                                rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
                                if (!rc)
                                        ma->ma_valid |= MA_PFID;
                        }
                }

                rc = mdt_attr_set(info, mo, ma);
                if (rc)
                        GOTO(out_put, rc);
        } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
                   (ma->ma_valid & MA_INODE)) {
                /* directory default layout (LOV) or default LMV update */
                struct lu_buf *buf = &info->mti_buf;
                struct lu_ucred *uc = mdt_ucred(info);
                struct mdt_lock_handle *lh;
                const char *name;

                /* reject if either remote or striped dir is disabled */
                if (ma->ma_valid & MA_LMV) {
                        if (!mdt->mdt_enable_remote_dir ||
                            !mdt->mdt_enable_striped_dir)
                                GOTO(out_put, rc = -EPERM);

                        /* we want rbac roles to have precedence over any other
                         * permission or capability checks
                         */
                        if (!uc->uc_rbac_dne_ops ||
                            (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
                             mdt->mdt_enable_remote_dir_gid != -1))
                                GOTO(out_put, rc = -EPERM);
                }

                if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
                        GOTO(out_put, rc = -ENOTDIR);

                if (ma->ma_attr.la_valid != 0)
                        GOTO(out_put, rc = -EPROTO);

                lh = &info->mti_lh[MDT_LH_PARENT];
                if (ma->ma_valid & MA_LOV) {
                        buf->lb_buf = ma->ma_lmm;
                        buf->lb_len = ma->ma_lmm_size;
                        name = XATTR_NAME_LOV;
                        rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
                                             LCK_PW);
                } else {
                        buf->lb_buf = &ma->ma_lmv->lmv_user_md;
                        buf->lb_len = ma->ma_lmv_size;
                        name = XATTR_NAME_DEFAULT_LMV;

                        if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
                                /* the root has no parent to look up */
                                rc = mdt_object_lock(info, mo, lh,
                                                     MDS_INODELOCK_XATTR |
                                                     MDS_INODELOCK_LOOKUP,
                                                     LCK_PW);
                        } else {
                                struct lu_fid *pfid = &info->mti_tmp_fid1;
                                struct lu_name *pname = &info->mti_name;
                                const char dotdot[] = "..";
                                struct mdt_object *pobj;

                                /* find the parent through the ".." entry so
                                 * it can be passed to mdt_object_check_lock()
                                 */
                                fid_zero(pfid);
                                pname->ln_name = dotdot;
                                /* NOTE(review): sizeof(dotdot) counts the
                                 * trailing NUL, so ln_namelen is 3 rather
                                 * than strlen("..") - confirm this is what
                                 * mdo_lookup() expects.
                                 */
                                pname->ln_namelen = sizeof(dotdot);
                                rc = mdo_lookup(info->mti_env,
                                                mdt_object_child(mo), pname,
                                                pfid, NULL);
                                if (rc)
                                        GOTO(out_put, rc);

                                pobj = mdt_object_find(info->mti_env,
                                                       info->mti_mdt, pfid);
                                if (IS_ERR(pobj))
                                        GOTO(out_put, rc = PTR_ERR(pobj));

                                rc = mdt_object_check_lock(info, pobj, mo, lh,
                                                           MDS_INODELOCK_XATTR |
                                                           MDS_INODELOCK_LOOKUP,
                                                           LCK_PW);
                                mdt_object_put(info->mti_env, pobj);
                        }
                }

                /* common failure check for whichever lock call ran above */
                if (rc != 0)
                        GOTO(out_put, rc);

                rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
                                  name, 0);

                mdt_object_unlock(info, mo, lh, rc);
                if (rc)
                        GOTO(out_put, rc);
        } else {
                GOTO(out_put, rc = -EPROTO);
        }

        /* If file data is modified, add the dirty flag.
         * NOTE(review): rc is overwritten by mdt_attr_get_complex() just
         * below, so a failure here is not returned to the caller - confirm
         * this best-effort behaviour is intended.
         */
        if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
                rc = mdt_add_dirty_flag(info, mo, ma);

        /* re-read the inode attributes to fill the reply body */
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;
        rc = mdt_attr_get_complex(info, mo, ma);
        if (rc != 0)
                GOTO(out_put, rc);

        mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));

        EXIT;
out_put:
        mdt_object_put(info->mti_env, mo);
out:
        /* only successful setattrs are accounted in the stats */
        if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_SETATTR,
                                 ktime_us_delta(ktime_get(), kstart));

        mdt_client_compatibility(info);
        return rc;
}
1013
1014 static int mdt_reint_create(struct mdt_thread_info *info,
1015                             struct mdt_lock_handle *lhc)
1016 {
1017         struct ptlrpc_request   *req = mdt_info_req(info);
1018         ktime_t                 kstart = ktime_get();
1019         int                     rc;
1020
1021         ENTRY;
1022         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1023                 RETURN(err_serious(-ESTALE));
1024
1025         if (info->mti_dlm_req)
1026                 ldlm_request_cancel(mdt_info_req(info),
1027                                     info->mti_dlm_req, 0, LATF_SKIP);
1028
1029         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1030                 RETURN(-EPROTO);
1031
1032         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1033         case S_IFDIR:
1034         case S_IFREG:
1035         case S_IFLNK:
1036         case S_IFCHR:
1037         case S_IFBLK:
1038         case S_IFIFO:
1039         case S_IFSOCK:
1040                 break;
1041         default:
1042                 CERROR("%s: Unsupported mode %o\n",
1043                        mdt_obd_name(info->mti_mdt),
1044                        info->mti_attr.ma_attr.la_mode);
1045                 RETURN(err_serious(-EOPNOTSUPP));
1046         }
1047
1048         rc = mdt_create(info);
1049         if (rc == 0) {
1050                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1051                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1052                                          ktime_us_delta(ktime_get(), kstart));
1053                 else
1054                         /* Special file should stay on the same node as parent*/
1055                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1056                                          ktime_us_delta(ktime_get(), kstart));
1057         }
1058
1059         RETURN(rc);
1060 }
1061
/*
 * Handle an unlink/rmdir reint request: remove name rr_name from the parent
 * directory rr_fid1 and, when the child is local, unlink the child object.
 *
 * VBR: the parent version is checked and saved at index 0 (local parent
 * only); the child version is checked at index 1 while the child is looked
 * up by name, and saved at index 1 once the child is locked.
 *
 * \param[in] info      thread info carrying the unpacked request
 * \param[in] lhc       lock handle from the caller, not used here
 *
 * \retval      0 on success
 * \retval      -EREMOTE the child lives on another MDT; its FID is returned
 *              in the reply body
 * \retval      -ev other negative errno upon error
 */
static int mdt_reint_unlink(struct mdt_thread_info *info,
                            struct mdt_lock_handle *lhc)
{
        struct mdt_reint_record *rr = &info->mti_rr;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct md_attr *ma = &info->mti_attr;
        struct lu_fid *child_fid = &info->mti_tmp_fid1;
        struct mdt_object *mp;
        struct mdt_object *mc;
        struct mdt_lock_handle *parent_lh;
        struct mdt_lock_handle *child_lh;
        struct ldlm_enqueue_info *einfo = &info->mti_einfo;
        struct lu_ucred *uc  = mdt_ucred(info);
        int no_name = 0;
        ktime_t kstart = ktime_get();
        int rc;

        ENTRY;
        DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
                  PNAME(&rr->rr_name));

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
                RETURN(err_serious(-ENOENT));

        if (!fid_is_md_operative(rr->rr_fid1))
                RETURN(-EPERM);

        mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(mp))
                RETURN(PTR_ERR(mp));

        /* the parent version is only checked when the parent is local */
        if (!mdt_object_remote(mp)) {
                rc = mdt_version_get_check_save(info, mp, 0);
                if (rc)
                        GOTO(put_parent, rc);
        }

        /* refuse when the parent carries LOHA_FSCRYPT_MD and the caller
         * lacks the fscrypt_admin role
         */
        if (!uc->uc_rbac_fscrypt_admin &&
            mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
                GOTO(put_parent, rc = -EPERM);

        CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
        CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
        parent_lh = &info->mti_lh[MDT_LH_PARENT];
        rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW);
        if (rc != 0)
                GOTO(put_parent, rc);

        /* sp_rm_entry: only the name entry is removed (no child object is
         * passed to mdo_unlink()); requires CAP_SYS_ADMIN
         */
        if (info->mti_spec.sp_rm_entry) {
                if (!mdt_is_dne_client(req->rq_export))
                        /* Return -ENOTSUPP for old client */
                        GOTO(unlock_parent, rc = -ENOTSUPP);

                if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
                        GOTO(unlock_parent, rc = -EPERM);

                ma->ma_need = MA_INODE;
                ma->ma_valid = 0;
                rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                                NULL, &rr->rr_name, ma, no_name);
                GOTO(unlock_parent, rc);
        }

        if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
                /* the child FID comes with the request, no lookup needed */
                *child_fid = *rr->rr_fid2;
        } else {
                /* lookup child object along with version checking */
                fid_zero(child_fid);
                rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
                                              1);
                if (rc != 0) {
                        /* The name might not be found during the resend of
                         * a remote unlink. Consider the following case:
                         * dir_A is a remote directory, the name entry of
                         * dir_A is on MDT0, the directory is on MDT1.
                         *
                         * 1. client sends unlink req to MDT1.
                         * 2. MDT1 sends name delete update to MDT0.
                         * 3. name entry is being deleted in MDT0 synchronously.
                         * 4. MDT1 is restarted.
                         * 5. client resends unlink req to MDT1. So it can not
                         *    find the name entry on MDT0 anymore.
                         * In this case, MDT1 only needs to destroy the local
                         * directory.
                         */
                        if (mdt_object_remote(mp) && rc == -ENOENT &&
                            !fid_is_zero(rr->rr_fid2) &&
                            lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
                                no_name = 1;
                                *child_fid = *rr->rr_fid2;
                        } else {
                                GOTO(unlock_parent, rc);
                        }
                }
        }

        if (!fid_is_md_operative(child_fid))
                GOTO(unlock_parent, rc = -EPERM);

        /* We will lock the child regardless it is local or remote. No harm. */
        mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
        if (IS_ERR(mc))
                GOTO(unlock_parent, rc = PTR_ERR(mc));

        if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
                /* In this case, child fid is embedded in the request, and we do
                 * not have a proper name as rr_name contains an encoded
                 * hash. So find name that matches provided hash.
                 */
                if (!find_name_matching_hash(info, &rr->rr_name,
                                             NULL, mc))
                        GOTO(put_child, rc = -ENOENT);
        }

        child_lh = &info->mti_lh[MDT_LH_CHILD];
        if (mdt_object_remote(mc)) {
                struct mdt_body  *repbody;

                /* the client named a specific child FID, but the child that
                 * was found is remote to this MDT
                 */
                if (!fid_is_zero(rr->rr_fid2)) {
                        CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
                               mdt_obd_name(info->mti_mdt),
                               PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
                        GOTO(put_child, rc = -ENOENT);
                }
                CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
                       mdt_obd_name(info->mti_mdt),
                       PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));

                if (!mdt_is_dne_client(req->rq_export))
                        /* Return -ENOTSUPP for old client */
                        GOTO(put_child, rc = -ENOTSUPP);

                /* Revoke the LOOKUP lock of the remote object granted by
                 * this MDT. Since the unlink will happen on another MDT,
                 * it will release the LOOKUP lock right away. XXX: what
                 * happens if another client tries to grab the LOOKUP lock
                 * at the same time as the unlink?
                 */
                rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX);
                if (rc)
                        GOTO(put_child, rc);

                /* hand the child FID back with -EREMOTE */
                repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
                LASSERT(repbody != NULL);
                repbody->mbo_fid1 = *mdt_object_fid(mc);
                repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                GOTO(unlock_child, rc = -EREMOTE);
        }
        /* We used to acquire MDS_INODELOCK_FULL here but we can't do
         * this now because a running HSM restore on the child (unlink
         * victim) will hold the layout lock. See LU-4002.
         */
        rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
                                     MDS_INODELOCK_LOOKUP |
                                     MDS_INODELOCK_UPDATE, LCK_EX);
        if (rc != 0)
                GOTO(put_child, rc);

        /*
         * At this point only MA_INODE is known to be needed; the mdd layer
         * checks whether MA_LOV and MA_COOKIE are needed as well.
         */
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;

        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_UNLINK_WRITE);
        /* save version when object is locked */
        mdt_version_get_save(info, mc, 1);

        /* mdo_unlink() runs with the child's mot_lov_mutex held */
        mutex_lock(&mc->mot_lov_mutex);

        rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
                        mdt_object_child(mc), &rr->rr_name, ma, no_name);

        mutex_unlock(&mc->mot_lov_mutex);
        if (rc != 0)
                GOTO(unlock_child, rc);

        /* object still alive: refresh its attributes; object dying: discard
         * its Data-on-MDT data if mdt_dom_check_for_discard() says so
         */
        if (!lu_object_is_dying(&mc->mot_header)) {
                rc = mdt_attr_get_complex(info, mc, ma);
                if (rc)
                        GOTO(out_stat, rc);
        } else if (mdt_dom_check_for_discard(info, mc)) {
                mdt_dom_discard_data(info, mc);
        }
        mdt_handle_last_unlink(info, mc, ma);

out_stat:
        /* account the operation as rmdir or unlink by the victim's type */
        if (ma->ma_valid & MA_INODE) {
                switch (ma->ma_attr.la_mode & S_IFMT) {
                case S_IFDIR:
                        mdt_counter_incr(req, LPROC_MDT_RMDIR,
                                         ktime_us_delta(ktime_get(), kstart));
                        break;
                case S_IFREG:
                case S_IFLNK:
                case S_IFCHR:
                case S_IFBLK:
                case S_IFIFO:
                case S_IFSOCK:
                        mdt_counter_incr(req, LPROC_MDT_UNLINK,
                                         ktime_us_delta(ktime_get(), kstart));
                        break;
                default:
                        LASSERTF(0, "bad file type %o unlinking\n",
                                ma->ma_attr.la_mode);
                }
        }

        EXIT;

unlock_child:
        /* after unlink the object is gone, no need to keep lock */
        mdt_object_stripes_unlock(info, mc, child_lh, einfo, 1);
put_child:
        /* free mti_big_buf if it was allocated on the MDS_OP_WITH_FID path */
        if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
            info->mti_big_buf.lb_buf)
                lu_buf_free(&info->mti_big_buf);
        mdt_object_put(info->mti_env, mc);
unlock_parent:
        mdt_object_unlock(info, mp, parent_lh, rc);
put_parent:
        mdt_object_put(info->mti_env, mp);
        CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
        return rc;
}
1296
/*
 * Handle a hard-link reint request: create name rr_name in the target
 * parent directory rr_fid2 pointing at the source object rr_fid1.
 *
 * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
 * name.
 *
 * \param[in] info      thread info carrying the unpacked request
 * \param[in] lhc       lock handle from the caller, not used here
 *
 * \retval      0 on success
 * \retval      -ev negative errno upon error
 */
static int mdt_reint_link(struct mdt_thread_info *info,
                          struct mdt_lock_handle *lhc)
{
        struct mdt_reint_record *rr = &info->mti_rr;
        struct ptlrpc_request   *req = mdt_info_req(info);
        struct md_attr          *ma = &info->mti_attr;
        struct mdt_object       *ms;
        struct mdt_object       *mp;
        struct mdt_lock_handle  *lhs;
        struct mdt_lock_handle  *lhp;
        ktime_t kstart = ktime_get();
        int rc;

        ENTRY;
        DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
                  PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));

        if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
                RETURN(err_serious(-ENOENT));

        /* fail injection: suppress the reply for the resend tests */
        if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
            CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
                req->rq_no_reply = 1;
                RETURN(err_serious(-ENOENT));
        }

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        /* Invalid case so return error immediately instead of
         * processing it
         */
        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
                RETURN(-EPERM);

        if (!fid_is_md_operative(rr->rr_fid1) ||
            !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);

        /* step 1: find target parent dir */
        mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
        if (IS_ERR(mp))
                RETURN(PTR_ERR(mp));

        rc = mdt_version_get_check_save(info, mp, 0);
        if (rc)
                GOTO(put_parent, rc);

        rc = mdt_check_enc(info, mp);
        if (rc)
                GOTO(put_parent, rc);

        /* step 2: find source */
        ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(ms))
                GOTO(put_parent, rc = PTR_ERR(ms));

        if (!mdt_object_exists(ms)) {
                CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
                       mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
                GOTO(put_source, rc = -ENOENT);
        }

        CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);

        /* lock order: target parent (PW) first, then the source (EX) */
        lhp = &info->mti_lh[MDT_LH_PARENT];
        rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW);
        if (rc != 0)
                GOTO(put_source, rc);

        CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);

        lhs = &info->mti_lh[MDT_LH_CHILD];
        rc = mdt_object_lock(info, ms, lhs,
                             MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
                             LCK_EX);
        if (rc != 0)
                GOTO(unlock_parent, rc);

        /* step 3: link it */
        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                        OBD_FAIL_MDS_REINT_LINK_WRITE);

        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
        rc = mdt_version_get_check_save(info, ms, 1);
        if (rc)
                GOTO(unlock_source, rc);

        /** check target version by name during replay */
        rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
                                      &info->mti_tmp_fid1, 2);
        if (rc != 0 && rc != -ENOENT)
                GOTO(unlock_source, rc);
        /* save version of file name for replay, it must be ENOENT here;
         * outside replay, a name that already exists (rc == 0) is -EEXIST
         */
        if (!req_is_replay(mdt_info_req(info))) {
                if (rc != -ENOENT) {
                        CDEBUG(D_INFO, "link target "DNAME" existed!\n",
                               PNAME(&rr->rr_name));
                        GOTO(unlock_source, rc = -EEXIST);
                }
                info->mti_ver[2] = ENOENT_VERSION;
                mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
        }

        rc = mdo_link(info->mti_env, mdt_object_child(mp),
                      mdt_object_child(ms), &rr->rr_name, ma);

        /* only successful links are accounted in the stats */
        if (rc == 0)
                mdt_counter_incr(req, LPROC_MDT_LINK,
                                 ktime_us_delta(ktime_get(), kstart));

        EXIT;
        /* cleanup releases locks and references in reverse order */
unlock_source:
        mdt_object_unlock(info, ms, lhs, rc);
unlock_parent:
        mdt_object_unlock(info, mp, lhp, rc);
put_source:
        mdt_object_put(info->mti_env, ms);
put_parent:
        mdt_object_put(info->mti_env, mp);
        return rc;
}
1423
1424 /**
1425  * Get BFL lock for rename or migrate process.
1426  **/
1427 static int mdt_rename_lock(struct mdt_thread_info *info,
1428                            struct mdt_lock_handle *lh)
1429 {
1430         struct lu_fid *fid = &info->mti_tmp_fid1;
1431         struct mdt_object *obj;
1432         __u64 ibits = MDS_INODELOCK_UPDATE;
1433         int rc;
1434
1435         ENTRY;
1436         lu_root_fid(fid);
1437         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1438         if (IS_ERR(obj))
1439                 RETURN(PTR_ERR(obj));
1440
1441         mdt_lock_reg_init(lh, LCK_EX);
1442         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1443                                       &ibits, 0, false);
1444         mdt_object_put(info->mti_env, obj);
1445         RETURN(rc);
1446 }
1447
1448 static void mdt_rename_unlock(struct mdt_thread_info *info,
1449                               struct mdt_lock_handle *lh)
1450 {
1451         ENTRY;
1452         /* Cancel the single rename lock right away */
1453         mdt_object_unlock(info, NULL, lh, 1);
1454         EXIT;
1455 }
1456
1457 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1458                                                 const struct lu_fid *fid,
1459                                                 int idx)
1460 {
1461         struct mdt_object *dir;
1462         int rc;
1463
1464         ENTRY;
1465         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1466         if (IS_ERR(dir))
1467                 RETURN(dir);
1468
1469         /* check early, the real version will be saved after locking */
1470         rc = mdt_version_get_check(info, dir, idx);
1471         if (rc)
1472                 GOTO(out_put, rc);
1473
1474         if (!mdt_object_exists(dir))
1475                 GOTO(out_put, rc = -ENOENT);
1476
1477         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1478                 GOTO(out_put, rc = -ENOTDIR);
1479
1480         RETURN(dir);
1481 out_put:
1482         mdt_object_put(info->mti_env, dir);
1483         return ERR_PTR(rc);
1484 }
1485
1486 /*
1487  * lock rename source object.
1488  *
1489  * Both source and its parent object may be located on remote MDTs, and even on
1490  * different MDTs, which means source object is a remote object on parent.
1491  *
1492  * \retval      0 on success
1493  * \retval      -ev negative errno upon error
1494  */
1495 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1496                                   struct mdt_object *parent,
1497                                   struct mdt_object *child,
1498                                   struct mdt_lock_handle *lh,
1499                                   struct mdt_lock_handle *lh_lookup,
1500                                   __u64 ibits)
1501 {
1502         int rc;
1503
1504         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1505         /* if @obj is remote object, LOOKUP lock needs to be taken from
1506          * parent MDT.
1507          */
1508         rc = mdt_is_remote_object(info, parent, child);
1509         if (rc < 0)
1510                 return rc;
1511
1512         if (rc == 1) {
1513                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1514                                             LCK_EX);
1515                 if (rc)
1516                         return rc;
1517
1518                 ibits &= ~MDS_INODELOCK_LOOKUP;
1519         }
1520
1521         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX);
1522         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1523                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1524
1525         return 0;
1526 }
1527
1528 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1529                                      struct mdt_object *obj,
1530                                      struct mdt_lock_handle *lh,
1531                                      struct mdt_lock_handle *lh_lookup,
1532                                      int decref)
1533 {
1534         mdt_object_unlock(info, obj, lh, decref);
1535         mdt_object_unlock(info, NULL, lh_lookup, decref);
1536 }
1537
/* migration takes UPDATE lock of link parent, and LOOKUP lock of link.
 * Each lock taken this way is remembered in one of these entries, kept on
 * a list until mdt_migrate_links_unlock() releases them.
 */
struct mdt_link_lock {
        /* locked object; a reference is held while the entry exists */
        struct mdt_object *mll_obj;
        /* copy of the lock handle, which now owns the lock */
        struct mdt_lock_handle mll_lh;
        /* linkage into the caller's list of link locks */
        struct list_head mll_linkage;
};
1544
1545 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1546                                             struct mdt_object *o,
1547                                             struct mdt_lock_handle *lh,
1548                                             struct list_head *list)
1549 {
1550         struct mdt_link_lock *mll;
1551
1552         OBD_ALLOC_PTR(mll);
1553         if (mll == NULL)
1554                 return -ENOMEM;
1555
1556         INIT_LIST_HEAD(&mll->mll_linkage);
1557         mdt_object_get(info->mti_env, o);
1558         mll->mll_obj = o;
1559         mll->mll_lh = *lh;
1560         memset(lh, 0, sizeof(*lh));
1561         list_add_tail(&mll->mll_linkage, list);
1562
1563         return 0;
1564 }
1565
1566 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1567                                              struct mdt_link_lock *mll,
1568                                              int decref)
1569 {
1570         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1571         mdt_object_put(info->mti_env, mll->mll_obj);
1572         list_del(&mll->mll_linkage);
1573         OBD_FREE_PTR(mll);
1574 }
1575
1576 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1577                                      struct list_head *list, int decref)
1578 {
1579         struct mdt_link_lock *mll;
1580         struct mdt_link_lock *tmp;
1581
1582         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1583                 mdt_migrate_link_lock_del(info, mll, decref);
1584 }
1585
1586 /* take link parent UPDATE lock.
1587  * \retval      0 \a lnkp is already locked, no lock taken.
1588  *              1 lock taken
1589  *              -ev negative errno.
1590  */
1591 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1592                                         struct mdt_object *lnkp,
1593                                         struct list_head *update_locks,
1594                                         bool *blocked)
1595 {
1596         const struct lu_fid *fid = mdt_object_fid(lnkp);
1597         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1598         struct mdt_link_lock *entry;
1599         __u64 ibits = 0;
1600         int rc;
1601
1602         ENTRY;
1603
1604         /* check if it's already locked */
1605         list_for_each_entry(entry, update_locks, mll_linkage) {
1606                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1607                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1608                         RETURN(0);
1609                 }
1610         }
1611
1612         /* link parent UPDATE lock */
1613         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1614
1615         if (*blocked) {
1616                 /* revoke lock instead of take in *blocked* mode */
1617                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1618                                      LCK_PW);
1619                 if (rc)
1620                         RETURN(rc);
1621
1622                 if (mdt_object_remote(lnkp)) {
1623                         struct ldlm_lock *lock;
1624
1625                         /*
1626                          * for remote object, set lock cb_atomic, so lock can be
1627                          * released in blocking_ast() immediately, then the next
1628                          * lock_try will have better chance of success.
1629                          */
1630                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1631                         LASSERT(lock != NULL);
1632                         lock_res_and_lock(lock);
1633                         ldlm_set_atomic_cb(lock);
1634                         unlock_res_and_lock(lock);
1635                         LDLM_LOCK_PUT(lock);
1636                 }
1637
1638                 mdt_object_unlock(info, lnkp, lhl, 1);
1639                 RETURN(0);
1640         }
1641
1642         /*
1643          * we can't follow parent-child lock order like other MD
1644          * operations, use lock_try here to avoid deadlock, if the lock
1645          * cannot be taken, drop all locks taken, revoke the blocked
1646          * one, and continue processing the remaining entries, and in
1647          * the end of the loop restart from beginning.
1648          *
1649          * don't lock with PDO mode in case two links are under the same
1650          * parent and their hash values are different.
1651          */
1652         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1653                                  LCK_PW);
1654         if (rc < 0)
1655                 RETURN(rc);
1656
1657         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1658                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1659                 *blocked = true;
1660                 RETURN(-EAGAIN);
1661         }
1662
1663         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1664         if (rc) {
1665                 mdt_object_unlock(info, lnkp, lhl, 1);
1666                 RETURN(rc);
1667         }
1668
1669         RETURN(1);
1670 }
1671
1672 /* take link LOOKUP lock.
1673  * \retval      0 \a lnkp is already locked, no lock taken.
1674  *              1 lock taken.
1675  *              -ev negative errno.
1676  */
1677 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1678                                  struct mdt_object *lnkp,
1679                                  struct mdt_object *spobj,
1680                                  struct mdt_object *obj,
1681                                  struct list_head *lookup_locks)
1682 {
1683         const struct lu_fid *fid = mdt_object_fid(lnkp);
1684         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1685         struct mdt_link_lock *entry;
1686         int rc;
1687
1688         ENTRY;
1689
1690         /* check if it's already locked by source */
1691         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1692         if (rc <= 0) {
1693                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1694                        PFID(fid));
1695                 RETURN(rc);
1696         }
1697
1698         /* check if it's already locked by other links */
1699         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1700                 rc = mdt_fids_different_target(info, fid,
1701                                                mdt_object_fid(entry->mll_obj));
1702                 if (rc <= 0) {
1703                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1704                                PFID(fid));
1705                         RETURN(rc);
1706                 }
1707         }
1708
1709         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX);
1710         if (rc)
1711                 RETURN(rc);
1712
1713         /* don't take local LOOKUP lock, because later we will lock other ibits
1714          * of sobj (which is on local MDT), and lock the same object twice may
1715          * deadlock, just revoke this lock.
1716          */
1717         if (!mdt_object_remote(lnkp))
1718                 GOTO(unlock, rc = 0);
1719
1720         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1721         if (rc)
1722                 GOTO(unlock, rc);
1723
1724         RETURN(1);
1725 unlock:
1726         mdt_object_unlock(info, lnkp, lhl, 1);
1727         return rc;
1728 }
1729
1730 /*
1731  * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
1732  * total local lock count exceeds RS_MAX_LOCKS.
1733  *
1734  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1735  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1736  * \retval      -ev negative errno upon error
1737  */
1738 static int mdt_migrate_links_lock(struct mdt_thread_info *info,
1739                                   struct mdt_object *spobj,
1740                                   struct mdt_object *tpobj,
1741                                   struct mdt_object *obj,
1742                                   struct mdt_lock_handle *lhsp,
1743                                   struct mdt_lock_handle *lhtp,
1744                                   struct list_head *link_locks)
1745 {
1746         struct mdt_device *mdt = info->mti_mdt;
1747         struct lu_buf *buf = &info->mti_big_buf;
1748         struct lu_name *lname = &info->mti_name;
1749         struct linkea_data ldata = { NULL };
1750         int local_lock_cnt = 0;
1751         bool blocked = false;
1752         bool saved;
1753         struct mdt_object *lnkp;
1754         struct lu_fid fid;
1755         LIST_HEAD(update_locks);
1756         LIST_HEAD(lookup_locks);
1757         int rc;
1758
1759         ENTRY;
1760         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1761                 RETURN(0);
1762
1763         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1764         if (buf->lb_buf == NULL)
1765                 RETURN(-ENOMEM);
1766
1767         ldata.ld_buf = buf;
1768         rc = mdt_links_read(info, obj, &ldata);
1769         if (rc) {
1770                 if (rc == -ENOENT || rc == -ENODATA)
1771                         rc = 0;
1772                 RETURN(rc);
1773         }
1774
1775         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1776              linkea_next_entry(&ldata)) {
1777                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1778                                     &fid);
1779
1780                 /* check if link parent is source parent too */
1781                 if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
1782                         CDEBUG(D_INFO,
1783                                "skip lock on source parent "DFID"/"DNAME"\n",
1784                                PFID(&fid), PNAME(lname));
1785                         continue;
1786                 }
1787
1788                 /* check if link parent is target parent too */
1789                 if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
1790                         CDEBUG(D_INFO,
1791                                "skip lock on target parent "DFID"/"DNAME"\n",
1792                                PFID(&fid), PNAME(lname));
1793                         continue;
1794                 }
1795
1796                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1797                 if (IS_ERR(lnkp)) {
1798                         CWARN("%s: cannot find obj "DFID": %ld\n",
1799                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1800                         continue;
1801                 }
1802
1803                 if (!mdt_object_exists(lnkp)) {
1804                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1805                                PFID(&fid), PNAME(lname));
1806                         mdt_object_put(info->mti_env, lnkp);
1807                         continue;
1808                 }
1809 relock:
1810                 saved = blocked;
1811                 rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
1812                                                   &blocked);
1813                 if (!saved && blocked) {
1814                         /* unlock all locks taken to avoid deadlock */
1815                         mdt_migrate_links_unlock(info, &update_locks, 1);
1816                         mdt_object_unlock(info, spobj, lhsp, 1);
1817                         if (tpobj != spobj)
1818                                 mdt_object_unlock(info, tpobj, lhtp, 1);
1819                         goto relock;
1820                 }
1821                 if (rc < 0) {
1822                         mdt_object_put(info->mti_env, lnkp);
1823                         GOTO(out, rc);
1824                 }
1825
1826                 if (rc == 1 && !mdt_object_remote(lnkp))
1827                         local_lock_cnt++;
1828
1829                 rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
1830                                            &lookup_locks);
1831                 if (rc < 0) {
1832                         mdt_object_put(info->mti_env, lnkp);
1833                         GOTO(out, rc);
1834                 }
1835                 if (rc == 1 && !mdt_object_remote(lnkp))
1836                         local_lock_cnt++;
1837                 mdt_object_put(info->mti_env, lnkp);
1838         }
1839
1840         if (blocked)
1841                 GOTO(out, rc = -EBUSY);
1842
1843         EXIT;
1844 out:
1845         list_splice(&update_locks, link_locks);
1846         list_splice(&lookup_locks, link_locks);
1847         if (rc < 0) {
1848                 mdt_migrate_links_unlock(info, link_locks, rc);
1849         } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
1850                 /*
1851                  * parent may have 3 local objects: master object and 2 stripes
1852                  * (if it's being migrated too); source may have 1 local objects
1853                  * as regular file; target has 1 local object.
1854                  * Note, source may have 2 local locks if it is directory but it
1855                  * can't have hardlinks, so it is not considered here.
1856                  */
1857                 CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
1858                        local_lock_cnt);
1859                 rc = 1;
1860         }
1861         return rc;
1862 }
1863
1864 /*
1865  * lookup source by name, if parent is striped directory, we need to find the
1866  * corresponding stripe where source is located, and then lookup there.
1867  *
1868  * besides, if parent is migrating too, and file is already in target stripe,
1869  * this should be a redo of 'lfs migrate' on client side.
1870  *
1871  * \retval 1 tpobj stripe index is less than spobj stripe index
1872  * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
1873  * \retval -ev negative errno upon error
1874  */
1875 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1876                               struct mdt_object *pobj,
1877                               const struct md_attr *ma,
1878                               const struct lu_name *lname,
1879                               struct mdt_object **spobj,
1880                               struct mdt_object **tpobj,
1881                               struct mdt_object **sobj)
1882 {
1883         const struct lu_env *env = info->mti_env;
1884         struct lu_fid *fid = &info->mti_tmp_fid1;
1885         int spindex = -1;
1886         int tpindex = -1;
1887         int rc;
1888
1889         if (ma->ma_valid & MA_LMV) {
1890                 /* if parent is striped, lookup on corresponding stripe */
1891                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1892                 struct lu_fid *fid2 = &info->mti_tmp_fid2;
1893
1894                 if (!lmv_is_sane(lmv))
1895                         return -EBADF;
1896
1897                 spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1898                                                        lname->ln_namelen);
1899                 if (spindex < 0)
1900                         return spindex;
1901
1902                 fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
1903
1904                 *spobj = mdt_object_find(env, info->mti_mdt, fid2);
1905                 if (IS_ERR(*spobj)) {
1906                         rc = PTR_ERR(*spobj);
1907                         *spobj = NULL;
1908                         return rc;
1909                 }
1910
1911                 if (!mdt_object_exists(*spobj))
1912                         GOTO(spobj_put, rc = -ENOENT);
1913
1914                 fid_zero(fid);
1915                 rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
1916                                 &info->mti_spec);
1917                 if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
1918                         /* fail check here to let top dir migration succeed. */
1919                         if (CFS_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
1920                                 GOTO(spobj_put, rc = -EIO);
1921
1922                         /*
1923                          * if parent layout is changeing, and lookup child
1924                          * failed on source stripe, lookup again on target
1925                          * stripe, if it exists, it means previous migration
1926                          * was interrupted, and current file was migrated
1927                          * already.
1928                          */
1929                         tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
1930                                                            lname->ln_namelen);
1931                         if (tpindex < 0)
1932                                 GOTO(spobj_put, rc = tpindex);
1933
1934                         fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
1935
1936                         *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
1937                         if (IS_ERR(*tpobj)) {
1938                                 rc = PTR_ERR(*tpobj);
1939                                 *tpobj = NULL;
1940                                 GOTO(spobj_put, rc);
1941                         }
1942
1943                         if (!mdt_object_exists(*tpobj))
1944                                 GOTO(tpobj_put, rc = -ENOENT);
1945
1946                         if (rc == -ENOENT) {
1947                                 fid_zero(fid);
1948                                 rc = mdo_lookup(env, mdt_object_child(*tpobj),
1949                                                 lname, fid, &info->mti_spec);
1950                                 GOTO(tpobj_put, rc = rc ?: -EALREADY);
1951                         }
1952                 } else if (rc) {
1953                         GOTO(spobj_put, rc);
1954                 } else {
1955                         *tpobj = *spobj;
1956                         tpindex = spindex;
1957                         mdt_object_get(env, *tpobj);
1958                 }
1959         } else {
1960                 fid_zero(fid);
1961                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
1962                                 &info->mti_spec);
1963                 if (rc)
1964                         return rc;
1965
1966                 *spobj = pobj;
1967                 *tpobj = pobj;
1968                 mdt_object_get(env, pobj);
1969                 mdt_object_get(env, pobj);
1970         }
1971
1972         *sobj = mdt_object_find(env, info->mti_mdt, fid);
1973         if (IS_ERR(*sobj)) {
1974                 rc = PTR_ERR(*sobj);
1975                 *sobj = NULL;
1976                 GOTO(tpobj_put, rc);
1977         }
1978
1979         if (!mdt_object_exists(*sobj))
1980                 GOTO(sobj_put, rc = -ENOENT);
1981
1982         return (tpindex < spindex);
1983
1984 sobj_put:
1985         mdt_object_put(env, *sobj);
1986         *sobj = NULL;
1987 tpobj_put:
1988         mdt_object_put(env, *tpobj);
1989         *tpobj = NULL;
1990 spobj_put:
1991         mdt_object_put(env, *spobj);
1992         *spobj = NULL;
1993
1994         return rc;
1995 }
1996
1997 /* end lease and close file for regular file */
1998 static int mdd_migrate_close(struct mdt_thread_info *info,
1999                              struct mdt_object *obj)
2000 {
2001         struct close_data *data;
2002         struct mdt_body *repbody;
2003         struct ldlm_lock *lease;
2004         int rc;
2005         int rc2;
2006
2007         rc = -EPROTO;
2008         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2009                                       RCL_CLIENT) ||
2010             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2011                                       RCL_CLIENT))
2012                 goto close;
2013
2014         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2015         if (!data)
2016                 goto close;
2017
2018         rc = -ESTALE;
2019         lease = ldlm_handle2lock(&data->cd_handle);
2020         if (!lease)
2021                 goto close;
2022
2023         /* check if the lease was already canceled */
2024         lock_res_and_lock(lease);
2025         rc = ldlm_is_cancel(lease);
2026         unlock_res_and_lock(lease);
2027
2028         if (rc) {
2029                 rc = -EAGAIN;
2030                 LDLM_DEBUG(lease, DFID" lease broken",
2031                            PFID(mdt_object_fid(obj)));
2032         }
2033
2034         /*
2035          * cancel server side lease, client side counterpart should have been
2036          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2037          */
2038         ldlm_lock_cancel(lease);
2039         ldlm_reprocess_all(lease->l_resource,
2040                            lease->l_policy_data.l_inodebits.bits);
2041         LDLM_LOCK_PUT(lease);
2042
2043 close:
2044         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2045         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2046         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2047
2048         return rc ?: rc2;
2049 }
2050
2051 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2052 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2053 {
2054         return le32_to_cpu(lmv->lmv_hash_type) ==
2055                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2056                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2057                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2058                le32_to_cpu(lmv->lmv_migrate_offset) <
2059                 le32_to_cpu(lmv->lmv_stripe_count);
2060 }
2061
2062 /*
2063  * migrate file in below steps:
2064  *  1. lock source and target stripes
2065  *  2. lookup source by name
2066  *  3. lock parents of source links if source is not directory
2067  *  4. reject if source is in HSM
2068  *  5. take source open_sem and close file if source is regular file
2069  *  6. lock source, and its stripes if it's directory
2070  *  7. migrate file
2071  *  8. lock target so subsequent change to it can trigger COS
2072  *  9. unlock above locks
2073  * 10. sync device if source has too many links
2074  */
2075 int mdt_reint_migrate(struct mdt_thread_info *info,
2076                       struct mdt_lock_handle *unused)
2077 {
2078         const struct lu_env *env = info->mti_env;
2079         struct mdt_device *mdt = info->mti_mdt;
2080         struct ptlrpc_request *req = mdt_info_req(info);
2081         struct mdt_reint_record *rr = &info->mti_rr;
2082         struct lu_ucred *uc = mdt_ucred(info);
2083         struct md_attr *ma = &info->mti_attr;
2084         struct mdt_object *pobj;
2085         struct mdt_object *spobj;
2086         struct mdt_object *tpobj;
2087         struct mdt_object *sobj;
2088         struct mdt_object *tobj;
2089         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2090         struct mdt_lock_handle *lhsp;
2091         struct mdt_lock_handle *lhtp;
2092         struct mdt_lock_handle *lhs;
2093         struct mdt_lock_handle *lhl;
2094         LIST_HEAD(link_locks);
2095         int lock_retries = 5;
2096         bool reverse = false;
2097         bool open_sem_locked = false;
2098         bool do_sync = false;
2099         bool is_plain_dir = false;
2100         int rc;
2101
2102         ENTRY;
2103         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2104                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2105
2106         if (info->mti_dlm_req)
2107                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2108
2109         if (!fid_is_md_operative(rr->rr_fid1) ||
2110             !fid_is_md_operative(rr->rr_fid2))
2111                 RETURN(-EPERM);
2112
2113         /* don't allow migrate . or .. */
2114         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2115                 RETURN(-EBUSY);
2116
2117         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2118                 RETURN(-EPERM);
2119
2120         /* we want rbac roles to have precedence over any other
2121          * permission or capability checks
2122          */
2123         if (uc && (!uc->uc_rbac_dne_ops ||
2124                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2125                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2126                     mdt->mdt_enable_remote_dir_gid != -1)))
2127                 RETURN(-EPERM);
2128
2129         /*
2130          * Note: do not enqueue rename lock for replay request, because
2131          * if other MDT holds rename lock, but being blocked to wait for
2132          * this MDT to finish its recovery, and the failover MDT can not
2133          * get rename lock, which will cause deadlock.
2134          *
2135          * req is NULL if this is called by directory auto-split.
2136          */
2137         if (req && !req_is_replay(req)) {
2138                 rc = mdt_rename_lock(info, rename_lh);
2139                 if (rc != 0) {
2140                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2141                                mdt_obd_name(info->mti_mdt), rc);
2142                         RETURN(rc);
2143                 }
2144         }
2145
2146         /* pobj is master object of parent */
2147         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2148         if (IS_ERR(pobj))
2149                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2150
2151         if (req) {
2152                 rc = mdt_version_get_check(info, pobj, 0);
2153                 if (rc)
2154                         GOTO(put_parent, rc);
2155         }
2156
2157         if (!mdt_object_exists(pobj))
2158                 GOTO(put_parent, rc = -ENOENT);
2159
2160         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2161                 GOTO(put_parent, rc = -ENOTDIR);
2162
2163         rc = mdt_check_enc(info, pobj);
2164         if (rc)
2165                 GOTO(put_parent, rc);
2166
2167         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2168         if (rc)
2169                 GOTO(put_parent, rc);
2170
2171         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2172             (ma->ma_valid & MA_LMV) &&
2173             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2174                 struct lu_buf *buf = &info->mti_buf;
2175                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2176                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2177
2178                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2179                                                  LMV_HASH_FLAG_BAD_TYPE);
2180                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2181                 buf->lb_buf = lmv;
2182                 buf->lb_len = sizeof(*lmv);
2183                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2184                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2185                 mo_invalidate(env, mdt_object_child(pobj));
2186                 GOTO(put_parent, rc);
2187         }
2188
2189         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2190          * if @pobj is migrating too, tpobj is the target parent stripe.
2191          */
2192         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2193                                 &sobj);
2194         if (rc < 0)
2195                 GOTO(put_parent, rc);
2196         reverse = rc;
2197
2198         /* parent unchanged, this happens in dir restripe */
2199         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2200                 GOTO(put_source, rc = -EALREADY);
2201
2202 lock_parent:
2203         LASSERT(spobj);
2204         LASSERT(tpobj);
2205         lhsp = &info->mti_lh[MDT_LH_PARENT];
2206         lhtp = &info->mti_lh[MDT_LH_CHILD];
2207         /* lock spobj and tpobj in stripe index order */
2208         if (reverse) {
2209                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW);
2210                 if (rc)
2211                         GOTO(put_source, rc);
2212
2213                 LASSERT(spobj != tpobj);
2214                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2215                 if (rc)
2216                         GOTO(unlock_parent, rc);
2217         } else {
2218                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2219                 if (rc)
2220                         GOTO(put_source, rc);
2221
2222                 if (tpobj != spobj) {
2223                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2224                                              LCK_PW);
2225                         if (rc)
2226                                 GOTO(unlock_parent, rc);
2227                 }
2228         }
2229
2230         /* if inode is not migrated, or is dir, no need to lock links */
2231         if (!info->mti_spec.sp_migrate_nsonly &&
2232             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2233                 /* lock link parents, and take LOOKUP lock of links */
2234                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2235                                             lhtp, &link_locks);
2236                 if (rc == -EBUSY && lock_retries-- > 0) {
2237                         LASSERT(list_empty(&link_locks));
2238                         goto lock_parent;
2239                 }
2240
2241                 if (rc < 0)
2242                         GOTO(put_source, rc);
2243
2244                 /*
2245                  * RS_MAX_LOCKS is the limit of number of locks that can be
2246                  * saved along with one request, if total lock count exceeds
2247                  * this limit, we will drop all locks after migration, and
2248                  * trigger commit in the end.
2249                  */
2250                 do_sync = rc;
2251         }
2252
2253         /* lock source */
2254         lhs = &info->mti_lh[MDT_LH_OLD];
2255         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2256         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2257                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2258                                     MDS_INODELOCK_OPEN);
2259         if (rc)
2260                 GOTO(unlock_links, rc);
2261
2262         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2263                 /* TODO: DoM migration is not supported, migrate dirent only */
2264                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2265                 if (rc)
2266                         GOTO(unlock_source, rc);
2267
2268                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2269                         info->mti_spec.sp_migrate_nsonly = 1;
2270         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2271                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2272                 if (rc)
2273                         GOTO(unlock_source, rc);
2274
2275                 if (!(ma->ma_valid & MA_LMV))
2276                         is_plain_dir = true;
2277                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2278                         /* race with restripe/auto-split */
2279                         GOTO(unlock_source, rc = -EBUSY);
2280                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2281                         struct lu_buf *buf = &info->mti_buf;
2282                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2283                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2284
2285                         /* migration failed before, and LFSCK cleared hash type
2286                          * and flags, fake it to resume migration.
2287                          */
2288                         lmv->lmv_hash_type =
2289                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2290                                             LMV_HASH_FLAG_MIGRATION |
2291                                             LMV_HASH_FLAG_BAD_TYPE |
2292                                             LMV_HASH_FLAG_FIXED);
2293                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2294                         buf->lb_buf = lmv;
2295                         buf->lb_len = sizeof(*lmv);
2296                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2297                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2298                         mo_invalidate(env, mdt_object_child(sobj));
2299                         GOTO(unlock_source, rc = -EALREADY);
2300                 }
2301         }
2302
2303         /* if migration HSM is allowed */
2304         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2305                 ma->ma_need = MA_HSM;
2306                 ma->ma_valid = 0;
2307                 rc = mdt_attr_get_complex(info, sobj, ma);
2308                 if (rc)
2309                         GOTO(unlock_source, rc);
2310
2311                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2312                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2313         }
2314
2315         /* end lease and close file for regular file */
2316         if (info->mti_spec.sp_migrate_close) {
2317                 /* try to hold open_sem so that nobody else can open the file */
2318                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2319                         /* close anyway */
2320                         mdd_migrate_close(info, sobj);
2321                         GOTO(unlock_source, rc = -EBUSY);
2322                 } else {
2323                         open_sem_locked = true;
2324                         rc = mdd_migrate_close(info, sobj);
2325                         if (rc && rc != -ESTALE)
2326                                 GOTO(unlock_open_sem, rc);
2327                 }
2328         }
2329
2330         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2331         if (IS_ERR(tobj))
2332                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2333
2334         /* Don't do lookup sanity check. We know name doesn't exist. */
2335         info->mti_spec.sp_cr_lookup = 0;
2336         info->mti_spec.sp_feat = &dt_directory_features;
2337
2338         rc = mdo_migrate(env, mdt_object_child(spobj),
2339                          mdt_object_child(tpobj), mdt_object_child(sobj),
2340                          mdt_object_child(tobj), &rr->rr_name,
2341                          &info->mti_spec, ma);
2342         if (rc)
2343                 GOTO(put_target, rc);
2344
2345         /* save target locks for directory */
2346         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2347             !info->mti_spec.sp_migrate_nsonly) {
2348                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2349                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2350
2351                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2352                  * otherwise stripes lock may deadlock.
2353                  */
2354                 if (is_plain_dir)
2355                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2356
2357                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2358                                              MDS_INODELOCK_UPDATE, LCK_PW);
2359                 if (rc)
2360                         GOTO(put_target, rc);
2361
2362                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2363         }
2364
2365         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2366                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2367
2368         EXIT;
2369 put_target:
2370         mdt_object_put(env, tobj);
2371 unlock_open_sem:
2372         if (open_sem_locked)
2373                 up_write(&sobj->mot_open_sem);
2374 unlock_source:
2375         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2376 unlock_links:
2377         /* if we've got too many locks to save into RPC,
2378          * then just commit before the locks are released
2379          */
2380         if (!rc && do_sync)
2381                 mdt_device_sync(env, mdt);
2382         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2383 unlock_parent:
2384         mdt_object_unlock(info, spobj, lhsp, rc);
2385         mdt_object_unlock(info, tpobj, lhtp, rc);
2386 put_source:
2387         mdt_object_put(env, sobj);
2388         mdt_object_put(env, spobj);
2389         mdt_object_put(env, tpobj);
2390 put_parent:
2391         mo_invalidate(env, mdt_object_child(pobj));
2392         mdt_object_put(env, pobj);
2393 unlock_rename:
2394         mdt_rename_unlock(info, rename_lh);
2395
2396         if (rc)
2397                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2398                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2399                        PNAME(&rr->rr_name), rc);
2400
2401         return rc;
2402 }
2403
2404 /*
2405  * determine lock order of sobj and tobj
2406  *
2407  * there are two situations we need to lock tobj before sobj:
2408  * 1. sobj is child of tobj
2409  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2410  *    larger than that of tobj
2411  *
2412  * \retval      1 lock tobj before sobj
2413  * \retval      0 lock sobj before tobj
2414  * \retval      -ev negative errno upon error
2415  */
2416 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2417                                            struct mdt_object *sobj,
2418                                            struct mdt_object *tobj)
2419 {
2420         struct md_attr *ma = &info->mti_attr;
2421         struct lu_fid *spfid = &info->mti_tmp_fid1;
2422         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2423         struct lmv_mds_md_v1 *lmv;
2424         __u32 sindex;
2425         __u32 tindex;
2426         int rc;
2427
2428         /* sobj and tobj are the same */
2429         if (sobj == tobj)
2430                 return 0;
2431
2432         if (fid_is_root(mdt_object_fid(sobj)))
2433                 return 0;
2434
2435         if (fid_is_root(mdt_object_fid(tobj)))
2436                 return 1;
2437
2438         /* check whether sobj is child of tobj */
2439         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2440                            mdt_object_fid(tobj));
2441         if (rc < 0)
2442                 return rc;
2443
2444         if (rc == 1)
2445                 return 1;
2446
2447         /* check whether sobj and tobj are children of the same parent */
2448         rc = mdt_attr_get_pfid(info, sobj, spfid);
2449         if (rc)
2450                 return rc;
2451
2452         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2453         if (rc)
2454                 return rc;
2455
2456         if (!lu_fid_eq(spfid, tpfid))
2457                 return 0;
2458
2459         /* check whether sobj and tobj are sibling stripes */
2460         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2461         if (rc)
2462                 return rc;
2463
2464         if (!(ma->ma_valid & MA_LMV))
2465                 return 0;
2466
2467         lmv = &ma->ma_lmv->lmv_md_v1;
2468         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2469                 return 0;
2470         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2471
2472         ma->ma_valid = 0;
2473         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2474         if (rc)
2475                 return rc;
2476
2477         if (!(ma->ma_valid & MA_LMV))
2478                 return -ENODATA;
2479
2480         lmv = &ma->ma_lmv->lmv_md_v1;
2481         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2482                 return -EINVAL;
2483         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2484
2485         /* check stripe index of sobj and tobj */
2486         if (sindex == tindex)
2487                 return -EINVAL;
2488
2489         return sindex < tindex ? 0 : 1;
2490 }
2491
2492 /* Helper function for mdt_reint_rename so we don't need to opencode
2493  * two different order lockings
2494  */
2495 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2496                              struct mdt_object *mfirstdir,
2497                              struct mdt_lock_handle *lh_firstdirp,
2498                              const struct lu_name *firstname,
2499                              struct mdt_object *mseconddir,
2500                              struct mdt_lock_handle *lh_seconddirp,
2501                              const struct lu_name *secondname)
2502 {
2503         int rc;
2504
2505         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW);
2506         if (rc)
2507                 return rc;
2508
2509         mdt_version_get_save(info, mfirstdir, 0);
2510         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2511
2512         if (mfirstdir != mseconddir) {
2513                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2514                                      secondname, LCK_PW);
2515         } else if (!mdt_object_remote(mseconddir)) {
2516                 if (lh_firstdirp->mlh_pdo_hash !=
2517                     lh_seconddirp->mlh_pdo_hash) {
2518                         rc = mdt_object_pdo_lock(info, mseconddir,
2519                                                  lh_seconddirp, secondname,
2520                                                  LCK_PW, false);
2521                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2522                 }
2523         }
2524         mdt_version_get_save(info, mseconddir, 1);
2525
2526         if (rc != 0)
2527                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2528
2529         return rc;
2530 }
2531
2532 /*
2533  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2534  * 2 - srcdir child; 3 - tgtdir child.
2535  * Update on disk version of srcdir child.
2536  */
     /*
      * Reint handler for rename: moves the entry rr_name found under parent
      * rr_fid1 to rr_tgt_name under parent rr_fid2.
      *
      * Order of work: look up both parents, optionally take the rename lock,
      * lock the parents in a deadlock-safe order, look up and lock the source
      * child and (if present) the target child, call mdo_rename(), then unwind
      * through the out_* labels in reverse order of acquisition.
      *
      * \param[in] info    thread info carrying the unpacked rename record
      * \param[in] unused  lock handle, not referenced by this handler
      *
      * \retval 0          on success; also returned without renaming when both
      *                    names already resolve to the same FID
      * \retval -EPERM     a parent or child FID is not md-operative, or the
      *                    target parent is LOHA_FSCRYPT_MD without
      *                    uc_rbac_fscrypt_admin
      * \retval -EXDEV     a remote object is involved and the rename cannot be
      *                    served here (see the individual checks)
      * \retval -EINVAL    a child FID equals one of the parent FIDs, or one
      *                    directory would be moved below itself
      * \retval -ENOENT    the source or target child object does not exist
      * \retval -EISDIR    a directory would be replaced by a non-directory
      * \retval -ev        any other negative errno from the helpers called
      */
2537 static int mdt_reint_rename(struct mdt_thread_info *info,
2538                             struct mdt_lock_handle *unused)
2539 {
2540         struct mdt_device *mdt = info->mti_mdt;
2541         struct mdt_reint_record *rr = &info->mti_rr;
2542         struct md_attr *ma = &info->mti_attr;
2543         struct ptlrpc_request *req = mdt_info_req(info);
2544         struct mdt_object *msrcdir = NULL;
2545         struct mdt_object *mtgtdir = NULL;
2546         struct mdt_object *mold;
2547         struct mdt_object *mnew = NULL;
2548         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2549         struct mdt_lock_handle *lh_srcdirp;
2550         struct mdt_lock_handle *lh_tgtdirp;
2551         struct mdt_lock_handle *lh_oldp = NULL;
2552         struct mdt_lock_handle *lh_lookup = NULL;
2553         struct mdt_lock_handle *lh_newp = NULL;
2554         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2555         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2556         struct lu_ucred *uc = mdt_ucred(info);
2557         bool reverse = false, discard = false;
2558         ktime_t kstart = ktime_get();
2559         enum mdt_stat_idx msi = 0;
2560         int rc;
2561
2562         ENTRY;
2563         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2564                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2565                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2566
2567         if (info->mti_dlm_req)
2568                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2569
2570         if (!fid_is_md_operative(rr->rr_fid1) ||
2571             !fid_is_md_operative(rr->rr_fid2))
2572                 RETURN(-EPERM);
2573
2574         /* find both parents. */
2575         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2576         if (IS_ERR(msrcdir))
2577                 RETURN(PTR_ERR(msrcdir));
2578
2579         rc = mdt_check_enc(info, msrcdir);
2580         if (rc)
2581                 GOTO(out_put_srcdir, rc);
2582
2583         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2584
             /* same parent for both names: reuse the object but take a second
              * reference, because out_put_tgtdir and out_put_srcdir each drop one
              */
2585         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2586                 mtgtdir = msrcdir;
2587                 mdt_object_get(info->mti_env, mtgtdir);
2588         } else {
2589                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2590                 if (IS_ERR(mtgtdir))
2591                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2592         }
2593
2594         rc = mdt_check_enc(info, mtgtdir);
2595         if (rc)
2596                 GOTO(out_put_tgtdir, rc);
2597
2598         if (!uc->uc_rbac_fscrypt_admin &&
2599             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2600                 GOTO(out_put_tgtdir, rc = -EPERM);
2601
2602         /*
2603          * Note: do not enqueue rename lock for replay request, because
2604          * if other MDT holds rename lock, but being blocked to wait for
2605          * this MDT to finish its recovery, and the failover MDT can not
2606          * get rename lock, which will cause deadlock.
2607          */
2608         if (!req_is_replay(req)) {
2609                 bool remote = mdt_object_remote(msrcdir);
2610
2611                 /*
2612                  * Normally rename RPC is handled on the MDT with the target
2613                  * directory (if target exists, it's on the MDT with the
2614                  * target), if the source directory is remote, it's a hint that
2615                  * source is remote too (this may not be true, but it won't
2616                  * cause any issue), return -EXDEV early to avoid taking
2617                  * rename_lock.
2618                  */
2619                 if (!mdt->mdt_enable_remote_rename && remote)
2620                         GOTO(out_put_tgtdir, rc = -EXDEV);
2621
2622                 /* This might be further relaxed in the future for regular file
2623                  * renames in different source and target parents. Start with
2624                  * only same-directory renames for simplicity and because this
2625                  * is by far the most the common use case.
2626                  *
2627                  * Striped directories should be considered "remote".
2628                  */
                     /* the rename lock is skipped only for a local same-directory
                      * rename whose type (dir / non-dir, judged by
                      * ma->ma_attr.la_mode) has parallel rename enabled
                      */
2629                 if (msrcdir != mtgtdir || remote ||
2630                     (S_ISDIR(ma->ma_attr.la_mode) &&
2631                      !mdt->mdt_enable_parallel_rename_dir) ||
2632                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2633                      !mdt->mdt_enable_parallel_rename_file)) {
2634                         rc = mdt_rename_lock(info, rename_lh);
2635                         if (rc != 0) {
2636                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2637                                        mdt_obd_name(mdt), rc);
2638                                 GOTO(out_put_tgtdir, rc);
2639                         }
2640                 } else {
                             /* msi selects the stats slot handed to
                              * mdt_rename_counter_tally() after a successful rename
                              */
2641                         if (S_ISDIR(ma->ma_attr.la_mode))
2642                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2643                         else
2644                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2645
2646                         CDEBUG(D_INFO,
2647                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2648                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2649                                PNAME(&rr->rr_name));
2650                 }
2651         }
2652
2653         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2654         if (rc < 0)
2655                 GOTO(out_unlock_rename, rc);
             /* rc is non-negative here; non-zero means lock the target parent
              * first
              */
2656         reverse = rc;
2657
2658         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2659         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2660         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2661
2662         /* lock parents in the proper order. */
2663         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2664         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2665         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2666         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2667
2668         /* In case of same dir local rename we must sort by the hash,
2669          * otherwise a lock deadlock is possible when renaming
2670          * a to b and b to a at the same time LU-15285
2671          */
2672         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2673                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
             /* fault injection: OBD_FAIL_MDS_PDO_LOCK forces source-first order */
2674         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2675                 reverse = 0;
2676
2677         if (reverse)
2678                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2679                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2680                                        &rr->rr_name);
2681         else
2682                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2683                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name);
2684
2685         if (rc != 0)
2686                 GOTO(out_unlock_rename, rc);
2687
2688         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2689         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2690
2691         /* find mold object. */
2692         fid_zero(old_fid);
2693         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2694         if (rc != 0)
2695                 GOTO(out_unlock_parents, rc);
2696
             /* the source child must not be one of the two parents */
2697         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2698                 GOTO(out_unlock_parents, rc = -EINVAL);
2699
2700         if (!fid_is_md_operative(old_fid))
2701                 GOTO(out_unlock_parents, rc = -EPERM);
2702
2703         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2704         if (IS_ERR(mold))
2705                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2706
2707         if (!mdt_object_exists(mold)) {
2708                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2709                                 &mold->mot_obj,
2710                                 "object does not exist");
2711                 GOTO(out_put_old, rc = -ENOENT);
2712         }
2713
2714         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2715                 GOTO(out_put_old, rc = -EXDEV);
2716
2717         /* Check if @mtgtdir is subdir of @mold, before locking child
2718          * to avoid reverse locking.
2719          */
2720         if (mtgtdir != msrcdir) {
2721                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2722                                    old_fid);
2723                 if (rc) {
                             /* 1 means "is a subdir": report it as -EINVAL,
                              * any other non-zero value is passed through as is
                              */
2724                         if (rc == 1)
2725                                 rc = -EINVAL;
2726                         GOTO(out_put_old, rc);
2727                 }
2728         }
2729
2730         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2731         /* save version after locking */
2732         mdt_version_get_save(info, mold, 2);
2733
2734         /* find mnew object:
2735          * mnew target object may not exist now
2736          * lookup with version checking
2737          */
2738         fid_zero(new_fid);
2739         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2740                                       3);
2741         if (rc == 0) {
2742                 /* the new_fid should have been filled at this moment */
2743                 if (lu_fid_eq(old_fid, new_fid))
                             /* both names already refer to one object: leave
                              * with rc == 0 and without calling mdo_rename()
                              */
2744                         GOTO(out_put_old, rc);
2745
2746                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2747                     lu_fid_eq(new_fid, rr->rr_fid2))
2748                         GOTO(out_put_old, rc = -EINVAL);
2749
2750                 if (!fid_is_md_operative(new_fid))
2751                         GOTO(out_put_old, rc = -EPERM);
2752
2753                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2754                 if (IS_ERR(mnew))
2755                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2756
2757                 if (!mdt_object_exists(mnew)) {
2758                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2759                                         &mnew->mot_obj,
2760                                         "object does not exist");
2761                         GOTO(out_put_new, rc = -ENOENT);
2762                 }
2763
2764                 if (mdt_object_remote(mnew)) {
2765                         struct mdt_body  *repbody;
2766
2767                         /* Always send rename req to the target child MDT */
                             /* the reply carries new_fid together with the
                              * OBD_MD_FLID | OBD_MD_MDS flags and rc = -EXDEV
                              */
2768                         repbody = req_capsule_server_get(info->mti_pill,
2769                                                          &RMF_MDT_BODY);
2770                         LASSERT(repbody != NULL);
2771                         repbody->mbo_fid1 = *new_fid;
2772                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2773                         GOTO(out_put_new, rc = -EXDEV);
2774                 }
2775                 /* Before locking the target dir, check we do not replace
2776                  * a dir with a non-dir, otherwise it may deadlock with
2777                  * link op which tries to create a link in this dir
2778                  * back to this non-dir.
2779                  */
2780                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2781                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2782                         GOTO(out_put_new, rc = -EISDIR);
2783
2784                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2785                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2786                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2787                                             lh_lookup,
2788                                             MDS_INODELOCK_LOOKUP |
2789                                             MDS_INODELOCK_XATTR);
2790                 if (rc < 0)
2791                         GOTO(out_put_new, rc);
2792
2793                 /* Check if @msrcdir is subdir of @mnew, before locking child
2794                  * to avoid reverse locking.
2795                  */
2796                 if (mtgtdir != msrcdir) {
2797                         rc = mdo_is_subdir(info->mti_env,
2798                                            mdt_object_child(msrcdir), new_fid);
2799                         if (rc) {
2800                                 if (rc == 1)
2801                                         rc = -EINVAL;
2802                                 GOTO(out_unlock_old, rc);
2803                         }
2804                 }
2805
2806                 /* We used to acquire MDS_INODELOCK_FULL here but we
2807                  * can't do this now because a running HSM restore on
2808                  * the rename onto victim will hold the layout
2809                  * lock. See LU-4002.
2810                  */
2811
2812                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2813                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2814                                            MDS_INODELOCK_LOOKUP |
2815                                            MDS_INODELOCK_UPDATE, LCK_EX);
2816                 if (rc != 0)
2817                         GOTO(out_unlock_new, rc);
2818
2819                 /* get and save version after locking */
2820                 mdt_version_get_save(info, mnew, 3);
2821         } else if (rc != -ENOENT) {
2822                 GOTO(out_put_old, rc);
2823         } else {
                     /* -ENOENT: the target name is free, only the source child
                      * needs locking
                      */
2824                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2825                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2826                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2827                                             lh_lookup,
2828                                             MDS_INODELOCK_LOOKUP |
2829                                             MDS_INODELOCK_XATTR);
2830                 if (rc != 0)
2831                         GOTO(out_put_old, rc);
2832
2833                 mdt_enoent_version_save(info, 3);
2834         }
2835
2836         /* step 5: rename it */
2837         mdt_reint_init_ma(info, ma);
2838
2839         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2840                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2841
             /* mot_lov_mutex of an existing target is held across mdo_rename() */
2842         if (mnew != NULL)
2843                 mutex_lock(&mnew->mot_lov_mutex);
2844
2845         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2846                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2847                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2848                         &rr->rr_tgt_name, ma);
2849
2850         if (mnew != NULL)
2851                 mutex_unlock(&mnew->mot_lov_mutex);
2852
2853         /* handle last link of tgt object */
2854         if (rc == 0) {
2855                 if (mnew) {
2856                         mdt_handle_last_unlink(info, mnew, ma);
2857                         discard = mdt_dom_check_for_discard(info, mnew);
2858                 }
2859                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2860                                          msrcdir, mtgtdir, msi,
2861                                          ktime_us_delta(ktime_get(), kstart));
2862         }
2863
2864         EXIT;
             /* cleanup ladder: each label undoes one acquisition and falls through
              * to the labels for everything acquired earlier
              */
2865 out_unlock_new:
2866         if (mnew != NULL)
2867                 /* mnew is gone, no need to keep lock */
2868                 mdt_object_unlock(info, mnew, lh_newp, 1);
2869 out_unlock_old:
2870         mdt_object_unlock(info, NULL, lh_lookup, rc);
2871         mdt_object_unlock(info, mold, lh_oldp, rc);
2872 out_put_new:
             /* with discard set, the mnew reference is kept until
              * mdt_dom_discard_data() has run at the end of the function
              */
2873         if (mnew && !discard)
2874                 mdt_object_put(info->mti_env, mnew);
2875 out_put_old:
2876         mdt_object_put(info->mti_env, mold);
2877 out_unlock_parents:
2878         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2879         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2880 out_unlock_rename:
2881         mdt_rename_unlock(info, rename_lh);
2882 out_put_tgtdir:
2883         mdt_object_put(info->mti_env, mtgtdir);
2884 out_put_srcdir:
2885         mdt_object_put(info->mti_env, msrcdir);
2886
2887         /* The DoM discard can be done right in the place above where it is
2888          * assigned, meanwhile it is done here after rename unlock due to
2889          * compatibility with old clients, for them the discard blocks
2890          * the main thread until completion. Check LU-11359 for details.
2891          */
2892         if (discard) {
2893                 mdt_dom_discard_data(info, mnew);
2894                 mdt_object_put(info->mti_env, mnew);
2895         }
2896         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2897         return rc;
2898 }
2899
2900 static int mdt_reint_resync(struct mdt_thread_info *info,
2901                             struct mdt_lock_handle *lhc)
2902 {
2903         struct mdt_reint_record *rr = &info->mti_rr;
2904         struct ptlrpc_request *req = mdt_info_req(info);
2905         struct md_attr *ma = &info->mti_attr;
2906         struct mdt_object *mo;
2907         struct ldlm_lock *lease;
2908         struct mdt_body *repbody;
2909         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2910         bool lease_broken;
2911         int rc;
2912
2913         ENTRY;
2914         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2915
2916         if (info->mti_dlm_req)
2917                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2918
2919         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2920         if (IS_ERR(mo))
2921                 GOTO(out, rc = PTR_ERR(mo));
2922
2923         if (!mdt_object_exists(mo))
2924                 GOTO(out_obj, rc = -ENOENT);
2925
2926         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2927                 GOTO(out_obj, rc = -EINVAL);
2928
2929         if (mdt_object_remote(mo))
2930                 GOTO(out_obj, rc = -EREMOTE);
2931
2932         lease = ldlm_handle2lock(rr->rr_lease_handle);
2933         if (lease == NULL)
2934                 GOTO(out_obj, rc = -ESTALE);
2935
2936         /* It's really necessary to grab open_sem and check if the lease lock
2937          * has been lost. There would exist a concurrent writer coming in and
2938          * generating some dirty data in memory cache, the writeback would fail
2939          * after the layout version is increased by MDS_REINT_RESYNC RPC.
2940          */
2941         if (!down_write_trylock(&mo->mot_open_sem))
2942                 GOTO(out_put_lease, rc = -EBUSY);
2943
2944         lock_res_and_lock(lease);
2945         lease_broken = ldlm_is_cancel(lease);
2946         unlock_res_and_lock(lease);
2947         if (lease_broken)
2948                 GOTO(out_unlock, rc = -EBUSY);
2949
2950         /* the file has yet opened by anyone else after we took the lease. */
2951         layout.mlc_opc = MD_LAYOUT_RESYNC;
2952         lhc = &info->mti_lh[MDT_LH_LOCAL];
2953         rc = mdt_layout_change(info, mo, lhc, &layout);
2954         if (rc)
2955                 GOTO(out_unlock, rc);
2956
2957         mdt_object_unlock(info, mo, lhc, 0);
2958
2959         ma->ma_need = MA_INODE;
2960         ma->ma_valid = 0;
2961         rc = mdt_attr_get_complex(info, mo, ma);
2962         if (rc != 0)
2963                 GOTO(out_unlock, rc);
2964
2965         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2966         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
2967
2968         EXIT;
2969 out_unlock:
2970         up_write(&mo->mot_open_sem);
2971 out_put_lease:
2972         LDLM_LOCK_PUT(lease);
2973 out_obj:
2974         mdt_object_put(info->mti_env, mo);
2975 out:
2976         mdt_client_compatibility(info);
2977         return rc;
2978 }
2979
2980 struct mdt_reinter {
2981         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
2982         enum lprocfs_extra_opc mr_extra_opc;
2983 };
2984
2985 static const struct mdt_reinter mdt_reinters[] = {
2986         [REINT_SETATTR] = {
2987                 .mr_handler = &mdt_reint_setattr,
2988                 .mr_extra_opc = MDS_REINT_SETATTR,
2989         },
2990         [REINT_CREATE] = {
2991                 .mr_handler = &mdt_reint_create,
2992                 .mr_extra_opc = MDS_REINT_CREATE,
2993         },
2994         [REINT_LINK] = {
2995                 .mr_handler = &mdt_reint_link,
2996                 .mr_extra_opc = MDS_REINT_LINK,
2997         },
2998         [REINT_UNLINK] = {
2999                 .mr_handler = &mdt_reint_unlink,
3000                 .mr_extra_opc = MDS_REINT_UNLINK,
3001         },
3002         [REINT_RENAME] = {
3003                 .mr_handler = &mdt_reint_rename,
3004                 .mr_extra_opc = MDS_REINT_RENAME,
3005         },
3006         [REINT_OPEN] = {
3007                 .mr_handler = &mdt_reint_open,
3008                 .mr_extra_opc = MDS_REINT_OPEN,
3009         },
3010         [REINT_SETXATTR] = {
3011                 .mr_handler = &mdt_reint_setxattr,
3012                 .mr_extra_opc = MDS_REINT_SETXATTR,
3013         },
3014         [REINT_RMENTRY] = {
3015                 .mr_handler = &mdt_reint_unlink,
3016                 .mr_extra_opc = MDS_REINT_UNLINK,
3017         },
3018         [REINT_MIGRATE] = {
3019                 .mr_handler = &mdt_reint_migrate,
3020                 .mr_extra_opc = MDS_REINT_RENAME,
3021         },
3022         [REINT_RESYNC] = {
3023                 .mr_handler = &mdt_reint_resync,
3024                 .mr_extra_opc = MDS_REINT_RESYNC,
3025         },
3026 };
3027
3028 int mdt_reint_rec(struct mdt_thread_info *info,
3029                   struct mdt_lock_handle *lhc)
3030 {
3031         const struct mdt_reinter *mr;
3032         int rc;
3033
3034         ENTRY;
3035         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3036                 RETURN(-EPROTO);
3037
3038         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3039         if (mr->mr_handler == NULL)
3040                 RETURN(-EPROTO);
3041
3042         rc = (*mr->mr_handler)(info, lhc);
3043
3044         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3045                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3046
3047         RETURN(rc);
3048 }