Whamcloud - gitweb
LU-13577 wbc: reimplement mkdir() by using intent lock
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 /**
251  * Lock slave stripes if necessary, the lock handles of slave stripes
252  * will be stored in einfo->ei_cbdata.
253  **/
254 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
255                             enum ldlm_mode mode, __u64 ibits,
256                             struct ldlm_enqueue_info *einfo)
257 {
258         union ldlm_policy_data *policy = &mti->mti_policy;
259
260         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
261         einfo->ei_type = LDLM_IBITS;
262         einfo->ei_mode = mode;
263         einfo->ei_cb_bl = mdt_remote_blocking_ast;
264         einfo->ei_cb_local_bl = mdt_blocking_ast;
265         einfo->ei_cb_cp = ldlm_completion_ast;
266         einfo->ei_enq_slave = 1;
267         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
268         einfo->ei_inodebits = ibits;
269         einfo->ei_req_slot = 1;
270         memset(policy, 0, sizeof(*policy));
271         policy->l_inodebits.bits = ibits;
272         policy->l_inodebits.li_initiator_id = mdt_node_id(mti->mti_mdt);
273
274         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
275                               policy);
276 }
277
278 /** lock object, and stripes if it's a striped directory
279  *
280  * object should be local, this is called in operations which modify both object
281  * and stripes.
282  *
283  * \param info          struct mdt_thread_info
284  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
285  * \param child         child object
286  * \param lh            lock handle
287  * \param einfo         struct ldlm_enqueue_info
288  * \param ibits         MDS inode lock bits
289  * \param mode          lock mode
290  *
291  * \retval              0 on success, -ev on error.
292  */
293 int mdt_object_stripes_lock(struct mdt_thread_info *info,
294                             struct mdt_object *parent,
295                             struct mdt_object *child,
296                             struct mdt_lock_handle *lh,
297                             struct ldlm_enqueue_info *einfo, __u64 ibits,
298                             enum ldlm_mode mode)
299 {
300         int rc;
301
302         ENTRY;
303         /* according to the protocol, child should be local, is request sent to
304          * wrong MDT?
305          */
306         if (mdt_object_remote(child)) {
307                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
308                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
309                        -EREMOTE);
310                 RETURN(-EREMOTE);
311         }
312
313         memset(einfo, 0, sizeof(*einfo));
314         if (ibits & MDS_INODELOCK_LOOKUP) {
315                 LASSERT(parent);
316                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
317                                            mode);
318         } else {
319                 rc = mdt_object_lock(info, child, lh, ibits, mode);
320         }
321         if (rc)
322                 RETURN(rc);
323
324         if (!S_ISDIR(child->mot_header.loh_attr))
325                 RETURN(0);
326
327         /* lock stripes for striped directory */
328         rc = mdt_stripes_lock(info, child, lh->mlh_reg_mode, ibits, einfo);
329         if (rc == -EIO && CFS_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
330                 rc = 0;
331         if (rc)
332                 mdt_object_unlock(info, child, lh, rc);
333
334         RETURN(rc);
335 }
336
337 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *obj,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_stripes_unlock(info, obj, einfo, decref);
344         mdt_object_unlock(info, obj, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lu_ucred *uc = mdt_ucred(info);
359         struct lmv_mds_md_v1 *lmv;
360         struct mdt_object *child;
361         struct mdt_lock_handle *lhp;
362         struct mdt_lock_handle *lhc;
363         struct mdt_body *repbody;
364         int rc;
365
366         ENTRY;
367
368         /* we want rbac roles to have precedence over any other
369          * permission or capability checks
370          */
371         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
372                 RETURN(-EPERM);
373
374         LASSERT(lum);
375         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
376
377         rc = mdt_version_get_check_save(info, parent, 0);
378         if (rc)
379                 RETURN(rc);
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW);
383         if (rc)
384                 RETURN(rc);
385
386         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
387         if (rc)
388                 GOTO(unlock_parent, rc);
389
390         if (ma->ma_valid & MA_LMV) {
391                 /* don't allow restripe if parent dir layout is changing */
392                 lmv = &ma->ma_lmv->lmv_md_v1;
393                 if (!lmv_is_sane2(lmv))
394                         GOTO(unlock_parent, rc = -EBADF);
395
396                 if (lmv_is_layout_changing(lmv))
397                         GOTO(unlock_parent, rc = -EBUSY);
398         }
399
400         fid_zero(fid);
401         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
402         if (rc)
403                 GOTO(unlock_parent, rc);
404
405         child = mdt_object_find(info->mti_env, mdt, fid);
406         if (IS_ERR(child))
407                 GOTO(unlock_parent, rc = PTR_ERR(child));
408
409         if (!mdt_object_exists(child))
410                 GOTO(out_child, rc = -ENOENT);
411
412         if (mdt_object_remote(child)) {
413                 struct mdt_body *repbody;
414
415                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
416                 if (!repbody)
417                         GOTO(out_child, rc = -EPROTO);
418
419                 repbody->mbo_fid1 = *fid;
420                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
421                 GOTO(out_child, rc = -EREMOTE);
422         }
423
424         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
425                 GOTO(out_child, rc = -ENOTDIR);
426
427         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
428         if (rc)
429                 GOTO(out_child, rc);
430
431         /* race with migrate? */
432         if ((ma->ma_valid & MA_LMV) &&
433              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
434                 GOTO(out_child, rc = -EBUSY);
435
436         /* lock object */
437         lhc = &info->mti_lh[MDT_LH_CHILD];
438         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
439                                      MDS_INODELOCK_FULL, LCK_PW);
440         if (rc)
441                 GOTO(unlock_child, rc);
442
443         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
444         rc = mdt_version_get_check_save(info, child, 1);
445         if (rc)
446                 GOTO(unlock_child, rc);
447
448         spin_lock(&mdt->mdt_restriper.mdr_lock);
449         if (child->mot_restriping) {
450                 /* race? */
451                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
452                 GOTO(unlock_child, rc = -EBUSY);
453         }
454         child->mot_restriping = 1;
455         spin_unlock(&mdt->mdt_restriper.mdr_lock);
456
457         *fid = *tfid;
458         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
459         if (rc)
460                 GOTO(restriping_clear, rc);
461
462         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
463         if (!repbody)
464                 GOTO(restriping_clear, rc = -EPROTO);
465
466         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
467         EXIT;
468
469 restriping_clear:
470         child->mot_restriping = 0;
471 unlock_child:
472         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
473 out_child:
474         mdt_object_put(info->mti_env, child);
475 unlock_parent:
476         mdt_object_unlock(info, parent, lhp, rc);
477
478         return rc;
479 }
480
481 /*
482  * VBR: we save three versions in reply:
483  * 0 - parent. Check that parent version is the same during replay.
484  * 1 - name. Version of 'name' if file exists with the same name or
485  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
486  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
487  * check.
488  */
489 static int mdt_create(struct mdt_thread_info *info, struct mdt_lock_handle *lhc)
490 {
491         struct mdt_device *mdt = info->mti_mdt;
492         struct mdt_object *parent;
493         struct mdt_object *child;
494         struct mdt_lock_handle *lh;
495         struct mdt_body *repbody;
496         struct md_attr *ma = &info->mti_attr;
497         struct mdt_reint_record *rr = &info->mti_rr;
498         struct md_op_spec *spec = &info->mti_spec;
499         struct lu_ucred *uc = mdt_ucred(info);
500         struct ldlm_reply *dlmrep = NULL;
501         bool restripe = false;
502         bool recreate_obj = false;
503         int rc;
504
505         ENTRY;
506         DEBUG_REQ(D_INODE, mdt_info_req(info),
507                   "Create ("DNAME"->"DFID") in "DFID,
508                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
509
510         if (!fid_is_md_operative(rr->rr_fid1))
511                 RETURN(-EPERM);
512
513         /* MDS_OPEN_DEFAULT_LMV means eadata is parent default LMV, which is set
514          * if client maintains inherited default LMV
515          */
516         if (S_ISDIR(ma->ma_attr.la_mode) &&
517             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0 &&
518             !(spec->sp_cr_flags & MDS_OPEN_DEFAULT_LMV)) {
519                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
520                 struct obd_export *exp = mdt_info_req(info)->rq_export;
521
522                 /* Only new clients can create remote dir( >= 2.4) and
523                  * striped dir(>= 2.6), old client will return -ENOTSUPP
524                  */
525                 if (!mdt_is_dne_client(exp))
526                         RETURN(-ENOTSUPP);
527
528                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
529                         if (!mdt_is_striped_client(exp))
530                                 RETURN(-ENOTSUPP);
531
532                         if (!mdt->mdt_enable_striped_dir)
533                                 RETURN(-EPERM);
534                 } else if (!mdt->mdt_enable_remote_dir) {
535                         RETURN(-EPERM);
536                 }
537
538                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
539                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
540                     LMV_HASH_TYPE_CRUSH)
541                         RETURN(-EPROTO);
542
543                 /* we want rbac roles to have precedence over any other
544                  * permission or capability checks
545                  */
546                 if (!uc->uc_rbac_dne_ops ||
547                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
548                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
549                      mdt->mdt_enable_remote_dir_gid != -1))
550                         RETURN(-EPERM);
551
552                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
553                  * this is create only, don't try restripe.
554                  */
555                 if (mdt->mdt_enable_dir_restripe &&
556                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
557                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
558                         restripe = true;
559         }
560
561         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
562         /*
563          * TODO: rewrite ll_mknod(), ll_create_nd(), ll_symlink(),
564          * ll_dir_setdirstripe() to all use intent lock.
565          */
566         if (info->mti_intent_lock) {
567                 dlmrep = req_capsule_server_get(info->mti_pill, &RMF_DLM_REP);
568                 mdt_set_disposition(info, dlmrep,
569                                     DISP_IT_EXECD | DISP_LOOKUP_EXECD);
570         }
571
572         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
573         if (IS_ERR(parent))
574                 RETURN(PTR_ERR(parent));
575
576         if (!mdt_object_exists(parent))
577                 GOTO(put_parent, rc = -ENOENT);
578
579         rc = mdt_check_enc(info, parent);
580         if (rc)
581                 GOTO(put_parent, rc);
582
583         if (!uc->uc_rbac_fscrypt_admin &&
584             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
585                 GOTO(put_parent, rc = -EPERM);
586
587         info->mti_spec.sp_replay = req_is_replay(mdt_info_req(info));
588
589         /*
590          * LU-10235: check if name exists locklessly first to avoid massive
591          * lock recalls on existing directories.
592          */
593         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
594                         &info->mti_tmp_fid1, &info->mti_spec);
595         if (rc == 0) {
596                 /* mkdir may be partially executed: name entry was successfully
597                  * inserted into parent diretory on remote MDT, while target not
598                  * created on local MDT. This happens when update log recovery
599                  * is aborted, and mkdir is replayed by client request.
600                  */
601                 if (unlikely(!(info->mti_spec.sp_replay &&
602                                mdt_object_remote(parent)) &&
603                              !restripe))
604                         GOTO(put_parent, rc = -EEXIST);
605
606                 child = mdt_object_find(info->mti_env, info->mti_mdt,
607                                         &info->mti_tmp_fid1);
608                 if (unlikely(IS_ERR(child)))
609                         GOTO(put_parent, rc = PTR_ERR(child));
610
611                 if (mdt_object_exists(child)) {
612                         mdt_object_put(info->mti_env, child);
613                         rc = -EEXIST;
614                         if (restripe)
615                                 rc = mdt_restripe(info, parent, &rr->rr_name,
616                                                   rr->rr_fid2, spec, ma);
617                         GOTO(put_parent, rc);
618                 }
619                 mdt_object_put(info->mti_env, child);
620                 recreate_obj = true;
621         } else if (rc != -ENOENT) {
622                 GOTO(put_parent, rc);
623         }
624
625         if (unlikely(info->mti_spec.sp_replay)) {
626                 /* check version only during replay */
627                 rc = mdt_version_check(mdt_info_req(info), ENOENT_VERSION, 1);
628                 if (rc)
629                         GOTO(put_parent, rc);
630         } else {
631                 CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PAUSE_CREATE_AFTER_LOOKUP,
632                                  cfs_fail_val);
633
634                 /* save version of file name for replay, must be ENOENT here */
635                 mdt_enoent_version_save(info, 1);
636         }
637
638         CFS_RACE(OBD_FAIL_MDS_CREATE_RACE);
639
640         lh = &info->mti_lh[MDT_LH_PARENT];
641         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW);
642         if (rc)
643                 GOTO(put_parent, rc);
644
645         if (!mdt_object_remote(parent)) {
646                 rc = mdt_version_get_check_save(info, parent, 0);
647                 if (rc)
648                         GOTO(unlock_parent, rc);
649         }
650
651         /*
652          * now repeat the lookup having a LDLM lock on the parent dir,
653          * as another thread could create the same name. notice this
654          * lookup is supposed to hit cache in OSD and be cheap if the
655          * directory is not being modified concurrently.
656          */
657         rc = mdo_lookup(info->mti_env, mdt_object_child(parent), &rr->rr_name,
658                         &info->mti_tmp_fid1, &info->mti_spec);
659         if (unlikely(rc == 0 && !recreate_obj))
660                 GOTO(unlock_parent, rc = -EEXIST);
661
662         if (info->mti_intent_lock)
663                 mdt_set_disposition(info, dlmrep, DISP_OPEN_CREATE);
664
665         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
666         if (unlikely(IS_ERR(child)))
667                 GOTO(unlock_parent, rc = PTR_ERR(child));
668
669         ma->ma_need = MA_INODE;
670         ma->ma_valid = 0;
671
672         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
673                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
674
675         /* Version of child will be updated on disk. */
676         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
677         rc = mdt_version_get_check_save(info, child, 2);
678         if (rc)
679                 GOTO(put_child, rc);
680
681         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
682             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
683              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
684                      rr->rr_name.ln_namelen) == 0))
685                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
686
687         /*
688          * Do not perform lookup sanity check. We know that name does
689          * not exist.
690          */
691         info->mti_spec.sp_cr_lookup = 0;
692         if (mdt_object_remote(parent))
693                 info->mti_spec.sp_cr_lookup = 1;
694         info->mti_spec.sp_feat = &dt_directory_features;
695
696         /* set jobid xattr name from sysfs parameter */
697         strncpy(info->mti_spec.sp_cr_job_xattr, mdt->mdt_job_xattr,
698                 XATTR_JOB_MAX_LEN);
699
700         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
701                         mdt_object_child(child), &info->mti_spec, ma);
702         if (rc < 0)
703                 GOTO(put_child, rc);
704
705         if ((S_ISDIR(ma->ma_attr.la_mode) &&
706              (info->mti_spec.sp_cr_flags & MDS_MKDIR_LMV)) ||
707              info->mti_intent_lock)
708                 mdt_prep_ma_buf_from_rep(info, child, ma, 0);
709
710         rc = mdt_attr_get_complex(info, child, ma);
711         if (rc < 0)
712                 GOTO(put_child, rc);
713
714         if (ma->ma_valid & MA_LOV) {
715                 LASSERT(ma->ma_lmm_size != 0);
716                 repbody->mbo_eadatasize = ma->ma_lmm_size;
717                 if (S_ISREG(ma->ma_attr.la_mode))
718                         repbody->mbo_valid |= OBD_MD_FLEASIZE;
719                 else if (S_ISDIR(ma->ma_attr.la_mode))
720                         repbody->mbo_valid |= OBD_MD_FLDIREA;
721         }
722
723         if (ma->ma_valid & MA_LMV) {
724                 mdt_dump_lmv(D_INFO, ma->ma_lmv);
725                 repbody->mbo_eadatasize = ma->ma_lmv_size;
726                 repbody->mbo_valid |= (OBD_MD_FLDIREA|OBD_MD_MEA);
727         }
728
729         if (ma->ma_valid & MA_LMV_DEF) {
730                 /* Return -EOPNOTSUPP for old client. */
731                 if (!mdt_is_striped_client(mdt_info_req(info)->rq_export))
732                         GOTO(put_child, rc = -EOPNOTSUPP);
733
734                 LASSERT(S_ISDIR(ma->ma_attr.la_mode));
735                 repbody->mbo_valid |= OBD_MD_FLDIREA | OBD_MD_DEFAULT_MEA;
736         }
737
738         /* save child locks to eliminate dependey between 'mkdir a' and
739          * 'mkdir a/b' if b is a remote directory
740          */
741         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode) &&
742             !info->mti_intent_lock) {
743                 struct mdt_lock_handle *lhc;
744                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
745
746                 lhc = &info->mti_lh[MDT_LH_CHILD];
747                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
748                                              MDS_INODELOCK_UPDATE, LCK_PW);
749                 if (rc)
750                         GOTO(put_child, rc);
751
752                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
753         }
754
755         /* Return fid & attr to client. */
756         if (ma->ma_valid & MA_INODE)
757                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
758                                    mdt_object_fid(child));
759
760         if (info->mti_intent_lock) {
761                 mdt_set_disposition(info, dlmrep, DISP_LOOKUP_NEG);
762                 rc = mdt_check_resent_lock(info, child, lhc);
763                 /*
764                  * rc < 0 is error and we fall right back through,
765                  * rc == 0 is the open lock might already be gotten in
766                  * ldlm_handle_enqueue due to this being a resend.
767                  */
768                 if (rc <= 0)
769                         GOTO(put_child, rc);
770
771                 /*
772                  * For the normal intent create (mkdir):
773                  * - Grant LOOKUP lock with CR mode to the client at
774                  *   least.
775                  * - Grant the lock similar to getattr():
776                  *   lock mode: PR;
777                  *   inodebits: LOOK | UPDATE | PERM [| LAYOUT].
778                  * However, it can not grant LCK_CR to the client as during
779                  * the setting of LMV layout for a directory from a client,
780                  * it will acquire LCK_PW mode lock which is compat with LCK_CR
781                  * lock mode, this may result that the cached LMV layout on a
782                  * client will not be released when set (default) LMV layout on
783                  * a directory.
784                  * Due to the above reason, it grants a lock with LCK_PR mode to
785                  * the client.
786                  */
787                 rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP |
788                                      MDS_INODELOCK_UPDATE | MDS_INODELOCK_PERM,
789                                      LCK_PR);
790         }
791
792         EXIT;
793 put_child:
794         mdt_object_put(info->mti_env, child);
795 unlock_parent:
796         mdt_object_unlock(info, parent, lh, rc);
797         if (rc && dlmrep)
798                 mdt_clear_disposition(info, dlmrep, DISP_OPEN_CREATE);
799 put_parent:
800         mdt_object_put(info->mti_env, parent);
801         return rc;
802 }
803
804 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
805                         struct md_attr *ma)
806 {
807         struct mdt_lock_handle  *lh;
808         int do_vbr = ma->ma_attr.la_valid &
809                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
810         __u64 lockpart = MDS_INODELOCK_UPDATE;
811         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
812         int rc;
813
814         ENTRY;
815         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
816                 lockpart |= MDS_INODELOCK_PERM;
817         /* Clear xattr cache on clients, so the virtual project ID xattr
818          * can get the new project ID
819          */
820         if (ma->ma_attr.la_valid & LA_PROJID)
821                 lockpart |= MDS_INODELOCK_XATTR;
822
823         lh = &info->mti_lh[MDT_LH_PARENT];
824         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
825                                      LCK_PW);
826         if (rc != 0)
827                 RETURN(rc);
828
829         /* all attrs are packed into mti_attr in unpack_setattr */
830         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
831                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
832
833         /* VBR: update version if attr changed are important for recovery */
834         if (do_vbr) {
835                 /* update on-disk version of changed object */
836                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
837                 rc = mdt_version_get_check_save(info, mo, 0);
838                 if (rc)
839                         GOTO(out_unlock, rc);
840         }
841
842         /* Ensure constant striping during chown(). See LU-2789. */
843         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
844                 mutex_lock(&mo->mot_lov_mutex);
845
846         /* all attrs are packed into mti_attr in unpack_setattr */
847         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
848
849         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
850                 mutex_unlock(&mo->mot_lov_mutex);
851
852         if (rc != 0)
853                 GOTO(out_unlock, rc);
854         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
855         EXIT;
856 out_unlock:
857         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
858         return rc;
859 }
860
861 /**
862  * Check HSM flags and add HS_DIRTY flag if relevant.
863  *
864  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
865  * and is not RELEASED.
866  */
867 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
868                         struct md_attr *ma)
869 {
870         struct lu_ucred *uc = mdt_ucred(info);
871         kernel_cap_t cap_saved;
872         int rc;
873
874         ENTRY;
875         /* If the file was modified, add the dirty flag */
876         ma->ma_need = MA_HSM;
877         rc = mdt_attr_get_complex(info, mo, ma);
878         if (rc) {
879                 CERROR("file attribute read error for "DFID": %d.\n",
880                         PFID(mdt_object_fid(mo)), rc);
881                 RETURN(rc);
882         }
883
884         /* If an up2date copy exists in the backend, add dirty flag */
885         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
886             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
887                 ma->ma_hsm.mh_flags |= HS_DIRTY;
888
889                 /* Bump cap so that closes from non-owner writers can
890                  * set the HSM state to dirty.
891                  */
892                 cap_saved = uc->uc_cap;
893                 cap_raise(uc->uc_cap, CAP_FOWNER);
894                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
895                 uc->uc_cap = cap_saved;
896                 if (rc)
897                         CERROR("file attribute change error for "DFID": %d\n",
898                                 PFID(mdt_object_fid(mo)), rc);
899         }
900
901         RETURN(rc);
902 }
903
904 static int mdt_reint_setattr(struct mdt_thread_info *info,
905                              struct mdt_lock_handle *lhc)
906 {
907         struct mdt_device *mdt = info->mti_mdt;
908         struct md_attr *ma = &info->mti_attr;
909         struct mdt_reint_record *rr = &info->mti_rr;
910         struct ptlrpc_request *req = mdt_info_req(info);
911         struct mdt_object *mo;
912         struct mdt_body *repbody;
913         ktime_t kstart = ktime_get();
914         int rc;
915
916         ENTRY;
917         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
918                   (unsigned int)ma->ma_attr.la_valid);
919
920         if (info->mti_dlm_req)
921                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
922
923         CFS_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
924
925         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
926         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
927         if (IS_ERR(mo))
928                 GOTO(out, rc = PTR_ERR(mo));
929
930         if (!mdt_object_exists(mo))
931                 GOTO(out_put, rc = -ENOENT);
932
933         if (mdt_object_remote(mo))
934                 GOTO(out_put, rc = -EREMOTE);
935
936         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
937         /* revoke lease lock if size is going to be changed */
938         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
939                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
940                      atomic_read(&mo->mot_lease_count) > 0)) {
941                 down_read(&mo->mot_open_sem);
942
943                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
944                         lhc = &info->mti_lh[MDT_LH_LOCAL];
945                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
946                                              LCK_CW);
947                         if (rc != 0) {
948                                 up_read(&mo->mot_open_sem);
949                                 GOTO(out_put, rc);
950                         }
951
952                         /* revoke lease lock */
953                         mdt_object_unlock(info, mo, lhc, 1);
954                 }
955                 up_read(&mo->mot_open_sem);
956         }
957
958         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
959                 /* Check write access for the O_TRUNC case */
960                 if (mdt_write_read(mo) < 0)
961                         GOTO(out_put, rc = -ETXTBSY);
962
963                 /* LU-10286: compatibility check for FLR.
964                  * Please check the comment in mdt_finish_open() for details
965                  */
966                 if (!exp_connect_flr(info->mti_exp) ||
967                     !exp_connect_overstriping(info->mti_exp)) {
968                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
969                         if (rc < 0 && rc != -ENODATA)
970                                 GOTO(out_put, rc);
971
972                         if (!exp_connect_flr(info->mti_exp)) {
973                                 if (rc > 0 &&
974                                     mdt_lmm_is_flr(info->mti_big_lmm))
975                                         GOTO(out_put, rc = -EOPNOTSUPP);
976                         }
977
978                         if (!exp_connect_overstriping(info->mti_exp)) {
979                                 if (rc > 0 &&
980                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
981                                         GOTO(out_put, rc = -EOPNOTSUPP);
982                         }
983                 }
984
985                 /* For truncate, the file size sent from client
986                  * is believable, but the blocks are incorrect,
987                  * which makes the block size in LSOM attribute
988                  * inconsisent with the real block size.
989                  */
990                 rc = mdt_lsom_update(info, mo, true);
991                 if (rc)
992                         GOTO(out_put, rc);
993         }
994
995         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
996                 if (ma->ma_valid & MA_LOV)
997                         GOTO(out_put, rc = -EPROTO);
998
999                 /* MDT supports FMD for regular files due to Data-on-MDT */
1000                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
1001                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
1002                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
1003                                        req->rq_xid);
1004
1005                         if (ma->ma_attr.la_valid & LA_MTIME) {
1006                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
1007                                 if (!rc)
1008                                         ma->ma_valid |= MA_PFID;
1009                         }
1010                 }
1011
1012                 rc = mdt_attr_set(info, mo, ma);
1013                 if (rc)
1014                         GOTO(out_put, rc);
1015         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
1016                    (ma->ma_valid & MA_INODE)) {
1017                 struct lu_buf *buf = &info->mti_buf;
1018                 struct lu_ucred *uc = mdt_ucred(info);
1019                 struct mdt_lock_handle *lh;
1020                 const char *name;
1021
1022                 /* reject if either remote or striped dir is disabled */
1023                 if (ma->ma_valid & MA_LMV) {
1024                         if (!mdt->mdt_enable_remote_dir ||
1025                             !mdt->mdt_enable_striped_dir)
1026                                 GOTO(out_put, rc = -EPERM);
1027
1028                         /* we want rbac roles to have precedence over any other
1029                          * permission or capability checks
1030                          */
1031                         if (!uc->uc_rbac_dne_ops ||
1032                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
1033                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
1034                              mdt->mdt_enable_remote_dir_gid != -1))
1035                                 GOTO(out_put, rc = -EPERM);
1036                 }
1037
1038                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
1039                         GOTO(out_put, rc = -ENOTDIR);
1040
1041                 if (ma->ma_attr.la_valid != 0)
1042                         GOTO(out_put, rc = -EPROTO);
1043
1044                 lh = &info->mti_lh[MDT_LH_PARENT];
1045                 if (ma->ma_valid & MA_LOV) {
1046                         buf->lb_buf = ma->ma_lmm;
1047                         buf->lb_len = ma->ma_lmm_size;
1048                         name = XATTR_NAME_LOV;
1049                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
1050                                              LCK_PW);
1051                 } else {
1052                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
1053                         buf->lb_len = ma->ma_lmv_size;
1054                         name = XATTR_NAME_DEFAULT_LMV;
1055
1056                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
1057                                 rc = mdt_object_lock(info, mo, lh,
1058                                                      MDS_INODELOCK_XATTR |
1059                                                      MDS_INODELOCK_LOOKUP,
1060                                                      LCK_PW);
1061                         } else {
1062                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
1063                                 struct lu_name *pname = &info->mti_name;
1064                                 const char dotdot[] = "..";
1065                                 struct mdt_object *pobj;
1066
1067                                 fid_zero(pfid);
1068                                 pname->ln_name = dotdot;
1069                                 pname->ln_namelen = sizeof(dotdot);
1070                                 rc = mdo_lookup(info->mti_env,
1071                                                 mdt_object_child(mo), pname,
1072                                                 pfid, NULL);
1073                                 if (rc)
1074                                         GOTO(out_put, rc);
1075
1076                                 pobj = mdt_object_find(info->mti_env,
1077                                                        info->mti_mdt, pfid);
1078                                 if (IS_ERR(pobj))
1079                                         GOTO(out_put, rc = PTR_ERR(pobj));
1080
1081                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
1082                                                            MDS_INODELOCK_XATTR |
1083                                                            MDS_INODELOCK_LOOKUP,
1084                                                            LCK_PW);
1085                                 mdt_object_put(info->mti_env, pobj);
1086                         }
1087                 }
1088
1089                 if (rc != 0)
1090                         GOTO(out_put, rc);
1091
1092                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1093                                   name, 0);
1094
1095                 mdt_object_unlock(info, mo, lh, rc);
1096                 if (rc)
1097                         GOTO(out_put, rc);
1098         } else {
1099                 GOTO(out_put, rc = -EPROTO);
1100         }
1101
1102         /* If file data is modified, add the dirty flag */
1103         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1104                 rc = mdt_add_dirty_flag(info, mo, ma);
1105
1106         ma->ma_need = MA_INODE;
1107         ma->ma_valid = 0;
1108         rc = mdt_attr_get_complex(info, mo, ma);
1109         if (rc != 0)
1110                 GOTO(out_put, rc);
1111
1112         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1113
1114         EXIT;
1115 out_put:
1116         mdt_object_put(info->mti_env, mo);
1117 out:
1118         if (rc == 0)
1119                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1120                                  ktime_us_delta(ktime_get(), kstart));
1121
1122         mdt_client_compatibility(info);
1123         return rc;
1124 }
1125
1126 static int mdt_reint_create(struct mdt_thread_info *info,
1127                             struct mdt_lock_handle *lhc)
1128 {
1129         struct ptlrpc_request   *req = mdt_info_req(info);
1130         ktime_t                 kstart = ktime_get();
1131         int                     rc;
1132
1133         ENTRY;
1134         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1135                 RETURN(err_serious(-ESTALE));
1136
1137         if (info->mti_dlm_req)
1138                 ldlm_request_cancel(mdt_info_req(info),
1139                                     info->mti_dlm_req, 0, LATF_SKIP);
1140
1141         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1142                 RETURN(-EPROTO);
1143
1144         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1145         case S_IFDIR:
1146         case S_IFREG:
1147         case S_IFLNK:
1148         case S_IFCHR:
1149         case S_IFBLK:
1150         case S_IFIFO:
1151         case S_IFSOCK:
1152                 break;
1153         default:
1154                 CERROR("%s: Unsupported mode %o\n",
1155                        mdt_obd_name(info->mti_mdt),
1156                        info->mti_attr.ma_attr.la_mode);
1157                 RETURN(err_serious(-EOPNOTSUPP));
1158         }
1159
1160         rc = mdt_create(info, lhc);
1161         if (rc == 0) {
1162                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1163                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1164                                          ktime_us_delta(ktime_get(), kstart));
1165                 else
1166                         /* Special file should stay on the same node as parent*/
1167                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1168                                          ktime_us_delta(ktime_get(), kstart));
1169         }
1170
1171         RETURN(rc);
1172 }
1173
1174 /*
1175  * VBR: save parent version in reply and child version getting by its name.
1176  * Version of child is getting and checking during its lookup. If
1177  */
1178 static int mdt_reint_unlink(struct mdt_thread_info *info,
1179                             struct mdt_lock_handle *lhc)
1180 {
1181         struct mdt_reint_record *rr = &info->mti_rr;
1182         struct ptlrpc_request *req = mdt_info_req(info);
1183         struct md_attr *ma = &info->mti_attr;
1184         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1185         struct mdt_object *mp;
1186         struct mdt_object *mc;
1187         struct mdt_lock_handle *parent_lh;
1188         struct mdt_lock_handle *child_lh;
1189         struct ldlm_enqueue_info *einfo = &info->mti_einfo;
1190         struct lu_ucred *uc  = mdt_ucred(info);
1191         int no_name = 0;
1192         ktime_t kstart = ktime_get();
1193         int rc;
1194
1195         ENTRY;
1196         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1197                   PNAME(&rr->rr_name));
1198
1199         if (info->mti_dlm_req)
1200                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1201
1202         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1203                 RETURN(err_serious(-ENOENT));
1204
1205         if (!fid_is_md_operative(rr->rr_fid1))
1206                 RETURN(-EPERM);
1207
1208         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1209         if (IS_ERR(mp))
1210                 RETURN(PTR_ERR(mp));
1211
1212         if (!mdt_object_remote(mp)) {
1213                 rc = mdt_version_get_check_save(info, mp, 0);
1214                 if (rc)
1215                         GOTO(put_parent, rc);
1216         }
1217
1218         if (!uc->uc_rbac_fscrypt_admin &&
1219             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1220                 GOTO(put_parent, rc = -EPERM);
1221
1222         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
1223         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1224         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1225         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW);
1226         if (rc != 0)
1227                 GOTO(put_parent, rc);
1228
1229         if (info->mti_spec.sp_rm_entry) {
1230                 if (!mdt_is_dne_client(req->rq_export))
1231                         /* Return -ENOTSUPP for old client */
1232                         GOTO(unlock_parent, rc = -ENOTSUPP);
1233
1234                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1235                         GOTO(unlock_parent, rc = -EPERM);
1236
1237                 ma->ma_need = MA_INODE;
1238                 ma->ma_valid = 0;
1239                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1240                                 NULL, &rr->rr_name, ma, no_name);
1241                 GOTO(unlock_parent, rc);
1242         }
1243
1244         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1245                 *child_fid = *rr->rr_fid2;
1246         } else {
1247                 /* lookup child object along with version checking */
1248                 fid_zero(child_fid);
1249                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1250                                               1);
1251                 if (rc != 0) {
1252                         /* Name might not be able to find during resend of
1253                          * remote unlink, considering following case.
1254                          * dir_A is a remote directory, the name entry of
1255                          * dir_A is on MDT0, the directory is on MDT1,
1256                          *
1257                          * 1. client sends unlink req to MDT1.
1258                          * 2. MDT1 sends name delete update to MDT0.
1259                          * 3. name entry is being deleted in MDT0 synchronously.
1260                          * 4. MDT1 is restarted.
1261                          * 5. client resends unlink req to MDT1. So it can not
1262                          *    find the name entry on MDT0 anymore.
1263                          * In this case, MDT1 only needs to destory the local
1264                          * directory.
1265                          */
1266                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1267                             !fid_is_zero(rr->rr_fid2) &&
1268                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1269                                 no_name = 1;
1270                                 *child_fid = *rr->rr_fid2;
1271                         } else {
1272                                 GOTO(unlock_parent, rc);
1273                         }
1274                 }
1275         }
1276
1277         if (!fid_is_md_operative(child_fid))
1278                 GOTO(unlock_parent, rc = -EPERM);
1279
1280         /* We will lock the child regardless it is local or remote. No harm. */
1281         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1282         if (IS_ERR(mc))
1283                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1284
1285         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1286                 /* In this case, child fid is embedded in the request, and we do
1287                  * not have a proper name as rr_name contains an encoded
1288                  * hash. So find name that matches provided hash.
1289                  */
1290                 if (!find_name_matching_hash(info, &rr->rr_name,
1291                                              NULL, mc))
1292                         GOTO(put_child, rc = -ENOENT);
1293         }
1294
1295         child_lh = &info->mti_lh[MDT_LH_CHILD];
1296         if (mdt_object_remote(mc)) {
1297                 struct mdt_body  *repbody;
1298
1299                 if (!fid_is_zero(rr->rr_fid2)) {
1300                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1301                                mdt_obd_name(info->mti_mdt),
1302                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1303                         GOTO(put_child, rc = -ENOENT);
1304                 }
1305                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1306                        mdt_obd_name(info->mti_mdt),
1307                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1308
1309                 if (!mdt_is_dne_client(req->rq_export))
1310                         /* Return -ENOTSUPP for old client */
1311                         GOTO(put_child, rc = -ENOTSUPP);
1312
1313                 /* Revoke the LOOKUP lock of the remote object granted by
1314                  * this MDT. Since the unlink will happen on another MDT,
1315                  * it will release the LOOKUP lock right away. Then What
1316                  * would happen if another client try to grab the LOOKUP
1317                  * lock at the same time with unlink XXX
1318                  */
1319                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX);
1320                 if (rc)
1321                         GOTO(put_child, rc);
1322
1323                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1324                 LASSERT(repbody != NULL);
1325                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1326                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1327                 GOTO(unlock_child, rc = -EREMOTE);
1328         }
1329         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1330          * this now because a running HSM restore on the child (unlink
1331          * victim) will hold the layout lock. See LU-4002.
1332          */
1333         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1334                                      MDS_INODELOCK_LOOKUP |
1335                                      MDS_INODELOCK_UPDATE, LCK_EX);
1336         if (rc != 0)
1337                 GOTO(put_child, rc);
1338
1339         /*
1340          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1341          * whether need MA_LOV and MA_COOKIE.
1342          */
1343         ma->ma_need = MA_INODE;
1344         ma->ma_valid = 0;
1345
1346         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1347                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1348         /* save version when object is locked */
1349         mdt_version_get_save(info, mc, 1);
1350
1351         mutex_lock(&mc->mot_lov_mutex);
1352
1353         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1354                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1355
1356         mutex_unlock(&mc->mot_lov_mutex);
1357         if (rc != 0)
1358                 GOTO(unlock_child, rc);
1359
1360         if (!lu_object_is_dying(&mc->mot_header)) {
1361                 rc = mdt_attr_get_complex(info, mc, ma);
1362                 if (rc)
1363                         GOTO(out_stat, rc);
1364         } else if (mdt_dom_check_for_discard(info, mc)) {
1365                 mdt_dom_discard_data(info, mc);
1366         }
1367         mdt_handle_last_unlink(info, mc, ma);
1368
1369 out_stat:
1370         if (ma->ma_valid & MA_INODE) {
1371                 switch (ma->ma_attr.la_mode & S_IFMT) {
1372                 case S_IFDIR:
1373                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1374                                          ktime_us_delta(ktime_get(), kstart));
1375                         break;
1376                 case S_IFREG:
1377                 case S_IFLNK:
1378                 case S_IFCHR:
1379                 case S_IFBLK:
1380                 case S_IFIFO:
1381                 case S_IFSOCK:
1382                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1383                                          ktime_us_delta(ktime_get(), kstart));
1384                         break;
1385                 default:
1386                         LASSERTF(0, "bad file type %o unlinking\n",
1387                                 ma->ma_attr.la_mode);
1388                 }
1389         }
1390
1391         EXIT;
1392
1393 unlock_child:
1394         /* after unlink the object is gone, no need to keep lock */
1395         mdt_object_stripes_unlock(info, mc, child_lh, einfo, 1);
1396 put_child:
1397         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1398             info->mti_big_buf.lb_buf)
1399                 lu_buf_free(&info->mti_big_buf);
1400         mdt_object_put(info->mti_env, mc);
1401 unlock_parent:
1402         mdt_object_unlock(info, mp, parent_lh, rc);
1403 put_parent:
1404         mdt_object_put(info->mti_env, mp);
1405         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1406         return rc;
1407 }
1408
1409 /*
1410  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1411  * name.
1412  */
1413 static int mdt_reint_link(struct mdt_thread_info *info,
1414                           struct mdt_lock_handle *lhc)
1415 {
1416         struct mdt_reint_record *rr = &info->mti_rr;
1417         struct ptlrpc_request   *req = mdt_info_req(info);
1418         struct md_attr          *ma = &info->mti_attr;
1419         struct mdt_object       *ms;
1420         struct mdt_object       *mp;
1421         struct mdt_lock_handle  *lhs;
1422         struct mdt_lock_handle  *lhp;
1423         ktime_t kstart = ktime_get();
1424         int rc;
1425
1426         ENTRY;
1427         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1428                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1429
1430         if (CFS_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1431                 RETURN(err_serious(-ENOENT));
1432
1433         if (CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1434             CFS_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1435                 req->rq_no_reply = 1;
1436                 RETURN(err_serious(-ENOENT));
1437         }
1438
1439         if (info->mti_dlm_req)
1440                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1441
1442         /* Invalid case so return error immediately instead of
1443          * processing it
1444          */
1445         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1446                 RETURN(-EPERM);
1447
1448         if (!fid_is_md_operative(rr->rr_fid1) ||
1449             !fid_is_md_operative(rr->rr_fid2))
1450                 RETURN(-EPERM);
1451
1452         /* step 1: find target parent dir */
1453         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1454         if (IS_ERR(mp))
1455                 RETURN(PTR_ERR(mp));
1456
1457         rc = mdt_version_get_check_save(info, mp, 0);
1458         if (rc)
1459                 GOTO(put_parent, rc);
1460
1461         rc = mdt_check_enc(info, mp);
1462         if (rc)
1463                 GOTO(put_parent, rc);
1464
1465         /* step 2: find source */
1466         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1467         if (IS_ERR(ms))
1468                 GOTO(put_parent, rc = PTR_ERR(ms));
1469
1470         if (!mdt_object_exists(ms)) {
1471                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1472                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1473                 GOTO(put_source, rc = -ENOENT);
1474         }
1475
1476         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1477
1478         lhp = &info->mti_lh[MDT_LH_PARENT];
1479         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW);
1480         if (rc != 0)
1481                 GOTO(put_source, rc);
1482
1483         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1484
1485         lhs = &info->mti_lh[MDT_LH_CHILD];
1486         rc = mdt_object_lock(info, ms, lhs,
1487                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1488                              LCK_EX);
1489         if (rc != 0)
1490                 GOTO(unlock_parent, rc);
1491
1492         /* step 3: link it */
1493         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1494                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1495
1496         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1497         rc = mdt_version_get_check_save(info, ms, 1);
1498         if (rc)
1499                 GOTO(unlock_source, rc);
1500
1501         /** check target version by name during replay */
1502         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1503                                       &info->mti_tmp_fid1, 2);
1504         if (rc != 0 && rc != -ENOENT)
1505                 GOTO(unlock_source, rc);
1506         /* save version of file name for replay, it must be ENOENT here */
1507         if (!req_is_replay(mdt_info_req(info))) {
1508                 if (rc != -ENOENT) {
1509                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1510                                PNAME(&rr->rr_name));
1511                         GOTO(unlock_source, rc = -EEXIST);
1512                 }
1513                 info->mti_ver[2] = ENOENT_VERSION;
1514                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1515         }
1516
1517         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1518                       mdt_object_child(ms), &rr->rr_name, ma);
1519
1520         if (rc == 0)
1521                 mdt_counter_incr(req, LPROC_MDT_LINK,
1522                                  ktime_us_delta(ktime_get(), kstart));
1523
1524         EXIT;
1525 unlock_source:
1526         mdt_object_unlock(info, ms, lhs, rc);
1527 unlock_parent:
1528         mdt_object_unlock(info, mp, lhp, rc);
1529 put_source:
1530         mdt_object_put(info->mti_env, ms);
1531 put_parent:
1532         mdt_object_put(info->mti_env, mp);
1533         return rc;
1534 }
1535
1536 /**
1537  * Get BFL lock for rename or migrate process.
1538  **/
1539 static int mdt_rename_lock(struct mdt_thread_info *info,
1540                            struct mdt_lock_handle *lh)
1541 {
1542         struct lu_fid *fid = &info->mti_tmp_fid1;
1543         struct mdt_object *obj;
1544         __u64 ibits = MDS_INODELOCK_UPDATE;
1545         int rc;
1546
1547         ENTRY;
1548         lu_root_fid(fid);
1549         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1550         if (IS_ERR(obj))
1551                 RETURN(PTR_ERR(obj));
1552
1553         mdt_lock_reg_init(lh, LCK_EX);
1554         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1555                                       &ibits, 0, false);
1556         mdt_object_put(info->mti_env, obj);
1557         RETURN(rc);
1558 }
1559
1560 static void mdt_rename_unlock(struct mdt_thread_info *info,
1561                               struct mdt_lock_handle *lh)
1562 {
1563         ENTRY;
1564         /* Cancel the single rename lock right away */
1565         mdt_object_unlock(info, NULL, lh, 1);
1566         EXIT;
1567 }
1568
1569 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1570                                                 const struct lu_fid *fid,
1571                                                 int idx)
1572 {
1573         struct mdt_object *dir;
1574         int rc;
1575
1576         ENTRY;
1577         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1578         if (IS_ERR(dir))
1579                 RETURN(dir);
1580
1581         /* check early, the real version will be saved after locking */
1582         rc = mdt_version_get_check(info, dir, idx);
1583         if (rc)
1584                 GOTO(out_put, rc);
1585
1586         if (!mdt_object_exists(dir))
1587                 GOTO(out_put, rc = -ENOENT);
1588
1589         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1590                 GOTO(out_put, rc = -ENOTDIR);
1591
1592         RETURN(dir);
1593 out_put:
1594         mdt_object_put(info->mti_env, dir);
1595         return ERR_PTR(rc);
1596 }
1597
1598 /*
1599  * lock rename source object.
1600  *
1601  * Both source and its parent object may be located on remote MDTs, and even on
1602  * different MDTs, which means source object is a remote object on parent.
1603  *
1604  * \retval      0 on success
1605  * \retval      -ev negative errno upon error
1606  */
1607 static int mdt_rename_source_lock(struct mdt_thread_info *info,
1608                                   struct mdt_object *parent,
1609                                   struct mdt_object *child,
1610                                   struct mdt_lock_handle *lh,
1611                                   struct mdt_lock_handle *lh_lookup,
1612                                   __u64 ibits)
1613 {
1614         int rc;
1615
1616         LASSERT(ibits & MDS_INODELOCK_LOOKUP);
1617         /* if @obj is remote object, LOOKUP lock needs to be taken from
1618          * parent MDT.
1619          */
1620         rc = mdt_is_remote_object(info, parent, child);
1621         if (rc < 0)
1622                 return rc;
1623
1624         if (rc == 1) {
1625                 rc = mdt_object_lookup_lock(info, parent, child, lh_lookup,
1626                                             LCK_EX);
1627                 if (rc)
1628                         return rc;
1629
1630                 ibits &= ~MDS_INODELOCK_LOOKUP;
1631         }
1632
1633         rc = mdt_object_lock(info, child, lh, ibits, LCK_EX);
1634         if (unlikely(rc && !(ibits & MDS_INODELOCK_LOOKUP)))
1635                 mdt_object_unlock(info, NULL, lh_lookup, rc);
1636
1637         return 0;
1638 }
1639
1640 static void mdt_rename_source_unlock(struct mdt_thread_info *info,
1641                                      struct mdt_object *obj,
1642                                      struct mdt_lock_handle *lh,
1643                                      struct mdt_lock_handle *lh_lookup,
1644                                      int decref)
1645 {
1646         mdt_object_unlock(info, obj, lh, decref);
1647         mdt_object_unlock(info, NULL, lh_lookup, decref);
1648 }
1649
1650 /* migration takes UPDATE lock of link parent, and LOOKUP lock of link */
1651 struct mdt_link_lock {
1652         struct mdt_object *mll_obj;
1653         struct mdt_lock_handle mll_lh;
1654         struct list_head mll_linkage;
1655 };
1656
1657 static inline int mdt_migrate_link_lock_add(struct mdt_thread_info *info,
1658                                             struct mdt_object *o,
1659                                             struct mdt_lock_handle *lh,
1660                                             struct list_head *list)
1661 {
1662         struct mdt_link_lock *mll;
1663
1664         OBD_ALLOC_PTR(mll);
1665         if (mll == NULL)
1666                 return -ENOMEM;
1667
1668         INIT_LIST_HEAD(&mll->mll_linkage);
1669         mdt_object_get(info->mti_env, o);
1670         mll->mll_obj = o;
1671         mll->mll_lh = *lh;
1672         memset(lh, 0, sizeof(*lh));
1673         list_add_tail(&mll->mll_linkage, list);
1674
1675         return 0;
1676 }
1677
1678 static inline void mdt_migrate_link_lock_del(struct mdt_thread_info *info,
1679                                              struct mdt_link_lock *mll,
1680                                              int decref)
1681 {
1682         mdt_object_unlock(info, mll->mll_obj, &mll->mll_lh, decref);
1683         mdt_object_put(info->mti_env, mll->mll_obj);
1684         list_del(&mll->mll_linkage);
1685         OBD_FREE_PTR(mll);
1686 }
1687
1688 static void mdt_migrate_links_unlock(struct mdt_thread_info *info,
1689                                      struct list_head *list, int decref)
1690 {
1691         struct mdt_link_lock *mll;
1692         struct mdt_link_lock *tmp;
1693
1694         list_for_each_entry_safe(mll, tmp, list, mll_linkage)
1695                 mdt_migrate_link_lock_del(info, mll, decref);
1696 }
1697
1698 /* take link parent UPDATE lock.
1699  * \retval      0 \a lnkp is already locked, no lock taken.
1700  *              1 lock taken
1701  *              -ev negative errno.
1702  */
1703 static int mdt_migrate_link_parent_lock(struct mdt_thread_info *info,
1704                                         struct mdt_object *lnkp,
1705                                         struct list_head *update_locks,
1706                                         bool *blocked)
1707 {
1708         const struct lu_fid *fid = mdt_object_fid(lnkp);
1709         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1710         struct mdt_link_lock *entry;
1711         __u64 ibits = 0;
1712         int rc;
1713
1714         ENTRY;
1715
1716         /* check if it's already locked */
1717         list_for_each_entry(entry, update_locks, mll_linkage) {
1718                 if (lu_fid_eq(mdt_object_fid(entry->mll_obj), fid)) {
1719                         CDEBUG(D_INFO, "skip "DFID" lock\n", PFID(fid));
1720                         RETURN(0);
1721                 }
1722         }
1723
1724         /* link parent UPDATE lock */
1725         CDEBUG(D_INFO, "lock "DFID"\n", PFID(fid));
1726
1727         if (*blocked) {
1728                 /* revoke lock instead of take in *blocked* mode */
1729                 rc = mdt_object_lock(info, lnkp, lhl, MDS_INODELOCK_UPDATE,
1730                                      LCK_PW);
1731                 if (rc)
1732                         RETURN(rc);
1733
1734                 if (mdt_object_remote(lnkp)) {
1735                         struct ldlm_lock *lock;
1736
1737                         /*
1738                          * for remote object, set lock cb_atomic, so lock can be
1739                          * released in blocking_ast() immediately, then the next
1740                          * lock_try will have better chance of success.
1741                          */
1742                         lock = ldlm_handle2lock(&lhl->mlh_rreg_lh);
1743                         LASSERT(lock != NULL);
1744                         lock_res_and_lock(lock);
1745                         ldlm_set_atomic_cb(lock);
1746                         unlock_res_and_lock(lock);
1747                         LDLM_LOCK_PUT(lock);
1748                 }
1749
1750                 mdt_object_unlock(info, lnkp, lhl, 1);
1751                 RETURN(0);
1752         }
1753
1754         /*
1755          * we can't follow parent-child lock order like other MD
1756          * operations, use lock_try here to avoid deadlock, if the lock
1757          * cannot be taken, drop all locks taken, revoke the blocked
1758          * one, and continue processing the remaining entries, and in
1759          * the end of the loop restart from beginning.
1760          *
1761          * don't lock with PDO mode in case two links are under the same
1762          * parent and their hash values are different.
1763          */
1764         rc = mdt_object_lock_try(info, lnkp, lhl, &ibits, MDS_INODELOCK_UPDATE,
1765                                  LCK_PW);
1766         if (rc < 0)
1767                 RETURN(rc);
1768
1769         if (!(ibits & MDS_INODELOCK_UPDATE)) {
1770                 CDEBUG(D_INFO, "busy lock on "DFID"\n", PFID(fid));
1771                 *blocked = true;
1772                 RETURN(-EAGAIN);
1773         }
1774
1775         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, update_locks);
1776         if (rc) {
1777                 mdt_object_unlock(info, lnkp, lhl, 1);
1778                 RETURN(rc);
1779         }
1780
1781         RETURN(1);
1782 }
1783
1784 /* take link LOOKUP lock.
1785  * \retval      0 \a lnkp is already locked, no lock taken.
1786  *              1 lock taken.
1787  *              -ev negative errno.
1788  */
1789 static int mdt_migrate_link_lock(struct mdt_thread_info *info,
1790                                  struct mdt_object *lnkp,
1791                                  struct mdt_object *spobj,
1792                                  struct mdt_object *obj,
1793                                  struct list_head *lookup_locks)
1794 {
1795         const struct lu_fid *fid = mdt_object_fid(lnkp);
1796         struct mdt_lock_handle *lhl = &info->mti_lh[MDT_LH_LOCAL];
1797         struct mdt_link_lock *entry;
1798         int rc;
1799
1800         ENTRY;
1801
1802         /* check if it's already locked by source */
1803         rc = mdt_fids_different_target(info, fid, mdt_object_fid(spobj));
1804         if (rc <= 0) {
1805                 CDEBUG(D_INFO, "skip lookup lock on source parent "DFID"\n",
1806                        PFID(fid));
1807                 RETURN(rc);
1808         }
1809
1810         /* check if it's already locked by other links */
1811         list_for_each_entry(entry, lookup_locks, mll_linkage) {
1812                 rc = mdt_fids_different_target(info, fid,
1813                                                mdt_object_fid(entry->mll_obj));
1814                 if (rc <= 0) {
1815                         CDEBUG(D_INFO, "skip lookup lock on parent "DFID"\n",
1816                                PFID(fid));
1817                         RETURN(rc);
1818                 }
1819         }
1820
1821         rc = mdt_object_lookup_lock(info, lnkp, obj, lhl, LCK_EX);
1822         if (rc)
1823                 RETURN(rc);
1824
1825         /* don't take local LOOKUP lock, because later we will lock other ibits
1826          * of sobj (which is on local MDT), and lock the same object twice may
1827          * deadlock, just revoke this lock.
1828          */
1829         if (!mdt_object_remote(lnkp))
1830                 GOTO(unlock, rc = 0);
1831
1832         rc = mdt_migrate_link_lock_add(info, lnkp, lhl, lookup_locks);
1833         if (rc)
1834                 GOTO(unlock, rc);
1835
1836         RETURN(1);
1837 unlock:
1838         mdt_object_unlock(info, lnkp, lhl, 1);
1839         return rc;
1840 }
1841
1842 /*
1843  * take UPDATE lock of link parents and LOOKUP lock of links, also check whether
1844  * total local lock count exceeds RS_MAX_LOCKS.
1845  *
1846  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1847  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1848  * \retval      -ev negative errno upon error
1849  */
1850 static int mdt_migrate_links_lock(struct mdt_thread_info *info,
1851                                   struct mdt_object *spobj,
1852                                   struct mdt_object *tpobj,
1853                                   struct mdt_object *obj,
1854                                   struct mdt_lock_handle *lhsp,
1855                                   struct mdt_lock_handle *lhtp,
1856                                   struct list_head *link_locks)
1857 {
1858         struct mdt_device *mdt = info->mti_mdt;
1859         struct lu_buf *buf = &info->mti_big_buf;
1860         struct lu_name *lname = &info->mti_name;
1861         struct linkea_data ldata = { NULL };
1862         int local_lock_cnt = 0;
1863         bool blocked = false;
1864         bool saved;
1865         struct mdt_object *lnkp;
1866         struct lu_fid fid;
1867         LIST_HEAD(update_locks);
1868         LIST_HEAD(lookup_locks);
1869         int rc;
1870
1871         ENTRY;
1872         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1873                 RETURN(0);
1874
1875         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1876         if (buf->lb_buf == NULL)
1877                 RETURN(-ENOMEM);
1878
1879         ldata.ld_buf = buf;
1880         rc = mdt_links_read(info, obj, &ldata);
1881         if (rc) {
1882                 if (rc == -ENOENT || rc == -ENODATA)
1883                         rc = 0;
1884                 RETURN(rc);
1885         }
1886
1887         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1888              linkea_next_entry(&ldata)) {
1889                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1890                                     &fid);
1891
1892                 /* check if link parent is source parent too */
1893                 if (lu_fid_eq(mdt_object_fid(spobj), &fid)) {
1894                         CDEBUG(D_INFO,
1895                                "skip lock on source parent "DFID"/"DNAME"\n",
1896                                PFID(&fid), PNAME(lname));
1897                         continue;
1898                 }
1899
1900                 /* check if link parent is target parent too */
1901                 if (tpobj != spobj && lu_fid_eq(mdt_object_fid(tpobj), &fid)) {
1902                         CDEBUG(D_INFO,
1903                                "skip lock on target parent "DFID"/"DNAME"\n",
1904                                PFID(&fid), PNAME(lname));
1905                         continue;
1906                 }
1907
1908                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1909                 if (IS_ERR(lnkp)) {
1910                         CWARN("%s: cannot find obj "DFID": %ld\n",
1911                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1912                         continue;
1913                 }
1914
1915                 if (!mdt_object_exists(lnkp)) {
1916                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1917                                PFID(&fid), PNAME(lname));
1918                         mdt_object_put(info->mti_env, lnkp);
1919                         continue;
1920                 }
1921 relock:
1922                 saved = blocked;
1923                 rc = mdt_migrate_link_parent_lock(info, lnkp, &update_locks,
1924                                                   &blocked);
1925                 if (!saved && blocked) {
1926                         /* unlock all locks taken to avoid deadlock */
1927                         mdt_migrate_links_unlock(info, &update_locks, 1);
1928                         mdt_object_unlock(info, spobj, lhsp, 1);
1929                         if (tpobj != spobj)
1930                                 mdt_object_unlock(info, tpobj, lhtp, 1);
1931                         goto relock;
1932                 }
1933                 if (rc < 0) {
1934                         mdt_object_put(info->mti_env, lnkp);
1935                         GOTO(out, rc);
1936                 }
1937
1938                 if (rc == 1 && !mdt_object_remote(lnkp))
1939                         local_lock_cnt++;
1940
1941                 rc = mdt_migrate_link_lock(info, lnkp, spobj, obj,
1942                                            &lookup_locks);
1943                 if (rc < 0) {
1944                         mdt_object_put(info->mti_env, lnkp);
1945                         GOTO(out, rc);
1946                 }
1947                 if (rc == 1 && !mdt_object_remote(lnkp))
1948                         local_lock_cnt++;
1949                 mdt_object_put(info->mti_env, lnkp);
1950         }
1951
1952         if (blocked)
1953                 GOTO(out, rc = -EBUSY);
1954
1955         EXIT;
1956 out:
1957         list_splice(&update_locks, link_locks);
1958         list_splice(&lookup_locks, link_locks);
1959         if (rc < 0) {
1960                 mdt_migrate_links_unlock(info, link_locks, rc);
1961         } else if (local_lock_cnt > RS_MAX_LOCKS - 5) {
1962                 /*
1963                  * parent may have 3 local objects: master object and 2 stripes
1964                  * (if it's being migrated too); source may have 1 local objects
1965                  * as regular file; target has 1 local object.
1966                  * Note, source may have 2 local locks if it is directory but it
1967                  * can't have hardlinks, so it is not considered here.
1968                  */
1969                 CDEBUG(D_INFO, "Too many local locks (%d), migrate in sync mode\n",
1970                        local_lock_cnt);
1971                 rc = 1;
1972         }
1973         return rc;
1974 }
1975
1976 /*
1977  * lookup source by name, if parent is striped directory, we need to find the
1978  * corresponding stripe where source is located, and then lookup there.
1979  *
1980  * besides, if parent is migrating too, and file is already in target stripe,
1981  * this should be a redo of 'lfs migrate' on client side.
1982  *
1983  * \retval 1 tpobj stripe index is less than spobj stripe index
1984  * \retval 0 tpobj stripe index is larger than or equal to spobj stripe index
1985  * \retval -ev negative errno upon error
1986  */
1987 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1988                               struct mdt_object *pobj,
1989                               const struct md_attr *ma,
1990                               const struct lu_name *lname,
1991                               struct mdt_object **spobj,
1992                               struct mdt_object **tpobj,
1993                               struct mdt_object **sobj)
1994 {
1995         const struct lu_env *env = info->mti_env;
1996         struct lu_fid *fid = &info->mti_tmp_fid1;
1997         int spindex = -1;
1998         int tpindex = -1;
1999         int rc;
2000
2001         if (ma->ma_valid & MA_LMV) {
2002                 /* if parent is striped, lookup on corresponding stripe */
2003                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2004                 struct lu_fid *fid2 = &info->mti_tmp_fid2;
2005
2006                 if (!lmv_is_sane(lmv))
2007                         return -EBADF;
2008
2009                 spindex = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2010                                                        lname->ln_namelen);
2011                 if (spindex < 0)
2012                         return spindex;
2013
2014                 fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[spindex]);
2015
2016                 *spobj = mdt_object_find(env, info->mti_mdt, fid2);
2017                 if (IS_ERR(*spobj)) {
2018                         rc = PTR_ERR(*spobj);
2019                         *spobj = NULL;
2020                         return rc;
2021                 }
2022
2023                 if (!mdt_object_exists(*spobj))
2024                         GOTO(spobj_put, rc = -ENOENT);
2025
2026                 fid_zero(fid);
2027                 rc = mdo_lookup(env, mdt_object_child(*spobj), lname, fid,
2028                                 &info->mti_spec);
2029                 if ((rc == -ENOENT || rc == 0) && lmv_is_layout_changing(lmv)) {
2030                         /* fail check here to let top dir migration succeed. */
2031                         if (CFS_FAIL_CHECK_RESET(OBD_FAIL_MIGRATE_ENTRIES, 0))
2032                                 GOTO(spobj_put, rc = -EIO);
2033
2034                         /*
2035                          * if parent layout is changeing, and lookup child
2036                          * failed on source stripe, lookup again on target
2037                          * stripe, if it exists, it means previous migration
2038                          * was interrupted, and current file was migrated
2039                          * already.
2040                          */
2041                         tpindex = lmv_name_to_stripe_index(lmv, lname->ln_name,
2042                                                            lname->ln_namelen);
2043                         if (tpindex < 0)
2044                                 GOTO(spobj_put, rc = tpindex);
2045
2046                         fid_le_to_cpu(fid2, &lmv->lmv_stripe_fids[tpindex]);
2047
2048                         *tpobj = mdt_object_find(env, info->mti_mdt, fid2);
2049                         if (IS_ERR(*tpobj)) {
2050                                 rc = PTR_ERR(*tpobj);
2051                                 *tpobj = NULL;
2052                                 GOTO(spobj_put, rc);
2053                         }
2054
2055                         if (!mdt_object_exists(*tpobj))
2056                                 GOTO(tpobj_put, rc = -ENOENT);
2057
2058                         if (rc == -ENOENT) {
2059                                 fid_zero(fid);
2060                                 rc = mdo_lookup(env, mdt_object_child(*tpobj),
2061                                                 lname, fid, &info->mti_spec);
2062                                 GOTO(tpobj_put, rc = rc ?: -EALREADY);
2063                         }
2064                 } else if (rc) {
2065                         GOTO(spobj_put, rc);
2066                 } else {
2067                         *tpobj = *spobj;
2068                         tpindex = spindex;
2069                         mdt_object_get(env, *tpobj);
2070                 }
2071         } else {
2072                 fid_zero(fid);
2073                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2074                                 &info->mti_spec);
2075                 if (rc)
2076                         return rc;
2077
2078                 *spobj = pobj;
2079                 *tpobj = pobj;
2080                 mdt_object_get(env, pobj);
2081                 mdt_object_get(env, pobj);
2082         }
2083
2084         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2085         if (IS_ERR(*sobj)) {
2086                 rc = PTR_ERR(*sobj);
2087                 *sobj = NULL;
2088                 GOTO(tpobj_put, rc);
2089         }
2090
2091         if (!mdt_object_exists(*sobj))
2092                 GOTO(sobj_put, rc = -ENOENT);
2093
2094         return (tpindex < spindex);
2095
2096 sobj_put:
2097         mdt_object_put(env, *sobj);
2098         *sobj = NULL;
2099 tpobj_put:
2100         mdt_object_put(env, *tpobj);
2101         *tpobj = NULL;
2102 spobj_put:
2103         mdt_object_put(env, *spobj);
2104         *spobj = NULL;
2105
2106         return rc;
2107 }
2108
2109 /* end lease and close file for regular file */
2110 static int mdd_migrate_close(struct mdt_thread_info *info,
2111                              struct mdt_object *obj)
2112 {
2113         struct close_data *data;
2114         struct mdt_body *repbody;
2115         struct ldlm_lock *lease;
2116         int rc;
2117         int rc2;
2118
2119         rc = -EPROTO;
2120         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2121                                       RCL_CLIENT) ||
2122             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2123                                       RCL_CLIENT))
2124                 goto close;
2125
2126         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2127         if (!data)
2128                 goto close;
2129
2130         rc = -ESTALE;
2131         lease = ldlm_handle2lock(&data->cd_handle);
2132         if (!lease)
2133                 goto close;
2134
2135         /* check if the lease was already canceled */
2136         lock_res_and_lock(lease);
2137         rc = ldlm_is_cancel(lease);
2138         unlock_res_and_lock(lease);
2139
2140         if (rc) {
2141                 rc = -EAGAIN;
2142                 LDLM_DEBUG(lease, DFID" lease broken",
2143                            PFID(mdt_object_fid(obj)));
2144         }
2145
2146         /*
2147          * cancel server side lease, client side counterpart should have been
2148          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2149          */
2150         ldlm_lock_cancel(lease);
2151         ldlm_reprocess_all(lease->l_resource,
2152                            lease->l_policy_data.l_inodebits.bits);
2153         LDLM_LOCK_PUT(lease);
2154
2155 close:
2156         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2157         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2158         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2159
2160         return rc ?: rc2;
2161 }
2162
2163 /* LFSCK used to clear hash type and MIGRATION flag upon migration failure */
2164 static inline bool lmv_is_failed_migration(const struct lmv_mds_md_v1 *lmv)
2165 {
2166         return le32_to_cpu(lmv->lmv_hash_type) ==
2167                 (LMV_HASH_TYPE_UNKNOWN | LMV_HASH_FLAG_BAD_TYPE) &&
2168                lmv_is_known_hash_type(le32_to_cpu(lmv->lmv_migrate_hash)) &&
2169                le32_to_cpu(lmv->lmv_migrate_offset) > 0 &&
2170                le32_to_cpu(lmv->lmv_migrate_offset) <
2171                 le32_to_cpu(lmv->lmv_stripe_count);
2172 }
2173
2174 /*
2175  * migrate file in below steps:
2176  *  1. lock source and target stripes
2177  *  2. lookup source by name
2178  *  3. lock parents of source links if source is not directory
2179  *  4. reject if source is in HSM
2180  *  5. take source open_sem and close file if source is regular file
2181  *  6. lock source, and its stripes if it's directory
2182  *  7. migrate file
2183  *  8. lock target so subsequent change to it can trigger COS
2184  *  9. unlock above locks
2185  * 10. sync device if source has too many links
2186  */
2187 int mdt_reint_migrate(struct mdt_thread_info *info,
2188                       struct mdt_lock_handle *unused)
2189 {
2190         const struct lu_env *env = info->mti_env;
2191         struct mdt_device *mdt = info->mti_mdt;
2192         struct ptlrpc_request *req = mdt_info_req(info);
2193         struct mdt_reint_record *rr = &info->mti_rr;
2194         struct lu_ucred *uc = mdt_ucred(info);
2195         struct md_attr *ma = &info->mti_attr;
2196         struct mdt_object *pobj;
2197         struct mdt_object *spobj;
2198         struct mdt_object *tpobj;
2199         struct mdt_object *sobj;
2200         struct mdt_object *tobj;
2201         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2202         struct mdt_lock_handle *lhsp;
2203         struct mdt_lock_handle *lhtp;
2204         struct mdt_lock_handle *lhs;
2205         struct mdt_lock_handle *lhl;
2206         LIST_HEAD(link_locks);
2207         int lock_retries = 5;
2208         bool reverse = false;
2209         bool open_sem_locked = false;
2210         bool do_sync = false;
2211         bool is_plain_dir = false;
2212         int rc;
2213
2214         ENTRY;
2215         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2216                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2217
2218         if (info->mti_dlm_req)
2219                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2220
2221         if (!fid_is_md_operative(rr->rr_fid1) ||
2222             !fid_is_md_operative(rr->rr_fid2))
2223                 RETURN(-EPERM);
2224
2225         /* don't allow migrate . or .. */
2226         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2227                 RETURN(-EBUSY);
2228
2229         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2230                 RETURN(-EPERM);
2231
2232         /* we want rbac roles to have precedence over any other
2233          * permission or capability checks
2234          */
2235         if (uc && (!uc->uc_rbac_dne_ops ||
2236                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2237                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2238                     mdt->mdt_enable_remote_dir_gid != -1)))
2239                 RETURN(-EPERM);
2240
2241         /*
2242          * Note: do not enqueue rename lock for replay request, because
2243          * if other MDT holds rename lock, but being blocked to wait for
2244          * this MDT to finish its recovery, and the failover MDT can not
2245          * get rename lock, which will cause deadlock.
2246          *
2247          * req is NULL if this is called by directory auto-split.
2248          */
2249         if (req && !req_is_replay(req)) {
2250                 rc = mdt_rename_lock(info, rename_lh);
2251                 if (rc != 0) {
2252                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2253                                mdt_obd_name(info->mti_mdt), rc);
2254                         RETURN(rc);
2255                 }
2256         }
2257
2258         /* pobj is master object of parent */
2259         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2260         if (IS_ERR(pobj))
2261                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2262
2263         if (req) {
2264                 rc = mdt_version_get_check(info, pobj, 0);
2265                 if (rc)
2266                         GOTO(put_parent, rc);
2267         }
2268
2269         if (!mdt_object_exists(pobj))
2270                 GOTO(put_parent, rc = -ENOENT);
2271
2272         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2273                 GOTO(put_parent, rc = -ENOTDIR);
2274
2275         rc = mdt_check_enc(info, pobj);
2276         if (rc)
2277                 GOTO(put_parent, rc);
2278
2279         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2280         if (rc)
2281                 GOTO(put_parent, rc);
2282
2283         if (CFS_FAIL_CHECK(OBD_FAIL_MIGRATE_BAD_HASH) &&
2284             (ma->ma_valid & MA_LMV) &&
2285             lmv_is_migrating(&ma->ma_lmv->lmv_md_v1)) {
2286                 struct lu_buf *buf = &info->mti_buf;
2287                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2288                 __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2289
2290                 lmv->lmv_hash_type = cpu_to_le32(LMV_HASH_TYPE_UNKNOWN |
2291                                                  LMV_HASH_FLAG_BAD_TYPE);
2292                 lmv->lmv_layout_version = cpu_to_le32(version + 1);
2293                 buf->lb_buf = lmv;
2294                 buf->lb_len = sizeof(*lmv);
2295                 rc = mo_xattr_set(env, mdt_object_child(pobj), buf,
2296                                   XATTR_NAME_LMV, LU_XATTR_REPLACE);
2297                 mo_invalidate(env, mdt_object_child(pobj));
2298                 GOTO(put_parent, rc);
2299         }
2300
2301         /* @spobj is the parent stripe of @sobj if @pobj is striped directory,
2302          * if @pobj is migrating too, tpobj is the target parent stripe.
2303          */
2304         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &tpobj,
2305                                 &sobj);
2306         if (rc < 0)
2307                 GOTO(put_parent, rc);
2308         reverse = rc;
2309
2310         /* parent unchanged, this happens in dir restripe */
2311         if (info->mti_spec.sp_migrate_nsonly && spobj == tpobj)
2312                 GOTO(put_source, rc = -EALREADY);
2313
2314 lock_parent:
2315         LASSERT(spobj);
2316         LASSERT(tpobj);
2317         lhsp = &info->mti_lh[MDT_LH_PARENT];
2318         lhtp = &info->mti_lh[MDT_LH_CHILD];
2319         /* lock spobj and tpobj in stripe index order */
2320         if (reverse) {
2321                 rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name, LCK_PW);
2322                 if (rc)
2323                         GOTO(put_source, rc);
2324
2325                 LASSERT(spobj != tpobj);
2326                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2327                 if (rc)
2328                         GOTO(unlock_parent, rc);
2329         } else {
2330                 rc = mdt_parent_lock(info, spobj, lhsp, &rr->rr_name, LCK_PW);
2331                 if (rc)
2332                         GOTO(put_source, rc);
2333
2334                 if (tpobj != spobj) {
2335                         rc = mdt_parent_lock(info, tpobj, lhtp, &rr->rr_name,
2336                                              LCK_PW);
2337                         if (rc)
2338                                 GOTO(unlock_parent, rc);
2339                 }
2340         }
2341
2342         /* if inode is not migrated, or is dir, no need to lock links */
2343         if (!info->mti_spec.sp_migrate_nsonly &&
2344             !S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2345                 /* lock link parents, and take LOOKUP lock of links */
2346                 rc = mdt_migrate_links_lock(info, spobj, tpobj, sobj, lhsp,
2347                                             lhtp, &link_locks);
2348                 if (rc == -EBUSY && lock_retries-- > 0) {
2349                         LASSERT(list_empty(&link_locks));
2350                         goto lock_parent;
2351                 }
2352
2353                 if (rc < 0)
2354                         GOTO(put_source, rc);
2355
2356                 /*
2357                  * RS_MAX_LOCKS is the limit of number of locks that can be
2358                  * saved along with one request, if total lock count exceeds
2359                  * this limit, we will drop all locks after migration, and
2360                  * trigger commit in the end.
2361                  */
2362                 do_sync = rc;
2363         }
2364
2365         /* lock source */
2366         lhs = &info->mti_lh[MDT_LH_OLD];
2367         lhl = &info->mti_lh[MDT_LH_LOOKUP];
2368         rc = mdt_rename_source_lock(info, spobj, sobj, lhs, lhl,
2369                                     MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR |
2370                                     MDS_INODELOCK_OPEN);
2371         if (rc)
2372                 GOTO(unlock_links, rc);
2373
2374         if (mdt_object_remote(sobj)) {
2375                 struct md_attr *ma2 = &info->mti_attr2;
2376                 ma2->ma_need = MA_INODE;
2377                 rc = mo_attr_get(env, mdt_object_child(sobj), ma2);
2378                 if (rc)
2379                         GOTO(unlock_source, rc);
2380         }
2381
2382         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2383                 /* TODO: DoM migration is not supported, migrate dirent only */
2384                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2385                 if (rc)
2386                         GOTO(unlock_source, rc);
2387
2388                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2389                         info->mti_spec.sp_migrate_nsonly = 1;
2390         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2391                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2392                 if (rc)
2393                         GOTO(unlock_source, rc);
2394
2395                 if (!(ma->ma_valid & MA_LMV))
2396                         is_plain_dir = true;
2397                 else if (lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2398                         /* race with restripe/auto-split */
2399                         GOTO(unlock_source, rc = -EBUSY);
2400                 else if (lmv_is_failed_migration(&ma->ma_lmv->lmv_md_v1)) {
2401                         struct lu_buf *buf = &info->mti_buf;
2402                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2403                         __u32 version = le32_to_cpu(lmv->lmv_layout_version);
2404
2405                         /* migration failed before, and LFSCK cleared hash type
2406                          * and flags, fake it to resume migration.
2407                          */
2408                         lmv->lmv_hash_type =
2409                                 cpu_to_le32(LMV_HASH_TYPE_FNV_1A_64 |
2410                                             LMV_HASH_FLAG_MIGRATION |
2411                                             LMV_HASH_FLAG_BAD_TYPE |
2412                                             LMV_HASH_FLAG_FIXED);
2413                         lmv->lmv_layout_version = cpu_to_le32(version + 1);
2414                         buf->lb_buf = lmv;
2415                         buf->lb_len = sizeof(*lmv);
2416                         rc = mo_xattr_set(env, mdt_object_child(sobj), buf,
2417                                           XATTR_NAME_LMV, LU_XATTR_REPLACE);
2418                         mo_invalidate(env, mdt_object_child(sobj));
2419                         GOTO(unlock_source, rc = -EALREADY);
2420                 }
2421         }
2422
2423         /* if migration HSM is allowed */
2424         if (!mdt->mdt_migrate_hsm_allowed) {
2425                 ma->ma_need = MA_HSM;
2426                 ma->ma_valid = 0;
2427                 rc = mdt_attr_get_complex(info, sobj, ma);
2428                 if (rc)
2429                         GOTO(unlock_source, rc);
2430
2431                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2432                         GOTO(unlock_source, rc = -EOPNOTSUPP);
2433         }
2434
2435         /* end lease and close file for regular file */
2436         if (info->mti_spec.sp_migrate_close) {
2437                 /* try to hold open_sem so that nobody else can open the file */
2438                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2439                         /* close anyway */
2440                         mdd_migrate_close(info, sobj);
2441                         GOTO(unlock_source, rc = -EBUSY);
2442                 } else {
2443                         open_sem_locked = true;
2444                         rc = mdd_migrate_close(info, sobj);
2445                         if (rc && rc != -ESTALE)
2446                                 GOTO(unlock_open_sem, rc);
2447                 }
2448         }
2449
2450         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2451         if (IS_ERR(tobj))
2452                 GOTO(unlock_open_sem, rc = PTR_ERR(tobj));
2453
2454         /* Don't do lookup sanity check. We know name doesn't exist. */
2455         info->mti_spec.sp_cr_lookup = 0;
2456         info->mti_spec.sp_feat = &dt_directory_features;
2457
2458         rc = mdo_migrate(env, mdt_object_child(spobj),
2459                          mdt_object_child(tpobj), mdt_object_child(sobj),
2460                          mdt_object_child(tobj), &rr->rr_name,
2461                          &info->mti_spec, ma);
2462         if (rc)
2463                 GOTO(put_target, rc);
2464
2465         /* save target locks for directory */
2466         if (S_ISDIR(lu_object_attr(&sobj->mot_obj)) &&
2467             !info->mti_spec.sp_migrate_nsonly) {
2468                 struct mdt_lock_handle *lht = &info->mti_lh[MDT_LH_NEW];
2469                 struct ldlm_enqueue_info *einfo = &info->mti_einfo;
2470
2471                 /* in case sobj becomes a stripe of tobj, unlock sobj here,
2472                  * otherwise stripes lock may deadlock.
2473                  */
2474                 if (is_plain_dir)
2475                         mdt_rename_source_unlock(info, sobj, lhs, lhl, 1);
2476
2477                 rc = mdt_object_stripes_lock(info, tpobj, tobj, lht, einfo,
2478                                              MDS_INODELOCK_UPDATE, LCK_PW);
2479                 if (rc)
2480                         GOTO(put_target, rc);
2481
2482                 mdt_object_stripes_unlock(info, tobj, lht, einfo, 0);
2483         }
2484
2485         lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2486                              LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2487
2488         EXIT;
2489 put_target:
2490         mdt_object_put(env, tobj);
2491 unlock_open_sem:
2492         if (open_sem_locked)
2493                 up_write(&sobj->mot_open_sem);
2494 unlock_source:
2495         mdt_rename_source_unlock(info, sobj, lhs, lhl, rc);
2496 unlock_links:
2497         /* if we've got too many locks to save into RPC,
2498          * then just commit before the locks are released
2499          */
2500         if (!rc && do_sync)
2501                 mdt_device_sync(env, mdt);
2502         mdt_migrate_links_unlock(info, &link_locks, do_sync ? 1 : rc);
2503 unlock_parent:
2504         mdt_object_unlock(info, spobj, lhsp, rc);
2505         mdt_object_unlock(info, tpobj, lhtp, rc);
2506 put_source:
2507         mdt_object_put(env, sobj);
2508         mdt_object_put(env, spobj);
2509         mdt_object_put(env, tpobj);
2510 put_parent:
2511         mo_invalidate(env, mdt_object_child(pobj));
2512         mdt_object_put(env, pobj);
2513 unlock_rename:
2514         mdt_rename_unlock(info, rename_lh);
2515
2516         if (rc)
2517                 CERROR("%s: migrate "DFID"/"DNAME" failed: rc = %d\n",
2518                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1),
2519                        PNAME(&rr->rr_name), rc);
2520
2521         return rc;
2522 }
2523
2524 /*
2525  * determine lock order of sobj and tobj
2526  *
2527  * there are two situations we need to lock tobj before sobj:
2528  * 1. sobj is child of tobj
2529  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2530  *    larger than that of tobj
2531  *
2532  * \retval      1 lock tobj before sobj
2533  * \retval      0 lock sobj before tobj
2534  * \retval      -ev negative errno upon error
2535  */
2536 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2537                                            struct mdt_object *sobj,
2538                                            struct mdt_object *tobj)
2539 {
2540         struct md_attr *ma = &info->mti_attr;
2541         struct lu_fid *spfid = &info->mti_tmp_fid1;
2542         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2543         struct lmv_mds_md_v1 *lmv;
2544         __u32 sindex;
2545         __u32 tindex;
2546         int rc;
2547
2548         /* sobj and tobj are the same */
2549         if (sobj == tobj)
2550                 return 0;
2551
2552         if (fid_is_root(mdt_object_fid(sobj)))
2553                 return 0;
2554
2555         if (fid_is_root(mdt_object_fid(tobj)))
2556                 return 1;
2557
2558         /* check whether sobj is child of tobj */
2559         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2560                            mdt_object_fid(tobj));
2561         if (rc < 0)
2562                 return rc;
2563
2564         if (rc == 1)
2565                 return 1;
2566
2567         /* check whether sobj and tobj are children of the same parent */
2568         rc = mdt_attr_get_pfid(info, sobj, spfid);
2569         if (rc)
2570                 return rc;
2571
2572         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2573         if (rc)
2574                 return rc;
2575
2576         if (!lu_fid_eq(spfid, tpfid))
2577                 return 0;
2578
2579         /* check whether sobj and tobj are sibling stripes */
2580         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2581         if (rc)
2582                 return rc;
2583
2584         if (!(ma->ma_valid & MA_LMV))
2585                 return 0;
2586
2587         lmv = &ma->ma_lmv->lmv_md_v1;
2588         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2589                 return 0;
2590         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2591
2592         ma->ma_valid = 0;
2593         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2594         if (rc)
2595                 return rc;
2596
2597         if (!(ma->ma_valid & MA_LMV))
2598                 return -ENODATA;
2599
2600         lmv = &ma->ma_lmv->lmv_md_v1;
2601         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2602                 return -EINVAL;
2603         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2604
2605         /* check stripe index of sobj and tobj */
2606         if (sindex == tindex)
2607                 return -EINVAL;
2608
2609         return sindex < tindex ? 0 : 1;
2610 }
2611
2612 /* Helper function for mdt_reint_rename so we don't need to opencode
2613  * two different order lockings
2614  */
2615 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2616                              struct mdt_object *mfirstdir,
2617                              struct mdt_lock_handle *lh_firstdirp,
2618                              const struct lu_name *firstname,
2619                              struct mdt_object *mseconddir,
2620                              struct mdt_lock_handle *lh_seconddirp,
2621                              const struct lu_name *secondname)
2622 {
2623         int rc;
2624
2625         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW);
2626         if (rc)
2627                 return rc;
2628
2629         mdt_version_get_save(info, mfirstdir, 0);
2630         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2631
2632         if (mfirstdir != mseconddir) {
2633                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2634                                      secondname, LCK_PW);
2635         } else if (!mdt_object_remote(mseconddir)) {
2636                 if (lh_firstdirp->mlh_pdo_hash !=
2637                     lh_seconddirp->mlh_pdo_hash) {
2638                         rc = mdt_object_pdo_lock(info, mseconddir,
2639                                                  lh_seconddirp, secondname,
2640                                                  LCK_PW, false);
2641                         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2642                 }
2643         }
2644         mdt_version_get_save(info, mseconddir, 1);
2645
2646         if (rc != 0)
2647                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2648
2649         return rc;
2650 }
2651
2652 /*
      * Handle a rename request: move the entry rr_name under parent rr_fid1 to
      * rr_tgt_name under parent rr_fid2.  If the target name already resolves to
      * an object, that object is passed to mdo_rename() as well and its last
      * unlink is handled afterwards.
      *
      * Outline: find both parents, decide whether the rename lock is needed,
      * lock the parents in a safe order, look up and lock the source child and
      * (if present) the target child, call mdo_rename(), then release every
      * lock and reference through the out_* labels in reverse order.
      *
2653  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2654  * 2 - srcdir child; 3 - tgtdir child.
2655  * Update on disk version of srcdir child.
      *
      * NOTE(review): mdt_lock_two_dirs() saves slot 0 for whichever parent it
      * is asked to lock first, so when the parents are locked in reverse order
      * slots 0 and 1 look swapped relative to the description above - confirm.
      *
      * \param[in] info		thread info holding the unpacked request record
      *			(mti_rr), attributes (mti_attr) and lock handles
      * \param[in] unused	lock handle argument, not referenced in this function
      *
      * \retval 0		success, including the case where source and target
      *			names already resolve to the same FID
      * \retval -EPERM	a parent or child FID is not MD-operative, or the
      *			target parent is flagged LOHA_FSCRYPT_MD and the caller
      *			lacks uc_rbac_fscrypt_admin
      * \retval -EXDEV	remote rename is disabled and the source is remote, or
      *			the existing target child is remote
      * \retval -EINVAL	a child FID equals one of the parent FIDs, or one of
      *			the mdo_is_subdir() checks reported 1
      * \retval -ENOENT	the source or target child object does not exist
      * \retval -EISDIR	the target is a directory but the source is not
      * \retval -ev		any other negative errno from the helpers called
2656  */
2657 static int mdt_reint_rename(struct mdt_thread_info *info,
2658                             struct mdt_lock_handle *unused)
2659 {
2660         struct mdt_device *mdt = info->mti_mdt;
2661         struct mdt_reint_record *rr = &info->mti_rr;
2662         struct md_attr *ma = &info->mti_attr;
2663         struct ptlrpc_request *req = mdt_info_req(info);
2664         struct mdt_object *msrcdir = NULL;
2665         struct mdt_object *mtgtdir = NULL;
2666         struct mdt_object *mold;
2667         struct mdt_object *mnew = NULL;
2668         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2669         struct mdt_lock_handle *lh_srcdirp;
2670         struct mdt_lock_handle *lh_tgtdirp;
2671         struct mdt_lock_handle *lh_oldp = NULL;
2672         struct mdt_lock_handle *lh_lookup = NULL;
2673         struct mdt_lock_handle *lh_newp = NULL;
2674         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2675         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2676         struct lu_ucred *uc = mdt_ucred(info);
2677         bool reverse = false, discard = false;
2678         ktime_t kstart = ktime_get();
2679         enum mdt_stat_idx msi = 0;
2680         bool remote;
             /* set once mdt_rename_lock() has succeeded in this call */
2681         bool bfl = false;
2682         int rc;
2683
2684         ENTRY;
2685         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2686                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2687                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2688
             /* process the DLM request that came with this RPC, if there is one */
2689         if (info->mti_dlm_req)
2690                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2691
             /* both parent FIDs must be MD-operative */
2692         if (!fid_is_md_operative(rr->rr_fid1) ||
2693             !fid_is_md_operative(rr->rr_fid2))
2694                 RETURN(-EPERM);
2695
2696         /* find both parents. */
2697         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2698         if (IS_ERR(msrcdir))
2699                 RETURN(PTR_ERR(msrcdir));
2700
2701         rc = mdt_check_enc(info, msrcdir);
2702         if (rc)
2703                 GOTO(out_put_srcdir, rc);
2704
             /* remembered now, used below as the hint for taking the rename lock */
2705         remote = mdt_object_remote(msrcdir);
2706         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2707
             /*
              * Same parent FID: reuse msrcdir and take a second reference so that
              * the two puts at out_put_tgtdir/out_put_srcdir stay balanced.
              */
2708         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2709                 mtgtdir = msrcdir;
2710                 mdt_object_get(info->mti_env, mtgtdir);
2711         } else {
2712                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2713                 if (IS_ERR(mtgtdir))
2714                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2715         }
2716
2717         rc = mdt_check_enc(info, mtgtdir);
2718         if (rc)
2719                 GOTO(out_put_tgtdir, rc);
2720
             /*
              * A target parent flagged LOHA_FSCRYPT_MD may only be modified by a
              * caller holding uc_rbac_fscrypt_admin.
              */
