Whamcloud - gitweb
b16b197fc746caefd6853a71c067130864d8c325
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222                              struct mdt_object *obj,
223                              struct ldlm_enqueue_info *einfo,
224                              int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(slave_locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_handle_init(lh);
237         mdt_lock_reg_init(lh, einfo->ei_mode);
238         for (i = 0; i < slave_locks->ha_count; i++) {
239                 if (test_bit(i, (void *)slave_locks->ha_map))
240                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
241                 else
242                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
243                 mdt_object_unlock(mti, NULL, lh, decref);
244                 slave_locks->ha_handles[i].cookie = 0ull;
245         }
246
247         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
248                                 policy);
249 }
250
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252                                      struct mdt_object *obj)
253 {
254         struct lu_device *bottom_dev;
255         struct lu_object *bottom_obj;
256         int rc;
257
258         if (!S_ISDIR(obj->mot_header.loh_attr))
259                 return 0;
260
261         /* getxattr from bottom obj to avoid reading in shard FIDs */
262         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264                                           mdt_object_fid(obj), NULL);
265         if (IS_ERR(bottom_obj))
266                 return PTR_ERR(bottom_obj);
267
268         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269                           XATTR_NAME_LMV);
270         lu_object_put(mti->mti_env, bottom_obj);
271
272         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
273 }
274
275 /**
276  * Lock slave stripes if necessary, the lock handles of slave stripes
277  * will be stored in einfo->ei_cbdata.
278  **/
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280                            enum ldlm_mode mode, __u64 ibits,
281                            struct ldlm_enqueue_info *einfo)
282 {
283         union ldlm_policy_data *policy = &mti->mti_policy;
284
285         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
286
287         einfo->ei_type = LDLM_IBITS;
288         einfo->ei_mode = mode;
289         einfo->ei_cb_bl = mdt_remote_blocking_ast;
290         einfo->ei_cb_local_bl = mdt_blocking_ast;
291         einfo->ei_cb_cp = ldlm_completion_ast;
292         einfo->ei_enq_slave = 1;
293         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294         einfo->ei_inodebits = ibits;
295         einfo->ei_req_slot = 1;
296         memset(policy, 0, sizeof(*policy));
297         policy->l_inodebits.bits = ibits;
298
299         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
300                               policy);
301 }
302
303 int mdt_reint_striped_lock(struct mdt_thread_info *info,
304                            struct mdt_object *o,
305                            struct mdt_lock_handle *lh,
306                            __u64 ibits,
307                            struct ldlm_enqueue_info *einfo,
308                            bool cos_incompat)
309 {
310         int rc;
311
312         LASSERT(!mdt_object_remote(o));
313
314         memset(einfo, 0, sizeof(*einfo));
315
316         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
317         if (rc)
318                 return rc;
319
320         rc = mdt_object_striped(info, o);
321         if (rc != 1) {
322                 if (rc < 0)
323                         mdt_object_unlock(info, o, lh, rc);
324                 return rc;
325         }
326
327         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
328         if (rc) {
329                 mdt_object_unlock(info, o, lh, rc);
330                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
331                         rc = 0;
332         }
333
334         return rc;
335 }
336
337 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *o,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_unlock_slaves(info, o, einfo, decref);
344         mdt_object_unlock(info, o, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lu_ucred *uc = mdt_ucred(info);
359         struct lmv_mds_md_v1 *lmv;
360         struct mdt_object *child;
361         struct mdt_lock_handle *lhp;
362         struct mdt_lock_handle *lhc;
363         struct mdt_body *repbody;
364         int rc;
365
366         ENTRY;
367
368         /* we want rbac roles to have precedence over any other
369          * permission or capability checks
370          */
371         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
372                 RETURN(-EPERM);
373
374         LASSERT(lum);
375         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
376
377         rc = mdt_version_get_check_save(info, parent, 0);
378         if (rc)
379                 RETURN(rc);
380
381         lhp = &info->mti_lh[MDT_LH_PARENT];
382         mdt_lock_pdo_init(lhp, LCK_PW, lname);
383         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
384                                    true);
385         if (rc)
386                 RETURN(rc);
387
388         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
389         if (rc)
390                 GOTO(unlock_parent, rc);
391
392         if (ma->ma_valid & MA_LMV) {
393                 /* don't allow restripe if parent dir layout is changing */
394                 lmv = &ma->ma_lmv->lmv_md_v1;
395                 if (!lmv_is_sane2(lmv))
396                         GOTO(unlock_parent, rc = -EBADF);
397
398                 if (lmv_is_layout_changing(lmv))
399                         GOTO(unlock_parent, rc = -EBUSY);
400         }
401
402         fid_zero(fid);
403         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
404         if (rc)
405                 GOTO(unlock_parent, rc);
406
407         child = mdt_object_find(info->mti_env, mdt, fid);
408         if (IS_ERR(child))
409                 GOTO(unlock_parent, rc = PTR_ERR(child));
410
411         if (!mdt_object_exists(child))
412                 GOTO(out_child, rc = -ENOENT);
413
414         if (mdt_object_remote(child)) {
415                 struct mdt_body *repbody;
416
417                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
418                 if (!repbody)
419                         GOTO(out_child, rc = -EPROTO);
420
421                 repbody->mbo_fid1 = *fid;
422                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
423                 GOTO(out_child, rc = -EREMOTE);
424         }
425
426         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
427                 GOTO(out_child, rc = -ENOTDIR);
428
429         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
430         if (rc)
431                 GOTO(out_child, rc);
432
433         /* race with migrate? */
434         if ((ma->ma_valid & MA_LMV) &&
435              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
436                 GOTO(out_child, rc = -EBUSY);
437
438         /* lock object */
439         lhc = &info->mti_lh[MDT_LH_CHILD];
440         mdt_lock_reg_init(lhc, LCK_EX);
441
442         /* enqueue object remote LOOKUP lock */
443         if (mdt_object_remote(parent)) {
444                 rc = mdt_remote_object_lock(info, parent, fid,
445                                             &lhc->mlh_rreg_lh,
446                                             lhc->mlh_rreg_mode,
447                                             MDS_INODELOCK_LOOKUP, false);
448                 if (rc != ELDLM_OK)
449                         GOTO(out_child, rc);
450         }
451
452         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
453                                     true);
454         if (rc)
455                 GOTO(unlock_child, rc);
456
457         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
458         rc = mdt_version_get_check_save(info, child, 1);
459         if (rc)
460                 GOTO(unlock_child, rc);
461
462         spin_lock(&mdt->mdt_restriper.mdr_lock);
463         if (child->mot_restriping) {
464                 /* race? */
465                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
466                 GOTO(unlock_child, rc = -EBUSY);
467         }
468         child->mot_restriping = 1;
469         spin_unlock(&mdt->mdt_restriper.mdr_lock);
470
471         *fid = *tfid;
472         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
473         if (rc)
474                 GOTO(restriping_clear, rc);
475
476         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
477         if (!repbody)
478                 GOTO(restriping_clear, rc = -EPROTO);
479
480         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
481         EXIT;
482
483 restriping_clear:
484         child->mot_restriping = 0;
485 unlock_child:
486         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
487 out_child:
488         mdt_object_put(info->mti_env, child);
489 unlock_parent:
490         mdt_object_unlock(info, parent, lhp, rc);
491
492         return rc;
493 }
494
495 /*
496  * VBR: we save three versions in reply:
497  * 0 - parent. Check that parent version is the same during replay.
498  * 1 - name. Version of 'name' if file exists with the same name or
499  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
500  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
501  * check.
502  */
503 static int mdt_create(struct mdt_thread_info *info)
504 {
505         struct mdt_device *mdt = info->mti_mdt;
506         struct mdt_object *parent;
507         struct mdt_object *child;
508         struct mdt_lock_handle *lh;
509         struct mdt_body *repbody;
510         struct md_attr *ma = &info->mti_attr;
511         struct mdt_reint_record *rr = &info->mti_rr;
512         struct md_op_spec *spec = &info->mti_spec;
513         bool restripe = false;
514         int rc;
515
516         ENTRY;
517         DEBUG_REQ(D_INODE, mdt_info_req(info),
518                   "Create ("DNAME"->"DFID") in "DFID,
519                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
520
521         if (!fid_is_md_operative(rr->rr_fid1))
522                 RETURN(-EPERM);
523
524         if (S_ISDIR(ma->ma_attr.la_mode) &&
525             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
526                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
527                 struct lu_ucred *uc = mdt_ucred(info);
528                 struct obd_export *exp = mdt_info_req(info)->rq_export;
529
530                 /* Only new clients can create remote dir( >= 2.4) and
531                  * striped dir(>= 2.6), old client will return -ENOTSUPP
532                  */
533                 if (!mdt_is_dne_client(exp))
534                         RETURN(-ENOTSUPP);
535
536                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
537                         if (!mdt_is_striped_client(exp))
538                                 RETURN(-ENOTSUPP);
539
540                         if (!mdt->mdt_enable_striped_dir)
541                                 RETURN(-EPERM);
542                 } else if (!mdt->mdt_enable_remote_dir) {
543                         RETURN(-EPERM);
544                 }
545
546                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
547                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
548                     LMV_HASH_TYPE_CRUSH)
549                         RETURN(-EPROTO);
550
551                 /* we want rbac roles to have precedence over any other
552                  * permission or capability checks
553                  */
554                 if (!uc->uc_rbac_dne_ops ||
555                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
556                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
557                      mdt->mdt_enable_remote_dir_gid != -1))
558                         RETURN(-EPERM);
559
560                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
561                  * this is create only, don't try restripe.
562                  */
563                 if (mdt->mdt_enable_dir_restripe &&
564                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
565                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
566                         restripe = true;
567         }
568
569         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
570
571         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
572         if (IS_ERR(parent))
573                 RETURN(PTR_ERR(parent));
574
575         if (!mdt_object_exists(parent))
576                 GOTO(put_parent, rc = -ENOENT);
577
578         rc = mdt_check_enc(info, parent);
579         if (rc)
580                 GOTO(put_parent, rc);
581
582         /*
583          * LU-10235: check if name exists locklessly first to avoid massive
584          * lock recalls on existing directories.
585          */
586         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
587                                       &info->mti_tmp_fid1, 1);
588         if (rc == 0) {
589                 if (!restripe)
590                         GOTO(put_parent, rc = -EEXIST);
591
592                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
593                                   ma);
594         }
595
596         /* -ENOENT is expected here */
597         if (rc != -ENOENT)
598                 GOTO(put_parent, rc);
599
600         /* save version of file name for replay, it must be ENOENT here */
601         mdt_enoent_version_save(info, 1);
602
603         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
604
605         lh = &info->mti_lh[MDT_LH_PARENT];
606         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
607         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
608         if (rc)
609                 GOTO(put_parent, rc);
610
611         if (!mdt_object_remote(parent)) {
612                 rc = mdt_version_get_check_save(info, parent, 0);
613                 if (rc)
614                         GOTO(unlock_parent, rc);
615         }
616
617         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
618         if (unlikely(IS_ERR(child)))
619                 GOTO(unlock_parent, rc = PTR_ERR(child));
620
621         ma->ma_need = MA_INODE;
622         ma->ma_valid = 0;
623
624         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
625                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
626
627         /* Version of child will be updated on disk. */
628         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
629         rc = mdt_version_get_check_save(info, child, 2);
630         if (rc)
631                 GOTO(put_child, rc);
632
633         /*
634          * Do not perform lookup sanity check. We know that name does
635          * not exist.
636          */
637         info->mti_spec.sp_cr_lookup = 0;
638         if (mdt_object_remote(parent))
639                 info->mti_spec.sp_cr_lookup = 1;
640         info->mti_spec.sp_feat = &dt_directory_features;
641
642         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
643                         mdt_object_child(child), &info->mti_spec, ma);
644         if (rc == 0)
645                 rc = mdt_attr_get_complex(info, child, ma);
646
647         if (rc < 0)
648                 GOTO(put_child, rc);
649
650         /*
651          * On DNE, we need to eliminate dependey between 'mkdir a' and
652          * 'mkdir a/b' if b is a striped directory, to achieve this, two
653          * things are done below:
654          * 1. save child and slaves lock.
655          * 2. if the child is a striped directory, relock parent so to
656          *    compare against with COS locks to ensure parent was
657          *    committed to disk.
658          */
659         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
660                 struct mdt_lock_handle *lhc;
661                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
662                 bool cos_incompat;
663
664                 rc = mdt_object_striped(info, child);
665                 if (rc < 0)
666                         GOTO(put_child, rc);
667
668                 cos_incompat = rc;
669                 if (cos_incompat) {
670                         if (!mdt_object_remote(parent)) {
671                                 mdt_object_unlock(info, parent, lh, 1);
672                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
673                                 rc = mdt_reint_object_lock(info, parent, lh,
674                                                            MDS_INODELOCK_UPDATE,
675                                                            true);
676                                 if (rc)
677                                         GOTO(put_child, rc);
678                         }
679                 }
680
681                 lhc = &info->mti_lh[MDT_LH_CHILD];
682                 mdt_lock_handle_init(lhc);
683                 mdt_lock_reg_init(lhc, LCK_PW);
684                 rc = mdt_reint_striped_lock(info, child, lhc,
685                                             MDS_INODELOCK_UPDATE, einfo,
686                                             cos_incompat);
687                 if (rc)
688                         GOTO(put_child, rc);
689
690                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
691         }
692
693         /* Return fid & attr to client. */
694         if (ma->ma_valid & MA_INODE)
695                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
696                                    mdt_object_fid(child));
697         EXIT;
698 put_child:
699         mdt_object_put(info->mti_env, child);
700 unlock_parent:
701         mdt_object_unlock(info, parent, lh, rc);
702 put_parent:
703         mdt_object_put(info->mti_env, parent);
704         return rc;
705 }
706
707 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
708                         struct md_attr *ma)
709 {
710         struct mdt_lock_handle  *lh;
711         int do_vbr = ma->ma_attr.la_valid &
712                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
713         __u64 lockpart = MDS_INODELOCK_UPDATE;
714         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
715         bool cos_incompat;
716         int rc;
717
718         ENTRY;
719         rc = mdt_object_striped(info, mo);
720         if (rc < 0)
721                 RETURN(rc);
722
723         cos_incompat = rc;
724
725         lh = &info->mti_lh[MDT_LH_PARENT];
726         mdt_lock_reg_init(lh, LCK_PW);
727
728         /* Even though the new MDT will grant PERM lock to the old
729          * client, but the old client will almost ignore that during
730          * So it needs to revoke both LOOKUP and PERM lock here, so
731          * both new and old client can cancel the dcache
732          */
733         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
734                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
735         /* Clear xattr cache on clients, so the virtual project ID xattr
736          * can get the new project ID
737          */
738         if (ma->ma_attr.la_valid & LA_PROJID)
739                 lockpart |= MDS_INODELOCK_XATTR;
740
741         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
742                                     cos_incompat);
743         if (rc != 0)
744                 RETURN(rc);
745
746         /* all attrs are packed into mti_attr in unpack_setattr */
747         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
748                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
749
750         /* VBR: update version if attr changed are important for recovery */
751         if (do_vbr) {
752                 /* update on-disk version of changed object */
753                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
754                 rc = mdt_version_get_check_save(info, mo, 0);
755                 if (rc)
756                         GOTO(out_unlock, rc);
757         }
758
759         /* Ensure constant striping during chown(). See LU-2789. */
760         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
761                 mutex_lock(&mo->mot_lov_mutex);
762
763         /* all attrs are packed into mti_attr in unpack_setattr */
764         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
765
766         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
767                 mutex_unlock(&mo->mot_lov_mutex);
768
769         if (rc != 0)
770                 GOTO(out_unlock, rc);
771         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
772         EXIT;
773 out_unlock:
774         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
775         return rc;
776 }
777
778 /**
779  * Check HSM flags and add HS_DIRTY flag if relevant.
780  *
781  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
782  * and is not RELEASED.
783  */
784 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
785                         struct md_attr *ma)
786 {
787         struct lu_ucred *uc = mdt_ucred(info);
788         kernel_cap_t cap_saved;
789         int rc;
790
791         ENTRY;
792         /* If the file was modified, add the dirty flag */
793         ma->ma_need = MA_HSM;
794         rc = mdt_attr_get_complex(info, mo, ma);
795         if (rc) {
796                 CERROR("file attribute read error for "DFID": %d.\n",
797                         PFID(mdt_object_fid(mo)), rc);
798                 RETURN(rc);
799         }
800
801         /* If an up2date copy exists in the backend, add dirty flag */
802         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
803             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
804                 ma->ma_hsm.mh_flags |= HS_DIRTY;
805
806                 /* Bump cap so that closes from non-owner writers can
807                  * set the HSM state to dirty.
808                  */
809                 cap_saved = uc->uc_cap;
810                 cap_raise(uc->uc_cap, CAP_FOWNER);
811                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
812                 uc->uc_cap = cap_saved;
813                 if (rc)
814                         CERROR("file attribute change error for "DFID": %d\n",
815                                 PFID(mdt_object_fid(mo)), rc);
816         }
817
818         RETURN(rc);
819 }
820
/**
 * Handle a REINT setattr request.
 *
 * The object named by rr_fid1 is looked up; a non-existent object fails with
 * -ENOENT and a remote one with -EREMOTE. Depending on the bits set in
 * ma_valid, the request then takes one of two paths:
 *  - MA_INODE with a non-zero la_valid: inode attributes are changed through
 *    mdt_attr_set() (MA_LOV must not be set in that case, else -EPROTO);
 *  - MA_INODE together with MA_LOV or MA_LMV and a zero la_valid: a LOV or
 *    default LMV xattr is stored on a directory through mo_xattr_set().
 * Any other combination fails with -EPROTO. On success the attributes are
 * re-read and packed into the reply body.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	lock handle; only used as a local variable here, it is
 *			re-pointed at the MDT_LH_LOCAL handle when a lease
 *			has to be revoked
 *
 * \retval 0		success
 * \retval negative	errno on failure
 */
