Whamcloud - gitweb
LU-17744 ldiskfs: mballoc stats fixes
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 /**
251  * Lock slave stripes if necessary, the lock handles of slave stripes
252  * will be stored in einfo->ei_cbdata.
253  **/
254 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
255                             enum ldlm_mode mode, __u64 ibits,
256                             struct ldlm_enqueue_info *einfo)
257 {
258         union ldlm_policy_data *policy = &mti->mti_policy;
259
260         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
261         einfo->ei_type = LDLM_IBITS;
262         einfo->ei_mode = mode;
263         einfo->ei_cb_bl = mdt_remote_blocking_ast;
264         einfo->ei_cb_local_bl = mdt_blocking_ast;
265         einfo->ei_cb_cp = ldlm_completion_ast;
266         einfo->ei_enq_slave = 1;
267         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
268         einfo->ei_inodebits = ibits;
269         einfo->ei_req_slot = 1;
270         memset(policy, 0, sizeof(*policy));
271         policy->l_inodebits.bits = ibits;
272         policy->l_inodebits.li_initiator_id = mdt_node_id(mti->mti_mdt);
273
274         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
275                               policy);
276 }
277
278 /** lock object, and stripes if it's a striped directory
279  *
280  * object should be local, this is called in operations which modify both object
281  * and stripes.
282  *
283  * \param info          struct mdt_thread_info
284  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
285  * \param child         child object
286  * \param lh            lock handle
287  * \param einfo         struct ldlm_enqueue_info
288  * \param ibits         MDS inode lock bits
289  * \param mode          lock mode
290  *
291  * \retval              0 on success, -ev on error.
292  */
293 int mdt_object_stripes_lock(struct mdt_thread_info *info,
294                             struct mdt_object *parent,
295                             struct mdt_object *child,
296                             struct mdt_lock_handle *lh,
297                             struct ldlm_enqueue_info *einfo, __u64 ibits,
298                             enum ldlm_mode mode)
299 {
300         int rc;
301
302         ENTRY;
303         /* according to the protocol, child should be local, is request sent to
304          * wrong MDT?
305          */
306         if (mdt_object_remote(child)) {
307                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
308                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
309                        -EREMOTE);
310                 RETURN(-EREMOTE);
311         }
312
313         memset(einfo, 0, sizeof(*einfo));
314         if (ibits & MDS_INODELOCK_LOOKUP) {
315                 LASSERT(parent);
316                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
317                                            mode);
318         } else {
319                 rc = mdt_object_lock(info, child, lh, ibits, mode);
320         }
321         if (rc)
322                 RETURN(rc);
323
324         if (!S_ISDIR(child->mot_header.loh_attr))
325                 RETURN(0);
326
327         /* lock stripes for striped directory */
328         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
329         if (rc == -EIO && CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
330                 rc = 0;
331         if (rc)
332                 mdt_object_unlock(info, child, lh, rc);
333
334         RETURN(rc);
335 }
336
337 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *obj,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_stripes_unlock(info, obj, einfo, decref);
344         mdt_object_unlock(info, obj, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lu_ucred *uc = mdt_ucred(info);
359         struct lmv_mds_md_v1 *lmv;
360         struct mdt_object *child;
361         struct mdt_lock_handle *lhp;
362         struct mdt_lock_handle *lhc;
363         struct mdt_body *repbody;
364         int rc;
365
366         ENTRY;
367
368         /* we want rbac roles to have precedence over any other
369          * permission or capability checks
370          */
371         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
372                 RETURN(-EPERM);
373
374         LASSERT(lum);
375         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
376
377         rc = mdt_version_get_check_save(info, parent, 0);
378         if (rc)
379                 RETURN(rc);
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW);
383         if (rc)
384                 RETURN(rc);
385
386         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
387         if (rc)
388                 GOTO(unlock_parent, rc);
389
390         if (ma->ma_valid & MA_LMV) {
391                 /* don't allow restripe if parent dir layout is changing */
392                 lmv = &ma->ma_lmv->lmv_md_v1;
393                 if (!lmv_is_sane2(lmv))
394                         GOTO(unlock_parent, rc = -EBADF);
395
396                 if (lmv_is_layout_changing(lmv))
397                         GOTO(unlock_parent, rc = -EBUSY);
398         }
399
400         fid_zero(fid);
401         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
402         if (rc)
403                 GOTO(unlock_parent, rc);
404
405         child = mdt_object_find(info->mti_env, mdt, fid);
406         if (IS_ERR(child))
407                 GOTO(unlock_parent, rc = PTR_ERR(child));
408
409         if (!mdt_object_exists(child))
410                 GOTO(out_child, rc = -ENOENT);
411
412         if (mdt_object_remote(child)) {
413                 struct mdt_body *repbody;
414
415                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
416                 if (!repbody)
417                         GOTO(out_child, rc = -EPROTO);
418
419                 repbody->mbo_fid1 = *fid;
420                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
421                 GOTO(out_child, rc = -EREMOTE);
422         }
423
424         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
425                 GOTO(out_child, rc = -ENOTDIR);
426
427         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
428         if (rc)
429                 GOTO(out_child, rc);
430
431         /* race with migrate? */
432         if ((ma->ma_valid & MA_LMV) &&
433              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
434                 GOTO(out_child, rc = -EBUSY);
435
436         /* lock object */
437         lhc = &info->mti_lh[MDT_LH_CHILD];
438         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
439                                      MDS_INODELOCK_FULL, LCK_PW);
440         if (rc)
441                 GOTO(unlock_child, rc);
442
443         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
444         rc = mdt_version_get_check_save(info, child, 1);
445         if (rc)
446                 GOTO(unlock_child, rc);
447
448         spin_lock(&mdt->mdt_restriper.mdr_lock);
449         if (child->mot_restriping) {
450                 /* race? */
451                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
452                 GOTO(unlock_child, rc = -EBUSY);
453         }
454         child->mot_restriping = 1;
455         spin_unlock(&mdt->mdt_restriper.mdr_lock);
456
457         *fid = *tfid;
458         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
459         if (rc)
460                 GOTO(restriping_clear, rc);
461
462         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
463         if (!repbody)
464                 GOTO(restriping_clear, rc = -EPROTO);
465
466         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
467         EXIT;
468
469 restriping_clear:
470         child->mot_restriping = 0;
471 unlock_child:
472         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
473 out_child:
474         mdt_object_put(info->mti_env, child);
475 unlock_parent:
476         mdt_object_unlock(info, parent, lhp, rc);
477
478         return rc;
479 }
480
481 /*
482  * VBR: we save three versions in reply:
483  * 0 - parent. Check that parent version is the same during replay.
484  * 1 - name. Version of 'name' if file exists with the same name or
485  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
486  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
487  * check.
488  */
489 static int mdt_create(struct mdt_thread_info *info)
490 {
491         struct mdt_device *mdt = info->mti_mdt;
492         struct mdt_object *parent;
493         struct mdt_object *child;
494         struct mdt_lock_handle *lh;
495         struct mdt_body *repbody;
496         struct md_attr *ma = &info->mti_attr;
497         struct mdt_reint_record *rr = &info->mti_rr;
498         struct md_op_spec *spec = &info->mti_spec;
499         struct lu_ucred *uc = mdt_ucred(info);
500         bool restripe = false;
501         bool recreate_obj = false;
502         int rc;
503
504         ENTRY;
505         DEBUG_REQ(D_INODE, mdt_info_req(info),
506                   "Create ("DNAME"->"DFID") in "DFID,
507                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
508
509         if (!fid_is_md_operative(rr->rr_fid1))
510                 RETURN(-EPERM);
511
512         /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
513          * if client maintains inherited default LMV
514          */
515         if (S_ISDIR(ma->ma_attr.la_mode) &&
516             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
517             !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
518                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
519                 struct obd_export *exp = mdt_info_req(info)->rq_export;
520
521                 /* Only new clients can create remote dir( >= 2.4) and
522                  * striped dir(>= 2.6), old client will return -ENOTSUPP
523                  */
524                 if (!mdt_is_dne_client(exp))
525                         RETURN(-ENOTSUPP);
526
527                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
528                         if (!mdt_is_striped_client(exp))
529                                 RETURN(-ENOTSUPP);
530
531                         if (!mdt->mdt_enable_striped_dir)
532                                 RETURN(-EPERM);
533                 } else if (!mdt->mdt_enable_remote_dir) {
534                         RETURN(-EPERM);
535                 }
536
537                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
538                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
539                     LMV_HASH_TYPE_CRUSH)
540                         RETURN(-EPROTO);
541
542                 /* we want rbac roles to have precedence over any other
543                  * permission or capability checks
544                  */
545                 if (!uc->uc_rbac_dne_ops ||
546                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
547                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
548                      mdt->mdt_enable_remote_dir_gid != -1))
549                         RETURN(-EPERM);
550
551                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
552                  * this is create only, don't try restripe.
553                  */
554                 if (mdt->mdt_enable_dir_restripe &&
555                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
556                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
557                         restripe = true;
558         }
559
560         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
561
562         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
563         if (IS_ERR(parent))
564                 RETURN(PTR_ERR(parent));
565
566         if (!mdt_object_exists(parent))
567                 GOTO(put_parent, rc = -ENOENT);
568
569         rc = mdt_check_enc(info, parent);
570         if (rc)
571                 GOTO(put_parent, rc);
572
573         if (!uc->uc_rbac_fscrypt_admin &&
574             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
575                 GOTO(put_parent, rc = -EPERM);
576
577         info->mti_spec.sp_replay = req_is_replay(mdt_info_req(info));
578
579         /*
580          * LU-10235: check if name exists locklessly first to avoid massive
581          * lock recalls on existing directories.
582          */
583         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
584                         &info->mti_tmp_fid1, &info->mti_spec);
585         if (rc == 0) {
586                 /* mkdir may be partially executed: name entry was successfully
587                  * inserted into parent diretory on remote MDT, while target not
588                  * created on local MDT. This happens when update log recovery
589                  * is aborted, and mkdir is replayed by client request.
590                  */
591                 if (unlikely(!(info->mti_spec.sp_replay &&
592                                mdt_object_remote(parent)) &&
593                              !restripe))
594                         GOTO(put_parent, rc = -EEXIST);
595
596                 child = mdt_object_find(info->mti_env, info->mti_mdt,
597                                         &info->mti_tmp_fid1);
598                 if (unlikely(IS_ERR(child)))
599                         GOTO(put_parent, rc = PTR_ERR(child));
600
601                 if (mdt_object_exists(child)) {
602                         mdt_object_put(info->mti_env, child);
603                         rc = -EEXIST;
604                         if (restripe)
605                                 rc = mdt_restripe(info, parent, &rr->rr_name,
606                                                   rr->rr_fid2, spec, ma);
607                         GOTO(put_parent, rc);
608                 }
609                 mdt_object_put(info->mti_env, child);
610                 recreate_obj = true;
611         } else if (rc != -ENOENT) {
612                 GOTO(put_parent, rc);
613         }
614
615         if (unlikely(info->mti_spec.sp_replay)) {
616                 /* check version only during replay */
617                 rc = mdt_version_check(mdt_info_req(info), ENOENT_VERSION, 1);
618                 if (rc)
619                         GOTO(put_parent, rc);
620         } else {
621                 CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP,
622                                  cfs_fail_val);
623
624                 /* save version of file name for replay, must be ENOENT here */
625                 mdt_enoent_version_save(info, 1);
626         }
627
628         CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
629
630         lh = &info->mti_lh[MDT_LH_PARENT];
631         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW);
632         if (rc)
633                 GOTO(put_parent, rc);
634
635         if (!mdt_object_remote(parent)) {
636                 rc = mdt_version_get_check_save(info, parent, 0);
637                 if (rc)
638                         GOTO(unlock_parent, rc);
639         }
640
641         /*
642          * now repeat the lookup having a LDLM lock on the parent dir,
643          * as another thread could create the same name. notice this
644          * lookup is supposed to hit cache in OSD and be cheap if the
645          * directory is not being modified concurrently.
646          */
647         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
648                         &info->mti_tmp_fid1, &info->mti_spec);
649         if (unlikely(rc == 0 && !recreate_obj))
650                 GOTO(unlock_parent, rc = -EEXIST);
651
652         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
653         if (unlikely(IS_ERR(child)))
654                 GOTO(unlock_parent, rc = PTR_ERR(child));
655
656         ma->ma_need = MA_INODE;
657         ma->ma_valid = 0;
658
659         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
660                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
661
662         /* Version of child will be updated on disk. */
663         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
664         rc = mdt_version_get_check_save(info, child, 2);
665         if (rc)
666                 GOTO(put_child, rc);
667
668         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
669             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
670              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
671                      rr->rr_name.ln_namelen) == 0))
672                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
673
674         /*
675          * Do not perform lookup sanity check. We know that name does
676          * not exist.
677          */
678         info->mti_spec.sp_cr_lookup = 0;
679         if (mdt_object_remote(parent))
680                 info->mti_spec.sp_cr_lookup = 1;
681         info->mti_spec.sp_feat = &dt_directory_features;
682
683         /* set jobid xattr name from sysfs parameter */
684         strncpy(info->mti_spec.sp_cr_job_xattr, mdt->mdt_job_xattr,
685                 XATTR_JOB_MAX_LEN);
686
687         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
688                         mdt_object_child(child), &info->mti_spec, ma);
689         if (rc < 0)
690                 GOTO(put_child, rc);
691
692         if (S_ISDIR(ma->ma_attr.la_mode) &&
693             (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV))
694                 mdt_prep_ma_buf_from_rep(info, child, ma, 0);
695
696         rc = mdt_attr_get_complex(info, child, ma);
697         if (rc < 0)
698                 GOTO(put_child, rc);
699
700         if (ma->ma_valid & MA_LMV) {
701                 mdt_dump_lmv(D_INFO, ma->ma_lmv);
702                 repbody->mbo_eadatasize = ma->ma_lmv_size;
703                 repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
704         }
705
706         /* save child locks to eliminate dependey between 'mkdir a' and
707          * 'mkdir a/b' if b is a remote directory
708          */
709         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
710                 struct mdt_lock_handle *lhc;
711                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
712
713                 lhc = &info->mti_lh[MDT_LH_CHILD];
714                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
715                                              MDS_INODELOCK_UPDATE, LCK_PW);
716                 if (rc)
717                         GOTO(put_child, rc);
718
719                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
720         }
721
722         /* Return fid & attr to client. */
723         if (ma->ma_valid & MA_INODE)
724                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
725                                    mdt_object_fid(child));
726         EXIT;
727 put_child:
728         mdt_object_put(info->mti_env, child);
729 unlock_parent:
730         mdt_object_unlock(info, parent, lh, rc);
731 put_parent:
732         mdt_object_put(info->mti_env, parent);
733         return rc;
734 }
735
736 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
737                         struct md_attr *ma)
738 {
739         struct mdt_lock_handle  *lh;
740         int do_vbr = ma->ma_attr.la_valid &
741                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
742         __u64 lockpart = MDS_INODELOCK_UPDATE;
743         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
744         int rc;
745
746         ENTRY;
747         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
748                 lockpart |= MDS_INODELOCK_PERM;
749         /* Clear xattr cache on clients, so the virtual project ID xattr
750          * can get the new project ID
751          */
752         if (ma->ma_attr.la_valid & LA_PROJID)
753                 lockpart |= MDS_INODELOCK_XATTR;
754
755         lh = &info->mti_lh[MDT_LH_PARENT];
756         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
757                                      LCK_PW);
758         if (rc != 0)
759                 RETURN(rc);
760
761         /* all attrs are packed into mti_attr in unpack_setattr */
762         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
763                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
764
765         /* VBR: update version if attr changed are important for recovery */
766         if (do_vbr) {
767                 /* update on-disk version of changed object */
768                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
769                 rc = mdt_version_get_check_save(info, mo, 0);
770                 if (rc)
771                         GOTO(out_unlock, rc);
772         }
773
774         /* Ensure constant striping during chown(). See LU-2789. */
775         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
776                 mutex_lock(&mo->mot_lov_mutex);
777
778         /* all attrs are packed into mti_attr in unpack_setattr */
779         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
780
781         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
782                 mutex_unlock(&mo->mot_lov_mutex);
783
784         if (rc != 0)
785                 GOTO(out_unlock, rc);
786         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
787         EXIT;
788 out_unlock:
789         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
790         return rc;
791 }
792
793 /**
794  * Check HSM flags and add HS_DIRTY flag if relevant.
795  *
796  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
797  * and is not RELEASED.
798  */
799 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
800                         struct md_attr *ma)
801 {
802         struct lu_ucred *uc = mdt_ucred(info);
803         kernel_cap_t cap_saved;
804         int rc;
805
806         ENTRY;
807         /* If the file was modified, add the dirty flag */
808         ma->ma_need = MA_HSM;
809         rc = mdt_attr_get_complex(info, mo, ma);
810         if (rc) {
811                 CERROR("file attribute read error for "DFID": %d.\n",
812                         PFID(mdt_object_fid(mo)), rc);
813                 RETURN(rc);
814         }
815
816         /* If an up2date copy exists in the backend, add dirty flag */
817         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
818             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
819                 ma->ma_hsm.mh_flags |= HS_DIRTY;
820
821                 /* Bump cap so that closes from non-owner writers can
822                  * set the HSM state to dirty.
823                  */
824                 cap_saved = uc->uc_cap;
825                 cap_raise(uc->uc_cap, CAP_FOWNER);
826                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
827                 uc->uc_cap = cap_saved;
828                 if (rc)
829                         CERROR("file attribute change error for "DFID": %d\n",
830                                 PFID(mdt_object_fid(mo)), rc);
831         }
832
833         RETURN(rc);
834 }
835
836 static int mdt_reint_setattr(struct mdt_thread_info *info,
837                              struct mdt_lock_handle *lhc)
838 {
839         struct mdt_device *mdt = info->mti_mdt;
840         struct md_attr *ma = &info->mti_attr;
841         struct mdt_reint_record *rr = &info->mti_rr;
842         struct ptlrpc_request *req = mdt_info_req(info);
843         struct mdt_object *mo;
844         struct mdt_body *repbody;
845         ktime_t kstart = ktime_get();
846         int rc;
847
848         ENTRY;
849         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
850                   (unsigned int)ma->ma_attr.la_valid);
851
852         if (info->mti_dlm_req)
853                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
854
855         CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
856
857         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
858         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
859         if (IS_ERR(mo))
860                 GOTO(out, rc = PTR_ERR(mo));
861
862         if (!mdt_object_exists(mo))
863                 GOTO(out_put, rc = -ENOENT);
864
865         if (mdt_object_remote(mo))
866                 GOTO(out_put, rc = -EREMOTE);
867
868         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
869         /* revoke lease lock if size is going to be changed */
870         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
871                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
872                      atomic_read(&mo->mot_lease_count) > 0)) {
873                 down_read(&mo->mot_open_sem);
874
875                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
876                         lhc = &info->mti_lh[MDT_LH_LOCAL];
877                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
878                                              LCK_CW);
879                         if (rc != 0) {
880                                 up_read(&mo->mot_open_sem);
881                                 GOTO(out_put, rc);
882                         }
883
884                         /* revoke lease lock */
885                         mdt_object_unlock(info, mo, lhc, 1);
886                 }
887                 up_read(&mo->mot_open_sem);
888         }
889
890         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
891                 /* Check write access for the O_TRUNC case */
892                 if (mdt_write_read(mo) < 0)
893                         GOTO(out_put, rc = -ETXTBSY);
894
895                 /* LU-10286: compatibility check for FLR.
896                  * Please check the comment in mdt_finish_open() for details
897                  */
898                 if (!exp_connect_flr(info->mti_exp) ||
899                     !exp_connect_overstriping(info->mti_exp)) {
900                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
901                         if (rc < 0 && rc != -ENODATA)
902                                 GOTO(out_put, rc);
903
904                         if (!exp_connect_flr(info->mti_exp)) {
905                                 if (rc > 0 &&
906                                     mdt_lmm_is_flr(info->mti_big_lmm))
907                                         GOTO(out_put, rc = -EOPNOTSUPP);
908                         }
909
910                         if (!exp_connect_overstriping(info->mti_exp)) {
911                                 if (rc > 0 &&
912                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
913                                         GOTO(out_put, rc = -EOPNOTSUPP);
914                         }
915                 }
916
917                 /* For truncate, the file size sent from client
918                  * is believable, but the blocks are incorrect,
919                  * which makes the block size in LSOM attribute
920                  * inconsisent with the real block size.
921                  */
922                 rc = mdt_lsom_update(info, mo, true);
923                 if (rc)
924                         GOTO(out_put, rc);
925         }
926
927         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
928                 if (ma->ma_valid & MA_LOV)
929                         GOTO(out_put, rc = -EPROTO);
930
931                 /* MDT supports FMD for regular files due to Data-on-MDT */
932                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
933                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
934                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
935                                        req->rq_xid);
936
937                         if (ma->ma_attr.la_valid & LA_MTIME) {
938                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
939                                 if (!rc)
940                                         ma->ma_valid |= MA_PFID;
941                         }
942                 }
943
944                 rc = mdt_attr_set(info, mo, ma);
945                 if (rc)
946                         GOTO(out_put, rc);
947         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
948                    (ma->ma_valid & MA_INODE)) {
949                 struct lu_buf *buf = &info->mti_buf;
950                 struct lu_ucred *uc = mdt_ucred(info);
951                 struct mdt_lock_handle *lh;
952                 const char *name;
953
954                 /* reject if either remote or striped dir is disabled */
955                 if (ma->ma_valid & MA_LMV) {
956                         if (!mdt->mdt_enable_remote_dir ||
957                             !mdt->mdt_enable_striped_dir)
958                                 GOTO(out_put, rc = -EPERM);
959
960                         /* we want rbac roles to have precedence over any other
961                          * permission or capability checks
962                          */
963                         if (!uc->uc_rbac_dne_ops ||
964                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
965                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
966                              mdt->mdt_enable_remote_dir_gid != -1))
967                                 GOTO(out_put, rc = -EPERM);
968                 }
969
970                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
971                         GOTO(out_put, rc = -ENOTDIR);
972
973                 if (ma->ma_attr.la_valid != 0)
974                         GOTO(out_put, rc = -EPROTO);
975
976                 lh = &info->mti_lh[MDT_LH_PARENT];
977                 if (ma->ma_valid & MA_LOV) {
978                         buf->lb_buf = ma->ma_lmm;
979                         buf->lb_len = ma->ma_lmm_size;
980                         name = XATTR_NAME_LOV;
981                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
982                                              LCK_PW);
983                 } else {
984                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
985                         buf->lb_len = ma->ma_lmv_size;
986                         name = XATTR_NAME_DEFAULT_LMV;
987
988                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
989                                 rc = mdt_object_lock(info, mo, lh,
990                                                      MDS_INODELOCK_XATTR |
991                                                      MDS_INODELOCK_LOOKUP,
992                                                      LCK_PW);
993                         } else {
994                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
995                                 struct lu_name *pname = &info->mti_name;
996                                 const char dotdot[] = "..";
997                                 struct mdt_object *pobj;
998
999                                 fid_zero(pfid);
1000                                 pname->ln_name = dotdot;
1001                                 pname->ln_namelen = sizeof(dotdot);
1002                                 rc = mdo_lookup(info->mti_env,
1003                                                 mdt_object_child(mo), pname,
1004                                                 pfid, NULL);
1005                                 if (rc)
1006                                         GOTO(out_put, rc);
1007
1008                                 pobj = mdt_object_find(info->mti_env,
1009                                                        info->mti_mdt, pfid);
1010                                 if (IS_ERR(pobj))
1011                                         GOTO(out_put, rc = PTR_ERR(pobj));
1012
1013                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
1014                                                            MDS_INODELOCK_XATTR |
1015                                                            MDS_INODELOCK_LOOKUP,
1016                                                            LCK_PW);
1017                                 mdt_object_put(info->mti_env, pobj);
1018                         }
1019                 }
1020
1021                 if (rc != 0)
1022                         GOTO(out_put, rc);
1023
1024                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1025                                   name, 0);
1026
1027                 mdt_object_unlock(info, mo, lh, rc);
1028                 if (rc)
1029                         GOTO(out_put, rc);
1030         } else {
1031                 GOTO(out_put, rc = -EPROTO);
1032         }
1033
1034         /* If file data is modified, add the dirty flag */
1035         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1036                 rc = mdt_add_dirty_flag(info, mo, ma);
1037
1038         ma->ma_need = MA_INODE;
1039         ma->ma_valid = 0;
1040         rc = mdt_attr_get_complex(info, mo, ma);
1041         if (rc != 0)
1042                 GOTO(out_put, rc);
1043
1044         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1045
1046         EXIT;
1047 out_put:
1048         mdt_object_put(info->mti_env, mo);
1049 out:
1050         if (rc == 0)
1051                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1052                                  ktime_us_delta(ktime_get(), kstart));
1053
1054         mdt_client_compatibility(info);
1055         return rc;
1056 }
1057
1058 static int mdt_reint_create(struct mdt_thread_info *info,
1059                             struct mdt_lock_handle *lhc)
1060 {
1061         struct ptlrpc_request   *req = mdt_info_req(info);
1062         ktime_t                 kstart = ktime_get();
1063         int                     rc;
1064
1065         ENTRY;
1066         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1067                 RETURN(err_serious(-ESTALE));
1068
1069         if (info->mti_dlm_req)
1070                 ldlm_request_cancel(mdt_info_req(info),
1071                                     info->mti_dlm_req, 0, LATF_SKIP);
1072
1073         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1074                 RETURN(-EPROTO);
1075
1076         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1077         case S_IFDIR:
1078         case S_IFREG:
1079         case S_IFLNK:
1080         case S_IFCHR:
1081         case S_IFBLK:
1082         case S_IFIFO:
1083         case S_IFSOCK:
1084                 break;
1085         default:
1086                 CERROR("%s: Unsupported mode %o\n",
1087                        mdt_obd_name(info->mti_mdt),
1088                        info->mti_attr.ma_attr.la_mode);
1089                 RETURN(err_serious(-EOPNOTSUPP));
1090         }
1091
1092         rc = mdt_create(info);
1093         if (rc == 0) {
1094                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1095                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1096                                          ktime_us_delta(ktime_get(), kstart));
1097                 else
1098                         /* Special file should stay on the same node as parent*/
1099                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1100                                          ktime_us_delta(ktime_get(), kstart));
1101         }
1102
1103         RETURN(rc);
1104 }
1105
1106 /*
1107  * VBR: save parent version in reply and child version getting by its name.
1108  * Version of child is getting and checking during its lookup. If
1109  */
1110 static int mdt_reint_unlink(struct mdt_thread_info *info,
1111                             struct mdt_lock_handle *lhc)
1112 {
1113         struct mdt_reint_record *rr = &info->mti_rr;
1114         struct ptlrpc_request *req = mdt_info_req(info);
1115         struct md_attr *ma = &info->mti_attr;
1116         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1117         struct mdt_object *mp;
1118         struct mdt_object *mc;
1119         struct mdt_lock_handle *parent_lh;
1120         struct mdt_lock_handle *child_lh;
1121         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
1122         struct lu_ucred *uc  = mdt_ucred(info);
1123         int no_name = 0;
1124         ktime_t kstart = ktime_get();
1125         int rc;
1126
1127         ENTRY;
1128         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1129                   PNAME(&rr->rr_name));
1130
1131         if (info->mti_dlm_req)
1132                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1133
1134         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1135                 RETURN(err_serious(-ENOENT));
1136
1137         if (!fid_is_md_operative(rr->rr_fid1))
1138                 RETURN(-EPERM);
1139
1140         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1141         if (IS_ERR(mp))
1142                 RETURN(PTR_ERR(mp));
1143
1144         if (!mdt_object_remote(mp)) {
1145                 rc = mdt_version_get_check_save(info, mp, 0);
1146                 if (rc)
1147                         GOTO(put_parent, rc);
1148         }
1149
1150         if (!uc->uc_rbac_fscrypt_admin &&
1151             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1152                 GOTO(put_parent, rc = -EPERM);
1153
1154         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
1155         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1156         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1157         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW);
1158         if (rc != 0)
1159                 GOTO(put_parent, rc);
1160
1161         if (info->mti_spec.sp_rm_entry) {
1162                 if (!mdt_is_dne_client(req->rq_export))
1163                         /* Return -ENOTSUPP for old client */
1164                         GOTO(unlock_parent, rc = -ENOTSUPP);
1165
1166                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1167                         GOTO(unlock_parent, rc = -EPERM);
1168
1169                 ma->ma_need = MA_INODE;
1170                 ma->ma_valid = 0;
1171                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1172                                 NULL, &rr->rr_name, ma, no_name);
1173                 GOTO(unlock_parent, rc);
1174         }
1175
1176         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1177                 *child_fid = *rr->rr_fid2;
1178         } else {
1179                 /* lookup child object along with version checking */
1180                 fid_zero(child_fid);
1181                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1182                                               1);
1183                 if (rc != 0) {
1184                         /* Name might not be able to find during resend of
1185                          * remote unlink, considering following case.
1186                          * dir_A is a remote directory, the name entry of
1187                          * dir_A is on MDT0, the directory is on MDT1,
1188                          *
1189                          * 1. client sends unlink req to MDT1.
1190                          * 2. MDT1 sends name delete update to MDT0.
1191                          * 3. name entry is being deleted in MDT0 synchronously.
1192                          * 4. MDT1 is restarted.
1193                          * 5. client resends unlink req to MDT1. So it can not
1194                          *    find the name entry on MDT0 anymore.
1195                          * In this case, MDT1 only needs to destory the local
1196                          * directory.
1197                          */
1198                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1199                             !fid_is_zero(rr->rr_fid2) &&
1200                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1201                                 no_name = 1;
1202                                 *child_fid = *rr->rr_fid2;
1203                         } else {
1204                                 GOTO(unlock_parent, rc);
1205                         }
1206                 }
1207         }
1208
1209         if (!fid_is_md_operative(child_fid))
1210                 GOTO(unlock_parent, rc = -EPERM);
1211
1212         /* We will lock the child regardless it is local or remote. No harm. */
1213         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1214         if (IS_ERR(mc))
1215                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1216
1217         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1218                 /* In this case, child fid is embedded in the request, and we do
1219                  * not have a proper name as rr_name contains an encoded
1220                  * hash. So find name that matches provided hash.
1221                  */
1222                 if (!find_name_matching_hash(info, &rr->rr_name,
1223                                              NULL, mc))
1224                         GOTO(put_child, rc = -ENOENT);
1225         }
1226
1227         child_lh = &info->mti_lh[MDT_LH_CHILD];
1228         if (mdt_object_remote(mc)) {
1229                 struct mdt_body  *repbody;
1230
1231                 if (!fid_is_zero(rr->rr_fid2)) {
1232                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1233                                mdt_obd_name(info->mti_mdt),
1234                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1235                         GOTO(put_child, rc = -ENOENT);
1236                 }
1237                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1238                        mdt_obd_name(info->mti_mdt),
1239                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1240
1241                 if (!mdt_is_dne_client(req->rq_export))
1242                         /* Return -ENOTSUPP for old client */
1243                         GOTO(put_child, rc = -ENOTSUPP);
1244
1245                 /* Revoke the LOOKUP lock of the remote object granted by
1246                  * this MDT. Since the unlink will happen on another MDT,
1247                  * it will release the LOOKUP lock right away. Then What
1248                  * would happen if another client try to grab the LOOKUP
1249                  * lock at the same time with unlink XXX
1250                  */
1251                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX);
1252                 if (rc)
1253                         GOTO(put_child, rc);
1254
1255                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1256                 LASSERT(repbody != NULL);
1257                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1258                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1259                 GOTO(unlock_child, rc = -EREMOTE);
1260         }
1261         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1262          * this now because a running HSM restore on the child (unlink
1263          * victim) will hold the layout lock. See LU-4002.
1264          */
1265         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1266                                      MDS_INODELOCK_LOOKUP |
1267                                      MDS_INODELOCK_UPDATE, LCK_EX);
1268         if (rc != 0)
1269                 GOTO(put_child, rc);
1270
1271         /*
1272          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1273          * whether need MA_LOV and MA_COOKIE.
1274          */
1275         ma->ma_need = MA_INODE;
1276         ma->ma_valid = 0;
1277
1278         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1279                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1280         /* save version when object is locked */
1281         mdt_version_get_save(info, mc, 1);
1282
1283         mutex_lock(&mc->mot_lov_mutex);
1284
1285         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1286                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1287
1288         mutex_unlock(&mc->mot_lov_mutex);
1289         if (rc != 0)
1290                 GOTO(unlock_child, rc);
1291
1292         if (!lu_object_is_dying(&mc->mot_header)) {
1293                 rc = mdt_attr_get_complex(info, mc, ma);
1294                 if (rc)
1295                         GOTO(out_stat, rc);
1296         } else if (mdt_dom_check_for_discard(info, mc)) {
1297                 mdt_dom_discard_data(info, mc);
1298         }
1299         mdt_handle_last_unlink(info, mc, ma);
1300
1301 out_stat:
1302         if (ma->ma_valid & MA_INODE) {
1303                 switch (ma->ma_attr.la_mode & S_IFMT) {
1304                 case S_IFDIR:
1305                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1306                                          ktime_us_delta(ktime_get(), kstart));
1307                         break;
1308                 case S_IFREG:
1309                 case S_IFLNK:
1310                 case S_IFCHR:
1311                 case S_IFBLK:
1312                 case S_IFIFO:
1313                 case S_IFSOCK:
1314                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1315                                          ktime_us_delta(ktime_get(), kstart));
1316                         break;
1317                 default:
1318                         LASSERTF(0, "bad file type %o unlinking\n",
1319                                 ma->ma_attr.la_mode);
1320                 }
1321         }
1322
1323         EXIT;
1324
1325 unlock_child:
1326         /* after unlink the object is gone, no need to keep lock */
1327         mdt_object_stripes_unlock(info, mc, child_lh, einfo, 1);
1328 put_child:
1329         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1330             info->mti_big_buf.lb_buf)
1331                 lu_buf_free(&info->mti_big_buf);
1332         mdt_object_put(info->mti_env, mc);
1333 unlock_parent:
1334         mdt_object_unlock(info, mp, parent_lh, rc);
1335 put_parent:
1336         mdt_object_put(info->mti_env, mp);
1337         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1338         return rc;
1339 }
1340
1341 /*
1342  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1343  * name.
1344  */
1345 static int mdt_reint_link(struct mdt_thread_info *info,
1346                           struct mdt_lock_handle *lhc)
1347 {
1348         struct mdt_reint_record *rr = &info->mti_rr;
1349         struct ptlrpc_request   *req = mdt_info_req(info);
1350         struct md_attr          *ma = &info->mti_attr;
1351         struct mdt_object       *ms;
1352         struct mdt_object       *mp;
1353         struct mdt_lock_handle  *lhs;
1354         struct mdt_lock_handle  *lhp;
1355         ktime_t kstart = ktime_get();
1356         int rc;
1357
1358         ENTRY;
1359         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1360                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1361
1362         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1363                 RETURN(err_serious(-ENOENT));
1364
1365         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1366             CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1367                 req->rq_no_reply = 1;
1368                 RETURN(err_serious(-ENOENT));
1369         }
1370
1371         if (info->mti_dlm_req)
1372                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1373
1374         /* Invalid case so return error immediately instead of
1375          * processing it
1376          */
1377         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1378                 RETURN(-EPERM);
1379
1380         if (!fid_is_md_operative(rr->rr_fid1) ||
1381             !fid_is_md_operative(rr->rr_fid2))
1382                 RETURN(-EPERM);
1383
1384         /* step 1: find target parent dir */
1385         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1386         if (IS_ERR(mp))
1387                 RETURN(PTR_ERR(mp));
1388
1389         rc = mdt_version_get_check_save(info, mp, 0);
1390         if (rc)
1391                 GOTO(put_parent, rc);
1392
1393         rc = mdt_check_enc(info, mp);
1394         if (rc)
1395                 GOTO(put_parent, rc);
1396
1397         /* step 2: find source */
1398         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1399         if (IS_ERR(ms))
1400                 GOTO(put_parent, rc = PTR_ERR(ms));
1401
1402         if (!mdt_object_exists(ms)) {
1403                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1404                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1405                 GOTO(put_source, rc = -ENOENT);
1406         }
1407
1408         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1409
1410         lhp = &info->mti_lh[MDT_LH_PARENT];
1411         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW);
1412         if (rc != 0)
1413                 GOTO(put_source, rc);
1414
1415         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1416
1417         lhs = &info->mti_lh[MDT_LH_CHILD];
1418         rc = mdt_object_lock(info, ms, lhs,
1419                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1420                              LCK_EX);
1421         if (rc != 0)
1422                 GOTO(unlock_parent, rc);
1423
1424         /* step 3: link it */
1425         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1426                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1427
1428         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1429         rc = mdt_version_get_check_save(info, ms, 1);
1430         if (rc)
1431                 GOTO(unlock_source, rc);
1432
1433         /** check target version by name during replay */
1434         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1435                                       &info->mti_tmp_fid1, 2);
1436         if (rc != 0 && rc != -ENOENT)
1437                 GOTO(unlock_source, rc);
1438         /* save version of file name for replay, it must be ENOENT here */
1439         if (!req_is_replay(mdt_info_req(info))) {
1440                 if (rc != -ENOENT) {
1441                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1442                                PNAME(&rr->rr_name));
1443                         GOTO(unlock_source, rc = -EEXIST);
1444                 }
1445                 info->mti_ver[2] = ENOENT_VERSION;
1446                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1447         }
1448
1449         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1450                       mdt_object_child(ms), &rr->rr_name, ma);
1451
1452         if (rc == 0)
1453                 mdt_counter_incr(req, LPROC_MDT_LINK,
1454                                  ktime_us_delta(ktime_get(), kstart));
1455
1456         EXIT;
1457 unlock_source:
1458         mdt_object_unlock(info, ms, lhs, rc);
1459 unlock_parent:
1460         mdt_object_unlock(info, mp, lhp, rc);
1461 put_source:
1462         mdt_object_put(info->mti_env, ms);
1463 put_parent:
1464         mdt_object_put(info->mti_env, mp);
1465         return rc;
1466 }
1467
1468 /**
1469  * Get BFL lock for rename or migrate process.
1470  **/
1471 static int mdt_rename_lock(struct mdt_thread_info *info,
1472                            struct mdt_lock_handle *lh)
1473 {
1474         struct lu_fid *fid = &info->mti_tmp_fid1;
1475         struct mdt_object *obj;
1476         __u64 ibits = MDS_INODELOCK_UPDATE;
1477         int rc;
1478
1479         ENTRY;
1480         lu_root_fid(fid);
1481         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1482         if (IS_ERR(obj))
1483                 RETURN(PTR_ERR(obj));
1484
1485         mdt_lock_reg_init(lh, LCK_EX);
1486         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1487                                       &ibits, 0, false);
1488         mdt_object_put(info->mti_env, obj);
1489         RETURN(rc);
1490 }
1491
1492 static void mdt_rename_unlock(struct mdt_thread_info *info,
1493                               struct mdt_lock_handle *lh)
1494 {
1495         ENTRY;
1496         /* Cancel the single rename lock right away */
1497         mdt_object_unlock(info, NULL, lh, 1);
1498         EXIT;
1499 }
1500
1501 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1502                                                 const struct lu_fid *fid,
1503                                                 int idx)
1504 {
1505         struct mdt_object *dir;
1506         int rc;
1507
1508         ENTRY;
1509         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1510         if (IS_ERR(dir))
1511                 RETURN(dir);
1512
1513         /* check early, the real version will be saved after locking */
1514         rc = mdt_version_get_check(info, dir, idx);
1515         if (rc)
1516                 GOTO(out_put, rc);
1517
1518         if (!mdt_object_exists(dir))
1519                 GOTO(out_put, rc = -ENOENT);
1520
1521         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1522                 GOTO(out_put, rc = -ENOTDIR);
1523
1524         RETURN(dir);
1525 out_put:
1526         mdt_object_put(info->mti_env, dir);
1527         return ERR_PTR(rc);
1528 }
1529
1530 /*
1531  * lock rename source object.
1532  *
1533  * Both source and its parent object may be located on remote MDTs, and even on
1534  * different MDTs, which means source object is a remote object on parent.
1535  *
1536  * \retval      0 on success
1537  * \retval      -ev negative errno upon error
1538  */
1539 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1540                                   struct mdt_object *parent,
1541                                   struct mdt_object *child,
1542                                   struct mdt_lock_handle *lh,
1543                                   struct mdt_lock_handle *lh_lookup,
1544                                   __u64 ibits)
1545 {
1546         int rc;
1547
1548         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1549         /* if @obj is remote object, LOOKUP lock needs to be taken from
1550          * parent MDT.
1551          */
1552         rc = mdt_is_remote_object(info, parent, child);
1553         if (rc < 0)
1554                 return rc;
1555
1556         if (rc == 1) {
1557                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1558                                             LCK_EX);
1559                 if (rc)
1560                         return rc;
1561
1562                 ibits &= ~MDS_INODELOCK_LOOKUP;
1563         }
1564
1565         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX);
1566         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1567                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1568
1569         return 0;
1570 }
1571
1572 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1573                                      struct mdt_object *obj,
1574                                      struct mdt_lock_handle *lh,
1575                                      struct mdt_lock_handle *lh_lookup,
1576                                      int decref)
1577 {
1578         mdt_object_unlock(info, obj, lh, decref);
1579         mdt_object_unlock(info, NULL, lh_lookup, decref);
1580 }
1581
1582 /* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
1583 struct mdt_link_lock {
1584         struct mdt_object *mll_obj;
1585         struct mdt_lock_handle mll_lh;
1586         struct list_head mll_linkage;
1587 };
1588
1589 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1590                                             struct mdt_object *o,
1591                                             struct mdt_lock_handle *lh,
1592                                             struct list_head *list)
1593 {
1594         struct mdt_link_lock *mll;
1595
1596         OBD_ALLOC_PTR(mll);
1597         if (mll == NULL)
1598                 return -ENOMEM;
1599
1600         INIT_LIST_HEAD(&mll->mll_linkage);
1601         mdt_object_get(info->mti_env, o);
1602         mll->mll_obj = o;
1603         mll->mll_lh = *lh;
1604         memset(lh, 0, sizeof(*lh));
1605         list_add_tail(&mll->mll_linkage, list);
1606
1607         return 0;
1608 }
1609
1610 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1611                                              struct mdt_link_lock *mll,
1612                                              int decref)
1613 {
1614         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1615         mdt_object_put(info->mti_env, mll->mll_obj);
1616         list_del(&mll->mll_linkage);
1617         OBD_FREE_PTR(mll);
1618 }
1619
1620 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1621                                      struct list_head *list, int decref)
1622 {
1623         struct mdt_link_lock *mll;
1624         struct mdt_link_lock *tmp;
1625
1626         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1627                 mdt_migrate_link_lock_del(info, mll, decref);
1628 }
1629
1630 /* take link parent UPDATE lock.
1631  * \retval      0 \a lnkp is already locked, no lock taken.
1632  *              1 lock taken
1633  *              -ev negative errno.
1634  */
1635 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1636                                         struct mdt_object *lnkp,
1637                                         struct list_head *update_locks,
1638                                         bool *blocked)
1639 {
1640         const struct lu_fid *fid = mdt_object_fid(lnkp);
1641         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1642         struct mdt_link_lock *entry;
1643         __u64 ibits = 0;
1644         int rc;
1645
1646         ENTRY;
1647
1648         /* check if it's already locked */
1649         list_for_each_entry(entry, update_locks, mll_linkage) {
1650                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1651                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1652                         RETURN(0);
1653                 }
1654         }
1655
1656         /* link parent UPDATE lock */
1657         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1658
1659         if (*blocked) {
1660                 /* revoke lock instead of take in *blocked* mode */
1661                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1662                                      LCK_PW);
1663                 if (rc)
1664                         RETURN(rc);
1665
1666                 if (mdt_object_remote(lnkp)) {
1667                         struct ldlm_lock *lock;
1668
1669                         /*
1670                          * for remote object, set lock cb_atomic, so lock can be
1671                          * released in blocking_ast() immediately, then the next
1672                          * lock_try will have better chance of success.
1673                          */
1674                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1675                         LASSERT(lock != NULL);
1676                         lock_res_and_lock(lock);
1677                         ldlm_set_atomic_cb(lock);
1678                         unlock_res_and_lock(lock);
1679                         LDLM_LOCK_PUT(lock);
1680                 }
1681
1682                 mdt_object_unlock(info, lnkp, lhl, 1);
1683                 RETURN(0);
1684         }
1685
1686         /*
1687          * we can't follow parent-child lock order like other MD
1688          * operations, use lock_try here to avoid deadlock, if the lock
1689          * cannot be taken, drop all locks taken, revoke the blocked
1690          * one, and continue processing the remaining entries, and in
1691          * the end of the loop restart from beginning.
1692          *
1693          * don't lock with PDO mode in case two links are under the same
1694          * parent and their hash values are different.
1695          */
1696         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1697                                  LCK_PW);
1698         if (rc < 0)
1699                 RETURN(rc);
1700
1701         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1702                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1703                 *blocked = true;
1704                 RETURN(-EAGAIN);
1705         }
1706
1707         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1708         if (rc) {
1709                 mdt_object_unlock(info, lnkp, lhl, 1);
1710                 RETURN(rc);
1711         }
1712
1713         RETURN(1);
1714 }
1715
1716 /* take link LOOKUP lock.
1717  * \retval      0 \a lnkp is already locked, no lock taken.
1718  *              1 lock taken.
1719  *              -ev negative errno.
1720  */
1721 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1722                                  struct mdt_object *lnkp,
1723                                  struct mdt_object *spobj,
1724                                  struct mdt_object *obj,
1725                                  struct list_head *lookup_locks)
1726 {
1727         const struct lu_fid *fid = mdt_object_fid(lnkp);
1728         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1729         struct mdt_link_lock *entry;
1730         int rc;
1731
1732         ENTRY;
1733
1734         /* check if it's already locked by source */
1735         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1736         if (rc <= 0) {
1737                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1738                        PFID(fid));
1739                 RETURN(rc);
1740         }
1741
1742         /* check if it's already locked by other links */
1743         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1744                 rc = mdt_fids_different_target(info, fid,
1745                                                mdt_object_fid(entry->mll_obj));
1746                 if (rc <= 0) {
1747                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1748                                PFID(fid));
1749                         RETURN(rc);
1750                 }
1751         }
1752
1753         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX);
1754         if (rc)
1755                 RETURN(rc);
1756
1757         /* don't take local LOOKUP lock, because later we will lock other ibits
1758          * of sobj (which is on local MDT), and lock the same object twice may
1759          * deadlock, just revoke this lock.
1760          */
1761         if (!mdt_object_remote(lnkp))
1762                 GOTO(unlock, rc = 0);
1763
1764         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1765         if (rc)
1766                 GOTO(unlock, rc);
1767
1768         RETURN(1);
1769 unlock:
1770         mdt_object_unlock(info, lnkp, lhl, 1);
1771         return rc;
1772 }
1773
1774 /*
1775  * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
1776  * total local lock count exceeds RS_MAX_LOCKS.
1777  *
1778  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1779  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1780  * \retval      -ev negative errno upon error
1781  */
1782 static int mdt_migrate_links_lock(struct mdt_thread_info *info,
1783                                   struct mdt_object *spobj,
1784                                   struct mdt_object *tpobj,
1785                                   struct mdt_object *obj,
1786                                   struct mdt_lock_handle *lhsp,
1787                                   struct mdt_lock_handle *lhtp,
1788                                   struct list_head *link_locks)
1789 {
1790         struct mdt_device *mdt = info->mti_mdt;
1791         struct lu_buf *buf = &info->mti_big_buf;
1792         struct lu_name *lname = &info->mti_name;
1793         struct linkea_data ldata = { NULL };
1794         int local_lock_cnt = 0;
1795         bool blocked = false;
1796         bool saved;
1797         struct mdt_object *lnkp;
1798         struct lu_fid fid;
1799         LIST_HEAD(update_locks);
1800         LIST_HEAD(lookup_locks);
1801         int rc;
1802
1803         ENTRY;
1804         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1805                 RETURN(0);
1806
1807         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1808         if (buf->lb_buf == NULL)
1809                 RETURN(-ENOMEM);
1810
1811         ldata.ld_buf = buf;
1812         rc = mdt_links_read(info, obj, &ldata);
1813         if (rc) {
1814                 if (rc == -ENOENT || rc == -ENODATA)
1815                         rc = 0;
1816                 RETURN(rc);
1817         }
1818
1819         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1820              linkea_next_entry(&ldata)) {
1821                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1822                                     &fid);
1823
1824                 /* check if link parent is source parent too */
1825                 if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
1826                         CDEBUG(D_INFO,
1827                                "skip lock on source parent "DFID"/"DNAME"\n",
1828                                PFID(&fid), PNAME(lname));
1829                         continue;
1830                 }
1831
1832                 /* check if link parent is target parent too */
1833                 if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
1834                         CDEBUG(D_INFO,
1835                                "skip lock on target parent "DFID"/"DNAME"\n",
1836                                PFID(&fid), PNAME(lname));
1837                         continue;
1838                 }
1839
1840                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1841                 if (IS_ERR(lnkp)) {
1842                         CWARN("%s: cannot find obj "DFID": %ld\n",
1843                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1844                         continue;
1845                 }
1846
1847                 if (!mdt_object_exists(lnkp)) {
1848                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1849                                PFID(&fid), PNAME(lname));
1850                         mdt_object_put(info->mti_env, lnkp);
1851                         continue;
1852                 }
1853 relock:
1854                 saved = blocked;
1855                 rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
1856                                                   &blocked);
1857                 if (!saved && blocked) {
1858                         /* unlock all locks taken to avoid deadlock */
1859                         mdt_migrate_links_unlock(info, &update_locks, 1);
1860                         mdt_object_unlock(info, spobj, lhsp, 1);
1861                         if (tpobj != spobj)
1862                                 mdt_object_unlock(info, tpobj, lhtp, 1);
1863                         goto relock;
1864                 }
1865                 if (rc < 0) {
1866                         mdt_object_put(info->mti_env, lnkp);
1867                         GOTO(out, rc);
1868                 }
1869
1870                 if (rc == 1 && !mdt_object_remote(lnkp))
1871                         local_lock_cnt++;
1872
1873                 rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
1874                                            &lookup_locks);
1875                 if (rc < 0) {
1876                         mdt_object_put(info->mti_env, lnkp);
1877                         GOTO(out, rc);
1878                 }
1879                 if (rc == 1 && !mdt_object_remote(lnkp))
1880                         local_lock_cnt++;
1881                 mdt_object_put(info->mti_env, lnkp);
1882         }
1883
1884         if (blocked)
1885                 GOTO(out, rc = -EBUSY);
1886
1887         EXIT;
1888 out:
1889         list_splice(&update_locks, link_locks);
1890         list_splice(&lookup_locks, link_locks);
1891         if (rc < 0) {
1892                 mdt_migrate_links_unlock(info, link_locks, rc);
1893         } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
1894                 /*
1895                  * parent may have 3 local objects: master object and 2 stripes
1896                  * (if it's being migrated too); source may have 1 local objects
1897                  * as regular file; target has 1 local object.
1898                  * Note, source may have 2 local locks if it is directory but it
1899                  * can't have hardlinks, so it is not considered here.
1900                  */
1901                 CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
1902                        local_lock_cnt);
1903                 rc = 1;
1904         }
1905         return rc;
1906 }
1907
1908 /*
1909  * lookup source by name, if parent is striped directory, we need to find the
1910  * corresponding stripe where source is located, and then lookup there.
1911  *
1912  * besides, if parent is migrating too, and file is already in target stripe,
1913  * this should be a redo of 'lfs migrate' on client side.
1914  *
1915  * \retval 1 tpobj stripe index is less than spobj stripe index
1916  * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
1917  * \retval -ev negative errno upon error
1918  */
1919 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1920                               struct mdt_object *pobj,
1921                               const struct md_attr *ma,
1922                               const struct lu_name *lname,
1923                               struct mdt_object **spobj,
1924                               struct mdt_object **tpobj,
1925                               struct mdt_object **sobj)
1926 {
1927         const struct lu_env *env = info->mti_env;
1928         struct lu_fid *fid = &info->mti_tmp_fid1;
1929         int spindex = -1;
1930         int tpindex = -1;
1931         int rc;
1932
1933         if (ma->ma_valid & MA_LMV) {
1934                 /* if parent is striped, lookup on corresponding stripe */
1935                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1936                 struct lu_fid *fid2 = &info->mti_tmp_fid2;
1937
1938                 if (!lmv_is_sane(lmv))
1939                         return -EBADF;
1940
1941                 spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1942                                                        lname->ln_namelen);
1943                 if (spindex < 0)
1944                         return spindex;
1945
1946                 fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
1947
1948                 *spobj = mdt_object_find(env, info->mti_mdt, fid2);
1949                 if (IS_ERR(*spobj)) {
1950                         rc = PTR_ERR(*spobj);
1951                         *spobj = NULL;
1952                         return rc;
1953                 }
1954
1955                 if (!mdt_object_exists(*spobj))
1956                         GOTO(spobj_put, rc = -ENOENT);
1957
1958                 fid_zero(fid);
1959                 rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
1960                                 &info->mti_spec);
1961                 if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
1962                         /* fail check here to let top dir migration succeed. */
1963                         if (CFS_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
1964                                 GOTO(spobj_put, rc = -EIO);
1965
1966                         /*
1967                          * if parent layout is changeing, and lookup child
1968                          * failed on source stripe, lookup again on target
1969                          * stripe, if it exists, it means previous migration
1970                          * was interrupted, and current file was migrated
1971                          * already.
1972                          */
1973                         tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
1974                                                            lname->ln_namelen);
1975                         if (tpindex < 0)
1976                                 GOTO(spobj_put, rc = tpindex);
1977
1978                         fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
1979
1980                         *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
1981                         if (IS_ERR(*tpobj)) {
1982                                 rc = PTR_ERR(*tpobj);
1983                                 *tpobj = NULL;
1984                                 GOTO(spobj_put, rc);
1985                         }
1986
1987                         if (!mdt_object_exists(*tpobj))
1988                                 GOTO(tpobj_put, rc = -ENOENT);
1989
1990                         if (rc == -ENOENT) {
1991                                 fid_zero(fid);
1992                                 rc = mdo_lookup(env, mdt_object_child(*tpobj),
1993                                                 lname, fid, &info->mti_spec);
1994                                 GOTO(tpobj_put, rc = rc ?: -EALREADY);
1995                         }
1996                 } else if (rc) {
1997                         GOTO(spobj_put, rc);
1998                 } else {
1999                         *tpobj = *spobj;
2000                         tpindex = spindex;
2001                         mdt_object_get(env, *tpobj);
2002                 }
2003         } else {
2004                 fid_zero(fid);
2005                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2006                                 &info->mti_spec);
2007                 if (rc)
2008                         return rc;
2009
2010                 *spobj = pobj;
2011                 *tpobj = pobj;
2012                 mdt_object_get(env, pobj);
2013                 mdt_object_get(env, pobj);
2014         }
2015
2016         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2017         if (IS_ERR(*sobj)) {
2018                 rc = PTR_ERR(*sobj);
2019                 *sobj = NULL;
2020                 GOTO(tpobj_put, rc);
2021         }
2022
2023         if (!mdt_object_exists(*sobj))
2024                 GOTO(sobj_put, rc = -ENOENT);
2025
2026         return (tpindex < spindex);
2027
2028 sobj_put:
2029         mdt_object_put(env, *sobj);
2030         *sobj = NULL;
2031 tpobj_put:
2032         mdt_object_put(env, *tpobj);
2033         *tpobj = NULL;
2034 spobj_put:
2035         mdt_object_put(env, *spobj);
2036         *spobj = NULL;
2037
2038         return rc;
2039 }
2040
2041 /* end lease and close file for regular file */
2042 static int mdd_migrate_close(struct mdt_thread_info *info,
2043                              struct mdt_object *obj)
2044 {
2045         struct close_data *data;
2046         struct mdt_body *repbody;
2047         struct ldlm_lock *lease;
2048         int rc;
2049         int rc2;
2050
2051         rc = -EPROTO;
2052         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2053                                       RCL_CLIENT) ||
2054             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2055                                       RCL_CLIENT))
2056                 goto close;
2057
2058         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2059         if (!data)
2060                 goto close;
2061
2062         rc = -ESTALE;
2063         lease = ldlm_handle2lock(&data->cd_handle);
2064         if (!lease)
2065                 goto close;
2066
2067         /* check if the lease was already canceled */
2068         lock_res_and_lock(lease);
2069         rc = ldlm_is_cancel(lease);
2070         unlock_res_and_lock(lease);
2071
2072         if (rc) {
2073                 rc = -EAGAIN;
2074                 LDLM_DEBUG(lease, DFID" lease broken",
2075                            PFID(mdt_object_fid(obj)));
2076         }
2077
2078         /*
2079          * cancel server side lease, client side counterpart should have been
2080          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2081          */
2082         ldlm_lock_cancel(lease);
2083         ldlm_reprocess_all(lease->l_resource,
2084                            lease->l_policy_data.l_inodebits.bits);
2085         LDLM_LOCK_PUT(lease);
2086
2087 close:
2088         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2089         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2090         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2091
2092         return rc ?: rc2;
2093 }
2094
2095 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2096 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2097 {
2098         return le32_to_cpu(lmv->lmv_hash_type) ==
2099                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2100                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2101                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2102                le32_to_cpu(lmv->lmv_migrate_offset) <
2103                 le32_to_cpu(lmv->lmv_stripe_count);
2104 }
2105
2106 /*
2107  * migrate file in below steps:
2108  *  1. lock source and target stripes
2109  *  2. lookup source by name
2110  *  3. lock parents of source links if source is not directory
2111  *  4. reject if source is in HSM
2112  *  5. take source open_sem and close file if source is regular file
2113  *  6. lock source, and its stripes if it's directory
2114  *  7. migrate file
2115  *  8. lock target so subsequent change to it can trigger COS
2116  *  9. unlock above locks
2117  * 10. sync device if source has too many links
2118  */
2119 int mdt_reint_migrate(struct mdt_thread_info *info,
2120                       struct mdt_lock_handle *unused)
2121 {
2122         const struct lu_env *env = info->mti_env;
2123         struct mdt_device *mdt = info->mti_mdt;
2124         struct ptlrpc_request *req = mdt_info_req(info);
2125         struct mdt_reint_record *rr = &info->mti_rr;
2126         struct lu_ucred *uc = mdt_ucred(info);
2127         struct md_attr *ma = &info->mti_attr;
2128         struct mdt_object *pobj;
2129         struct mdt_object *spobj;
2130         struct mdt_object *tpobj;
2131         struct mdt_object *sobj;
2132         struct mdt_object *tobj;
2133         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2134         struct mdt_lock_handle *lhsp;
2135         struct mdt_lock_handle *lhtp;
2136         struct mdt_lock_handle *lhs;
2137         struct mdt_lock_handle *lhl;
2138         LIST_HEAD(link_locks);
2139         int lock_retries = 5;
2140         bool reverse = false;
2141         bool open_sem_locked = false;
2142         bool do_sync = false;
2143         bool is_plain_dir = false;
2144         int rc;
2145
2146         ENTRY;
2147         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2148                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2149
2150         if (info->mti_dlm_req)
2151                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2152
2153         if (!fid_is_md_operative(rr->rr_fid1) ||
2154             !fid_is_md_operative(rr->rr_fid2))
2155                 RETURN(-EPERM);
2156
2157         /* don't allow migrate . or .. */
2158         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2159                 RETURN(-EBUSY);
2160
2161         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2162                 RETURN(-EPERM);
2163
2164         /* we want rbac roles to have precedence over any other
2165          * permission or capability checks
2166          */
2167         if (uc && (!uc->uc_rbac_dne_ops ||
2168                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2169                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2170                     mdt->mdt_enable_remote_dir_gid != -1)))
2171                 RETURN(-EPERM);
2172
2173         /*
2174          * Note: do not enqueue rename lock for replay request, because
2175          * if other MDT holds rename lock, but being blocked to wait for
2176          * this MDT to finish its recovery, and the failover MDT can not
2177          * get rename lock, which will cause deadlock.
2178          *
2179          * req is NULL if this is called by directory auto-split.
2180          */
2181         if (req && !req_is_replay(req)) {
2182                 rc = mdt_rename_lock(info, rename_lh);
2183                 if (rc != 0) {
2184                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2185                                mdt_obd_name(info->mti_mdt), rc);
2186                         RETURN(rc);
2187                 }
2188         }
2189
2190         /* pobj is master object of parent */
2191         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2192         if (IS_ERR(pobj))
2193                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2194
2195         if (req) {
2196                 rc = mdt_version_get_check(info, pobj, 0);
2197                 if (rc)
2198                         GOTO(put_parent, rc);
2199         }
2200
2201         if (!mdt_object_exists(pobj))
2202                 GOTO(put_parent, rc = -ENOENT);
2203
2204         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2205                 GOTO(put_parent, rc = -ENOTDIR);
2206
2207         rc = mdt_check_enc(info, pobj);
2208         if (rc)
2209                 GOTO(put_parent, rc);
2210
2211         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2212         if (rc)
2213                 GOTO(put_parent, rc);
2214
2215         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2216             (ma->ma_valid & MA_LMV) &&
2217             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2218                 struct lu_buf *buf = &info->mti_buf;
2219                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2220                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2221
2222                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2223                                                  LMV_HASH_FLAG_BAD_TYPE);
2224                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2225                 buf->lb_buf = lmv;
2226                 buf->lb_len = sizeof(*lmv);
2227                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2228                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2229                 mo_invalidate(env, mdt_object_child(pobj));
2230                 GOTO(put_parent, rc);
2231         }
2232
2233         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2234          * if @pobj is migrating too, tpobj is the target parent stripe.
2235          */
2236         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2237                                 &sobj);
2238         if (rc < 0)
2239                 GOTO(put_parent, rc);
2240         reverse = rc;
2241
2242         /* parent unchanged, this happens in dir restripe */
2243         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2244                 GOTO(put_source, rc = -EALREADY);
2245
2246 lock_parent:
2247         LASSERT(spobj);
2248         LASSERT(tpobj);
2249         lhsp = &info->mti_lh[MDT_LH_PARENT];
2250         lhtp = &info->mti_lh[MDT_LH_CHILD];
2251         /* lock spobj and tpobj in stripe index order */
2252         if (reverse) {
2253                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW);
2254                 if (rc)
2255                         GOTO(put_source, rc);
2256
2257                 LASSERT(spobj != tpobj);
2258                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2259                 if (rc)
2260                         GOTO(unlock_parent, rc);
2261         } else {
2262                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2263                 if (rc)
2264                         GOTO(put_source, rc);
2265
2266                 if (tpobj != spobj) {
2267                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2268                                              LCK_PW);
2269                         if (rc)
2270                                 GOTO(unlock_parent, rc);
2271                 }
2272         }
2273
2274         /* if inode is not migrated, or is dir, no need to lock links */
2275         if (!info->mti_spec.sp_migrate_nsonly &&
2276             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2277                 /* lock link parents, and take LOOKUP lock of links */
2278                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2279                                             lhtp, &link_locks);
2280                 if (rc == -EBUSY && lock_retries-- > 0) {
2281                         LASSERT(list_empty(&link_locks));
2282                         goto lock_parent;
2283                 }
2284
2285                 if (rc < 0)
2286                         GOTO(put_source, rc);
2287
2288                 /*
2289                  * RS_MAX_LOCKS is the limit of number of locks that can be
2290                  * saved along with one request, if total lock count exceeds
2291                  * this limit, we will drop all locks after migration, and
2292                  * trigger commit in the end.
2293                  */
2294                 do_sync = rc;
2295         }
2296
2297         /* lock source */
2298         lhs = &info->mti_lh[MDT_LH_OLD];
2299         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2300         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2301                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2302                                     MDS_INODELOCK_OPEN);
2303         if (rc)
2304                 GOTO(unlock_links, rc);
2305
2306         if (mdt_object_remote(sobj)) {
2307                 struct md_attr *ma2 = &info->mti_attr2;
2308                 ma2->ma_need = MA_INODE;
2309                 rc = mo_attr_get(env, mdt_object_child(sobj), ma2);
2310                 if (rc)
2311                         GOTO(unlock_source, rc);
2312         }
2313
2314         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2315                 /* TODO: DoM migration is not supported, migrate dirent only */
2316                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2317                 if (rc)
2318                         GOTO(unlock_source, rc);
2319
2320                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2321                         info->mti_spec.sp_migrate_nsonly = 1;
2322         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2323                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2324                 if (rc)
2325                         GOTO(unlock_source, rc);
2326
2327                 if (!(ma->ma_valid & MA_LMV))
2328                         is_plain_dir = true;
2329                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2330                         /* race with restripe/auto-split */
2331                         GOTO(unlock_source, rc = -EBUSY);
2332                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2333                         struct lu_buf *buf = &info->mti_buf;
2334                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2335                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2336
2337                         /* migration failed before, and LFSCK cleared hash type
2338                          * and flags, fake it to resume migration.
2339                          */
2340                         lmv->lmv_hash_type =
2341                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2342                                             LMV_HASH_FLAG_MIGRATION |
2343                                             LMV_HASH_FLAG_BAD_TYPE |
2344                                             LMV_HASH_FLAG_FIXED);
2345                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2346                         buf->lb_buf = lmv;
2347                         buf->lb_len = sizeof(*lmv);
2348                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2349                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2350                         mo_invalidate(env, mdt_object_child(sobj));
2351                         GOTO(unlock_source, rc = -EALREADY);
2352                 }
2353         }
2354
2355         /* if migration HSM is allowed */
2356         if (!mdt->mdt_migrate_hsm_allowed) {
2357                 ma->ma_need = MA_HSM;
2358                 ma->ma_valid = 0;
2359                 rc = mdt_attr_get_complex(info, sobj, ma);
2360                 if (rc)
2361                         GOTO(unlock_source, rc);
2362
2363                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2364                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2365         }
2366
2367         /* end lease and close file for regular file */
2368         if (info->mti_spec.sp_migrate_close) {
2369                 /* try to hold open_sem so that nobody else can open the file */
2370                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2371                         /* close anyway */
2372                         mdd_migrate_close(info, sobj);
2373                         GOTO(unlock_source, rc = -EBUSY);
2374                 } else {
2375                         open_sem_locked = true;
2376                         rc = mdd_migrate_close(info, sobj);
2377                         if (rc && rc != -ESTALE)
2378                                 GOTO(unlock_open_sem, rc);
2379                 }
2380         }
2381
2382         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2383         if (IS_ERR(tobj))
2384                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2385
2386         /* Don't do lookup sanity check. We know name doesn't exist. */
2387         info->mti_spec.sp_cr_lookup = 0;
2388         info->mti_spec.sp_feat = &dt_directory_features;
2389
2390         rc = mdo_migrate(env, mdt_object_child(spobj),
2391                          mdt_object_child(tpobj), mdt_object_child(sobj),
2392                          mdt_object_child(tobj), &rr->rr_name,
2393                          &info->mti_spec, ma);
2394         if (rc)
2395                 GOTO(put_target, rc);
2396
2397         /* save target locks for directory */
2398         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2399             !info->mti_spec.sp_migrate_nsonly) {
2400                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2401                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2402
2403                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2404                  * otherwise stripes lock may deadlock.
2405                  */
2406                 if (is_plain_dir)
2407                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2408
2409                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2410                                              MDS_INODELOCK_UPDATE, LCK_PW);
2411                 if (rc)
2412                         GOTO(put_target, rc);
2413
2414                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2415         }
2416
2417         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2418                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2419
2420         EXIT;
2421 put_target:
2422         mdt_object_put(env, tobj);
2423 unlock_open_sem:
2424         if (open_sem_locked)
2425                 up_write(&sobj->mot_open_sem);
2426 unlock_source:
2427         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2428 unlock_links:
2429         /* if we've got too many locks to save into RPC,
2430          * then just commit before the locks are released
2431          */
2432         if (!rc && do_sync)
2433                 mdt_device_sync(env, mdt);
2434         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2435 unlock_parent:
2436         mdt_object_unlock(info, spobj, lhsp, rc);
2437         mdt_object_unlock(info, tpobj, lhtp, rc);
2438 put_source:
2439         mdt_object_put(env, sobj);
2440         mdt_object_put(env, spobj);
2441         mdt_object_put(env, tpobj);
2442 put_parent:
2443         mo_invalidate(env, mdt_object_child(pobj));
2444         mdt_object_put(env, pobj);
2445 unlock_rename:
2446         mdt_rename_unlock(info, rename_lh);
2447
2448         if (rc)
2449                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2450                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2451                        PNAME(&rr->rr_name), rc);
2452
2453         return rc;
2454 }
2455
2456 /*
2457  * determine lock order of sobj and tobj
2458  *
2459  * there are two situations we need to lock tobj before sobj:
2460  * 1. sobj is child of tobj
2461  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2462  *    larger than that of tobj
2463  *
2464  * \retval      1 lock tobj before sobj
2465  * \retval      0 lock sobj before tobj
2466  * \retval      -ev negative errno upon error
2467  */
2468 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2469                                            struct mdt_object *sobj,
2470                                            struct mdt_object *tobj)
2471 {
2472         struct md_attr *ma = &info->mti_attr;
2473         struct lu_fid *spfid = &info->mti_tmp_fid1;
2474         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2475         struct lmv_mds_md_v1 *lmv;
2476         __u32 sindex;
2477         __u32 tindex;
2478         int rc;
2479
2480         /* sobj and tobj are the same */
2481         if (sobj == tobj)
2482                 return 0;
2483
2484         if (fid_is_root(mdt_object_fid(sobj)))
2485                 return 0;
2486
2487         if (fid_is_root(mdt_object_fid(tobj)))
2488                 return 1;
2489
2490         /* check whether sobj is child of tobj */
2491         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2492                            mdt_object_fid(tobj));
2493         if (rc < 0)
2494                 return rc;
2495
2496         if (rc == 1)
2497                 return 1;
2498
2499         /* check whether sobj and tobj are children of the same parent */
2500         rc = mdt_attr_get_pfid(info, sobj, spfid);
2501         if (rc)
2502                 return rc;
2503
2504         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2505         if (rc)
2506                 return rc;
2507
2508         if (!lu_fid_eq(spfid, tpfid))
2509                 return 0;
2510
2511         /* check whether sobj and tobj are sibling stripes */
2512         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2513         if (rc)
2514                 return rc;
2515
2516         if (!(ma->ma_valid & MA_LMV))
2517                 return 0;
2518
2519         lmv = &ma->ma_lmv->lmv_md_v1;
2520         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2521                 return 0;
2522         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2523
2524         ma->ma_valid = 0;
2525         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2526         if (rc)
2527                 return rc;
2528
2529         if (!(ma->ma_valid & MA_LMV))
2530                 return -ENODATA;
2531
2532         lmv = &ma->ma_lmv->lmv_md_v1;
2533         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2534                 return -EINVAL;
2535         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2536
2537         /* check stripe index of sobj and tobj */
2538         if (sindex == tindex)
2539                 return -EINVAL;
2540
2541         return sindex < tindex ? 0 : 1;
2542 }
2543
2544 /* Helper function for mdt_reint_rename so we don't need to opencode
2545  * two different order lockings
2546  */
2547 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2548                              struct mdt_object *mfirstdir,
2549                              struct mdt_lock_handle *lh_firstdirp,
2550                              const struct lu_name *firstname,
2551                              struct mdt_object *mseconddir,
2552                              struct mdt_lock_handle *lh_seconddirp,
2553                              const struct lu_name *secondname)
2554 {
2555         int rc;
2556
2557         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW);
2558         if (rc)
2559                 return rc;
2560
2561         mdt_version_get_save(info, mfirstdir, 0);
2562         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2563
2564         if (mfirstdir != mseconddir) {
2565                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2566                                      secondname, LCK_PW);
2567         } else if (!mdt_object_remote(mseconddir)) {
2568                 if (lh_firstdirp->mlh_pdo_hash !=
2569                     lh_seconddirp->mlh_pdo_hash) {
2570                         rc = mdt_object_pdo_lock(info, mseconddir,
2571                                                  lh_seconddirp, secondname,
2572                                                  LCK_PW, false);
2573                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2574                 }
2575         }
2576         mdt_version_get_save(info, mseconddir, 1);
2577
2578         if (rc != 0)
2579                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2580
2581         return rc;
2582 }
2583
/*
 * Handle a rename reint request: rename rr_name under the directory
 * rr_fid1 to rr_tgt_name under the directory rr_fid2.
 *
 * Outline: find and check both parent directories, take the rename lock
 * (skipped for replay requests and for renames that qualify as parallel),
 * lock both parents in the order computed below, look up and lock the
 * source child and, when it exists, the target child, then call
 * mdo_rename(). All cleanup is done through the label chain at the end.
 *
 * \param[in] info      thread info carrying the unpacked request (mti_rr)
 * \param[in] unused    lock handle argument, not referenced by this handler
 *
 * \retval      0 on success, or when both names already resolve to the
 *              same object
 * \retval      -ev negative errno upon error
 */
