Whamcloud - gitweb
701863ea9d010e0b34b71c97aab861982790572f
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 /**
251  * Lock slave stripes if necessary, the lock handles of slave stripes
252  * will be stored in einfo->ei_cbdata.
253  **/
254 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
255                             enum ldlm_mode mode, __u64 ibits,
256                             struct ldlm_enqueue_info *einfo)
257 {
258         union ldlm_policy_data *policy = &mti->mti_policy;
259
260         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
261         einfo->ei_type = LDLM_IBITS;
262         einfo->ei_mode = mode;
263         einfo->ei_cb_bl = mdt_remote_blocking_ast;
264         einfo->ei_cb_local_bl = mdt_blocking_ast;
265         einfo->ei_cb_cp = ldlm_completion_ast;
266         einfo->ei_enq_slave = 1;
267         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
268         einfo->ei_inodebits = ibits;
269         einfo->ei_req_slot = 1;
270         memset(policy, 0, sizeof(*policy));
271         policy->l_inodebits.bits = ibits;
272         policy->l_inodebits.li_initiator_id = mdt_node_id(mti->mti_mdt);
273
274         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
275                               policy);
276 }
277
278 /** lock object, and stripes if it's a striped directory
279  *
280  * object should be local, this is called in operations which modify both object
281  * and stripes.
282  *
283  * \param info          struct mdt_thread_info
284  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
285  * \param child         child object
286  * \param lh            lock handle
287  * \param einfo         struct ldlm_enqueue_info
288  * \param ibits         MDS inode lock bits
289  * \param mode          lock mode
290  *
291  * \retval              0 on success, -ev on error.
292  */
293 int mdt_object_stripes_lock(struct mdt_thread_info *info,
294                             struct mdt_object *parent,
295                             struct mdt_object *child,
296                             struct mdt_lock_handle *lh,
297                             struct ldlm_enqueue_info *einfo, __u64 ibits,
298                             enum ldlm_mode mode)
299 {
300         int rc;
301
302         ENTRY;
303         /* according to the protocol, child should be local, is request sent to
304          * wrong MDT?
305          */
306         if (mdt_object_remote(child)) {
307                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
308                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
309                        -EREMOTE);
310                 RETURN(-EREMOTE);
311         }
312
313         memset(einfo, 0, sizeof(*einfo));
314         if (ibits & MDS_INODELOCK_LOOKUP) {
315                 LASSERT(parent);
316                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
317                                            mode);
318         } else {
319                 rc = mdt_object_lock(info, child, lh, ibits, mode);
320         }
321         if (rc)
322                 RETURN(rc);
323
324         if (!S_ISDIR(child->mot_header.loh_attr))
325                 RETURN(0);
326
327         /* lock stripes for striped directory */
328         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
329         if (rc == -EIO && CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
330                 rc = 0;
331         if (rc)
332                 mdt_object_unlock(info, child, lh, rc);
333
334         RETURN(rc);
335 }
336
337 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *obj,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_stripes_unlock(info, obj, einfo, decref);
344         mdt_object_unlock(info, obj, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lu_ucred *uc = mdt_ucred(info);
359         struct lmv_mds_md_v1 *lmv;
360         struct mdt_object *child;
361         struct mdt_lock_handle *lhp;
362         struct mdt_lock_handle *lhc;
363         struct mdt_body *repbody;
364         int rc;
365
366         ENTRY;
367
368         /* we want rbac roles to have precedence over any other
369          * permission or capability checks
370          */
371         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
372                 RETURN(-EPERM);
373
374         LASSERT(lum);
375         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
376
377         rc = mdt_version_get_check_save(info, parent, 0);
378         if (rc)
379                 RETURN(rc);
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW);
383         if (rc)
384                 RETURN(rc);
385
386         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
387         if (rc)
388                 GOTO(unlock_parent, rc);
389
390         if (ma->ma_valid & MA_LMV) {
391                 /* don't allow restripe if parent dir layout is changing */
392                 lmv = &ma->ma_lmv->lmv_md_v1;
393                 if (!lmv_is_sane2(lmv))
394                         GOTO(unlock_parent, rc = -EBADF);
395
396                 if (lmv_is_layout_changing(lmv))
397                         GOTO(unlock_parent, rc = -EBUSY);
398         }
399
400         fid_zero(fid);
401         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
402         if (rc)
403                 GOTO(unlock_parent, rc);
404
405         child = mdt_object_find(info->mti_env, mdt, fid);
406         if (IS_ERR(child))
407                 GOTO(unlock_parent, rc = PTR_ERR(child));
408
409         if (!mdt_object_exists(child))
410                 GOTO(out_child, rc = -ENOENT);
411
412         if (mdt_object_remote(child)) {
413                 struct mdt_body *repbody;
414
415                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
416                 if (!repbody)
417                         GOTO(out_child, rc = -EPROTO);
418
419                 repbody->mbo_fid1 = *fid;
420                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
421                 GOTO(out_child, rc = -EREMOTE);
422         }
423
424         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
425                 GOTO(out_child, rc = -ENOTDIR);
426
427         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
428         if (rc)
429                 GOTO(out_child, rc);
430
431         /* race with migrate? */
432         if ((ma->ma_valid & MA_LMV) &&
433              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
434                 GOTO(out_child, rc = -EBUSY);
435
436         /* lock object */
437         lhc = &info->mti_lh[MDT_LH_CHILD];
438         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
439                                      MDS_INODELOCK_FULL, LCK_PW);
440         if (rc)
441                 GOTO(unlock_child, rc);
442
443         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
444         rc = mdt_version_get_check_save(info, child, 1);
445         if (rc)
446                 GOTO(unlock_child, rc);
447
448         spin_lock(&mdt->mdt_restriper.mdr_lock);
449         if (child->mot_restriping) {
450                 /* race? */
451                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
452                 GOTO(unlock_child, rc = -EBUSY);
453         }
454         child->mot_restriping = 1;
455         spin_unlock(&mdt->mdt_restriper.mdr_lock);
456
457         *fid = *tfid;
458         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
459         if (rc)
460                 GOTO(restriping_clear, rc);
461
462         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
463         if (!repbody)
464                 GOTO(restriping_clear, rc = -EPROTO);
465
466         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
467         EXIT;
468
469 restriping_clear:
470         child->mot_restriping = 0;
471 unlock_child:
472         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
473 out_child:
474         mdt_object_put(info->mti_env, child);
475 unlock_parent:
476         mdt_object_unlock(info, parent, lhp, rc);
477
478         return rc;
479 }
480
481 /*
482  * VBR: we save three versions in reply:
483  * 0 - parent. Check that parent version is the same during replay.
484  * 1 - name. Version of 'name' if file exists with the same name or
485  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
486  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
487  * check.
488  */
489 static int mdt_create(struct mdt_thread_info *info)
490 {
491         struct mdt_device *mdt = info->mti_mdt;
492         struct mdt_object *parent;
493         struct mdt_object *child;
494         struct mdt_lock_handle *lh;
495         struct mdt_body *repbody;
496         struct md_attr *ma = &info->mti_attr;
497         struct mdt_reint_record *rr = &info->mti_rr;
498         struct md_op_spec *spec = &info->mti_spec;
499         struct lu_ucred *uc = mdt_ucred(info);
500         bool restripe = false;
501         int rc;
502
503         ENTRY;
504         DEBUG_REQ(D_INODE, mdt_info_req(info),
505                   "Create ("DNAME"->"DFID") in "DFID,
506                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
507
508         if (!fid_is_md_operative(rr->rr_fid1))
509                 RETURN(-EPERM);
510
511         /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
512          * if client maintains inherited default LMV
513          */
514         if (S_ISDIR(ma->ma_attr.la_mode) &&
515             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
516             !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
517                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
518                 struct obd_export *exp = mdt_info_req(info)->rq_export;
519
520                 /* Only new clients can create remote dir( >= 2.4) and
521                  * striped dir(>= 2.6), old client will return -ENOTSUPP
522                  */
523                 if (!mdt_is_dne_client(exp))
524                         RETURN(-ENOTSUPP);
525
526                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
527                         if (!mdt_is_striped_client(exp))
528                                 RETURN(-ENOTSUPP);
529
530                         if (!mdt->mdt_enable_striped_dir)
531                                 RETURN(-EPERM);
532                 } else if (!mdt->mdt_enable_remote_dir) {
533                         RETURN(-EPERM);
534                 }
535
536                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
537                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
538                     LMV_HASH_TYPE_CRUSH)
539                         RETURN(-EPROTO);
540
541                 /* we want rbac roles to have precedence over any other
542                  * permission or capability checks
543                  */
544                 if (!uc->uc_rbac_dne_ops ||
545                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
546                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
547                      mdt->mdt_enable_remote_dir_gid != -1))
548                         RETURN(-EPERM);
549
550                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
551                  * this is create only, don't try restripe.
552                  */
553                 if (mdt->mdt_enable_dir_restripe &&
554                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
555                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
556                         restripe = true;
557         }
558
559         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
560
561         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
562         if (IS_ERR(parent))
563                 RETURN(PTR_ERR(parent));
564
565         if (!mdt_object_exists(parent))
566                 GOTO(put_parent, rc = -ENOENT);
567
568         rc = mdt_check_enc(info, parent);
569         if (rc)
570                 GOTO(put_parent, rc);
571
572         if (!uc->uc_rbac_fscrypt_admin &&
573             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
574                 GOTO(put_parent, rc = -EPERM);
575
576         /*
577          * LU-10235: check if name exists locklessly first to avoid massive
578          * lock recalls on existing directories.
579          */
580         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
581                                       &info->mti_tmp_fid1, 1);
582         if (rc == 0) {
583                 if (!restripe)
584                         GOTO(put_parent, rc = -EEXIST);
585
586                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
587                                   ma);
588         }
589
590         /* -ENOENT is expected here */
591         if (rc != -ENOENT)
592                 GOTO(put_parent, rc);
593
594         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP, cfs_fail_val);
595
596         /* save version of file name for replay, it must be ENOENT here */
597         mdt_enoent_version_save(info, 1);
598
599         CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
600
601         lh = &info->mti_lh[MDT_LH_PARENT];
602         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW);
603         if (rc)
604                 GOTO(put_parent, rc);
605
606         if (!mdt_object_remote(parent)) {
607                 rc = mdt_version_get_check_save(info, parent, 0);
608                 if (rc)
609                         GOTO(unlock_parent, rc);
610         }
611
612         /*
613          * now repeat the lookup having a LDLM lock on the parent dir,
614          * as another thread could create the same name. notice this
615          * lookup is supposed to hit cache in OSD and be cheap if the
616          * directory is not being modified concurrently.
617          */
618         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
619                         &info->mti_tmp_fid1, &info->mti_spec);
620         if (unlikely(rc == 0))
621                 GOTO(unlock_parent, rc = -EEXIST);
622
623         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
624         if (unlikely(IS_ERR(child)))
625                 GOTO(unlock_parent, rc = PTR_ERR(child));
626
627         ma->ma_need = MA_INODE;
628         ma->ma_valid = 0;
629
630         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
631                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
632
633         /* Version of child will be updated on disk. */
634         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
635         rc = mdt_version_get_check_save(info, child, 2);
636         if (rc)
637                 GOTO(put_child, rc);
638
639         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
640             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
641              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
642                      rr->rr_name.ln_namelen) == 0))
643                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
644
645         /*
646          * Do not perform lookup sanity check. We know that name does
647          * not exist.
648          */
649         info->mti_spec.sp_cr_lookup = 0;
650         if (mdt_object_remote(parent))
651                 info->mti_spec.sp_cr_lookup = 1;
652         info->mti_spec.sp_feat = &dt_directory_features;
653
654         /* set jobid xattr name from sysfs parameter */
655         strncpy(info->mti_spec.sp_cr_job_xattr, mdt->mdt_job_xattr,
656                 XATTR_JOB_MAX_LEN);
657
658         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
659                         mdt_object_child(child), &info->mti_spec, ma);
660         if (rc == 0)
661                 rc = mdt_attr_get_complex(info, child, ma);
662
663         if (rc < 0)
664                 GOTO(put_child, rc);
665
666         /* save child locks to eliminate dependey between 'mkdir a' and
667          * 'mkdir a/b' if b is a remote directory
668          */
669         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
670                 struct mdt_lock_handle *lhc;
671                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
672
673                 lhc = &info->mti_lh[MDT_LH_CHILD];
674                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
675                                              MDS_INODELOCK_UPDATE, LCK_PW);
676                 if (rc)
677                         GOTO(put_child, rc);
678
679                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
680         }
681
682         /* Return fid & attr to client. */
683         if (ma->ma_valid & MA_INODE)
684                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
685                                    mdt_object_fid(child));
686         EXIT;
687 put_child:
688         mdt_object_put(info->mti_env, child);
689 unlock_parent:
690         mdt_object_unlock(info, parent, lh, rc);
691 put_parent:
692         mdt_object_put(info->mti_env, parent);
693         return rc;
694 }
695
696 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
697                         struct md_attr *ma)
698 {
699         struct mdt_lock_handle  *lh;
700         int do_vbr = ma->ma_attr.la_valid &
701                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
702         __u64 lockpart = MDS_INODELOCK_UPDATE;
703         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
704         int rc;
705
706         ENTRY;
707         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
708                 lockpart |= MDS_INODELOCK_PERM;
709         /* Clear xattr cache on clients, so the virtual project ID xattr
710          * can get the new project ID
711          */
712         if (ma->ma_attr.la_valid & LA_PROJID)
713                 lockpart |= MDS_INODELOCK_XATTR;
714
715         lh = &info->mti_lh[MDT_LH_PARENT];
716         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
717                                      LCK_PW);
718         if (rc != 0)
719                 RETURN(rc);
720
721         /* all attrs are packed into mti_attr in unpack_setattr */
722         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
723                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
724
725         /* VBR: update version if attr changed are important for recovery */
726         if (do_vbr) {
727                 /* update on-disk version of changed object */
728                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
729                 rc = mdt_version_get_check_save(info, mo, 0);
730                 if (rc)
731                         GOTO(out_unlock, rc);
732         }
733
734         /* Ensure constant striping during chown(). See LU-2789. */
735         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
736                 mutex_lock(&mo->mot_lov_mutex);
737
738         /* all attrs are packed into mti_attr in unpack_setattr */
739         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
740
741         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
742                 mutex_unlock(&mo->mot_lov_mutex);
743
744         if (rc != 0)
745                 GOTO(out_unlock, rc);
746         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
747         EXIT;
748 out_unlock:
749         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
750         return rc;
751 }
752
753 /**
754  * Check HSM flags and add HS_DIRTY flag if relevant.
755  *
756  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
757  * and is not RELEASED.
758  */
759 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
760                         struct md_attr *ma)
761 {
762         struct lu_ucred *uc = mdt_ucred(info);
763         kernel_cap_t cap_saved;
764         int rc;
765
766         ENTRY;
767         /* If the file was modified, add the dirty flag */
768         ma->ma_need = MA_HSM;
769         rc = mdt_attr_get_complex(info, mo, ma);
770         if (rc) {
771                 CERROR("file attribute read error for "DFID": %d.\n",
772                         PFID(mdt_object_fid(mo)), rc);
773                 RETURN(rc);
774         }
775
776         /* If an up2date copy exists in the backend, add dirty flag */
777         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
778             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
779                 ma->ma_hsm.mh_flags |= HS_DIRTY;
780
781                 /* Bump cap so that closes from non-owner writers can
782                  * set the HSM state to dirty.
783                  */
784                 cap_saved = uc->uc_cap;
785                 cap_raise(uc->uc_cap, CAP_FOWNER);
786                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
787                 uc->uc_cap = cap_saved;
788                 if (rc)
789                         CERROR("file attribute change error for "DFID": %d\n",
790                                 PFID(mdt_object_fid(mo)), rc);
791         }
792
793         RETURN(rc);
794 }
795
796 static int mdt_reint_setattr(struct mdt_thread_info *info,
797                              struct mdt_lock_handle *lhc)
798 {
799         struct mdt_device *mdt = info->mti_mdt;
800         struct md_attr *ma = &info->mti_attr;
801         struct mdt_reint_record *rr = &info->mti_rr;
802         struct ptlrpc_request *req = mdt_info_req(info);
803         struct mdt_object *mo;
804         struct mdt_body *repbody;
805         ktime_t kstart = ktime_get();
806         int rc;
807
808         ENTRY;
809         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
810                   (unsigned int)ma->ma_attr.la_valid);
811
812         if (info->mti_dlm_req)
813                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
814
815         CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
816
817         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
818         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
819         if (IS_ERR(mo))
820                 GOTO(out, rc = PTR_ERR(mo));
821
822         if (!mdt_object_exists(mo))
823                 GOTO(out_put, rc = -ENOENT);
824
825         if (mdt_object_remote(mo))
826                 GOTO(out_put, rc = -EREMOTE);
827
828         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
829         /* revoke lease lock if size is going to be changed */
830         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
831                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
832                      atomic_read(&mo->mot_lease_count) > 0)) {
833                 down_read(&mo->mot_open_sem);
834
835                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
836                         lhc = &info->mti_lh[MDT_LH_LOCAL];
837                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
838                                              LCK_CW);
839                         if (rc != 0) {
840                                 up_read(&mo->mot_open_sem);
841                                 GOTO(out_put, rc);
842                         }
843
844                         /* revoke lease lock */
845                         mdt_object_unlock(info, mo, lhc, 1);
846                 }
847                 up_read(&mo->mot_open_sem);
848         }
849
850         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
851                 /* Check write access for the O_TRUNC case */
852                 if (mdt_write_read(mo) < 0)
853                         GOTO(out_put, rc = -ETXTBSY);
854
855                 /* LU-10286: compatibility check for FLR.
856                  * Please check the comment in mdt_finish_open() for details
857                  */
858                 if (!exp_connect_flr(info->mti_exp) ||
859                     !exp_connect_overstriping(info->mti_exp)) {
860                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
861                         if (rc < 0 && rc != -ENODATA)
862                                 GOTO(out_put, rc);
863
864                         if (!exp_connect_flr(info->mti_exp)) {
865                                 if (rc > 0 &&
866                                     mdt_lmm_is_flr(info->mti_big_lmm))
867                                         GOTO(out_put, rc = -EOPNOTSUPP);
868                         }
869
870                         if (!exp_connect_overstriping(info->mti_exp)) {
871                                 if (rc > 0 &&
872                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
873                                         GOTO(out_put, rc = -EOPNOTSUPP);
874                         }
875                 }
876
877                 /* For truncate, the file size sent from client
878                  * is believable, but the blocks are incorrect,
879                  * which makes the block size in LSOM attribute
880                  * inconsisent with the real block size.
881                  */
882                 rc = mdt_lsom_update(info, mo, true);
883                 if (rc)
884                         GOTO(out_put, rc);
885         }
886
887         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
888                 if (ma->ma_valid & MA_LOV)
889                         GOTO(out_put, rc = -EPROTO);
890
891                 /* MDT supports FMD for regular files due to Data-on-MDT */
892                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
893                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
894                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
895                                        req->rq_xid);
896
897                         if (ma->ma_attr.la_valid & LA_MTIME) {
898                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
899                                 if (!rc)
900                                         ma->ma_valid |= MA_PFID;
901                         }
902                 }
903
904                 rc = mdt_attr_set(info, mo, ma);
905                 if (rc)
906                         GOTO(out_put, rc);
907         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
908                    (ma->ma_valid & MA_INODE)) {
909                 struct lu_buf *buf = &info->mti_buf;
910                 struct lu_ucred *uc = mdt_ucred(info);
911                 struct mdt_lock_handle *lh;
912                 const char *name;
913
914                 /* reject if either remote or striped dir is disabled */
915                 if (ma->ma_valid & MA_LMV) {
916                         if (!mdt->mdt_enable_remote_dir ||
917                             !mdt->mdt_enable_striped_dir)
918                                 GOTO(out_put, rc = -EPERM);
919
920                         /* we want rbac roles to have precedence over any other
921                          * permission or capability checks
922                          */
923                         if (!uc->uc_rbac_dne_ops ||
924                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
925                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
926                              mdt->mdt_enable_remote_dir_gid != -1))
927                                 GOTO(out_put, rc = -EPERM);
928                 }
929
930                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
931                         GOTO(out_put, rc = -ENOTDIR);
932
933                 if (ma->ma_attr.la_valid != 0)
934                         GOTO(out_put, rc = -EPROTO);
935
936                 lh = &info->mti_lh[MDT_LH_PARENT];
937                 if (ma->ma_valid & MA_LOV) {
938                         buf->lb_buf = ma->ma_lmm;
939                         buf->lb_len = ma->ma_lmm_size;
940                         name = XATTR_NAME_LOV;
941                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
942                                              LCK_PW);
943                 } else {
944                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
945                         buf->lb_len = ma->ma_lmv_size;
946                         name = XATTR_NAME_DEFAULT_LMV;
947
948                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
949                                 rc = mdt_object_lock(info, mo, lh,
950                                                      MDS_INODELOCK_XATTR |
951                                                      MDS_INODELOCK_LOOKUP,
952                                                      LCK_PW);
953                         } else {
954                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
955                                 struct lu_name *pname = &info->mti_name;
956                                 const char dotdot[] = "..";
957                                 struct mdt_object *pobj;
958
959                                 fid_zero(pfid);
960                                 pname->ln_name = dotdot;
961                                 pname->ln_namelen = sizeof(dotdot);
962                                 rc = mdo_lookup(info->mti_env,
963                                                 mdt_object_child(mo), pname,
964                                                 pfid, NULL);
965                                 if (rc)
966                                         GOTO(out_put, rc);
967
968                                 pobj = mdt_object_find(info->mti_env,
969                                                        info->mti_mdt, pfid);
970                                 if (IS_ERR(pobj))
971                                         GOTO(out_put, rc = PTR_ERR(pobj));
972
973                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
974                                                            MDS_INODELOCK_XATTR |
975                                                            MDS_INODELOCK_LOOKUP,
976                                                            LCK_PW);
977                                 mdt_object_put(info->mti_env, pobj);
978                         }
979                 }
980
981                 if (rc != 0)
982                         GOTO(out_put, rc);
983
984                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
985                                   name, 0);
986
987                 mdt_object_unlock(info, mo, lh, rc);
988                 if (rc)
989                         GOTO(out_put, rc);
990         } else {
991                 GOTO(out_put, rc = -EPROTO);
992         }
993
994         /* If file data is modified, add the dirty flag */
995         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
996                 rc = mdt_add_dirty_flag(info, mo, ma);
997
998         ma->ma_need = MA_INODE;
999         ma->ma_valid = 0;
1000         rc = mdt_attr_get_complex(info, mo, ma);
1001         if (rc != 0)
1002                 GOTO(out_put, rc);
1003
1004         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1005
1006         EXIT;
1007 out_put:
1008         mdt_object_put(info->mti_env, mo);
1009 out:
1010         if (rc == 0)
1011                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1012                                  ktime_us_delta(ktime_get(), kstart));
1013
1014         mdt_client_compatibility(info);
1015         return rc;
1016 }
1017
1018 static int mdt_reint_create(struct mdt_thread_info *info,
1019                             struct mdt_lock_handle *lhc)
1020 {
1021         struct ptlrpc_request   *req = mdt_info_req(info);
1022         ktime_t                 kstart = ktime_get();
1023         int                     rc;
1024
1025         ENTRY;
1026         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1027                 RETURN(err_serious(-ESTALE));
1028
1029         if (info->mti_dlm_req)
1030                 ldlm_request_cancel(mdt_info_req(info),
1031                                     info->mti_dlm_req, 0, LATF_SKIP);
1032
1033         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1034                 RETURN(-EPROTO);
1035
1036         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1037         case S_IFDIR:
1038         case S_IFREG:
1039         case S_IFLNK:
1040         case S_IFCHR:
1041         case S_IFBLK:
1042         case S_IFIFO:
1043         case S_IFSOCK:
1044                 break;
1045         default:
1046                 CERROR("%s: Unsupported mode %o\n",
1047                        mdt_obd_name(info->mti_mdt),
1048                        info->mti_attr.ma_attr.la_mode);
1049                 RETURN(err_serious(-EOPNOTSUPP));
1050         }
1051
1052         rc = mdt_create(info);
1053         if (rc == 0) {
1054                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1055                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1056                                          ktime_us_delta(ktime_get(), kstart));
1057                 else
1058                         /* Special file should stay on the same node as parent*/
1059                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1060                                          ktime_us_delta(ktime_get(), kstart));
1061         }
1062
1063         RETURN(rc);
1064 }
1065
1066 /*
1067  * VBR: save parent version in reply and child version getting by its name.
1068  * Version of child is getting and checking during its lookup. If
1069  */
1070 static int mdt_reint_unlink(struct mdt_thread_info *info,
1071                             struct mdt_lock_handle *lhc)
1072 {
1073         struct mdt_reint_record *rr = &info->mti_rr;
1074         struct ptlrpc_request *req = mdt_info_req(info);
1075         struct md_attr *ma = &info->mti_attr;
1076         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1077         struct mdt_object *mp;
1078         struct mdt_object *mc;
1079         struct mdt_lock_handle *parent_lh;
1080         struct mdt_lock_handle *child_lh;
1081         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
1082         struct lu_ucred *uc  = mdt_ucred(info);
1083         int no_name = 0;
1084         ktime_t kstart = ktime_get();
1085         int rc;
1086
1087         ENTRY;
1088         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1089                   PNAME(&rr->rr_name));
1090
1091         if (info->mti_dlm_req)
1092                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1093
1094         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1095                 RETURN(err_serious(-ENOENT));
1096
1097         if (!fid_is_md_operative(rr->rr_fid1))
1098                 RETURN(-EPERM);
1099
1100         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1101         if (IS_ERR(mp))
1102                 RETURN(PTR_ERR(mp));
1103
1104         if (!mdt_object_remote(mp)) {
1105                 rc = mdt_version_get_check_save(info, mp, 0);
1106                 if (rc)
1107                         GOTO(put_parent, rc);
1108         }
1109
1110         if (!uc->uc_rbac_fscrypt_admin &&
1111             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1112                 GOTO(put_parent, rc = -EPERM);
1113
1114         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
1115         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1116         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1117         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW);
1118         if (rc != 0)
1119                 GOTO(put_parent, rc);
1120
1121         if (info->mti_spec.sp_rm_entry) {
1122                 if (!mdt_is_dne_client(req->rq_export))
1123                         /* Return -ENOTSUPP for old client */
1124                         GOTO(unlock_parent, rc = -ENOTSUPP);
1125
1126                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1127                         GOTO(unlock_parent, rc = -EPERM);
1128
1129                 ma->ma_need = MA_INODE;
1130                 ma->ma_valid = 0;
1131                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1132                                 NULL, &rr->rr_name, ma, no_name);
1133                 GOTO(unlock_parent, rc);
1134         }
1135
1136         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1137                 *child_fid = *rr->rr_fid2;
1138         } else {
1139                 /* lookup child object along with version checking */
1140                 fid_zero(child_fid);
1141                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1142                                               1);
1143                 if (rc != 0) {
1144                         /* Name might not be able to find during resend of
1145                          * remote unlink, considering following case.
1146                          * dir_A is a remote directory, the name entry of
1147                          * dir_A is on MDT0, the directory is on MDT1,
1148                          *
1149                          * 1. client sends unlink req to MDT1.
1150                          * 2. MDT1 sends name delete update to MDT0.
1151                          * 3. name entry is being deleted in MDT0 synchronously.
1152                          * 4. MDT1 is restarted.
1153                          * 5. client resends unlink req to MDT1. So it can not
1154                          *    find the name entry on MDT0 anymore.
1155                          * In this case, MDT1 only needs to destory the local
1156                          * directory.
1157                          */
1158                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1159                             !fid_is_zero(rr->rr_fid2) &&
1160                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1161                                 no_name = 1;
1162                                 *child_fid = *rr->rr_fid2;
1163                         } else {
1164                                 GOTO(unlock_parent, rc);
1165                         }
1166                 }
1167         }
1168
1169         if (!fid_is_md_operative(child_fid))
1170                 GOTO(unlock_parent, rc = -EPERM);
1171
1172         /* We will lock the child regardless it is local or remote. No harm. */
1173         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1174         if (IS_ERR(mc))
1175                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1176
1177         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1178                 /* In this case, child fid is embedded in the request, and we do
1179                  * not have a proper name as rr_name contains an encoded
1180                  * hash. So find name that matches provided hash.
1181                  */
1182                 if (!find_name_matching_hash(info, &rr->rr_name,
1183                                              NULL, mc))
1184                         GOTO(put_child, rc = -ENOENT);
1185         }
1186
1187         child_lh = &info->mti_lh[MDT_LH_CHILD];
1188         if (mdt_object_remote(mc)) {
1189                 struct mdt_body  *repbody;
1190
1191                 if (!fid_is_zero(rr->rr_fid2)) {
1192                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1193                                mdt_obd_name(info->mti_mdt),
1194                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1195                         GOTO(put_child, rc = -ENOENT);
1196                 }
1197                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1198                        mdt_obd_name(info->mti_mdt),
1199                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1200
1201                 if (!mdt_is_dne_client(req->rq_export))
1202                         /* Return -ENOTSUPP for old client */
1203                         GOTO(put_child, rc = -ENOTSUPP);
1204
1205                 /* Revoke the LOOKUP lock of the remote object granted by
1206                  * this MDT. Since the unlink will happen on another MDT,
1207                  * it will release the LOOKUP lock right away. Then What
1208                  * would happen if another client try to grab the LOOKUP
1209                  * lock at the same time with unlink XXX
1210                  */
1211                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX);
1212                 if (rc)
1213                         GOTO(put_child, rc);
1214
1215                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1216                 LASSERT(repbody != NULL);
1217                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1218                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1219                 GOTO(unlock_child, rc = -EREMOTE);
1220         }
1221         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1222          * this now because a running HSM restore on the child (unlink
1223          * victim) will hold the layout lock. See LU-4002.
1224          */
1225         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1226                                      MDS_INODELOCK_LOOKUP |
1227                                      MDS_INODELOCK_UPDATE, LCK_EX);
1228         if (rc != 0)
1229                 GOTO(put_child, rc);
1230
1231         /*
1232          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1233          * whether need MA_LOV and MA_COOKIE.
1234          */
1235         ma->ma_need = MA_INODE;
1236         ma->ma_valid = 0;
1237
1238         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1239                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1240         /* save version when object is locked */
1241         mdt_version_get_save(info, mc, 1);
1242
1243         mutex_lock(&mc->mot_lov_mutex);
1244
1245         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1246                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1247
1248         mutex_unlock(&mc->mot_lov_mutex);
1249         if (rc != 0)
1250                 GOTO(unlock_child, rc);
1251
1252         if (!lu_object_is_dying(&mc->mot_header)) {
1253                 rc = mdt_attr_get_complex(info, mc, ma);
1254                 if (rc)
1255                         GOTO(out_stat, rc);
1256         } else if (mdt_dom_check_for_discard(info, mc)) {
1257                 mdt_dom_discard_data(info, mc);
1258         }
1259         mdt_handle_last_unlink(info, mc, ma);
1260
1261 out_stat:
1262         if (ma->ma_valid & MA_INODE) {
1263                 switch (ma->ma_attr.la_mode & S_IFMT) {
1264                 case S_IFDIR:
1265                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1266                                          ktime_us_delta(ktime_get(), kstart));
1267                         break;
1268                 case S_IFREG:
1269                 case S_IFLNK:
1270                 case S_IFCHR:
1271                 case S_IFBLK:
1272                 case S_IFIFO:
1273                 case S_IFSOCK:
1274                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1275                                          ktime_us_delta(ktime_get(), kstart));
1276                         break;
1277                 default:
1278                         LASSERTF(0, "bad file type %o unlinking\n",
1279                                 ma->ma_attr.la_mode);
1280                 }
1281         }
1282
1283         EXIT;
1284
1285 unlock_child:
1286         /* after unlink the object is gone, no need to keep lock */
1287         mdt_object_stripes_unlock(info, mc, child_lh, einfo, 1);
1288 put_child:
1289         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1290             info->mti_big_buf.lb_buf)
1291                 lu_buf_free(&info->mti_big_buf);
1292         mdt_object_put(info->mti_env, mc);
1293 unlock_parent:
1294         mdt_object_unlock(info, mp, parent_lh, rc);
1295 put_parent:
1296         mdt_object_put(info->mti_env, mp);
1297         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1298         return rc;
1299 }
1300
1301 /*
1302  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1303  * name.
1304  */
1305 static int mdt_reint_link(struct mdt_thread_info *info,
1306                           struct mdt_lock_handle *lhc)
1307 {
1308         struct mdt_reint_record *rr = &info->mti_rr;
1309         struct ptlrpc_request   *req = mdt_info_req(info);
1310         struct md_attr          *ma = &info->mti_attr;
1311         struct mdt_object       *ms;
1312         struct mdt_object       *mp;
1313         struct mdt_lock_handle  *lhs;
1314         struct mdt_lock_handle  *lhp;
1315         ktime_t kstart = ktime_get();
1316         int rc;
1317
1318         ENTRY;
1319         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1320                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1321
1322         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1323                 RETURN(err_serious(-ENOENT));
1324
1325         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1326             CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1327                 req->rq_no_reply = 1;
1328                 RETURN(err_serious(-ENOENT));
1329         }
1330
1331         if (info->mti_dlm_req)
1332                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1333
1334         /* Invalid case so return error immediately instead of
1335          * processing it
1336          */
1337         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1338                 RETURN(-EPERM);
1339
1340         if (!fid_is_md_operative(rr->rr_fid1) ||
1341             !fid_is_md_operative(rr->rr_fid2))
1342                 RETURN(-EPERM);
1343
1344         /* step 1: find target parent dir */
1345         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1346         if (IS_ERR(mp))
1347                 RETURN(PTR_ERR(mp));
1348
1349         rc = mdt_version_get_check_save(info, mp, 0);
1350         if (rc)
1351                 GOTO(put_parent, rc);
1352
1353         rc = mdt_check_enc(info, mp);
1354         if (rc)
1355                 GOTO(put_parent, rc);
1356
1357         /* step 2: find source */
1358         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1359         if (IS_ERR(ms))
1360                 GOTO(put_parent, rc = PTR_ERR(ms));
1361
1362         if (!mdt_object_exists(ms)) {
1363                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1364                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1365                 GOTO(put_source, rc = -ENOENT);
1366         }
1367
1368         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1369
1370         lhp = &info->mti_lh[MDT_LH_PARENT];
1371         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW);
1372         if (rc != 0)
1373                 GOTO(put_source, rc);
1374
1375         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1376
1377         lhs = &info->mti_lh[MDT_LH_CHILD];
1378         rc = mdt_object_lock(info, ms, lhs,
1379                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1380                              LCK_EX);
1381         if (rc != 0)
1382                 GOTO(unlock_parent, rc);
1383
1384         /* step 3: link it */
1385         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1386                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1387
1388         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1389         rc = mdt_version_get_check_save(info, ms, 1);
1390         if (rc)
1391                 GOTO(unlock_source, rc);
1392
1393         /** check target version by name during replay */
1394         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1395                                       &info->mti_tmp_fid1, 2);
1396         if (rc != 0 && rc != -ENOENT)
1397                 GOTO(unlock_source, rc);
1398         /* save version of file name for replay, it must be ENOENT here */
1399         if (!req_is_replay(mdt_info_req(info))) {
1400                 if (rc != -ENOENT) {
1401                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1402                                PNAME(&rr->rr_name));
1403                         GOTO(unlock_source, rc = -EEXIST);
1404                 }
1405                 info->mti_ver[2] = ENOENT_VERSION;
1406                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1407         }
1408
1409         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1410                       mdt_object_child(ms), &rr->rr_name, ma);
1411
1412         if (rc == 0)
1413                 mdt_counter_incr(req, LPROC_MDT_LINK,
1414                                  ktime_us_delta(ktime_get(), kstart));
1415
1416         EXIT;
1417 unlock_source:
1418         mdt_object_unlock(info, ms, lhs, rc);
1419 unlock_parent:
1420         mdt_object_unlock(info, mp, lhp, rc);
1421 put_source:
1422         mdt_object_put(info->mti_env, ms);
1423 put_parent:
1424         mdt_object_put(info->mti_env, mp);
1425         return rc;
1426 }
1427
1428 /**
1429  * Get BFL lock for rename or migrate process.
1430  **/
1431 static int mdt_rename_lock(struct mdt_thread_info *info,
1432                            struct mdt_lock_handle *lh)
1433 {
1434         struct lu_fid *fid = &info->mti_tmp_fid1;
1435         struct mdt_object *obj;
1436         __u64 ibits = MDS_INODELOCK_UPDATE;
1437         int rc;
1438
1439         ENTRY;
1440         lu_root_fid(fid);
1441         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1442         if (IS_ERR(obj))
1443                 RETURN(PTR_ERR(obj));
1444
1445         mdt_lock_reg_init(lh, LCK_EX);
1446         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1447                                       &ibits, 0, false);
1448         mdt_object_put(info->mti_env, obj);
1449         RETURN(rc);
1450 }
1451
1452 static void mdt_rename_unlock(struct mdt_thread_info *info,
1453                               struct mdt_lock_handle *lh)
1454 {
1455         ENTRY;
1456         /* Cancel the single rename lock right away */
1457         mdt_object_unlock(info, NULL, lh, 1);
1458         EXIT;
1459 }
1460
1461 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1462                                                 const struct lu_fid *fid,
1463                                                 int idx)
1464 {
1465         struct mdt_object *dir;
1466         int rc;
1467
1468         ENTRY;
1469         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1470         if (IS_ERR(dir))
1471                 RETURN(dir);
1472
1473         /* check early, the real version will be saved after locking */
1474         rc = mdt_version_get_check(info, dir, idx);
1475         if (rc)
1476                 GOTO(out_put, rc);
1477
1478         if (!mdt_object_exists(dir))
1479                 GOTO(out_put, rc = -ENOENT);
1480
1481         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1482                 GOTO(out_put, rc = -ENOTDIR);
1483
1484         RETURN(dir);
1485 out_put:
1486         mdt_object_put(info->mti_env, dir);
1487         return ERR_PTR(rc);
1488 }
1489
1490 /*
1491  * lock rename source object.
1492  *
1493  * Both source and its parent object may be located on remote MDTs, and even on
1494  * different MDTs, which means source object is a remote object on parent.
1495  *
1496  * \retval      0 on success
1497  * \retval      -ev negative errno upon error
1498  */
1499 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1500                                   struct mdt_object *parent,
1501                                   struct mdt_object *child,
1502                                   struct mdt_lock_handle *lh,
1503                                   struct mdt_lock_handle *lh_lookup,
1504                                   __u64 ibits)
1505 {
1506         int rc;
1507
1508         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1509         /* if @obj is remote object, LOOKUP lock needs to be taken from
1510          * parent MDT.
1511          */
1512         rc = mdt_is_remote_object(info, parent, child);
1513         if (rc < 0)
1514                 return rc;
1515
1516         if (rc == 1) {
1517                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1518                                             LCK_EX);
1519                 if (rc)
1520                         return rc;
1521
1522                 ibits &= ~MDS_INODELOCK_LOOKUP;
1523         }
1524
1525         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX);
1526         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1527                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1528
1529         return 0;
1530 }
1531
1532 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1533                                      struct mdt_object *obj,
1534                                      struct mdt_lock_handle *lh,
1535                                      struct mdt_lock_handle *lh_lookup,
1536                                      int decref)
1537 {
1538         mdt_object_unlock(info, obj, lh, decref);
1539         mdt_object_unlock(info, NULL, lh_lookup, decref);
1540 }
1541
1542 /* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
1543 struct mdt_link_lock {
1544         struct mdt_object *mll_obj;
1545         struct mdt_lock_handle mll_lh;
1546         struct list_head mll_linkage;
1547 };
1548
1549 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1550                                             struct mdt_object *o,
1551                                             struct mdt_lock_handle *lh,
1552                                             struct list_head *list)
1553 {
1554         struct mdt_link_lock *mll;
1555
1556         OBD_ALLOC_PTR(mll);
1557         if (mll == NULL)
1558                 return -ENOMEM;
1559
1560         INIT_LIST_HEAD(&mll->mll_linkage);
1561         mdt_object_get(info->mti_env, o);
1562         mll->mll_obj = o;
1563         mll->mll_lh = *lh;
1564         memset(lh, 0, sizeof(*lh));
1565         list_add_tail(&mll->mll_linkage, list);
1566
1567         return 0;
1568 }
1569
1570 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1571                                              struct mdt_link_lock *mll,
1572                                              int decref)
1573 {
1574         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1575         mdt_object_put(info->mti_env, mll->mll_obj);
1576         list_del(&mll->mll_linkage);
1577         OBD_FREE_PTR(mll);
1578 }
1579
1580 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1581                                      struct list_head *list, int decref)
1582 {
1583         struct mdt_link_lock *mll;
1584         struct mdt_link_lock *tmp;
1585
1586         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1587                 mdt_migrate_link_lock_del(info, mll, decref);
1588 }
1589
1590 /* take link parent UPDATE lock.
1591  * \retval      0 \a lnkp is already locked, no lock taken.
1592  *              1 lock taken
1593  *              -ev negative errno.
1594  */
1595 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1596                                         struct mdt_object *lnkp,
1597                                         struct list_head *update_locks,
1598                                         bool *blocked)
1599 {
1600         const struct lu_fid *fid = mdt_object_fid(lnkp);
1601         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1602         struct mdt_link_lock *entry;
1603         __u64 ibits = 0;
1604         int rc;
1605
1606         ENTRY;
1607
1608         /* check if it's already locked */
1609         list_for_each_entry(entry, update_locks, mll_linkage) {
1610                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1611                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1612                         RETURN(0);
1613                 }
1614         }
1615
1616         /* link parent UPDATE lock */
1617         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1618
1619         if (*blocked) {
1620                 /* revoke lock instead of take in *blocked* mode */
1621                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1622                                      LCK_PW);
1623                 if (rc)
1624                         RETURN(rc);
1625
1626                 if (mdt_object_remote(lnkp)) {
1627                         struct ldlm_lock *lock;
1628
1629                         /*
1630                          * for remote object, set lock cb_atomic, so lock can be
1631                          * released in blocking_ast() immediately, then the next
1632                          * lock_try will have better chance of success.
1633                          */
1634                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1635                         LASSERT(lock != NULL);
1636                         lock_res_and_lock(lock);
1637                         ldlm_set_atomic_cb(lock);
1638                         unlock_res_and_lock(lock);
1639                         LDLM_LOCK_PUT(lock);
1640                 }
1641
1642                 mdt_object_unlock(info, lnkp, lhl, 1);
1643                 RETURN(0);
1644         }
1645
1646         /*
1647          * we can't follow parent-child lock order like other MD
1648          * operations, use lock_try here to avoid deadlock, if the lock
1649          * cannot be taken, drop all locks taken, revoke the blocked
1650          * one, and continue processing the remaining entries, and in
1651          * the end of the loop restart from beginning.
1652          *
1653          * don't lock with PDO mode in case two links are under the same
1654          * parent and their hash values are different.
1655          */
1656         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1657                                  LCK_PW);
1658         if (rc < 0)
1659                 RETURN(rc);
1660
1661         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1662                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1663                 *blocked = true;
1664                 RETURN(-EAGAIN);
1665         }
1666
1667         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1668         if (rc) {
1669                 mdt_object_unlock(info, lnkp, lhl, 1);
1670                 RETURN(rc);
1671         }
1672
1673         RETURN(1);
1674 }
1675
1676 /* take link LOOKUP lock.
1677  * \retval      0 \a lnkp is already locked, no lock taken.
1678  *              1 lock taken.
1679  *              -ev negative errno.
1680  */
1681 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1682                                  struct mdt_object *lnkp,
1683                                  struct mdt_object *spobj,
1684                                  struct mdt_object *obj,
1685                                  struct list_head *lookup_locks)
1686 {
1687         const struct lu_fid *fid = mdt_object_fid(lnkp);
1688         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1689         struct mdt_link_lock *entry;
1690         int rc;
1691
1692         ENTRY;
1693
1694         /* check if it's already locked by source */
1695         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1696         if (rc <= 0) {
1697                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1698                        PFID(fid));
1699                 RETURN(rc);
1700         }
1701
1702         /* check if it's already locked by other links */
1703         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1704                 rc = mdt_fids_different_target(info, fid,
1705                                                mdt_object_fid(entry->mll_obj));
1706                 if (rc <= 0) {
1707                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1708                                PFID(fid));
1709                         RETURN(rc);
1710                 }
1711         }
1712
1713         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX);
1714         if (rc)
1715                 RETURN(rc);
1716
1717         /* don't take local LOOKUP lock, because later we will lock other ibits
1718          * of sobj (which is on local MDT), and lock the same object twice may
1719          * deadlock, just revoke this lock.
1720          */
1721         if (!mdt_object_remote(lnkp))
1722                 GOTO(unlock, rc = 0);
1723
1724         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1725         if (rc)
1726                 GOTO(unlock, rc);
1727
1728         RETURN(1);
1729 unlock:
1730         mdt_object_unlock(info, lnkp, lhl, 1);
1731         return rc;
1732 }
1733
/*
 * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
 * total local lock count exceeds RS_MAX_LOCKS.
 *
 * The link EA of \a obj is read and walked; links under \a spobj or \a tpobj
 * are skipped. If a link parent lock turns out to be busy, the link parent
 * locks taken so far and the locks in \a lhsp / \a lhtp are dropped, the
 * remaining link parent locks are only revoked, and -EBUSY is returned.
 *
 * \param[in] info              thread info
 * \param[in] spobj             source parent
 * \param[in] tpobj             target parent, may be the same as \a spobj
 * \param[in] obj               object whose links are to be locked
 * \param[in] lhsp              lock handle of \a spobj
 * \param[in] lhtp              lock handle of \a tpobj
 * \param[in,out] link_locks    list the taken locks are spliced onto; on
 *                              error these locks are released again
 *
 * \retval      0 on success, and locks can be saved in ptlrpc_reply_state;
 *                also returned without locking if \a obj is a directory or
 *                has no link EA
 * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
 * \retval      -EBUSY a link parent lock was busy, see above
 * \retval      -ev negative errno upon error
 */
