Whamcloud - gitweb
ce0e657b4444c7c197863cf75a12eddc24ccab8f
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
279                             enum ldlm_mode mode, __u64 ibits,
280                             struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285         einfo->ei_type = LDLM_IBITS;
286         einfo->ei_mode = mode;
287         einfo->ei_cb_bl = mdt_remote_blocking_ast;
288         einfo->ei_cb_local_bl = mdt_blocking_ast;
289         einfo->ei_cb_cp = ldlm_completion_ast;
290         einfo->ei_enq_slave = 1;
291         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
292         einfo->ei_inodebits = ibits;
293         einfo->ei_req_slot = 1;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 /** lock object, and stripes if it's a striped directory
302  *
303  * object should be local, this is called in operations which modify both object
304  * and stripes.
305  *
306  * \param info          struct mdt_thread_info
307  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
308  * \param child         child object
309  * \param lh            lock handle
310  * \param einfo         struct ldlm_enqueue_info
311  * \param ibits         MDS inode lock bits
312  * \param mode          lock mode
313  * \param cos_incompat  DNE COS incompatible
314  *
315  * \retval              0 on success, -ev on error.
316  */
317 int mdt_object_stripes_lock(struct mdt_thread_info *info,
318                             struct mdt_object *parent,
319                             struct mdt_object *child,
320                             struct mdt_lock_handle *lh,
321                             struct ldlm_enqueue_info *einfo, __u64 ibits,
322                             enum ldlm_mode mode, bool cos_incompat)
323 {
324         int rc;
325
326         ENTRY;
327         /* according to the protocol, child should be local, is request sent to
328          * wrong MDT?
329          */
330         if (mdt_object_remote(child)) {
331                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
332                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
333                        -EREMOTE);
334                 RETURN(-EREMOTE);
335         }
336
337         memset(einfo, 0, sizeof(*einfo));
338         if (ibits & MDS_INODELOCK_LOOKUP) {
339                 LASSERT(parent);
340                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
341                                            mode, cos_incompat);
342         } else {
343                 rc = mdt_object_lock(info, child, lh, ibits, mode,
344                                      cos_incompat);
345         }
346         if (rc)
347                 RETURN(rc);
348
349         rc = mdt_object_striped(info, child);
350         if (rc == 0)
351                 return 0;
352
353         if (rc < 0)
354                 goto unlock;
355
356         /* lock stripes for striped directory */
357         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
358         if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
359                 rc = 0;
360
361 unlock:
362         if (rc)
363                 mdt_object_unlock(info, child, lh, rc);
364
365         return rc;
366 }
367
368 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
369                               struct mdt_object *obj,
370                               struct mdt_lock_handle *lh,
371                               struct ldlm_enqueue_info *einfo, int decref)
372 {
373         if (einfo->ei_cbdata)
374                 mdt_stripes_unlock(info, obj, einfo, decref);
375         mdt_object_unlock(info, obj, lh, decref);
376 }
377
378 static int mdt_restripe(struct mdt_thread_info *info,
379                         struct mdt_object *parent,
380                         const struct lu_name *lname,
381                         const struct lu_fid *tfid,
382                         struct md_op_spec *spec,
383                         struct md_attr *ma)
384 {
385         struct mdt_device *mdt = info->mti_mdt;
386         struct lu_fid *fid = &info->mti_tmp_fid2;
387         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
388         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
389         struct lu_ucred *uc = mdt_ucred(info);
390         struct lmv_mds_md_v1 *lmv;
391         struct mdt_object *child;
392         struct mdt_lock_handle *lhp;
393         struct mdt_lock_handle *lhc;
394         struct mdt_body *repbody;
395         int rc;
396
397         ENTRY;
398
399         /* we want rbac roles to have precedence over any other
400          * permission or capability checks
401          */
402         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
403                 RETURN(-EPERM);
404
405         LASSERT(lum);
406         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
407
408         rc = mdt_version_get_check_save(info, parent, 0);
409         if (rc)
410                 RETURN(rc);
411
412         lhp = &info->mti_lh[MDT_LH_PARENT];
413         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
414         if (rc)
415                 RETURN(rc);
416
417         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
418         if (rc)
419                 GOTO(unlock_parent, rc);
420
421         if (ma->ma_valid & MA_LMV) {
422                 /* don't allow restripe if parent dir layout is changing */
423                 lmv = &ma->ma_lmv->lmv_md_v1;
424                 if (!lmv_is_sane2(lmv))
425                         GOTO(unlock_parent, rc = -EBADF);
426
427                 if (lmv_is_layout_changing(lmv))
428                         GOTO(unlock_parent, rc = -EBUSY);
429         }
430
431         fid_zero(fid);
432         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
433         if (rc)
434                 GOTO(unlock_parent, rc);
435
436         child = mdt_object_find(info->mti_env, mdt, fid);
437         if (IS_ERR(child))
438                 GOTO(unlock_parent, rc = PTR_ERR(child));
439
440         if (!mdt_object_exists(child))
441                 GOTO(out_child, rc = -ENOENT);
442
443         if (mdt_object_remote(child)) {
444                 struct mdt_body *repbody;
445
446                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
447                 if (!repbody)
448                         GOTO(out_child, rc = -EPROTO);
449
450                 repbody->mbo_fid1 = *fid;
451                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
452                 GOTO(out_child, rc = -EREMOTE);
453         }
454
455         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
456                 GOTO(out_child, rc = -ENOTDIR);
457
458         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
459         if (rc)
460                 GOTO(out_child, rc);
461
462         /* race with migrate? */
463         if ((ma->ma_valid & MA_LMV) &&
464              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
465                 GOTO(out_child, rc = -EBUSY);
466
467         /* lock object */
468         lhc = &info->mti_lh[MDT_LH_CHILD];
469         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
470                                      MDS_INODELOCK_FULL, LCK_PW, true);
471         if (rc)
472                 GOTO(unlock_child, rc);
473
474         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
475         rc = mdt_version_get_check_save(info, child, 1);
476         if (rc)
477                 GOTO(unlock_child, rc);
478
479         spin_lock(&mdt->mdt_restriper.mdr_lock);
480         if (child->mot_restriping) {
481                 /* race? */
482                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
483                 GOTO(unlock_child, rc = -EBUSY);
484         }
485         child->mot_restriping = 1;
486         spin_unlock(&mdt->mdt_restriper.mdr_lock);
487
488         *fid = *tfid;
489         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
490         if (rc)
491                 GOTO(restriping_clear, rc);
492
493         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
494         if (!repbody)
495                 GOTO(restriping_clear, rc = -EPROTO);
496
497         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
498         EXIT;
499
500 restriping_clear:
501         child->mot_restriping = 0;
502 unlock_child:
503         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
504 out_child:
505         mdt_object_put(info->mti_env, child);
506 unlock_parent:
507         mdt_object_unlock(info, parent, lhp, rc);
508
509         return rc;
510 }
511
512 /*
513  * VBR: we save three versions in reply:
514  * 0 - parent. Check that parent version is the same during replay.
515  * 1 - name. Version of 'name' if file exists with the same name or
516  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
517  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
518  * check.
519  */
520 static int mdt_create(struct mdt_thread_info *info)
521 {
522         struct mdt_device *mdt = info->mti_mdt;
523         struct mdt_object *parent;
524         struct mdt_object *child;
525         struct mdt_lock_handle *lh;
526         struct mdt_body *repbody;
527         struct md_attr *ma = &info->mti_attr;
528         struct mdt_reint_record *rr = &info->mti_rr;
529         struct md_op_spec *spec = &info->mti_spec;
530         struct lu_ucred *uc = mdt_ucred(info);
531         bool restripe = false;
532         int rc;
533
534         ENTRY;
535         DEBUG_REQ(D_INODE, mdt_info_req(info),
536                   "Create ("DNAME"->"DFID") in "DFID,
537                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
538
539         if (!fid_is_md_operative(rr->rr_fid1))
540                 RETURN(-EPERM);
541
542         /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
543          * if client maintains inherited default LMV
544          */
545         if (S_ISDIR(ma->ma_attr.la_mode) &&
546             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
547             !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
548                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
549                 struct obd_export *exp = mdt_info_req(info)->rq_export;
550
551                 /* Only new clients can create remote dir( >= 2.4) and
552                  * striped dir(>= 2.6), old client will return -ENOTSUPP
553                  */
554                 if (!mdt_is_dne_client(exp))
555                         RETURN(-ENOTSUPP);
556
557                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
558                         if (!mdt_is_striped_client(exp))
559                                 RETURN(-ENOTSUPP);
560
561                         if (!mdt->mdt_enable_striped_dir)
562                                 RETURN(-EPERM);
563                 } else if (!mdt->mdt_enable_remote_dir) {
564                         RETURN(-EPERM);
565                 }
566
567                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
568                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
569                     LMV_HASH_TYPE_CRUSH)
570                         RETURN(-EPROTO);
571
572                 /* we want rbac roles to have precedence over any other
573                  * permission or capability checks
574                  */
575                 if (!uc->uc_rbac_dne_ops ||
576                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
577                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
578                      mdt->mdt_enable_remote_dir_gid != -1))
579                         RETURN(-EPERM);
580
581                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
582                  * this is create only, don't try restripe.
583                  */
584                 if (mdt->mdt_enable_dir_restripe &&
585                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
586                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
587                         restripe = true;
588         }
589
590         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
591
592         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
593         if (IS_ERR(parent))
594                 RETURN(PTR_ERR(parent));
595
596         if (!mdt_object_exists(parent))
597                 GOTO(put_parent, rc = -ENOENT);
598
599         rc = mdt_check_enc(info, parent);
600         if (rc)
601                 GOTO(put_parent, rc);
602
603         if (!uc->uc_rbac_fscrypt_admin &&
604             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
605                 GOTO(put_parent, rc = -EPERM);
606
607         /*
608          * LU-10235: check if name exists locklessly first to avoid massive
609          * lock recalls on existing directories.
610          */
611         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
612                                       &info->mti_tmp_fid1, 1);
613         if (rc == 0) {
614                 if (!restripe)
615                         GOTO(put_parent, rc = -EEXIST);
616
617                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
618                                   ma);
619         }
620
621         /* -ENOENT is expected here */
622         if (rc != -ENOENT)
623                 GOTO(put_parent, rc);
624
625         /* save version of file name for replay, it must be ENOENT here */
626         mdt_enoent_version_save(info, 1);
627
628         CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
629
630         lh = &info->mti_lh[MDT_LH_PARENT];
631         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
632         if (rc)
633                 GOTO(put_parent, rc);
634
635         if (!mdt_object_remote(parent)) {
636                 rc = mdt_version_get_check_save(info, parent, 0);
637                 if (rc)
638                         GOTO(unlock_parent, rc);
639         }
640
641         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
642         if (unlikely(IS_ERR(child)))
643                 GOTO(unlock_parent, rc = PTR_ERR(child));
644
645         ma->ma_need = MA_INODE;
646         ma->ma_valid = 0;
647
648         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
649                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
650
651         /* Version of child will be updated on disk. */
652         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
653         rc = mdt_version_get_check_save(info, child, 2);
654         if (rc)
655                 GOTO(put_child, rc);
656
657         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
658             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
659              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
660                      rr->rr_name.ln_namelen) == 0))
661                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
662
663         /*
664          * Do not perform lookup sanity check. We know that name does
665          * not exist.
666          */
667         info->mti_spec.sp_cr_lookup = 0;
668         if (mdt_object_remote(parent))
669                 info->mti_spec.sp_cr_lookup = 1;
670         info->mti_spec.sp_feat = &dt_directory_features;
671
672         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
673                         mdt_object_child(child), &info->mti_spec, ma);
674         if (rc == 0)
675                 rc = mdt_attr_get_complex(info, child, ma);
676
677         if (rc < 0)
678                 GOTO(put_child, rc);
679
680         /*
681          * On DNE, we need to eliminate dependey between 'mkdir a' and
682          * 'mkdir a/b' if b is a striped directory, to achieve this, two
683          * things are done below:
684          * 1. save child and slaves lock.
685          * 2. if the child is a striped directory, relock parent so to
686          *    compare against with COS locks to ensure parent was
687          *    committed to disk.
688          */
689         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
690                 struct mdt_lock_handle *lhc;
691                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
692                 bool cos_incompat;
693
694                 rc = mdt_object_striped(info, child);
695                 if (rc < 0)
696                         GOTO(put_child, rc);
697
698                 cos_incompat = rc;
699                 if (cos_incompat) {
700                         if (!mdt_object_remote(parent)) {
701                                 mdt_object_unlock(info, parent, lh, 1);
702                                 rc = mdt_parent_lock(info, parent, lh,
703                                                      &rr->rr_name, LCK_PW,
704                                                      true);
705                                 if (rc)
706                                         GOTO(put_child, rc);
707                         }
708                 }
709
710                 lhc = &info->mti_lh[MDT_LH_CHILD];
711                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
712                                              MDS_INODELOCK_UPDATE, LCK_PW,
713                                              cos_incompat);
714                 if (rc)
715                         GOTO(put_child, rc);
716
717                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
718         }
719
720         /* Return fid & attr to client. */
721         if (ma->ma_valid & MA_INODE)
722                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
723                                    mdt_object_fid(child));
724         EXIT;
725 put_child:
726         mdt_object_put(info->mti_env, child);
727 unlock_parent:
728         mdt_object_unlock(info, parent, lh, rc);
729 put_parent:
730         mdt_object_put(info->mti_env, parent);
731         return rc;
732 }
733
734 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
735                         struct md_attr *ma)
736 {
737         struct mdt_lock_handle  *lh;
738         int do_vbr = ma->ma_attr.la_valid &
739                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
740         __u64 lockpart = MDS_INODELOCK_UPDATE;
741         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
742         bool cos_incompat;
743         int rc;
744
745         ENTRY;
746         rc = mdt_object_striped(info, mo);
747         if (rc < 0)
748                 RETURN(rc);
749         cos_incompat = rc;
750
751         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
752                 lockpart |= MDS_INODELOCK_PERM;
753         /* Clear xattr cache on clients, so the virtual project ID xattr
754          * can get the new project ID
755          */
756         if (ma->ma_attr.la_valid & LA_PROJID)
757                 lockpart |= MDS_INODELOCK_XATTR;
758
759         lh = &info->mti_lh[MDT_LH_PARENT];
760         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
761                                      LCK_PW, cos_incompat);
762         if (rc != 0)
763                 RETURN(rc);
764
765         /* all attrs are packed into mti_attr in unpack_setattr */
766         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
767                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
768
769         /* VBR: update version if attr changed are important for recovery */
770         if (do_vbr) {
771                 /* update on-disk version of changed object */
772                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
773                 rc = mdt_version_get_check_save(info, mo, 0);
774                 if (rc)
775                         GOTO(out_unlock, rc);
776         }
777
778         /* Ensure constant striping during chown(). See LU-2789. */
779         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
780                 mutex_lock(&mo->mot_lov_mutex);
781
782         /* all attrs are packed into mti_attr in unpack_setattr */
783         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
784
785         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
786                 mutex_unlock(&mo->mot_lov_mutex);
787
788         if (rc != 0)
789                 GOTO(out_unlock, rc);
790         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
791         EXIT;
792 out_unlock:
793         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
794         return rc;
795 }
796
797 /**
798  * Check HSM flags and add HS_DIRTY flag if relevant.
799  *
800  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
801  * and is not RELEASED.
     *
     * The HSM attributes are (re)read into \a ma first, so the caller's
     * ma->ma_need is overwritten with MA_HSM and ma->ma_hsm is refreshed.
     * Nothing is written when the object carries no valid HSM attributes, has
     * no HS_EXISTS flag, or is already HS_DIRTY or HS_RELEASED.
     *
     * \param[in] info      thread info; its credentials (mdt_ucred()) get
     *                      CAP_FOWNER raised only around the HSM update
     * \param[in] mo        object whose HSM state is examined
     * \param[in,out] ma    attribute buffer used for the HSM read and update
     *
     * \retval      0 if no change was needed or HS_DIRTY was stored
     * \retval      otherwise the error returned by mdt_attr_get_complex() or
     *              mdt_hsm_attr_set()
802  */
803 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
804                         struct md_attr *ma)
805 {
806         struct lu_ucred *uc = mdt_ucred(info);
807         kernel_cap_t cap_saved;
808         int rc;
809
810         ENTRY;
811         /* If the file was modified, add the dirty flag */
812         ma->ma_need = MA_HSM;
813         rc = mdt_attr_get_complex(info, mo, ma);
814         if (rc) {
815                 CERROR("file attribute read error for "DFID": %d.\n",
816                         PFID(mdt_object_fid(mo)), rc);
817                 RETURN(rc);
818         }
819
820         /* If an up2date copy exists in the backend, add dirty flag */
821         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
822             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
823                 ma->ma_hsm.mh_flags |= HS_DIRTY;
824
825                 /* Bump cap so that closes from non-owner writers can
826                  * set the HSM state to dirty.
827                  */
828                 cap_saved = uc->uc_cap;
829                 cap_raise(uc->uc_cap, CAP_FOWNER);
830                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
                    /* restore the original capabilities whatever the outcome */
831                 uc->uc_cap = cap_saved;
832                 if (rc)
833                         CERROR("file attribute change error for "DFID": %d\n",
834                                 PFID(mdt_object_fid(mo)), rc);
835         }
836
837         RETURN(rc);
838 }
839
/*
 * Handle a setattr reint request for the object named by rr_fid1.
 *
 * The unpacked request in info->mti_attr selects one of two operations:
 *  - MA_INODE with a non-zero la_valid: attribute update through
 *    mdt_attr_set();
 *  - MA_INODE together with MA_LOV or MA_LMV and an empty la_valid: the
 *    layout is stored on a directory as XATTR_NAME_LOV or
 *    XATTR_NAME_DEFAULT_LMV.
 * Any other combination is rejected with -EPROTO.
 *
 * A size change or open-truncate additionally goes through lease
 * revocation, a write-access check (-ETXTBSY), the FLR/overstriping client
 * compatibility check (-EOPNOTSUPP) and an LSOM update before either
 * operation is attempted.
 *
 * On success the attributes are read back and packed into the reply body,
 * and the LPROC_MDT_SETATTR counter is updated with the elapsed time.
 *
 * \param[in] info      thread info holding the unpacked request
 * \param[in] lhc       not read on entry; only reused as a local pointer in
 *                      the lease revocation path
 *
 * \retval      0 on success
 * \retval      negative errno on failure (-ENOENT if the object does not
 *              exist, -EREMOTE if it is remote)
 */
