Whamcloud - gitweb
97c9e8354772491555f30d56a7376d95367aa9c8
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
279                             enum ldlm_mode mode, __u64 ibits,
280                             struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285         einfo->ei_type = LDLM_IBITS;
286         einfo->ei_mode = mode;
287         einfo->ei_cb_bl = mdt_remote_blocking_ast;
288         einfo->ei_cb_local_bl = mdt_blocking_ast;
289         einfo->ei_cb_cp = ldlm_completion_ast;
290         einfo->ei_enq_slave = 1;
291         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
292         einfo->ei_inodebits = ibits;
293         einfo->ei_req_slot = 1;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 /** lock object, and stripes if it's a striped directory
302  *
303  * object should be local, this is called in operations which modify both object
304  * and stripes.
305  *
306  * \param info          struct mdt_thread_info
307  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
308  * \param child         child object
309  * \param lh            lock handle
310  * \param einfo         struct ldlm_enqueue_info
311  * \param ibits         MDS inode lock bits
312  * \param mode          lock mode
313  * \param cos_incompat  DNE COS incompatible
314  *
315  * \retval              0 on success, -ev on error.
316  */
317 int mdt_object_stripes_lock(struct mdt_thread_info *info,
318                             struct mdt_object *parent,
319                             struct mdt_object *child,
320                             struct mdt_lock_handle *lh,
321                             struct ldlm_enqueue_info *einfo, __u64 ibits,
322                             enum ldlm_mode mode, bool cos_incompat)
323 {
324         int rc;
325
326         ENTRY;
327         /* according to the protocol, child should be local, is request sent to
328          * wrong MDT?
329          */
330         if (mdt_object_remote(child)) {
331                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
332                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
333                        -EREMOTE);
334                 RETURN(-EREMOTE);
335         }
336
337         memset(einfo, 0, sizeof(*einfo));
338         if (ibits & MDS_INODELOCK_LOOKUP) {
339                 LASSERT(parent);
340                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
341                                            mode, cos_incompat);
342         } else {
343                 rc = mdt_object_lock(info, child, lh, ibits, mode,
344                                      cos_incompat);
345         }
346         if (rc)
347                 RETURN(rc);
348
349         rc = mdt_object_striped(info, child);
350         if (rc == 0)
351                 return 0;
352
353         if (rc < 0)
354                 goto unlock;
355
356         /* lock stripes for striped directory */
357         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
358         if (rc == -EIO && CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
359                 rc = 0;
360
361 unlock:
362         if (rc)
363                 mdt_object_unlock(info, child, lh, rc);
364
365         return rc;
366 }
367
368 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
369                               struct mdt_object *obj,
370                               struct mdt_lock_handle *lh,
371                               struct ldlm_enqueue_info *einfo, int decref)
372 {
373         if (einfo->ei_cbdata)
374                 mdt_stripes_unlock(info, obj, einfo, decref);
375         mdt_object_unlock(info, obj, lh, decref);
376 }
377
378 static int mdt_restripe(struct mdt_thread_info *info,
379                         struct mdt_object *parent,
380                         const struct lu_name *lname,
381                         const struct lu_fid *tfid,
382                         struct md_op_spec *spec,
383                         struct md_attr *ma)
384 {
385         struct mdt_device *mdt = info->mti_mdt;
386         struct lu_fid *fid = &info->mti_tmp_fid2;
387         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
388         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
389         struct lu_ucred *uc = mdt_ucred(info);
390         struct lmv_mds_md_v1 *lmv;
391         struct mdt_object *child;
392         struct mdt_lock_handle *lhp;
393         struct mdt_lock_handle *lhc;
394         struct mdt_body *repbody;
395         int rc;
396
397         ENTRY;
398
399         /* we want rbac roles to have precedence over any other
400          * permission or capability checks
401          */
402         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
403                 RETURN(-EPERM);
404
405         LASSERT(lum);
406         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
407
408         rc = mdt_version_get_check_save(info, parent, 0);
409         if (rc)
410                 RETURN(rc);
411
412         lhp = &info->mti_lh[MDT_LH_PARENT];
413         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
414         if (rc)
415                 RETURN(rc);
416
417         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
418         if (rc)
419                 GOTO(unlock_parent, rc);
420
421         if (ma->ma_valid & MA_LMV) {
422                 /* don't allow restripe if parent dir layout is changing */
423                 lmv = &ma->ma_lmv->lmv_md_v1;
424                 if (!lmv_is_sane2(lmv))
425                         GOTO(unlock_parent, rc = -EBADF);
426
427                 if (lmv_is_layout_changing(lmv))
428                         GOTO(unlock_parent, rc = -EBUSY);
429         }
430
431         fid_zero(fid);
432         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
433         if (rc)
434                 GOTO(unlock_parent, rc);
435
436         child = mdt_object_find(info->mti_env, mdt, fid);
437         if (IS_ERR(child))
438                 GOTO(unlock_parent, rc = PTR_ERR(child));
439
440         if (!mdt_object_exists(child))
441                 GOTO(out_child, rc = -ENOENT);
442
443         if (mdt_object_remote(child)) {
444                 struct mdt_body *repbody;
445
446                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
447                 if (!repbody)
448                         GOTO(out_child, rc = -EPROTO);
449
450                 repbody->mbo_fid1 = *fid;
451                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
452                 GOTO(out_child, rc = -EREMOTE);
453         }
454
455         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
456                 GOTO(out_child, rc = -ENOTDIR);
457
458         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
459         if (rc)
460                 GOTO(out_child, rc);
461
462         /* race with migrate? */
463         if ((ma->ma_valid & MA_LMV) &&
464              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
465                 GOTO(out_child, rc = -EBUSY);
466
467         /* lock object */
468         lhc = &info->mti_lh[MDT_LH_CHILD];
469         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
470                                      MDS_INODELOCK_FULL, LCK_PW, true);
471         if (rc)
472                 GOTO(unlock_child, rc);
473
474         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
475         rc = mdt_version_get_check_save(info, child, 1);
476         if (rc)
477                 GOTO(unlock_child, rc);
478
479         spin_lock(&mdt->mdt_restriper.mdr_lock);
480         if (child->mot_restriping) {
481                 /* race? */
482                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
483                 GOTO(unlock_child, rc = -EBUSY);
484         }
485         child->mot_restriping = 1;
486         spin_unlock(&mdt->mdt_restriper.mdr_lock);
487
488         *fid = *tfid;
489         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
490         if (rc)
491                 GOTO(restriping_clear, rc);
492
493         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
494         if (!repbody)
495                 GOTO(restriping_clear, rc = -EPROTO);
496
497         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
498         EXIT;
499
500 restriping_clear:
501         child->mot_restriping = 0;
502 unlock_child:
503         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
504 out_child:
505         mdt_object_put(info->mti_env, child);
506 unlock_parent:
507         mdt_object_unlock(info, parent, lhp, rc);
508
509         return rc;
510 }
511
512 /*
513  * VBR: we save three versions in reply:
514  * 0 - parent. Check that parent version is the same during replay.
515  * 1 - name. Version of 'name' if file exists with the same name or
516  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
517  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
518  * check.
519  */
520 static int mdt_create(struct mdt_thread_info *info)
521 {
522         struct mdt_device *mdt = info->mti_mdt;
523         struct mdt_object *parent;
524         struct mdt_object *child;
525         struct mdt_lock_handle *lh;
526         struct mdt_body *repbody;
527         struct md_attr *ma = &info->mti_attr;
528         struct mdt_reint_record *rr = &info->mti_rr;
529         struct md_op_spec *spec = &info->mti_spec;
530         struct lu_ucred *uc = mdt_ucred(info);
531         bool restripe = false;
532         int rc;
533
534         ENTRY;
535         DEBUG_REQ(D_INODE, mdt_info_req(info),
536                   "Create ("DNAME"->"DFID") in "DFID,
537                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
538
539         if (!fid_is_md_operative(rr->rr_fid1))
540                 RETURN(-EPERM);
541
542         /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
543          * if client maintains inherited default LMV
544          */
545         if (S_ISDIR(ma->ma_attr.la_mode) &&
546             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
547             !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
548                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
549                 struct obd_export *exp = mdt_info_req(info)->rq_export;
550
551                 /* Only new clients can create remote dir( >= 2.4) and
552                  * striped dir(>= 2.6), old client will return -ENOTSUPP
553                  */
554                 if (!mdt_is_dne_client(exp))
555                         RETURN(-ENOTSUPP);
556
557                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
558                         if (!mdt_is_striped_client(exp))
559                                 RETURN(-ENOTSUPP);
560
561                         if (!mdt->mdt_enable_striped_dir)
562                                 RETURN(-EPERM);
563                 } else if (!mdt->mdt_enable_remote_dir) {
564                         RETURN(-EPERM);
565                 }
566
567                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
568                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
569                     LMV_HASH_TYPE_CRUSH)
570                         RETURN(-EPROTO);
571
572                 /* we want rbac roles to have precedence over any other
573                  * permission or capability checks
574                  */
575                 if (!uc->uc_rbac_dne_ops ||
576                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
577                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
578                      mdt->mdt_enable_remote_dir_gid != -1))
579                         RETURN(-EPERM);
580
581                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
582                  * this is create only, don't try restripe.
583                  */
584                 if (mdt->mdt_enable_dir_restripe &&
585                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
586                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
587                         restripe = true;
588         }
589
590         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
591
592         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
593         if (IS_ERR(parent))
594                 RETURN(PTR_ERR(parent));
595
596         if (!mdt_object_exists(parent))
597                 GOTO(put_parent, rc = -ENOENT);
598
599         rc = mdt_check_enc(info, parent);
600         if (rc)
601                 GOTO(put_parent, rc);
602
603         if (!uc->uc_rbac_fscrypt_admin &&
604             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
605                 GOTO(put_parent, rc = -EPERM);
606
607         /*
608          * LU-10235: check if name exists locklessly first to avoid massive
609          * lock recalls on existing directories.
610          */
611         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
612                                       &info->mti_tmp_fid1, 1);
613         if (rc == 0) {
614                 if (!restripe)
615                         GOTO(put_parent, rc = -EEXIST);
616
617                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
618                                   ma);
619         }
620
621         /* -ENOENT is expected here */
622         if (rc != -ENOENT)
623                 GOTO(put_parent, rc);
624
625         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);
626
627         /* save version of file name for replay, it must be ENOENT here */
628         mdt_enoent_version_save(info, 1);
629
630         CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
631
632         lh = &info->mti_lh[MDT_LH_PARENT];
633         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
634         if (rc)
635                 GOTO(put_parent, rc);
636
637         if (!mdt_object_remote(parent)) {
638                 rc = mdt_version_get_check_save(info, parent, 0);
639                 if (rc)
640                         GOTO(unlock_parent, rc);
641         }
642
643         /*
644          * now repeat the lookup having a LDLM lock on the parent dir,
645          * as another thread could create the same name. notice this
646          * lookup is supposed to hit cache in OSD and be cheap if the
647          * directory is not being modified concurrently.
648          */
649         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
650                         &info->mti_tmp_fid1, &info->mti_spec);
651         if (unlikely(rc == 0))
652                 GOTO(unlock_parent, rc = -EEXIST);
653
654         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
655         if (unlikely(IS_ERR(child)))
656                 GOTO(unlock_parent, rc = PTR_ERR(child));
657
658         ma->ma_need = MA_INODE;
659         ma->ma_valid = 0;
660
661         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
662                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
663
664         /* Version of child will be updated on disk. */
665         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
666         rc = mdt_version_get_check_save(info, child, 2);
667         if (rc)
668                 GOTO(put_child, rc);
669
670         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
671             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
672              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
673                      rr->rr_name.ln_namelen) == 0))
674                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
675
676         /*
677          * Do not perform lookup sanity check. We know that name does
678          * not exist.
679          */
680         info->mti_spec.sp_cr_lookup = 0;
681         if (mdt_object_remote(parent))
682                 info->mti_spec.sp_cr_lookup = 1;
683         info->mti_spec.sp_feat = &dt_directory_features;
684
685         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
686                         mdt_object_child(child), &info->mti_spec, ma);
687         if (rc == 0)
688                 rc = mdt_attr_get_complex(info, child, ma);
689
690         if (rc < 0)
691                 GOTO(put_child, rc);
692
693         /*
694          * On DNE, we need to eliminate dependey between 'mkdir a' and
695          * 'mkdir a/b' if b is a striped directory, to achieve this, two
696          * things are done below:
697          * 1. save child and slaves lock.
698          * 2. if the child is a striped directory, relock parent so to
699          *    compare against with COS locks to ensure parent was
700          *    committed to disk.
701          */
702         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
703                 struct mdt_lock_handle *lhc;
704                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
705                 bool cos_incompat;
706
707                 rc = mdt_object_striped(info, child);
708                 if (rc < 0)
709                         GOTO(put_child, rc);
710
711                 cos_incompat = rc;
712                 if (cos_incompat) {
713                         if (!mdt_object_remote(parent)) {
714                                 mdt_object_unlock(info, parent, lh, 1);
715                                 rc = mdt_parent_lock(info, parent, lh,
716                                                      &rr->rr_name, LCK_PW,
717                                                      true);
718                                 if (rc)
719                                         GOTO(put_child, rc);
720                         }
721                 }
722
723                 lhc = &info->mti_lh[MDT_LH_CHILD];
724                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
725                                              MDS_INODELOCK_UPDATE, LCK_PW,
726                                              cos_incompat);
727                 if (rc)
728                         GOTO(put_child, rc);
729
730                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
731         }
732
733         /* Return fid & attr to client. */
734         if (ma->ma_valid & MA_INODE)
735                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
736                                    mdt_object_fid(child));
737         EXIT;
738 put_child:
739         mdt_object_put(info->mti_env, child);
740 unlock_parent:
741         mdt_object_unlock(info, parent, lh, rc);
742 put_parent:
743         mdt_object_put(info->mti_env, parent);
744         return rc;
745 }
746
747 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
748                         struct md_attr *ma)
749 {
750         struct mdt_lock_handle  *lh;
751         int do_vbr = ma->ma_attr.la_valid &
752                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
753         __u64 lockpart = MDS_INODELOCK_UPDATE;
754         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
755         bool cos_incompat;
756         int rc;
757
758         ENTRY;
759         rc = mdt_object_striped(info, mo);
760         if (rc < 0)
761                 RETURN(rc);
762         cos_incompat = rc;
763
764         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
765                 lockpart |= MDS_INODELOCK_PERM;
766         /* Clear xattr cache on clients, so the virtual project ID xattr
767          * can get the new project ID
768          */
769         if (ma->ma_attr.la_valid & LA_PROJID)
770                 lockpart |= MDS_INODELOCK_XATTR;
771
772         lh = &info->mti_lh[MDT_LH_PARENT];
773         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
774                                      LCK_PW, cos_incompat);
775         if (rc != 0)
776                 RETURN(rc);
777
778         /* all attrs are packed into mti_attr in unpack_setattr */
779         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
780                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
781
782         /* VBR: update version if attr changed are important for recovery */
783         if (do_vbr) {
784                 /* update on-disk version of changed object */
785                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
786                 rc = mdt_version_get_check_save(info, mo, 0);
787                 if (rc)
788                         GOTO(out_unlock, rc);
789         }
790
791         /* Ensure constant striping during chown(). See LU-2789. */
792         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
793                 mutex_lock(&mo->mot_lov_mutex);
794
795         /* all attrs are packed into mti_attr in unpack_setattr */
796         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
797
798         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
799                 mutex_unlock(&mo->mot_lov_mutex);
800
801         if (rc != 0)
802                 GOTO(out_unlock, rc);
803         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
804         EXIT;
805 out_unlock:
806         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
807         return rc;
808 }
809
810 /**
811  * Check HSM flags and add HS_DIRTY flag if relevant.
812  *
813  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
814  * and is not RELEASED.
815  */
816 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
817                         struct md_attr *ma)
818 {
819         struct lu_ucred *uc = mdt_ucred(info);
820         kernel_cap_t cap_saved;
821         int rc;
822
823         ENTRY;
824         /* If the file was modified, add the dirty flag */
825         ma->ma_need = MA_HSM;
826         rc = mdt_attr_get_complex(info, mo, ma);
827         if (rc) {
828                 CERROR("file attribute read error for "DFID": %d.\n",
829                         PFID(mdt_object_fid(mo)), rc);
830                 RETURN(rc);
831         }
832
833         /* If an up2date copy exists in the backend, add dirty flag */
834         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
835             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
836                 ma->ma_hsm.mh_flags |= HS_DIRTY;
837
838                 /* Bump cap so that closes from non-owner writers can
839                  * set the HSM state to dirty.
840                  */
841                 cap_saved = uc->uc_cap;
842                 cap_raise(uc->uc_cap, CAP_FOWNER);
843                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
844                 uc->uc_cap = cap_saved;
845                 if (rc)
846                         CERROR("file attribute change error for "DFID": %d\n",
847                                 PFID(mdt_object_fid(mo)), rc);
848         }
849
850         RETURN(rc);
851 }
852
853 static int mdt_reint_setattr(struct mdt_thread_info *info,
854                              struct mdt_lock_handle *lhc)
855 {
856         struct mdt_device *mdt = info->mti_mdt;
857         struct md_attr *ma = &info->mti_attr;
858         struct mdt_reint_record *rr = &info->mti_rr;
859         struct ptlrpc_request *req = mdt_info_req(info);
860         struct mdt_object *mo;
861         struct mdt_body *repbody;
862         ktime_t kstart = ktime_get();
863         int rc;
864
865         ENTRY;
866         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
867                   (unsigned int)ma->ma_attr.la_valid);
868
869         if (info->mti_dlm_req)
870                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
871
872         CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
873
874         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
875         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
876         if (IS_ERR(mo))
877                 GOTO(out, rc = PTR_ERR(mo));
878
879         if (!mdt_object_exists(mo))
880                 GOTO(out_put, rc = -ENOENT);
881
882         if (mdt_object_remote(mo))
883                 GOTO(out_put, rc = -EREMOTE);
884
885         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
886         /* revoke lease lock if size is going to be changed */
887         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
888                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
889                      atomic_read(&mo->mot_lease_count) > 0)) {
890                 down_read(&mo->mot_open_sem);
891
892                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
893                         lhc = &info->mti_lh[MDT_LH_LOCAL];
894                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
895                                              LCK_CW, false);
896                         if (rc != 0) {
897                                 up_read(&mo->mot_open_sem);
898                                 GOTO(out_put, rc);
899                         }
900
901                         /* revoke lease lock */
902                         mdt_object_unlock(info, mo, lhc, 1);
903                 }
904                 up_read(&mo->mot_open_sem);
905         }
906
907         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
908                 /* Check write access for the O_TRUNC case */
909                 if (mdt_write_read(mo) < 0)
910                         GOTO(out_put, rc = -ETXTBSY);
911
912                 /* LU-10286: compatibility check for FLR.
913                  * Please check the comment in mdt_finish_open() for details
914                  */
915                 if (!exp_connect_flr(info->mti_exp) ||
916                     !exp_connect_overstriping(info->mti_exp)) {
917                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
918                         if (rc < 0 && rc != -ENODATA)
919                                 GOTO(out_put, rc);
920
921                         if (!exp_connect_flr(info->mti_exp)) {
922                                 if (rc > 0 &&
923                                     mdt_lmm_is_flr(info->mti_big_lmm))
924                                         GOTO(out_put, rc = -EOPNOTSUPP);
925                         }
926
927                         if (!exp_connect_overstriping(info->mti_exp)) {
928                                 if (rc > 0 &&
929                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
930                                         GOTO(out_put, rc = -EOPNOTSUPP);
931                         }
932                 }
933
934                 /* For truncate, the file size sent from client
935                  * is believable, but the blocks are incorrect,
936                  * which makes the block size in LSOM attribute
937                  * inconsisent with the real block size.
938                  */
939                 rc = mdt_lsom_update(info, mo, true);
940                 if (rc)
941                         GOTO(out_put, rc);
942         }
943
944         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
945                 if (ma->ma_valid & MA_LOV)
946                         GOTO(out_put, rc = -EPROTO);
947
948                 /* MDT supports FMD for regular files due to Data-on-MDT */
949                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
950                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
951                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
952                                        req->rq_xid);
953
954                         if (ma->ma_attr.la_valid & LA_MTIME) {
955                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
956                                 if (!rc)
957                                         ma->ma_valid |= MA_PFID;
958                         }
959                 }
960
961                 rc = mdt_attr_set(info, mo, ma);
962                 if (rc)
963                         GOTO(out_put, rc);
964         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
965                    (ma->ma_valid & MA_INODE)) {
966                 struct lu_buf *buf = &info->mti_buf;
967                 struct lu_ucred *uc = mdt_ucred(info);
968                 struct mdt_lock_handle *lh;
969                 const char *name;
970
971                 /* reject if either remote or striped dir is disabled */
972                 if (ma->ma_valid & MA_LMV) {
973                         if (!mdt->mdt_enable_remote_dir ||
974                             !mdt->mdt_enable_striped_dir)
975                                 GOTO(out_put, rc = -EPERM);
976
977                         /* we want rbac roles to have precedence over any other
978                          * permission or capability checks
979                          */
980                         if (!uc->uc_rbac_dne_ops ||
981                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
982                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
983                              mdt->mdt_enable_remote_dir_gid != -1))
984                                 GOTO(out_put, rc = -EPERM);
985                 }
986
987                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
988                         GOTO(out_put, rc = -ENOTDIR);
989
990                 if (ma->ma_attr.la_valid != 0)
991                         GOTO(out_put, rc = -EPROTO);
992
993                 lh = &info->mti_lh[MDT_LH_PARENT];
994                 if (ma->ma_valid & MA_LOV) {
995                         buf->lb_buf = ma->ma_lmm;
996                         buf->lb_len = ma->ma_lmm_size;
997                         name = XATTR_NAME_LOV;
998                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
999                                              LCK_PW, false);
1000                 } else {
1001                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
1002                         buf->lb_len = ma->ma_lmv_size;
1003                         name = XATTR_NAME_DEFAULT_LMV;
1004
1005                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
1006                                 rc = mdt_object_lock(info, mo, lh,
1007                                                      MDS_INODELOCK_XATTR |
1008                                                      MDS_INODELOCK_LOOKUP,
1009                                                      LCK_PW, false);
1010                         } else {
1011                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
1012                                 struct lu_name *pname = &info->mti_name;
1013                                 const char dotdot[] = "..";
1014                                 struct mdt_object *pobj;
1015
1016                                 fid_zero(pfid);
1017                                 pname->ln_name = dotdot;
1018                                 pname->ln_namelen = sizeof(dotdot);
1019                                 rc = mdo_lookup(info->mti_env,
1020                                                 mdt_object_child(mo), pname,
1021                                                 pfid, NULL);
1022                                 if (rc)
1023                                         GOTO(out_put, rc);
1024
1025                                 pobj = mdt_object_find(info->mti_env,
1026                                                        info->mti_mdt, pfid);
1027                                 if (IS_ERR(pobj))
1028                                         GOTO(out_put, rc = PTR_ERR(pobj));
1029
1030                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
1031                                                            MDS_INODELOCK_XATTR |
1032                                                            MDS_INODELOCK_LOOKUP,
1033                                                            LCK_PW, false);
1034                                 mdt_object_put(info->mti_env, pobj);
1035                         }
1036                 }
1037
1038                 if (rc != 0)
1039                         GOTO(out_put, rc);
1040
1041                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1042                                   name, 0);
1043
1044                 mdt_object_unlock(info, mo, lh, rc);
1045                 if (rc)
1046                         GOTO(out_put, rc);
1047         } else {
1048                 GOTO(out_put, rc = -EPROTO);
1049         }
1050
1051         /* If file data is modified, add the dirty flag */
1052         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1053                 rc = mdt_add_dirty_flag(info, mo, ma);
1054
1055         ma->ma_need = MA_INODE;
1056         ma->ma_valid = 0;
1057         rc = mdt_attr_get_complex(info, mo, ma);
1058         if (rc != 0)
1059                 GOTO(out_put, rc);
1060
1061         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1062
1063         EXIT;
1064 out_put:
1065         mdt_object_put(info->mti_env, mo);
1066 out:
1067         if (rc == 0)
1068                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1069                                  ktime_us_delta(ktime_get(), kstart));
1070
1071         mdt_client_compatibility(info);
1072         return rc;
1073 }
1074
1075 static int mdt_reint_create(struct mdt_thread_info *info,
1076                             struct mdt_lock_handle *lhc)
1077 {
1078         struct ptlrpc_request   *req = mdt_info_req(info);
1079         ktime_t                 kstart = ktime_get();
1080         int                     rc;
1081
1082         ENTRY;
1083         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1084                 RETURN(err_serious(-ESTALE));
1085
1086         if (info->mti_dlm_req)
1087                 ldlm_request_cancel(mdt_info_req(info),
1088                                     info->mti_dlm_req, 0, LATF_SKIP);
1089
1090         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1091                 RETURN(-EPROTO);
1092
1093         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1094         case S_IFDIR:
1095         case S_IFREG:
1096         case S_IFLNK:
1097         case S_IFCHR:
1098         case S_IFBLK:
1099         case S_IFIFO:
1100         case S_IFSOCK:
1101                 break;
1102         default:
1103                 CERROR("%s: Unsupported mode %o\n",
1104                        mdt_obd_name(info->mti_mdt),
1105                        info->mti_attr.ma_attr.la_mode);
1106                 RETURN(err_serious(-EOPNOTSUPP));
1107         }
1108
1109         rc = mdt_create(info);
1110         if (rc == 0) {
1111                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1112                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1113                                          ktime_us_delta(ktime_get(), kstart));
1114                 else
1115                         /* Special file should stay on the same node as parent*/
1116                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1117                                          ktime_us_delta(ktime_get(), kstart));
1118         }
1119
1120         RETURN(rc);
1121 }
1122
1123 /*
1124  * VBR: save parent version in reply and child version getting by its name.
1125  * Version of child is getting and checking during its lookup. If
1126  */
1127 static int mdt_reint_unlink(struct mdt_thread_info *info,
1128                             struct mdt_lock_handle *lhc)
1129 {
1130         struct mdt_reint_record *rr = &info->mti_rr;
1131         struct ptlrpc_request *req = mdt_info_req(info);
1132         struct md_attr *ma = &info->mti_attr;
1133         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1134         struct mdt_object *mp;
1135         struct mdt_object *mc;
1136         struct mdt_lock_handle *parent_lh;
1137         struct mdt_lock_handle *child_lh;
1138         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
1139         struct lu_ucred *uc  = mdt_ucred(info);
1140         bool cos_incompat = false;
1141         int no_name = 0;
1142         ktime_t kstart = ktime_get();
1143         int rc;
1144
1145         ENTRY;
1146         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1147                   PNAME(&rr->rr_name));
1148
1149         if (info->mti_dlm_req)
1150                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1151
1152         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1153                 RETURN(err_serious(-ENOENT));
1154
1155         if (!fid_is_md_operative(rr->rr_fid1))
1156                 RETURN(-EPERM);
1157
1158         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1159         if (IS_ERR(mp))
1160                 RETURN(PTR_ERR(mp));
1161
1162         if (mdt_object_remote(mp)) {
1163                 cos_incompat = true;
1164         } else {
1165                 rc = mdt_version_get_check_save(info, mp, 0);
1166                 if (rc)
1167                         GOTO(put_parent, rc);
1168         }
1169
1170         if (!uc->uc_rbac_fscrypt_admin &&
1171             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1172                 GOTO(put_parent, rc = -EPERM);
1173
1174         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
1175         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1176 relock:
1177         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1178         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
1179                              cos_incompat);
1180         if (rc != 0)
1181                 GOTO(put_parent, rc);
1182
1183         if (info->mti_spec.sp_rm_entry) {
1184                 if (!mdt_is_dne_client(req->rq_export))
1185                         /* Return -ENOTSUPP for old client */
1186                         GOTO(unlock_parent, rc = -ENOTSUPP);
1187
1188                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1189                         GOTO(unlock_parent, rc = -EPERM);
1190
1191                 ma->ma_need = MA_INODE;
1192                 ma->ma_valid = 0;
1193                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1194                                 NULL, &rr->rr_name, ma, no_name);
1195                 GOTO(unlock_parent, rc);
1196         }
1197
1198         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1199                 *child_fid = *rr->rr_fid2;
1200         } else {
1201                 /* lookup child object along with version checking */
1202                 fid_zero(child_fid);
1203                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1204                                               1);
1205                 if (rc != 0) {
1206                         /* Name might not be able to find during resend of
1207                          * remote unlink, considering following case.
1208                          * dir_A is a remote directory, the name entry of
1209                          * dir_A is on MDT0, the directory is on MDT1,
1210                          *
1211                          * 1. client sends unlink req to MDT1.
1212                          * 2. MDT1 sends name delete update to MDT0.
1213                          * 3. name entry is being deleted in MDT0 synchronously.
1214                          * 4. MDT1 is restarted.
1215                          * 5. client resends unlink req to MDT1. So it can not
1216                          *    find the name entry on MDT0 anymore.
1217                          * In this case, MDT1 only needs to destory the local
1218                          * directory.
1219                          */
1220                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1221                             !fid_is_zero(rr->rr_fid2) &&
1222                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1223                                 no_name = 1;
1224                                 *child_fid = *rr->rr_fid2;
1225                         } else {
1226                                 GOTO(unlock_parent, rc);
1227                         }
1228                 }
1229         }
1230
1231         if (!fid_is_md_operative(child_fid))
1232                 GOTO(unlock_parent, rc = -EPERM);
1233
1234         /* We will lock the child regardless it is local or remote. No harm. */
1235         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1236         if (IS_ERR(mc))
1237                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1238
1239         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1240                 /* In this case, child fid is embedded in the request, and we do
1241                  * not have a proper name as rr_name contains an encoded
1242                  * hash. So find name that matches provided hash.
1243                  */
1244                 if (!find_name_matching_hash(info, &rr->rr_name,
1245                                              NULL, mc))
1246                         GOTO(put_child, rc = -ENOENT);
1247         }
1248
1249         if (!cos_incompat) {
1250                 rc = mdt_object_striped(info, mc);
1251                 if (rc < 0)
1252                         GOTO(put_child, rc);
1253
1254                 cos_incompat = rc;
1255                 if (cos_incompat) {
1256                         mdt_object_put(info->mti_env, mc);
1257                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1258                         goto relock;
1259                 }
1260         }
1261
1262         child_lh = &info->mti_lh[MDT_LH_CHILD];
1263         if (mdt_object_remote(mc)) {
1264                 struct mdt_body  *repbody;
1265
1266                 if (!fid_is_zero(rr->rr_fid2)) {
1267                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1268                                mdt_obd_name(info->mti_mdt),
1269                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1270                         GOTO(put_child, rc = -ENOENT);
1271                 }
1272                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1273                        mdt_obd_name(info->mti_mdt),
1274                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1275
1276                 if (!mdt_is_dne_client(req->rq_export))
1277                         /* Return -ENOTSUPP for old client */
1278                         GOTO(put_child, rc = -ENOTSUPP);
1279
1280                 /* Revoke the LOOKUP lock of the remote object granted by
1281                  * this MDT. Since the unlink will happen on another MDT,
1282                  * it will release the LOOKUP lock right away. Then What
1283                  * would happen if another client try to grab the LOOKUP
1284                  * lock at the same time with unlink XXX
1285                  */
1286                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
1287                                             false);
1288                 if (rc)
1289                         GOTO(put_child, rc);
1290
1291                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1292                 LASSERT(repbody != NULL);
1293                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1294                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1295                 GOTO(unlock_child, rc = -EREMOTE);
1296         }
1297         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1298          * this now because a running HSM restore on the child (unlink
1299          * victim) will hold the layout lock. See LU-4002.
1300          */
1301         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1302                                      MDS_INODELOCK_LOOKUP |
1303                                      MDS_INODELOCK_UPDATE,
1304                                      LCK_EX, cos_incompat);
1305         if (rc != 0)
1306                 GOTO(put_child, rc);
1307
1308         /*
1309          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1310          * whether need MA_LOV and MA_COOKIE.
1311          */
1312         ma->ma_need = MA_INODE;
1313         ma->ma_valid = 0;
1314
1315         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1316                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1317         /* save version when object is locked */
1318         mdt_version_get_save(info, mc, 1);
1319
1320         mutex_lock(&mc->mot_lov_mutex);
1321
1322         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1323                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1324
1325         mutex_unlock(&mc->mot_lov_mutex);
1326         if (rc != 0)
1327                 GOTO(unlock_child, rc);
1328
1329         if (!lu_object_is_dying(&mc->mot_header)) {
1330                 rc = mdt_attr_get_complex(info, mc, ma);
1331                 if (rc)
1332                         GOTO(out_stat, rc);
1333         } else if (mdt_dom_check_for_discard(info, mc)) {
1334                 mdt_dom_discard_data(info, mc);
1335         }
1336         mdt_handle_last_unlink(info, mc, ma);
1337
1338 out_stat:
1339         if (ma->ma_valid & MA_INODE) {
1340                 switch (ma->ma_attr.la_mode & S_IFMT) {
1341                 case S_IFDIR:
1342                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1343                                          ktime_us_delta(ktime_get(), kstart));
1344                         break;
1345                 case S_IFREG:
1346                 case S_IFLNK:
1347                 case S_IFCHR:
1348                 case S_IFBLK:
1349                 case S_IFIFO:
1350                 case S_IFSOCK:
1351                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1352                                          ktime_us_delta(ktime_get(), kstart));
1353                         break;
1354                 default:
1355                         LASSERTF(0, "bad file type %o unlinking\n",
1356                                 ma->ma_attr.la_mode);
1357                 }
1358         }
1359
1360         EXIT;
1361
1362 unlock_child:
1363         mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
1364 put_child:
1365         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1366             info->mti_big_buf.lb_buf)
1367                 lu_buf_free(&info->mti_big_buf);
1368         mdt_object_put(info->mti_env, mc);
1369 unlock_parent:
1370         mdt_object_unlock(info, mp, parent_lh, rc);
1371 put_parent:
1372         mdt_object_put(info->mti_env, mp);
1373         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1374         return rc;
1375 }
1376
1377 /*
1378  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1379  * name.
1380  */
1381 static int mdt_reint_link(struct mdt_thread_info *info,
1382                           struct mdt_lock_handle *lhc)
1383 {
1384         struct mdt_reint_record *rr = &info->mti_rr;
1385         struct ptlrpc_request   *req = mdt_info_req(info);
1386         struct md_attr          *ma = &info->mti_attr;
1387         struct mdt_object       *ms;
1388         struct mdt_object       *mp;
1389         struct mdt_lock_handle  *lhs;
1390         struct mdt_lock_handle  *lhp;
1391         ktime_t kstart = ktime_get();
1392         bool cos_incompat;
1393         int rc;
1394
1395         ENTRY;
1396         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1397                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1398
1399         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1400                 RETURN(err_serious(-ENOENT));
1401
1402         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1403             CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1404                 req->rq_no_reply = 1;
1405                 RETURN(err_serious(-ENOENT));
1406         }
1407
1408         if (info->mti_dlm_req)
1409                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1410
1411         /* Invalid case so return error immediately instead of
1412          * processing it
1413          */
1414         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1415                 RETURN(-EPERM);
1416
1417         if (!fid_is_md_operative(rr->rr_fid1) ||
1418             !fid_is_md_operative(rr->rr_fid2))
1419                 RETURN(-EPERM);
1420
1421         /* step 1: find target parent dir */
1422         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1423         if (IS_ERR(mp))
1424                 RETURN(PTR_ERR(mp));
1425
1426         rc = mdt_version_get_check_save(info, mp, 0);
1427         if (rc)
1428                 GOTO(put_parent, rc);
1429
1430         rc = mdt_check_enc(info, mp);
1431         if (rc)
1432                 GOTO(put_parent, rc);
1433
1434         /* step 2: find source */
1435         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1436         if (IS_ERR(ms))
1437                 GOTO(put_parent, rc = PTR_ERR(ms));
1438
1439         if (!mdt_object_exists(ms)) {
1440                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1441                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1442                 GOTO(put_source, rc = -ENOENT);
1443         }
1444
1445         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1446
1447         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1448
1449         lhp = &info->mti_lh[MDT_LH_PARENT];
1450         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
1451         if (rc != 0)
1452                 GOTO(put_source, rc);
1453
1454         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1455
1456         lhs = &info->mti_lh[MDT_LH_CHILD];
1457         rc = mdt_object_lock(info, ms, lhs,
1458                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
1459                              cos_incompat);
1460         if (rc != 0)
1461                 GOTO(unlock_parent, rc);
1462
1463         /* step 3: link it */
1464         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1465                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1466
1467         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1468         rc = mdt_version_get_check_save(info, ms, 1);
1469         if (rc)
1470                 GOTO(unlock_source, rc);
1471
1472         /** check target version by name during replay */
1473         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1474                                       &info->mti_tmp_fid1, 2);
1475         if (rc != 0 && rc != -ENOENT)
1476                 GOTO(unlock_source, rc);
1477         /* save version of file name for replay, it must be ENOENT here */
1478         if (!req_is_replay(mdt_info_req(info))) {
1479                 if (rc != -ENOENT) {
1480                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1481                                PNAME(&rr->rr_name));
1482                         GOTO(unlock_source, rc = -EEXIST);
1483                 }
1484                 info->mti_ver[2] = ENOENT_VERSION;
1485                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1486         }
1487
1488         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1489                       mdt_object_child(ms), &rr->rr_name, ma);
1490
1491         if (rc == 0)
1492                 mdt_counter_incr(req, LPROC_MDT_LINK,
1493                                  ktime_us_delta(ktime_get(), kstart));
1494
1495         EXIT;
1496 unlock_source:
1497         mdt_object_unlock(info, ms, lhs, rc);
1498 unlock_parent:
1499         mdt_object_unlock(info, mp, lhp, rc);
1500 put_source:
1501         mdt_object_put(info->mti_env, ms);
1502 put_parent:
1503         mdt_object_put(info->mti_env, mp);
1504         return rc;
1505 }
1506
1507 /**
1508  * Get BFL lock for rename or migrate process.
1509  **/
1510 static int mdt_rename_lock(struct mdt_thread_info *info,
1511                            struct mdt_lock_handle *lh)
1512 {
1513         struct lu_fid *fid = &info->mti_tmp_fid1;
1514         struct mdt_object *obj;
1515         __u64 ibits = MDS_INODELOCK_UPDATE;
1516         int rc;
1517
1518         ENTRY;
1519         lu_root_fid(fid);
1520         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1521         if (IS_ERR(obj))
1522                 RETURN(PTR_ERR(obj));
1523
1524         mdt_lock_reg_init(lh, LCK_EX);
1525         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1526                                       &ibits, 0, false, false);
1527         mdt_object_put(info->mti_env, obj);
1528         RETURN(rc);
1529 }
1530
1531 static void mdt_rename_unlock(struct mdt_thread_info *info,
1532                               struct mdt_lock_handle *lh)
1533 {
1534         ENTRY;
1535         /* Cancel the single rename lock right away */
1536         mdt_object_unlock(info, NULL, lh, 1);
1537         EXIT;
1538 }
1539
1540 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1541                                                 const struct lu_fid *fid,
1542                                                 int idx)
1543 {
1544         struct mdt_object *dir;
1545         int rc;
1546
1547         ENTRY;
1548         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1549         if (IS_ERR(dir))
1550                 RETURN(dir);
1551
1552         /* check early, the real version will be saved after locking */
1553         rc = mdt_version_get_check(info, dir, idx);
1554         if (rc)
1555                 GOTO(out_put, rc);
1556
1557         if (!mdt_object_exists(dir))
1558                 GOTO(out_put, rc = -ENOENT);
1559
1560         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1561                 GOTO(out_put, rc = -ENOTDIR);
1562
1563         RETURN(dir);
1564 out_put:
1565         mdt_object_put(info->mti_env, dir);
1566         return ERR_PTR(rc);
1567 }
1568
1569 /*
1570  * lock rename source object.
1571  *
1572  * Both source and its parent object may be located on remote MDTs, and even on
1573  * different MDTs, which means source object is a remote object on parent.
1574  *
1575  * \retval      0 on success
1576  * \retval      -ev negative errno upon error
1577  */
1578 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1579                                   struct mdt_object *parent,
1580                                   struct mdt_object *child,
1581                                   struct mdt_lock_handle *lh,
1582                                   struct mdt_lock_handle *lh_lookup,
1583                                   __u64 ibits, bool cos_incompat)
1584 {
1585         int rc;
1586
1587         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1588         /* if @obj is remote object, LOOKUP lock needs to be taken from
1589          * parent MDT.
1590          */
1591         rc = mdt_is_remote_object(info, parent, child);
1592         if (rc < 0)
1593                 return rc;
1594
1595         if (rc == 1) {
1596                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1597                                             LCK_EX, cos_incompat);
1598                 if (rc)
1599                         return rc;
1600
1601                 ibits &= ~MDS_INODELOCK_LOOKUP;
1602         }
1603
1604         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX, cos_incompat);
1605         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1606                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1607
1608         return 0;
1609 }
1610
1611 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1612                                      struct mdt_object *obj,
1613                                      struct mdt_lock_handle *lh,
1614                                      struct mdt_lock_handle *lh_lookup,
1615                                      int decref)
1616 {
1617         mdt_object_unlock(info, obj, lh, decref);
1618         mdt_object_unlock(info, NULL, lh_lookup, decref);
1619 }
1620
1621 /* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
     /* One entry of the lock lists built while locking the links of a file. */