static int mdt_migrate_links_lock(struct mdt_thread_info *info,
                                  struct mdt_object *spobj,
                                  struct mdt_object *tpobj,
                                  struct mdt_object *obj,
                                  struct mdt_lock_handle *lhsp,
                                  struct mdt_lock_handle *lhtp,
                                  struct list_head *link_locks)
{
        struct mdt_device *mdt = info->mti_mdt;
        struct lu_buf *buf = &info->mti_big_buf;
        struct lu_name *lname = &info->mti_name;
        struct linkea_data ldata = { NULL };
        int local_lock_cnt = 0;
        bool blocked = false;
        bool saved;
        struct mdt_object *lnkp;
        struct lu_fid fid;
        LIST_HEAD(update_locks);
        LIST_HEAD(lookup_locks);
        int rc;

        ENTRY;
        /* nothing is locked here for a directory */
        if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
                RETURN(0);

        buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
        if (buf->lb_buf == NULL)
                RETURN(-ENOMEM);

        ldata.ld_buf = buf;
        rc = mdt_links_read(info, obj, &ldata);
        if (rc) {
                /* a missing link EA is not an error, just nothing to lock */
                if (rc == -ENOENT || rc == -ENODATA)
                        rc = 0;
                RETURN(rc);
        }

        /*
         * NOTE(review): the loop condition also stops on rc != 0, and rc is
         * left at 1 when mdt_migrate_link_lock() below takes a lock, so the
         * walk seems to end after the first such link and 1 is returned -
         * confirm this is intended.
         */
        for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
             linkea_next_entry(&ldata)) {
                linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
                                    &fid);

                /* check if link parent is source parent too */
                if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
                        CDEBUG(D_INFO,
                               "skip lock on source parent "DFID"/"DNAME"\n",
                               PFID(&fid), PNAME(lname));
                        continue;
                }

                /* check if link parent is target parent too */
                if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
                        CDEBUG(D_INFO,
                               "skip lock on target parent "DFID"/"DNAME"\n",
                               PFID(&fid), PNAME(lname));
                        continue;
                }

                /* a link parent that can't be found is warned about and
                 * skipped, it doesn't fail the whole operation
                 */
                lnkp = mdt_object_find(info->mti_env, mdt, &fid);
                if (IS_ERR(lnkp)) {
                        CWARN("%s: cannot find obj "DFID": %ld\n",
                              mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
                        continue;
                }

                if (!mdt_object_exists(lnkp)) {
                        CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
                               PFID(&fid), PNAME(lname));
                        mdt_object_put(info->mti_env, lnkp);
                        continue;
                }