840 static int mdt_reint_setattr(struct mdt_thread_info *info,
841                              struct mdt_lock_handle *lhc)
842 {
843         struct mdt_device *mdt = info->mti_mdt;
844         struct md_attr *ma = &info->mti_attr;
845         struct mdt_reint_record *rr = &info->mti_rr;
846         struct ptlrpc_request *req = mdt_info_req(info);
847         struct mdt_object *mo;
848         struct mdt_body *repbody;
849         ktime_t kstart = ktime_get();
850         int rc;
851
852         ENTRY;
853         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
854                   (unsigned int)ma->ma_attr.la_valid);
855
856         if (info->mti_dlm_req)
857                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
858
859         CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
860
861         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
862         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
863         if (IS_ERR(mo))
864                 GOTO(out, rc = PTR_ERR(mo));
865
866         if (!mdt_object_exists(mo))
867                 GOTO(out_put, rc = -ENOENT);
868
869         if (mdt_object_remote(mo))
870                 GOTO(out_put, rc = -EREMOTE);
871
872         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
873         /* revoke lease lock if size is going to be changed */
874         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
875                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
876                      atomic_read(&mo->mot_lease_count) > 0)) {
877                 down_read(&mo->mot_open_sem);
878
                    /* re-check the lease count now that mot_open_sem is held */
879                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
880                         lhc = &info->mti_lh[MDT_LH_LOCAL];
881                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
882                                              LCK_CW, false);
883                         if (rc != 0) {
884                                 up_read(&mo->mot_open_sem);
885                                 GOTO(out_put, rc);
886                         }
887
888                         /* revoke lease lock */
889                         mdt_object_unlock(info, mo, lhc, 1);
890                 }
891                 up_read(&mo->mot_open_sem);
892         }
893
894         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
895                 /* Check write access for the O_TRUNC case */
896                 if (mdt_write_read(mo) < 0)
897                         GOTO(out_put, rc = -ETXTBSY);
898
899                 /* LU-10286: compatibility check for FLR.
900                  * Please check the comment in mdt_finish_open() for details
901                  */
902                 if (!exp_connect_flr(info->mti_exp) ||
903                     !exp_connect_overstriping(info->mti_exp)) {
904                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
905                         if (rc < 0 && rc != -ENODATA)
906                                 GOTO(out_put, rc);
907
                            /* rc > 0 below means a LOV xattr was actually read */
908                         if (!exp_connect_flr(info->mti_exp)) {
909                                 if (rc > 0 &&
910                                     mdt_lmm_is_flr(info->mti_big_lmm))
911                                         GOTO(out_put, rc = -EOPNOTSUPP);
912                         }
913
914                         if (!exp_connect_overstriping(info->mti_exp)) {
915                                 if (rc > 0 &&
916                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
917                                         GOTO(out_put, rc = -EOPNOTSUPP);
918                         }
919                 }
920
921                 /* For truncate, the file size sent from client
922                  * is believable, but the blocks are incorrect,
923                  * which makes the block size in LSOM attribute
924                  * inconsistent with the real block size.
925                  */
926                 rc = mdt_lsom_update(info, mo, true);
927                 if (rc)
928                         GOTO(out_put, rc);
929         }
930
            /* case 1: inode attribute update */
931         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
932                 if (ma->ma_valid & MA_LOV)
933                         GOTO(out_put, rc = -EPROTO);
934
935                 /* MDT supports FMD for regular files due to Data-on-MDT */
936                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
937                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
938                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
939                                        req->rq_xid);
940
941                         if (ma->ma_attr.la_valid & LA_MTIME) {
                                    /* best effort: a failure here only leaves
                                     * MA_PFID unset, rc is overwritten below
                                     */
942                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
943                                 if (!rc)
944                                         ma->ma_valid |= MA_PFID;
945                         }
946                 }
947
948                 rc = mdt_attr_set(info, mo, ma);
949                 if (rc)
950                         GOTO(out_put, rc);
951         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
952                    (ma->ma_valid & MA_INODE)) {
                    /* case 2: store a LOV or default LMV layout on a directory */
953                 struct lu_buf *buf = &info->mti_buf;
954                 struct lu_ucred *uc = mdt_ucred(info);
955                 struct mdt_lock_handle *lh;
956                 const char *name;
957
958                 /* reject if either remote or striped dir is disabled */
959                 if (ma->ma_valid & MA_LMV) {
960                         if (!mdt->mdt_enable_remote_dir ||
961                             !mdt->mdt_enable_striped_dir)
962                                 GOTO(out_put, rc = -EPERM);
963
964                         /* we want rbac roles to have precedence over any other
965                          * permission or capability checks
966                          */
967                         if (!uc->uc_rbac_dne_ops ||
968                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
969                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
970                              mdt->mdt_enable_remote_dir_gid != -1))
971                                 GOTO(out_put, rc = -EPERM);
972                 }
973
974                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
975                         GOTO(out_put, rc = -ENOTDIR);
976
                    /* NOTE(review): cannot trigger as written - with MA_INODE
                     * set, a non-zero la_valid takes the first branch above.
                     */