2584 /*
2585  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2586  * 2 - srcdir child; 3 - tgtdir child.
2587  * Update on disk version of srcdir child.
2588  */
2589 static int mdt_reint_rename(struct mdt_thread_info *info,
2590                             struct mdt_lock_handle *unused)
2591 {
2592         struct mdt_device *mdt = info->mti_mdt;
2593         struct mdt_reint_record *rr = &info->mti_rr;
2594         struct md_attr *ma = &info->mti_attr;
2595         struct ptlrpc_request *req = mdt_info_req(info);
2596         struct mdt_object *msrcdir = NULL;
2597         struct mdt_object *mtgtdir = NULL;
2598         struct mdt_object *mold;
2599         struct mdt_object *mnew = NULL;
2600         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2601         struct mdt_lock_handle *lh_srcdirp;
2602         struct mdt_lock_handle *lh_tgtdirp;
2603         struct mdt_lock_handle *lh_oldp = NULL;
2604         struct mdt_lock_handle *lh_lookup = NULL;
2605         struct mdt_lock_handle *lh_newp = NULL;
2606         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2607         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2608         struct lu_ucred *uc = mdt_ucred(info);
2609         bool reverse = false, discard = false;
2610         ktime_t kstart = ktime_get();
2611         enum mdt_stat_idx msi = 0;
2612         bool remote;
2613         bool bfl = false;
2614         int rc;
2615
2616         ENTRY;
2617         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2618                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2619                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2620
2621         if (info->mti_dlm_req)
2622                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2623
        /* both parent FIDs have to pass fid_is_md_operative() */
2624         if (!fid_is_md_operative(rr->rr_fid1) ||
2625             !fid_is_md_operative(rr->rr_fid2))
2626                 RETURN(-EPERM);
2627
2628         /* find both parents. */
2629         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2630         if (IS_ERR(msrcdir))
2631                 RETURN(PTR_ERR(msrcdir));
2632
2633         rc = mdt_check_enc(info, msrcdir);
2634         if (rc)
2635                 GOTO(out_put_srcdir, rc);
2636
        /* remembered as the hint for taking the rename lock below */
2637         remote = mdt_object_remote(msrcdir);
2638         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2639
        /* same parent FID: reuse msrcdir, with an extra reference so that
         * the two puts at out_put_tgtdir/out_put_srcdir stay balanced
         */
2640         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2641                 mtgtdir = msrcdir;
2642                 mdt_object_get(info->mti_env, mtgtdir);
2643         } else {
2644                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2645                 if (IS_ERR(mtgtdir))
2646                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2647         }
2648
2649         rc = mdt_check_enc(info, mtgtdir);
2650         if (rc)
2651                 GOTO(out_put_tgtdir, rc);
2652
        /* a target directory flagged LOHA_FSCRYPT_MD is refused unless the
         * caller has uc_rbac_fscrypt_admin set
         */
