Whamcloud - gitweb
LU-15720 dne: add crush2 hash type
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222                              struct mdt_object *obj,
223                              struct ldlm_enqueue_info *einfo,
224                              int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(slave_locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_handle_init(lh);
237         mdt_lock_reg_init(lh, einfo->ei_mode);
238         for (i = 0; i < slave_locks->ha_count; i++) {
239                 if (test_bit(i, (void *)slave_locks->ha_map))
240                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
241                 else
242                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
243                 mdt_object_unlock(mti, NULL, lh, decref);
244                 slave_locks->ha_handles[i].cookie = 0ull;
245         }
246
247         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
248                                 policy);
249 }
250
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252                                      struct mdt_object *obj)
253 {
254         struct lu_device *bottom_dev;
255         struct lu_object *bottom_obj;
256         int rc;
257
258         if (!S_ISDIR(obj->mot_header.loh_attr))
259                 return 0;
260
261         /* getxattr from bottom obj to avoid reading in shard FIDs */
262         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264                                           mdt_object_fid(obj), NULL);
265         if (IS_ERR(bottom_obj))
266                 return PTR_ERR(bottom_obj);
267
268         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269                           XATTR_NAME_LMV);
270         lu_object_put(mti->mti_env, bottom_obj);
271
272         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
273 }
274
275 /**
276  * Lock slave stripes if necessary, the lock handles of slave stripes
277  * will be stored in einfo->ei_cbdata.
278  **/
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280                            enum ldlm_mode mode, __u64 ibits,
281                            struct ldlm_enqueue_info *einfo)
282 {
283         union ldlm_policy_data *policy = &mti->mti_policy;
284
285         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
286
287         einfo->ei_type = LDLM_IBITS;
288         einfo->ei_mode = mode;
289         einfo->ei_cb_bl = mdt_remote_blocking_ast;
290         einfo->ei_cb_local_bl = mdt_blocking_ast;
291         einfo->ei_cb_cp = ldlm_completion_ast;
292         einfo->ei_enq_slave = 1;
293         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294         einfo->ei_inodebits = ibits;
295         einfo->ei_req_slot = 1;
296         memset(policy, 0, sizeof(*policy));
297         policy->l_inodebits.bits = ibits;
298
299         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
300                               policy);
301 }
302
303 int mdt_reint_striped_lock(struct mdt_thread_info *info,
304                            struct mdt_object *o,
305                            struct mdt_lock_handle *lh,
306                            __u64 ibits,
307                            struct ldlm_enqueue_info *einfo,
308                            bool cos_incompat)
309 {
310         int rc;
311
312         LASSERT(!mdt_object_remote(o));
313
314         memset(einfo, 0, sizeof(*einfo));
315
316         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
317         if (rc)
318                 return rc;
319
320         rc = mdt_object_striped(info, o);
321         if (rc != 1) {
322                 if (rc < 0)
323                         mdt_object_unlock(info, o, lh, rc);
324                 return rc;
325         }
326
327         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
328         if (rc) {
329                 mdt_object_unlock(info, o, lh, rc);
330                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
331                         rc = 0;
332         }
333
334         return rc;
335 }
336
337 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *o,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_unlock_slaves(info, o, einfo, decref);
344         mdt_object_unlock(info, o, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lmv_mds_md_v1 *lmv;
359         struct mdt_object *child;
360         struct mdt_lock_handle *lhp;
361         struct mdt_lock_handle *lhc;
362         struct mdt_body *repbody;
363         int rc;
364
365         ENTRY;
366         if (!mdt->mdt_enable_dir_restripe)
367                 RETURN(-EPERM);
368
369         LASSERT(lum);
370         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
371
372         rc = mdt_version_get_check_save(info, parent, 0);
373         if (rc)
374                 RETURN(rc);
375
376         lhp = &info->mti_lh[MDT_LH_PARENT];
377         mdt_lock_pdo_init(lhp, LCK_PW, lname);
378         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
379                                    true);
380         if (rc)
381                 RETURN(rc);
382
383         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
384         if (rc)
385                 GOTO(unlock_parent, rc);
386
387         if (ma->ma_valid & MA_LMV) {
388                 /* don't allow restripe if parent dir layout is changing */
389                 lmv = &ma->ma_lmv->lmv_md_v1;
390                 if (!lmv_is_sane2(lmv))
391                         GOTO(unlock_parent, rc = -EBADF);
392
393                 if (lmv_is_layout_changing(lmv))
394                         GOTO(unlock_parent, rc = -EBUSY);
395         }
396
397         fid_zero(fid);
398         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
399         if (rc)
400                 GOTO(unlock_parent, rc);
401
402         child = mdt_object_find(info->mti_env, mdt, fid);
403         if (IS_ERR(child))
404                 GOTO(unlock_parent, rc = PTR_ERR(child));
405
406         if (!mdt_object_exists(child))
407                 GOTO(out_child, rc = -ENOENT);
408
409         if (mdt_object_remote(child)) {
410                 struct mdt_body *repbody;
411
412                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
413                 if (!repbody)
414                         GOTO(out_child, rc = -EPROTO);
415
416                 repbody->mbo_fid1 = *fid;
417                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
418                 GOTO(out_child, rc = -EREMOTE);
419         }
420
421         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
422                 GOTO(out_child, rc = -ENOTDIR);
423
424         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
425         if (rc)
426                 GOTO(out_child, rc);
427
428         /* race with migrate? */
429         if ((ma->ma_valid & MA_LMV) &&
430              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
431                 GOTO(out_child, rc = -EBUSY);
432
433         /* lock object */
434         lhc = &info->mti_lh[MDT_LH_CHILD];
435         mdt_lock_reg_init(lhc, LCK_EX);
436
437         /* enqueue object remote LOOKUP lock */
438         if (mdt_object_remote(parent)) {
439                 rc = mdt_remote_object_lock(info, parent, fid,
440                                             &lhc->mlh_rreg_lh,
441                                             lhc->mlh_rreg_mode,
442                                             MDS_INODELOCK_LOOKUP, false);
443                 if (rc != ELDLM_OK)
444                         GOTO(out_child, rc);
445         }
446
447         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
448                                     true);
449         if (rc)
450                 GOTO(unlock_child, rc);
451
452         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
453         rc = mdt_version_get_check_save(info, child, 1);
454         if (rc)
455                 GOTO(unlock_child, rc);
456
457         spin_lock(&mdt->mdt_restriper.mdr_lock);
458         if (child->mot_restriping) {
459                 /* race? */
460                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
461                 GOTO(unlock_child, rc = -EBUSY);
462         }
463         child->mot_restriping = 1;
464         spin_unlock(&mdt->mdt_restriper.mdr_lock);
465
466         *fid = *tfid;
467         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
468         if (rc)
469                 GOTO(restriping_clear, rc);
470
471         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
472         if (!repbody)
473                 GOTO(restriping_clear, rc = -EPROTO);
474
475         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
476         EXIT;
477
478 restriping_clear:
479         child->mot_restriping = 0;
480 unlock_child:
481         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
482 out_child:
483         mdt_object_put(info->mti_env, child);
484 unlock_parent:
485         mdt_object_unlock(info, parent, lhp, rc);
486
487         return rc;
488 }
489
490 /*
491  * VBR: we save three versions in reply:
492  * 0 - parent. Check that parent version is the same during replay.
493  * 1 - name. Version of 'name' if file exists with the same name or
494  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
495  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
496  * check.
497  */
498 static int mdt_create(struct mdt_thread_info *info)
499 {
500         struct mdt_device *mdt = info->mti_mdt;
501         struct mdt_object *parent;
502         struct mdt_object *child;
503         struct mdt_lock_handle *lh;
504         struct mdt_body *repbody;
505         struct md_attr *ma = &info->mti_attr;
506         struct mdt_reint_record *rr = &info->mti_rr;
507         struct md_op_spec *spec = &info->mti_spec;
508         bool restripe = false;
509         int rc;
510
511         ENTRY;
512         DEBUG_REQ(D_INODE, mdt_info_req(info),
513                   "Create ("DNAME"->"DFID") in "DFID,
514                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
515
516         if (!fid_is_md_operative(rr->rr_fid1))
517                 RETURN(-EPERM);
518
519         if (S_ISDIR(ma->ma_attr.la_mode) &&
520             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
521                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
522                 struct lu_ucred *uc = mdt_ucred(info);
523                 struct obd_export *exp = mdt_info_req(info)->rq_export;
524
525                 /* Only new clients can create remote dir( >= 2.4) and
526                  * striped dir(>= 2.6), old client will return -ENOTSUPP
527                  */
528                 if (!mdt_is_dne_client(exp))
529                         RETURN(-ENOTSUPP);
530
531                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
532                         if (!mdt_is_striped_client(exp))
533                                 RETURN(-ENOTSUPP);
534
535                         if (!mdt->mdt_enable_striped_dir)
536                                 RETURN(-EPERM);
537                 } else if (!mdt->mdt_enable_remote_dir) {
538                         RETURN(-EPERM);
539                 }
540
541                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
542                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
543                     LMV_HASH_TYPE_CRUSH)
544                         RETURN(-EPROTO);
545
546                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
547                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
548                     mdt->mdt_enable_remote_dir_gid != -1)
549                         RETURN(-EPERM);
550
551                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
552                  * this is create only, don't try restripe.
553                  */
554                 if (mdt->mdt_enable_dir_restripe &&
555                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
556                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
557                         restripe = true;
558         }
559
560         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
561
562         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
563         if (IS_ERR(parent))
564                 RETURN(PTR_ERR(parent));
565
566         if (!mdt_object_exists(parent))
567                 GOTO(put_parent, rc = -ENOENT);
568
569         rc = mdt_check_enc(info, parent);
570         if (rc)
571                 GOTO(put_parent, rc);
572
573         /*
574          * LU-10235: check if name exists locklessly first to avoid massive
575          * lock recalls on existing directories.
576          */
577         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
578                                       &info->mti_tmp_fid1, 1);
579         if (rc == 0) {
580                 if (!restripe)
581                         GOTO(put_parent, rc = -EEXIST);
582
583                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
584                                   ma);
585         }
586
587         /* -ENOENT is expected here */
588         if (rc != -ENOENT)
589                 GOTO(put_parent, rc);
590
591         /* save version of file name for replay, it must be ENOENT here */
592         mdt_enoent_version_save(info, 1);
593
594         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
595
596         lh = &info->mti_lh[MDT_LH_PARENT];
597         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
598         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
599         if (rc)
600                 GOTO(put_parent, rc);
601
602         if (!mdt_object_remote(parent)) {
603                 rc = mdt_version_get_check_save(info, parent, 0);
604                 if (rc)
605                         GOTO(unlock_parent, rc);
606         }
607
608         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
609         if (unlikely(IS_ERR(child)))
610                 GOTO(unlock_parent, rc = PTR_ERR(child));
611
612         ma->ma_need = MA_INODE;
613         ma->ma_valid = 0;
614
615         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
616                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
617
618         /* Version of child will be updated on disk. */
619         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
620         rc = mdt_version_get_check_save(info, child, 2);
621         if (rc)
622                 GOTO(put_child, rc);
623
624         /*
625          * Do not perform lookup sanity check. We know that name does
626          * not exist.
627          */
628         info->mti_spec.sp_cr_lookup = 0;
629         if (mdt_object_remote(parent))
630                 info->mti_spec.sp_cr_lookup = 1;
631         info->mti_spec.sp_feat = &dt_directory_features;
632
633         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
634                         mdt_object_child(child), &info->mti_spec, ma);
635         if (rc == 0)
636                 rc = mdt_attr_get_complex(info, child, ma);
637
638         if (rc < 0)
639                 GOTO(put_child, rc);
640
641         /*
642          * On DNE, we need to eliminate dependey between 'mkdir a' and
643          * 'mkdir a/b' if b is a striped directory, to achieve this, two
644          * things are done below:
645          * 1. save child and slaves lock.
646          * 2. if the child is a striped directory, relock parent so to
647          *    compare against with COS locks to ensure parent was
648          *    committed to disk.
649          */
650         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
651                 struct mdt_lock_handle *lhc;
652                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
653                 bool cos_incompat;
654
655                 rc = mdt_object_striped(info, child);
656                 if (rc < 0)
657                         GOTO(put_child, rc);
658
659                 cos_incompat = rc;
660                 if (cos_incompat) {
661                         if (!mdt_object_remote(parent)) {
662                                 mdt_object_unlock(info, parent, lh, 1);
663                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
664                                 rc = mdt_reint_object_lock(info, parent, lh,
665                                                            MDS_INODELOCK_UPDATE,
666                                                            true);
667                                 if (rc)
668                                         GOTO(put_child, rc);
669                         }
670                 }
671
672                 lhc = &info->mti_lh[MDT_LH_CHILD];
673                 mdt_lock_handle_init(lhc);
674                 mdt_lock_reg_init(lhc, LCK_PW);
675                 rc = mdt_reint_striped_lock(info, child, lhc,
676                                             MDS_INODELOCK_UPDATE, einfo,
677                                             cos_incompat);
678                 if (rc)
679                         GOTO(put_child, rc);
680
681                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
682         }
683
684         /* Return fid & attr to client. */
685         if (ma->ma_valid & MA_INODE)
686                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
687                                    mdt_object_fid(child));
688         EXIT;
689 put_child:
690         mdt_object_put(info->mti_env, child);
691 unlock_parent:
692         mdt_object_unlock(info, parent, lh, rc);
693 put_parent:
694         mdt_object_put(info->mti_env, parent);
695         return rc;
696 }
697
698 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
699                         struct md_attr *ma)
700 {
701         struct mdt_lock_handle  *lh;
702         int do_vbr = ma->ma_attr.la_valid &
703                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
704         __u64 lockpart = MDS_INODELOCK_UPDATE;
705         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
706         bool cos_incompat;
707         int rc;
708
709         ENTRY;
710         rc = mdt_object_striped(info, mo);
711         if (rc < 0)
712                 RETURN(rc);
713
714         cos_incompat = rc;
715
716         lh = &info->mti_lh[MDT_LH_PARENT];
717         mdt_lock_reg_init(lh, LCK_PW);
718
719         /* Even though the new MDT will grant PERM lock to the old
720          * client, but the old client will almost ignore that during
721          * So it needs to revoke both LOOKUP and PERM lock here, so
722          * both new and old client can cancel the dcache
723          */
724         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
725                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
726         /* Clear xattr cache on clients, so the virtual project ID xattr
727          * can get the new project ID
728          */
729         if (ma->ma_attr.la_valid & LA_PROJID)
730                 lockpart |= MDS_INODELOCK_XATTR;
731
732         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
733                                     cos_incompat);
734         if (rc != 0)
735                 RETURN(rc);
736
737         /* all attrs are packed into mti_attr in unpack_setattr */
738         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
739                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
740
741         /* VBR: update version if attr changed are important for recovery */
742         if (do_vbr) {
743                 /* update on-disk version of changed object */
744                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
745                 rc = mdt_version_get_check_save(info, mo, 0);
746                 if (rc)
747                         GOTO(out_unlock, rc);
748         }
749
750         /* Ensure constant striping during chown(). See LU-2789. */
751         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
752                 mutex_lock(&mo->mot_lov_mutex);
753
754         /* all attrs are packed into mti_attr in unpack_setattr */
755         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
756
757         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
758                 mutex_unlock(&mo->mot_lov_mutex);
759
760         if (rc != 0)
761                 GOTO(out_unlock, rc);
762         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
763         EXIT;
764 out_unlock:
765         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
766         return rc;
767 }
768
769 /**
770  * Check HSM flags and add HS_DIRTY flag if relevant.
771  *
772  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
773  * and is not RELEASED.
774  */
775 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
776                         struct md_attr *ma)
777 {
778         struct lu_ucred *uc = mdt_ucred(info);
779         kernel_cap_t cap_saved;
780         int rc;
781
782         ENTRY;
783         /* If the file was modified, add the dirty flag */
784         ma->ma_need = MA_HSM;
785         rc = mdt_attr_get_complex(info, mo, ma);
786         if (rc) {
787                 CERROR("file attribute read error for "DFID": %d.\n",
788                         PFID(mdt_object_fid(mo)), rc);
789                 RETURN(rc);
790         }
791
792         /* If an up2date copy exists in the backend, add dirty flag */
793         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
794             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
795                 ma->ma_hsm.mh_flags |= HS_DIRTY;
796
797                 /* Bump cap so that closes from non-owner writers can
798                  * set the HSM state to dirty.
799                  */
800                 cap_saved = uc->uc_cap;
801                 cap_raise(uc->uc_cap, CAP_FOWNER);
802                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
803                 uc->uc_cap = cap_saved;
804                 if (rc)
805                         CERROR("file attribute change error for "DFID": %d\n",
806                                 PFID(mdt_object_fid(mo)), rc);
807         }
808
809         RETURN(rc);
810 }
811
812 static int mdt_reint_setattr(struct mdt_thread_info *info,
813                              struct mdt_lock_handle *lhc)
814 {
815         struct mdt_device *mdt = info->mti_mdt;
816         struct md_attr *ma = &info->mti_attr;
817         struct mdt_reint_record *rr = &info->mti_rr;
818         struct ptlrpc_request *req = mdt_info_req(info);
819         struct mdt_object *mo;
820         struct mdt_body *repbody;
821         ktime_t kstart = ktime_get();
822         int rc, rc2;
823
824         ENTRY;
825         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
826                   (unsigned int)ma->ma_attr.la_valid);
827
828         if (info->mti_dlm_req)
829                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
830
831         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
832
833         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
834         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
835         if (IS_ERR(mo))
836                 GOTO(out, rc = PTR_ERR(mo));
837
838         if (!mdt_object_exists(mo))
839                 GOTO(out_put, rc = -ENOENT);
840
841         if (mdt_object_remote(mo))
842                 GOTO(out_put, rc = -EREMOTE);
843
844         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
845         /* revoke lease lock if size is going to be changed */
846         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
847                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
848                      atomic_read(&mo->mot_lease_count) > 0)) {
849                 down_read(&mo->mot_open_sem);
850
851                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
852                         lhc = &info->mti_lh[MDT_LH_LOCAL];
853                         mdt_lock_reg_init(lhc, LCK_CW);
854
855                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
856                         if (rc != 0) {
857                                 up_read(&mo->mot_open_sem);
858                                 GOTO(out_put, rc);
859                         }
860
861                         /* revoke lease lock */
862                         mdt_object_unlock(info, mo, lhc, 1);
863                 }
864                 up_read(&mo->mot_open_sem);
865         }
866
867         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
868                 /* Check write access for the O_TRUNC case */
869                 if (mdt_write_read(mo) < 0)
870                         GOTO(out_put, rc = -ETXTBSY);
871
872                 /* LU-10286: compatibility check for FLR.
873                  * Please check the comment in mdt_finish_open() for details
874                  */
875                 if (!exp_connect_flr(info->mti_exp) ||
876                     !exp_connect_overstriping(info->mti_exp)) {
877                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
878                         if (rc < 0 && rc != -ENODATA)
879                                 GOTO(out_put, rc);
880
881                         if (!exp_connect_flr(info->mti_exp)) {
882                                 if (rc > 0 &&
883                                     mdt_lmm_is_flr(info->mti_big_lmm))
884                                         GOTO(out_put, rc = -EOPNOTSUPP);
885                         }
886
887                         if (!exp_connect_overstriping(info->mti_exp)) {
888                                 if (rc > 0 &&
889                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
890                                         GOTO(out_put, rc = -EOPNOTSUPP);
891                         }
892                 }
893
894                 /* For truncate, the file size sent from client
895                  * is believable, but the blocks are incorrect,
896                  * which makes the block size in LSOM attribute
897                  * inconsisent with the real block size.
898                  */
899                 rc = mdt_lsom_update(info, mo, true);
900                 if (rc)
901                         GOTO(out_put, rc);
902         }
903
904         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
905                 if (ma->ma_valid & MA_LOV)
906                         GOTO(out_put, rc = -EPROTO);
907
908                 /* MDT supports FMD for regular files due to Data-on-MDT */
909                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
910                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
911                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
912                                        req->rq_xid);
913
914                         if (ma->ma_attr.la_valid & LA_MTIME) {
915                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
916                                 if (!rc)
917                                         ma->ma_valid |= MA_PFID;
918                         }
919                 }
920
921                 rc = mdt_attr_set(info, mo, ma);
922                 if (rc)
923                         GOTO(out_put, rc);
924         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
925                    (ma->ma_valid & MA_INODE)) {
926                 struct lu_buf *buf = &info->mti_buf;
927                 struct lu_ucred *uc = mdt_ucred(info);
928                 struct mdt_lock_handle *lh;
929                 const char *name;
930                 __u64 lockpart = MDS_INODELOCK_XATTR;
931
932                 /* reject if either remote or striped dir is disabled */
933                 if (ma->ma_valid & MA_LMV) {
934                         if (!mdt->mdt_enable_remote_dir ||
935                             !mdt->mdt_enable_striped_dir)
936                                 GOTO(out_put, rc = -EPERM);
937
938                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
939                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
940                             mdt->mdt_enable_remote_dir_gid != -1)
941                                 GOTO(out_put, rc = -EPERM);
942                 }
943
944                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
945                         GOTO(out_put, rc = -ENOTDIR);
946
947                 if (ma->ma_attr.la_valid != 0)
948                         GOTO(out_put, rc = -EPROTO);
949
950                 lh = &info->mti_lh[MDT_LH_PARENT];
951                 mdt_lock_reg_init(lh, LCK_PW);
952
953                 if (ma->ma_valid & MA_LOV) {
954                         buf->lb_buf = ma->ma_lmm;
955                         buf->lb_len = ma->ma_lmm_size;
956                         name = XATTR_NAME_LOV;
957                 } else {
958                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
959                         struct lu_fid *pfid = &info->mti_tmp_fid1;
960                         struct lu_name *pname = &info->mti_name;
961                         const char dotdot[] = "..";
962                         struct mdt_object *pobj;
963
964                         buf->lb_buf = lmu;
965                         buf->lb_len = ma->ma_lmv_size;
966                         name = XATTR_NAME_DEFAULT_LMV;
967
968                         if (fid_is_root(rr->rr_fid1)) {
969                                 lockpart |= MDS_INODELOCK_LOOKUP;
970                         } else {
971                                 /* force client to update dir default layout */
972                                 fid_zero(pfid);
973                                 pname->ln_name = dotdot;
974                                 pname->ln_namelen = sizeof(dotdot);
975                                 rc = mdo_lookup(info->mti_env,
976                                                 mdt_object_child(mo), pname,
977                                                 pfid, NULL);
978                                 if (rc)
979                                         GOTO(out_put, rc);
980
981                                 pobj = mdt_object_find(info->mti_env, mdt,
982                                                        pfid);
983                                 if (IS_ERR(pobj))
984                                         GOTO(out_put, rc = PTR_ERR(pobj));
985
986                                 if (mdt_object_remote(pobj))
987                                         rc = mdt_remote_object_lock(info, pobj,
988                                                 mdt_object_fid(mo),
989                                                 &lh->mlh_rreg_lh, LCK_EX,
990                                                 MDS_INODELOCK_LOOKUP, false);
991                                 else
992                                         lockpart |= MDS_INODELOCK_LOOKUP;
993
994                                 mdt_object_put(info->mti_env, pobj);
995
996                                 if (rc)
997                                         GOTO(out_put, rc);
998                         }
999                 }
1000
1001                 rc = mdt_object_lock(info, mo, lh, lockpart);
1002                 if (rc != 0)
1003                         GOTO(out_put, rc);
1004
1005                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1006                                   name, 0);
1007
1008                 mdt_object_unlock(info, mo, lh, rc);
1009                 if (rc)
1010                         GOTO(out_put, rc);
1011         } else {
1012                 GOTO(out_put, rc = -EPROTO);
1013         }
1014
1015         /* If file data is modified, add the dirty flag */
1016         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1017                 rc = mdt_add_dirty_flag(info, mo, ma);
1018
1019         ma->ma_need = MA_INODE;
1020         ma->ma_valid = 0;
1021         rc = mdt_attr_get_complex(info, mo, ma);
1022         if (rc != 0)
1023                 GOTO(out_put, rc);
1024
1025         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1026
1027         EXIT;
1028 out_put:
1029         mdt_object_put(info->mti_env, mo);
1030 out:
1031         if (rc == 0)
1032                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1033                                  ktime_us_delta(ktime_get(), kstart));
1034
1035         mdt_client_compatibility(info);
1036         rc2 = mdt_fix_reply(info);
1037         if (rc == 0)
1038                 rc = rc2;
1039         return rc;
1040 }
1041
1042 static int mdt_reint_create(struct mdt_thread_info *info,
1043                             struct mdt_lock_handle *lhc)
1044 {
1045         struct ptlrpc_request   *req = mdt_info_req(info);
1046         ktime_t                 kstart = ktime_get();
1047         int                     rc;
1048
1049         ENTRY;
1050         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1051                 RETURN(err_serious(-ESTALE));
1052
1053         if (info->mti_dlm_req)
1054                 ldlm_request_cancel(mdt_info_req(info),
1055                                     info->mti_dlm_req, 0, LATF_SKIP);
1056
1057         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1058                 RETURN(-EPROTO);
1059
1060         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1061         case S_IFDIR:
1062         case S_IFREG:
1063         case S_IFLNK:
1064         case S_IFCHR:
1065         case S_IFBLK:
1066         case S_IFIFO:
1067         case S_IFSOCK:
1068                 break;
1069         default:
1070                 CERROR("%s: Unsupported mode %o\n",
1071                        mdt_obd_name(info->mti_mdt),
1072                        info->mti_attr.ma_attr.la_mode);
1073                 RETURN(err_serious(-EOPNOTSUPP));
1074         }
1075
1076         rc = mdt_create(info);
1077         if (rc == 0) {
1078                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1079                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1080                                          ktime_us_delta(ktime_get(), kstart));
1081                 else
1082                         /* Special file should stay on the same node as parent*/
1083                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1084                                          ktime_us_delta(ktime_get(), kstart));
1085         }
1086
1087         RETURN(rc);
1088 }
1089
1090 /*
1091  * VBR: save parent version in reply and child version getting by its name.
1092  * Version of child is getting and checking during its lookup. If
1093  */
1094 static int mdt_reint_unlink(struct mdt_thread_info *info,
1095                             struct mdt_lock_handle *lhc)
1096 {
1097         struct mdt_reint_record *rr = &info->mti_rr;
1098         struct ptlrpc_request *req = mdt_info_req(info);
1099         struct md_attr *ma = &info->mti_attr;
1100         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1101         struct mdt_object *mp;
1102         struct mdt_object *mc;
1103         struct mdt_lock_handle *parent_lh;
1104         struct mdt_lock_handle *child_lh;
1105         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1106         __u64 lock_ibits;
1107         bool cos_incompat = false;
1108         int no_name = 0;
1109         ktime_t kstart = ktime_get();
1110         int rc;
1111
1112         ENTRY;
1113         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1114                   PNAME(&rr->rr_name));
1115
1116         if (info->mti_dlm_req)
1117                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1118
1119         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1120                 RETURN(err_serious(-ENOENT));
1121
1122         if (!fid_is_md_operative(rr->rr_fid1))
1123                 RETURN(-EPERM);
1124
1125         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1126         if (IS_ERR(mp))
1127                 RETURN(PTR_ERR(mp));
1128
1129         if (mdt_object_remote(mp)) {
1130                 cos_incompat = true;
1131         } else {
1132                 rc = mdt_version_get_check_save(info, mp, 0);
1133                 if (rc)
1134                         GOTO(put_parent, rc);
1135         }
1136
1137         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1138         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1139 relock:
1140         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1141         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1142         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1143                                    cos_incompat);
1144         if (rc != 0)
1145                 GOTO(put_parent, rc);
1146
1147         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1148                 *child_fid = *rr->rr_fid2;
1149         } else {
1150                 /* lookup child object along with version checking */
1151                 fid_zero(child_fid);
1152                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1153                                               1);
1154                 if (rc != 0) {
1155                         /* Name might not be able to find during resend of
1156                          * remote unlink, considering following case.
1157                          * dir_A is a remote directory, the name entry of
1158                          * dir_A is on MDT0, the directory is on MDT1,
1159                          *
1160                          * 1. client sends unlink req to MDT1.
1161                          * 2. MDT1 sends name delete update to MDT0.
1162                          * 3. name entry is being deleted in MDT0 synchronously.
1163                          * 4. MDT1 is restarted.
1164                          * 5. client resends unlink req to MDT1. So it can not
1165                          *    find the name entry on MDT0 anymore.
1166                          * In this case, MDT1 only needs to destory the local
1167                          * directory.
1168                          */
1169                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1170                             !fid_is_zero(rr->rr_fid2) &&
1171                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1172                                 no_name = 1;
1173                                 *child_fid = *rr->rr_fid2;
1174                         } else {
1175                                 GOTO(unlock_parent, rc);
1176                         }
1177                 }
1178         }
1179
1180         if (!fid_is_md_operative(child_fid))
1181                 GOTO(unlock_parent, rc = -EPERM);
1182
1183         /* We will lock the child regardless it is local or remote. No harm. */
1184         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1185         if (IS_ERR(mc))
1186                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1187
1188         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1189                 /* In this case, child fid is embedded in the request, and we do
1190                  * not have a proper name as rr_name contains an encoded
1191                  * hash. So find name that matches provided hash.
1192                  */
1193                 if (!find_name_matching_hash(info, &rr->rr_name,
1194                                              NULL, mc))
1195                         GOTO(put_child, rc = -ENOENT);
1196         }
1197
1198         if (!cos_incompat) {
1199                 rc = mdt_object_striped(info, mc);
1200                 if (rc < 0)
1201                         GOTO(put_child, rc);
1202
1203                 cos_incompat = rc;
1204                 if (cos_incompat) {
1205                         mdt_object_put(info->mti_env, mc);
1206                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1207                         goto relock;
1208                 }
1209         }
1210
1211         child_lh = &info->mti_lh[MDT_LH_CHILD];
1212         mdt_lock_reg_init(child_lh, LCK_EX);
1213         if (info->mti_spec.sp_rm_entry) {
1214                 struct lu_ucred *uc  = mdt_ucred(info);
1215
1216                 if (!mdt_is_dne_client(req->rq_export))
1217                         /* Return -ENOTSUPP for old client */
1218                         GOTO(put_child, rc = -ENOTSUPP);
1219
1220                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1221                         GOTO(put_child, rc = -EPERM);
1222
1223                 ma->ma_need = MA_INODE;
1224                 ma->ma_valid = 0;
1225                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1226                                 NULL, &rr->rr_name, ma, no_name);
1227                 GOTO(put_child, rc);
1228         }
1229
1230         if (mdt_object_remote(mc)) {
1231                 struct mdt_body  *repbody;
1232
1233                 if (!fid_is_zero(rr->rr_fid2)) {
1234                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1235                                mdt_obd_name(info->mti_mdt),
1236                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1237                         GOTO(put_child, rc = -ENOENT);
1238                 }
1239                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1240                        mdt_obd_name(info->mti_mdt),
1241                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1242
1243                 if (!mdt_is_dne_client(req->rq_export))
1244                         /* Return -ENOTSUPP for old client */
1245                         GOTO(put_child, rc = -ENOTSUPP);
1246
1247                 /* Revoke the LOOKUP lock of the remote object granted by
1248                  * this MDT. Since the unlink will happen on another MDT,
1249                  * it will release the LOOKUP lock right away. Then What
1250                  * would happen if another client try to grab the LOOKUP
1251                  * lock at the same time with unlink XXX
1252                  */
1253                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1254                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1255                 LASSERT(repbody != NULL);
1256                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1257                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1258                 GOTO(unlock_child, rc = -EREMOTE);
1259         }
1260         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1261          * this now because a running HSM restore on the child (unlink
1262          * victim) will hold the layout lock. See LU-4002.
1263          */
1264         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1265         if (mdt_object_remote(mp)) {
1266                 /* Enqueue lookup lock from parent MDT */
1267                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1268                                             &child_lh->mlh_rreg_lh,
1269                                             child_lh->mlh_rreg_mode,
1270                                             MDS_INODELOCK_LOOKUP, false);
1271                 if (rc != ELDLM_OK)
1272                         GOTO(put_child, rc);
1273
1274                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1275         }
1276
1277         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1278                                     cos_incompat);
1279         if (rc != 0)
1280                 GOTO(put_child, rc);
1281
1282         /*
1283          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1284          * whether need MA_LOV and MA_COOKIE.
1285          */
1286         ma->ma_need = MA_INODE;
1287         ma->ma_valid = 0;
1288
1289         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1290                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1291         /* save version when object is locked */
1292         mdt_version_get_save(info, mc, 1);
1293
1294         mutex_lock(&mc->mot_lov_mutex);
1295
1296         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1297                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1298
1299         mutex_unlock(&mc->mot_lov_mutex);
1300         if (rc != 0)
1301                 GOTO(unlock_child, rc);
1302
1303         if (!lu_object_is_dying(&mc->mot_header)) {
1304                 rc = mdt_attr_get_complex(info, mc, ma);
1305                 if (rc)
1306                         GOTO(out_stat, rc);
1307         } else if (mdt_dom_check_for_discard(info, mc)) {
1308                 mdt_dom_discard_data(info, mc);
1309         }
1310         mdt_handle_last_unlink(info, mc, ma);
1311
1312 out_stat:
1313         if (ma->ma_valid & MA_INODE) {
1314                 switch (ma->ma_attr.la_mode & S_IFMT) {
1315                 case S_IFDIR:
1316                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1317                                          ktime_us_delta(ktime_get(), kstart));
1318                         break;
1319                 case S_IFREG:
1320                 case S_IFLNK:
1321                 case S_IFCHR:
1322                 case S_IFBLK:
1323                 case S_IFIFO:
1324                 case S_IFSOCK:
1325                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1326                                          ktime_us_delta(ktime_get(), kstart));
1327                         break;
1328                 default:
1329                         LASSERTF(0, "bad file type %o unlinking\n",
1330                                 ma->ma_attr.la_mode);
1331                 }
1332         }
1333
1334         EXIT;
1335
1336 unlock_child:
1337         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1338 put_child:
1339         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1340             info->mti_big_buf.lb_buf)
1341                 lu_buf_free(&info->mti_big_buf);
1342         mdt_object_put(info->mti_env, mc);
1343 unlock_parent:
1344         mdt_object_unlock(info, mp, parent_lh, rc);
1345 put_parent:
1346         mdt_object_put(info->mti_env, mp);
1347         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1348         return rc;
1349 }
1350
1351 /*
1352  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1353  * name.
1354  */
1355 static int mdt_reint_link(struct mdt_thread_info *info,
1356                           struct mdt_lock_handle *lhc)
1357 {
1358         struct mdt_reint_record *rr = &info->mti_rr;
1359         struct ptlrpc_request   *req = mdt_info_req(info);
1360         struct md_attr          *ma = &info->mti_attr;
1361         struct mdt_object       *ms;
1362         struct mdt_object       *mp;
1363         struct mdt_lock_handle  *lhs;
1364         struct mdt_lock_handle  *lhp;
1365         ktime_t kstart = ktime_get();
1366         bool cos_incompat;
1367         int rc;
1368
1369         ENTRY;
1370         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1371                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1372
1373         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1374                 RETURN(err_serious(-ENOENT));
1375
1376         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1377             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1378                 req->rq_no_reply = 1;
1379                 RETURN(err_serious(-ENOENT));
1380         }
1381
1382         if (info->mti_dlm_req)
1383                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1384
1385         /* Invalid case so return error immediately instead of
1386          * processing it
1387          */
1388         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1389                 RETURN(-EPERM);
1390
1391         if (!fid_is_md_operative(rr->rr_fid1) ||
1392             !fid_is_md_operative(rr->rr_fid2))
1393                 RETURN(-EPERM);
1394
1395         /* step 1: find target parent dir */
1396         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1397         if (IS_ERR(mp))
1398                 RETURN(PTR_ERR(mp));
1399
1400         rc = mdt_version_get_check_save(info, mp, 0);
1401         if (rc)
1402                 GOTO(put_parent, rc);
1403
1404         rc = mdt_check_enc(info, mp);
1405         if (rc)
1406                 GOTO(put_parent, rc);
1407
1408         /* step 2: find source */
1409         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1410         if (IS_ERR(ms))
1411                 GOTO(put_parent, rc = PTR_ERR(ms));
1412
1413         if (!mdt_object_exists(ms)) {
1414                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1415                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1416                 GOTO(put_source, rc = -ENOENT);
1417         }
1418
1419         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1420
1421         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1422
1423         lhp = &info->mti_lh[MDT_LH_PARENT];
1424         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1425         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1426                                    cos_incompat);
1427         if (rc != 0)
1428                 GOTO(put_source, rc);
1429
1430         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1431
1432         lhs = &info->mti_lh[MDT_LH_CHILD];
1433         mdt_lock_reg_init(lhs, LCK_EX);
1434         rc = mdt_reint_object_lock(info, ms, lhs,
1435                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1436                                    cos_incompat);
1437         if (rc != 0)
1438                 GOTO(unlock_parent, rc);
1439
1440         /* step 3: link it */
1441         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1442                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1443
1444         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1445         rc = mdt_version_get_check_save(info, ms, 1);
1446         if (rc)
1447                 GOTO(unlock_source, rc);
1448
1449         /** check target version by name during replay */
1450         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1451                                       &info->mti_tmp_fid1, 2);
1452         if (rc != 0 && rc != -ENOENT)
1453                 GOTO(unlock_source, rc);
1454         /* save version of file name for replay, it must be ENOENT here */
1455         if (!req_is_replay(mdt_info_req(info))) {
1456                 if (rc != -ENOENT) {
1457                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1458                                PNAME(&rr->rr_name));
1459                         GOTO(unlock_source, rc = -EEXIST);
1460                 }
1461                 info->mti_ver[2] = ENOENT_VERSION;
1462                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1463         }
1464
1465         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1466                       mdt_object_child(ms), &rr->rr_name, ma);
1467
1468         if (rc == 0)
1469                 mdt_counter_incr(req, LPROC_MDT_LINK,
1470                                  ktime_us_delta(ktime_get(), kstart));
1471
1472         EXIT;
1473 unlock_source:
1474         mdt_object_unlock(info, ms, lhs, rc);
1475 unlock_parent:
1476         mdt_object_unlock(info, mp, lhp, rc);
1477 put_source:
1478         mdt_object_put(info->mti_env, ms);
1479 put_parent:
1480         mdt_object_put(info->mti_env, mp);
1481         return rc;
1482 }
1483 /**
1484  * lock the part of the directory according to the hash of the name
1485  * (lh->mlh_pdo_hash) in parallel directory lock.
1486  */
1487 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1488                               struct mdt_lock_handle *lh,
1489                               struct mdt_object *obj, __u64 ibits,
1490                               bool cos_incompat)
1491 {
1492         struct ldlm_res_id *res = &info->mti_res_id;
1493         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1494         union ldlm_policy_data *policy = &info->mti_policy;
1495         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1496         int rc;
1497
1498         /*
1499          * Finish res_id initializing by name hash marking part of
1500          * directory which is taking modification.
1501          */
1502         LASSERT(lh->mlh_pdo_hash != 0);
1503         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1504         memset(policy, 0, sizeof(*policy));
1505         policy->l_inodebits.bits = ibits;
1506         if (cos_incompat &&
1507             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1508                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1509         /*
1510          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1511          * going to be sent to client. If it is - mdt_intent_policy() path will
1512          * fix it up and turn FL_LOCAL flag off.
1513          */
1514         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1515                           policy, res, dlmflags,
1516                           &info->mti_exp->exp_handle.h_cookie);
1517         return rc;
1518 }
1519
1520 /**
1521  * Get BFL lock for rename or migrate process.
1522  **/
1523 static int mdt_rename_lock(struct mdt_thread_info *info,
1524                            struct lustre_handle *lh)
1525 {
1526         int     rc;
1527
1528         ENTRY;
1529         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1530                 struct lu_fid *fid = &info->mti_tmp_fid1;
1531                 struct mdt_object *obj;
1532
1533                 /* XXX, right now, it has to use object API to
1534                  * enqueue lock cross MDT, so it will enqueue
1535                  * rename lock(with LUSTRE_BFL_FID) by root object
1536                  */
1537                 lu_root_fid(fid);
1538                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1539                 if (IS_ERR(obj))
1540                         RETURN(PTR_ERR(obj));
1541
1542                 rc = mdt_remote_object_lock(info, obj,
1543                                             &LUSTRE_BFL_FID, lh,
1544                                             LCK_EX,
1545                                             MDS_INODELOCK_UPDATE, false);
1546                 mdt_object_put(info->mti_env, obj);
1547         } else {
1548                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1549                 union ldlm_policy_data *policy = &info->mti_policy;
1550                 struct ldlm_res_id *res_id = &info->mti_res_id;
1551                 __u64 flags = 0;
1552
1553                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1554                 memset(policy, 0, sizeof(*policy));
1555                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1556                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1557                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1558                                             LDLM_IBITS, policy, LCK_EX, &flags,
1559                                             ldlm_blocking_ast,
1560                                             ldlm_completion_ast, NULL, NULL, 0,
1561                                             LVB_T_NONE,
1562                                             &info->mti_exp->exp_handle.h_cookie,
1563                                             lh);
1564                 RETURN(rc);
1565         }
1566         RETURN(rc);
1567 }
1568
1569 static void mdt_rename_unlock(struct lustre_handle *lh)
1570 {
1571         ENTRY;
1572         LASSERT(lustre_handle_is_used(lh));
1573         /* Cancel the single rename lock right away */
1574         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1575         EXIT;
1576 }
1577
1578 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1579                                                 const struct lu_fid *fid,
1580                                                 int idx)
1581 {
1582         struct mdt_object *dir;
1583         int rc;
1584
1585         ENTRY;
1586         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1587         if (IS_ERR(dir))
1588                 RETURN(dir);
1589
1590         /* check early, the real version will be saved after locking */
1591         rc = mdt_version_get_check(info, dir, idx);
1592         if (rc)
1593                 GOTO(out_put, rc);
1594
1595         if (!mdt_object_exists(dir))
1596                 GOTO(out_put, rc = -ENOENT);
1597
1598         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1599                 GOTO(out_put, rc = -ENOTDIR);
1600
1601         RETURN(dir);
1602 out_put:
1603         mdt_object_put(info->mti_env, dir);
1604         return ERR_PTR(rc);
1605 }
1606
1607 /*
1608  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1609  * herein we don't really check it, just do revoke.
1610  */
1611 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1612                                   struct mdt_object *pobj,
1613                                   struct mdt_object *obj)
1614 {
1615         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1616         int rc;
1617
1618         mdt_lock_handle_init(lh);
1619         mdt_lock_reg_init(lh, LCK_EX);
1620
1621         if (mdt_object_remote(pobj)) {
1622                 /* don't bother to check if pobj and obj are on the same MDT. */
1623                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1624                                             &lh->mlh_rreg_lh, LCK_EX,
1625                                             MDS_INODELOCK_LOOKUP, false);
1626         } else if (mdt_object_remote(obj)) {
1627                 struct ldlm_res_id *res = &info->mti_res_id;
1628                 union ldlm_policy_data *policy = &info->mti_policy;
1629                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1630                                  LDLM_FL_COS_INCOMPAT;
1631
1632                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1633                 memset(policy, 0, sizeof(*policy));
1634                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1635                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1636                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1637                                   dlmflags, NULL);
1638         } else {
1639                 /* do nothing if both are local */
1640                 return 0;
1641         }
1642
1643         if (rc != ELDLM_OK)
1644                 return rc;
1645
1646         /*
1647          * TODO, currently we don't save this lock because there is no place to
1648          * hold this lock handle, but to avoid race we need to save this lock.
1649          */
1650         mdt_object_unlock(info, NULL, lh, 1);
1651
1652         return 0;
1653 }
1654
1655 /*
1656  * operation may takes locks of linkea, or directory stripes, group them in
1657  * different list.
1658  */
1659 struct mdt_sub_lock {
1660         struct mdt_object *msl_obj;
1661         struct mdt_lock_handle msl_lh;
1662         struct list_head msl_linkage;
1663 };
1664
1665 static void mdt_unlock_list(struct mdt_thread_info *info,
1666                             struct list_head *list, int decref)
1667 {
1668         struct mdt_sub_lock *msl;
1669         struct mdt_sub_lock *tmp;
1670
1671         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1672                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1673                 list_del(&msl->msl_linkage);
1674                 OBD_FREE_PTR(msl);
1675         }
1676 }
1677
/*
 * Release the locks taken on \a obj for migration.
 *
 * A local object is handed to mdt_reint_striped_unlock().  For a remote
 * object the locks on \a slave_locks are released first, then the lock on
 * the object itself.
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
					     struct mdt_object *obj,
					     struct mdt_lock_handle *lh,
					     struct ldlm_enqueue_info *einfo,
					     struct list_head *slave_locks,
					     int decref)
{
	if (!mdt_object_remote(obj)) {
		mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
		return;
	}

	mdt_unlock_list(info, slave_locks, decref);
	mdt_object_unlock(info, obj, lh, decref);
}
1692
1693 /*
1694  * lock parents of links, and also check whether total locks don't exceed
1695  * RS_MAX_LOCKS.
1696  *
1697  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1698  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1699  * \retval      -ev negative errno upon error
1700  */
1701 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1702                                  struct mdt_object *pobj,
1703                                  const struct md_attr *ma,
1704                                  struct mdt_object *obj,
1705                                  struct mdt_lock_handle *lhp,
1706                                  struct ldlm_enqueue_info *peinfo,
1707                                  struct list_head *parent_slave_locks,
1708                                  struct list_head *link_locks)
1709 {
1710         struct mdt_device *mdt = info->mti_mdt;
1711         struct lu_buf *buf = &info->mti_big_buf;
1712         struct lu_name *lname = &info->mti_name;
1713         struct linkea_data ldata = { NULL };
1714         bool blocked = false;
1715         int local_lnkp_cnt = 0;
1716         int rc;
1717
1718         ENTRY;
1719         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1720                 RETURN(0);
1721
1722         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1723         if (buf->lb_buf == NULL)
1724                 RETURN(-ENOMEM);
1725
1726         ldata.ld_buf = buf;
1727         rc = mdt_links_read(info, obj, &ldata);
1728         if (rc) {
1729                 if (rc == -ENOENT || rc == -ENODATA)
1730                         rc = 0;
1731                 RETURN(rc);
1732         }
1733
1734         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1735              linkea_next_entry(&ldata)) {
1736                 struct mdt_object *lnkp;
1737                 struct mdt_sub_lock *msl;
1738                 struct lu_fid fid;
1739                 __u64 ibits;
1740
1741                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1742                                     &fid);
1743
1744                 /* check if it's also linked to parent */
1745                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1746                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1747                                PFID(&fid), PNAME(lname));
1748                         /* in case link is remote object, revoke LOOKUP lock */
1749                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1750                         continue;
1751                 }
1752
1753                 lnkp = NULL;
1754
1755                 /* check if it's linked to a stripe of parent */
1756                 if (ma->ma_valid & MA_LMV) {
1757                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1758                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1759                         int j = 0;
1760
1761                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1762                                 fid_le_to_cpu(stripe_fid,
1763                                               &lmv->lmv_stripe_fids[j]);
1764                                 if (lu_fid_eq(stripe_fid, &fid)) {
1765                                         CDEBUG(D_INFO, "skip stripe "DFID
1766                                                ", reovke "DNAME"\n",
1767                                                PFID(&fid), PNAME(lname));
1768                                         lnkp = mdt_object_find(info->mti_env,
1769                                                                mdt, &fid);
1770                                         if (IS_ERR(lnkp))
1771                                                 GOTO(out, rc = PTR_ERR(lnkp));
1772                                         break;
1773                                 }
1774                         }
1775
1776                         if (lnkp) {
1777                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1778                                                                    obj);
1779                                 mdt_object_put(info->mti_env, lnkp);
1780                                 continue;
1781                         }
1782                 }
1783
1784                 /* Check if it's already locked */
1785                 list_for_each_entry(msl, link_locks, msl_linkage) {
1786                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1787                                 CDEBUG(D_INFO,
1788                                        DFID" was locked, revoke "DNAME"\n",
1789                                        PFID(&fid), PNAME(lname));
1790                                 lnkp = msl->msl_obj;
1791                                 break;
1792                         }
1793                 }
1794
1795                 if (lnkp) {
1796                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1797                         continue;
1798                 }
1799
1800                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1801                        PFID(&fid), PNAME(lname));
1802
1803                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1804                 if (IS_ERR(lnkp)) {
1805                         CWARN("%s: cannot find obj "DFID": %ld\n",
1806                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1807                         continue;
1808                 }
1809
1810                 if (!mdt_object_exists(lnkp)) {
1811                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1812                               PFID(&fid), PNAME(lname));
1813                         mdt_object_put(info->mti_env, lnkp);
1814                         continue;
1815                 }
1816
1817                 if (!mdt_object_remote(lnkp))
1818                         local_lnkp_cnt++;
1819
1820                 OBD_ALLOC_PTR(msl);
1821                 if (msl == NULL)
1822                         GOTO(out, rc = -ENOMEM);
1823
1824                 /*
1825                  * we can't follow parent-child lock order like other MD
1826                  * operations, use lock_try here to avoid deadlock, if the lock
1827                  * cannot be taken, drop all locks taken, revoke the blocked
1828                  * one, and continue processing the remaining entries, and in
1829                  * the end of the loop restart from beginning.
1830                  */
1831                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1832                 ibits = 0;
1833                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1834                                          MDS_INODELOCK_UPDATE, true);
1835                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1836
1837                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1838                                PFID(&fid), PNAME(lname));
1839
1840                         mdt_unlock_list(info, link_locks, 1);
1841                         /* also unlock parent locks to avoid deadlock */
1842                         if (!blocked)
1843                                 mdt_migrate_object_unlock(info, pobj, lhp,
1844                                                           peinfo,
1845                                                           parent_slave_locks,
1846                                                           1);
1847
1848                         blocked = true;
1849
1850                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1851                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1852                                              MDS_INODELOCK_UPDATE);
1853                         if (rc) {
1854                                 mdt_object_put(info->mti_env, lnkp);
1855                                 OBD_FREE_PTR(msl);
1856                                 GOTO(out, rc);
1857                         }
1858
1859                         if (mdt_object_remote(lnkp)) {
1860                                 struct ldlm_lock *lock;
1861
1862                                 /*
1863                                  * for remote object, set lock cb_atomic,
1864                                  * so lock can be released in blocking_ast()
1865                                  * immediately, then the next lock_try will
1866                                  * have better chance of success.
1867                                  */
1868                                 lock = ldlm_handle2lock(
1869                                                 &msl->msl_lh.mlh_rreg_lh);
1870                                 LASSERT(lock != NULL);
1871                                 lock_res_and_lock(lock);
1872                                 ldlm_set_atomic_cb(lock);
1873                                 unlock_res_and_lock(lock);
1874                                 LDLM_LOCK_PUT(lock);
1875                         }
1876
1877                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1878                         OBD_FREE_PTR(msl);
1879                         continue;
1880                 }
1881
1882                 INIT_LIST_HEAD(&msl->msl_linkage);
1883                 msl->msl_obj = lnkp;
1884                 list_add_tail(&msl->msl_linkage, link_locks);
1885
1886                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1887         }
1888
1889         if (blocked)
1890                 GOTO(out, rc = -EBUSY);
1891
1892         EXIT;
1893 out:
1894         if (rc) {
1895                 mdt_unlock_list(info, link_locks, rc);
1896         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1897                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1898                        local_lnkp_cnt);
1899                 /*
1900                  * parent may have 3 local objects: master object and 2 stripes
1901                  * (if it's being migrated too); source may have 1 local objects
1902                  * as regular file; target has 1 local object.
1903                  * Note, source may have 2 local locks if it is directory but it
1904                  * can't have hardlinks, so it is not considered here.
1905                  */
1906                 rc = 1;
1907         }
1908         return rc;
1909 }
1910
1911 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1912                                   struct mdt_object *obj,
1913                                   const struct md_attr *ma,
1914                                   struct list_head *slave_locks)
1915 {
1916         struct mdt_device *mdt = info->mti_mdt;
1917         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1918         struct lu_fid *fid = &info->mti_tmp_fid1;
1919         struct mdt_object *slave;
1920         struct mdt_sub_lock *msl;
1921         int i;
1922         int rc;
1923
1924         ENTRY;
1925         LASSERT(mdt_object_remote(obj));
1926         LASSERT(ma->ma_valid & MA_LMV);
1927         LASSERT(lmv);
1928
1929         if (!lmv_is_sane(lmv))
1930                 RETURN(-EINVAL);
1931
1932         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1933                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1934
1935                 if (!fid_is_sane(fid))
1936                         continue;
1937
1938                 slave = mdt_object_find(info->mti_env, mdt, fid);
1939                 if (IS_ERR(slave))
1940                         GOTO(out, rc = PTR_ERR(slave));
1941
1942                 OBD_ALLOC_PTR(msl);
1943                 if (!msl) {
1944                         mdt_object_put(info->mti_env, slave);
1945                         GOTO(out, rc = -ENOMEM);
1946                 }
1947
1948                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1949                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1950                                            MDS_INODELOCK_UPDATE, true);
1951                 if (rc) {
1952                         OBD_FREE_PTR(msl);
1953                         mdt_object_put(info->mti_env, slave);
1954                         GOTO(out, rc);
1955                 }
1956
1957                 INIT_LIST_HEAD(&msl->msl_linkage);
1958                 msl->msl_obj = slave;
1959                 list_add_tail(&msl->msl_linkage, slave_locks);
1960         }
1961         EXIT;
1962
1963 out:
1964         if (rc)
1965                 mdt_unlock_list(info, slave_locks, rc);
1966         return rc;
1967 }
1968
1969 /* lock parent and its stripes */
1970 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1971                                    struct mdt_object *obj,
1972                                    const struct md_attr *ma,
1973                                    struct mdt_lock_handle *lh,
1974                                    struct ldlm_enqueue_info *einfo,
1975                                    struct list_head *slave_locks)
1976 {
1977         int rc;
1978
1979         if (mdt_object_remote(obj)) {
1980                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1981                                             &lh->mlh_rreg_lh, LCK_PW,
1982                                             MDS_INODELOCK_UPDATE, false);
1983                 if (rc != ELDLM_OK)
1984                         return rc;
1985
1986                 /*
1987                  * if obj is remote and striped, lock its stripes explicitly
1988                  * because it's not striped in LOD layer on this MDT.
1989                  */
1990                 if (ma->ma_valid & MA_LMV) {
1991                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1992                         if (rc)
1993                                 mdt_object_unlock(info, obj, lh, rc);
1994                 }
1995         } else {
1996                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1997                                             einfo, true);
1998         }
1999
2000         return rc;
2001 }
2002
2003 /*
2004  * in migration, object may be remote, and we need take full lock of it and its
2005  * stripes if it's directory, besides, object may be a remote object on its
2006  * parent, revoke its LOOKUP lock on where its parent is located.
2007  */
2008 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
2009                                    struct mdt_object *pobj,
2010                                    struct mdt_object *obj,
2011                                    struct mdt_lock_handle *lh,
2012                                    struct ldlm_enqueue_info *einfo,
2013                                    struct list_head *slave_locks)
2014 {
2015         int rc;
2016
2017         if (mdt_object_remote(obj)) {
2018                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2019                 if (rc)
2020                         return rc;
2021
2022                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
2023                                             &lh->mlh_rreg_lh, LCK_EX,
2024                                             MDS_INODELOCK_FULL, false);
2025                 if (rc != ELDLM_OK)
2026                         return rc;
2027
2028                 /*
2029                  * if obj is remote and striped, lock its stripes explicitly
2030                  * because it's not striped in LOD layer on this MDT.
2031                  */
2032                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
2033                         struct md_attr *ma = &info->mti_attr;
2034
2035                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
2036                         if (rc) {
2037                                 mdt_object_unlock(info, obj, lh, rc);
2038                                 return rc;
2039                         }
2040
2041                         if (ma->ma_valid & MA_LMV) {
2042                                 rc = mdt_lock_remote_slaves(info, obj, ma,
2043                                                             slave_locks);
2044                                 if (rc)
2045                                         mdt_object_unlock(info, obj, lh, rc);
2046                         }
2047                 }
2048         } else {
2049                 if (mdt_object_remote(pobj)) {
2050                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2051                         if (rc)
2052                                 return rc;
2053                 }
2054
2055                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2056                                             einfo, true);
2057         }
2058
2059         return rc;
2060 }
2061
2062 /*
2063  * lookup source by name, if parent is striped directory, we need to find the
2064  * corresponding stripe where source is located, and then lookup there.
2065  *
2066  * besides, if parent is migrating too, and file is already in target stripe,
2067  * this should be a redo of 'lfs migrate' on client side.
2068  */
2069 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2070                               struct mdt_object *pobj,
2071                               const struct md_attr *ma,
2072                               const struct lu_name *lname,
2073                               struct mdt_object **spobj,
2074                               struct mdt_object **sobj)
2075 {
2076         const struct lu_env *env = info->mti_env;
2077         struct lu_fid *fid = &info->mti_tmp_fid1;
2078         struct mdt_object *stripe;
2079         int rc;
2080
2081         if (ma->ma_valid & MA_LMV) {
2082                 /* if parent is striped, lookup on corresponding stripe */
2083                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2084
2085                 if (!lmv_is_sane(lmv))
2086                         return -EBADF;
2087
2088                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2089                                                   lname->ln_namelen);
2090                 if (rc < 0)
2091                         return rc;
2092
2093                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2094
2095                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2096                 if (IS_ERR(stripe))
2097                         return PTR_ERR(stripe);
2098
2099                 fid_zero(fid);
2100                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2101                                 &info->mti_spec);
2102                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2103                         /*
2104                          * if parent layout is changeing, and lookup child
2105                          * failed on source stripe, lookup again on target
2106                          * stripe, if it exists, it means previous migration
2107                          * was interrupted, and current file was migrated
2108                          * already.
2109                          */
2110                         mdt_object_put(env, stripe);
2111
2112                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2113                                                       lname->ln_namelen);
2114                         if (rc < 0)
2115                                 return rc;
2116
2117                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2118
2119                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2120                         if (IS_ERR(stripe))
2121                                 return PTR_ERR(stripe);
2122
2123                         fid_zero(fid);
2124                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2125                                         fid, &info->mti_spec);
2126                         mdt_object_put(env, stripe);
2127                         return rc ?: -EALREADY;
2128                 } else if (rc) {
2129                         mdt_object_put(env, stripe);
2130                         return rc;
2131                 }
2132         } else {
2133                 fid_zero(fid);
2134                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2135                                 &info->mti_spec);
2136                 if (rc)
2137                         return rc;
2138
2139                 stripe = pobj;
2140                 mdt_object_get(env, stripe);
2141         }
2142
2143         *spobj = stripe;
2144
2145         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2146         if (IS_ERR(*sobj)) {
2147                 mdt_object_put(env, stripe);
2148                 rc = PTR_ERR(*sobj);
2149                 *spobj = NULL;
2150                 *sobj = NULL;
2151         }
2152
2153         return rc;
2154 }
2155
2156 /* end lease and close file for regular file */
2157 static int mdd_migrate_close(struct mdt_thread_info *info,
2158                              struct mdt_object *obj)
2159 {
2160         struct close_data *data;
2161         struct mdt_body *repbody;
2162         struct ldlm_lock *lease;
2163         int rc;
2164         int rc2;
2165
2166         rc = -EPROTO;
2167         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2168                                       RCL_CLIENT) ||
2169             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2170                                       RCL_CLIENT))
2171                 goto close;
2172
2173         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2174         if (!data)
2175                 goto close;
2176
2177         rc = -ESTALE;
2178         lease = ldlm_handle2lock(&data->cd_handle);
2179         if (!lease)
2180                 goto close;
2181
2182         /* check if the lease was already canceled */
2183         lock_res_and_lock(lease);
2184         rc = ldlm_is_cancel(lease);
2185         unlock_res_and_lock(lease);
2186
2187         if (rc) {
2188                 rc = -EAGAIN;
2189                 LDLM_DEBUG(lease, DFID" lease broken",
2190                            PFID(mdt_object_fid(obj)));
2191         }
2192
2193         /*
2194          * cancel server side lease, client side counterpart should have been
2195          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2196          */
2197         ldlm_lock_cancel(lease);
2198         ldlm_reprocess_all(lease->l_resource,
2199                            lease->l_policy_data.l_inodebits.bits);
2200         LDLM_LOCK_PUT(lease);
2201
2202 close:
2203         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2204         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2205         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2206
2207         return rc ?: rc2;
2208 }
2209
2210 /*
2211  * migrate file in below steps:
2212  *  1. lock parent and its stripes
2213  *  2. lookup source by name
2214  *  3. lock parents of source links if source is not directory
2215  *  4. reject if source is in HSM
2216  *  5. take source open_sem and close file if source is regular file
2217  *  6. lock source and its stripes if it's directory
2218  *  7. lock target so subsequent change to it can trigger COS
2219  *  8. migrate file
2220  *  9. unlock above locks
2221  * 10. sync device if source has links
2222  */
2223 int mdt_reint_migrate(struct mdt_thread_info *info,
2224                       struct mdt_lock_handle *unused)
2225 {
2226         const struct lu_env *env = info->mti_env;
2227         struct mdt_device *mdt = info->mti_mdt;
2228         struct ptlrpc_request *req = mdt_info_req(info);
2229         struct mdt_reint_record *rr = &info->mti_rr;
2230         struct lu_ucred *uc = mdt_ucred(info);
2231         struct md_attr *ma = &info->mti_attr;
2232         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2233         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2234         struct mdt_object *pobj;
2235         struct mdt_object *spobj = NULL;
2236         struct mdt_object *sobj = NULL;
2237         struct mdt_object *tobj;
2238         struct lustre_handle rename_lh = { 0 };
2239         struct mdt_lock_handle *lhp;
2240         struct mdt_lock_handle *lhs;
2241         struct mdt_lock_handle *lht;
2242         LIST_HEAD(parent_slave_locks);
2243         LIST_HEAD(child_slave_locks);
2244         LIST_HEAD(link_locks);
2245         int lock_retries = 5;
2246         bool open_sem_locked = false;
2247         bool do_sync = false;
2248         int rc;
2249
2250         ENTRY;
2251         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2252                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2253
2254         if (info->mti_dlm_req)
2255                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2256
2257         if (!fid_is_md_operative(rr->rr_fid1) ||
2258             !fid_is_md_operative(rr->rr_fid2))
2259                 RETURN(-EPERM);
2260
2261         /* don't allow migrate . or .. */
2262         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2263                 RETURN(-EBUSY);
2264
2265         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2266                 RETURN(-EPERM);
2267
2268         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2269             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2270             mdt->mdt_enable_remote_dir_gid != -1)
2271                 RETURN(-EPERM);
2272
2273         /*
2274          * Note: do not enqueue rename lock for replay request, because
2275          * if other MDT holds rename lock, but being blocked to wait for
2276          * this MDT to finish its recovery, and the failover MDT can not
2277          * get rename lock, which will cause deadlock.
2278          *
2279          * req is NULL if this is called by directory auto-split.
2280          */
2281         if (req && !req_is_replay(req)) {
2282                 rc = mdt_rename_lock(info, &rename_lh);
2283                 if (rc != 0) {
2284                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2285                                mdt_obd_name(info->mti_mdt), rc);
2286                         RETURN(rc);
2287                 }
2288         }
2289
2290         /* pobj is master object of parent */
2291         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2292         if (IS_ERR(pobj))
2293                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2294
2295         if (req) {
2296                 rc = mdt_version_get_check(info, pobj, 0);
2297                 if (rc)
2298                         GOTO(put_parent, rc);
2299         }
2300
2301         if (!mdt_object_exists(pobj))
2302                 GOTO(put_parent, rc = -ENOENT);
2303
2304         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2305                 GOTO(put_parent, rc = -ENOTDIR);
2306
2307         rc = mdt_check_enc(info, pobj);
2308         if (rc)
2309                 GOTO(put_parent, rc);
2310
2311         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2312         if (rc)
2313                 GOTO(put_parent, rc);
2314
2315 lock_parent:
2316         /* lock parent object */
2317         lhp = &info->mti_lh[MDT_LH_PARENT];
2318         mdt_lock_reg_init(lhp, LCK_PW);
2319         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2320                                      &parent_slave_locks);
2321         if (rc)
2322                 GOTO(put_parent, rc);
2323
2324         /*
2325          * spobj is the corresponding stripe against name if pobj is striped
2326          * directory, which is the real parent, and no need to lock, because
2327          * we've taken full lock of pobj.
2328          */
2329         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2330         if (rc)
2331                 GOTO(unlock_parent, rc);
2332
2333         /* lock parents of source links, and revoke LOOKUP lock of links */
2334         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2335                                    &parent_slave_locks, &link_locks);
2336         if (rc == -EBUSY && lock_retries-- > 0) {
2337                 mdt_object_put(env, sobj);
2338                 mdt_object_put(env, spobj);
2339                 goto lock_parent;
2340         }
2341
2342         if (rc < 0)
2343                 GOTO(put_source, rc);
2344
2345         /*
2346          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2347          * with one request, if total lock count exceeds this limit, we will
2348          * drop all locks after migration, and synchronous device in the end.
2349          */
2350         do_sync = rc;
2351
2352         /* TODO: DoM migration is not supported, migrate dirent only */
2353         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2354                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2355                 if (rc)
2356                         GOTO(unlock_links, rc);
2357
2358                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2359                         info->mti_spec.sp_migrate_nsonly = 1;
2360         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2361                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2362                 if (rc)
2363                         GOTO(unlock_links, rc);
2364
2365                 /* race with restripe/auto-split? */
2366                 if ((ma->ma_valid & MA_LMV) &&
2367                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2368                         GOTO(unlock_links, rc = -EBUSY);
2369         }
2370
2371         /* if migration HSM is allowed */
2372         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2373                 ma->ma_need = MA_HSM;
2374                 ma->ma_valid = 0;
2375                 rc = mdt_attr_get_complex(info, sobj, ma);
2376                 if (rc)
2377                         GOTO(unlock_links, rc);
2378
2379                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2380                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2381         }
2382
2383         /* end lease and close file for regular file */
2384         if (info->mti_spec.sp_migrate_close) {
2385                 /* try to hold open_sem so that nobody else can open the file */
2386                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2387                         /* close anyway */
2388                         mdd_migrate_close(info, sobj);
2389                         GOTO(unlock_links, rc = -EBUSY);
2390                 } else {
2391                         open_sem_locked = true;
2392                         rc = mdd_migrate_close(info, sobj);
2393                         if (rc)
2394                                 GOTO(unlock_open_sem, rc);
2395                 }
2396         }
2397
2398         /* lock source */
2399         lhs = &info->mti_lh[MDT_LH_OLD];
2400         mdt_lock_reg_init(lhs, LCK_EX);
2401         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2402                                      &child_slave_locks);
2403         if (rc)
2404                 GOTO(unlock_open_sem, rc);
2405
2406         /* lock target */
2407         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2408         if (IS_ERR(tobj))
2409                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2410
2411         lht = &info->mti_lh[MDT_LH_NEW];
2412         mdt_lock_reg_init(lht, LCK_EX);
2413         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2414         if (rc)
2415                 GOTO(put_target, rc);
2416
2417         /* Don't do lookup sanity check. We know name doesn't exist. */
2418         info->mti_spec.sp_cr_lookup = 0;
2419         info->mti_spec.sp_feat = &dt_directory_features;
2420
2421         rc = mdo_migrate(env, mdt_object_child(pobj),
2422                          mdt_object_child(sobj), &rr->rr_name,
2423                          mdt_object_child(tobj),
2424                          &info->mti_spec, ma);
2425         if (!rc)
2426                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2427                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2428         EXIT;
2429
2430         mdt_object_unlock(info, tobj, lht, rc);
2431 put_target:
2432         mdt_object_put(env, tobj);
2433 unlock_source:
2434         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2435                                   &child_slave_locks, rc);
2436 unlock_open_sem:
2437         if (open_sem_locked)
2438                 up_write(&sobj->mot_open_sem);
2439 unlock_links:
2440         /* if we've got too many locks to save into RPC,
2441          * then just commit before the locks are released
2442          */
2443         if (!rc && do_sync)
2444                 mdt_device_sync(env, mdt);
2445         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2446 put_source:
2447         mdt_object_put(env, sobj);
2448         mdt_object_put(env, spobj);
2449 unlock_parent:
2450         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2451                                   &parent_slave_locks, rc);
2452 put_parent:
2453         mdt_object_put(env, pobj);
2454 unlock_rename:
2455         if (lustre_handle_is_used(&rename_lh))
2456                 mdt_rename_unlock(&rename_lh);
2457
2458         return rc;
2459 }
2460
2461 static int mdt_object_lock_save(struct mdt_thread_info *info,
2462                                 struct mdt_object *dir,
2463                                 struct mdt_lock_handle *lh,
2464                                 int idx, bool cos_incompat)
2465 {
2466         int rc;
2467
2468         /* we lock the target dir if it is local */
2469         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2470                                    cos_incompat);
2471         if (rc != 0)
2472                 return rc;
2473
2474         /* get and save correct version after locking */
2475         mdt_version_get_save(info, dir, idx);
2476         return 0;
2477 }
2478
2479 /*
2480  * determine lock order of sobj and tobj
2481  *
2482  * there are two situations we need to lock tobj before sobj:
2483  * 1. sobj is child of tobj
2484  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2485  *    larger than that of tobj
2486  *
2487  * \retval      1 lock tobj before sobj
2488  * \retval      0 lock sobj before tobj
2489  * \retval      -ev negative errno upon error
2490  */
2491 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2492                                            struct mdt_object *sobj,
2493                                            struct mdt_object *tobj)
2494 {
2495         struct md_attr *ma = &info->mti_attr;
2496         struct lu_fid *spfid = &info->mti_tmp_fid1;
2497         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2498         struct lmv_mds_md_v1 *lmv;
2499         __u32 sindex;
2500         __u32 tindex;
2501         int rc;
2502
2503         /* sobj and tobj are the same */
2504         if (sobj == tobj)
2505                 return 0;
2506
2507         if (fid_is_root(mdt_object_fid(sobj)))
2508                 return 0;
2509
2510         if (fid_is_root(mdt_object_fid(tobj)))
2511                 return 1;
2512
2513         /* check whether sobj is child of tobj */
2514         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2515                            mdt_object_fid(tobj));
2516         if (rc < 0)
2517                 return rc;
2518
2519         if (rc == 1)
2520                 return 1;
2521
2522         /* check whether sobj and tobj are children of the same parent */
2523         rc = mdt_attr_get_pfid(info, sobj, spfid);
2524         if (rc)
2525                 return rc;
2526
2527         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2528         if (rc)
2529                 return rc;
2530
2531         if (!lu_fid_eq(spfid, tpfid))
2532                 return 0;
2533
2534         /* check whether sobj and tobj are sibling stripes */
2535         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2536         if (rc)
2537                 return rc;
2538
2539         if (!(ma->ma_valid & MA_LMV))
2540                 return 0;
2541
2542         lmv = &ma->ma_lmv->lmv_md_v1;
2543         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2544                 return 0;
2545         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2546
2547         ma->ma_valid = 0;
2548         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2549         if (rc)
2550                 return rc;
2551
2552         if (!(ma->ma_valid & MA_LMV))
2553                 return -ENODATA;
2554
2555         lmv = &ma->ma_lmv->lmv_md_v1;
2556         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2557                 return -EINVAL;
2558         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2559
2560         /* check stripe index of sobj and tobj */
2561         if (sindex == tindex)
2562                 return -EINVAL;
2563
2564         return sindex < tindex ? 0 : 1;
2565 }
2566
2567 /*
2568  * lock rename source object.
2569  *
2570  * Both source and source parent may be remote, and source may be a remote
2571  * object on source parent, to avoid overriding lock handle, store remote
2572  * LOOKUP lock separately in @lhr.
2573  *
2574  * \retval      0 on success
2575  * \retval      -ev negative errno upon error
2576  */
2577 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2578                                   struct mdt_object *parent,
2579                                   struct mdt_object *child,
2580                                   struct mdt_lock_handle *lhc,
2581                                   struct mdt_lock_handle *lhr,
2582                                   __u64 ibits,
2583                                   bool cos_incompat)
2584 {
2585         int rc;
2586
2587         rc = mdt_is_remote_object(info, parent, child);
2588         if (rc < 0)
2589                 return rc;
2590
2591         if (rc) {
2592                 /* enqueue remote LOOKUP lock from the parent MDT */
2593                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2594
2595                 if (mdt_object_remote(parent)) {
2596                         rc = mdt_remote_object_lock(info, parent,
2597                                                     mdt_object_fid(child),
2598                                                     &lhr->mlh_rreg_lh,
2599                                                     lhr->mlh_rreg_mode,
2600                                                     rmt_ibits, false);
2601                         if (rc != ELDLM_OK)
2602                                 return rc;
2603                 } else {
2604                         LASSERT(mdt_object_remote(child));
2605                         rc = mdt_object_local_lock(info, child, lhr,
2606                                                    &rmt_ibits, 0, true);
2607                         if (rc < 0)
2608                                 return rc;
2609                 }
2610
2611                 ibits &= ~MDS_INODELOCK_LOOKUP;
2612         }
2613
2614         if (mdt_object_remote(child)) {
2615                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2616                                             &lhc->mlh_rreg_lh,
2617                                             lhc->mlh_rreg_mode,
2618                                             ibits, false);
2619                 if (rc == ELDLM_OK)
2620                         rc = 0;
2621         } else {
2622                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2623                                            cos_incompat);
2624         }
2625
2626         if (!rc)
2627                 mdt_object_unlock(info, child, lhr, rc);
2628
2629         return rc;
2630 }
2631
2632 /* Helper function for mdt_reint_rename so we don't need to opencode
2633  * two different order lockings
2634  */
2635 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2636                              struct mdt_object *mfirstdir,
2637                              struct mdt_lock_handle *lh_firstdirp,
2638                              struct mdt_object *mseconddir,
2639                              struct mdt_lock_handle *lh_seconddirp,
2640                              bool cos_incompat)
2641 {
2642         int rc;
2643
2644         rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
2645                                   cos_incompat);
2646         if (rc)
2647                 return rc;
2648
2649         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2650
2651         if (mfirstdir != mseconddir) {
2652                 rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
2653                                           cos_incompat);
2654         } else if (!mdt_object_remote(mseconddir) &&
2655                    lh_firstdirp->mlh_pdo_hash !=
2656                    lh_seconddirp->mlh_pdo_hash) {
2657                 rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
2658                                         MDS_INODELOCK_UPDATE,
2659                                         cos_incompat);
2660                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2661         }
2662
2663         if (rc != 0)
2664                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2665
2666         return rc;
2667 }
2668
2669 /*
2670  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2671  * 2 - srcdir child; 3 - tgtdir child.
2672  * Update on disk version of srcdir child.
2673  */
2674 static int mdt_reint_rename(struct mdt_thread_info *info,
2675                             struct mdt_lock_handle *unused)
2676 {
2677         struct mdt_device *mdt = info->mti_mdt;
2678         struct mdt_reint_record *rr = &info->mti_rr;
2679         struct md_attr *ma = &info->mti_attr;
2680         struct ptlrpc_request *req = mdt_info_req(info);
2681         struct mdt_object *msrcdir = NULL;
2682         struct mdt_object *mtgtdir = NULL;
2683         struct mdt_object *mold;
2684         struct mdt_object *mnew = NULL;
2685         struct lustre_handle rename_lh = { 0 };
2686         struct mdt_lock_handle *lh_srcdirp;
2687         struct mdt_lock_handle *lh_tgtdirp;
2688         struct mdt_lock_handle *lh_oldp = NULL;
2689         struct mdt_lock_handle *lh_rmt = NULL;
2690         struct mdt_lock_handle *lh_newp = NULL;
2691         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2692         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2693         __u64 lock_ibits;
2694         bool reverse = false, discard = false;
2695         bool cos_incompat;
2696         ktime_t kstart = ktime_get();
2697         int rc;
2698
2699         ENTRY;
2700         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2701                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2702                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2703
2704         if (info->mti_dlm_req)
2705                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2706
2707         if (!fid_is_md_operative(rr->rr_fid1) ||
2708             !fid_is_md_operative(rr->rr_fid2))
2709                 RETURN(-EPERM);
2710
2711         /* find both parents. */
2712         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2713         if (IS_ERR(msrcdir))
2714                 RETURN(PTR_ERR(msrcdir));
2715
2716         rc = mdt_check_enc(info, msrcdir);
2717         if (rc)
2718                 GOTO(out_put_srcdir, rc);
2719
2720         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2721
2722         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2723                 mtgtdir = msrcdir;
2724                 mdt_object_get(info->mti_env, mtgtdir);
2725         } else {
2726                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2727                 if (IS_ERR(mtgtdir))
2728                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2729         }
2730
2731         rc = mdt_check_enc(info, mtgtdir);
2732         if (rc)
2733                 GOTO(out_put_tgtdir, rc);
2734
2735         /*
2736          * Note: do not enqueue rename lock for replay request, because
2737          * if other MDT holds rename lock, but being blocked to wait for
2738          * this MDT to finish its recovery, and the failover MDT can not
2739          * get rename lock, which will cause deadlock.
2740          */
2741         if (!req_is_replay(req)) {
2742                 /*
2743                  * Normally rename RPC is handled on the MDT with the target
2744                  * directory (if target exists, it's on the MDT with the
2745                  * target), if the source directory is remote, it's a hint that
2746                  * source is remote too (this may not be true, but it won't
2747                  * cause any issue), return -EXDEV early to avoid taking
2748                  * rename_lock.
2749                  */
2750                 if (!mdt->mdt_enable_remote_rename &&
2751                     mdt_object_remote(msrcdir))
2752                         GOTO(out_put_tgtdir, rc = -EXDEV);
2753
2754                 /* This might be further relaxed in the future for regular file
2755                  * renames in different source and target parents. Start with
2756                  * only same-directory renames for simplicity and because this
2757                  * is by far the most the common use case.
2758                  */
2759                 if (msrcdir != mtgtdir) {
2760                         rc = mdt_rename_lock(info, &rename_lh);
2761                         if (rc != 0) {
2762                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2763                                        mdt_obd_name(mdt), rc);
2764                                 GOTO(out_put_tgtdir, rc);
2765                         }
2766                 } else {
2767                         CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
2768                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2769                                PNAME(&rr->rr_name));
2770                 }
2771         }
2772
2773         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2774         if (rc < 0)
2775                 GOTO(out_unlock_rename, rc);
2776         reverse = rc;
2777
2778         /* source needs to be looked up after locking source parent, otherwise
2779          * this rename may race with unlink source, and cause rename hang, see
2780          * sanityn.sh 55b, so check parents first, if later we found source is
2781          * remote, relock parents.
2782          */
2783         cos_incompat = (mdt_object_remote(msrcdir) ||
2784                         mdt_object_remote(mtgtdir));
2785
2786         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2787
2788         /* lock parents in the proper order. */
2789         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2790         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2791
2792         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2793         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2794 relock:
2795         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2796         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2797
2798         /* In case of same dir local rename we must sort by the hash,
2799          * otherwise a lock deadlock is possible when renaming
2800          * a to b and b to a at the same time LU-15285
2801          */
2802         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2803                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2804         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2805                 reverse = 0;
2806
2807         if (reverse)
2808                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
2809                                        lh_srcdirp, cos_incompat);
2810         else
2811                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
2812                                        lh_tgtdirp, cos_incompat);
2813
2814         if (rc != 0)
2815                 GOTO(out_unlock_rename, rc);
2816
2817         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2818         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2819
2820         /* find mold object. */
2821         fid_zero(old_fid);
2822         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2823         if (rc != 0)
2824                 GOTO(out_unlock_parents, rc);
2825
2826         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2827                 GOTO(out_unlock_parents, rc = -EINVAL);
2828
2829         if (!fid_is_md_operative(old_fid))
2830                 GOTO(out_unlock_parents, rc = -EPERM);
2831
2832         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2833         if (IS_ERR(mold))
2834                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2835
2836         if (!mdt_object_exists(mold)) {
2837                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2838                                 &mold->mot_obj,
2839                                 "object does not exist");
2840                 GOTO(out_put_old, rc = -ENOENT);
2841         }
2842
2843         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2844                 GOTO(out_put_old, rc = -EXDEV);
2845
2846         /* Check if @mtgtdir is subdir of @mold, before locking child
2847          * to avoid reverse locking.
2848          */
2849         if (mtgtdir != msrcdir) {
2850                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2851                                    old_fid);
2852                 if (rc) {
2853                         if (rc == 1)
2854                                 rc = -EINVAL;
2855                         GOTO(out_put_old, rc);
2856                 }
2857         }
2858
2859         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2860         /* save version after locking */
2861         mdt_version_get_save(info, mold, 2);
2862
2863         if (!cos_incompat && mdt_object_remote(mold)) {
2864                 cos_incompat = true;
2865                 mdt_object_put(info->mti_env, mold);
2866                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2867                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2868                 goto relock;
2869         }
2870
2871         /* find mnew object:
2872          * mnew target object may not exist now
2873          * lookup with version checking
2874          */
2875         fid_zero(new_fid);
2876         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2877                                       3);
2878         if (rc == 0) {
2879                 /* the new_fid should have been filled at this moment */
2880                 if (lu_fid_eq(old_fid, new_fid))
2881                         GOTO(out_put_old, rc);
2882
2883                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2884                     lu_fid_eq(new_fid, rr->rr_fid2))
2885                         GOTO(out_put_old, rc = -EINVAL);
2886
2887                 if (!fid_is_md_operative(new_fid))
2888                         GOTO(out_put_old, rc = -EPERM);
2889
2890                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2891                 if (IS_ERR(mnew))
2892                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2893
2894                 if (!mdt_object_exists(mnew)) {
2895                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2896                                         &mnew->mot_obj,
2897                                         "object does not exist");
2898                         GOTO(out_put_new, rc = -ENOENT);
2899                 }
2900
2901                 if (mdt_object_remote(mnew)) {
2902                         struct mdt_body  *repbody;
2903
2904                         /* Always send rename req to the target child MDT */
2905                         repbody = req_capsule_server_get(info->mti_pill,
2906                                                          &RMF_MDT_BODY);
2907                         LASSERT(repbody != NULL);
2908                         repbody->mbo_fid1 = *new_fid;
2909                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2910                         GOTO(out_put_new, rc = -EXDEV);
2911                 }
2912                 /* Before locking the target dir, check we do not replace
2913                  * a dir with a non-dir, otherwise it may deadlock with
2914                  * link op which tries to create a link in this dir
2915                  * back to this non-dir.
2916                  */
2917                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2918                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2919                         GOTO(out_put_new, rc = -EISDIR);
2920
2921                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2922                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2923                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2924                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2925                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2926                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2927                                             lh_rmt, lock_ibits, cos_incompat);
2928                 if (rc < 0)
2929                         GOTO(out_put_new, rc);
2930
2931                 /* Check if @msrcdir is subdir of @mnew, before locking child
2932                  * to avoid reverse locking.
2933                  */
2934                 if (mtgtdir != msrcdir) {
2935                         rc = mdo_is_subdir(info->mti_env,
2936                                            mdt_object_child(msrcdir), new_fid);
2937                         if (rc) {
2938                                 if (rc == 1)
2939                                         rc = -EINVAL;
2940                                 GOTO(out_unlock_old, rc);
2941                         }
2942                 }
2943
2944                 /* We used to acquire MDS_INODELOCK_FULL here but we
2945                  * can't do this now because a running HSM restore on
2946                  * the rename onto victim will hold the layout
2947                  * lock. See LU-4002.
2948                  */
2949
2950                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2951                 mdt_lock_reg_init(lh_newp, LCK_EX);
2952                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2953                 if (mdt_object_remote(mtgtdir)) {
2954                         rc = mdt_remote_object_lock(info, mtgtdir,
2955                                                     mdt_object_fid(mnew),
2956                                                     &lh_newp->mlh_rreg_lh,
2957                                                     lh_newp->mlh_rreg_mode,
2958                                                     MDS_INODELOCK_LOOKUP,
2959                                                     false);
2960                         if (rc != ELDLM_OK)
2961                                 GOTO(out_unlock_old, rc);
2962
2963                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2964                 }
2965                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2966                                            cos_incompat);
2967                 if (rc != 0)
2968                         GOTO(out_unlock_new, rc);
2969
2970                 /* get and save version after locking */
2971                 mdt_version_get_save(info, mnew, 3);
2972         } else if (rc != -ENOENT) {
2973                 GOTO(out_put_old, rc);
2974         } else {
2975                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2976                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2977                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2978                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2979                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2980                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2981                                             lh_rmt, lock_ibits, cos_incompat);
2982                 if (rc != 0)
2983                         GOTO(out_put_old, rc);
2984
2985                 mdt_enoent_version_save(info, 3);
2986         }
2987
2988         /* step 5: rename it */
2989         mdt_reint_init_ma(info, ma);
2990
2991         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2992                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2993
2994         if (mnew != NULL)
2995                 mutex_lock(&mnew->mot_lov_mutex);
2996
2997         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2998                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2999                         mnew != NULL ? mdt_object_child(mnew) : NULL,
3000                         &rr->rr_tgt_name, ma);
3001
3002         if (mnew != NULL)
3003                 mutex_unlock(&mnew->mot_lov_mutex);
3004
3005         /* handle last link of tgt object */
3006         if (rc == 0) {
3007                 mdt_counter_incr(req, LPROC_MDT_RENAME,
3008                                  ktime_us_delta(ktime_get(), kstart));
3009                 if (mnew) {
3010                         mdt_handle_last_unlink(info, mnew, ma);
3011                         discard = mdt_dom_check_for_discard(info, mnew);
3012                 }
3013                 mdt_rename_counter_tally(info, info->mti_mdt, req,
3014                                          msrcdir, mtgtdir,
3015                                          ktime_us_delta(ktime_get(), kstart));
3016         }
3017
3018         EXIT;
3019 out_unlock_new:
3020         if (mnew != NULL)
3021                 mdt_object_unlock(info, mnew, lh_newp, rc);
3022 out_unlock_old:
3023         mdt_object_unlock(info, NULL, lh_rmt, rc);
3024         mdt_object_unlock(info, mold, lh_oldp, rc);
3025 out_put_new:
3026         if (mnew && !discard)
3027                 mdt_object_put(info->mti_env, mnew);
3028 out_put_old:
3029         mdt_object_put(info->mti_env, mold);
3030 out_unlock_parents:
3031         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
3032         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
3033 out_unlock_rename:
3034         if (lustre_handle_is_used(&rename_lh))
3035                 mdt_rename_unlock(&rename_lh);
3036 out_put_tgtdir:
3037         mdt_object_put(info->mti_env, mtgtdir);
3038 out_put_srcdir:
3039         mdt_object_put(info->mti_env, msrcdir);
3040
3041         /* The DoM discard can be done right in the place above where it is
3042          * assigned, meanwhile it is done here after rename unlock due to
3043          * compatibility with old clients, for them the discard blocks
3044          * the main thread until completion. Check LU-11359 for details.
3045          */
3046         if (discard) {
3047                 mdt_dom_discard_data(info, mnew);
3048                 mdt_object_put(info->mti_env, mnew);
3049         }
3050         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3051         return rc;
3052 }
3053
3054 static int mdt_reint_resync(struct mdt_thread_info *info,
3055                             struct mdt_lock_handle *lhc)
3056 {
3057         struct mdt_reint_record *rr = &info->mti_rr;
3058         struct ptlrpc_request *req = mdt_info_req(info);
3059         struct md_attr *ma = &info->mti_attr;
3060         struct mdt_object *mo;
3061         struct ldlm_lock *lease;
3062         struct mdt_body *repbody;
3063         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3064         bool lease_broken;
3065         int rc, rc2;
3066
3067         ENTRY;
3068         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3069
3070         if (info->mti_dlm_req)
3071                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3072
3073         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3074         if (IS_ERR(mo))
3075                 GOTO(out, rc = PTR_ERR(mo));
3076
3077         if (!mdt_object_exists(mo))
3078                 GOTO(out_obj, rc = -ENOENT);
3079
3080         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3081                 GOTO(out_obj, rc = -EINVAL);
3082
3083         if (mdt_object_remote(mo))
3084                 GOTO(out_obj, rc = -EREMOTE);
3085
3086         lease = ldlm_handle2lock(rr->rr_lease_handle);
3087         if (lease == NULL)
3088                 GOTO(out_obj, rc = -ESTALE);
3089
3090         /* It's really necessary to grab open_sem and check if the lease lock
3091          * has been lost. There would exist a concurrent writer coming in and
3092          * generating some dirty data in memory cache, the writeback would fail
3093          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3094          */
3095         if (!down_write_trylock(&mo->mot_open_sem))
3096                 GOTO(out_put_lease, rc = -EBUSY);
3097
3098         lock_res_and_lock(lease);
3099         lease_broken = ldlm_is_cancel(lease);
3100         unlock_res_and_lock(lease);
3101         if (lease_broken)
3102                 GOTO(out_unlock, rc = -EBUSY);
3103
3104         /* the file has yet opened by anyone else after we took the lease. */
3105         layout.mlc_opc = MD_LAYOUT_RESYNC;
3106         lhc = &info->mti_lh[MDT_LH_LOCAL];
3107         rc = mdt_layout_change(info, mo, lhc, &layout);
3108         if (rc)
3109                 GOTO(out_unlock, rc);
3110
3111         mdt_object_unlock(info, mo, lhc, 0);
3112
3113         ma->ma_need = MA_INODE;
3114         ma->ma_valid = 0;
3115         rc = mdt_attr_get_complex(info, mo, ma);
3116         if (rc != 0)
3117                 GOTO(out_unlock, rc);
3118
3119         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3120         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3121
3122         EXIT;
3123 out_unlock:
3124         up_write(&mo->mot_open_sem);
3125 out_put_lease:
3126         LDLM_LOCK_PUT(lease);
3127 out_obj:
3128         mdt_object_put(info->mti_env, mo);
3129 out:
3130         mdt_client_compatibility(info);
3131         rc2 = mdt_fix_reply(info);
3132         if (rc == 0)
3133                 rc = rc2;
3134         return rc;
3135 }
3136
3137 struct mdt_reinter {
3138         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3139         enum lprocfs_extra_opc mr_extra_opc;
3140 };
3141
3142 static const struct mdt_reinter mdt_reinters[] = {
3143         [REINT_SETATTR] = {
3144                 .mr_handler = &mdt_reint_setattr,
3145                 .mr_extra_opc = MDS_REINT_SETATTR,
3146         },
3147         [REINT_CREATE] = {
3148                 .mr_handler = &mdt_reint_create,
3149                 .mr_extra_opc = MDS_REINT_CREATE,
3150         },
3151         [REINT_LINK] = {
3152                 .mr_handler = &mdt_reint_link,
3153                 .mr_extra_opc = MDS_REINT_LINK,
3154         },
3155         [REINT_UNLINK] = {
3156                 .mr_handler = &mdt_reint_unlink,
3157                 .mr_extra_opc = MDS_REINT_UNLINK,
3158         },
3159         [REINT_RENAME] = {
3160                 .mr_handler = &mdt_reint_rename,
3161                 .mr_extra_opc = MDS_REINT_RENAME,
3162         },
3163         [REINT_OPEN] = {
3164                 .mr_handler = &mdt_reint_open,
3165                 .mr_extra_opc = MDS_REINT_OPEN,
3166         },
3167         [REINT_SETXATTR] = {
3168                 .mr_handler = &mdt_reint_setxattr,
3169                 .mr_extra_opc = MDS_REINT_SETXATTR,
3170         },
3171         [REINT_RMENTRY] = {
3172                 .mr_handler = &mdt_reint_unlink,
3173                 .mr_extra_opc = MDS_REINT_UNLINK,
3174         },
3175         [REINT_MIGRATE] = {
3176                 .mr_handler = &mdt_reint_migrate,
3177                 .mr_extra_opc = MDS_REINT_RENAME,
3178         },
3179         [REINT_RESYNC] = {
3180                 .mr_handler = &mdt_reint_resync,
3181                 .mr_extra_opc = MDS_REINT_RESYNC,
3182         },
3183 };
3184
3185 int mdt_reint_rec(struct mdt_thread_info *info,
3186                   struct mdt_lock_handle *lhc)
3187 {
3188         const struct mdt_reinter *mr;
3189         int rc;
3190
3191         ENTRY;
3192         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3193                 RETURN(-EPROTO);
3194
3195         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3196         if (mr->mr_handler == NULL)
3197                 RETURN(-EPROTO);
3198
3199         rc = (*mr->mr_handler)(info, lhc);
3200
3201         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3202                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3203
3204         RETURN(rc);
3205 }