977                 if (ma->ma_attr.la_valid != 0)
978                         GOTO(out_put, rc = -EPROTO);
979
980                 lh = &info->mti_lh[MDT_LH_PARENT];
981                 if (ma->ma_valid & MA_LOV) {
982                         buf->lb_buf = ma->ma_lmm;
983                         buf->lb_len = ma->ma_lmm_size;
984                         name = XATTR_NAME_LOV;
985                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
986                                              LCK_PW, false);
987                 } else {
988                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
989                         buf->lb_len = ma->ma_lmv_size;
990                         name = XATTR_NAME_DEFAULT_LMV;
991
                            /* the root is locked directly; any other directory
                             * is locked through its ".." parent
                             */
992                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
993                                 rc = mdt_object_lock(info, mo, lh,
994                                                      MDS_INODELOCK_XATTR |
995                                                      MDS_INODELOCK_LOOKUP,
996                                                      LCK_PW, false);
997                         } else {
998                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
999                                 struct lu_name *pname = &info->mti_name;
1000                                 const char dotdot[] = "..";
1001                                 struct mdt_object *pobj;
1002
1003                                 fid_zero(pfid);
1004                                 pname->ln_name = dotdot;
                                     /* NOTE(review): sizeof(dotdot) is 3, it
                                      * counts the terminating NUL while the
                                      * name has 2 characters - confirm the
                                      * lookup path does not rely on
                                      * ln_namelen.
                                      */
1005                                 pname->ln_namelen = sizeof(dotdot);
1006                                 rc = mdo_lookup(info->mti_env,
1007                                                 mdt_object_child(mo), pname,
1008                                                 pfid, NULL);
1009                                 if (rc)
1010                                         GOTO(out_put, rc);
1011
1012                                 pobj = mdt_object_find(info->mti_env,
1013                                                        info->mti_mdt, pfid);
1014                                 if (IS_ERR(pobj))
1015                                         GOTO(out_put, rc = PTR_ERR(pobj));
1016
1017                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
1018                                                            MDS_INODELOCK_XATTR |
1019                                                            MDS_INODELOCK_LOOKUP,
1020                                                            LCK_PW, false);
                                     /* the parent is only needed for the
                                      * locking call above
                                      */
1021                                 mdt_object_put(info->mti_env, pobj);
1022                         }
1023                 }
1024
                     /* rc is the result of whichever lock call ran above */
1025                 if (rc != 0)
1026                         GOTO(out_put, rc);
1027
1028                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1029                                   name, 0);
1030
1031                 mdt_object_unlock(info, mo, lh, rc);
1032                 if (rc)
1033                         GOTO(out_put, rc);
1034         } else {
1035                 GOTO(out_put, rc = -EPROTO);
1036         }
1037
1038         /* If file data is modified, add the dirty flag */
             /* NOTE(review): this rc is overwritten by mdt_attr_get_complex()
              * below, so a failure to set the dirty flag is not reported to
              * the client - confirm that this is intended best effort.
              */
1039         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1040                 rc = mdt_add_dirty_flag(info, mo, ma);
1041
             /* read the resulting attributes back for the reply */
1042         ma->ma_need = MA_INODE;
1043         ma->ma_valid = 0;
1044         rc = mdt_attr_get_complex(info, mo, ma);
1045         if (rc != 0)
1046                 GOTO(out_put, rc);
1047
1048         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1049
1050         EXIT;
1051 out_put:
1052         mdt_object_put(info->mti_env, mo);
1053 out:
1054         if (rc == 0)
1055                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1056                                  ktime_us_delta(ktime_get(), kstart));
1057
1058         mdt_client_compatibility(info);
1059         return rc;
1060 }
1061
/*
 * Handle a create reint request.
 *
 * Validates the new entry name and the file type requested in
 * info->mti_attr.ma_attr.la_mode, then hands the request to mdt_create().
 * A successful create updates LPROC_MDT_MKDIR for directories and
 * LPROC_MDT_MKNOD for every other accepted type.
 *
 * \param[in] info      thread info holding the unpacked request
 * \param[in] lhc       unused by this handler
 *
 * \retval      0 on success
 * \retval      -EPROTO if the name in the request is not valid
 * \retval      err_serious(-EOPNOTSUPP) for an unsupported file type
 * \retval      err_serious(-ESTALE) when OBD_FAIL_MDS_REINT_CREATE fires
 * \retval      otherwise the result of mdt_create()
 */
1062 static int mdt_reint_create(struct mdt_thread_info *info,
1063                             struct mdt_lock_handle *lhc)
1064 {
1065         struct ptlrpc_request   *req = mdt_info_req(info);
1066         ktime_t                 kstart = ktime_get();
1067         int                     rc;
1068
1069         ENTRY;
1070         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1071                 RETURN(err_serious(-ESTALE));
1072
1073         if (info->mti_dlm_req)
1074                 ldlm_request_cancel(mdt_info_req(info),
1075                                     info->mti_dlm_req, 0, LATF_SKIP);
1076
1077         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1078                 RETURN(-EPROTO);
1079
             /* accept only the file types listed here, reject anything else */
1080         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1081         case S_IFDIR:
1082         case S_IFREG:
1083         case S_IFLNK:
1084         case S_IFCHR:
1085         case S_IFBLK:
1086         case S_IFIFO:
1087         case S_IFSOCK:
1088                 break;
1089         default:
1090                 CERROR("%s: Unsupported mode %o\n",
1091                        mdt_obd_name(info->mti_mdt),
1092                        info->mti_attr.ma_attr.la_mode);
1093                 RETURN(err_serious(-EOPNOTSUPP));
1094         }
1095
1096         rc = mdt_create(info);
             /* statistics are accounted for successful creates only */
1097         if (rc == 0) {
1098                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1099                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1100                                          ktime_us_delta(ktime_get(), kstart));
1101                 else
1102                         /* Special file should stay on the same node as parent*/
1103                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1104                                          ktime_us_delta(ktime_get(), kstart));
1105         }
1106
1107         RETURN(rc);
1108 }
1109
1110 /*
1111  * VBR: save the parent version in the reply, and the version of the child
1112  * found by name. The child version is read and checked during its lookup.
      *
      * Handle an unlink reint request: remove the entry rr_name from the parent
      * rr_fid1. The child FID comes from rr_fid2 when MDS_OP_WITH_FID is set
      * in sp_cr_flags, otherwise from a lookup of the name in the parent.
      *
      * Special cases visible below:
      *  - sp_rm_entry: mdo_unlink() is called without a child object; this
      *    needs a DNE-capable client and CAP_SYS_ADMIN;
      *  - the child lives on another MDT: only its LOOKUP lock is taken here,
      *    the child FID is returned in the reply body and -EREMOTE is returned;
      *  - mdt_object_striped() reports a striped child: all locks are dropped
      *    and retaken with cos_incompat set (the relock label).
      *
      * \param[in] info      thread info holding the unpacked request
      * \param[in] lhc       unused by this handler
      *
      * \retval      0 on success
      * \retval      -EREMOTE if the child object is on another MDT
      * \retval      other negative errno on failure
1113  */
1114 static int mdt_reint_unlink(struct mdt_thread_info *info,
1115                             struct mdt_lock_handle *lhc)
1116 {
1117         struct mdt_reint_record *rr = &info->mti_rr;
1118         struct ptlrpc_request *req = mdt_info_req(info);
1119         struct md_attr *ma = &info->mti_attr;
1120         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1121         struct mdt_object *mp;
1122         struct mdt_object *mc;
1123         struct mdt_lock_handle *parent_lh;
1124         struct mdt_lock_handle *child_lh;
1125         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
1126         struct lu_ucred *uc  = mdt_ucred(info);
1127         bool cos_incompat = false;
1128         int no_name = 0;
1129         ktime_t kstart = ktime_get();
1130         int rc;
1131
1132         ENTRY;
1133         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1134                   PNAME(&rr->rr_name));
1135
1136         if (info->mti_dlm_req)
1137                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1138
1139         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1140                 RETURN(err_serious(-ENOENT));
1141
1142         if (!fid_is_md_operative(rr->rr_fid1))
1143                 RETURN(-EPERM);
1144
1145         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1146         if (IS_ERR(mp))
1147                 RETURN(PTR_ERR(mp));
1148
             /* the parent version is only checked when the parent is local */
1149         if (mdt_object_remote(mp)) {
1150                 cos_incompat = true;
1151         } else {
1152                 rc = mdt_version_get_check_save(info, mp, 0);
1153                 if (rc)
1154                         GOTO(put_parent, rc);
1155         }
1156
1157         if (!uc->uc_rbac_fscrypt_admin &&
1158             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1159                 GOTO(put_parent, rc = -EPERM);
1160
1161         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
1162         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1163 relock:
1164         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1165         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
1166                              cos_incompat);
1167         if (rc != 0)
1168                 GOTO(put_parent, rc);
1169
1170         if (info->mti_spec.sp_rm_entry) {
1171                 if (!mdt_is_dne_client(req->rq_export))
1172                         /* Return -ENOTSUPP for old client */
1173                         GOTO(unlock_parent, rc = -ENOTSUPP);
1174
1175                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1176                         GOTO(unlock_parent, rc = -EPERM);
1177
1178                 ma->ma_need = MA_INODE;
1179                 ma->ma_valid = 0;
1180                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1181                                 NULL, &rr->rr_name, ma, no_name);
1182                 GOTO(unlock_parent, rc);
1183         }
1184
1185         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1186                 *child_fid = *rr->rr_fid2;
1187         } else {
1188                 /* lookup child object along with version checking */
1189                 fid_zero(child_fid);
1190                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1191                                               1);
1192                 if (rc != 0) {
1193                         /* Name might not be able to find during resend of
1194                          * remote unlink, considering following case.
1195                          * dir_A is a remote directory, the name entry of
1196                          * dir_A is on MDT0, the directory is on MDT1,
1197                          *
1198                          * 1. client sends unlink req to MDT1.
1199                          * 2. MDT1 sends name delete update to MDT0.
1200                          * 3. name entry is being deleted in MDT0 synchronously.
1201                          * 4. MDT1 is restarted.
1202                          * 5. client resends unlink req to MDT1. So it can not
1203                          *    find the name entry on MDT0 anymore.
1204                          * In this case, MDT1 only needs to destroy the local
1205                          * directory.
1206                          */
1207                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1208                             !fid_is_zero(rr->rr_fid2) &&
1209                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1210                                 no_name = 1;
1211                                 *child_fid = *rr->rr_fid2;
1212                         } else {
1213                                 GOTO(unlock_parent, rc);
1214                         }
1215                 }
1216         }
1217
1218         if (!fid_is_md_operative(child_fid))
1219                 GOTO(unlock_parent, rc = -EPERM);
1220
1221         /* We will lock the child regardless it is local or remote. No harm. */
1222         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1223         if (IS_ERR(mc))
1224                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1225
1226         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1227                 /* In this case, child fid is embedded in the request, and we do
1228                  * not have a proper name as rr_name contains an encoded
1229                  * hash. So find name that matches provided hash.
1230                  */
1231                 if (!find_name_matching_hash(info, &rr->rr_name,
1232                                              NULL, mc))
1233                         GOTO(put_child, rc = -ENOENT);
1234         }
1235
1236         if (!cos_incompat) {
1237                 rc = mdt_object_striped(info, mc);
1238                 if (rc < 0)
1239                         GOTO(put_child, rc);
1240
1241                 cos_incompat = rc;
                     /* the parent was locked with cos_incompat == false: drop
                      * the child reference and the parent lock, then retry
                      */
1242                 if (cos_incompat) {
1243                         mdt_object_put(info->mti_env, mc);
1244                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1245                         goto relock;
1246                 }
1247         }
1248
1249         child_lh = &info->mti_lh[MDT_LH_CHILD];
1250         if (mdt_object_remote(mc)) {
1251                 struct mdt_body  *repbody;
1252
1253                 if (!fid_is_zero(rr->rr_fid2)) {
1254                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1255                                mdt_obd_name(info->mti_mdt),
1256                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1257                         GOTO(put_child, rc = -ENOENT);
1258                 }
1259                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1260                        mdt_obd_name(info->mti_mdt),
1261                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1262
1263                 if (!mdt_is_dne_client(req->rq_export))
1264                         /* Return -ENOTSUPP for old client */
1265                         GOTO(put_child, rc = -ENOTSUPP);
1266
1267                 /* Revoke the LOOKUP lock of the remote object granted by
1268                  * this MDT. Since the unlink will happen on another MDT,
1269                  * it will release the LOOKUP lock right away. Then What
1270                  * would happen if another client try to grab the LOOKUP
1271                  * lock at the same time with unlink XXX
1272                  */
1273                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
1274                                             false);
1275                 if (rc)
1276                         GOTO(put_child, rc);
1277
                     /* return the child FID in the reply along with -EREMOTE */