relock:
                /* remember the mode to detect the switch to blocked mode */
                saved = blocked;
                rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
                                                  &blocked);
                if (!saved && blocked) {
                        /* unlock all locks taken to avoid deadlock */
                        mdt_migrate_links_unlock(info, &update_locks, 1);
                        mdt_object_unlock(info, spobj, lhsp, 1);
                        if (tpobj != spobj)
                                mdt_object_unlock(info, tpobj, lhtp, 1);
                        /* retry the same parent, now in blocked mode */
                        goto relock;
                }
                if (rc < 0) {
                        mdt_object_put(info->mti_env, lnkp);
                        GOTO(out, rc);
                }

                /* only locks on local objects count against RS_MAX_LOCKS */
                if (rc == 1 && !mdt_object_remote(lnkp))
                        local_lock_cnt++;

                rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
                                           &lookup_locks);
                if (rc < 0) {
                        mdt_object_put(info->mti_env, lnkp);
                        GOTO(out, rc);
                }
                /*
                 * NOTE(review): mdt_migrate_link_lock() appears to return 1
                 * for remote \a lnkp only, so this increment looks
                 * unreachable - confirm.
                 */
                if (rc == 1 && !mdt_object_remote(lnkp))
                        local_lock_cnt++;
                mdt_object_put(info->mti_env, lnkp);
        }

        /* a busy lock was met and parent locks were dropped: let the caller
         * see -EBUSY
         */
        if (blocked)
                GOTO(out, rc = -EBUSY);

        EXIT;