2721         if (!uc->uc_rbac_fscrypt_admin &&
2722             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2723                 GOTO(out_put_tgtdir, rc = -EPERM);
2724
2725         /*
2726          * Note: do not enqueue rename lock for replay request, because
2727          * if other MDT holds rename lock, but being blocked to wait for
2728          * this MDT to finish its recovery, and the failover MDT can not
2729          * get rename lock, which will cause deadlock.
2730          */
2731         if (!req_is_replay(req)) {
2732                 /*
2733                  * Normally rename RPC is handled on the MDT with the target
2734                  * directory (if target exists, it's on the MDT with the
2735                  * target), if the source directory is remote, it's a hint that
2736                  * source is remote too (this may not be true, but it won't
2737                  * cause any issue), return -EXDEV early to avoid taking
2738                  * rename_lock.
2739                  */
2740                 if (!mdt->mdt_enable_remote_rename && remote)
2741                         GOTO(out_put_tgtdir, rc = -EXDEV);
2742
                     /*
                      * Take the rename lock when any of these holds:
                      *  - the source parent is remote;
                      *  - la_mode says directory, unless both parents are the
                      *    same object and mdt_enable_parallel_rename_dir is set;
                      *  - la_mode says non-directory, unless
                      *    mdt_enable_parallel_rename_file is set and either the
                      *    parents are the same object or
                      *    mdt_enable_parallel_rename_crossdir is set.
                      * Otherwise go on without it and only select the stats index
                      * later handed to mdt_rename_counter_tally().
                      *
                      * NOTE(review): ma->ma_attr.la_mode is read here before this
                      * function initialises ma; presumably it was unpacked from
                      * the request and describes the renamed object - confirm.
                      */
2743                 if (remote ||
2744                     (S_ISDIR(ma->ma_attr.la_mode) &&
2745                      (msrcdir != mtgtdir ||
2746                       !mdt->mdt_enable_parallel_rename_dir)) ||
2747                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2748                      (!mdt->mdt_enable_parallel_rename_file ||
2749                       (msrcdir != mtgtdir &&
2750                        !mdt->mdt_enable_parallel_rename_crossdir)))) {
2751                         rc = mdt_rename_lock(info, rename_lh);
2752                         if (rc != 0) {
2753                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2754                                        mdt_obd_name(mdt), rc);
2755                                 GOTO(out_put_tgtdir, rc);
2756                         }
2757                         bfl = true;
2758                 } else {
2759                         if (S_ISDIR(ma->ma_attr.la_mode))
2760                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2761                         else
2762                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2763
2764                         CDEBUG(D_INFO,
2765                                "%s: %s %s parallel rename "DFID"/"DNAME"\n",
2766                                mdt_obd_name(mdt),
2767                                msrcdir == mtgtdir ? "samedir" : "crossdir",
2768                                S_ISDIR(ma->ma_attr.la_mode) ? "dir" : "file",
2769                                PFID(rr->rr_fid1), PNAME(&rr->rr_name));
2770                 }
2771         }
2772
             /*
              * Also the restart point used further down when the rename lock turns
              * out to be needed after the parents have already been locked.
              */