821 static int mdt_reint_setattr(struct mdt_thread_info *info,
822                              struct mdt_lock_handle *lhc)
823 {
824         struct mdt_device *mdt = info->mti_mdt;
825         struct md_attr *ma = &info->mti_attr;
826         struct mdt_reint_record *rr = &info->mti_rr;
827         struct ptlrpc_request *req = mdt_info_req(info);
828         struct mdt_object *mo;
829         struct mdt_body *repbody;
830         ktime_t kstart = ktime_get();
831         int rc;
832
833         ENTRY;
834         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
835                   (unsigned int)ma->ma_attr.la_valid);
836
837         if (info->mti_dlm_req)
838                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
839
840         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
841
            /* NOTE(review): repbody is not checked for NULL before it is
             * handed to mdt_pack_attr2body() at the end -- confirm the reply
             * buffer is guaranteed to exist at this point.
             */
842         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
843         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
844         if (IS_ERR(mo))
845                 GOTO(out, rc = PTR_ERR(mo));
846
847         if (!mdt_object_exists(mo))
848                 GOTO(out_put, rc = -ENOENT);
849
850         if (mdt_object_remote(mo))
851                 GOTO(out_put, rc = -EREMOTE);
852
853         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
854         /* revoke lease lock if size is going to be changed */
855         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
856                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
857                      atomic_read(&mo->mot_lease_count) > 0)) {
858                 down_read(&mo->mot_open_sem);
859
                    /* the lease count is re-read now that mot_open_sem is
                     * held; the first read above was done without it
                     */
860                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
861                         lhc = &info->mti_lh[MDT_LH_LOCAL];
862                         mdt_lock_reg_init(lhc, LCK_CW);
863
864                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
865                         if (rc != 0) {
866                                 up_read(&mo->mot_open_sem);
867                                 GOTO(out_put, rc);
868                         }
869
870                         /* revoke lease lock */
871                         mdt_object_unlock(info, mo, lhc, 1);
872                 }
873                 up_read(&mo->mot_open_sem);
874         }
875
876         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
877                 /* Check write access for the O_TRUNC case */
878                 if (mdt_write_read(mo) < 0)
879                         GOTO(out_put, rc = -ETXTBSY);
880
881                 /* LU-10286: compatibility check for FLR.
882                  * Please check the comment in mdt_finish_open() for details
883                  */
884                 if (!exp_connect_flr(info->mti_exp) ||
885                     !exp_connect_overstriping(info->mti_exp)) {
                            /* -ENODATA (no LOV xattr) is tolerated; the
                             * layout checks below only run when rc > 0
                             */
886                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
887                         if (rc < 0 && rc != -ENODATA)
888                                 GOTO(out_put, rc);
889
890                         if (!exp_connect_flr(info->mti_exp)) {
891                                 if (rc > 0 &&
892                                     mdt_lmm_is_flr(info->mti_big_lmm))
893                                         GOTO(out_put, rc = -EOPNOTSUPP);
894                         }
895
896                         if (!exp_connect_overstriping(info->mti_exp)) {
897                                 if (rc > 0 &&
898                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
899                                         GOTO(out_put, rc = -EOPNOTSUPP);
900                         }
901                 }
902
903                 /* For truncate, the file size sent from client
904                  * is believable, but the blocks are incorrect,
905                  * which makes the block size in LSOM attribute
906                  * inconsistent with the real block size.
907                  */
908                 rc = mdt_lsom_update(info, mo, true);
909                 if (rc)
910                         GOTO(out_put, rc);
911         }
912
            /* Path 1: inode attribute change. A LOV EA must not accompany
             * it.
             */
913         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
914                 if (ma->ma_valid & MA_LOV)
915                         GOTO(out_put, rc = -EPROTO);
916
917                 /* MDT supports FMD for regular files due to Data-on-MDT */
918                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
919                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
920                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
921                                        req->rq_xid);
922
                            /* a failure to read the parent FID is not fatal:
                             * MA_PFID is simply left unset and rc is
                             * overwritten by mdt_attr_set() below
                             */
923                         if (ma->ma_attr.la_valid & LA_MTIME) {
924                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
925                                 if (!rc)
926                                         ma->ma_valid |= MA_PFID;
927                         }
928                 }
929
930                 rc = mdt_attr_set(info, mo, ma);
931                 if (rc)
932                         GOTO(out_put, rc);
            /* Path 2: no inode attribute bits; store a LOV or default LMV
             * EA on a directory.
             */
933         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
934                    (ma->ma_valid & MA_INODE)) {
935                 struct lu_buf *buf = &info->mti_buf;
936                 struct lu_ucred *uc = mdt_ucred(info);
937                 struct mdt_lock_handle *lh;
938                 const char *name;
939                 __u64 lockpart = MDS_INODELOCK_XATTR;
940
941                 /* reject if either remote or striped dir is disabled */
942                 if (ma->ma_valid & MA_LMV) {
943                         if (!mdt->mdt_enable_remote_dir ||
944                             !mdt->mdt_enable_striped_dir)
945                                 GOTO(out_put, rc = -EPERM);
946
947                         /* we want rbac roles to have precedence over any other
948                          * permission or capability checks
949                          */
950                         if (!uc->uc_rbac_dne_ops ||
951                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
952                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
953                              mdt->mdt_enable_remote_dir_gid != -1))
954                                 GOTO(out_put, rc = -EPERM);
955                 }
956
957                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
958                         GOTO(out_put, rc = -ENOTDIR);
959
960                 if (ma->ma_attr.la_valid != 0)
961                         GOTO(out_put, rc = -EPROTO);
962
963                 lh = &info->mti_lh[MDT_LH_PARENT];
964                 mdt_lock_reg_init(lh, LCK_PW);
965
966                 if (ma->ma_valid & MA_LOV) {
967                         buf->lb_buf = ma->ma_lmm;
968                         buf->lb_len = ma->ma_lmm_size;
969                         name = XATTR_NAME_LOV;
970                 } else {
971                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
972                         struct lu_fid *pfid = &info->mti_tmp_fid1;
973                         struct lu_name *pname = &info->mti_name;
974                         const char dotdot[] = "..";
975                         struct mdt_object *pobj;
976
977                         buf->lb_buf = lmu;
978                         buf->lb_len = ma->ma_lmv_size;
979                         name = XATTR_NAME_DEFAULT_LMV;
980
981                         if (fid_is_root(rr->rr_fid1)) {
982                                 lockpart |= MDS_INODELOCK_LOOKUP;
983                         } else {
984                                 /* force client to update dir default layout */
985                                 fid_zero(pfid);
986                                 pname->ln_name = dotdot;
                                    /* NOTE(review): sizeof(dotdot) is 3, i.e.
                                     * it counts the terminating NUL -- confirm
                                     * that this is the length mdo_lookup()
                                     * expects rather than sizeof(dotdot) - 1.
                                     */
987                                 pname->ln_namelen = sizeof(dotdot);
988                                 rc = mdo_lookup(info->mti_env,
989                                                 mdt_object_child(mo), pname,
990                                                 pfid, NULL);
991                                 if (rc)
992                                         GOTO(out_put, rc);
993
994                                 pobj = mdt_object_find(info->mti_env, mdt,
995                                                        pfid);
996                                 if (IS_ERR(pobj))
997                                         GOTO(out_put, rc = PTR_ERR(pobj));
998
                                    /* remote parent: take the LOOKUP lock
                                     * through the parent; local parent: add
                                     * the LOOKUP bit to the local lock (rc is
                                     * still 0 from mdo_lookup() in that case)
                                     */
999                                 if (mdt_object_remote(pobj))
1000                                         rc = mdt_remote_object_lock(info, pobj,
1001                                                 mdt_object_fid(mo),
1002                                                 &lh->mlh_rreg_lh, LCK_EX,
1003                                                 MDS_INODELOCK_LOOKUP, false);
1004                                 else
1005                                         lockpart |= MDS_INODELOCK_LOOKUP;
1006
                                     /* the parent reference is dropped before
                                      * rc is tested, so it is released on
                                      * both the success and the error path
                                      */
1007                                 mdt_object_put(info->mti_env, pobj);
1008
1009                                 if (rc)
1010                                         GOTO(out_put, rc);
1011                         }
1012                 }
1013
1014                 rc = mdt_object_lock(info, mo, lh, lockpart);
1015                 if (rc != 0)
1016                         GOTO(out_put, rc);
1017
1018                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1019                                   name, 0);
1020
1021                 mdt_object_unlock(info, mo, lh, rc);
1022                 if (rc)
1023                         GOTO(out_put, rc);
1024         } else {
1025                 GOTO(out_put, rc = -EPROTO);
1026         }
1027
1028         /* If file data is modified, add the dirty flag */
             /* NOTE(review): the rc assigned here is overwritten by the
              * mdt_attr_get_complex() call just below, so a failure of
              * mdt_add_dirty_flag() is not returned to the caller -- confirm
              * that this best-effort behaviour is intended.
              */