out:
        /* hand both kinds of locks over to the caller's list */
        list_splice(&update_locks, link_locks);
        list_splice(&lookup_locks, link_locks);
        if (rc < 0) {
                /* on error no link lock is left held */
                mdt_migrate_links_unlock(info, link_locks, rc);
        } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
                /*
                 * parent may have 3 local objects: master object and 2 stripes
                 * (if it's being migrated too); source may have 1 local objects
                 * as regular file; target has 1 local object.
                 * Note, source may have 2 local locks if it is directory but it
                 * can't have hardlinks, so it is not considered here.
                 */
                CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
                       local_lock_cnt);
                rc = 1;
        }
        return rc;
}
1867
/*
 * lookup source by name, if parent is striped directory, we need to find the
 * corresponding stripe where source is located, and then lookup there.
 *
 * besides, if parent is migrating too, and file is already in target stripe,
 * this should be a redo of 'lfs migrate' on client side.
 *
 * On success \a *spobj, \a *tpobj and \a *sobj each carry a reference (they
 * may point to the same object). On failure no reference is left held; the
 * output pointers that had been set are reset to NULL, those not reached yet
 * are left untouched.
 *
 * \param[in] info      thread info; mti_tmp_fid1 and mti_tmp_fid2 are used
 * \param[in] pobj      parent (master object if striped)
 * \param[in] ma        parent attributes, LMV is used if MA_LMV is valid
 * \param[in] lname     name of the source under the parent
 * \param[out] spobj    source parent: stripe of \a pobj, or \a pobj itself
 * \param[out] tpobj    target parent: stripe of \a pobj, or \a pobj itself
 * \param[out] sobj     source object found by name
 *
 * \retval 1 tpobj stripe index is less than spobj stripe index
 * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
 * \retval -EALREADY parent layout is changing and name was found in the
 *                   target stripe only
 * \retval -ev negative errno upon error
 */