2773 lock_parents:
2774         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2775         if (rc < 0)
2776                 GOTO(out_unlock_rename, rc);
2777         reverse = rc;
2778
2779         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2780         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN);
2781         CFS_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2782
2783         /* lock parents in the proper order. */
2784         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2785         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
             /* initialise both handles first; their PDO hashes are compared below */
2786         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2787         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2788
2789         /* In case of same dir local rename we must sort by the hash,
2790          * otherwise a lock deadlock is possible when renaming
2791          * a to b and b to a at the same time LU-15285
2792          */
2793         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2794                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
             /* fault injection: force the non-reversed order */
2795         if (unlikely(CFS_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2796                 reverse = 0;
2797
2798         if (reverse)
2799                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2800                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2801                                        &rr->rr_name);
2802         else
2803                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2804                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name);
2805
2806         if (rc != 0)
2807                 GOTO(out_unlock_rename, rc);
2808
2809         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2810         CFS_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2811
2812         /* find mold object. */
2813         fid_zero(old_fid);
2814         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2815         if (rc != 0)
2816                 GOTO(out_unlock_parents, rc);
2817
             /* the source child must not be one of the two parents */
2818         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2819                 GOTO(out_unlock_parents, rc = -EINVAL);
2820
2821         if (!fid_is_md_operative(old_fid))
2822                 GOTO(out_unlock_parents, rc = -EPERM);
2823
2824         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2825         if (IS_ERR(mold))
2826                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2827
2828         if (!mdt_object_exists(mold)) {
2829                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2830                                 &mold->mot_obj,
2831                                 "object does not exist");
2832                 GOTO(out_put_old, rc = -ENOENT);
2833         }
2834
2835         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2836                 GOTO(out_put_old, rc = -EXDEV);
2837
2838         /* we used msrcdir as a hint to take BFL, but it may be wrong */
             /*
              * A non-directory source that is remote although its parent is not:
              * drop the source reference and both parent locks, take the rename
              * lock, clear the parallel-rename stats index and start over from
              * lock_parents.
              */