1622 struct mdt_link_lock {
             /* object the lock was taken on */
1623         struct mdt_object *mll_obj;
             /* lock handle holding the lock on mll_obj */
1624         struct mdt_lock_handle mll_lh;
             /* linkage into the list of taken locks */
1625         struct list_head mll_linkage;
1626 };
1627
1628 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1629                                             struct mdt_object *o,
1630                                             struct mdt_lock_handle *lh,
1631                                             struct list_head *list)
1632 {
1633         struct mdt_link_lock *mll;
1634
1635         OBD_ALLOC_PTR(mll);
1636         if (mll == NULL)
1637                 return -ENOMEM;
1638
1639         INIT_LIST_HEAD(&mll->mll_linkage);
1640         mdt_object_get(info->mti_env, o);
1641         mll->mll_obj = o;
1642         mll->mll_lh = *lh;
1643         memset(lh, 0, sizeof(*lh));
1644         list_add_tail(&mll->mll_linkage, list);
1645
1646         return 0;
1647 }
1648
1649 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1650                                              struct mdt_link_lock *mll,
1651                                              int decref)
1652 {
1653         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1654         mdt_object_put(info->mti_env, mll->mll_obj);
1655         list_del(&mll->mll_linkage);
1656         OBD_FREE_PTR(mll);
1657 }
1658
1659 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1660                                      struct list_head *list, int decref)
1661 {
1662         struct mdt_link_lock *mll;
1663         struct mdt_link_lock *tmp;
1664
1665         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1666                 mdt_migrate_link_lock_del(info, mll, decref);
1667 }
1668
1669 /* take link parent UPDATE lock.
1670  * \retval      0 \a lnkp is already locked, no lock taken.
1671  *              1 lock taken
1672  *              -ev negative errno.
1673  */
1674 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1675                                         struct mdt_object *lnkp,
1676                                         struct list_head *update_locks,
1677                                         bool *blocked)
1678 {
1679         const struct lu_fid *fid = mdt_object_fid(lnkp);
1680         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1681         struct mdt_link_lock *entry;
1682         __u64 ibits = 0;
1683         int rc;
1684
1685         ENTRY;
1686
1687         /* check if it's already locked */
1688         list_for_each_entry(entry, update_locks, mll_linkage) {
1689                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1690                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1691                         RETURN(0);
1692                 }
1693         }
1694
1695         /* link parent UPDATE lock */
1696         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1697
1698         if (*blocked) {
1699                 /* revoke lock instead of take in *blocked* mode */
1700                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1701                                      LCK_PW, true);
1702                 if (rc)
1703                         RETURN(rc);
1704
1705                 if (mdt_object_remote(lnkp)) {
1706                         struct ldlm_lock *lock;
1707
1708                         /*
1709                          * for remote object, set lock cb_atomic, so lock can be
1710                          * released in blocking_ast() immediately, then the next
1711                          * lock_try will have better chance of success.
1712                          */
1713                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1714                         LASSERT(lock != NULL);
1715                         lock_res_and_lock(lock);
1716                         ldlm_set_atomic_cb(lock);
1717                         unlock_res_and_lock(lock);
1718                         LDLM_LOCK_PUT(lock);
1719                 }
1720
1721                 mdt_object_unlock(info, lnkp, lhl, 1);
1722                 RETURN(0);
1723         }
1724
1725         /*
1726          * we can't follow parent-child lock order like other MD
1727          * operations, use lock_try here to avoid deadlock, if the lock
1728          * cannot be taken, drop all locks taken, revoke the blocked
1729          * one, and continue processing the remaining entries, and in
1730          * the end of the loop restart from beginning.
1731          *
1732          * don't lock with PDO mode in case two links are under the same
1733          * parent and their hash values are different.
1734          */
1735         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1736                                  LCK_PW, true);
1737         if (rc < 0)
1738                 RETURN(rc);
1739
1740         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1741                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1742                 *blocked = true;
1743                 RETURN(-EAGAIN);
1744         }
1745
1746         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1747         if (rc) {
1748                 mdt_object_unlock(info, lnkp, lhl, 1);
1749                 RETURN(rc);
1750         }
1751
1752         RETURN(1);
1753 }
1754
1755 /* take link LOOKUP lock.
1756  * \retval      0 \a lnkp is already locked, no lock taken.
1757  *              1 lock taken.
1758  *              -ev negative errno.
1759  */
1760 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1761                                  struct mdt_object *lnkp,
1762                                  struct mdt_object *spobj,
1763                                  struct mdt_object *obj,
1764                                  struct list_head *lookup_locks)
1765 {
1766         const struct lu_fid *fid = mdt_object_fid(lnkp);
1767         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1768         struct mdt_link_lock *entry;
1769         int rc;
1770
1771         ENTRY;
1772
1773         /* check if it's already locked by source */
1774         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1775         if (rc <= 0) {
1776                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1777                        PFID(fid));
1778                 RETURN(rc);
1779         }
1780
1781         /* check if it's already locked by other links */
1782         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1783                 rc = mdt_fids_different_target(info, fid,
1784                                                mdt_object_fid(entry->mll_obj));
1785                 if (rc <= 0) {
1786                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1787                                PFID(fid));
1788                         RETURN(rc);
1789                 }
1790         }
1791
1792         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX, true);
1793         if (rc)
1794                 RETURN(rc);
1795
1796         /* don't take local LOOKUP lock, because later we will lock other ibits
1797          * of sobj (which is on local MDT), and lock the same object twice may
1798          * deadlock, just revoke this lock.
1799          */
1800         if (!mdt_object_remote(lnkp))
1801                 GOTO(unlock, rc = 0);
1802
1803         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1804         if (rc)
1805                 GOTO(unlock, rc);
1806
1807         RETURN(1);
1808 unlock:
1809         mdt_object_unlock(info, lnkp, lhl, 1);
1810         return rc;
1811 }
1812
1813 /*
1814  * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
1815  * total local lock count exceeds RS_MAX_LOCKS.
1816  *
1817  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1818  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1819  * \retval      -ev negative errno upon error
1820  */
1821 static int mdt_migrate_links_lock(struct mdt_thread_info *info,
1822                                   struct mdt_object *spobj,
1823                                   struct mdt_object *tpobj,
1824                                   struct mdt_object *obj,
1825                                   struct mdt_lock_handle *lhsp,
1826                                   struct mdt_lock_handle *lhtp,
1827                                   struct list_head *link_locks)
1828 {
1829         struct mdt_device *mdt = info->mti_mdt;
1830         struct lu_buf *buf = &info->mti_big_buf;
1831         struct lu_name *lname = &info->mti_name;
1832         struct linkea_data ldata = { NULL };
1833         int local_lock_cnt = 0;
1834         bool blocked = false;
1835         bool saved;
1836         struct mdt_object *lnkp;
1837         struct lu_fid fid;
1838         LIST_HEAD(update_locks);
1839         LIST_HEAD(lookup_locks);
1840         int rc;
1841
1842         ENTRY;
1843         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1844                 RETURN(0);
1845
1846         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1847         if (buf->lb_buf == NULL)
1848                 RETURN(-ENOMEM);
1849
1850         ldata.ld_buf = buf;
1851         rc = mdt_links_read(info, obj, &ldata);
1852         if (rc) {
1853                 if (rc == -ENOENT || rc == -ENODATA)
1854                         rc = 0;
1855                 RETURN(rc);
1856         }
1857
1858         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1859              linkea_next_entry(&ldata)) {
1860                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1861                                     &fid);
1862
1863                 /* check if link parent is source parent too */
1864                 if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
1865                         CDEBUG(D_INFO,
1866                                "skip lock on source parent "DFID"/"DNAME"\n",
1867                                PFID(&fid), PNAME(lname));
1868                         continue;
1869                 }
1870
1871                 /* check if link parent is target parent too */
1872                 if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
1873                         CDEBUG(D_INFO,
1874                                "skip lock on target parent "DFID"/"DNAME"\n",
1875                                PFID(&fid), PNAME(lname));
1876                         continue;
1877                 }
1878
1879                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1880                 if (IS_ERR(lnkp)) {
1881                         CWARN("%s: cannot find obj "DFID": %ld\n",
1882                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1883                         continue;
1884                 }
1885
1886                 if (!mdt_object_exists(lnkp)) {
1887                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1888                                PFID(&fid), PNAME(lname));
1889                         mdt_object_put(info->mti_env, lnkp);
1890                         continue;
1891                 }
1892 relock:
1893                 saved = blocked;
1894                 rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
1895                                                   &blocked);
1896                 if (!saved && blocked) {
1897                         /* unlock all locks taken to avoid deadlock */
1898                         mdt_migrate_links_unlock(info, &update_locks, 1);
1899                         mdt_object_unlock(info, spobj, lhsp, 1);
1900                         if (tpobj != spobj)
1901                                 mdt_object_unlock(info, tpobj, lhtp, 1);
1902                         goto relock;
1903                 }
1904                 if (rc < 0) {
1905                         mdt_object_put(info->mti_env, lnkp);
1906                         GOTO(out, rc);
1907                 }
1908
1909                 if (rc == 1 && !mdt_object_remote(lnkp))
1910                         local_lock_cnt++;
1911
1912                 rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
1913                                            &lookup_locks);
1914                 if (rc < 0) {
1915                         mdt_object_put(info->mti_env, lnkp);
1916                         GOTO(out, rc);
1917                 }
1918                 if (rc == 1 && !mdt_object_remote(lnkp))
1919                         local_lock_cnt++;
1920                 mdt_object_put(info->mti_env, lnkp);
1921         }
1922
1923         if (blocked)
1924                 GOTO(out, rc = -EBUSY);
1925
1926         EXIT;
1927 out:
1928         list_splice(&update_locks, link_locks);
1929         list_splice(&lookup_locks, link_locks);
1930         if (rc < 0) {
1931                 mdt_migrate_links_unlock(info, link_locks, rc);
1932         } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
1933                 /*
1934                  * parent may have 3 local objects: master object and 2 stripes
1935                  * (if it's being migrated too); source may have 1 local objects
1936                  * as regular file; target has 1 local object.
1937                  * Note, source may have 2 local locks if it is directory but it
1938                  * can't have hardlinks, so it is not considered here.
1939                  */
1940                 CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
1941                        local_lock_cnt);
1942                 rc = 1;
1943         }
1944         return rc;
1945 }
1946
1947 /*
1948  * lookup source by name, if parent is striped directory, we need to find the
1949  * corresponding stripe where source is located, and then lookup there.
1950  *
1951  * besides, if parent is migrating too, and file is already in target stripe,
1952  * this should be a redo of 'lfs migrate' on client side.
      *
      * On a non-negative return \a *spobj, \a *tpobj and \a *sobj each hold an
      * object reference that the caller has to put. On the error paths that go
      * through the cleanup labels, the references taken here are dropped and the
      * corresponding output pointers are reset to NULL; the early plain returns
      * leave the output pointers untouched.
      *
      * -EALREADY is returned if the name is not found on the source stripe but is
      * found on the target stripe of a parent whose layout is changing.
1953  *
1954  * \retval 1 tpobj stripe index is less than spobj stripe index
1955  * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
1956  * \retval -ev negative errno upon error
1957  */
1958 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1959                               struct mdt_object *pobj,
1960                               const struct md_attr *ma,
1961                               const struct lu_name *lname,
1962                               struct mdt_object **spobj,
1963                               struct mdt_object **tpobj,
1964                               struct mdt_object **sobj)
1965 {
1966         const struct lu_env *env = info->mti_env;
1967         struct lu_fid *fid = &info->mti_tmp_fid1;
             /* both indices stay -1 when the parent has no LMV, so 0 is returned */
1968         int spindex = -1;
1969         int tpindex = -1;
1970         int rc;
1971
1972         if (ma->ma_valid & MA_LMV) {
1973                 /* if parent is striped, lookup on corresponding stripe */
1974                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1975                 struct lu_fid *fid2 = &info->mti_tmp_fid2;
1976
1977                 if (!lmv_is_sane(lmv))
1978                         return -EBADF;
1979
                     /* source stripe index of the name */
1980                 spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1981                                                        lname->ln_namelen);
1982                 if (spindex < 0)
1983                         return spindex;
1984
1985                 fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
1986
1987                 *spobj = mdt_object_find(env, info->mti_mdt, fid2);
1988                 if (IS_ERR(*spobj)) {
1989                         rc = PTR_ERR(*spobj);
1990                         *spobj = NULL;
1991                         return rc;
1992                 }
1993
1994                 if (!mdt_object_exists(*spobj))
1995                         GOTO(spobj_put, rc = -ENOENT);
1996
1997                 fid_zero(fid);
1998                 rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
1999                                 &info->mti_spec);
2000                 if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
2001                         /* fail check here to let top dir migration succeed. */
2002                         if (CFS_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
2003                                 GOTO(spobj_put, rc = -EIO);
2004
2005                         /*
2006                          * if parent layout is changing, and lookup child
2007                          * failed on source stripe, lookup again on target
2008                          * stripe, if it exists, it means previous migration
2009                          * was interrupted, and current file was migrated
2010                          * already.
2011                          */
2012                         tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
2013                                                            lname->ln_namelen);
2014                         if (tpindex < 0)
2015                                 GOTO(spobj_put, rc = tpindex);
2016
2017                         fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
2018
2019                         *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
2020                         if (IS_ERR(*tpobj)) {
2021                                 rc = PTR_ERR(*tpobj);
2022                                 *tpobj = NULL;
2023                                 GOTO(spobj_put, rc);
2024                         }
2025
2026                         if (!mdt_object_exists(*tpobj))
2027                                 GOTO(tpobj_put, rc = -ENOENT);
2028
2029                         if (rc == -ENOENT) {
                                     /* this branch always ends in cleanup: with
                                      * the lookup error, or with -EALREADY if
                                      * the name is found on the target stripe
                                      */
2030                                 fid_zero(fid);
2031                                 rc = mdo_lookup(env, mdt_object_child(*tpobj),
2032                                                 lname, fid, &info->mti_spec);
2033                                 GOTO(tpobj_put, rc = rc ?: -EALREADY);
2034                         }
2035                 } else if (rc) {
2036                         GOTO(spobj_put, rc);
2037                 } else {
                             /* layout not changing: target stripe is the
                              * source stripe, take a second reference for it
                              */
2038                         *tpobj = *spobj;
2039                         tpindex = spindex;
2040                         mdt_object_get(env, *tpobj);
2041                 }
2042         } else {
2043                 fid_zero(fid);
2044                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2045                                 &info->mti_spec);
2046                 if (rc)
2047                         return rc;
2048
                     /* parent without LMV is both source and target parent;
                      * one reference is taken for each output pointer
                      */