static int mdt_migrate_lookup(struct mdt_thread_info *info,
                              struct mdt_object *pobj,
                              const struct md_attr *ma,
                              const struct lu_name *lname,
                              struct mdt_object **spobj,
                              struct mdt_object **tpobj,
                              struct mdt_object **sobj)
{
        const struct lu_env *env = info->mti_env;
        struct lu_fid *fid = &info->mti_tmp_fid1;
        int spindex = -1;
        int tpindex = -1;
        int rc;

        if (ma->ma_valid & MA_LMV) {
                /* if parent is striped, lookup on corresponding stripe */
                struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
                struct lu_fid *fid2 = &info->mti_tmp_fid2;

                if (!lmv_is_sane(lmv))
                        return -EBADF;

                spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
                                                       lname->ln_namelen);
                if (spindex < 0)
                        return spindex;

                fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);

                *spobj = mdt_object_find(env, info->mti_mdt, fid2);
                if (IS_ERR(*spobj)) {
                        rc = PTR_ERR(*spobj);
                        *spobj = NULL;
                        return rc;
                }

                if (!mdt_object_exists(*spobj))
                        GOTO(spobj_put, rc = -ENOENT);

                fid_zero(fid);
                rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
                                &info->mti_spec);
                if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
                        /* fail check here to let top dir migration succeed. */
                        if (CFS_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
                                GOTO(spobj_put, rc = -EIO);

                        /*
                         * if parent layout is changing, and lookup child
                         * failed on source stripe, lookup again on target
                         * stripe, if it exists, it means previous migration
                         * was interrupted, and current file was migrated
                         * already.
                         */
                        tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
                                                           lname->ln_namelen);
                        if (tpindex < 0)
                                GOTO(spobj_put, rc = tpindex);

                        fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);

                        *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
                        if (IS_ERR(*tpobj)) {
                                rc = PTR_ERR(*tpobj);
                                *tpobj = NULL;
                                GOTO(spobj_put, rc);
                        }

                        if (!mdt_object_exists(*tpobj))
                                GOTO(tpobj_put, rc = -ENOENT);

                        if (rc == -ENOENT) {
                                /* not in source stripe: this path always
                                 * fails, with -EALREADY if the name is found
                                 * in target stripe, else with lookup error
                                 */
                                fid_zero(fid);
                                rc = mdo_lookup(env, mdt_object_child(*tpobj),
                                                lname, fid, &info->mti_spec);
                                GOTO(tpobj_put, rc = rc ?: -EALREADY);
                        }
                } else if (rc) {
                        GOTO(spobj_put, rc);
                } else {
                        /* layout is stable: source and target parent are the
                         * same stripe, take a second reference for tpobj
                         */
                        *tpobj = *spobj;
                        tpindex = spindex;
                        mdt_object_get(env, *tpobj);
                }
        } else {
                fid_zero(fid);
                rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
                                &info->mti_spec);
                if (rc)
                        return rc;

                /* plain parent serves as both source and target parent, one
                 * reference is taken for each of the two pointers
                 */
                *spobj = pobj;
                *tpobj = pobj;
                mdt_object_get(env, pobj);
                mdt_object_get(env, pobj);
        }

        /* @fid was filled by the successful lookup above */
        *sobj = mdt_object_find(env, info->mti_mdt, fid);
        if (IS_ERR(*sobj)) {
                rc = PTR_ERR(*sobj);
                *sobj = NULL;
                GOTO(tpobj_put, rc);
        }

        if (!mdt_object_exists(*sobj))
                GOTO(sobj_put, rc = -ENOENT);

        /* both indices stay -1 for a plain parent, so this yields 0 then */
        return (tpindex < spindex);

        /* error exits: drop references in reverse order of acquisition */
sobj_put:
        mdt_object_put(env, *sobj);
        *sobj = NULL;
tpobj_put:
        mdt_object_put(env, *tpobj);
        *tpobj = NULL;