2839         if (unlikely(!bfl && !req_is_replay(req) &&
2840                      !S_ISDIR(ma->ma_attr.la_mode) &&
2841                      mdt_object_remote(mold))) {
2842                 LASSERT(!remote);
2843                 mdt_object_put(info->mti_env, mold);
2844                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2845                 mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2846
2847                 rc = mdt_rename_lock(info, rename_lh);
2848                 if (rc != 0) {
2849                         CERROR("%s: cannot re-lock for rename: rc = %d\n",
2850                                mdt_obd_name(mdt), rc);
2851                         GOTO(out_put_tgtdir, rc);
2852                 }
2853                 bfl = true;
2854                 msi = 0;
2855                 goto lock_parents;
2856         }
2857
2858         /* Check if @mtgtdir is subdir of @mold, before locking child
2859          * to avoid reverse locking.
2860          */
2861         if (mtgtdir != msrcdir) {
2862                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2863                                    old_fid);
2864                 if (rc) {
                             /* 1 means "is a subdir"; report that as -EINVAL */
2865                         if (rc == 1)
2866                                 rc = -EINVAL;
2867                         GOTO(out_put_old, rc);
2868                 }
2869         }
2870
2871         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2872         /* save version after locking */
2873         mdt_version_get_save(info, mold, 2);
2874
2875         /* find mnew object:
2876          * mnew target object may not exist now
2877          * lookup with version checking
2878          */
2879         fid_zero(new_fid);
2880         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2881                                       3);
2882         if (rc == 0) {
2883                 /* the new_fid should have been filled at this moment */
                     /* both names resolve to the same FID: done, rc is 0 here */
2884                 if (lu_fid_eq(old_fid, new_fid))
2885                         GOTO(out_put_old, rc);
2886
                     /* the target child must not be one of the two parents */
2887                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2888                     lu_fid_eq(new_fid, rr->rr_fid2))
2889                         GOTO(out_put_old, rc = -EINVAL);
2890
2891                 if (!fid_is_md_operative(new_fid))
2892                         GOTO(out_put_old, rc = -EPERM);
2893
2894                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2895                 if (IS_ERR(mnew))
2896                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2897
2898                 if (!mdt_object_exists(mnew)) {
2899                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2900                                         &mnew->mot_obj,
2901                                         "object does not exist");
2902                         GOTO(out_put_new, rc = -ENOENT);
2903                 }
2904
2905                 if (mdt_object_remote(mnew)) {
2906                         struct mdt_body  *repbody;
2907
2908                         /* Always send rename req to the target child MDT */
                             /* put the target FID into the reply body and fail
                              * with -EXDEV
                              */
2909                         repbody = req_capsule_server_get(info->mti_pill,
2910                                                          &RMF_MDT_BODY);
2911                         LASSERT(repbody != NULL);
2912                         repbody->mbo_fid1 = *new_fid;
2913                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2914                         GOTO(out_put_new, rc = -EXDEV);
2915                 }
2916                 /* Before locking the target dir, check we do not replace
2917                  * a dir with a non-dir, otherwise it may deadlock with
2918                  * link op which tries to create a link in this dir
2919                  * back to this non-dir.
2920                  */
2921                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2922                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2923                         GOTO(out_put_new, rc = -EISDIR);
2924
                     /* lock the source child before the target child */