2653         if (!uc->uc_rbac_fscrypt_admin &&
2654             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2655                 GOTO(out_put_tgtdir, rc = -EPERM);
2656
2657         /*
2658          * Note: do not enqueue rename lock for replay request, because
2659          * if other MDT holds rename lock, but being blocked to wait for
2660          * this MDT to finish its recovery, and the failover MDT can not
2661          * get rename lock, which will cause deadlock.
2662          */
2663         if (!req_is_replay(req)) {
2664                 /*
2665                  * Normally rename RPC is handled on the MDT with the target
2666                  * directory (if target exists, it's on the MDT with the
2667                  * target), if the source directory is remote, it's a hint that
2668                  * source is remote too (this may not be true, but it won't
2669                  * cause any issue), return -EXDEV early to avoid taking
2670                  * rename_lock.
2671                  */
2672                 if (!mdt->mdt_enable_remote_rename && remote)
2673                         GOTO(out_put_tgtdir, rc = -EXDEV);
2674
                /* The rename lock is taken when the source parent is
                 * remote, when renaming a directory across parents or with
                 * parallel directory rename disabled, or when renaming a
                 * non-directory with parallel file rename disabled (or
                 * across parents with cross-directory parallel rename
                 * disabled). Everything else is a parallel rename.
                 *
                 * NOTE(review): ma->ma_attr.la_mode is not assigned in this
                 * function before this test; presumably it was filled in
                 * when the request was unpacked - confirm against the
                 * unpack code.
                 */
2675                 if (remote ||
2676                     (S_ISDIR(ma->ma_attr.la_mode) &&
2677                      (msrcdir != mtgtdir ||
2678                       !mdt->mdt_enable_parallel_rename_dir)) ||
2679                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2680                      (!mdt->mdt_enable_parallel_rename_file ||
2681                       (msrcdir != mtgtdir &&
2682                        !mdt->mdt_enable_parallel_rename_crossdir)))) {
2683                         rc = mdt_rename_lock(info, rename_lh);
2684                         if (rc != 0) {
2685                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2686                                        mdt_obd_name(mdt), rc);
2687                                 GOTO(out_put_tgtdir, rc);
2688                         }
                        /* bfl records that the rename lock is held */
2689                         bfl = true;
2690                 } else {
                        /* msi is handed to mdt_rename_counter_tally() on
                         * success
                         */
2691                         if (S_ISDIR(ma->ma_attr.la_mode))
2692                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2693                         else
2694                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2695
2696                         CDEBUG(D_INFO,
2697                                "%s: %s %s parallel rename "DFID"/"DNAME"\n",
2698                                mdt_obd_name(mdt),
2699                                msrcdir == mtgtdir ? "samedir" : "crossdir",
2700                                S_ISDIR(ma->ma_attr.la_mode) ? "dir" : "file",
2701                                PFID(rr->rr_fid1), PNAME(&rr->rr_name));
2702                 }
2703         }
2704
        /* also re-entered from the retry path below, once the rename lock
         * has been taken there
         */