spobj_put:
        mdt_object_put(env, *spobj);
        *spobj = NULL;

        return rc;
}
2000
2001 /* end lease and close file for regular file */
2002 static int mdd_migrate_close(struct mdt_thread_info *info,
2003                              struct mdt_object *obj)
2004 {
2005         struct close_data *data;
2006         struct mdt_body *repbody;
2007         struct ldlm_lock *lease;
2008         int rc;
2009         int rc2;
2010
2011         rc = -EPROTO;
2012         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2013                                       RCL_CLIENT) ||
2014             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2015                                       RCL_CLIENT))
2016                 goto close;
2017
2018         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2019         if (!data)
2020                 goto close;
2021
2022         rc = -ESTALE;
2023         lease = ldlm_handle2lock(&data->cd_handle);
2024         if (!lease)
2025                 goto close;
2026
2027         /* check if the lease was already canceled */
2028         lock_res_and_lock(lease);
2029         rc = ldlm_is_cancel(lease);
2030         unlock_res_and_lock(lease);
2031
2032         if (rc) {
2033                 rc = -EAGAIN;
2034                 LDLM_DEBUG(lease, DFID" lease broken",
2035                            PFID(mdt_object_fid(obj)));
2036         }
2037
2038         /*
2039          * cancel server side lease, client side counterpart should have been
2040          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2041          */
2042         ldlm_lock_cancel(lease);
2043         ldlm_reprocess_all(lease->l_resource,
2044                            lease->l_policy_data.l_inodebits.bits);
2045         LDLM_LOCK_PUT(lease);
2046
2047 close:
2048         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2049         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2050         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2051
2052         return rc ?: rc2;
2053 }
2054
2055 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2056 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2057 {
2058         return le32_to_cpu(lmv->lmv_hash_type) ==
2059                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2060                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2061                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2062                le32_to_cpu(lmv->lmv_migrate_offset) <
2063                 le32_to_cpu(lmv->lmv_stripe_count);
2064 }
2065
2066 /*
2067  * migrate file in below steps:
2068  *  1. lock source and target stripes
2069  *  2. lookup source by name
2070  *  3. lock parents of source links if source is not directory
2071  *  4. reject if source is in HSM
2072  *  5. take source open_sem and close file if source is regular file
2073  *  6. lock source, and its stripes if it's directory
2074  *  7. migrate file
2075  *  8. lock target so subsequent change to it can trigger COS
2076  *  9. unlock above locks
2077  * 10. sync device if source has too many links
2078  */
2079 int mdt_reint_migrate(struct mdt_thread_info *info,
2080                       struct mdt_lock_handle *unused)
2081 {
2082         const struct lu_env *env = info->mti_env;
2083         struct mdt_device *mdt = info->mti_mdt;
2084         struct ptlrpc_request *req = mdt_info_req(info);
2085         struct mdt_reint_record *rr = &info->mti_rr;
2086         struct lu_ucred *uc = mdt_ucred(info);
2087         struct md_attr *ma = &info->mti_attr;
2088         struct mdt_object *pobj;
2089         struct mdt_object *spobj;
2090         struct mdt_object *tpobj;
2091         struct mdt_object *sobj;
2092         struct mdt_object *tobj;
2093         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2094         struct mdt_lock_handle *lhsp;
2095         struct mdt_lock_handle *lhtp;
2096         struct mdt_lock_handle *lhs;
2097         struct mdt_lock_handle *lhl;
2098         LIST_HEAD(link_locks);
2099         int lock_retries = 5;
2100         bool reverse = false;
2101         bool open_sem_locked = false;
2102         bool do_sync = false;
2103         bool is_plain_dir = false;
2104         int rc;
2105
2106         ENTRY;
2107         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2108                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2109
2110         if (info->mti_dlm_req)
2111                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2112
2113         if (!fid_is_md_operative(rr->rr_fid1) ||
2114             !fid_is_md_operative(rr->rr_fid2))
2115                 RETURN(-EPERM);
2116
2117         /* don't allow migrate . or .. */
2118         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2119                 RETURN(-EBUSY);
2120
2121         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2122                 RETURN(-EPERM);
2123
2124         /* we want rbac roles to have precedence over any other
2125          * permission or capability checks
2126          */
2127         if (uc && (!uc->uc_rbac_dne_ops ||
2128                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2129                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2130                     mdt->mdt_enable_remote_dir_gid != -1)))
2131                 RETURN(-EPERM);
2132
2133         /*
2134          * Note: do not enqueue rename lock for replay request, because
2135          * if other MDT holds rename lock, but being blocked to wait for
2136          * this MDT to finish its recovery, and the failover MDT can not
2137          * get rename lock, which will cause deadlock.
2138          *
2139          * req is NULL if this is called by directory auto-split.
2140          */
2141         if (req && !req_is_replay(req)) {
2142                 rc = mdt_rename_lock(info, rename_lh);
2143                 if (rc != 0) {
2144                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2145                                mdt_obd_name(info->mti_mdt), rc);
2146                         RETURN(rc);
2147                 }
2148         }
2149
2150         /* pobj is master object of parent */
2151         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2152         if (IS_ERR(pobj))
2153                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2154
2155         if (req) {
2156                 rc = mdt_version_get_check(info, pobj, 0);
2157                 if (rc)
2158                         GOTO(put_parent, rc);
2159         }
2160
2161         if (!mdt_object_exists(pobj))
2162                 GOTO(put_parent, rc = -ENOENT);
2163
2164         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2165                 GOTO(put_parent, rc = -ENOTDIR);
2166
2167         rc = mdt_check_enc(info, pobj);
2168         if (rc)
2169                 GOTO(put_parent, rc);
2170
2171         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2172         if (rc)
2173                 GOTO(put_parent, rc);
2174
2175         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2176             (ma->ma_valid & MA_LMV) &&
2177             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2178                 struct lu_buf *buf = &info->mti_buf;
2179                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2180                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2181
2182                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2183                                                  LMV_HASH_FLAG_BAD_TYPE);
2184                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2185                 buf->lb_buf = lmv;
2186                 buf->lb_len = sizeof(*lmv);
2187                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2188                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2189                 mo_invalidate(env, mdt_object_child(pobj));
2190                 GOTO(put_parent, rc);
2191         }
2192
2193         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2194          * if @pobj is migrating too, tpobj is the target parent stripe.
2195          */
2196         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2197                                 &sobj);
2198         if (rc < 0)
2199                 GOTO(put_parent, rc);
2200         reverse = rc;
2201
2202         /* parent unchanged, this happens in dir restripe */
2203         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2204                 GOTO(put_source, rc = -EALREADY);
2205
2206 lock_parent:
2207         LASSERT(spobj);
2208         LASSERT(tpobj);
2209         lhsp = &info->mti_lh[MDT_LH_PARENT];
2210         lhtp = &info->mti_lh[MDT_LH_CHILD];
2211         /* lock spobj and tpobj in stripe index order */
2212         if (reverse) {
2213                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW);
2214                 if (rc)
2215                         GOTO(put_source, rc);
2216
2217                 LASSERT(spobj != tpobj);
2218                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2219                 if (rc)
2220                         GOTO(unlock_parent, rc);
2221         } else {
2222                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2223                 if (rc)
2224                         GOTO(put_source, rc);
2225
2226                 if (tpobj != spobj) {
2227                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2228                                              LCK_PW);
2229                         if (rc)
2230                                 GOTO(unlock_parent, rc);
2231                 }
2232         }
2233
2234         /* if inode is not migrated, or is dir, no need to lock links */
2235         if (!info->mti_spec.sp_migrate_nsonly &&
2236             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2237                 /* lock link parents, and take LOOKUP lock of links */
2238                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2239                                             lhtp, &link_locks);
2240                 if (rc == -EBUSY && lock_retries-- > 0) {
2241                         LASSERT(list_empty(&link_locks));
2242                         goto lock_parent;
2243                 }
2244
2245                 if (rc < 0)
2246                         GOTO(put_source, rc);
2247
2248                 /*
2249                  * RS_MAX_LOCKS is the limit of number of locks that can be
2250                  * saved along with one request, if total lock count exceeds
2251                  * this limit, we will drop all locks after migration, and
2252                  * trigger commit in the end.
2253                  */
2254                 do_sync = rc;
2255         }
2256
2257         /* lock source */
2258         lhs = &info->mti_lh[MDT_LH_OLD];
2259         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2260         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2261                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2262                                     MDS_INODELOCK_OPEN);
2263         if (rc)
2264                 GOTO(unlock_links, rc);
2265
2266         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2267                 /* TODO: DoM migration is not supported, migrate dirent only */
2268                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2269                 if (rc)
2270                         GOTO(unlock_source, rc);
2271
2272                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2273                         info->mti_spec.sp_migrate_nsonly = 1;
2274         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2275                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2276                 if (rc)
2277                         GOTO(unlock_source, rc);
2278
2279                 if (!(ma->ma_valid & MA_LMV))
2280                         is_plain_dir = true;
2281                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2282                         /* race with restripe/auto-split */
2283                         GOTO(unlock_source, rc = -EBUSY);
2284                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2285                         struct lu_buf *buf = &info->mti_buf;
2286                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2287                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2288
2289                         /* migration failed before, and LFSCK cleared hash type
2290                          * and flags, fake it to resume migration.
2291                          */
2292                         lmv->lmv_hash_type =
2293                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2294                                             LMV_HASH_FLAG_MIGRATION |
2295                                             LMV_HASH_FLAG_BAD_TYPE |
2296                                             LMV_HASH_FLAG_FIXED);
2297                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2298                         buf->lb_buf = lmv;
2299                         buf->lb_len = sizeof(*lmv);
2300                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2301                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2302                         mo_invalidate(env, mdt_object_child(sobj));
2303                         GOTO(unlock_source, rc = -EALREADY);
2304                 }
2305         }
2306
2307         /* if migration HSM is allowed */
2308         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2309                 ma->ma_need = MA_HSM;
2310                 ma->ma_valid = 0;
2311                 rc = mdt_attr_get_complex(info, sobj, ma);
2312                 if (rc)
2313                         GOTO(unlock_source, rc);
2314
2315                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2316                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2317         }
2318
2319         /* end lease and close file for regular file */
2320         if (info->mti_spec.sp_migrate_close) {
2321                 /* try to hold open_sem so that nobody else can open the file */
2322                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2323                         /* close anyway */
2324                         mdd_migrate_close(info, sobj);
2325                         GOTO(unlock_source, rc = -EBUSY);
2326                 } else {
2327                         open_sem_locked = true;
2328                         rc = mdd_migrate_close(info, sobj);
2329                         if (rc && rc != -ESTALE)
2330                                 GOTO(unlock_open_sem, rc);
2331                 }
2332         }
2333
2334         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2335         if (IS_ERR(tobj))
2336                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2337
2338         /* Don't do lookup sanity check. We know name doesn't exist. */
2339         info->mti_spec.sp_cr_lookup = 0;
2340         info->mti_spec.sp_feat = &dt_directory_features;
2341
2342         rc = mdo_migrate(env, mdt_object_child(spobj),
2343                          mdt_object_child(tpobj), mdt_object_child(sobj),
2344                          mdt_object_child(tobj), &rr->rr_name,
2345                          &info->mti_spec, ma);
2346         if (rc)
2347                 GOTO(put_target, rc);
2348
2349         /* save target locks for directory */
2350         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2351             !info->mti_spec.sp_migrate_nsonly) {
2352                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2353                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2354
2355                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2356                  * otherwise stripes lock may deadlock.
2357                  */
2358                 if (is_plain_dir)
2359                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2360
2361                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2362                                              MDS_INODELOCK_UPDATE, LCK_PW);
2363                 if (rc)
2364                         GOTO(put_target, rc);
2365
2366                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2367         }
2368
2369         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2370                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2371
2372         EXIT;
2373 put_target:
2374         mdt_object_put(env, tobj);
2375 unlock_open_sem:
2376         if (open_sem_locked)
2377                 up_write(&sobj->mot_open_sem);
2378 unlock_source:
2379         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2380 unlock_links:
2381         /* if we've got too many locks to save into RPC,
2382          * then just commit before the locks are released
2383          */
2384         if (!rc && do_sync)
2385                 mdt_device_sync(env, mdt);
2386         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2387 unlock_parent:
2388         mdt_object_unlock(info, spobj, lhsp, rc);
2389         mdt_object_unlock(info, tpobj, lhtp, rc);
2390 put_source:
2391         mdt_object_put(env, sobj);
2392         mdt_object_put(env, spobj);
2393         mdt_object_put(env, tpobj);
2394 put_parent:
2395         mo_invalidate(env, mdt_object_child(pobj));
2396         mdt_object_put(env, pobj);
2397 unlock_rename:
2398         mdt_rename_unlock(info, rename_lh);
2399
2400         if (rc)
2401                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2402                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2403                        PNAME(&rr->rr_name), rc);
2404
2405         return rc;
2406 }
2407
2408 /*
2409  * determine lock order of sobj and tobj
2410  *
2411  * there are two situations we need to lock tobj before sobj:
2412  * 1. sobj is child of tobj
2413  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2414  *    larger than that of tobj
2415  *
2416  * \retval      1 lock tobj before sobj
2417  * \retval      0 lock sobj before tobj
2418  * \retval      -ev negative errno upon error
2419  */
2420 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2421                                            struct mdt_object *sobj,
2422                                            struct mdt_object *tobj)
2423 {
2424         struct md_attr *ma = &info->mti_attr;
2425         struct lu_fid *spfid = &info->mti_tmp_fid1;
2426         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2427         struct lmv_mds_md_v1 *lmv;
2428         __u32 sindex;
2429         __u32 tindex;
2430         int rc;
2431
2432         /* sobj and tobj are the same */
2433         if (sobj == tobj)
2434                 return 0;
2435
2436         if (fid_is_root(mdt_object_fid(sobj)))
2437                 return 0;
2438
2439         if (fid_is_root(mdt_object_fid(tobj)))
2440                 return 1;
2441
2442         /* check whether sobj is child of tobj */
2443         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2444                            mdt_object_fid(tobj));
2445         if (rc < 0)
2446                 return rc;
2447
2448         if (rc == 1)
2449                 return 1;
2450
2451         /* check whether sobj and tobj are children of the same parent */
2452         rc = mdt_attr_get_pfid(info, sobj, spfid);
2453         if (rc)
2454                 return rc;
2455
2456         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2457         if (rc)
2458                 return rc;
2459
2460         if (!lu_fid_eq(spfid, tpfid))
2461                 return 0;
2462
2463         /* check whether sobj and tobj are sibling stripes */
2464         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2465         if (rc)
2466                 return rc;
2467
2468         if (!(ma->ma_valid & MA_LMV))
2469                 return 0;
2470
2471         lmv = &ma->ma_lmv->lmv_md_v1;
2472         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2473                 return 0;
2474         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2475
2476         ma->ma_valid = 0;
2477         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2478         if (rc)
2479                 return rc;
2480
2481         if (!(ma->ma_valid & MA_LMV))
2482                 return -ENODATA;
2483
2484         lmv = &ma->ma_lmv->lmv_md_v1;
2485         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2486                 return -EINVAL;
2487         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2488
2489         /* check stripe index of sobj and tobj */
2490         if (sindex == tindex)
2491                 return -EINVAL;
2492
2493         return sindex < tindex ? 0 : 1;
2494 }
2495
2496 /* Helper function for mdt_reint_rename so we don't need to opencode
2497  * two different order lockings
2498  */
2499 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2500                              struct mdt_object *mfirstdir,
2501                              struct mdt_lock_handle *lh_firstdirp,
2502                              const struct lu_name *firstname,
2503                              struct mdt_object *mseconddir,
2504                              struct mdt_lock_handle *lh_seconddirp,
2505                              const struct lu_name *secondname)
2506 {
2507         int rc;
2508
2509         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW);
2510         if (rc)
2511                 return rc;
2512
2513         mdt_version_get_save(info, mfirstdir, 0);
2514         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2515
2516         if (mfirstdir != mseconddir) {
2517                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2518                                      secondname, LCK_PW);
2519         } else if (!mdt_object_remote(mseconddir)) {
2520                 if (lh_firstdirp->mlh_pdo_hash !=
2521                     lh_seconddirp->mlh_pdo_hash) {
2522                         rc = mdt_object_pdo_lock(info, mseconddir,
2523                                                  lh_seconddirp, secondname,
2524                                                  LCK_PW, false);
2525                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2526                 }
2527         }
2528         mdt_version_get_save(info, mseconddir, 1);
2529
2530         if (rc != 0)
2531                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2532
2533         return rc;
2534 }
2535
2536 /*
2537  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2538  * 2 - srcdir child; 3 - tgtdir child.
2539  * Update on disk version of srcdir child.
      *
      * Handle a rename request: move the entry rr_name in parent rr_fid1 to
      * rr_tgt_name in parent rr_fid2.
      *
      * Sequence: find both parents, take the rename lock unless the request is
      * a replay or qualifies as a same-directory parallel rename, lock both
      * parents in a deadlock-safe order, look up the source child, look up the
      * target child (it may not exist), lock the children, call mdo_rename(),
      * then release everything through the chained cleanup labels.
      *
      * The second argument is not referenced by this function.
      *
      * Returns 0 on success (also when both names already resolve to the same
      * object) or a negative errno.
      *
      * NOTE(review): mdt_lock_two_dirs() saves version slot 0 for whichever
      * parent is locked first, so with a reversed lock order slots 0/1 look
      * swapped relative to the list above - confirm.
2540  */
2541 static int mdt_reint_rename(struct mdt_thread_info *info,
2542                             struct mdt_lock_handle *unused)
2543 {
2544         struct mdt_device *mdt = info->mti_mdt;
2545         struct mdt_reint_record *rr = &info->mti_rr;
2546         struct md_attr *ma = &info->mti_attr;
2547         struct ptlrpc_request *req = mdt_info_req(info);
2548         struct mdt_object *msrcdir = NULL;
2549         struct mdt_object *mtgtdir = NULL;
2550         struct mdt_object *mold;
2551         struct mdt_object *mnew = NULL;
2552         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2553         struct mdt_lock_handle *lh_srcdirp;
2554         struct mdt_lock_handle *lh_tgtdirp;
2555         struct mdt_lock_handle *lh_oldp = NULL;
2556         struct mdt_lock_handle *lh_lookup = NULL;
2557         struct mdt_lock_handle *lh_newp = NULL;
2558         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2559         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2560         struct lu_ucred *uc = mdt_ucred(info);
             /* reverse: lock the target parent before the source parent;
              * discard: the put of mnew is deferred until after
              * mdt_dom_discard_data() at the end of the function
              */
2561         bool reverse = false, discard = false;
2562         ktime_t kstart = ktime_get();
             /* stays 0 unless the same-directory parallel rename path is taken */
2563         enum mdt_stat_idx msi = 0;
2564         int rc;
2565
2566         ENTRY;
2567         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2568                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2569                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2570
2571         if (info->mti_dlm_req)
2572                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2573
             /* both parent FIDs must be MD-operative, otherwise -EPERM */
2574         if (!fid_is_md_operative(rr->rr_fid1) ||
2575             !fid_is_md_operative(rr->rr_fid2))
2576                 RETURN(-EPERM);
2577
2578         /* find both parents. */
2579         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2580         if (IS_ERR(msrcdir))
2581                 RETURN(PTR_ERR(msrcdir));
2582
2583         rc = mdt_check_enc(info, msrcdir);
2584         if (rc)
2585                 GOTO(out_put_srcdir, rc);
2586
2587         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2588
             /* same parent FID: reuse msrcdir and take an extra reference so
              * that the cleanup path can put both pointers unconditionally
              */
2589         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2590                 mtgtdir = msrcdir;
2591                 mdt_object_get(info->mti_env, mtgtdir);
2592         } else {
2593                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2594                 if (IS_ERR(mtgtdir))
2595                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2596         }
2597
2598         rc = mdt_check_enc(info, mtgtdir);
2599         if (rc)
2600                 GOTO(out_put_tgtdir, rc);
2601
             /* a target directory flagged LOHA_FSCRYPT_MD may only be
              * modified by a caller with uc_rbac_fscrypt_admin set
              */