2925                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2926                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2927                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2928                                             lh_lookup,
2929                                             MDS_INODELOCK_LOOKUP |
2930                                             MDS_INODELOCK_XATTR);
2931                 if (rc < 0)
2932                         GOTO(out_put_new, rc);
2933
2934                 /* Check if @msrcdir is subdir of @mnew, before locking child
2935                  * to avoid reverse locking.
2936                  */
2937                 if (mtgtdir != msrcdir) {
2938                         rc = mdo_is_subdir(info->mti_env,
2939                                            mdt_object_child(msrcdir), new_fid);
2940                         if (rc) {
2941                                 if (rc == 1)
2942                                         rc = -EINVAL;
2943                                 GOTO(out_unlock_old, rc);
2944                         }
2945                 }
2946
2947                 /* We used to acquire MDS_INODELOCK_FULL here but we
2948                  * can't do this now because a running HSM restore on
2949                  * the rename onto victim will hold the layout
2950                  * lock. See LU-4002.
2951                  */
2952
2953                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2954                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2955                                            MDS_INODELOCK_LOOKUP |
2956                                            MDS_INODELOCK_UPDATE, LCK_EX);
2957                 if (rc != 0)
2958                         GOTO(out_unlock_new, rc);
2959
2960                 /* get and save version after locking */
2961                 mdt_version_get_save(info, mnew, 3);
2962         } else if (rc != -ENOENT) {
                     /* any lookup failure other than "no such entry" is fatal */
2963                 GOTO(out_put_old, rc);
2964         } else {
                     /* no target entry: only the source child needs locking */
2965                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2966                 lh_lookup = &info->mti_lh[MDT_LH_LOOKUP];
2967                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2968                                             lh_lookup,
2969                                             MDS_INODELOCK_LOOKUP |
2970                                             MDS_INODELOCK_XATTR);
2971                 if (rc != 0)
2972                         GOTO(out_put_old, rc);
2973
2974                 mdt_enoent_version_save(info, 3);
2975         }
2976
2977         /* step 5: rename it */
             /* from here on ma is reset and asks for MA_INODE only */