1278                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1279                 LASSERT(repbody != NULL);
1280                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1281                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1282                 GOTO(unlock_child, rc = -EREMOTE);
1283         }
1284         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1285          * this now because a running HSM restore on the child (unlink
1286          * victim) will hold the layout lock. See LU-4002.
1287          */
1288         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1289                                      MDS_INODELOCK_LOOKUP |
1290                                      MDS_INODELOCK_UPDATE,
1291                                      LCK_EX, cos_incompat);
1292         if (rc != 0)
1293                 GOTO(put_child, rc);
1294
1295         /*
1296          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1297          * whether need MA_LOV and MA_COOKIE.
1298          */
1299         ma->ma_need = MA_INODE;
1300         ma->ma_valid = 0;
1301
1302         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1303                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1304         /* save version when object is locked */
1305         mdt_version_get_save(info, mc, 1);
1306
1307         mutex_lock(&mc->mot_lov_mutex);
1308
1309         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1310                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1311
1312         mutex_unlock(&mc->mot_lov_mutex);
1313         if (rc != 0)
1314                 GOTO(unlock_child, rc);
1315
1316         if (!lu_object_is_dying(&mc->mot_header)) {
1317                 rc = mdt_attr_get_complex(info, mc, ma);
1318                 if (rc)
1319                         GOTO(out_stat, rc);
1320         } else if (mdt_dom_check_for_discard(info, mc)) {
1321                 mdt_dom_discard_data(info, mc);
1322         }
1323         mdt_handle_last_unlink(info, mc, ma);
1324
             /* also reached when re-reading the attributes failed after a
              * successful mdo_unlink(); counters need a valid MA_INODE
              */
1325 out_stat:
1326         if (ma->ma_valid & MA_INODE) {
1327                 switch (ma->ma_attr.la_mode & S_IFMT) {
1328                 case S_IFDIR:
1329                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1330                                          ktime_us_delta(ktime_get(), kstart));
1331                         break;
1332                 case S_IFREG:
1333                 case S_IFLNK:
1334                 case S_IFCHR:
1335                 case S_IFBLK:
1336                 case S_IFIFO:
1337                 case S_IFSOCK:
1338                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1339                                          ktime_us_delta(ktime_get(), kstart));
1340                         break;
1341                 default:
1342                         LASSERTF(0, "bad file type %o unlinking\n",
1343                                 ma->ma_attr.la_mode);
1344                 }
1345         }
1346
1347         EXIT;
1348
             /* cleanup runs in reverse order of acquisition */
1349 unlock_child:
1350         mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
1351 put_child:
1352         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1353             info->mti_big_buf.lb_buf)
1354                 lu_buf_free(&info->mti_big_buf);
1355         mdt_object_put(info->mti_env, mc);
1356 unlock_parent:
1357         mdt_object_unlock(info, mp, parent_lh, rc);
1358 put_parent:
1359         mdt_object_put(info->mti_env, mp);
1360         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1361         return rc;
1362 }
1363
1364 /*
1365  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1366  * name.
      *
      * Handle a link reint request: create the name rr_name in the directory
      * rr_fid2 for the existing object rr_fid1.
      *
      * The target parent is locked first (mdt_parent_lock(), LCK_PW), then the
      * source (UPDATE | XATTR bits, LCK_EX). Outside of replay the target name
      * must not exist yet.
      *
      * \param[in] info      thread info holding the unpacked request
      * \param[in] lhc       unused by this handler
      *
      * \retval      0 on success
      * \retval      -EPERM if both FIDs are equal or either one fails
      *              fid_is_md_operative()
      * \retval      -ENOENT if the source object does not exist
      * \retval      -EEXIST if the target name already exists (not in replay)
      * \retval      other negative errno on failure
1367  */
1368 static int mdt_reint_link(struct mdt_thread_info *info,
1369                           struct mdt_lock_handle *lhc)
1370 {
1371         struct mdt_reint_record *rr = &info->mti_rr;
1372         struct ptlrpc_request   *req = mdt_info_req(info);
1373         struct md_attr          *ma = &info->mti_attr;
1374         struct mdt_object       *ms;
1375         struct mdt_object       *mp;
1376         struct mdt_lock_handle  *lhs;
1377         struct mdt_lock_handle  *lhp;
1378         ktime_t kstart = ktime_get();
1379         bool cos_incompat;
1380         int rc;
1381
1382         ENTRY;
1383         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1384                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1385
1386         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1387                 RETURN(err_serious(-ENOENT));
1388
             /* fail injection: drop the reply for this request */
1389         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1390             CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1391                 req->rq_no_reply = 1;
1392                 RETURN(err_serious(-ENOENT));
1393         }
1394
1395         if (info->mti_dlm_req)
1396                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1397
1398         /* Invalid case so return error immediately instead of
1399          * processing it
1400          */
1401         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1402                 RETURN(-EPERM);
1403
1404         if (!fid_is_md_operative(rr->rr_fid1) ||
1405             !fid_is_md_operative(rr->rr_fid2))
1406                 RETURN(-EPERM);
1407
1408         /* step 1: find target parent dir */
1409         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1410         if (IS_ERR(mp))
1411                 RETURN(PTR_ERR(mp));
1412
1413         rc = mdt_version_get_check_save(info, mp, 0);
1414         if (rc)
1415                 GOTO(put_parent, rc);
1416
1417         rc = mdt_check_enc(info, mp);
1418         if (rc)
1419                 GOTO(put_parent, rc);
1420
1421         /* step 2: find source */
1422         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1423         if (IS_ERR(ms))
1424                 GOTO(put_parent, rc = PTR_ERR(ms));
1425
1426         if (!mdt_object_exists(ms)) {
1427                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1428                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1429                 GOTO(put_source, rc = -ENOENT);
1430         }
1431
1432         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1433
1434         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1435
1436         lhp = &info->mti_lh[MDT_LH_PARENT];
1437         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
1438         if (rc != 0)
1439                 GOTO(put_source, rc);
1440
1441         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1442
1443         lhs = &info->mti_lh[MDT_LH_CHILD];
1444         rc = mdt_object_lock(info, ms, lhs,
1445                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
1446                              cos_incompat);
1447         if (rc != 0)
1448                 GOTO(unlock_parent, rc);
1449
1450         /* step 3: link it */
1451         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1452                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1453
1454         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1455         rc = mdt_version_get_check_save(info, ms, 1);
1456         if (rc)
1457                 GOTO(unlock_source, rc);
1458
1459         /** check target version by name during replay */
1460         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1461                                       &info->mti_tmp_fid1, 2);
             /* -ENOENT is the expected answer: the name must not exist yet */
1462         if (rc != 0 && rc != -ENOENT)
1463                 GOTO(unlock_source, rc);
1464         /* save version of file name for replay, it must be ENOENT here */
1465         if (!req_is_replay(mdt_info_req(info))) {
1466                 if (rc != -ENOENT) {
1467                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1468                                PNAME(&rr->rr_name));
1469                         GOTO(unlock_source, rc = -EEXIST);
1470                 }
1471                 info->mti_ver[2] = ENOENT_VERSION;
1472                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1473         }
1474
1475         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1476                       mdt_object_child(ms), &rr->rr_name, ma);
1477
1478         if (rc == 0)
1479                 mdt_counter_incr(req, LPROC_MDT_LINK,
1480                                  ktime_us_delta(ktime_get(), kstart));
1481
1482         EXIT;
             /* cleanup runs in reverse order of acquisition */
1483 unlock_source:
1484         mdt_object_unlock(info, ms, lhs, rc);
1485 unlock_parent:
1486         mdt_object_unlock(info, mp, lhp, rc);
1487 put_source:
1488         mdt_object_put(info->mti_env, ms);
1489 put_parent:
1490         mdt_object_put(info->mti_env, mp);
1491         return rc;
1492 }
1493
1494 /**
1495  * Get BFL lock for rename or migrate process.
      *
      * The lock is an LCK_EX lock with MDS_INODELOCK_UPDATE bits, requested
      * through the root object (lu_root_fid()) with LUSTRE_BFL_FID passed to
      * mdt_object_lock_internal(). The root object reference is dropped again
      * before returning; only the lock handle \a lh keeps the result.
      *
      * \param[in] info      thread info
      * \param[in,out] lh    lock handle, initialised here via
      *                      mdt_lock_reg_init()
      *
      * \retval      result of mdt_object_lock_internal(), or the PTR_ERR() of
      *              a failed root object lookup
1496  **/
1497 static int mdt_rename_lock(struct mdt_thread_info *info,
1498                            struct mdt_lock_handle *lh)
1499 {
1500         struct lu_fid *fid = &info->mti_tmp_fid1;
1501         struct mdt_object *obj;
1502         __u64 ibits = MDS_INODELOCK_UPDATE;
1503         int rc;
1504
1505         ENTRY;
1506         lu_root_fid(fid);
1507         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1508         if (IS_ERR(obj))
1509                 RETURN(PTR_ERR(obj));
1510
1511         mdt_lock_reg_init(lh, LCK_EX);
1512         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1513                                       &ibits, 0, false, false);
1514         mdt_object_put(info->mti_env, obj);
1515         RETURN(rc);
1516 }
1517
/*
 * Release the lock held in \a lh, as taken by mdt_rename_lock().
 *
 * No object is passed to mdt_object_unlock() (NULL), and its last argument
 * is 1.
 *
 * \param[in] info      thread info
 * \param[in] lh        lock handle to release
 */
1518 static void mdt_rename_unlock(struct mdt_thread_info *info,
1519                               struct mdt_lock_handle *lh)
1520 {
1521         ENTRY;
1522         /* Cancel the single rename lock right away */
1523         mdt_object_unlock(info, NULL, lh, 1);
1524         EXIT;
1525 }
1526
/*
 * Find the object \a fid and verify that it can be used as a parent
 * directory.
 *
 * The object must pass mdt_version_get_check() for version slot \a idx,
 * must exist and must be a directory. On success the reference taken by
 * mdt_object_find() is kept and returned to the caller; on every error it
 * is dropped here.
 *
 * \param[in] info      thread info
 * \param[in] fid       FID of the directory to find
 * \param[in] idx       version slot passed to mdt_version_get_check()
 *
 * \retval      object pointer on success
 * \retval      ERR_PTR(-ENOENT) if the object does not exist
 * \retval      ERR_PTR(-ENOTDIR) if the object is not a directory
 * \retval      other ERR_PTR() from the lookup or the version check
 */