2602         if (!uc->uc_rbac_fscrypt_admin &&
2603             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2604                 GOTO(out_put_tgtdir, rc = -EPERM);
2605
2606         /*
2607          * Note: do not enqueue rename lock for replay request, because
2608          * if other MDT holds rename lock, but being blocked to wait for
2609          * this MDT to finish its recovery, and the failover MDT can not
2610          * get rename lock, which will cause deadlock.
2611          */
2612         if (!req_is_replay(req)) {
2613                 bool remote = mdt_object_remote(msrcdir);
2614
2615                 /*
2616                  * Normally rename RPC is handled on the MDT with the target
2617                  * directory (if target exists, it's on the MDT with the
2618                  * target), if the source directory is remote, it's a hint that
2619                  * source is remote too (this may not be true, but it won't
2620                  * cause any issue), return -EXDEV early to avoid taking
2621                  * rename_lock.
2622                  */
2623                 if (!mdt->mdt_enable_remote_rename && remote)
2624                         GOTO(out_put_tgtdir, rc = -EXDEV);
2625
2626                 /* This might be further relaxed in the future for regular file
2627                  * renames in different source and target parents. Start with
2628                  * only same-directory renames for simplicity and because this
2629                  * is by far the most common use case.
2630                  *
2631                  * Striped directories should be considered "remote".
                      *
                      * NOTE(review): ma->ma_attr.la_mode is read here before this
                      * function fills ma; presumably it was set while the request
                      * was unpacked - confirm.
2632                  */
2633                 if (msrcdir != mtgtdir || remote ||
2634                     (S_ISDIR(ma->ma_attr.la_mode) &&
2635                      !mdt->mdt_enable_parallel_rename_dir) ||
2636                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2637                      !mdt->mdt_enable_parallel_rename_file)) {
2638                         rc = mdt_rename_lock(info, rename_lh);
2639                         if (rc != 0) {
2640                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2641                                        mdt_obd_name(mdt), rc);
2642                                 GOTO(out_put_tgtdir, rc);
2643                         }
2644                 } else {
                             /* parallel rename: no rename lock is taken, only
                              * the stats index for the final tally is chosen
                              */
2645                         if (S_ISDIR(ma->ma_attr.la_mode))
2646                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2647                         else
2648                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2649
2650                         CDEBUG(D_INFO,
2651                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2652                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2653                                PNAME(&rr->rr_name));
2654                 }
2655         }
2656
             /* 1 means the target parent has to be locked first */
2657         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2658         if (rc < 0)
2659                 GOTO(out_unlock_rename, rc);
2660         reverse = rc;
2661
2662         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2663         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2664         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2665
2666         /* lock parents in the proper order. */
2667         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2668         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2669         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2670         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2671
2672         /* In case of same dir local rename we must sort by the hash,
2673          * otherwise a lock deadlock is possible when renaming
2674          * a to b and b to a at the same time LU-15285
2675          */
2676         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2677                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
             /* fault injection: force source-first order regardless of the above */
2678         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2679                 reverse = 0;
2680
2681         if (reverse)
2682                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2683                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2684                                        &rr->rr_name);
2685         else
2686                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2687                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name);
2688
2689         if (rc != 0)
2690                 GOTO(out_unlock_rename, rc);
2691
2692         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2693         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2694
2695         /* find mold object. */
2696         fid_zero(old_fid);
2697         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2698         if (rc != 0)
2699                 GOTO(out_unlock_parents, rc);
2700
             /* the source child must not be one of the two parents */