1029         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1030                 rc = mdt_add_dirty_flag(info, mo, ma);
1031
             /* re-read the inode attributes to fill the reply body */
1032         ma->ma_need = MA_INODE;
1033         ma->ma_valid = 0;
1034         rc = mdt_attr_get_complex(info, mo, ma);
1035         if (rc != 0)
1036                 GOTO(out_put, rc);
1037
1038         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1039
1040         EXIT;
1041 out_put:
1042         mdt_object_put(info->mti_env, mo);
1043 out:
             /* the setattr counter is bumped on success only;
              * mdt_client_compatibility() is called on every exit path
              */
1044         if (rc == 0)
1045                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1046                                  ktime_us_delta(ktime_get(), kstart));
1047
1048         mdt_client_compatibility(info);
1049         return rc;
1050 }
1051
1052 static int mdt_reint_create(struct mdt_thread_info *info,
1053                             struct mdt_lock_handle *lhc)
1054 {
1055         struct ptlrpc_request   *req = mdt_info_req(info);
1056         ktime_t                 kstart = ktime_get();
1057         int                     rc;
1058
1059         ENTRY;
1060         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1061                 RETURN(err_serious(-ESTALE));
1062
1063         if (info->mti_dlm_req)
1064                 ldlm_request_cancel(mdt_info_req(info),
1065                                     info->mti_dlm_req, 0, LATF_SKIP);
1066
1067         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1068                 RETURN(-EPROTO);
1069
1070         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1071         case S_IFDIR:
1072         case S_IFREG:
1073         case S_IFLNK:
1074         case S_IFCHR:
1075         case S_IFBLK:
1076         case S_IFIFO:
1077         case S_IFSOCK:
1078                 break;
1079         default:
1080                 CERROR("%s: Unsupported mode %o\n",
1081                        mdt_obd_name(info->mti_mdt),
1082                        info->mti_attr.ma_attr.la_mode);
1083                 RETURN(err_serious(-EOPNOTSUPP));
1084         }
1085
1086         rc = mdt_create(info);
1087         if (rc == 0) {
1088                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1089                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1090                                          ktime_us_delta(ktime_get(), kstart));
1091                 else
1092                         /* Special file should stay on the same node as parent*/
1093                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1094                                          ktime_us_delta(ktime_get(), kstart));
1095         }
1096
1097         RETURN(rc);
1098 }
1099
1100 /*
1101  * VBR: save the parent version and the version of the child found by name
1102  * in the reply. The child version is fetched and checked during its lookup.
1103  */
/**
 * Handle a REINT unlink request (files and directories).
 *
 * rr_fid1 names the parent and rr_name the entry to remove. The parent is
 * locked with a PDO lock on the name, the child FID is obtained either from
 * rr_fid2 (MDS_OP_WITH_FID) or by a lookup of the name, and the child is then
 * locked and removed with mdo_unlink(). If the child lives on another MDT the
 * request is not executed here: the child FID is returned in the reply body
 * and -EREMOTE is returned. When sp_rm_entry is set only the name entry is
 * removed.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	lock handle (not used by this function)
 *
 * \retval 0		success
 * \retval -EREMOTE	the child object is on another MDT
 * \retval negative	other errno on failure
 */
1104 static int mdt_reint_unlink(struct mdt_thread_info *info,
1105                             struct mdt_lock_handle *lhc)
1106 {
1107         struct mdt_reint_record *rr = &info->mti_rr;
1108         struct ptlrpc_request *req = mdt_info_req(info);
1109         struct md_attr *ma = &info->mti_attr;
1110         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1111         struct mdt_object *mp;
1112         struct mdt_object *mc;
1113         struct mdt_lock_handle *parent_lh;
1114         struct mdt_lock_handle *child_lh;
1115         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1116         __u64 lock_ibits;
1117         bool cos_incompat = false;
1118         int no_name = 0;
1119         ktime_t kstart = ktime_get();
1120         int rc;
1121
1122         ENTRY;
1123         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1124                   PNAME(&rr->rr_name));
1125
1126         if (info->mti_dlm_req)
1127                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1128
1129         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1130                 RETURN(err_serious(-ENOENT));
1131
1132         if (!fid_is_md_operative(rr->rr_fid1))
1133                 RETURN(-EPERM);
1134
1135         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1136         if (IS_ERR(mp))
1137                 RETURN(PTR_ERR(mp));
1138
             /* a remote parent forces cos_incompat; the parent version is
              * only checked and saved for a local parent
              */
1139         if (mdt_object_remote(mp)) {
1140                 cos_incompat = true;
1141         } else {
1142                 rc = mdt_version_get_check_save(info, mp, 0);
1143                 if (rc)
1144                         GOTO(put_parent, rc);
1145         }
1146
1147         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1148         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
             /* re-entered once, with cos_incompat set, when the child turns
              * out to be striped (see the mdt_object_striped() check below)
              */
1149 relock:
1150         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1151         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1152         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1153                                    cos_incompat);
1154         if (rc != 0)
1155                 GOTO(put_parent, rc);
1156
             /* name-entry-only removal: mdo_unlink() is called with a NULL
              * child object and the function returns right after it
              */
1157         if (info->mti_spec.sp_rm_entry) {
1158                 struct lu_ucred *uc  = mdt_ucred(info);
1159
1160                 if (!mdt_is_dne_client(req->rq_export))
1161                         /* Return -ENOTSUPP for old client */
1162                         GOTO(unlock_parent, rc = -ENOTSUPP);
1163
1164                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1165                         GOTO(unlock_parent, rc = -EPERM);
1166
1167                 ma->ma_need = MA_INODE;
1168                 ma->ma_valid = 0;
1169                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1170                                 NULL, &rr->rr_name, ma, no_name);
1171                 GOTO(unlock_parent, rc);
1172         }
1173
1174         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1175                 *child_fid = *rr->rr_fid2;
1176         } else {
1177                 /* lookup child object along with version checking */
1178                 fid_zero(child_fid);
1179                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1180                                               1);
1181                 if (rc != 0) {
1182                         /* Name might not be able to find during resend of
1183                          * remote unlink, considering following case.
1184                          * dir_A is a remote directory, the name entry of
1185                          * dir_A is on MDT0, the directory is on MDT1,
1186                          *
1187                          * 1. client sends unlink req to MDT1.
1188                          * 2. MDT1 sends name delete update to MDT0.
1189                          * 3. name entry is being deleted in MDT0 synchronously.
1190                          * 4. MDT1 is restarted.
1191                          * 5. client resends unlink req to MDT1. So it can not
1192                          *    find the name entry on MDT0 anymore.
1193                          * In this case, MDT1 only needs to destroy the local
1194                          * directory.
1195                          */
1196                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1197                             !fid_is_zero(rr->rr_fid2) &&
1198                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1199                                 no_name = 1;
1200                                 *child_fid = *rr->rr_fid2;
1201                         } else {
1202                                 GOTO(unlock_parent, rc);
1203                         }
1204                 }
1205         }
1206
1207         if (!fid_is_md_operative(child_fid))
1208                 GOTO(unlock_parent, rc = -EPERM);
1209
1210         /* We will lock the child regardless it is local or remote. No harm. */
1211         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1212         if (IS_ERR(mc))
1213                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1214
1215         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1216                 /* In this case, child fid is embedded in the request, and we do
1217                  * not have a proper name as rr_name contains an encoded
1218                  * hash. So find name that matches provided hash.
1219                  */
1220                 if (!find_name_matching_hash(info, &rr->rr_name,
1221                                              NULL, mc))
1222                         GOTO(put_child, rc = -ENOENT);
1223         }
1224
             /* only checked while cos_incompat is still false, so the relock
              * below can happen at most once
              */
1225         if (!cos_incompat) {
1226                 rc = mdt_object_striped(info, mc);
1227                 if (rc < 0)
1228                         GOTO(put_child, rc);
1229
1230                 cos_incompat = rc;
1231                 if (cos_incompat) {
1232                         mdt_object_put(info->mti_env, mc);
1233                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1234                         goto relock;
1235                 }
1236         }
1237
1238         child_lh = &info->mti_lh[MDT_LH_CHILD];
1239         mdt_lock_reg_init(child_lh, LCK_EX);
1240         if (mdt_object_remote(mc)) {
1241                 struct mdt_body  *repbody;
1242
1243                 if (!fid_is_zero(rr->rr_fid2)) {
1244                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1245                                mdt_obd_name(info->mti_mdt),
1246                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1247                         GOTO(put_child, rc = -ENOENT);
1248                 }
1249                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1250                        mdt_obd_name(info->mti_mdt),
1251                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1252
1253                 if (!mdt_is_dne_client(req->rq_export))
1254                         /* Return -ENOTSUPP for old client */
1255                         GOTO(put_child, rc = -ENOTSUPP);
1256
1257                 /* Revoke the LOOKUP lock of the remote object granted by
1258                  * this MDT. Since the unlink will happen on another MDT,
1259                  * it will release the LOOKUP lock right away. Then What
1260                  * would happen if another client try to grab the LOOKUP
1261                  * lock at the same time with unlink XXX
1262                  */
                     /* NOTE(review): the return value of mdt_object_lock() is
                      * ignored here -- confirm that a failed enqueue is
                      * harmless on this -EREMOTE path.
                      */
1263                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1264                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1265                 LASSERT(repbody != NULL);
1266                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1267                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1268                 GOTO(unlock_child, rc = -EREMOTE);
1269         }
1270         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1271          * this now because a running HSM restore on the child (unlink
1272          * victim) will hold the layout lock. See LU-4002.
1273          */
1274         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1275         if (mdt_object_remote(mp)) {
1276                 /* Enqueue lookup lock from parent MDT */
1277                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1278                                             &child_lh->mlh_rreg_lh,
1279                                             child_lh->mlh_rreg_mode,
1280                                             MDS_INODELOCK_LOOKUP, false);
1281                 if (rc != ELDLM_OK)
1282                         GOTO(put_child, rc);
1283
                     /* LOOKUP is already held through the parent MDT, so it
                      * is dropped from the bits taken locally
                      */
1284                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1285         }
1286
1287         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1288                                     cos_incompat);
1289         if (rc != 0)
1290                 GOTO(put_child, rc);
1291
1292         /*
1293          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1294          * whether need MA_LOV and MA_COOKIE.
1295          */
1296         ma->ma_need = MA_INODE;
1297         ma->ma_valid = 0;
1298
1299         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1300                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1301         /* save version when object is locked */
1302         mdt_version_get_save(info, mc, 1);
1303
             /* the unlink itself runs with the child's mot_lov_mutex held */
1304         mutex_lock(&mc->mot_lov_mutex);
1305
1306         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1307                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1308
1309         mutex_unlock(&mc->mot_lov_mutex);
1310         if (rc != 0)
1311                 GOTO(unlock_child, rc);
1312
             /* object not dying: refresh its attributes; object dying: its
              * data is discarded when mdt_dom_check_for_discard() says so
              */