2049                 *spobj = pobj;
2050                 *tpobj = pobj;
2051                 mdt_object_get(env, pobj);
2052                 mdt_object_get(env, pobj);
2053         }
2054
             /* @fid was filled in by the successful lookup above */
2055         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2056         if (IS_ERR(*sobj)) {
2057                 rc = PTR_ERR(*sobj);
2058                 *sobj = NULL;
2059                 GOTO(tpobj_put, rc);
2060         }
2061
2062         if (!mdt_object_exists(*sobj))
2063                 GOTO(sobj_put, rc = -ENOENT);
2064
2065         return (tpindex < spindex);
2066
     /* cleanup labels fall through: each drops one reference and resets the
      * corresponding output pointer
      */
2067 sobj_put:
2068         mdt_object_put(env, *sobj);
2069         *sobj = NULL;
2070 tpobj_put:
2071         mdt_object_put(env, *tpobj);
2072         *tpobj = NULL;
2073 spobj_put:
2074         mdt_object_put(env, *spobj);
2075         *spobj = NULL;
2076
2077         return rc;
2078 }
2079
2080 /* end lease and close file for regular file */
2081 static int mdd_migrate_close(struct mdt_thread_info *info,
2082                              struct mdt_object *obj)
2083 {
2084         struct close_data *data;
2085         struct mdt_body *repbody;
2086         struct ldlm_lock *lease;
2087         int rc;
2088         int rc2;
2089
2090         rc = -EPROTO;
2091         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2092                                       RCL_CLIENT) ||
2093             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2094                                       RCL_CLIENT))
2095                 goto close;
2096
2097         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2098         if (!data)
2099                 goto close;
2100
2101         rc = -ESTALE;
2102         lease = ldlm_handle2lock(&data->cd_handle);
2103         if (!lease)
2104                 goto close;
2105
2106         /* check if the lease was already canceled */
2107         lock_res_and_lock(lease);
2108         rc = ldlm_is_cancel(lease);
2109         unlock_res_and_lock(lease);
2110
2111         if (rc) {
2112                 rc = -EAGAIN;
2113                 LDLM_DEBUG(lease, DFID" lease broken",
2114                            PFID(mdt_object_fid(obj)));
2115         }
2116
2117         /*
2118          * cancel server side lease, client side counterpart should have been
2119          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2120          */
2121         ldlm_lock_cancel(lease);
2122         ldlm_reprocess_all(lease->l_resource,
2123                            lease->l_policy_data.l_inodebits.bits);
2124         LDLM_LOCK_PUT(lease);
2125
2126 close:
2127         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2128         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2129         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2130
2131         return rc ?: rc2;
2132 }
2133
2134 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2135 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2136 {
2137         return le32_to_cpu(lmv->lmv_hash_type) ==
2138                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2139                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2140                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2141                le32_to_cpu(lmv->lmv_migrate_offset) <
2142                 le32_to_cpu(lmv->lmv_stripe_count);
2143 }
2144
2145 /*
2146  * migrate file in below steps:
2147  *  1. lock source and target stripes
2148  *  2. lookup source by name
2149  *  3. lock parents of source links if source is not directory
2150  *  4. reject if source is in HSM
2151  *  5. take source open_sem and close file if source is regular file
2152  *  6. lock source, and its stripes if it's directory
2153  *  7. migrate file
2154  *  8. lock target so subsequent change to it can trigger COS
2155  *  9. unlock above locks
2156  * 10. sync device if source has too many links
2157  */
2158 int mdt_reint_migrate(struct mdt_thread_info *info,
2159                       struct mdt_lock_handle *unused)
2160 {
2161         const struct lu_env *env = info->mti_env;
2162         struct mdt_device *mdt = info->mti_mdt;
2163         struct ptlrpc_request *req = mdt_info_req(info);
2164         struct mdt_reint_record *rr = &info->mti_rr;
2165         struct lu_ucred *uc = mdt_ucred(info);
2166         struct md_attr *ma = &info->mti_attr;
2167         struct mdt_object *pobj;
2168         struct mdt_object *spobj;
2169         struct mdt_object *tpobj;
2170         struct mdt_object *sobj;
2171         struct mdt_object *tobj;
2172         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2173         struct mdt_lock_handle *lhsp;
2174         struct mdt_lock_handle *lhtp;
2175         struct mdt_lock_handle *lhs;
2176         struct mdt_lock_handle *lhl;
2177         LIST_HEAD(link_locks);
2178         int lock_retries = 5;
2179         bool reverse = false;
2180         bool open_sem_locked = false;
2181         bool do_sync = false;
2182         bool is_plain_dir = false;
2183         int rc;
2184
2185         ENTRY;
2186         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2187                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2188
2189         if (info->mti_dlm_req)
2190                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2191
2192         if (!fid_is_md_operative(rr->rr_fid1) ||
2193             !fid_is_md_operative(rr->rr_fid2))
2194                 RETURN(-EPERM);
2195
2196         /* don't allow migrate . or .. */
2197         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2198                 RETURN(-EBUSY);
2199
2200         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2201                 RETURN(-EPERM);
2202
2203         /* we want rbac roles to have precedence over any other
2204          * permission or capability checks
2205          */
2206         if (uc && (!uc->uc_rbac_dne_ops ||
2207                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2208                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2209                     mdt->mdt_enable_remote_dir_gid != -1)))
2210                 RETURN(-EPERM);
2211
2212         /*
2213          * Note: do not enqueue rename lock for replay request, because
2214          * if other MDT holds rename lock, but being blocked to wait for
2215          * this MDT to finish its recovery, and the failover MDT can not
2216          * get rename lock, which will cause deadlock.
2217          *
2218          * req is NULL if this is called by directory auto-split.
2219          */
2220         if (req && !req_is_replay(req)) {
2221                 rc = mdt_rename_lock(info, rename_lh);
2222                 if (rc != 0) {
2223                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2224                                mdt_obd_name(info->mti_mdt), rc);
2225                         RETURN(rc);
2226                 }
2227         }
2228
2229         /* pobj is master object of parent */
2230         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2231         if (IS_ERR(pobj))
2232                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2233
2234         if (req) {
2235                 rc = mdt_version_get_check(info, pobj, 0);
2236                 if (rc)
2237                         GOTO(put_parent, rc);
2238         }
2239
2240         if (!mdt_object_exists(pobj))
2241                 GOTO(put_parent, rc = -ENOENT);
2242
2243         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2244                 GOTO(put_parent, rc = -ENOTDIR);
2245
2246         rc = mdt_check_enc(info, pobj);
2247         if (rc)
2248                 GOTO(put_parent, rc);
2249
2250         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2251         if (rc)
2252                 GOTO(put_parent, rc);
2253
2254         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2255             (ma->ma_valid & MA_LMV) &&
2256             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2257                 struct lu_buf *buf = &info->mti_buf;
2258                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2259                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2260
2261                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2262                                                  LMV_HASH_FLAG_BAD_TYPE);
2263                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2264                 buf->lb_buf = lmv;
2265                 buf->lb_len = sizeof(*lmv);
2266                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2267                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2268                 mo_invalidate(env, mdt_object_child(pobj));
2269                 GOTO(put_parent, rc);
2270         }
2271
2272         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2273          * if @pobj is migrating too, tpobj is the target parent stripe.
2274          */
2275         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2276                                 &sobj);
2277         if (rc < 0)
2278                 GOTO(put_parent, rc);
2279         reverse = rc;
2280
2281         /* parent unchanged, this happens in dir restripe */
2282         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2283                 GOTO(put_source, rc = -EALREADY);
2284
2285 lock_parent:
2286         LASSERT(spobj);
2287         LASSERT(tpobj);
2288         lhsp = &info->mti_lh[MDT_LH_PARENT];
2289         lhtp = &info->mti_lh[MDT_LH_CHILD];
2290         /* lock spobj and tpobj in stripe index order */
2291         if (reverse) {
2292                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW,
2293                                      true);
2294                 if (rc)
2295                         GOTO(put_source, rc);
2296
2297                 LASSERT(spobj != tpobj);
2298                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
2299                                      true);
2300                 if (rc)
2301                         GOTO(unlock_parent, rc);
2302         } else {
2303                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
2304                                      true);
2305                 if (rc)
2306                         GOTO(put_source, rc);
2307
2308                 if (tpobj != spobj) {
2309                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2310                                              LCK_PW, true);
2311                         if (rc)
2312                                 GOTO(unlock_parent, rc);
2313                 }
2314         }
2315
2316         /* if inode is not migrated, or is dir, no need to lock links */
2317         if (!info->mti_spec.sp_migrate_nsonly &&
2318             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2319                 /* lock link parents, and take LOOKUP lock of links */
2320                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2321                                             lhtp, &link_locks);
2322                 if (rc == -EBUSY && lock_retries-- > 0) {
2323                         LASSERT(list_empty(&link_locks));
2324                         goto lock_parent;
2325                 }
2326
2327                 if (rc < 0)
2328                         GOTO(put_source, rc);
2329
2330                 /*
2331                  * RS_MAX_LOCKS is the limit of number of locks that can be
2332                  * saved along with one request, if total lock count exceeds
2333                  * this limit, we will drop all locks after migration, and
2334                  * trigger commit in the end.
2335                  */
2336                 do_sync = rc;
2337         }
2338
2339         /* lock source */
2340         lhs = &info->mti_lh[MDT_LH_OLD];
2341         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2342         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2343                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2344                                     MDS_INODELOCK_OPEN, true);
2345         if (rc)
2346                 GOTO(unlock_links, rc);
2347
2348         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2349                 /* TODO: DoM migration is not supported, migrate dirent only */
2350                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2351                 if (rc)
2352                         GOTO(unlock_source, rc);
2353
2354                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2355                         info->mti_spec.sp_migrate_nsonly = 1;
2356         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2357                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2358                 if (rc)
2359                         GOTO(unlock_source, rc);
2360
2361                 if (!(ma->ma_valid & MA_LMV))
2362                         is_plain_dir = true;
2363                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2364                         /* race with restripe/auto-split */
2365                         GOTO(unlock_source, rc = -EBUSY);
2366                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2367                         struct lu_buf *buf = &info->mti_buf;
2368                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2369                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2370
2371                         /* migration failed before, and LFSCK cleared hash type
2372                          * and flags, fake it to resume migration.
2373                          */
2374                         lmv->lmv_hash_type =
2375                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2376                                             LMV_HASH_FLAG_MIGRATION |
2377                                             LMV_HASH_FLAG_BAD_TYPE |
2378                                             LMV_HASH_FLAG_FIXED);
2379                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2380                         buf->lb_buf = lmv;
2381                         buf->lb_len = sizeof(*lmv);
2382                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2383                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2384                         mo_invalidate(env, mdt_object_child(sobj));
2385                         GOTO(unlock_source, rc = -EALREADY);
2386                 }
2387         }
2388
2389         /* if migration HSM is allowed */
2390         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2391                 ma->ma_need = MA_HSM;
2392                 ma->ma_valid = 0;
2393                 rc = mdt_attr_get_complex(info, sobj, ma);
2394                 if (rc)
2395                         GOTO(unlock_source, rc);
2396
2397                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2398                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2399         }
2400
2401         /* end lease and close file for regular file */
2402         if (info->mti_spec.sp_migrate_close) {
2403                 /* try to hold open_sem so that nobody else can open the file */
2404                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2405                         /* close anyway */
2406                         mdd_migrate_close(info, sobj);
2407                         GOTO(unlock_source, rc = -EBUSY);
2408                 } else {
2409                         open_sem_locked = true;
2410                         rc = mdd_migrate_close(info, sobj);
2411                         if (rc && rc != -ESTALE)
2412                                 GOTO(unlock_open_sem, rc);
2413                 }
2414         }
2415
2416         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2417         if (IS_ERR(tobj))
2418                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2419
2420         /* Don't do lookup sanity check. We know name doesn't exist. */
2421         info->mti_spec.sp_cr_lookup = 0;
2422         info->mti_spec.sp_feat = &dt_directory_features;
2423
2424         rc = mdo_migrate(env, mdt_object_child(spobj),
2425                          mdt_object_child(tpobj), mdt_object_child(sobj),
2426                          mdt_object_child(tobj), &rr->rr_name,
2427                          &info->mti_spec, ma);
2428         if (rc)
2429                 GOTO(put_target, rc);
2430
2431         /* save target locks for directory */
2432         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2433             !info->mti_spec.sp_migrate_nsonly) {
2434                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2435                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2436
2437                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2438                  * otherwise stripes lock may deadlock.
2439                  */
2440                 if (is_plain_dir)
2441                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2442
2443                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2444                                              MDS_INODELOCK_UPDATE, LCK_PW,
2445                                              true);
2446                 if (rc)
2447                         GOTO(put_target, rc);
2448
2449                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2450         }
2451
2452         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2453                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2454
2455         EXIT;
2456 put_target:
2457         mdt_object_put(env, tobj);
2458 unlock_open_sem:
2459         if (open_sem_locked)
2460                 up_write(&sobj->mot_open_sem);
2461 unlock_source:
2462         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2463 unlock_links:
2464         /* if we've got too many locks to save into RPC,
2465          * then just commit before the locks are released
2466          */
2467         if (!rc && do_sync)
2468                 mdt_device_sync(env, mdt);
2469         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2470 unlock_parent:
2471         mdt_object_unlock(info, spobj, lhsp, rc);
2472         mdt_object_unlock(info, tpobj, lhtp, rc);
2473 put_source:
2474         mdt_object_put(env, sobj);
2475         mdt_object_put(env, spobj);
2476         mdt_object_put(env, tpobj);
2477 put_parent:
2478         mo_invalidate(env, mdt_object_child(pobj));
2479         mdt_object_put(env, pobj);
2480 unlock_rename:
2481         mdt_rename_unlock(info, rename_lh);
2482
2483         if (rc)
2484                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2485                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2486                        PNAME(&rr->rr_name), rc);
2487
2488         return rc;
2489 }
2490
2491 /*
2492  * determine lock order of sobj and tobj
2493  *
2494  * there are two situations we need to lock tobj before sobj:
2495  * 1. sobj is child of tobj
2496  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2497  *    larger than that of tobj
2498  *
2499  * \retval      1 lock tobj before sobj
2500  * \retval      0 lock sobj before tobj
2501  * \retval      -ev negative errno upon error
2502  */
2503 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2504                                            struct mdt_object *sobj,
2505                                            struct mdt_object *tobj)
2506 {
2507         struct md_attr *ma = &info->mti_attr;
2508         struct lu_fid *spfid = &info->mti_tmp_fid1;
2509         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2510         struct lmv_mds_md_v1 *lmv;
2511         __u32 sindex;
2512         __u32 tindex;
2513         int rc;
2514
2515         /* sobj and tobj are the same */
2516         if (sobj == tobj)
2517                 return 0;
2518
2519         if (fid_is_root(mdt_object_fid(sobj)))
2520                 return 0;
2521
2522         if (fid_is_root(mdt_object_fid(tobj)))
2523                 return 1;
2524
2525         /* check whether sobj is child of tobj */
2526         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2527                            mdt_object_fid(tobj));
2528         if (rc < 0)
2529                 return rc;
2530
2531         if (rc == 1)
2532                 return 1;
2533
2534         /* check whether sobj and tobj are children of the same parent */
2535         rc = mdt_attr_get_pfid(info, sobj, spfid);
2536         if (rc)
2537                 return rc;
2538
2539         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2540         if (rc)
2541                 return rc;
2542
2543         if (!lu_fid_eq(spfid, tpfid))
2544                 return 0;
2545
2546         /* check whether sobj and tobj are sibling stripes */
2547         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2548         if (rc)
2549                 return rc;
2550
2551         if (!(ma->ma_valid & MA_LMV))
2552                 return 0;
2553
2554         lmv = &ma->ma_lmv->lmv_md_v1;
2555         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2556                 return 0;
2557         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2558
2559         ma->ma_valid = 0;
2560         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2561         if (rc)
2562                 return rc;
2563
2564         if (!(ma->ma_valid & MA_LMV))
2565                 return -ENODATA;
2566
2567         lmv = &ma->ma_lmv->lmv_md_v1;
2568         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2569                 return -EINVAL;
2570         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2571
2572         /* check stripe index of sobj and tobj */
2573         if (sindex == tindex)
2574                 return -EINVAL;
2575
2576         return sindex < tindex ? 0 : 1;
2577 }
2578
2579 /* Helper function for mdt_reint_rename so we don't need to opencode
2580  * two different order lockings
2581  */
2582 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2583                              struct mdt_object *mfirstdir,
2584                              struct mdt_lock_handle *lh_firstdirp,
2585                              const struct lu_name *firstname,
2586                              struct mdt_object *mseconddir,
2587                              struct mdt_lock_handle *lh_seconddirp,
2588                              const struct lu_name *secondname,
2589                              bool cos_incompat)
2590 {
2591         int rc;
2592
2593         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
2594                              cos_incompat);
2595         if (rc)
2596                 return rc;
2597
2598         mdt_version_get_save(info, mfirstdir, 0);
2599         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2600
2601         if (mfirstdir != mseconddir) {
2602                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2603                                      secondname, LCK_PW, cos_incompat);
2604         } else if (!mdt_object_remote(mseconddir)) {
2605                 if (lh_firstdirp->mlh_pdo_hash !=
2606                     lh_seconddirp->mlh_pdo_hash) {
2607                         rc = mdt_object_pdo_lock(info, mseconddir,
2608                                                  lh_seconddirp, secondname,
2609                                                  LCK_PW, false, cos_incompat);
2610                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2611                 }
2612         }
2613         mdt_version_get_save(info, mseconddir, 1);
2614
2615         if (rc != 0)
2616                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2617
2618         return rc;
2619 }
2620
2621 /*
      * Handle a rename reintegration request: rename entry rr_name under parent
      * rr_fid1 to rr_tgt_name under parent rr_fid2.
      *
2622  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2623  * 2 - srcdir child; 3 - tgtdir child.
2624  * Update on disk version of srcdir child.
      *
      * \param[in] info      thread info holding the unpacked request (mti_rr),
      *                      the attributes (mti_attr) and the lock handles
      * \param[in] unused    not referenced by this function
      *
      * \retval      0 on success, also when both names already resolve to the
      *              same object and nothing is renamed
      * \retval      negative errno on failure
2625  */
2626 static int mdt_reint_rename(struct mdt_thread_info *info,
2627                             struct mdt_lock_handle *unused)
2628 {
2629         struct mdt_device *mdt = info->mti_mdt;
2630         struct mdt_reint_record *rr = &info->mti_rr;
2631         struct md_attr *ma = &info->mti_attr;
2632         struct ptlrpc_request *req = mdt_info_req(info);
2633         struct mdt_object *msrcdir = NULL;
2634         struct mdt_object *mtgtdir = NULL;
2635         struct mdt_object *mold;
2636         struct mdt_object *mnew = NULL;
2637         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2638         struct mdt_lock_handle *lh_srcdirp;
2639         struct mdt_lock_handle *lh_tgtdirp;
2640         struct mdt_lock_handle *lh_oldp = NULL;
2641         struct mdt_lock_handle *lh_lookup = NULL;
2642         struct mdt_lock_handle *lh_newp = NULL;
2643         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2644         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2645         struct lu_ucred *uc = mdt_ucred(info);
2646         bool reverse = false, discard = false;
2647         bool cos_incompat;
2648         ktime_t kstart = ktime_get();
2649         enum mdt_stat_idx msi = 0;
2650         int rc;
2651
2652         ENTRY;
2653         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2654                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2655                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2656
2657         if (info->mti_dlm_req)
2658                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2659
2660         if (!fid_is_md_operative(rr->rr_fid1) ||
2661             !fid_is_md_operative(rr->rr_fid2))
2662                 RETURN(-EPERM);
2663
2664         /* find both parents. */
2665         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2666         if (IS_ERR(msrcdir))
2667                 RETURN(PTR_ERR(msrcdir));
2668
2669         rc = mdt_check_enc(info, msrcdir);
2670         if (rc)
2671                 GOTO(out_put_srcdir, rc);
2672
2673         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2674
             /* Same parent FID: reuse msrcdir, but take a second reference so
              * that the two puts at out_put_tgtdir/out_put_srcdir stay balanced.
              */
2675         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2676                 mtgtdir = msrcdir;
2677                 mdt_object_get(info->mti_env, mtgtdir);
2678         } else {
2679                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2680                 if (IS_ERR(mtgtdir))
2681                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2682         }
2683
2684         rc = mdt_check_enc(info, mtgtdir);
2685         if (rc)
2686                 GOTO(out_put_tgtdir, rc);
2687
             /* without the fscrypt admin role, refuse a target directory that
              * carries the LOHA_FSCRYPT_MD attribute
              */