1527 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1528                                                 const struct lu_fid *fid,
1529                                                 int idx)
1530 {
1531         struct mdt_object *dir;
1532         int rc;
1533
1534         ENTRY;
1535         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1536         if (IS_ERR(dir))
1537                 RETURN(dir);
1538
1539         /* check early, the real version will be saved after locking */
1540         rc = mdt_version_get_check(info, dir, idx);
1541         if (rc)
1542                 GOTO(out_put, rc);
1543
1544         if (!mdt_object_exists(dir))
1545                 GOTO(out_put, rc = -ENOENT);
1546
1547         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1548                 GOTO(out_put, rc = -ENOTDIR);
1549
1550         RETURN(dir);
1551 out_put:
1552         mdt_object_put(info->mti_env, dir);
1553         return ERR_PTR(rc);
1554 }
1555
1556 /*
1557  * lock rename source object.
1558  *
1559  * Both source and its parent object may be located on remote MDTs, and even on
1560  * different MDTs, which means source object is a remote object on parent.
1561  *
1562  * \retval      0 on success
1563  * \retval      -ev negative errno upon error
1564  */
1565 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1566                                   struct mdt_object *parent,
1567                                   struct mdt_object *child,
1568                                   struct mdt_lock_handle *lh,
1569                                   struct mdt_lock_handle *lh_lookup,
1570                                   __u64 ibits, bool cos_incompat)
1571 {
1572         int rc;
1573
1574         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1575         /* if @obj is remote object, LOOKUP lock needs to be taken from
1576          * parent MDT.
1577          */
1578         rc = mdt_is_remote_object(info, parent, child);
1579         if (rc < 0)
1580                 return rc;
1581
1582         if (rc == 1) {
1583                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1584                                             LCK_EX, cos_incompat);
1585                 if (rc)
1586                         return rc;
1587
1588                 ibits &= ~MDS_INODELOCK_LOOKUP;
1589         }
1590
1591         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX, cos_incompat);
1592         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1593                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1594
1595         return 0;
1596 }
1597
1598 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1599                                      struct mdt_object *obj,
1600                                      struct mdt_lock_handle *lh,
1601                                      struct mdt_lock_handle *lh_lookup,
1602                                      int decref)
1603 {
1604         mdt_object_unlock(info, obj, lh, decref);
1605         mdt_object_unlock(info, NULL, lh_lookup, decref);
1606 }
1607
1608 /* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
1609 struct mdt_link_lock {
1610         struct mdt_object *mll_obj;
1611         struct mdt_lock_handle mll_lh;
1612         struct list_head mll_linkage;
1613 };
1614
1615 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1616                                             struct mdt_object *o,
1617                                             struct mdt_lock_handle *lh,
1618                                             struct list_head *list)
1619 {
1620         struct mdt_link_lock *mll;
1621
1622         OBD_ALLOC_PTR(mll);
1623         if (mll == NULL)
1624                 return -ENOMEM;
1625
1626         INIT_LIST_HEAD(&mll->mll_linkage);
1627         mdt_object_get(info->mti_env, o);
1628         mll->mll_obj = o;
1629         mll->mll_lh = *lh;
1630         memset(lh, 0, sizeof(*lh));
1631         list_add_tail(&mll->mll_linkage, list);
1632
1633         return 0;
1634 }
1635
1636 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1637                                              struct mdt_link_lock *mll,
1638                                              int decref)
1639 {
1640         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1641         mdt_object_put(info->mti_env, mll->mll_obj);
1642         list_del(&mll->mll_linkage);
1643         OBD_FREE_PTR(mll);
1644 }
1645
1646 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1647                                      struct list_head *list, int decref)
1648 {
1649         struct mdt_link_lock *mll;
1650         struct mdt_link_lock *tmp;
1651
1652         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1653                 mdt_migrate_link_lock_del(info, mll, decref);
1654 }
1655
1656 /* take link parent UPDATE lock.
1657  * \retval      0 \a lnkp is already locked, no lock taken.
1658  *              1 lock taken
1659  *              -ev negative errno.
1660  */
1661 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1662                                         struct mdt_object *lnkp,
1663                                         struct list_head *update_locks,
1664                                         bool *blocked)
1665 {
1666         const struct lu_fid *fid = mdt_object_fid(lnkp);
1667         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1668         struct mdt_link_lock *entry;
1669         __u64 ibits = 0;
1670         int rc;
1671
1672         ENTRY;
1673
1674         /* check if it's already locked */
1675         list_for_each_entry(entry, update_locks, mll_linkage) {
1676                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1677                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1678                         RETURN(0);
1679                 }
1680         }
1681
1682         /* link parent UPDATE lock */
1683         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1684
1685         if (*blocked) {
1686                 /* revoke lock instead of take in *blocked* mode */
1687                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1688                                      LCK_PW, true);
1689                 if (rc)
1690                         RETURN(rc);
1691
1692                 if (mdt_object_remote(lnkp)) {
1693                         struct ldlm_lock *lock;
1694
1695                         /*
1696                          * for remote object, set lock cb_atomic, so lock can be
1697                          * released in blocking_ast() immediately, then the next
1698                          * lock_try will have better chance of success.
1699                          */
1700                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1701                         LASSERT(lock != NULL);
1702                         lock_res_and_lock(lock);
1703                         ldlm_set_atomic_cb(lock);
1704                         unlock_res_and_lock(lock);
1705                         LDLM_LOCK_PUT(lock);
1706                 }
1707
1708                 mdt_object_unlock(info, lnkp, lhl, 1);
1709                 RETURN(0);
1710         }
1711
1712         /*
1713          * we can't follow parent-child lock order like other MD
1714          * operations, use lock_try here to avoid deadlock, if the lock
1715          * cannot be taken, drop all locks taken, revoke the blocked
1716          * one, and continue processing the remaining entries, and in
1717          * the end of the loop restart from beginning.
1718          *
1719          * don't lock with PDO mode in case two links are under the same
1720          * parent and their hash values are different.
1721          */
1722         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1723                                  LCK_PW, true);
1724         if (rc < 0)
1725                 RETURN(rc);
1726
1727         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1728                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1729                 *blocked = true;
1730                 RETURN(-EAGAIN);
1731         }
1732
1733         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1734         if (rc) {
1735                 mdt_object_unlock(info, lnkp, lhl, 1);
1736                 RETURN(rc);
1737         }
1738
1739         RETURN(1);
1740 }
1741
1742 /* take link LOOKUP lock.
1743  * \retval      0 \a lnkp is already locked, no lock taken.
1744  *              1 lock taken.
1745  *              -ev negative errno.
1746  */
1747 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1748                                  struct mdt_object *lnkp,
1749                                  struct mdt_object *spobj,
1750                                  struct mdt_object *obj,
1751                                  struct list_head *lookup_locks)
1752 {
1753         const struct lu_fid *fid = mdt_object_fid(lnkp);
1754         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1755         struct mdt_link_lock *entry;
1756         int rc;
1757
1758         ENTRY;
1759
1760         /* check if it's already locked by source */
1761         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1762         if (rc <= 0) {
1763                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1764                        PFID(fid));
1765                 RETURN(rc);
1766         }
1767
1768         /* check if it's already locked by other links */
1769         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1770                 rc = mdt_fids_different_target(info, fid,
1771                                                mdt_object_fid(entry->mll_obj));
1772                 if (rc <= 0) {
1773                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1774                                PFID(fid));
1775                         RETURN(rc);
1776                 }
1777         }
1778
1779         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX, true);
1780         if (rc)
1781                 RETURN(rc);
1782
1783         /* don't take local LOOKUP lock, because later we will lock other ibits
1784          * of sobj (which is on local MDT), and lock the same object twice may
1785          * deadlock, just revoke this lock.
1786          */
1787         if (!mdt_object_remote(lnkp))
1788                 GOTO(unlock, rc = 0);
1789
1790         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1791         if (rc)
1792                 GOTO(unlock, rc);
1793
1794         RETURN(1);
1795 unlock:
1796         mdt_object_unlock(info, lnkp, lhl, 1);
1797         return rc;
1798 }
1799
1800 /*
1801  * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
1802  * total local lock count exceeds RS_MAX_LOCKS.
1803  *
1804  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1805  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1806  * \retval      -ev negative errno upon error
1807  */
1808 static int mdt_migrate_links_lock(struct mdt_thread_info *info,
1809                                   struct mdt_object *spobj,
1810                                   struct mdt_object *tpobj,
1811                                   struct mdt_object *obj,
1812                                   struct mdt_lock_handle *lhsp,
1813                                   struct mdt_lock_handle *lhtp,
1814                                   struct list_head *link_locks)
1815 {
1816         struct mdt_device *mdt = info->mti_mdt;
1817         struct lu_buf *buf = &info->mti_big_buf;
1818         struct lu_name *lname = &info->mti_name;
1819         struct linkea_data ldata = { NULL };
1820         int local_lock_cnt = 0;
1821         bool blocked = false;
1822         bool saved;
1823         struct mdt_object *lnkp;
1824         struct lu_fid fid;
1825         LIST_HEAD(update_locks);
1826         LIST_HEAD(lookup_locks);
1827         int rc;
1828
1829         ENTRY;
1830         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1831                 RETURN(0);
1832
1833         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1834         if (buf->lb_buf == NULL)
1835                 RETURN(-ENOMEM);
1836
1837         ldata.ld_buf = buf;
1838         rc = mdt_links_read(info, obj, &ldata);
1839         if (rc) {
1840                 if (rc == -ENOENT || rc == -ENODATA)
1841                         rc = 0;
1842                 RETURN(rc);
1843         }
1844
1845         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1846              linkea_next_entry(&ldata)) {
1847                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1848                                     &fid);
1849
1850                 /* check if link parent is source parent too */
1851                 if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
1852                         CDEBUG(D_INFO,
1853                                "skip lock on source parent "DFID"/"DNAME"\n",
1854                                PFID(&fid), PNAME(lname));
1855                         continue;
1856                 }
1857
1858                 /* check if link parent is target parent too */
1859                 if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
1860                         CDEBUG(D_INFO,
1861                                "skip lock on target parent "DFID"/"DNAME"\n",
1862                                PFID(&fid), PNAME(lname));
1863                         continue;
1864                 }
1865
1866                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1867                 if (IS_ERR(lnkp)) {
1868                         CWARN("%s: cannot find obj "DFID": %ld\n",
1869                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1870                         continue;
1871                 }
1872
1873                 if (!mdt_object_exists(lnkp)) {
1874                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1875                                PFID(&fid), PNAME(lname));
1876                         mdt_object_put(info->mti_env, lnkp);
1877                         continue;
1878                 }
1879 relock:
1880                 saved = blocked;
1881                 rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
1882                                                   &blocked);
1883                 if (!saved && blocked) {
1884                         /* unlock all locks taken to avoid deadlock */
1885                         mdt_migrate_links_unlock(info, &update_locks, 1);
1886                         mdt_object_unlock(info, spobj, lhsp, 1);
1887                         if (tpobj != spobj)
1888                                 mdt_object_unlock(info, tpobj, lhtp, 1);
1889                         goto relock;
1890                 }
1891                 if (rc < 0) {
1892                         mdt_object_put(info->mti_env, lnkp);
1893                         GOTO(out, rc);
1894                 }
1895
1896                 if (rc == 1 && !mdt_object_remote(lnkp))
1897                         local_lock_cnt++;
1898
1899                 rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
1900                                            &lookup_locks);
1901                 if (rc < 0) {
1902                         mdt_object_put(info->mti_env, lnkp);
1903                         GOTO(out, rc);
1904                 }
1905                 if (rc == 1 && !mdt_object_remote(lnkp))
1906                         local_lock_cnt++;
1907                 mdt_object_put(info->mti_env, lnkp);
1908         }
1909
1910         if (blocked)
1911                 GOTO(out, rc = -EBUSY);
1912
1913         EXIT;
1914 out:
1915         list_splice(&update_locks, link_locks);
1916         list_splice(&lookup_locks, link_locks);
1917         if (rc < 0) {
1918                 mdt_migrate_links_unlock(info, link_locks, rc);
1919         } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
1920                 /*
1921                  * parent may have 3 local objects: master object and 2 stripes
1922                  * (if it's being migrated too); source may have 1 local objects
1923                  * as regular file; target has 1 local object.
1924                  * Note, source may have 2 local locks if it is directory but it
1925                  * can't have hardlinks, so it is not considered here.
1926                  */
1927                 CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
1928                        local_lock_cnt);
1929                 rc = 1;
1930         }
1931         return rc;
1932 }
1933
1934 /*
1935  * lookup source by name, if parent is striped directory, we need to find the
1936  * corresponding stripe where source is located, and then lookup there.
1937  *
1938  * besides, if parent is migrating too, and file is already in target stripe,
1939  * this should be a redo of 'lfs migrate' on client side.
1940  *
1941  * \retval 1 tpobj stripe index is less than spobj stripe index
1942  * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
1943  * \retval -ev negative errno upon error
1944  */
1945 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1946                               struct mdt_object *pobj,
1947                               const struct md_attr *ma,
1948                               const struct lu_name *lname,
1949                               struct mdt_object **spobj,
1950                               struct mdt_object **tpobj,
1951                               struct mdt_object **sobj)
1952 {
1953         const struct lu_env *env = info->mti_env;
1954         struct lu_fid *fid = &info->mti_tmp_fid1;
1955         int spindex = -1;
1956         int tpindex = -1;
1957         int rc;
1958
1959         if (ma->ma_valid & MA_LMV) {
1960                 /* if parent is striped, lookup on corresponding stripe */
1961                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1962                 struct lu_fid *fid2 = &info->mti_tmp_fid2;
1963
1964                 if (!lmv_is_sane(lmv))
1965                         return -EBADF;
1966
1967                 spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1968                                                        lname->ln_namelen);
1969                 if (spindex < 0)
1970                         return spindex;
1971
1972                 fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
1973
1974                 *spobj = mdt_object_find(env, info->mti_mdt, fid2);
1975                 if (IS_ERR(*spobj)) {
1976                         rc = PTR_ERR(*spobj);
1977                         *spobj = NULL;
1978                         return rc;
1979                 }
1980
1981                 if (!mdt_object_exists(*spobj))
1982                         GOTO(spobj_put, rc = -ENOENT);
1983
1984                 fid_zero(fid);
1985                 rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
1986                                 &info->mti_spec);
1987                 if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
1988                         /* fail check here to let top dir migration succeed. */
1989                         if (OBD_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
1990                                 GOTO(spobj_put, rc = -EIO);
1991
1992                         /*
1993                          * if parent layout is changeing, and lookup child
1994                          * failed on source stripe, lookup again on target
1995                          * stripe, if it exists, it means previous migration
1996                          * was interrupted, and current file was migrated
1997                          * already.
1998                          */
1999                         tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
2000                                                            lname->ln_namelen);
2001                         if (tpindex < 0)
2002                                 GOTO(spobj_put, rc = tpindex);
2003
2004                         fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
2005
2006                         *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
2007                         if (IS_ERR(*tpobj)) {
2008                                 rc = PTR_ERR(*tpobj);
2009                                 *tpobj = NULL;
2010                                 GOTO(spobj_put, rc);
2011                         }
2012
2013                         if (!mdt_object_exists(*tpobj))
2014                                 GOTO(tpobj_put, rc = -ENOENT);
2015
2016                         if (rc == -ENOENT) {
2017                                 fid_zero(fid);
2018                                 rc = mdo_lookup(env, mdt_object_child(*tpobj),
2019                                                 lname, fid, &info->mti_spec);
2020                                 GOTO(tpobj_put, rc = rc ?: -EALREADY);
2021                         }
2022                 } else if (rc) {
2023                         GOTO(spobj_put, rc);
2024                 } else {
2025                         *tpobj = *spobj;
2026                         tpindex = spindex;
2027                         mdt_object_get(env, *tpobj);
2028                 }
2029         } else {
2030                 fid_zero(fid);
2031                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2032                                 &info->mti_spec);
2033                 if (rc)
2034                         return rc;
2035
2036                 *spobj = pobj;
2037                 *tpobj = pobj;
2038                 mdt_object_get(env, pobj);
2039                 mdt_object_get(env, pobj);
2040         }
2041
2042         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2043         if (IS_ERR(*sobj)) {
2044                 rc = PTR_ERR(*sobj);
2045                 *sobj = NULL;
2046                 GOTO(tpobj_put, rc);
2047         }
2048
2049         if (!mdt_object_exists(*sobj))
2050                 GOTO(sobj_put, rc = -ENOENT);
2051
2052         return (tpindex < spindex);
2053
2054 sobj_put:
2055         mdt_object_put(env, *sobj);
2056         *sobj = NULL;
2057 tpobj_put:
2058         mdt_object_put(env, *tpobj);
2059         *tpobj = NULL;
2060 spobj_put:
2061         mdt_object_put(env, *spobj);
2062         *spobj = NULL;
2063
2064         return rc;
2065 }
2066
2067 /* end lease and close file for regular file */
2068 static int mdd_migrate_close(struct mdt_thread_info *info,
2069                              struct mdt_object *obj)
2070 {
2071         struct close_data *data;
2072         struct mdt_body *repbody;
2073         struct ldlm_lock *lease;
2074         int rc;
2075         int rc2;
2076
2077         rc = -EPROTO;
2078         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2079                                       RCL_CLIENT) ||
2080             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2081                                       RCL_CLIENT))
2082                 goto close;
2083
2084         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2085         if (!data)
2086                 goto close;
2087
2088         rc = -ESTALE;
2089         lease = ldlm_handle2lock(&data->cd_handle);
2090         if (!lease)
2091                 goto close;
2092
2093         /* check if the lease was already canceled */
2094         lock_res_and_lock(lease);
2095         rc = ldlm_is_cancel(lease);
2096         unlock_res_and_lock(lease);
2097
2098         if (rc) {
2099                 rc = -EAGAIN;
2100                 LDLM_DEBUG(lease, DFID" lease broken",
2101                            PFID(mdt_object_fid(obj)));
2102         }
2103
2104         /*
2105          * cancel server side lease, client side counterpart should have been
2106          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2107          */
2108         ldlm_lock_cancel(lease);
2109         ldlm_reprocess_all(lease->l_resource,
2110                            lease->l_policy_data.l_inodebits.bits);
2111         LDLM_LOCK_PUT(lease);
2112
2113 close:
2114         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2115         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2116         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2117
2118         return rc ?: rc2;
2119 }
2120
2121 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2122 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2123 {
2124         return le32_to_cpu(lmv->lmv_hash_type) ==
2125                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2126                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2127                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2128                le32_to_cpu(lmv->lmv_migrate_offset) <
2129                 le32_to_cpu(lmv->lmv_stripe_count);
2130 }
2131
2132 /*
2133  * migrate file in below steps:
2134  *  1. lock source and target stripes
2135  *  2. lookup source by name
2136  *  3. lock parents of source links if source is not directory
2137  *  4. reject if source is in HSM
2138  *  5. take source open_sem and close file if source is regular file
2139  *  6. lock source, and its stripes if it's directory
2140  *  7. migrate file
2141  *  8. lock target so subsequent change to it can trigger COS
2142  *  9. unlock above locks
2143  * 10. sync device if source has too many links
2144  */
2145 int mdt_reint_migrate(struct mdt_thread_info *info,
2146                       struct mdt_lock_handle *unused)
2147 {
2148         const struct lu_env *env = info->mti_env;
2149         struct mdt_device *mdt = info->mti_mdt;
2150         struct ptlrpc_request *req = mdt_info_req(info);
2151         struct mdt_reint_record *rr = &info->mti_rr;
2152         struct lu_ucred *uc = mdt_ucred(info);
2153         struct md_attr *ma = &info->mti_attr;
2154         struct mdt_object *pobj;
2155         struct mdt_object *spobj;
2156         struct mdt_object *tpobj;
2157         struct mdt_object *sobj;
2158         struct mdt_object *tobj;
2159         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2160         struct mdt_lock_handle *lhsp;
2161         struct mdt_lock_handle *lhtp;
2162         struct mdt_lock_handle *lhs;
2163         struct mdt_lock_handle *lhl;
2164         LIST_HEAD(link_locks);
2165         int lock_retries = 5;
2166         bool reverse = false;
2167         bool open_sem_locked = false;
2168         bool do_sync = false;
2169         bool is_plain_dir = false;
2170         int rc;
2171
2172         ENTRY;
2173         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2174                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2175
2176         if (info->mti_dlm_req)
2177                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2178
2179         if (!fid_is_md_operative(rr->rr_fid1) ||
2180             !fid_is_md_operative(rr->rr_fid2))
2181                 RETURN(-EPERM);
2182
2183         /* don't allow migrate . or .. */
2184         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2185                 RETURN(-EBUSY);
2186
2187         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2188                 RETURN(-EPERM);
2189
2190         /* we want rbac roles to have precedence over any other
2191          * permission or capability checks
2192          */
2193         if (uc && (!uc->uc_rbac_dne_ops ||
2194                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2195                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2196                     mdt->mdt_enable_remote_dir_gid != -1)))
2197                 RETURN(-EPERM);
2198
2199         /*
2200          * Note: do not enqueue rename lock for replay request, because
2201          * if other MDT holds rename lock, but being blocked to wait for
2202          * this MDT to finish its recovery, and the failover MDT can not
2203          * get rename lock, which will cause deadlock.
2204          *
2205          * req is NULL if this is called by directory auto-split.
2206          */
2207         if (req && !req_is_replay(req)) {
2208                 rc = mdt_rename_lock(info, rename_lh);
2209                 if (rc != 0) {
2210                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2211                                mdt_obd_name(info->mti_mdt), rc);
2212                         RETURN(rc);
2213                 }
2214         }
2215
2216         /* pobj is master object of parent */
2217         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2218         if (IS_ERR(pobj))
2219                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2220
2221         if (req) {
2222                 rc = mdt_version_get_check(info, pobj, 0);
2223                 if (rc)
2224                         GOTO(put_parent, rc);
2225         }
2226
2227         if (!mdt_object_exists(pobj))
2228                 GOTO(put_parent, rc = -ENOENT);
2229
2230         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2231                 GOTO(put_parent, rc = -ENOTDIR);
2232
2233         rc = mdt_check_enc(info, pobj);
2234         if (rc)
2235                 GOTO(put_parent, rc);
2236
2237         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2238         if (rc)
2239                 GOTO(put_parent, rc);
2240
2241         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2242             (ma->ma_valid & MA_LMV) &&
2243             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2244                 struct lu_buf *buf = &info->mti_buf;
2245                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2246                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2247
2248                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2249                                                  LMV_HASH_FLAG_BAD_TYPE);
2250                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2251                 buf->lb_buf = lmv;
2252                 buf->lb_len = sizeof(*lmv);
2253                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2254                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2255                 mo_invalidate(env, mdt_object_child(pobj));
2256                 GOTO(put_parent, rc);
2257         }
2258
2259         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2260          * if @pobj is migrating too, tpobj is the target parent stripe.
2261          */
2262         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2263                                 &sobj);
2264         if (rc < 0)
2265                 GOTO(put_parent, rc);
2266         reverse = rc;
2267
2268         /* parent unchanged, this happens in dir restripe */
2269         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2270                 GOTO(put_source, rc = -EALREADY);
2271
2272 lock_parent:
2273         LASSERT(spobj);
2274         LASSERT(tpobj);
2275         lhsp = &info->mti_lh[MDT_LH_PARENT];
2276         lhtp = &info->mti_lh[MDT_LH_CHILD];
2277         /* lock spobj and tpobj in stripe index order */
2278         if (reverse) {
2279                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW,
2280                                      true);
2281                 if (rc)
2282                         GOTO(put_source, rc);
2283
2284                 LASSERT(spobj != tpobj);
2285                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
2286                                      true);
2287                 if (rc)
2288                         GOTO(unlock_parent, rc);
2289         } else {
2290                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW,
2291                                      true);
2292                 if (rc)
2293                         GOTO(put_source, rc);
2294
2295                 if (tpobj != spobj) {
2296                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2297                                              LCK_PW, true);
2298                         if (rc)
2299                                 GOTO(unlock_parent, rc);
2300                 }
2301         }
2302
2303         /* if inode is not migrated, or is dir, no need to lock links */
2304         if (!info->mti_spec.sp_migrate_nsonly &&
2305             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2306                 /* lock link parents, and take LOOKUP lock of links */
2307                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2308                                             lhtp, &link_locks);
2309                 if (rc == -EBUSY && lock_retries-- > 0) {
2310                         LASSERT(list_empty(&link_locks));
2311                         goto lock_parent;
2312                 }
2313
2314                 if (rc < 0)
2315                         GOTO(put_source, rc);
2316
2317                 /*
2318                  * RS_MAX_LOCKS is the limit of number of locks that can be
2319                  * saved along with one request, if total lock count exceeds
2320                  * this limit, we will drop all locks after migration, and
2321                  * trigger commit in the end.
2322                  */
2323                 do_sync = rc;
2324         }
2325
2326         /* lock source */
2327         lhs = &info->mti_lh[MDT_LH_OLD];
2328         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2329         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2330                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2331                                     MDS_INODELOCK_OPEN, true);
2332         if (rc)
2333                 GOTO(unlock_links, rc);
2334
2335         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2336                 /* TODO: DoM migration is not supported, migrate dirent only */
2337                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2338                 if (rc)
2339                         GOTO(unlock_source, rc);
2340
2341                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2342                         info->mti_spec.sp_migrate_nsonly = 1;
2343         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2344                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2345                 if (rc)
2346                         GOTO(unlock_source, rc);
2347
2348                 if (!(ma->ma_valid & MA_LMV))
2349                         is_plain_dir = true;
2350                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2351                         /* race with restripe/auto-split */
2352                         GOTO(unlock_source, rc = -EBUSY);
2353                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2354                         struct lu_buf *buf = &info->mti_buf;
2355                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2356                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2357
2358                         /* migration failed before, and LFSCK cleared hash type
2359                          * and flags, fake it to resume migration.
2360                          */
2361                         lmv->lmv_hash_type =
2362                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2363                                             LMV_HASH_FLAG_MIGRATION |
2364                                             LMV_HASH_FLAG_BAD_TYPE |
2365                                             LMV_HASH_FLAG_FIXED);
2366                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2367                         buf->lb_buf = lmv;
2368                         buf->lb_len = sizeof(*lmv);
2369                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2370                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2371                         mo_invalidate(env, mdt_object_child(sobj));
2372                         GOTO(unlock_source, rc = -EALREADY);
2373                 }
2374         }
2375
2376         /* if migration HSM is allowed */
2377         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2378                 ma->ma_need = MA_HSM;
2379                 ma->ma_valid = 0;
2380                 rc = mdt_attr_get_complex(info, sobj, ma);
2381                 if (rc)
2382                         GOTO(unlock_source, rc);
2383
2384                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2385                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2386         }
2387
2388         /* end lease and close file for regular file */
2389         if (info->mti_spec.sp_migrate_close) {
2390                 /* try to hold open_sem so that nobody else can open the file */
2391                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2392                         /* close anyway */
2393                         mdd_migrate_close(info, sobj);
2394                         GOTO(unlock_source, rc = -EBUSY);
2395                 } else {
2396                         open_sem_locked = true;
2397                         rc = mdd_migrate_close(info, sobj);
2398                         if (rc && rc != -ESTALE)
2399                                 GOTO(unlock_open_sem, rc);
2400                 }
2401         }
2402
2403         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2404         if (IS_ERR(tobj))
2405                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2406
2407         /* Don't do lookup sanity check. We know name doesn't exist. */
2408         info->mti_spec.sp_cr_lookup = 0;
2409         info->mti_spec.sp_feat = &dt_directory_features;
2410
2411         rc = mdo_migrate(env, mdt_object_child(spobj),
2412                          mdt_object_child(tpobj), mdt_object_child(sobj),
2413                          mdt_object_child(tobj), &rr->rr_name,
2414                          &info->mti_spec, ma);
2415         if (rc)
2416                 GOTO(put_target, rc);
2417
2418         /* save target locks for directory */
2419         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2420             !info->mti_spec.sp_migrate_nsonly) {
2421                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2422                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2423
2424                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2425                  * otherwise stripes lock may deadlock.
2426                  */
2427                 if (is_plain_dir)
2428                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2429
2430                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2431                                              MDS_INODELOCK_UPDATE, LCK_PW,
2432                                              true);
2433                 if (rc)
2434                         GOTO(put_target, rc);
2435
2436                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2437         }
2438
2439         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2440                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2441
2442         EXIT;
2443 put_target:
2444         mdt_object_put(env, tobj);
2445 unlock_open_sem:
2446         if (open_sem_locked)
2447                 up_write(&sobj->mot_open_sem);
2448 unlock_source:
2449         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2450 unlock_links:
2451         /* if we've got too many locks to save into RPC,
2452          * then just commit before the locks are released
2453          */
2454         if (!rc && do_sync)
2455                 mdt_device_sync(env, mdt);
2456         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2457 unlock_parent:
2458         mdt_object_unlock(info, spobj, lhsp, rc);
2459         mdt_object_unlock(info, tpobj, lhtp, rc);
2460 put_source:
2461         mdt_object_put(env, sobj);
2462         mdt_object_put(env, spobj);
2463         mdt_object_put(env, tpobj);
2464 put_parent:
2465         mo_invalidate(env, mdt_object_child(pobj));
2466         mdt_object_put(env, pobj);
2467 unlock_rename:
2468         mdt_rename_unlock(info, rename_lh);
2469
2470         if (rc)
2471                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2472                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2473                        PNAME(&rr->rr_name), rc);
2474
2475         return rc;
2476 }
2477
2478 /*
2479  * determine lock order of sobj and tobj
2480  *
2481  * there are two situations we need to lock tobj before sobj:
2482  * 1. sobj is child of tobj
2483  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2484  *    larger than that of tobj
2485  *
2486  * \retval      1 lock tobj before sobj
2487  * \retval      0 lock sobj before tobj
2488  * \retval      -ev negative errno upon error
2489  */
2490 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2491                                            struct mdt_object *sobj,
2492                                            struct mdt_object *tobj)
2493 {
2494         struct md_attr *ma = &info->mti_attr;
2495         struct lu_fid *spfid = &info->mti_tmp_fid1;
2496         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2497         struct lmv_mds_md_v1 *lmv;
2498         __u32 sindex;
2499         __u32 tindex;
2500         int rc;
2501
2502         /* sobj and tobj are the same */
2503         if (sobj == tobj)
2504                 return 0;
2505
2506         if (fid_is_root(mdt_object_fid(sobj)))
2507                 return 0;
2508
2509         if (fid_is_root(mdt_object_fid(tobj)))
2510                 return 1;
2511
2512         /* check whether sobj is child of tobj */
2513         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2514                            mdt_object_fid(tobj));
2515         if (rc < 0)
2516                 return rc;
2517
2518         if (rc == 1)
2519                 return 1;
2520
2521         /* check whether sobj and tobj are children of the same parent */
2522         rc = mdt_attr_get_pfid(info, sobj, spfid);
2523         if (rc)
2524                 return rc;
2525
2526         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2527         if (rc)
2528                 return rc;
2529
2530         if (!lu_fid_eq(spfid, tpfid))
2531                 return 0;
2532
2533         /* check whether sobj and tobj are sibling stripes */
2534         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2535         if (rc)
2536                 return rc;
2537
2538         if (!(ma->ma_valid & MA_LMV))
2539                 return 0;
2540
2541         lmv = &ma->ma_lmv->lmv_md_v1;
2542         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2543                 return 0;
2544         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2545
2546         ma->ma_valid = 0;
2547         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2548         if (rc)
2549                 return rc;
2550
2551         if (!(ma->ma_valid & MA_LMV))
2552                 return -ENODATA;
2553
2554         lmv = &ma->ma_lmv->lmv_md_v1;
2555         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2556                 return -EINVAL;
2557         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2558
2559         /* check stripe index of sobj and tobj */
2560         if (sindex == tindex)
2561                 return -EINVAL;
2562
2563         return sindex < tindex ? 0 : 1;
2564 }
2565
2566 /* Helper function for mdt_reint_rename so we don't need to opencode
2567  * two different order lockings
2568  */
2569 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2570                              struct mdt_object *mfirstdir,
2571                              struct mdt_lock_handle *lh_firstdirp,
2572                              const struct lu_name *firstname,
2573                              struct mdt_object *mseconddir,
2574                              struct mdt_lock_handle *lh_seconddirp,
2575                              const struct lu_name *secondname,
2576                              bool cos_incompat)
2577 {
2578         int rc;
2579
2580         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
2581                              cos_incompat);
2582         if (rc)
2583                 return rc;
2584
2585         mdt_version_get_save(info, mfirstdir, 0);
2586         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2587
2588         if (mfirstdir != mseconddir) {
2589                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2590                                      secondname, LCK_PW, cos_incompat);
2591         } else if (!mdt_object_remote(mseconddir)) {
2592                 if (lh_firstdirp->mlh_pdo_hash !=
2593                     lh_seconddirp->mlh_pdo_hash) {
2594                         rc = mdt_object_pdo_lock(info, mseconddir,
2595                                                  lh_seconddirp, secondname,
2596                                                  LCK_PW, false, cos_incompat);
2597                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2598                 }
2599         }
2600         mdt_version_get_save(info, mseconddir, 1);
2601
2602         if (rc != 0)
2603                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2604
2605         return rc;
2606 }
2607
2608 /*
2609  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2610  * 2 - srcdir child; 3 - tgtdir child.
2611  * Update on disk version of srcdir child.
2612  */
2613 static int mdt_reint_rename(struct mdt_thread_info *info,
2614                             struct mdt_lock_handle *unused)
2615 {
2616         struct mdt_device *mdt = info->mti_mdt;
2617         struct mdt_reint_record *rr = &info->mti_rr;
2618         struct md_attr *ma = &info->mti_attr;
2619         struct ptlrpc_request *req = mdt_info_req(info);
2620         struct mdt_object *msrcdir = NULL;
2621         struct mdt_object *mtgtdir = NULL;
2622         struct mdt_object *mold;
2623         struct mdt_object *mnew = NULL;
2624         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2625         struct mdt_lock_handle *lh_srcdirp;
2626         struct mdt_lock_handle *lh_tgtdirp;
2627         struct mdt_lock_handle *lh_oldp = NULL;
2628         struct mdt_lock_handle *lh_lookup = NULL;
2629         struct mdt_lock_handle *lh_newp = NULL;
2630         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2631         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2632         struct lu_ucred *uc = mdt_ucred(info);
2633         bool reverse = false, discard = false;
2634         bool cos_incompat;
2635         ktime_t kstart = ktime_get();
2636         enum mdt_stat_idx msi = 0;
2637         int rc;
2638
2639         ENTRY;
2640         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2641                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2642                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2643
2644         if (info->mti_dlm_req)
2645                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2646
2647         if (!fid_is_md_operative(rr->rr_fid1) ||
2648             !fid_is_md_operative(rr->rr_fid2))
2649                 RETURN(-EPERM);
2650
2651         /* find both parents. */
2652         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2653         if (IS_ERR(msrcdir))
2654                 RETURN(PTR_ERR(msrcdir));
2655
2656         rc = mdt_check_enc(info, msrcdir);
2657         if (rc)
2658                 GOTO(out_put_srcdir, rc);
2659
2660         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2661
2662         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2663                 mtgtdir = msrcdir;
2664                 mdt_object_get(info->mti_env, mtgtdir);
2665         } else {
2666                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2667                 if (IS_ERR(mtgtdir))
2668                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2669         }
2670
2671         rc = mdt_check_enc(info, mtgtdir);
2672         if (rc)
2673                 GOTO(out_put_tgtdir, rc);
2674
2675         if (!uc->uc_rbac_fscrypt_admin &&
2676             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2677                 GOTO(out_put_tgtdir, rc = -EPERM);
2678
2679         /*
2680          * Note: do not enqueue rename lock for replay request, because
2681          * if other MDT holds rename lock, but being blocked to wait for
2682          * this MDT to finish its recovery, and the failover MDT can not
2683          * get rename lock, which will cause deadlock.
2684          */
2685         if (!req_is_replay(req)) {
2686                 bool remote = mdt_object_remote(msrcdir);
2687
2688                 /*
2689                  * Normally rename RPC is handled on the MDT with the target
2690                  * directory (if target exists, it's on the MDT with the
2691                  * target), if the source directory is remote, it's a hint that
2692                  * source is remote too (this may not be true, but it won't
2693                  * cause any issue), return -EXDEV early to avoid taking
2694                  * rename_lock.
2695                  */
2696                 if (!mdt->mdt_enable_remote_rename && remote)
2697                         GOTO(out_put_tgtdir, rc = -EXDEV);
2698
2699                 /* This might be further relaxed in the future for regular file
2700                  * renames in different source and target parents. Start with
2701                  * only same-directory renames for simplicity and because this
2702                  * is by far the most the common use case.
2703                  *
2704                  * Striped directories should be considered "remote".
2705                  */
2706                 if (msrcdir != mtgtdir || remote ||
2707                     (S_ISDIR(ma->ma_attr.la_mode) &&
2708                      !mdt->mdt_enable_parallel_rename_dir) ||
2709                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2710                      !mdt->mdt_enable_parallel_rename_file)) {
2711                         rc = mdt_rename_lock(info, rename_lh);
2712                         if (rc != 0) {
2713                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2714                                        mdt_obd_name(mdt), rc);
2715                                 GOTO(out_put_tgtdir, rc);
2716                         }
2717                 } else {
2718                         if (S_ISDIR(ma->ma_attr.la_mode))
2719                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2720                         else
2721                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2722
2723                         CDEBUG(D_INFO,
2724                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2725                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2726                                PNAME(&rr->rr_name));
2727                 }
2728         }
2729
2730         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2731         if (rc < 0)
2732                 GOTO(out_unlock_rename, rc);
2733         reverse = rc;
2734
2735         /* source needs to be looked up after locking source parent, otherwise
2736          * this rename may race with unlink source, and cause rename hang, see
2737          * sanityn.sh 55b, so check parents first, if later we found source is
2738          * remote, relock parents.
2739          */
2740         cos_incompat = (mdt_object_remote(msrcdir) ||
2741                         mdt_object_remote(mtgtdir));
2742
2743         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2744
2745         /* lock parents in the proper order. */
2746         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2747         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2748
2749         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2750         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2751 relock:
2752         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2753         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2754
2755         /* In case of same dir local rename we must sort by the hash,
2756          * otherwise a lock deadlock is possible when renaming
2757          * a to b and b to a at the same time LU-15285
2758          */
2759         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2760                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2761         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2762                 reverse = 0;
2763
2764         if (reverse)
2765                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2766                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2767                                        &rr->rr_name, cos_incompat);
2768         else
2769                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2770                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
2771                                        cos_incompat);
2772
2773         if (rc != 0)
2774                 GOTO(out_unlock_rename, rc);
2775
2776         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2777         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2778
2779         /* find mold object. */
2780         fid_zero(old_fid);
2781         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2782         if (rc != 0)
2783                 GOTO(out_unlock_parents, rc);
2784
2785         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2786                 GOTO(out_unlock_parents, rc = -EINVAL);
2787
2788         if (!fid_is_md_operative(old_fid))
2789                 GOTO(out_unlock_parents, rc = -EPERM);
2790
2791         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2792         if (IS_ERR(mold))
2793                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2794
2795         if (!mdt_object_exists(mold)) {
2796                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2797                                 &mold->mot_obj,
2798                                 "object does not exist");
2799                 GOTO(out_put_old, rc = -ENOENT);
2800         }
2801
2802         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2803                 GOTO(out_put_old, rc = -EXDEV);
2804
2805         /* Check if @mtgtdir is subdir of @mold, before locking child
2806          * to avoid reverse locking.
2807          */
2808         if (mtgtdir != msrcdir) {
2809                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2810                                    old_fid);
2811                 if (rc) {
2812                         if (rc == 1)
2813                                 rc = -EINVAL;
2814                         GOTO(out_put_old, rc);
2815                 }
2816         }
2817
2818         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2819         /* save version after locking */
2820         mdt_version_get_save(info, mold, 2);
2821
2822         if (!cos_incompat && mdt_object_remote(mold)) {
2823                 cos_incompat = true;
2824                 mdt_object_put(info->mti_env, mold);
2825                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2826                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2827                 goto relock;
2828         }
2829
2830         /* find mnew object:
2831          * mnew target object may not exist now
2832          * lookup with version checking
2833          */
2834         fid_zero(new_fid);
2835         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2836                                       3);
2837         if (rc == 0) {
2838                 /* the new_fid should have been filled at this moment */
2839                 if (lu_fid_eq(old_fid, new_fid))
2840                         GOTO(out_put_old, rc);
2841
2842                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2843                     lu_fid_eq(new_fid, rr->rr_fid2))
2844                         GOTO(out_put_old, rc = -EINVAL);
2845
2846                 if (!fid_is_md_operative(new_fid))
2847                         GOTO(out_put_old, rc = -EPERM);
2848
2849                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2850                 if (IS_ERR(mnew))
2851                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2852
2853                 if (!mdt_object_exists(mnew)) {
2854                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2855                                         &mnew->mot_obj,
2856                                         "object does not exist");
2857                         GOTO(out_put_new, rc = -ENOENT);
2858                 }
2859
2860                 if (mdt_object_remote(mnew)) {
2861                         struct mdt_body  *repbody;
2862
2863                         /* Always send rename req to the target child MDT */
2864                         repbody = req_capsule_server_get(info->mti_pill,
2865                                                          &RMF_MDT_BODY);
2866                         LASSERT(repbody != NULL);
2867                         repbody->mbo_fid1 = *new_fid;
2868                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2869                         GOTO(out_put_new, rc = -EXDEV);
2870                 }
2871                 /* Before locking the target dir, check we do not replace
2872                  * a dir with a non-dir, otherwise it may deadlock with
2873                  * link op which tries to create a link in this dir
2874                  * back to this non-dir.
2875                  */
2876                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2877                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2878                         GOTO(out_put_new, rc = -EISDIR);
2879
2880                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2881                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2882                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2883                                             lh_lookup,
2884                                             MDS_INODELOCK_LOOKUP |
2885                                             MDS_INODELOCK_XATTR, cos_incompat);
2886                 if (rc < 0)
2887                         GOTO(out_put_new, rc);
2888
2889                 /* Check if @msrcdir is subdir of @mnew, before locking child
2890                  * to avoid reverse locking.
2891                  */
2892                 if (mtgtdir != msrcdir) {
2893                         rc = mdo_is_subdir(info->mti_env,
2894                                            mdt_object_child(msrcdir), new_fid);
2895                         if (rc) {
2896                                 if (rc == 1)
2897                                         rc = -EINVAL;
2898                                 GOTO(out_unlock_old, rc);
2899                         }
2900                 }
2901
2902                 /* We used to acquire MDS_INODELOCK_FULL here but we
2903                  * can't do this now because a running HSM restore on
2904                  * the rename onto victim will hold the layout
2905                  * lock. See LU-4002.
2906                  */
2907
2908                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2909                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2910                                            MDS_INODELOCK_LOOKUP |
2911                                            MDS_INODELOCK_UPDATE, LCK_EX,
2912                                            cos_incompat);
2913                 if (rc != 0)
2914                         GOTO(out_unlock_new, rc);
2915
2916                 /* get and save version after locking */
2917                 mdt_version_get_save(info, mnew, 3);
2918         } else if (rc != -ENOENT) {
2919                 GOTO(out_put_old, rc);
2920         } else {
2921                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2922                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2923                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2924                                             lh_lookup,
2925                                             MDS_INODELOCK_LOOKUP |
2926                                             MDS_INODELOCK_XATTR, cos_incompat);
2927                 if (rc != 0)
2928                         GOTO(out_put_old, rc);
2929
2930                 mdt_enoent_version_save(info, 3);
2931         }
2932
2933         /* step 5: rename it */
2934         mdt_reint_init_ma(info, ma);
2935
2936         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2937                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2938
2939         if (mnew != NULL)
2940                 mutex_lock(&mnew->mot_lov_mutex);
2941
2942         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2943                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2944                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2945                         &rr->rr_tgt_name, ma);
2946
2947         if (mnew != NULL)
2948                 mutex_unlock(&mnew->mot_lov_mutex);
2949
2950         /* handle last link of tgt object */
2951         if (rc == 0) {
2952                 if (mnew) {
2953                         mdt_handle_last_unlink(info, mnew, ma);
2954                         discard = mdt_dom_check_for_discard(info, mnew);
2955                 }
2956                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2957                                          msrcdir, mtgtdir, msi,
2958                                          ktime_us_delta(ktime_get(), kstart));
2959         }
2960
2961         EXIT;
2962 out_unlock_new:
2963         if (mnew != NULL)
2964                 mdt_object_unlock(info, mnew, lh_newp, rc);
2965 out_unlock_old:
2966         mdt_object_unlock(info, NULL, lh_lookup, rc);
2967         mdt_object_unlock(info, mold, lh_oldp, rc);
2968 out_put_new:
2969         if (mnew && !discard)
2970                 mdt_object_put(info->mti_env, mnew);
2971 out_put_old:
2972         mdt_object_put(info->mti_env, mold);
2973 out_unlock_parents:
2974         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2975         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2976 out_unlock_rename:
2977         mdt_rename_unlock(info, rename_lh);
2978 out_put_tgtdir:
2979         mdt_object_put(info->mti_env, mtgtdir);
2980 out_put_srcdir:
2981         mdt_object_put(info->mti_env, msrcdir);
2982
2983         /* The DoM discard can be done right in the place above where it is
2984          * assigned, meanwhile it is done here after rename unlock due to
2985          * compatibility with old clients, for them the discard blocks
2986          * the main thread until completion. Check LU-11359 for details.
2987          */
2988         if (discard) {
2989                 mdt_dom_discard_data(info, mnew);
2990                 mdt_object_put(info->mti_env, mnew);
2991         }
2992         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2993         return rc;
2994 }
2995
2996 static int mdt_reint_resync(struct mdt_thread_info *info,
2997                             struct mdt_lock_handle *lhc)
2998 {
2999         struct mdt_reint_record *rr = &info->mti_rr;
3000         struct ptlrpc_request *req = mdt_info_req(info);
3001         struct md_attr *ma = &info->mti_attr;
3002         struct mdt_object *mo;
3003         struct ldlm_lock *lease;
3004         struct mdt_body *repbody;
3005         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3006         bool lease_broken;
3007         int rc;
3008
3009         ENTRY;
3010         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3011
3012         if (info->mti_dlm_req)
3013                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3014
3015         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3016         if (IS_ERR(mo))
3017                 GOTO(out, rc = PTR_ERR(mo));
3018
3019         if (!mdt_object_exists(mo))
3020                 GOTO(out_obj, rc = -ENOENT);
3021
3022         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3023                 GOTO(out_obj, rc = -EINVAL);
3024
3025         if (mdt_object_remote(mo))
3026                 GOTO(out_obj, rc = -EREMOTE);
3027
3028         lease = ldlm_handle2lock(rr->rr_lease_handle);
3029         if (lease == NULL)
3030                 GOTO(out_obj, rc = -ESTALE);
3031
3032         /* It's really necessary to grab open_sem and check if the lease lock
3033          * has been lost. There would exist a concurrent writer coming in and
3034          * generating some dirty data in memory cache, the writeback would fail
3035          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3036          */
3037         if (!down_write_trylock(&mo->mot_open_sem))
3038                 GOTO(out_put_lease, rc = -EBUSY);
3039
3040         lock_res_and_lock(lease);
3041         lease_broken = ldlm_is_cancel(lease);
3042         unlock_res_and_lock(lease);
3043         if (lease_broken)
3044                 GOTO(out_unlock, rc = -EBUSY);
3045
3046         /* the file has yet opened by anyone else after we took the lease. */
3047         layout.mlc_opc = MD_LAYOUT_RESYNC;
3048         lhc = &info->mti_lh[MDT_LH_LOCAL];
3049         rc = mdt_layout_change(info, mo, lhc, &layout);
3050         if (rc)
3051                 GOTO(out_unlock, rc);
3052
3053         mdt_object_unlock(info, mo, lhc, 0);
3054
3055         ma->ma_need = MA_INODE;
3056         ma->ma_valid = 0;
3057         rc = mdt_attr_get_complex(info, mo, ma);
3058         if (rc != 0)
3059                 GOTO(out_unlock, rc);
3060
3061         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3062         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3063
3064         EXIT;
3065 out_unlock:
3066         up_write(&mo->mot_open_sem);
3067 out_put_lease:
3068         LDLM_LOCK_PUT(lease);
3069 out_obj:
3070         mdt_object_put(info->mti_env, mo);
3071 out:
3072         mdt_client_compatibility(info);
3073         return rc;
3074 }
3075
3076 struct mdt_reinter {
3077         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3078         enum lprocfs_extra_opc mr_extra_opc;
3079 };
3080
3081 static const struct mdt_reinter mdt_reinters[] = {
3082         [REINT_SETATTR] = {
3083                 .mr_handler = &mdt_reint_setattr,
3084                 .mr_extra_opc = MDS_REINT_SETATTR,
3085         },
3086         [REINT_CREATE] = {
3087                 .mr_handler = &mdt_reint_create,
3088                 .mr_extra_opc = MDS_REINT_CREATE,
3089         },
3090         [REINT_LINK] = {
3091                 .mr_handler = &mdt_reint_link,
3092                 .mr_extra_opc = MDS_REINT_LINK,
3093         },
3094         [REINT_UNLINK] = {
3095                 .mr_handler = &mdt_reint_unlink,
3096                 .mr_extra_opc = MDS_REINT_UNLINK,
3097         },
3098         [REINT_RENAME] = {
3099                 .mr_handler = &mdt_reint_rename,
3100                 .mr_extra_opc = MDS_REINT_RENAME,
3101         },
3102         [REINT_OPEN] = {
3103                 .mr_handler = &mdt_reint_open,
3104                 .mr_extra_opc = MDS_REINT_OPEN,
3105         },
3106         [REINT_SETXATTR] = {
3107                 .mr_handler = &mdt_reint_setxattr,
3108                 .mr_extra_opc = MDS_REINT_SETXATTR,
3109         },
3110         [REINT_RMENTRY] = {
3111                 .mr_handler = &mdt_reint_unlink,
3112                 .mr_extra_opc = MDS_REINT_UNLINK,
3113         },
3114         [REINT_MIGRATE] = {
3115                 .mr_handler = &mdt_reint_migrate,
3116                 .mr_extra_opc = MDS_REINT_RENAME,
3117         },
3118         [REINT_RESYNC] = {
3119                 .mr_handler = &mdt_reint_resync,
3120                 .mr_extra_opc = MDS_REINT_RESYNC,
3121         },
3122 };
3123
3124 int mdt_reint_rec(struct mdt_thread_info *info,
3125                   struct mdt_lock_handle *lhc)
3126 {
3127         const struct mdt_reinter *mr;
3128         int rc;
3129
3130         ENTRY;
3131         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3132                 RETURN(-EPROTO);
3133
3134         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3135         if (mr->mr_handler == NULL)
3136                 RETURN(-EPROTO);
3137
3138         rc = (*mr->mr_handler)(info, lhc);
3139
3140         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3141                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3142
3143         RETURN(rc);
3144 }