1313         if (!lu_object_is_dying(&mc->mot_header)) {
1314                 rc = mdt_attr_get_complex(info, mc, ma);
1315                 if (rc)
1316                         GOTO(out_stat, rc);
1317         } else if (mdt_dom_check_for_discard(info, mc)) {
1318                 mdt_dom_discard_data(info, mc);
1319         }
1320         mdt_handle_last_unlink(info, mc, ma);
1321
1322 out_stat:
             /* account the operation as rmdir or unlink from the file type
              * in ma, provided the inode attributes are valid
              */
1323         if (ma->ma_valid & MA_INODE) {
1324                 switch (ma->ma_attr.la_mode & S_IFMT) {
1325                 case S_IFDIR:
1326                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1327                                          ktime_us_delta(ktime_get(), kstart));
1328                         break;
1329                 case S_IFREG:
1330                 case S_IFLNK:
1331                 case S_IFCHR:
1332                 case S_IFBLK:
1333                 case S_IFIFO:
1334                 case S_IFSOCK:
1335                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1336                                          ktime_us_delta(ktime_get(), kstart));
1337                         break;
1338                 default:
1339                         LASSERTF(0, "bad file type %o unlinking\n",
1340                                 ma->ma_attr.la_mode);
1341                 }
1342         }
1343
1344         EXIT;
1345
             /* unwind in reverse order of acquisition: child lock, child
              * reference, parent lock, parent reference
              */
1346 unlock_child:
1347         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1348 put_child:
             /* in the MDS_OP_WITH_FID case mti_big_buf is freed if it is set */
1349         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1350             info->mti_big_buf.lb_buf)
1351                 lu_buf_free(&info->mti_big_buf);
1352         mdt_object_put(info->mti_env, mc);
1353 unlock_parent:
1354         mdt_object_unlock(info, mp, parent_lh, rc);
1355 put_parent:
1356         mdt_object_put(info->mti_env, mp);
1357         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1358         return rc;
1359 }
1360
1361 /*
1362  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1363  * name.
1364  */
/**
 * Handle a REINT link request (create a hard link).
 *
 * rr_fid1 names the source object, rr_fid2 the target parent directory and
 * rr_name the new entry. The parent is locked with a PDO lock on the name,
 * the source with an EX lock on UPDATE|XATTR, and the link is created with
 * mdo_link(). Outside of replay the request fails with -EEXIST when the
 * target name is already present.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	lock handle (not used by this function)
 *
 * \retval 0		success
 * \retval -EPERM	source and parent FIDs are equal, or one of them is
 *			not an operative metadata FID
 * \retval -ENOENT	the source object does not exist
 * \retval -EEXIST	the target name already exists (non-replay only)
 * \retval negative	other errno on failure
 */
1365 static int mdt_reint_link(struct mdt_thread_info *info,
1366                           struct mdt_lock_handle *lhc)
1367 {
1368         struct mdt_reint_record *rr = &info->mti_rr;
1369         struct ptlrpc_request   *req = mdt_info_req(info);
1370         struct md_attr          *ma = &info->mti_attr;
1371         struct mdt_object       *ms;
1372         struct mdt_object       *mp;
1373         struct mdt_lock_handle  *lhs;
1374         struct mdt_lock_handle  *lhp;
1375         ktime_t kstart = ktime_get();
1376         bool cos_incompat;
1377         int rc;
1378
1379         ENTRY;
1380         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1381                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1382
1383         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1384                 RETURN(err_serious(-ENOENT));
1385
             /* fault injection: suppress the reply and fail the request */
1386         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1387             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1388                 req->rq_no_reply = 1;
1389                 RETURN(err_serious(-ENOENT));
1390         }
1391
1392         if (info->mti_dlm_req)
1393                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1394
1395         /* Invalid case so return error immediately instead of
1396          * processing it
1397          */
1398         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1399                 RETURN(-EPERM);
1400
1401         if (!fid_is_md_operative(rr->rr_fid1) ||
1402             !fid_is_md_operative(rr->rr_fid2))
1403                 RETURN(-EPERM);
1404
1405         /* step 1: find target parent dir */
1406         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1407         if (IS_ERR(mp))
1408                 RETURN(PTR_ERR(mp));
1409
1410         rc = mdt_version_get_check_save(info, mp, 0);
1411         if (rc)
1412                 GOTO(put_parent, rc);
1413
1414         rc = mdt_check_enc(info, mp);
1415         if (rc)
1416                 GOTO(put_parent, rc);
1417
1418         /* step 2: find source */
1419         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1420         if (IS_ERR(ms))
1421                 GOTO(put_parent, rc = PTR_ERR(ms));
1422
1423         if (!mdt_object_exists(ms)) {
1424                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1425                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1426                 GOTO(put_source, rc = -ENOENT);
1427         }
1428
             /* cos_incompat is set as soon as either object is remote */
1429         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1430
1431         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1432
             /* the parent is locked first, then the source */
1433         lhp = &info->mti_lh[MDT_LH_PARENT];
1434         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1435         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1436                                    cos_incompat);
1437         if (rc != 0)
1438                 GOTO(put_source, rc);
1439
1440         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1441
1442         lhs = &info->mti_lh[MDT_LH_CHILD];
1443         mdt_lock_reg_init(lhs, LCK_EX);
1444         rc = mdt_reint_object_lock(info, ms, lhs,
1445                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1446                                    cos_incompat);
1447         if (rc != 0)
1448                 GOTO(unlock_parent, rc);
1449
1450         /* step 3: link it */
1451         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1452                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1453
1454         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1455         rc = mdt_version_get_check_save(info, ms, 1);
1456         if (rc)
1457                 GOTO(unlock_source, rc);
1458
1459         /** check target version by name during replay */
1460         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1461                                       &info->mti_tmp_fid1, 2);
1462         if (rc != 0 && rc != -ENOENT)
1463                 GOTO(unlock_source, rc);
1464         /* save version of file name for replay, it must be ENOENT here */
1465         if (!req_is_replay(mdt_info_req(info))) {
                     /* rc is either 0 (name found) or -ENOENT at this point;
                      * only -ENOENT lets a non-replay request proceed
                      */
1466                 if (rc != -ENOENT) {
1467                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1468                                PNAME(&rr->rr_name));
1469                         GOTO(unlock_source, rc = -EEXIST);
1470                 }
1471                 info->mti_ver[2] = ENOENT_VERSION;
1472                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1473         }
1474
1475         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1476                       mdt_object_child(ms), &rr->rr_name, ma);
1477
1478         if (rc == 0)
1479                 mdt_counter_incr(req, LPROC_MDT_LINK,
1480                                  ktime_us_delta(ktime_get(), kstart));
1481
1482         EXIT;
             /* unwind in reverse order of acquisition */