2688         if (!uc->uc_rbac_fscrypt_admin &&
2689             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2690                 GOTO(out_put_tgtdir, rc = -EPERM);
2691
2692         /*
2693          * Note: do not enqueue rename lock for replay request, because
2694          * if other MDT holds rename lock, but being blocked to wait for
2695          * this MDT to finish its recovery, and the failover MDT can not
2696          * get rename lock, which will cause deadlock.
2697          */
2698         if (!req_is_replay(req)) {
2699                 bool remote = mdt_object_remote(msrcdir);
2700
2701                 /*
2702                  * Normally rename RPC is handled on the MDT with the target
2703                  * directory (if target exists, it's on the MDT with the
2704                  * target), if the source directory is remote, it's a hint that
2705                  * source is remote too (this may not be true, but it won't
2706                  * cause any issue), return -EXDEV early to avoid taking
2707                  * rename_lock.
2708                  */
2709                 if (!mdt->mdt_enable_remote_rename && remote)
2710                         GOTO(out_put_tgtdir, rc = -EXDEV);
2711
2712                 /* This might be further relaxed in the future for regular file
2713                  * renames in different source and target parents. Start with
2714                  * only same-directory renames for simplicity and because this
2715                  * is by far the most common use case.
2716                  *
2717                  * Striped directories should be considered "remote".
2718                  */
2719                 if (msrcdir != mtgtdir || remote ||
2720                     (S_ISDIR(ma->ma_attr.la_mode) &&
2721                      !mdt->mdt_enable_parallel_rename_dir) ||
2722                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2723                      !mdt->mdt_enable_parallel_rename_file)) {
2724                         rc = mdt_rename_lock(info, rename_lh);
2725                         if (rc != 0) {
2726                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2727                                        mdt_obd_name(mdt), rc);
2728                                 GOTO(out_put_tgtdir, rc);
2729                         }
2730                 } else {
                             /* same local parent and parallel rename enabled
                              * for this file type: the rename lock is skipped;
                              * msi is later handed to
                              * mdt_rename_counter_tally()
                              */
2731                         if (S_ISDIR(ma->ma_attr.la_mode))
2732                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2733                         else
2734                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2735
2736                         CDEBUG(D_INFO,
2737                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2738                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2739                                PNAME(&rr->rr_name));
2740                 }
2741         }
2742
2743         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2744         if (rc < 0)
2745                 GOTO(out_unlock_rename, rc);
             /* 1 means mtgtdir has to be locked before msrcdir */