2705 lock_parents:
2706         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2707         if (rc < 0)
2708                 GOTO(out_unlock_rename, rc);
        /* non-negative rc: 1 means lock mtgtdir before msrcdir */
2709         reverse = rc;
2710
2711         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2712         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2713         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2714
2715         /* lock parents in the proper order. */
2716         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2717         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2718         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2719         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2720
2721         /* In case of same dir local rename we must sort by the hash,
2722          * otherwise a lock deadlock is possible when renaming
2723          * a to b and b to a at the same time LU-15285
2724          */
2725         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2726                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2727         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2728                 reverse = 0;
2729
2730         if (reverse)
2731                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2732                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2733                                        &rr->rr_name);
2734         else
2735                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2736                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name);
2737
2738         if (rc != 0)
2739                 GOTO(out_unlock_rename, rc);
2740
2741         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2742         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2743
2744         /* find mold object. */
2745         fid_zero(old_fid);
2746         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2747         if (rc != 0)
2748                 GOTO(out_unlock_parents, rc);
2749
        /* the source name must not resolve to either parent directory */
2750         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2751                 GOTO(out_unlock_parents, rc = -EINVAL);
2752
2753         if (!fid_is_md_operative(old_fid))
2754                 GOTO(out_unlock_parents, rc = -EPERM);
2755
2756         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2757         if (IS_ERR(mold))
2758                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2759
2760         if (!mdt_object_exists(mold)) {
2761                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2762                                 &mold->mot_obj,
2763                                 "object does not exist");
2764                 GOTO(out_put_old, rc = -ENOENT);
2765         }
2766
2767         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2768                 GOTO(out_put_old, rc = -EXDEV);
2769
2770         /* we used msrcdir as a hint to take BFL, but it may be wrong */
2771         if (unlikely(!bfl && !req_is_replay(req) &&
2772                      !S_ISDIR(ma->ma_attr.la_mode) &&
2773                      mdt_object_remote(mold))) {
2774                 LASSERT(!remote);
                /* drop everything acquired since lock_parents, then take
                 * the rename lock and start over from lock_parents
                 */
2775                 mdt_object_put(info->mti_env, mold);
2776                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2777                 mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2778
2779                 rc = mdt_rename_lock(info, rename_lh);
2780                 if (rc != 0) {
2781                         CERROR("%s: cannot re-lock for rename: rc = %d\n",
2782                                mdt_obd_name(mdt), rc);
2783                         GOTO(out_put_tgtdir, rc);
2784                 }
2785                 bfl = true;
                /* reset the parallel-rename index chosen earlier */
2786                 msi = 0;
2787                 goto lock_parents;
2788         }
2789
2790         /* Check if @mtgtdir is subdir of @mold, before locking child
2791          * to avoid reverse locking.
2792          */
2793         if (mtgtdir != msrcdir) {
2794                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2795                                    old_fid);
2796                 if (rc) {
2797                         if (rc == 1)
2798                                 rc = -EINVAL;
2799                         GOTO(out_put_old, rc);
2800                 }
2801         }
2802
2803         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2804         /* save version after locking */
2805         mdt_version_get_save(info, mold, 2);
2806
2807         /* find mnew object:
2808          * mnew target object may not exist now
2809          * lookup with version checking
2810          */
2811         fid_zero(new_fid);
2812         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2813                                       3);
2814         if (rc == 0) {
2815                 /* the new_fid should have been filled at this moment */
                /* both names resolve to the same object: leave with rc == 0
                 * without calling mdo_rename()
                 */
2816                 if (lu_fid_eq(old_fid, new_fid))
2817                         GOTO(out_put_old, rc);
2818
2819                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2820                     lu_fid_eq(new_fid, rr->rr_fid2))
2821                         GOTO(out_put_old, rc = -EINVAL);
2822
2823                 if (!fid_is_md_operative(new_fid))
2824                         GOTO(out_put_old, rc = -EPERM);
2825
2826                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2827                 if (IS_ERR(mnew))
2828                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2829
2830                 if (!mdt_object_exists(mnew)) {
2831                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2832                                         &mnew->mot_obj,
2833                                         "object does not exist");
2834                         GOTO(out_put_new, rc = -ENOENT);
2835                 }
2836
2837                 if (mdt_object_remote(mnew)) {
2838                         struct mdt_body  *repbody;
2839
2840                         /* Always send rename req to the target child MDT */
2841                         repbody = req_capsule_server_get(info->mti_pill,
2842                                                          &RMF_MDT_BODY);
2843                         LASSERT(repbody != NULL);
2844                         repbody->mbo_fid1 = *new_fid;
2845                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2846                         GOTO(out_put_new, rc = -EXDEV);
2847                 }
2848                 /* Before locking the target dir, check we do not replace
2849                  * a dir with a non-dir, otherwise it may deadlock with
2850                  * link op which tries to create a link in this dir
2851                  * back to this non-dir.
2852                  */
2853                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2854                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2855                         GOTO(out_put_new, rc = -EISDIR);
2856
2857                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2858                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2859                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2860                                             lh_lookup,
2861                                             MDS_INODELOCK_LOOKUP |
2862                                             MDS_INODELOCK_XATTR);
2863                 if (rc < 0)
2864                         GOTO(out_put_new, rc);
2865
2866                 /* Check if @msrcdir is subdir of @mnew, before locking child
2867                  * to avoid reverse locking.
2868                  */
2869                 if (mtgtdir != msrcdir) {
2870                         rc = mdo_is_subdir(info->mti_env,
2871                                            mdt_object_child(msrcdir), new_fid);
2872                         if (rc) {
2873                                 if (rc == 1)
2874                                         rc = -EINVAL;
2875                                 GOTO(out_unlock_old, rc);
2876                         }
2877                 }
2878
2879                 /* We used to acquire MDS_INODELOCK_FULL here but we
2880                  * can't do this now because a running HSM restore on
2881                  * the rename onto victim will hold the layout
2882                  * lock. See LU-4002.
2883                  */
2884
2885                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2886                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2887                                            MDS_INODELOCK_LOOKUP |
2888                                            MDS_INODELOCK_UPDATE, LCK_EX);
2889                 if (rc != 0)
2890                         GOTO(out_unlock_new, rc);
2891
2892                 /* get and save version after locking */
2893                 mdt_version_get_save(info, mnew, 3);
2894         } else if (rc != -ENOENT) {
2895                 GOTO(out_put_old, rc);
2896         } else {
                /* lookup returned -ENOENT: there is no target object, so
                 * only the source child is locked and the "no entry"
                 * version is saved in slot 3
                 */
2897                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2898                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2899                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2900                                             lh_lookup,
2901                                             MDS_INODELOCK_LOOKUP |
2902                                             MDS_INODELOCK_XATTR);
2903                 if (rc != 0)
2904                         GOTO(out_put_old, rc);
2905
2906                 mdt_enoent_version_save(info, 3);
2907         }
2908
2909         /* all locks are held: do the rename */
2910         mdt_reint_init_ma(info, ma);
2911
2912         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2913                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2914
        /* mot_lov_mutex of an existing target is held across mdo_rename() */