1483 unlock_source:
1484         mdt_object_unlock(info, ms, lhs, rc);
1485 unlock_parent:
1486         mdt_object_unlock(info, mp, lhp, rc);
1487 put_source:
1488         mdt_object_put(info->mti_env, ms);
1489 put_parent:
1490         mdt_object_put(info->mti_env, mp);
1491         return rc;
1492 }
1493 /**
1494  * lock the part of the directory according to the hash of the name
1495  * (lh->mlh_pdo_hash) in parallel directory lock.
1496  */
1497 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1498                               struct mdt_lock_handle *lh,
1499                               struct mdt_object *obj, __u64 ibits,
1500                               bool cos_incompat)
1501 {
1502         struct ldlm_res_id *res = &info->mti_res_id;
1503         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1504         union ldlm_policy_data *policy = &info->mti_policy;
1505         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1506         int rc;
1507
1508         /*
1509          * Finish res_id initializing by name hash marking part of
1510          * directory which is taking modification.
1511          */
1512         LASSERT(lh->mlh_pdo_hash != 0);
1513         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1514         memset(policy, 0, sizeof(*policy));
1515         policy->l_inodebits.bits = ibits;
1516         if (cos_incompat &&
1517             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1518                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1519         /*
1520          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1521          * going to be sent to client. If it is - mdt_intent_policy() path will
1522          * fix it up and turn FL_LOCAL flag off.
1523          */
1524         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1525                           policy, res, dlmflags,
1526                           &info->mti_exp->exp_handle.h_cookie);
1527         return rc;
1528 }
1529
1530 /**
1531  * Get BFL lock for rename or migrate process.
1532  **/
1533 static int mdt_rename_lock(struct mdt_thread_info *info,
1534                            struct lustre_handle *lh)
1535 {
1536         int     rc;
1537
1538         ENTRY;
1539         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1540                 struct lu_fid *fid = &info->mti_tmp_fid1;
1541                 struct mdt_object *obj;
1542
1543                 /* XXX, right now, it has to use object API to
1544                  * enqueue lock cross MDT, so it will enqueue
1545                  * rename lock(with LUSTRE_BFL_FID) by root object
1546                  */
1547                 lu_root_fid(fid);
1548                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1549                 if (IS_ERR(obj))
1550                         RETURN(PTR_ERR(obj));
1551
1552                 rc = mdt_remote_object_lock(info, obj,
1553                                             &LUSTRE_BFL_FID, lh,
1554                                             LCK_EX,
1555                                             MDS_INODELOCK_UPDATE, false);
1556                 mdt_object_put(info->mti_env, obj);
1557         } else {
1558                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1559                 union ldlm_policy_data *policy = &info->mti_policy;
1560                 struct ldlm_res_id *res_id = &info->mti_res_id;
1561                 __u64 flags = 0;
1562
1563                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1564                 memset(policy, 0, sizeof(*policy));
1565                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1566                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1567                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1568                                             LDLM_IBITS, policy, LCK_EX, &flags,
1569                                             ldlm_blocking_ast,
1570                                             ldlm_completion_ast, NULL, NULL, 0,
1571                                             LVB_T_NONE,
1572                                             &info->mti_exp->exp_handle.h_cookie,
1573                                             lh);
1574                 RETURN(rc);
1575         }
1576         RETURN(rc);
1577 }
1578
1579 static void mdt_rename_unlock(struct lustre_handle *lh)
1580 {
1581         ENTRY;
1582         LASSERT(lustre_handle_is_used(lh));
1583         /* Cancel the single rename lock right away */
1584         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1585         EXIT;
1586 }
1587
1588 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1589                                                 const struct lu_fid *fid,
1590                                                 int idx)
1591 {
1592         struct mdt_object *dir;
1593         int rc;
1594
1595         ENTRY;
1596         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1597         if (IS_ERR(dir))
1598                 RETURN(dir);
1599
1600         /* check early, the real version will be saved after locking */
1601         rc = mdt_version_get_check(info, dir, idx);
1602         if (rc)
1603                 GOTO(out_put, rc);
1604
1605         if (!mdt_object_exists(dir))
1606                 GOTO(out_put, rc = -ENOENT);
1607
1608         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1609                 GOTO(out_put, rc = -ENOTDIR);
1610
1611         RETURN(dir);
1612 out_put:
1613         mdt_object_put(info->mti_env, dir);
1614         return ERR_PTR(rc);
1615 }
1616
1617 /*
1618  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1619  * herein we don't really check it, just do revoke.
1620  */
1621 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1622                                   struct mdt_object *pobj,
1623                                   struct mdt_object *obj)
1624 {
1625         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1626         int rc;
1627
1628         mdt_lock_handle_init(lh);
1629         mdt_lock_reg_init(lh, LCK_EX);
1630
1631         if (mdt_object_remote(pobj)) {
1632                 /* don't bother to check if pobj and obj are on the same MDT. */
1633                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1634                                             &lh->mlh_rreg_lh, LCK_EX,
1635                                             MDS_INODELOCK_LOOKUP, false);
1636         } else if (mdt_object_remote(obj)) {
1637                 struct ldlm_res_id *res = &info->mti_res_id;
1638                 union ldlm_policy_data *policy = &info->mti_policy;
1639                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1640                                  LDLM_FL_COS_INCOMPAT;
1641
1642                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1643                 memset(policy, 0, sizeof(*policy));
1644                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1645                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1646                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1647                                   dlmflags, NULL);
1648         } else {
1649                 /* do nothing if both are local */
1650                 return 0;
1651         }
1652
1653         if (rc != ELDLM_OK)
1654                 return rc;
1655
1656         /*
1657          * TODO, currently we don't save this lock because there is no place to
1658          * hold this lock handle, but to avoid race we need to save this lock.
1659          */
1660         mdt_object_unlock(info, NULL, lh, 1);
1661
1662         return 0;
1663 }
1664
1665 /*
1666  * operation may takes locks of linkea, or directory stripes, group them in
1667  * different list.
1668  */
1669 struct mdt_sub_lock {
1670         struct mdt_object *msl_obj;
1671         struct mdt_lock_handle msl_lh;
1672         struct list_head msl_linkage;
1673 };
1674
1675 static void mdt_unlock_list(struct mdt_thread_info *info,
1676                             struct list_head *list, int decref)
1677 {
1678         struct mdt_sub_lock *msl;
1679         struct mdt_sub_lock *tmp;
1680
1681         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1682                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1683                 list_del(&msl->msl_linkage);
1684                 OBD_FREE_PTR(msl);
1685         }
1686 }
1687
/**
 * Unlock an object locked for migration, together with its stripes.
 *
 * A local object is unlocked through mdt_reint_striped_unlock() using
 * \a einfo.  For a remote object the stripe locks kept on \a slave_locks are
 * released first, then the lock on the object itself.
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
					     struct mdt_object *obj,
					     struct mdt_lock_handle *lh,
					     struct ldlm_enqueue_info *einfo,
					     struct list_head *slave_locks,
					     int decref)
{
	if (!mdt_object_remote(obj)) {
		mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
		return;
	}

	mdt_unlock_list(info, slave_locks, decref);
	mdt_object_unlock(info, obj, lh, decref);
}
1702
1703 /*
1704  * lock parents of links, and also check whether total locks don't exceed
1705  * RS_MAX_LOCKS.
1706  *
1707  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1708  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1709  * \retval      -ev negative errno upon error
1710  */
1711 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1712                                  struct mdt_object *pobj,
1713                                  const struct md_attr *ma,
1714                                  struct mdt_object *obj,
1715                                  struct mdt_lock_handle *lhp,
1716                                  struct ldlm_enqueue_info *peinfo,
1717                                  struct list_head *parent_slave_locks,
1718                                  struct list_head *link_locks)
1719 {
1720         struct mdt_device *mdt = info->mti_mdt;
1721         struct lu_buf *buf = &info->mti_big_buf;
1722         struct lu_name *lname = &info->mti_name;
1723         struct linkea_data ldata = { NULL };
1724         bool blocked = false;
1725         int local_lnkp_cnt = 0;
1726         int rc;
1727
1728         ENTRY;
1729         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1730                 RETURN(0);
1731
1732         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1733         if (buf->lb_buf == NULL)
1734                 RETURN(-ENOMEM);
1735
1736         ldata.ld_buf = buf;
1737         rc = mdt_links_read(info, obj, &ldata);
1738         if (rc) {
1739                 if (rc == -ENOENT || rc == -ENODATA)
1740                         rc = 0;
1741                 RETURN(rc);
1742         }
1743
1744         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1745              linkea_next_entry(&ldata)) {
1746                 struct mdt_object *lnkp;
1747                 struct mdt_sub_lock *msl;
1748                 struct lu_fid fid;
1749                 __u64 ibits;
1750
1751                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1752                                     &fid);
1753
1754                 /* check if it's also linked to parent */
1755                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1756                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1757                                PFID(&fid), PNAME(lname));
1758                         /* in case link is remote object, revoke LOOKUP lock */
1759                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1760                         continue;
1761                 }
1762
1763                 lnkp = NULL;
1764
1765                 /* check if it's linked to a stripe of parent */
1766                 if (ma->ma_valid & MA_LMV) {
1767                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1768                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1769                         int j = 0;
1770
1771                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1772                                 fid_le_to_cpu(stripe_fid,
1773                                               &lmv->lmv_stripe_fids[j]);
1774                                 if (lu_fid_eq(stripe_fid, &fid)) {
1775                                         CDEBUG(D_INFO, "skip stripe "DFID
1776                                                ", reovke "DNAME"\n",
1777                                                PFID(&fid), PNAME(lname));
1778                                         lnkp = mdt_object_find(info->mti_env,
1779                                                                mdt, &fid);
1780                                         if (IS_ERR(lnkp))
1781                                                 GOTO(out, rc = PTR_ERR(lnkp));
1782                                         break;
1783                                 }
1784                         }
1785
1786                         if (lnkp) {
1787                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1788                                                                    obj);
1789                                 mdt_object_put(info->mti_env, lnkp);
1790                                 continue;
1791                         }
1792                 }
1793
1794                 /* Check if it's already locked */
1795                 list_for_each_entry(msl, link_locks, msl_linkage) {
1796                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1797                                 CDEBUG(D_INFO,
1798                                        DFID" was locked, revoke "DNAME"\n",
1799                                        PFID(&fid), PNAME(lname));
1800                                 lnkp = msl->msl_obj;
1801                                 break;
1802                         }
1803                 }
1804
1805                 if (lnkp) {
1806                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1807                         continue;
1808                 }
1809
1810                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1811                        PFID(&fid), PNAME(lname));
1812
1813                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1814                 if (IS_ERR(lnkp)) {
1815                         CWARN("%s: cannot find obj "DFID": %ld\n",
1816                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1817                         continue;
1818                 }
1819
1820                 if (!mdt_object_exists(lnkp)) {
1821                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1822                               PFID(&fid), PNAME(lname));
1823                         mdt_object_put(info->mti_env, lnkp);
1824                         continue;
1825                 }
1826
1827                 if (!mdt_object_remote(lnkp))
1828                         local_lnkp_cnt++;
1829
1830                 OBD_ALLOC_PTR(msl);
1831                 if (msl == NULL)
1832                         GOTO(out, rc = -ENOMEM);
1833
1834                 /*
1835                  * we can't follow parent-child lock order like other MD
1836                  * operations, use lock_try here to avoid deadlock, if the lock
1837                  * cannot be taken, drop all locks taken, revoke the blocked
1838                  * one, and continue processing the remaining entries, and in
1839                  * the end of the loop restart from beginning.
1840                  */
1841                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1842                 ibits = 0;
1843                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1844                                          MDS_INODELOCK_UPDATE, true);
1845                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1846
1847                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1848                                PFID(&fid), PNAME(lname));
1849
1850                         mdt_unlock_list(info, link_locks, 1);
1851                         /* also unlock parent locks to avoid deadlock */
1852                         if (!blocked)
1853                                 mdt_migrate_object_unlock(info, pobj, lhp,
1854                                                           peinfo,
1855                                                           parent_slave_locks,
1856                                                           1);
1857
1858                         blocked = true;
1859
1860                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1861                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1862                                              MDS_INODELOCK_UPDATE);
1863                         if (rc) {
1864                                 mdt_object_put(info->mti_env, lnkp);
1865                                 OBD_FREE_PTR(msl);
1866                                 GOTO(out, rc);
1867                         }
1868
1869                         if (mdt_object_remote(lnkp)) {
1870                                 struct ldlm_lock *lock;
1871
1872                                 /*
1873                                  * for remote object, set lock cb_atomic,
1874                                  * so lock can be released in blocking_ast()
1875                                  * immediately, then the next lock_try will
1876                                  * have better chance of success.
1877                                  */
1878                                 lock = ldlm_handle2lock(
1879                                                 &msl->msl_lh.mlh_rreg_lh);
1880                                 LASSERT(lock != NULL);
1881                                 lock_res_and_lock(lock);
1882                                 ldlm_set_atomic_cb(lock);
1883                                 unlock_res_and_lock(lock);
1884                                 LDLM_LOCK_PUT(lock);
1885                         }
1886
1887                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1888                         OBD_FREE_PTR(msl);
1889                         continue;
1890                 }
1891
1892                 INIT_LIST_HEAD(&msl->msl_linkage);
1893                 msl->msl_obj = lnkp;
1894                 list_add_tail(&msl->msl_linkage, link_locks);
1895
1896                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1897         }
1898
1899         if (blocked)
1900                 GOTO(out, rc = -EBUSY);
1901
1902         EXIT;
1903 out:
1904         if (rc) {
1905                 mdt_unlock_list(info, link_locks, rc);
1906         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1907                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1908                        local_lnkp_cnt);
1909                 /*
1910                  * parent may have 3 local objects: master object and 2 stripes
1911                  * (if it's being migrated too); source may have 1 local objects
1912                  * as regular file; target has 1 local object.
1913                  * Note, source may have 2 local locks if it is directory but it
1914                  * can't have hardlinks, so it is not considered here.
1915                  */
1916                 rc = 1;
1917         }
1918         return rc;
1919 }
1920
1921 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1922                                   struct mdt_object *obj,
1923                                   const struct md_attr *ma,
1924                                   struct list_head *slave_locks)
1925 {
1926         struct mdt_device *mdt = info->mti_mdt;
1927         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1928         struct lu_fid *fid = &info->mti_tmp_fid1;
1929         struct mdt_object *slave;
1930         struct mdt_sub_lock *msl;
1931         int i;
1932         int rc;
1933
1934         ENTRY;
1935         LASSERT(mdt_object_remote(obj));
1936         LASSERT(ma->ma_valid & MA_LMV);
1937         LASSERT(lmv);
1938
1939         if (!lmv_is_sane(lmv))
1940                 RETURN(-EINVAL);
1941
1942         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1943                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1944
1945                 if (!fid_is_sane(fid))
1946                         continue;
1947
1948                 slave = mdt_object_find(info->mti_env, mdt, fid);
1949                 if (IS_ERR(slave))
1950                         GOTO(out, rc = PTR_ERR(slave));
1951
1952                 OBD_ALLOC_PTR(msl);
1953                 if (!msl) {
1954                         mdt_object_put(info->mti_env, slave);
1955                         GOTO(out, rc = -ENOMEM);
1956                 }
1957
1958                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1959                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1960                                            MDS_INODELOCK_UPDATE, true);
1961                 if (rc) {
1962                         OBD_FREE_PTR(msl);
1963                         mdt_object_put(info->mti_env, slave);
1964                         GOTO(out, rc);
1965                 }
1966
1967                 INIT_LIST_HEAD(&msl->msl_linkage);
1968                 msl->msl_obj = slave;
1969                 list_add_tail(&msl->msl_linkage, slave_locks);
1970         }
1971         EXIT;
1972
1973 out:
1974         if (rc)
1975                 mdt_unlock_list(info, slave_locks, rc);
1976         return rc;
1977 }
1978
1979 /* lock parent and its stripes */
1980 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1981                                    struct mdt_object *obj,
1982                                    const struct md_attr *ma,
1983                                    struct mdt_lock_handle *lh,
1984                                    struct ldlm_enqueue_info *einfo,
1985                                    struct list_head *slave_locks)
1986 {
1987         int rc;
1988
1989         if (mdt_object_remote(obj)) {
1990                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1991                                             &lh->mlh_rreg_lh, LCK_PW,
1992                                             MDS_INODELOCK_UPDATE, false);
1993                 if (rc != ELDLM_OK)
1994                         return rc;
1995
1996                 /*
1997                  * if obj is remote and striped, lock its stripes explicitly
1998                  * because it's not striped in LOD layer on this MDT.
1999                  */
2000                 if (ma->ma_valid & MA_LMV) {
2001                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
2002                         if (rc)
2003                                 mdt_object_unlock(info, obj, lh, rc);
2004                 }
2005         } else {
2006                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
2007                                             einfo, true);
2008         }
2009
2010         return rc;
2011 }
2012
2013 /*
2014  * in migration, object may be remote, and we need take full lock of it and its
2015  * stripes if it's directory, besides, object may be a remote object on its
2016  * parent, revoke its LOOKUP lock on where its parent is located.
2017  */
2018 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
2019                                    struct mdt_object *pobj,
2020                                    struct mdt_object *obj,
2021                                    struct mdt_lock_handle *lh,
2022                                    struct ldlm_enqueue_info *einfo,
2023                                    struct list_head *slave_locks)
2024 {
2025         int rc;
2026
2027         if (mdt_object_remote(obj)) {
2028                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2029                 if (rc)
2030                         return rc;
2031
2032                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
2033                                             &lh->mlh_rreg_lh, LCK_EX,
2034                                             MDS_INODELOCK_FULL, false);
2035                 if (rc != ELDLM_OK)
2036                         return rc;
2037
2038                 /*
2039                  * if obj is remote and striped, lock its stripes explicitly
2040                  * because it's not striped in LOD layer on this MDT.
2041                  */
2042                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
2043                         struct md_attr *ma = &info->mti_attr;
2044
2045                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
2046                         if (rc) {
2047                                 mdt_object_unlock(info, obj, lh, rc);
2048                                 return rc;
2049                         }
2050
2051                         if (ma->ma_valid & MA_LMV) {
2052                                 rc = mdt_lock_remote_slaves(info, obj, ma,
2053                                                             slave_locks);
2054                                 if (rc)
2055                                         mdt_object_unlock(info, obj, lh, rc);
2056                         }
2057                 }
2058         } else {
2059                 if (mdt_object_remote(pobj)) {
2060                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2061                         if (rc)
2062                                 return rc;
2063                 }
2064
2065                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2066                                             einfo, true);
2067         }
2068
2069         return rc;
2070 }
2071
2072 /*
2073  * lookup source by name, if parent is striped directory, we need to find the
2074  * corresponding stripe where source is located, and then lookup there.
2075  *
2076  * besides, if parent is migrating too, and file is already in target stripe,
2077  * this should be a redo of 'lfs migrate' on client side.
2078  */
2079 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2080                               struct mdt_object *pobj,
2081                               const struct md_attr *ma,
2082                               const struct lu_name *lname,
2083                               struct mdt_object **spobj,
2084                               struct mdt_object **sobj)
2085 {
2086         const struct lu_env *env = info->mti_env;
2087         struct lu_fid *fid = &info->mti_tmp_fid1;
2088         struct mdt_object *stripe;
2089         int rc;
2090
2091         if (ma->ma_valid & MA_LMV) {
2092                 /* if parent is striped, lookup on corresponding stripe */
2093                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2094
2095                 if (!lmv_is_sane(lmv))
2096                         return -EBADF;
2097
2098                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2099                                                   lname->ln_namelen);
2100                 if (rc < 0)
2101                         return rc;
2102
2103                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2104
2105                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2106                 if (IS_ERR(stripe))
2107                         return PTR_ERR(stripe);
2108
2109                 fid_zero(fid);
2110                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2111                                 &info->mti_spec);
2112                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2113                         /*
2114                          * if parent layout is changeing, and lookup child
2115                          * failed on source stripe, lookup again on target
2116                          * stripe, if it exists, it means previous migration
2117                          * was interrupted, and current file was migrated
2118                          * already.
2119                          */
2120                         mdt_object_put(env, stripe);
2121
2122                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2123                                                       lname->ln_namelen);
2124                         if (rc < 0)
2125                                 return rc;
2126
2127                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2128
2129                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2130                         if (IS_ERR(stripe))
2131                                 return PTR_ERR(stripe);
2132
2133                         fid_zero(fid);
2134                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2135                                         fid, &info->mti_spec);
2136                         mdt_object_put(env, stripe);
2137                         return rc ?: -EALREADY;
2138                 } else if (rc) {
2139                         mdt_object_put(env, stripe);
2140                         return rc;
2141                 }
2142         } else {
2143                 fid_zero(fid);
2144                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2145                                 &info->mti_spec);
2146                 if (rc)
2147                         return rc;
2148
2149                 stripe = pobj;
2150                 mdt_object_get(env, stripe);
2151         }
2152
2153         *spobj = stripe;
2154
2155         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2156         if (IS_ERR(*sobj)) {
2157                 mdt_object_put(env, stripe);
2158                 rc = PTR_ERR(*sobj);
2159                 *spobj = NULL;
2160                 *sobj = NULL;
2161         }
2162
2163         return rc;
2164 }
2165
2166 /* end lease and close file for regular file */
2167 static int mdd_migrate_close(struct mdt_thread_info *info,
2168                              struct mdt_object *obj)
2169 {
2170         struct close_data *data;
2171         struct mdt_body *repbody;
2172         struct ldlm_lock *lease;
2173         int rc;
2174         int rc2;
2175
2176         rc = -EPROTO;
2177         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2178                                       RCL_CLIENT) ||
2179             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2180                                       RCL_CLIENT))
2181                 goto close;
2182
2183         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2184         if (!data)
2185                 goto close;
2186
2187         rc = -ESTALE;
2188         lease = ldlm_handle2lock(&data->cd_handle);
2189         if (!lease)
2190                 goto close;
2191
2192         /* check if the lease was already canceled */
2193         lock_res_and_lock(lease);
2194         rc = ldlm_is_cancel(lease);
2195         unlock_res_and_lock(lease);
2196
2197         if (rc) {
2198                 rc = -EAGAIN;
2199                 LDLM_DEBUG(lease, DFID" lease broken",
2200                            PFID(mdt_object_fid(obj)));
2201         }
2202
2203         /*
2204          * cancel server side lease, client side counterpart should have been
2205          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2206          */
2207         ldlm_lock_cancel(lease);
2208         ldlm_reprocess_all(lease->l_resource,
2209                            lease->l_policy_data.l_inodebits.bits);
2210         LDLM_LOCK_PUT(lease);
2211
2212 close:
2213         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2214         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2215         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2216
2217         return rc ?: rc2;
2218 }
2219
2220 /*
2221  * migrate file in below steps:
2222  *  1. lock parent and its stripes
2223  *  2. lookup source by name
2224  *  3. lock parents of source links if source is not directory
2225  *  4. reject if source is in HSM
2226  *  5. take source open_sem and close file if source is regular file
2227  *  6. lock source and its stripes if it's directory
2228  *  7. lock target so subsequent change to it can trigger COS
2229  *  8. migrate file
2230  *  9. unlock above locks
2231  * 10. sync device if source has links
2232  */
2233 int mdt_reint_migrate(struct mdt_thread_info *info,
2234                       struct mdt_lock_handle *unused)
2235 {
2236         const struct lu_env *env = info->mti_env;
2237         struct mdt_device *mdt = info->mti_mdt;
2238         struct ptlrpc_request *req = mdt_info_req(info);
2239         struct mdt_reint_record *rr = &info->mti_rr;
2240         struct lu_ucred *uc = mdt_ucred(info);
2241         struct md_attr *ma = &info->mti_attr;
2242         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2243         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2244         struct mdt_object *pobj;
2245         struct mdt_object *spobj = NULL;
2246         struct mdt_object *sobj = NULL;
2247         struct mdt_object *tobj;
2248         struct lustre_handle rename_lh = { 0 };
2249         struct mdt_lock_handle *lhp;
2250         struct mdt_lock_handle *lhs;
2251         struct mdt_lock_handle *lht;
2252         LIST_HEAD(parent_slave_locks);
2253         LIST_HEAD(child_slave_locks);
2254         LIST_HEAD(link_locks);
2255         int lock_retries = 5;
2256         bool open_sem_locked = false;
2257         bool do_sync = false;
2258         int rc;
2259
2260         ENTRY;
2261         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2262                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2263
2264         if (info->mti_dlm_req)
2265                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2266
2267         if (!fid_is_md_operative(rr->rr_fid1) ||
2268             !fid_is_md_operative(rr->rr_fid2))
2269                 RETURN(-EPERM);
2270
2271         /* don't allow migrate . or .. */
2272         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2273                 RETURN(-EBUSY);
2274
2275         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2276                 RETURN(-EPERM);
2277
2278         /* we want rbac roles to have precedence over any other
2279          * permission or capability checks
2280          */
2281         if (uc && (!uc->uc_rbac_dne_ops ||
2282                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2283                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2284                     mdt->mdt_enable_remote_dir_gid != -1)))
2285                 RETURN(-EPERM);
2286
2287         /*
2288          * Note: do not enqueue rename lock for replay request, because
2289          * if other MDT holds rename lock, but being blocked to wait for
2290          * this MDT to finish its recovery, and the failover MDT can not
2291          * get rename lock, which will cause deadlock.
2292          *
2293          * req is NULL if this is called by directory auto-split.
2294          */
2295         if (req && !req_is_replay(req)) {
2296                 rc = mdt_rename_lock(info, &rename_lh);
2297                 if (rc != 0) {
2298                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2299                                mdt_obd_name(info->mti_mdt), rc);
2300                         RETURN(rc);
2301                 }
2302         }
2303
2304         /* pobj is master object of parent */
2305         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2306         if (IS_ERR(pobj))
2307                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2308
2309         if (req) {
2310                 rc = mdt_version_get_check(info, pobj, 0);
2311                 if (rc)
2312                         GOTO(put_parent, rc);
2313         }
2314
2315         if (!mdt_object_exists(pobj))
2316                 GOTO(put_parent, rc = -ENOENT);
2317
2318         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2319                 GOTO(put_parent, rc = -ENOTDIR);
2320
2321         rc = mdt_check_enc(info, pobj);
2322         if (rc)
2323                 GOTO(put_parent, rc);
2324
2325         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2326         if (rc)
2327                 GOTO(put_parent, rc);
2328
2329 lock_parent:
2330         /* lock parent object */
2331         lhp = &info->mti_lh[MDT_LH_PARENT];
2332         mdt_lock_reg_init(lhp, LCK_PW);
2333         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2334                                      &parent_slave_locks);
2335         if (rc)
2336                 GOTO(put_parent, rc);
2337
2338         /*
2339          * spobj is the corresponding stripe against name if pobj is striped
2340          * directory, which is the real parent, and no need to lock, because
2341          * we've taken full lock of pobj.
2342          */
2343         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2344         if (rc)
2345                 GOTO(unlock_parent, rc);
2346
2347         /* lock parents of source links, and revoke LOOKUP lock of links */
2348         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2349                                    &parent_slave_locks, &link_locks);
2350         if (rc == -EBUSY && lock_retries-- > 0) {
2351                 mdt_object_put(env, sobj);
2352                 mdt_object_put(env, spobj);
2353                 goto lock_parent;
2354         }
2355
2356         if (rc < 0)
2357                 GOTO(put_source, rc);
2358
2359         /*
2360          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2361          * with one request, if total lock count exceeds this limit, we will
2362          * drop all locks after migration, and synchronous device in the end.
2363          */
2364         do_sync = rc;
2365
2366         /* TODO: DoM migration is not supported, migrate dirent only */
2367         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2368                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2369                 if (rc)
2370                         GOTO(unlock_links, rc);
2371
2372                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2373                         info->mti_spec.sp_migrate_nsonly = 1;
2374         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2375                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2376                 if (rc)
2377                         GOTO(unlock_links, rc);
2378
2379                 /* race with restripe/auto-split? */
2380                 if ((ma->ma_valid & MA_LMV) &&
2381                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2382                         GOTO(unlock_links, rc = -EBUSY);
2383         }
2384
2385         /* if migration HSM is allowed */
2386         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2387                 ma->ma_need = MA_HSM;
2388                 ma->ma_valid = 0;
2389                 rc = mdt_attr_get_complex(info, sobj, ma);
2390                 if (rc)
2391                         GOTO(unlock_links, rc);
2392
2393                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2394                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2395         }
2396
2397         /* end lease and close file for regular file */
2398         if (info->mti_spec.sp_migrate_close) {
2399                 /* try to hold open_sem so that nobody else can open the file */
2400                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2401                         /* close anyway */
2402                         mdd_migrate_close(info, sobj);
2403                         GOTO(unlock_links, rc = -EBUSY);
2404                 } else {
2405                         open_sem_locked = true;
2406                         rc = mdd_migrate_close(info, sobj);
2407                         if (rc)
2408                                 GOTO(unlock_open_sem, rc);
2409                 }
2410         }
2411
2412         /* lock source */
2413         lhs = &info->mti_lh[MDT_LH_OLD];
2414         mdt_lock_reg_init(lhs, LCK_EX);
2415         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2416                                      &child_slave_locks);
2417         if (rc)
2418                 GOTO(unlock_open_sem, rc);
2419
2420         /* lock target */
2421         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2422         if (IS_ERR(tobj))
2423                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2424
2425         lht = &info->mti_lh[MDT_LH_NEW];
2426         mdt_lock_reg_init(lht, LCK_EX);
2427         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2428         if (rc)
2429                 GOTO(put_target, rc);
2430
2431         /* Don't do lookup sanity check. We know name doesn't exist. */
2432         info->mti_spec.sp_cr_lookup = 0;
2433         info->mti_spec.sp_feat = &dt_directory_features;
2434
2435         rc = mdo_migrate(env, mdt_object_child(pobj),
2436                          mdt_object_child(sobj), &rr->rr_name,
2437                          mdt_object_child(tobj),
2438                          &info->mti_spec, ma);
2439         if (!rc)
2440                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2441                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2442         EXIT;
2443
2444         mdt_object_unlock(info, tobj, lht, rc);
2445 put_target:
2446         mdt_object_put(env, tobj);
2447 unlock_source:
2448         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2449                                   &child_slave_locks, rc);
2450 unlock_open_sem:
2451         if (open_sem_locked)
2452                 up_write(&sobj->mot_open_sem);
2453 unlock_links:
2454         /* if we've got too many locks to save into RPC,
2455          * then just commit before the locks are released
2456          */
2457         if (!rc && do_sync)
2458                 mdt_device_sync(env, mdt);
2459         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2460 put_source:
2461         mdt_object_put(env, sobj);
2462         mdt_object_put(env, spobj);
2463 unlock_parent:
2464         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2465                                   &parent_slave_locks, rc);
2466 put_parent:
2467         mdt_object_put(env, pobj);
2468 unlock_rename:
2469         if (lustre_handle_is_used(&rename_lh))
2470                 mdt_rename_unlock(&rename_lh);
2471
2472         return rc;
2473 }
2474
2475 static int mdt_object_lock_save(struct mdt_thread_info *info,
2476                                 struct mdt_object *dir,
2477                                 struct mdt_lock_handle *lh,
2478                                 int idx, bool cos_incompat)
2479 {
2480         int rc;
2481
2482         /* we lock the target dir if it is local */
2483         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2484                                    cos_incompat);
2485         if (rc != 0)
2486                 return rc;
2487
2488         /* get and save correct version after locking */
2489         mdt_version_get_save(info, dir, idx);
2490         return 0;
2491 }
2492
2493 /*
2494  * determine lock order of sobj and tobj
2495  *
2496  * there are two situations we need to lock tobj before sobj:
2497  * 1. sobj is child of tobj
2498  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2499  *    larger than that of tobj
2500  *
2501  * \retval      1 lock tobj before sobj
2502  * \retval      0 lock sobj before tobj
2503  * \retval      -ev negative errno upon error
2504  */
2505 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2506                                            struct mdt_object *sobj,
2507                                            struct mdt_object *tobj)
2508 {
2509         struct md_attr *ma = &info->mti_attr;
2510         struct lu_fid *spfid = &info->mti_tmp_fid1;
2511         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2512         struct lmv_mds_md_v1 *lmv;
2513         __u32 sindex;
2514         __u32 tindex;
2515         int rc;
2516
2517         /* sobj and tobj are the same */
2518         if (sobj == tobj)
2519                 return 0;
2520
2521         if (fid_is_root(mdt_object_fid(sobj)))
2522                 return 0;
2523
2524         if (fid_is_root(mdt_object_fid(tobj)))
2525                 return 1;
2526
2527         /* check whether sobj is child of tobj */
2528         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2529                            mdt_object_fid(tobj));
2530         if (rc < 0)
2531                 return rc;
2532
2533         if (rc == 1)
2534                 return 1;
2535
2536         /* check whether sobj and tobj are children of the same parent */
2537         rc = mdt_attr_get_pfid(info, sobj, spfid);
2538         if (rc)
2539                 return rc;
2540
2541         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2542         if (rc)
2543                 return rc;
2544
2545         if (!lu_fid_eq(spfid, tpfid))
2546                 return 0;
2547
2548         /* check whether sobj and tobj are sibling stripes */
2549         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2550         if (rc)
2551                 return rc;
2552
2553         if (!(ma->ma_valid & MA_LMV))
2554                 return 0;
2555
2556         lmv = &ma->ma_lmv->lmv_md_v1;
2557         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2558                 return 0;
2559         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2560
2561         ma->ma_valid = 0;
2562         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2563         if (rc)
2564                 return rc;
2565
2566         if (!(ma->ma_valid & MA_LMV))
2567                 return -ENODATA;
2568
2569         lmv = &ma->ma_lmv->lmv_md_v1;
2570         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2571                 return -EINVAL;
2572         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2573
2574         /* check stripe index of sobj and tobj */
2575         if (sindex == tindex)
2576                 return -EINVAL;
2577
2578         return sindex < tindex ? 0 : 1;
2579 }
2580
2581 /*
2582  * lock rename source object.
2583  *
2584  * Both source and source parent may be remote, and source may be a remote
2585  * object on source parent, to avoid overriding lock handle, store remote
2586  * LOOKUP lock separately in @lhr.
2587  *
2588  * \retval      0 on success
2589  * \retval      -ev negative errno upon error
2590  */
2591 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2592                                   struct mdt_object *parent,
2593                                   struct mdt_object *child,
2594                                   struct mdt_lock_handle *lhc,
2595                                   struct mdt_lock_handle *lhr,
2596                                   __u64 ibits,
2597                                   bool cos_incompat)
2598 {
2599         int rc;
2600
2601         rc = mdt_is_remote_object(info, parent, child);
2602         if (rc < 0)
2603                 return rc;
2604
2605         if (rc) {
2606                 /* enqueue remote LOOKUP lock from the parent MDT */
2607                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2608
2609                 if (mdt_object_remote(parent)) {
2610                         rc = mdt_remote_object_lock(info, parent,
2611                                                     mdt_object_fid(child),
2612                                                     &lhr->mlh_rreg_lh,
2613                                                     lhr->mlh_rreg_mode,
2614                                                     rmt_ibits, false);
2615                         if (rc != ELDLM_OK)
2616                                 return rc;
2617                 } else {
2618                         LASSERT(mdt_object_remote(child));
2619                         rc = mdt_object_local_lock(info, child, lhr,
2620                                                    &rmt_ibits, 0, true);
2621                         if (rc < 0)
2622                                 return rc;
2623                 }
2624
2625                 ibits &= ~MDS_INODELOCK_LOOKUP;
2626         }
2627
2628         if (mdt_object_remote(child)) {
2629                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2630                                             &lhc->mlh_rreg_lh,
2631                                             lhc->mlh_rreg_mode,
2632                                             ibits, false);
2633                 if (rc == ELDLM_OK)
2634                         rc = 0;
2635         } else {
2636                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2637                                            cos_incompat);
2638         }
2639
2640         if (!rc)
2641                 mdt_object_unlock(info, child, lhr, rc);
2642
2643         return rc;
2644 }
2645
2646 /* Helper function for mdt_reint_rename so we don't need to opencode
2647  * two different order lockings
2648  */
2649 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2650                              struct mdt_object *mfirstdir,
2651                              struct mdt_lock_handle *lh_firstdirp,
2652                              struct mdt_object *mseconddir,
2653                              struct mdt_lock_handle *lh_seconddirp,
2654                              bool cos_incompat)
2655 {
2656         int rc;
2657
2658         rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
2659                                   cos_incompat);
2660         if (rc)
2661                 return rc;
2662
2663         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2664
2665         if (mfirstdir != mseconddir) {
2666                 rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
2667                                           cos_incompat);
2668         } else if (!mdt_object_remote(mseconddir) &&
2669                    lh_firstdirp->mlh_pdo_hash !=
2670                    lh_seconddirp->mlh_pdo_hash) {
2671                 rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
2672                                         MDS_INODELOCK_UPDATE,
2673                                         cos_incompat);
2674                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2675         }
2676
2677         if (rc != 0)
2678                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2679
2680         return rc;
2681 }
2682
2683 /*
2684  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2685  * 2 - srcdir child; 3 - tgtdir child.
2686  * Update on disk version of srcdir child.
2687  */
2688 static int mdt_reint_rename(struct mdt_thread_info *info,
2689                             struct mdt_lock_handle *unused)
2690 {
2691         struct mdt_device *mdt = info->mti_mdt;
2692         struct mdt_reint_record *rr = &info->mti_rr;
2693         struct md_attr *ma = &info->mti_attr;
2694         struct ptlrpc_request *req = mdt_info_req(info);
2695         struct mdt_object *msrcdir = NULL;
2696         struct mdt_object *mtgtdir = NULL;
2697         struct mdt_object *mold;
2698         struct mdt_object *mnew = NULL;
2699         struct lustre_handle rename_lh = { 0 };
2700         struct mdt_lock_handle *lh_srcdirp;
2701         struct mdt_lock_handle *lh_tgtdirp;
2702         struct mdt_lock_handle *lh_oldp = NULL;
2703         struct mdt_lock_handle *lh_rmt = NULL;
2704         struct mdt_lock_handle *lh_newp = NULL;
2705         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2706         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2707         __u64 lock_ibits;
2708         bool reverse = false, discard = false;
2709         bool cos_incompat;
2710         ktime_t kstart = ktime_get();
2711         enum mdt_stat_idx msi = 0;
2712         int rc;
2713
2714         ENTRY;
2715         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2716                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2717                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2718
2719         if (info->mti_dlm_req)
2720                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2721
2722         if (!fid_is_md_operative(rr->rr_fid1) ||
2723             !fid_is_md_operative(rr->rr_fid2))
2724                 RETURN(-EPERM);
2725
2726         /* find both parents. */
2727         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2728         if (IS_ERR(msrcdir))
2729                 RETURN(PTR_ERR(msrcdir));
2730
2731         rc = mdt_check_enc(info, msrcdir);
2732         if (rc)
2733                 GOTO(out_put_srcdir, rc);
2734
2735         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2736
2737         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2738                 mtgtdir = msrcdir;
2739                 mdt_object_get(info->mti_env, mtgtdir);
2740         } else {
2741                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2742                 if (IS_ERR(mtgtdir))
2743                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2744         }
2745
2746         rc = mdt_check_enc(info, mtgtdir);
2747         if (rc)
2748                 GOTO(out_put_tgtdir, rc);
2749
2750         /*
2751          * Note: do not enqueue rename lock for replay request, because
2752          * if other MDT holds rename lock, but being blocked to wait for
2753          * this MDT to finish its recovery, and the failover MDT can not
2754          * get rename lock, which will cause deadlock.
2755          */
2756         if (!req_is_replay(req)) {
2757                 bool remote = mdt_object_remote(msrcdir);
2758
2759                 /*
2760                  * Normally rename RPC is handled on the MDT with the target
2761                  * directory (if target exists, it's on the MDT with the
2762                  * target), if the source directory is remote, it's a hint that
2763                  * source is remote too (this may not be true, but it won't
2764                  * cause any issue), return -EXDEV early to avoid taking
2765                  * rename_lock.
2766                  */
2767                 if (!mdt->mdt_enable_remote_rename && remote)
2768                         GOTO(out_put_tgtdir, rc = -EXDEV);
2769
2770                 /* This might be further relaxed in the future for regular file
2771                  * renames in different source and target parents. Start with
2772                  * only same-directory renames for simplicity and because this
2773                  * is by far the most common use case.
2774                  *
2775                  * Striped directories should be considered "remote".
2776                  */
2777                 if (msrcdir != mtgtdir || remote ||
2778                     (S_ISDIR(ma->ma_attr.la_mode) &&
2779                      !mdt->mdt_enable_parallel_rename_dir) ||
2780                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2781                      !mdt->mdt_enable_parallel_rename_file)) {
2782                         rc = mdt_rename_lock(info, &rename_lh);
2783                         if (rc != 0) {
2784                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2785                                        mdt_obd_name(mdt), rc);
2786                                 GOTO(out_put_tgtdir, rc);
2787                         }
2788                 } else {
2789                         if (S_ISDIR(ma->ma_attr.la_mode))
2790                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2791                         else
2792                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2793
2794                         CDEBUG(D_INFO,
2795                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2796                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2797                                PNAME(&rr->rr_name));
2798                 }
2799         }
2800
2801         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2802         if (rc < 0)
2803                 GOTO(out_unlock_rename, rc);
2804         reverse = rc;
2805
2806         /* source needs to be looked up after locking source parent, otherwise
2807          * this rename may race with unlink source, and cause rename hang, see
2808          * sanityn.sh 55b, so check parents first, if later we found source is
2809          * remote, relock parents.
2810          */
2811         cos_incompat = (mdt_object_remote(msrcdir) ||
2812                         mdt_object_remote(mtgtdir));
2813
2814         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2815
2816         /* lock parents in the proper order. */
2817         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2818         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2819
2820         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2821         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2822 relock:
2823         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2824         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2825
2826         /* In case of same dir local rename we must sort by the hash,
2827          * otherwise a lock deadlock is possible when renaming
2828          * a to b and b to a at the same time LU-15285
2829          */
2830         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2831                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2832         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2833                 reverse = 0;
2834
2835         if (reverse)
2836                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
2837                                        lh_srcdirp, cos_incompat);
2838         else
2839                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
2840                                        lh_tgtdirp, cos_incompat);
2841
2842         if (rc != 0)
2843                 GOTO(out_unlock_rename, rc);
2844
2845         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2846         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2847
2848         /* find mold object. */
2849         fid_zero(old_fid);
2850         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2851         if (rc != 0)
2852                 GOTO(out_unlock_parents, rc);
2853
2854         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2855                 GOTO(out_unlock_parents, rc = -EINVAL);
2856
2857         if (!fid_is_md_operative(old_fid))
2858                 GOTO(out_unlock_parents, rc = -EPERM);
2859
2860         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2861         if (IS_ERR(mold))
2862                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2863
2864         if (!mdt_object_exists(mold)) {
2865                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2866                                 &mold->mot_obj,
2867                                 "object does not exist");
2868                 GOTO(out_put_old, rc = -ENOENT);
2869         }
2870
2871         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2872                 GOTO(out_put_old, rc = -EXDEV);
2873
2874         /* Check if @mtgtdir is subdir of @mold, before locking child
2875          * to avoid reverse locking.
2876          */
2877         if (mtgtdir != msrcdir) {
2878                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2879                                    old_fid);
2880                 if (rc) {
2881                         if (rc == 1)
2882                                 rc = -EINVAL;
2883                         GOTO(out_put_old, rc);
2884                 }
2885         }
2886
2887         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2888         /* save version after locking */
2889         mdt_version_get_save(info, mold, 2);
2890
2891         if (!cos_incompat && mdt_object_remote(mold)) {
2892                 cos_incompat = true;
2893                 mdt_object_put(info->mti_env, mold);
2894                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2895                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2896                 goto relock;
2897         }
2898
2899         /* find mnew object:
2900          * mnew target object may not exist now
2901          * lookup with version checking
2902          */
2903         fid_zero(new_fid);
2904         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2905                                       3);
2906         if (rc == 0) {
2907                 /* the new_fid should have been filled at this moment */
2908                 if (lu_fid_eq(old_fid, new_fid))
2909                         GOTO(out_put_old, rc);
2910
2911                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2912                     lu_fid_eq(new_fid, rr->rr_fid2))
2913                         GOTO(out_put_old, rc = -EINVAL);
2914
2915                 if (!fid_is_md_operative(new_fid))
2916                         GOTO(out_put_old, rc = -EPERM);
2917
2918                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2919                 if (IS_ERR(mnew))
2920                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2921
2922                 if (!mdt_object_exists(mnew)) {
2923                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2924                                         &mnew->mot_obj,
2925                                         "object does not exist");
2926                         GOTO(out_put_new, rc = -ENOENT);
2927                 }
2928
2929                 if (mdt_object_remote(mnew)) {
2930                         struct mdt_body  *repbody;
2931
2932                         /* Always send rename req to the target child MDT */
2933                         repbody = req_capsule_server_get(info->mti_pill,
2934                                                          &RMF_MDT_BODY);
2935                         LASSERT(repbody != NULL);
2936                         repbody->mbo_fid1 = *new_fid;
2937                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2938                         GOTO(out_put_new, rc = -EXDEV);
2939                 }
2940                 /* Before locking the target dir, check we do not replace
2941                  * a dir with a non-dir, otherwise it may deadlock with
2942                  * link op which tries to create a link in this dir
2943                  * back to this non-dir.
2944                  */
2945                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2946                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2947                         GOTO(out_put_new, rc = -EISDIR);
2948
2949                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2950                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2951                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2952                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2953                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2954                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2955                                             lh_rmt, lock_ibits, cos_incompat);
2956                 if (rc < 0)
2957                         GOTO(out_put_new, rc);
2958
2959                 /* Check if @msrcdir is subdir of @mnew, before locking child
2960                  * to avoid reverse locking.
2961                  */
2962                 if (mtgtdir != msrcdir) {
2963                         rc = mdo_is_subdir(info->mti_env,
2964                                            mdt_object_child(msrcdir), new_fid);
2965                         if (rc) {
2966                                 if (rc == 1)
2967                                         rc = -EINVAL;
2968                                 GOTO(out_unlock_old, rc);
2969                         }
2970                 }
2971
2972                 /* We used to acquire MDS_INODELOCK_FULL here but we
2973                  * can't do this now because a running HSM restore on
2974                  * the rename onto victim will hold the layout
2975                  * lock. See LU-4002.
2976                  */
2977
2978                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2979                 mdt_lock_reg_init(lh_newp, LCK_EX);
2980                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2981                 if (mdt_object_remote(mtgtdir)) {
2982                         rc = mdt_remote_object_lock(info, mtgtdir,
2983                                                     mdt_object_fid(mnew),
2984                                                     &lh_newp->mlh_rreg_lh,
2985                                                     lh_newp->mlh_rreg_mode,
2986                                                     MDS_INODELOCK_LOOKUP,
2987                                                     false);
2988                         if (rc != ELDLM_OK)
2989                                 GOTO(out_unlock_old, rc);
2990
2991                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2992                 }
2993                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2994                                            cos_incompat);
2995                 if (rc != 0)
2996                         GOTO(out_unlock_new, rc);
2997
2998                 /* get and save version after locking */
2999                 mdt_version_get_save(info, mnew, 3);
3000         } else if (rc != -ENOENT) {
3001                 GOTO(out_put_old, rc);
3002         } else {
3003                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
3004                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
3005                 mdt_lock_reg_init(lh_oldp, LCK_EX);
3006                 mdt_lock_reg_init(lh_rmt, LCK_EX);
3007                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
3008                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
3009                                             lh_rmt, lock_ibits, cos_incompat);
3010                 if (rc != 0)
3011                         GOTO(out_put_old, rc);
3012
3013                 mdt_enoent_version_save(info, 3);
3014         }
3015
3016         /* step 5: rename it */
3017         mdt_reint_init_ma(info, ma);
3018
3019         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
3020                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
3021
3022         if (mnew != NULL)
3023                 mutex_lock(&mnew->mot_lov_mutex);
3024
3025         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
3026                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
3027                         mnew != NULL ? mdt_object_child(mnew) : NULL,
3028                         &rr->rr_tgt_name, ma);
3029
3030         if (mnew != NULL)
3031                 mutex_unlock(&mnew->mot_lov_mutex);
3032
3033         /* handle last link of tgt object */
3034         if (rc == 0) {
3035                 if (mnew) {
3036                         mdt_handle_last_unlink(info, mnew, ma);
3037                         discard = mdt_dom_check_for_discard(info, mnew);
3038                 }
3039                 mdt_rename_counter_tally(info, info->mti_mdt, req,
3040                                          msrcdir, mtgtdir, msi,
3041                                          ktime_us_delta(ktime_get(), kstart));
3042         }
3043
3044         EXIT;
3045 out_unlock_new:
3046         if (mnew != NULL)
3047                 mdt_object_unlock(info, mnew, lh_newp, rc);
3048 out_unlock_old:
3049         mdt_object_unlock(info, NULL, lh_rmt, rc);
3050         mdt_object_unlock(info, mold, lh_oldp, rc);
3051 out_put_new:
3052         if (mnew && !discard)
3053                 mdt_object_put(info->mti_env, mnew);
3054 out_put_old:
3055         mdt_object_put(info->mti_env, mold);
3056 out_unlock_parents:
3057         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
3058         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
3059 out_unlock_rename:
3060         if (lustre_handle_is_used(&rename_lh))
3061                 mdt_rename_unlock(&rename_lh);
3062 out_put_tgtdir:
3063         mdt_object_put(info->mti_env, mtgtdir);
3064 out_put_srcdir:
3065         mdt_object_put(info->mti_env, msrcdir);
3066
3067         /* The DoM discard can be done right in the place above where it is
3068          * assigned, meanwhile it is done here after rename unlock due to
3069          * compatibility with old clients, for them the discard blocks
3070          * the main thread until completion. Check LU-11359 for details.
3071          */
3072         if (discard) {
3073                 mdt_dom_discard_data(info, mnew);
3074                 mdt_object_put(info->mti_env, mnew);
3075         }
3076         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3077         return rc;
3078 }
3079
3080 static int mdt_reint_resync(struct mdt_thread_info *info,
3081                             struct mdt_lock_handle *lhc)
3082 {
3083         struct mdt_reint_record *rr = &info->mti_rr;
3084         struct ptlrpc_request *req = mdt_info_req(info);
3085         struct md_attr *ma = &info->mti_attr;
3086         struct mdt_object *mo;
3087         struct ldlm_lock *lease;
3088         struct mdt_body *repbody;
3089         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3090         bool lease_broken;
3091         int rc;
3092
3093         ENTRY;
3094         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3095
3096         if (info->mti_dlm_req)
3097                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3098
3099         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3100         if (IS_ERR(mo))
3101                 GOTO(out, rc = PTR_ERR(mo));
3102
3103         if (!mdt_object_exists(mo))
3104                 GOTO(out_obj, rc = -ENOENT);
3105
3106         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3107                 GOTO(out_obj, rc = -EINVAL);
3108
3109         if (mdt_object_remote(mo))
3110                 GOTO(out_obj, rc = -EREMOTE);
3111
3112         lease = ldlm_handle2lock(rr->rr_lease_handle);
3113         if (lease == NULL)
3114                 GOTO(out_obj, rc = -ESTALE);
3115
3116         /* It's really necessary to grab open_sem and check if the lease lock
3117          * has been lost. There would exist a concurrent writer coming in and
3118          * generating some dirty data in memory cache, the writeback would fail
3119          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3120          */
3121         if (!down_write_trylock(&mo->mot_open_sem))
3122                 GOTO(out_put_lease, rc = -EBUSY);
3123
3124         lock_res_and_lock(lease);
3125         lease_broken = ldlm_is_cancel(lease);
3126         unlock_res_and_lock(lease);
3127         if (lease_broken)
3128                 GOTO(out_unlock, rc = -EBUSY);
3129
3130         /* the file has yet opened by anyone else after we took the lease. */
3131         layout.mlc_opc = MD_LAYOUT_RESYNC;
3132         lhc = &info->mti_lh[MDT_LH_LOCAL];
3133         rc = mdt_layout_change(info, mo, lhc, &layout);
3134         if (rc)
3135                 GOTO(out_unlock, rc);
3136
3137         mdt_object_unlock(info, mo, lhc, 0);
3138
3139         ma->ma_need = MA_INODE;
3140         ma->ma_valid = 0;
3141         rc = mdt_attr_get_complex(info, mo, ma);
3142         if (rc != 0)
3143                 GOTO(out_unlock, rc);
3144
3145         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3146         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3147
3148         EXIT;
3149 out_unlock:
3150         up_write(&mo->mot_open_sem);
3151 out_put_lease:
3152         LDLM_LOCK_PUT(lease);
3153 out_obj:
3154         mdt_object_put(info->mti_env, mo);
3155 out:
3156         mdt_client_compatibility(info);
3157         return rc;
3158 }
3159
3160 struct mdt_reinter {
3161         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3162         enum lprocfs_extra_opc mr_extra_opc;
3163 };
3164
3165 static const struct mdt_reinter mdt_reinters[] = {
3166         [REINT_SETATTR] = {
3167                 .mr_handler = &mdt_reint_setattr,
3168                 .mr_extra_opc = MDS_REINT_SETATTR,
3169         },
3170         [REINT_CREATE] = {
3171                 .mr_handler = &mdt_reint_create,
3172                 .mr_extra_opc = MDS_REINT_CREATE,
3173         },
3174         [REINT_LINK] = {
3175                 .mr_handler = &mdt_reint_link,
3176                 .mr_extra_opc = MDS_REINT_LINK,
3177         },
3178         [REINT_UNLINK] = {
3179                 .mr_handler = &mdt_reint_unlink,
3180                 .mr_extra_opc = MDS_REINT_UNLINK,
3181         },
3182         [REINT_RENAME] = {
3183                 .mr_handler = &mdt_reint_rename,
3184                 .mr_extra_opc = MDS_REINT_RENAME,
3185         },
3186         [REINT_OPEN] = {
3187                 .mr_handler = &mdt_reint_open,
3188                 .mr_extra_opc = MDS_REINT_OPEN,
3189         },
3190         [REINT_SETXATTR] = {
3191                 .mr_handler = &mdt_reint_setxattr,
3192                 .mr_extra_opc = MDS_REINT_SETXATTR,
3193         },
3194         [REINT_RMENTRY] = {
3195                 .mr_handler = &mdt_reint_unlink,
3196                 .mr_extra_opc = MDS_REINT_UNLINK,
3197         },
3198         [REINT_MIGRATE] = {
3199                 .mr_handler = &mdt_reint_migrate,
3200                 .mr_extra_opc = MDS_REINT_RENAME,
3201         },
3202         [REINT_RESYNC] = {
3203                 .mr_handler = &mdt_reint_resync,
3204                 .mr_extra_opc = MDS_REINT_RESYNC,
3205         },
3206 };
3207
3208 int mdt_reint_rec(struct mdt_thread_info *info,
3209                   struct mdt_lock_handle *lhc)
3210 {
3211         const struct mdt_reinter *mr;
3212         int rc;
3213
3214         ENTRY;
3215         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3216                 RETURN(-EPROTO);
3217
3218         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3219         if (mr->mr_handler == NULL)
3220                 RETURN(-EPROTO);
3221
3222         rc = (*mr->mr_handler)(info, lhc);
3223
3224         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3225                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3226
3227         RETURN(rc);
3228 }