2746         reverse = rc;
2747
2748         /* source needs to be looked up after locking source parent, otherwise
2749          * this rename may race with unlink source, and cause rename hang, see
2750          * sanityn.sh 55b, so check parents first, if later we found source is
2751          * remote, relock parents.
2752          */
2753         cos_incompat = (mdt_object_remote(msrcdir) ||
2754                         mdt_object_remote(mtgtdir));
2755
2756         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2757
2758         /* lock parents in the proper order. */
2759         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2760         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2761
2762         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2763         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
     /* re-entered with cos_incompat == true once mold turns out to be remote */
2764 relock:
2765         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2766         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2767
2768         /* In case of same dir local rename we must sort by the hash,
2769          * otherwise a lock deadlock is possible when renaming
2770          * a to b and b to a at the same time LU-15285
2771          */
2772         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2773                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
             /* when this fail-check fires, source-first order is forced */
2774         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2775                 reverse = 0;
2776
2777         if (reverse)
2778                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2779                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2780                                        &rr->rr_name, cos_incompat);
2781         else
2782                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2783                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
2784                                        cos_incompat);
2785
2786         if (rc != 0)
2787                 GOTO(out_unlock_rename, rc);
2788
2789         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2790         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2791
2792         /* find mold object. */
2793         fid_zero(old_fid);
2794         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2795         if (rc != 0)
2796                 GOTO(out_unlock_parents, rc);
2797
             /* the source entry must not resolve to either parent directory */