2915         if (mnew != NULL)
2916                 mutex_lock(&mnew->mot_lov_mutex);
2917
2918         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2919                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2920                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2921                         &rr->rr_tgt_name, ma);
2922
2923         if (mnew != NULL)
2924                 mutex_unlock(&mnew->mot_lov_mutex);
2925
2926         /* handle last link of tgt object */
2927         if (rc == 0) {
2928                 if (mnew) {
2929                         mdt_handle_last_unlink(info, mnew, ma);
                        /* when set, the put of mnew is deferred until after
                         * mdt_dom_discard_data() at the end of the function
                         */
2930                         discard = mdt_dom_check_for_discard(info, mnew);
2931                 }
2932                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2933                                          msrcdir, mtgtdir, msi,
2934                                          ktime_us_delta(ktime_get(), kstart));
2935         }
2936
        /* cleanup labels release resources in reverse order of acquisition;
         * the success path falls through all of them
         */
2937         EXIT;
2938 out_unlock_new:
2939         if (mnew != NULL)
2940                 /* mnew is gone, no need to keep lock */
2941                 mdt_object_unlock(info, mnew, lh_newp, 1);
2942 out_unlock_old:
2943         mdt_object_unlock(info, NULL, lh_lookup, rc);
2944         mdt_object_unlock(info, mold, lh_oldp, rc);
2945 out_put_new:
2946         if (mnew && !discard)
2947                 mdt_object_put(info->mti_env, mnew);
2948 out_put_old:
2949         mdt_object_put(info->mti_env, mold);
2950 out_unlock_parents:
2951         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2952         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2953 out_unlock_rename:
2954         mdt_rename_unlock(info, rename_lh);
2955 out_put_tgtdir:
2956         mdt_object_put(info->mti_env, mtgtdir);
2957 out_put_srcdir:
2958         mdt_object_put(info->mti_env, msrcdir);
2959
2960         /* The DoM discard can be done right in the place above where it is
2961          * assigned, meanwhile it is done here after rename unlock due to
2962          * compatibility with old clients, for them the discard blocks
2963          * the main thread until completion. Check LU-11359 for details.
2964          */
2965         if (discard) {
2966                 mdt_dom_discard_data(info, mnew);
2967                 mdt_object_put(info->mti_env, mnew);
2968         }
2969         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2970         return rc;
2971 }
2972
2973 static int mdt_reint_resync(struct mdt_thread_info *info,
2974                             struct mdt_lock_handle *lhc)
2975 {
2976         struct mdt_reint_record *rr = &info->mti_rr;
2977         struct ptlrpc_request *req = mdt_info_req(info);
2978         struct md_attr *ma = &info->mti_attr;
2979         struct mdt_object *mo;
2980         struct ldlm_lock *lease;
2981         struct mdt_body *repbody;
2982         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2983         bool lease_broken;
2984         int rc;
2985
2986         ENTRY;
2987         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2988
2989         if (info->mti_dlm_req)
2990                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2991
2992         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2993         if (IS_ERR(mo))
2994                 GOTO(out, rc = PTR_ERR(mo));
2995
2996         if (!mdt_object_exists(mo))
2997                 GOTO(out_obj, rc = -ENOENT);
2998
2999         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3000                 GOTO(out_obj, rc = -EINVAL);
3001
3002         if (mdt_object_remote(mo))
3003                 GOTO(out_obj, rc = -EREMOTE);
3004
3005         lease = ldlm_handle2lock(rr->rr_lease_handle);
3006         if (lease == NULL)
3007                 GOTO(out_obj, rc = -ESTALE);
3008
3009         /* It's really necessary to grab open_sem and check if the lease lock
3010          * has been lost. There would exist a concurrent writer coming in and
3011          * generating some dirty data in memory cache, the writeback would fail
3012          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3013          */
3014         if (!down_write_trylock(&mo->mot_open_sem))
3015                 GOTO(out_put_lease, rc = -EBUSY);
3016
3017         lock_res_and_lock(lease);
3018         lease_broken = ldlm_is_cancel(lease);
3019         unlock_res_and_lock(lease);
3020         if (lease_broken)
3021                 GOTO(out_unlock, rc = -EBUSY);
3022
3023         /* the file has yet opened by anyone else after we took the lease. */
3024         layout.mlc_opc = MD_LAYOUT_RESYNC;
3025         lhc = &info->mti_lh[MDT_LH_LOCAL];
3026         rc = mdt_layout_change(info, mo, lhc, &layout);
3027         if (rc)
3028                 GOTO(out_unlock, rc);
3029
3030         mdt_object_unlock(info, mo, lhc, 0);
3031
3032         ma->ma_need = MA_INODE;
3033         ma->ma_valid = 0;
3034         rc = mdt_attr_get_complex(info, mo, ma);
3035         if (rc != 0)
3036                 GOTO(out_unlock, rc);
3037
3038         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3039         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3040
3041         EXIT;
3042 out_unlock:
3043         up_write(&mo->mot_open_sem);
3044 out_put_lease:
3045         LDLM_LOCK_PUT(lease);
3046 out_obj:
3047         mdt_object_put(info->mti_env, mo);
3048 out:
3049         mdt_client_compatibility(info);
3050         return rc;
3051 }
3052
3053 struct mdt_reinter {
3054         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3055         enum lprocfs_extra_opc mr_extra_opc;
3056 };
3057
3058 static const struct mdt_reinter mdt_reinters[] = {
3059         [REINT_SETATTR] = {
3060                 .mr_handler = &mdt_reint_setattr,
3061                 .mr_extra_opc = MDS_REINT_SETATTR,
3062         },
3063         [REINT_CREATE] = {
3064                 .mr_handler = &mdt_reint_create,
3065                 .mr_extra_opc = MDS_REINT_CREATE,
3066         },
3067         [REINT_LINK] = {
3068                 .mr_handler = &mdt_reint_link,
3069                 .mr_extra_opc = MDS_REINT_LINK,
3070         },
3071         [REINT_UNLINK] = {
3072                 .mr_handler = &mdt_reint_unlink,
3073                 .mr_extra_opc = MDS_REINT_UNLINK,
3074         },
3075         [REINT_RENAME] = {
3076                 .mr_handler = &mdt_reint_rename,
3077                 .mr_extra_opc = MDS_REINT_RENAME,
3078         },
3079         [REINT_OPEN] = {
3080                 .mr_handler = &mdt_reint_open,
3081                 .mr_extra_opc = MDS_REINT_OPEN,
3082         },
3083         [REINT_SETXATTR] = {
3084                 .mr_handler = &mdt_reint_setxattr,
3085                 .mr_extra_opc = MDS_REINT_SETXATTR,
3086         },
3087         [REINT_RMENTRY] = {
3088                 .mr_handler = &mdt_reint_unlink,
3089                 .mr_extra_opc = MDS_REINT_UNLINK,
3090         },
3091         [REINT_MIGRATE] = {
3092                 .mr_handler = &mdt_reint_migrate,
3093                 .mr_extra_opc = MDS_REINT_RENAME,
3094         },
3095         [REINT_RESYNC] = {
3096                 .mr_handler = &mdt_reint_resync,
3097                 .mr_extra_opc = MDS_REINT_RESYNC,
3098         },
3099 };
3100
3101 int mdt_reint_rec(struct mdt_thread_info *info,
3102                   struct mdt_lock_handle *lhc)
3103 {
3104         const struct mdt_reinter *mr;
3105         int rc;
3106
3107         ENTRY;
3108         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3109                 RETURN(-EPROTO);
3110
3111         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3112         if (mr->mr_handler == NULL)
3113                 RETURN(-EPROTO);
3114
3115         rc = (*mr->mr_handler)(info, lhc);
3116
3117         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3118                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3119
3120         RETURN(rc);
3121 }