2978         mdt_reint_init_ma(info, ma);
2979
2980         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2981                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2982
             /* mot_lov_mutex of an existing target is held across mdo_rename() */
2983         if (mnew != NULL)
2984                 mutex_lock(&mnew->mot_lov_mutex);
2985
2986         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2987                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2988                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2989                         &rr->rr_tgt_name, ma);
2990
2991         if (mnew != NULL)
2992                 mutex_unlock(&mnew->mot_lov_mutex);
2993
2994         /* handle last link of tgt object */
2995         if (rc == 0) {
2996                 if (mnew) {
2997                         mdt_handle_last_unlink(info, mnew, ma);
                             /* when set, the put of mnew is deferred until after
                              * all locks have been dropped (see the end)
                              */
2998                         discard = mdt_dom_check_for_discard(info, mnew);
2999                 }
3000                 mdt_rename_counter_tally(info, info->mti_mdt, req,
3001                                          msrcdir, mtgtdir, msi,
3002                                          ktime_us_delta(ktime_get(), kstart));
3003         }
3004
3005         EXIT;
             /* cleanup: each label releases what was taken after the previous one */
3006 out_unlock_new:
3007         if (mnew != NULL)
3008                 /* mnew is gone, no need to keep lock */
3009                 mdt_object_unlock(info, mnew, lh_newp, 1);
3010 out_unlock_old:
3011         mdt_object_unlock(info, NULL, lh_lookup, rc);
3012         mdt_object_unlock(info, mold, lh_oldp, rc);
3013 out_put_new:
             /* with discard set the reference is still needed below */