2798         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2799                 GOTO(out_unlock_parents, rc = -EINVAL);
2800
2801         if (!fid_is_md_operative(old_fid))
2802                 GOTO(out_unlock_parents, rc = -EPERM);
2803
2804         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2805         if (IS_ERR(mold))
2806                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2807
2808         if (!mdt_object_exists(mold)) {
2809                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2810                                 &mold->mot_obj,
2811                                 "object does not exist");
2812                 GOTO(out_put_old, rc = -ENOENT);
2813         }
2814
2815         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2816                 GOTO(out_put_old, rc = -EXDEV);
2817
2818         /* Check if @mtgtdir is subdir of @mold, before locking child
2819          * to avoid reverse locking.
2820          */
2821         if (mtgtdir != msrcdir) {
2822                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2823                                    old_fid);
2824                 if (rc) {
2825                         if (rc == 1)
2826                                 rc = -EINVAL;
2827                         GOTO(out_put_old, rc);
2828                 }
2829         }
2830
2831         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2832         /* save version after locking */
2833         mdt_version_get_save(info, mold, 2);
2834
             /* Parents were locked with cos_incompat == false but the source is
              * remote: drop mold and both parent locks, then start over from
              * relock with cos_incompat set. This branch can be taken at most
              * once because it requires !cos_incompat.
              */