2701         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2702                 GOTO(out_unlock_parents, rc = -EINVAL);
2703
2704         if (!fid_is_md_operative(old_fid))
2705                 GOTO(out_unlock_parents, rc = -EPERM);
2706
2707         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2708         if (IS_ERR(mold))
2709                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2710
2711         if (!mdt_object_exists(mold)) {
2712                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2713                                 &mold->mot_obj,
2714                                 "object does not exist");
2715                 GOTO(out_put_old, rc = -ENOENT);
2716         }
2717
2718         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2719                 GOTO(out_put_old, rc = -EXDEV);
2720
2721         /* Check if @mtgtdir is subdir of @mold, before locking child
2722          * to avoid reverse locking.
2723          */
2724         if (mtgtdir != msrcdir) {
2725                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2726                                    old_fid);
2727                 if (rc) {
                             /* 1: mtgtdir lies below mold, reject with -EINVAL;
                              * any other non-zero value is passed through
                              */
2728                         if (rc == 1)
2729                                 rc = -EINVAL;
2730                         GOTO(out_put_old, rc);
2731                 }
2732         }
2733
2734         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2735         /* save version after locking */
2736         mdt_version_get_save(info, mold, 2);
2737
2738         /* find mnew object:
2739          * mnew target object may not exist now
2740          * lookup with version checking
2741          */
2742         fid_zero(new_fid);
2743         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2744                                       3);
2745         if (rc == 0) {
2746                 /* the new_fid should have been filled at this moment */
                     /* both names already refer to the same object: nothing to
                      * do, leave with rc == 0
                      */
2747                 if (lu_fid_eq(old_fid, new_fid))
2748                         GOTO(out_put_old, rc);
2749
2750                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2751                     lu_fid_eq(new_fid, rr->rr_fid2))
2752                         GOTO(out_put_old, rc = -EINVAL);
2753
2754                 if (!fid_is_md_operative(new_fid))
2755                         GOTO(out_put_old, rc = -EPERM);
2756
2757                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2758                 if (IS_ERR(mnew))
2759                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2760
2761                 if (!mdt_object_exists(mnew)) {
2762                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2763                                         &mnew->mot_obj,
2764                                         "object does not exist");
2765                         GOTO(out_put_new, rc = -ENOENT);
2766                 }
2767
2768                 if (mdt_object_remote(mnew)) {
2769                         struct mdt_body  *repbody;
2770
2771                         /* Always send rename req to the target child MDT */
                             /* the reply carries new_fid together with -EXDEV */
2772                         repbody = req_capsule_server_get(info->mti_pill,
2773                                                          &RMF_MDT_BODY);
2774                         LASSERT(repbody != NULL);
2775                         repbody->mbo_fid1 = *new_fid;
2776                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2777                         GOTO(out_put_new, rc = -EXDEV);
2778                 }
2779                 /* Before locking the target dir, check we do not replace
2780                  * a dir with a non-dir, otherwise it may deadlock with
2781                  * link op which tries to create a link in this dir
2782                  * back to this non-dir.
2783                  */
2784                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2785                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2786                         GOTO(out_put_new, rc = -EISDIR);
2787
2788                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2789                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2790                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2791                                             lh_lookup,
2792                                             MDS_INODELOCK_LOOKUP |
2793                                             MDS_INODELOCK_XATTR);
2794                 if (rc < 0)
2795                         GOTO(out_put_new, rc);
2796
2797                 /* Check if @msrcdir is subdir of @mnew, before locking child
2798                  * to avoid reverse locking.
2799                  */
2800                 if (mtgtdir != msrcdir) {
2801                         rc = mdo_is_subdir(info->mti_env,
2802                                            mdt_object_child(msrcdir), new_fid);
2803                         if (rc) {
2804                                 if (rc == 1)
2805                                         rc = -EINVAL;
2806                                 GOTO(out_unlock_old, rc);
2807                         }
2808                 }
2809
2810                 /* We used to acquire MDS_INODELOCK_FULL here but we
2811                  * can't do this now because a running HSM restore on
2812                  * the rename onto victim will hold the layout
2813                  * lock. See LU-4002.
2814                  */
2815
2816                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2817                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2818                                            MDS_INODELOCK_LOOKUP |
2819                                            MDS_INODELOCK_UPDATE, LCK_EX);
                     /* NOTE(review): the failure path enters at out_unlock_new,
                      * so lh_newp is passed to mdt_object_unlock() as well;
                      * presumably the call can fail with the lock already
                      * granted - confirm.
                      */
2820                 if (rc != 0)
2821                         GOTO(out_unlock_new, rc);
2822
2823                 /* get and save version after locking */
2824                 mdt_version_get_save(info, mnew, 3);
2825         } else if (rc != -ENOENT) {
                     /* target lookup failed for a reason other than "no entry" */
2826                 GOTO(out_put_old, rc);
2827         } else {
                     /* no target entry: only the source child gets locked and
                      * the ENOENT version is saved for slot 3
                      */
2828                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2829                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2830                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2831                                             lh_lookup,
2832                                             MDS_INODELOCK_LOOKUP |
2833                                             MDS_INODELOCK_XATTR);
2834                 if (rc != 0)
2835                         GOTO(out_put_old, rc);
2836
2837                 mdt_enoent_version_save(info, 3);
2838         }
2839
2840         /* step 5: rename it */
2841         mdt_reint_init_ma(info, ma);
2842
2843         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2844                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2845
             /* mot_lov_mutex of an existing target is held across mdo_rename() */
2846         if (mnew != NULL)
2847                 mutex_lock(&mnew->mot_lov_mutex);
2848
2849         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2850                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2851                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2852                         &rr->rr_tgt_name, ma);
2853
2854         if (mnew != NULL)
2855                 mutex_unlock(&mnew->mot_lov_mutex);
2856
2857         /* handle last link of tgt object */
2858         if (rc == 0) {
2859                 if (mnew) {
2860                         mdt_handle_last_unlink(info, mnew, ma);
2861                         discard = mdt_dom_check_for_discard(info, mnew);
2862                 }
2863                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2864                                          msrcdir, mtgtdir, msi,
2865                                          ktime_us_delta(ktime_get(), kstart));
2866         }
2867
2868         EXIT;
             /* Cleanup labels fall through from the most recently acquired
              * resource down to the first one; each GOTO above enters at the
              * label matching what is held at that point.
              */
2869 out_unlock_new:
2870         if (mnew != NULL)
2871                 /* mnew is gone, no need to keep lock */
2872                 mdt_object_unlock(info, mnew, lh_newp, 1);
2873 out_unlock_old:
2874         mdt_object_unlock(info, NULL, lh_lookup, rc);
2875         mdt_object_unlock(info, mold, lh_oldp, rc);
2876 out_put_new:
             /* with discard set, mnew is put after the DoM discard below */
2877         if (mnew && !discard)
2878                 mdt_object_put(info->mti_env, mnew);
2879 out_put_old:
2880         mdt_object_put(info->mti_env, mold);
2881 out_unlock_parents:
2882         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2883         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2884 out_unlock_rename:
2885         mdt_rename_unlock(info, rename_lh);
2886 out_put_tgtdir:
2887         mdt_object_put(info->mti_env, mtgtdir);
2888 out_put_srcdir:
2889         mdt_object_put(info->mti_env, msrcdir);
2890
2891         /* The DoM discard can be done right in the place above where it is
2892          * assigned, meanwhile it is done here after rename unlock due to
2893          * compatibility with old clients, for them the discard blocks
2894          * the main thread until completion. Check LU-11359 for details.
2895          */
2896         if (discard) {
2897                 mdt_dom_discard_data(info, mnew);
2898                 mdt_object_put(info->mti_env, mnew);
2899         }
2900         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2901         return rc;
2902 }
2903
2904 static int mdt_reint_resync(struct mdt_thread_info *info,
2905                             struct mdt_lock_handle *lhc)
2906 {
2907         struct mdt_reint_record *rr = &info->mti_rr;
2908         struct ptlrpc_request *req = mdt_info_req(info);
2909         struct md_attr *ma = &info->mti_attr;
2910         struct mdt_object *mo;
2911         struct ldlm_lock *lease;
2912         struct mdt_body *repbody;
2913         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2914         bool lease_broken;
2915         int rc;
2916
2917         ENTRY;
2918         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2919
2920         if (info->mti_dlm_req)
2921                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2922
2923         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2924         if (IS_ERR(mo))
2925                 GOTO(out, rc = PTR_ERR(mo));
2926
2927         if (!mdt_object_exists(mo))
2928                 GOTO(out_obj, rc = -ENOENT);
2929
2930         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2931                 GOTO(out_obj, rc = -EINVAL);
2932
2933         if (mdt_object_remote(mo))
2934                 GOTO(out_obj, rc = -EREMOTE);
2935
2936         lease = ldlm_handle2lock(rr->rr_lease_handle);
2937         if (lease == NULL)
2938                 GOTO(out_obj, rc = -ESTALE);
2939
2940         /* It's really necessary to grab open_sem and check if the lease lock
2941          * has been lost. There would exist a concurrent writer coming in and
2942          * generating some dirty data in memory cache, the writeback would fail
2943          * after the layout version is increased by MDS_REINT_RESYNC RPC.
2944          */
2945         if (!down_write_trylock(&mo->mot_open_sem))
2946                 GOTO(out_put_lease, rc = -EBUSY);
2947
2948         lock_res_and_lock(lease);
2949         lease_broken = ldlm_is_cancel(lease);
2950         unlock_res_and_lock(lease);
2951         if (lease_broken)
2952                 GOTO(out_unlock, rc = -EBUSY);
2953
2954         /* the file has yet opened by anyone else after we took the lease. */
2955         layout.mlc_opc = MD_LAYOUT_RESYNC;
2956         lhc = &info->mti_lh[MDT_LH_LOCAL];
2957         rc = mdt_layout_change(info, mo, lhc, &layout);
2958         if (rc)
2959                 GOTO(out_unlock, rc);
2960
2961         mdt_object_unlock(info, mo, lhc, 0);
2962
2963         ma->ma_need = MA_INODE;
2964         ma->ma_valid = 0;
2965         rc = mdt_attr_get_complex(info, mo, ma);
2966         if (rc != 0)
2967                 GOTO(out_unlock, rc);
2968
2969         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2970         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
2971
2972         EXIT;
2973 out_unlock:
2974         up_write(&mo->mot_open_sem);
2975 out_put_lease:
2976         LDLM_LOCK_PUT(lease);
2977 out_obj:
2978         mdt_object_put(info->mti_env, mo);
2979 out:
2980         mdt_client_compatibility(info);
2981         return rc;
2982 }
2983
2984 struct mdt_reinter {
2985         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
2986         enum lprocfs_extra_opc mr_extra_opc;
2987 };
2988
2989 static const struct mdt_reinter mdt_reinters[] = {
2990         [REINT_SETATTR] = {
2991                 .mr_handler = &mdt_reint_setattr,
2992                 .mr_extra_opc = MDS_REINT_SETATTR,
2993         },
2994         [REINT_CREATE] = {
2995                 .mr_handler = &mdt_reint_create,
2996                 .mr_extra_opc = MDS_REINT_CREATE,
2997         },
2998         [REINT_LINK] = {
2999                 .mr_handler = &mdt_reint_link,
3000                 .mr_extra_opc = MDS_REINT_LINK,
3001         },
3002         [REINT_UNLINK] = {
3003                 .mr_handler = &mdt_reint_unlink,
3004                 .mr_extra_opc = MDS_REINT_UNLINK,
3005         },
3006         [REINT_RENAME] = {
3007                 .mr_handler = &mdt_reint_rename,
3008                 .mr_extra_opc = MDS_REINT_RENAME,
3009         },
3010         [REINT_OPEN] = {
3011                 .mr_handler = &mdt_reint_open,
3012                 .mr_extra_opc = MDS_REINT_OPEN,
3013         },
3014         [REINT_SETXATTR] = {
3015                 .mr_handler = &mdt_reint_setxattr,
3016                 .mr_extra_opc = MDS_REINT_SETXATTR,
3017         },
3018         [REINT_RMENTRY] = {
3019                 .mr_handler = &mdt_reint_unlink,
3020                 .mr_extra_opc = MDS_REINT_UNLINK,
3021         },
3022         [REINT_MIGRATE] = {
3023                 .mr_handler = &mdt_reint_migrate,
3024                 .mr_extra_opc = MDS_REINT_RENAME,
3025         },
3026         [REINT_RESYNC] = {
3027                 .mr_handler = &mdt_reint_resync,
3028                 .mr_extra_opc = MDS_REINT_RESYNC,
3029         },
3030 };
3031
3032 int mdt_reint_rec(struct mdt_thread_info *info,
3033                   struct mdt_lock_handle *lhc)
3034 {
3035         const struct mdt_reinter *mr;
3036         int rc;
3037
3038         ENTRY;
3039         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3040                 RETURN(-EPROTO);
3041
3042         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3043         if (mr->mr_handler == NULL)
3044                 RETURN(-EPROTO);
3045
3046         rc = (*mr->mr_handler)(info, lhc);
3047
3048         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3049                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3050
3051         RETURN(rc);
3052 }