3014         if (mnew && !discard)
3015                 mdt_object_put(info->mti_env, mnew);
3016 out_put_old:
3017         mdt_object_put(info->mti_env, mold);
3018 out_unlock_parents:
3019         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
3020         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
3021 out_unlock_rename:
3022         mdt_rename_unlock(info, rename_lh);
3023 out_put_tgtdir:
3024         mdt_object_put(info->mti_env, mtgtdir);
3025 out_put_srcdir:
3026         mdt_object_put(info->mti_env, msrcdir);
3027
3028         /* The DoM discard can be done right in the place above where it is
3029          * assigned, meanwhile it is done here after rename unlock due to
3030          * compatibility with old clients, for them the discard blocks
3031          * the main thread until completion. Check LU-11359 for details.
3032          */
3033         if (discard) {
3034                 mdt_dom_discard_data(info, mnew);
3035                 mdt_object_put(info->mti_env, mnew);
3036         }
3037         CFS_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3038         return rc;
3039 }
3040
3041 static int mdt_reint_resync(struct mdt_thread_info *info,
3042                             struct mdt_lock_handle *lhc)
3043 {
3044         struct mdt_reint_record *rr = &info->mti_rr;
3045         struct ptlrpc_request *req = mdt_info_req(info);
3046         struct md_attr *ma = &info->mti_attr;
3047         struct mdt_object *mo;
3048         struct ldlm_lock *lease;
3049         struct mdt_body *repbody;
3050         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3051         bool lease_broken;
3052         int rc;
3053
3054         ENTRY;
3055         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3056
3057         if (info->mti_dlm_req)
3058                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3059
3060         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3061         if (IS_ERR(mo))
3062                 GOTO(out, rc = PTR_ERR(mo));
3063
3064         if (!mdt_object_exists(mo))
3065                 GOTO(out_obj, rc = -ENOENT);
3066
3067         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3068                 GOTO(out_obj, rc = -EINVAL);
3069
3070         if (mdt_object_remote(mo))
3071                 GOTO(out_obj, rc = -EREMOTE);
3072
3073         lease = ldlm_handle2lock(rr->rr_lease_handle);
3074         if (lease == NULL)
3075                 GOTO(out_obj, rc = -ESTALE);
3076
3077         /* It's really necessary to grab open_sem and check if the lease lock
3078          * has been lost. There would exist a concurrent writer coming in and
3079          * generating some dirty data in memory cache, the writeback would fail
3080          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3081          */
3082         if (!down_write_trylock(&mo->mot_open_sem))
3083                 GOTO(out_put_lease, rc = -EBUSY);
3084
3085         lock_res_and_lock(lease);
3086         lease_broken = ldlm_is_cancel(lease);
3087         unlock_res_and_lock(lease);
3088         if (lease_broken)
3089                 GOTO(out_unlock, rc = -EBUSY);
3090
3091         /* the file has yet opened by anyone else after we took the lease. */
3092         layout.mlc_opc = MD_LAYOUT_RESYNC;
3093         lhc = &info->mti_lh[MDT_LH_LOCAL];
3094         rc = mdt_layout_change(info, mo, lhc, &layout);
3095         if (rc)
3096                 GOTO(out_unlock, rc);
3097
3098         mdt_object_unlock(info, mo, lhc, 0);
3099
3100         ma->ma_need = MA_INODE;
3101         ma->ma_valid = 0;
3102         rc = mdt_attr_get_complex(info, mo, ma);
3103         if (rc != 0)
3104                 GOTO(out_unlock, rc);
3105
3106         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3107         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3108
3109         EXIT;
3110 out_unlock:
3111         up_write(&mo->mot_open_sem);
3112 out_put_lease:
3113         LDLM_LOCK_PUT(lease);
3114 out_obj:
3115         mdt_object_put(info->mti_env, mo);
3116 out:
3117         mdt_client_compatibility(info);
3118         return rc;
3119 }
3120
3121 struct mdt_reinter {
3122         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3123         enum lprocfs_extra_opc mr_extra_opc;
3124 };
3125
3126 static const struct mdt_reinter mdt_reinters[] = {
3127         [REINT_SETATTR] = {
3128                 .mr_handler = &mdt_reint_setattr,
3129                 .mr_extra_opc = MDS_REINT_SETATTR,
3130         },
3131         [REINT_CREATE] = {
3132                 .mr_handler = &mdt_reint_create,
3133                 .mr_extra_opc = MDS_REINT_CREATE,
3134         },
3135         [REINT_LINK] = {
3136                 .mr_handler = &mdt_reint_link,
3137                 .mr_extra_opc = MDS_REINT_LINK,
3138         },
3139         [REINT_UNLINK] = {
3140                 .mr_handler = &mdt_reint_unlink,
3141                 .mr_extra_opc = MDS_REINT_UNLINK,
3142         },
3143         [REINT_RENAME] = {
3144                 .mr_handler = &mdt_reint_rename,
3145                 .mr_extra_opc = MDS_REINT_RENAME,
3146         },
3147         [REINT_OPEN] = {
3148                 .mr_handler = &mdt_reint_open,
3149                 .mr_extra_opc = MDS_REINT_OPEN,
3150         },
3151         [REINT_SETXATTR] = {
3152                 .mr_handler = &mdt_reint_setxattr,
3153                 .mr_extra_opc = MDS_REINT_SETXATTR,
3154         },
3155         [REINT_RMENTRY] = {
3156                 .mr_handler = &mdt_reint_unlink,
3157                 .mr_extra_opc = MDS_REINT_UNLINK,
3158         },
3159         [REINT_MIGRATE] = {
3160                 .mr_handler = &mdt_reint_migrate,
3161                 .mr_extra_opc = MDS_REINT_RENAME,
3162         },
3163         [REINT_RESYNC] = {
3164                 .mr_handler = &mdt_reint_resync,
3165                 .mr_extra_opc = MDS_REINT_RESYNC,
3166         },
3167 };
3168
3169 int mdt_reint_rec(struct mdt_thread_info *info,
3170                   struct mdt_lock_handle *lhc)
3171 {
3172         const struct mdt_reinter *mr;
3173         int rc;
3174
3175         ENTRY;
3176         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3177                 RETURN(-EPROTO);
3178
3179         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3180         if (mr->mr_handler == NULL)
3181                 RETURN(-EPROTO);
3182
3183         rc = (*mr->mr_handler)(info, lhc);
3184
3185         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3186                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3187
3188         RETURN(rc);
3189 }