2835         if (!cos_incompat && mdt_object_remote(mold)) {
2836                 cos_incompat = true;
2837                 mdt_object_put(info->mti_env, mold);
2838                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2839                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2840                 goto relock;
2841         }
2842
2843         /* find mnew object:
2844          * mnew target object may not exist now
2845          * lookup with version checking
2846          */
2847         fid_zero(new_fid);
2848         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2849                                       3);
2850         if (rc == 0) {
2851                 /* the new_fid should have been filled at this moment */
                     /* both names already resolve to the same object: leave
                      * with rc == 0 without calling mdo_rename()
                      */
2852                 if (lu_fid_eq(old_fid, new_fid))
2853                         GOTO(out_put_old, rc);
2854
2855                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2856                     lu_fid_eq(new_fid, rr->rr_fid2))
2857                         GOTO(out_put_old, rc = -EINVAL);
2858
2859                 if (!fid_is_md_operative(new_fid))
2860                         GOTO(out_put_old, rc = -EPERM);
2861
2862                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2863                 if (IS_ERR(mnew))
2864                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2865
2866                 if (!mdt_object_exists(mnew)) {
2867                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2868                                         &mnew->mot_obj,
2869                                         "object does not exist");
2870                         GOTO(out_put_new, rc = -ENOENT);
2871                 }
2872
2873                 if (mdt_object_remote(mnew)) {
2874                         struct mdt_body  *repbody;
2875
2876                         /* Always send rename req to the target child MDT */
                             /* the reply body carries new_fid back together
                              * with -EXDEV
                              */
2877                         repbody = req_capsule_server_get(info->mti_pill,
2878                                                          &RMF_MDT_BODY);
2879                         LASSERT(repbody != NULL);
2880                         repbody->mbo_fid1 = *new_fid;
2881                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2882                         GOTO(out_put_new, rc = -EXDEV);
2883                 }
2884                 /* Before locking the target dir, check we do not replace
2885                  * a dir with a non-dir, otherwise it may deadlock with
2886                  * link op which tries to create a link in this dir
2887                  * back to this non-dir.
2888                  */
2889                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2890                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2891                         GOTO(out_put_new, rc = -EISDIR);
2892
2893                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2894                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2895                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2896                                             lh_lookup,
2897                                             MDS_INODELOCK_LOOKUP |
2898                                             MDS_INODELOCK_XATTR, cos_incompat);
2899                 if (rc < 0)
2900                         GOTO(out_put_new, rc);
2901
2902                 /* Check if @msrcdir is subdir of @mnew, before locking child
2903                  * to avoid reverse locking.
2904                  */
2905                 if (mtgtdir != msrcdir) {
2906                         rc = mdo_is_subdir(info->mti_env,
2907                                            mdt_object_child(msrcdir), new_fid);
2908                         if (rc) {
2909                                 if (rc == 1)
2910                                         rc = -EINVAL;
2911                                 GOTO(out_unlock_old, rc);
2912                         }
2913                 }
2914
2915                 /* We used to acquire MDS_INODELOCK_FULL here but we
2916                  * can't do this now because a running HSM restore on
2917                  * the rename onto victim will hold the layout
2918                  * lock. See LU-4002.
2919                  */
2920
2921                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2922                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2923                                            MDS_INODELOCK_LOOKUP |
2924                                            MDS_INODELOCK_UPDATE, LCK_EX,
2925                                            cos_incompat);
2926                 if (rc != 0)
2927                         GOTO(out_unlock_new, rc);
2928
2929                 /* get and save version after locking */
2930                 mdt_version_get_save(info, mnew, 3);
2931         } else if (rc != -ENOENT) {
                     /* target lookup failed for a reason other than -ENOENT */
2932                 GOTO(out_put_old, rc);
2933         } else {
                     /* target name does not exist: mnew stays NULL and only
                      * the source is locked
                      */
2934                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2935                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2936                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2937                                             lh_lookup,
2938                                             MDS_INODELOCK_LOOKUP |
2939                                             MDS_INODELOCK_XATTR, cos_incompat);
2940                 if (rc != 0)
2941                         GOTO(out_put_old, rc);
2942
2943                 mdt_enoent_version_save(info, 3);
2944         }
2945
2946         /* step 5: rename it */
2947         mdt_reint_init_ma(info, ma);
2948
2949         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2950                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2951
             /* mnew's mot_lov_mutex is held across mdo_rename() */
2952         if (mnew != NULL)
2953                 mutex_lock(&mnew->mot_lov_mutex);
2954
2955         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2956                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2957                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2958                         &rr->rr_tgt_name, ma);
2959
2960         if (mnew != NULL)
2961                 mutex_unlock(&mnew->mot_lov_mutex);
2962
2963         /* handle last link of tgt object */
2964         if (rc == 0) {
2965                 if (mnew) {
2966                         mdt_handle_last_unlink(info, mnew, ma);
2967                         discard = mdt_dom_check_for_discard(info, mnew);
2968                 }
2969                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2970                                          msrcdir, mtgtdir, msi,
2971                                          ktime_us_delta(ktime_get(), kstart));
2972         }
2973
2974         EXIT;
     /* The labels below unwind in reverse order of acquisition; the success
      * path falls through all of them as well.
      */
2975 out_unlock_new:
2976         if (mnew != NULL)
2977                 mdt_object_unlock(info, mnew, lh_newp, rc);
2978 out_unlock_old:
2979         mdt_object_unlock(info, NULL, lh_lookup, rc);
2980         mdt_object_unlock(info, mold, lh_oldp, rc);
2981 out_put_new:
             /* with discard set, the reference on mnew is kept until after
              * mdt_dom_discard_data() at the end of the function
              */
2982         if (mnew && !discard)
2983                 mdt_object_put(info->mti_env, mnew);
2984 out_put_old:
2985         mdt_object_put(info->mti_env, mold);
2986 out_unlock_parents:
2987         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2988         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2989 out_unlock_rename:
2990         mdt_rename_unlock(info, rename_lh);
2991 out_put_tgtdir:
2992         mdt_object_put(info->mti_env, mtgtdir);
2993 out_put_srcdir:
2994         mdt_object_put(info->mti_env, msrcdir);
2995
2996         /* The DoM discard can be done right in the place above where it is
2997          * assigned, meanwhile it is done here after rename unlock due to
2998          * compatibility with old clients, for them the discard blocks
2999          * the main thread until completion. Check LU-11359 for details.
3000          */
3001         if (discard) {
3002                 mdt_dom_discard_data(info, mnew);
3003                 mdt_object_put(info->mti_env, mnew);
3004         }
3005         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3006         return rc;
3007 }
3008
3009 static int mdt_reint_resync(struct mdt_thread_info *info,
3010                             struct mdt_lock_handle *lhc)
3011 {
3012         struct mdt_reint_record *rr = &info->mti_rr;
3013         struct ptlrpc_request *req = mdt_info_req(info);
3014         struct md_attr *ma = &info->mti_attr;
3015         struct mdt_object *mo;
3016         struct ldlm_lock *lease;
3017         struct mdt_body *repbody;
3018         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3019         bool lease_broken;
3020         int rc;
3021
3022         ENTRY;
3023         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3024
3025         if (info->mti_dlm_req)
3026                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3027
3028         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3029         if (IS_ERR(mo))
3030                 GOTO(out, rc = PTR_ERR(mo));
3031
3032         if (!mdt_object_exists(mo))
3033                 GOTO(out_obj, rc = -ENOENT);
3034
3035         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3036                 GOTO(out_obj, rc = -EINVAL);
3037
3038         if (mdt_object_remote(mo))
3039                 GOTO(out_obj, rc = -EREMOTE);
3040
3041         lease = ldlm_handle2lock(rr->rr_lease_handle);
3042         if (lease == NULL)
3043                 GOTO(out_obj, rc = -ESTALE);
3044
3045         /* It's really necessary to grab open_sem and check if the lease lock
3046          * has been lost. There would exist a concurrent writer coming in and
3047          * generating some dirty data in memory cache, the writeback would fail
3048          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3049          */
3050         if (!down_write_trylock(&mo->mot_open_sem))
3051                 GOTO(out_put_lease, rc = -EBUSY);
3052
3053         lock_res_and_lock(lease);
3054         lease_broken = ldlm_is_cancel(lease);
3055         unlock_res_and_lock(lease);
3056         if (lease_broken)
3057                 GOTO(out_unlock, rc = -EBUSY);
3058
3059         /* the file has yet opened by anyone else after we took the lease. */
3060         layout.mlc_opc = MD_LAYOUT_RESYNC;
3061         lhc = &info->mti_lh[MDT_LH_LOCAL];
3062         rc = mdt_layout_change(info, mo, lhc, &layout);
3063         if (rc)
3064                 GOTO(out_unlock, rc);
3065
3066         mdt_object_unlock(info, mo, lhc, 0);
3067
3068         ma->ma_need = MA_INODE;
3069         ma->ma_valid = 0;
3070         rc = mdt_attr_get_complex(info, mo, ma);
3071         if (rc != 0)
3072                 GOTO(out_unlock, rc);
3073
3074         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3075         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3076
3077         EXIT;
3078 out_unlock:
3079         up_write(&mo->mot_open_sem);
3080 out_put_lease:
3081         LDLM_LOCK_PUT(lease);
3082 out_obj:
3083         mdt_object_put(info->mti_env, mo);
3084 out:
3085         mdt_client_compatibility(info);
3086         return rc;
3087 }
3088
3089 struct mdt_reinter {
3090         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3091         enum lprocfs_extra_opc mr_extra_opc;
3092 };
3093
3094 static const struct mdt_reinter mdt_reinters[] = {
3095         [REINT_SETATTR] = {
3096                 .mr_handler = &mdt_reint_setattr,
3097                 .mr_extra_opc = MDS_REINT_SETATTR,
3098         },
3099         [REINT_CREATE] = {
3100                 .mr_handler = &mdt_reint_create,
3101                 .mr_extra_opc = MDS_REINT_CREATE,
3102         },
3103         [REINT_LINK] = {
3104                 .mr_handler = &mdt_reint_link,
3105                 .mr_extra_opc = MDS_REINT_LINK,
3106         },
3107         [REINT_UNLINK] = {
3108                 .mr_handler = &mdt_reint_unlink,
3109                 .mr_extra_opc = MDS_REINT_UNLINK,
3110         },
3111         [REINT_RENAME] = {
3112                 .mr_handler = &mdt_reint_rename,
3113                 .mr_extra_opc = MDS_REINT_RENAME,
3114         },
3115         [REINT_OPEN] = {
3116                 .mr_handler = &mdt_reint_open,
3117                 .mr_extra_opc = MDS_REINT_OPEN,
3118         },
3119         [REINT_SETXATTR] = {
3120                 .mr_handler = &mdt_reint_setxattr,
3121                 .mr_extra_opc = MDS_REINT_SETXATTR,
3122         },
3123         [REINT_RMENTRY] = {
3124                 .mr_handler = &mdt_reint_unlink,
3125                 .mr_extra_opc = MDS_REINT_UNLINK,
3126         },
3127         [REINT_MIGRATE] = {
3128                 .mr_handler = &mdt_reint_migrate,
3129                 .mr_extra_opc = MDS_REINT_RENAME,
3130         },
3131         [REINT_RESYNC] = {
3132                 .mr_handler = &mdt_reint_resync,
3133                 .mr_extra_opc = MDS_REINT_RESYNC,
3134         },
3135 };
3136
3137 int mdt_reint_rec(struct mdt_thread_info *info,
3138                   struct mdt_lock_handle *lhc)
3139 {
3140         const struct mdt_reinter *mr;
3141         int rc;
3142
3143         ENTRY;
3144         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3145                 RETURN(-EPROTO);
3146
3147         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3148         if (mr->mr_handler == NULL)
3149                 RETURN(-EPROTO);
3150
3151         rc = (*mr->mr_handler)(info, lhc);
3152
3153         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3154                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3155
3156         RETURN(rc);
3157 }