Whamcloud - gitweb
82a3bf1b2ce45343b8f268dd5f84c21ba302d71f
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47
48 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
49                                      struct md_attr *ma)
50 {
51         ma->ma_need = MA_INODE;
52         ma->ma_valid = 0;
53 }
54
55 /**
56  * Get version of object by fid.
57  *
58  * Return real version or ENOENT_VERSION if object doesn't exist
59  */
60 static void mdt_obj_version_get(struct mdt_thread_info *info,
61                                 struct mdt_object *o, __u64 *version)
62 {
63         LASSERT(o);
64
65         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
66             !fid_is_obf(mdt_object_fid(o)))
67                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
68         else
69                 *version = ENOENT_VERSION;
70         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
71                PFID(mdt_object_fid(o)), *version);
72 }
73
74 /**
75  * Check version is correct.
76  *
77  * Should be called only during replay.
78  */
79 static int mdt_version_check(struct ptlrpc_request *req,
80                              __u64 version, int idx)
81 {
82         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
83
84         ENTRY;
85         if (!exp_connect_vbr(req->rq_export))
86                 RETURN(0);
87
88         LASSERT(req_is_replay(req));
89         /** VBR: version is checked always because costs nothing */
90         LASSERT(idx < PTLRPC_NUM_VERSIONS);
91         /** Sanity check for malformed buffers */
92         if (pre_ver == NULL) {
93                 CERROR("No versions in request buffer\n");
94                 spin_lock(&req->rq_export->exp_lock);
95                 req->rq_export->exp_vbr_failed = 1;
96                 spin_unlock(&req->rq_export->exp_lock);
97                 RETURN(-EOVERFLOW);
98         } else if (pre_ver[idx] != version) {
99                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
100                        pre_ver[idx], version);
101                 spin_lock(&req->rq_export->exp_lock);
102                 req->rq_export->exp_vbr_failed = 1;
103                 spin_unlock(&req->rq_export->exp_lock);
104                 RETURN(-EOVERFLOW);
105         }
106         RETURN(0);
107 }
108
109 /**
110  * Save pre-versions in reply.
111  */
112 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
113                              int idx)
114 {
115         __u64 *reply_ver;
116
117         if (!exp_connect_vbr(req->rq_export))
118                 return;
119
120         LASSERT(!req_is_replay(req));
121         LASSERT(req->rq_repmsg != NULL);
122         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
123         if (reply_ver)
124                 reply_ver[idx] = version;
125 }
126
127 /**
128  * Save enoent version, it is needed when it is obvious that object doesn't
129  * exist, e.g. child during create.
130  */
131 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
132 {
133         /* save version of file name for replay, it must be ENOENT here */
134         if (!req_is_replay(mdt_info_req(info))) {
135                 info->mti_ver[idx] = ENOENT_VERSION;
136                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
137         }
138 }
139
140 /**
141  * Get version from disk and save in reply buffer.
142  *
143  * Versions are saved in reply only during normal operations not replays.
144  */
145 void mdt_version_get_save(struct mdt_thread_info *info,
146                           struct mdt_object *mto, int idx)
147 {
148         /* don't save versions during replay */
149         if (!req_is_replay(mdt_info_req(info))) {
150                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
151                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
152         }
153 }
154
155 /**
156  * Get version from disk and check it, no save in reply.
157  */
158 int mdt_version_get_check(struct mdt_thread_info *info,
159                           struct mdt_object *mto, int idx)
160 {
161         /* only check versions during replay */
162         if (!req_is_replay(mdt_info_req(info)))
163                 return 0;
164
165         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
166         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
167 }
168
169 /**
170  * Get version from disk and check if recovery or just save.
171  */
172 int mdt_version_get_check_save(struct mdt_thread_info *info,
173                                struct mdt_object *mto, int idx)
174 {
175         int rc = 0;
176
177         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
178         if (req_is_replay(mdt_info_req(info)))
179                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
180                                        idx);
181         else
182                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
183         return rc;
184 }
185
186 /**
187  * Lookup with version checking.
188  *
189  * This checks version of 'name'. Many reint functions uses 'name' for child not
190  * FID, therefore we need to get object by name and check its version.
191  */
192 int mdt_lookup_version_check(struct mdt_thread_info *info,
193                              struct mdt_object *p,
194                              const struct lu_name *lname,
195                              struct lu_fid *fid, int idx)
196 {
197         int rc, vbrc;
198
199         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
200                         &info->mti_spec);
201         /* Check version only during replay */
202         if (!req_is_replay(mdt_info_req(info)))
203                 return rc;
204
205         info->mti_ver[idx] = ENOENT_VERSION;
206         if (rc == 0) {
207                 struct mdt_object *child;
208
209                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
210                 if (likely(!IS_ERR(child))) {
211                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
212                         mdt_object_put(info->mti_env, child);
213                 }
214         }
215         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
216         return vbrc ? vbrc : rc;
217
218 }
219
220 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
221                              struct mdt_object *obj,
222                              struct ldlm_enqueue_info *einfo,
223                              int decref)
224 {
225         union ldlm_policy_data *policy = &mti->mti_policy;
226         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
227         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
228         int i;
229
230         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
231         LASSERT(slave_locks);
232
233         memset(policy, 0, sizeof(*policy));
234         policy->l_inodebits.bits = einfo->ei_inodebits;
235         mdt_lock_handle_init(lh);
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < slave_locks->ha_count; i++) {
238                 if (test_bit(i, (void *)slave_locks->ha_map))
239                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 slave_locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
279                            enum ldlm_mode mode, __u64 ibits,
280                            struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285
286         einfo->ei_type = LDLM_IBITS;
287         einfo->ei_mode = mode;
288         einfo->ei_cb_bl = mdt_remote_blocking_ast;
289         einfo->ei_cb_local_bl = mdt_blocking_ast;
290         einfo->ei_cb_cp = ldlm_completion_ast;
291         einfo->ei_enq_slave = 1;
292         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
293         einfo->ei_inodebits = ibits;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 int mdt_reint_striped_lock(struct mdt_thread_info *info,
302                            struct mdt_object *o,
303                            struct mdt_lock_handle *lh,
304                            __u64 ibits,
305                            struct ldlm_enqueue_info *einfo,
306                            bool cos_incompat)
307 {
308         int rc;
309
310         LASSERT(!mdt_object_remote(o));
311
312         memset(einfo, 0, sizeof(*einfo));
313
314         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
315         if (rc)
316                 return rc;
317
318         rc = mdt_object_striped(info, o);
319         if (rc != 1) {
320                 if (rc < 0)
321                         mdt_object_unlock(info, o, lh, rc);
322                 return rc;
323         }
324
325         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
326         if (rc) {
327                 mdt_object_unlock(info, o, lh, rc);
328                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
329                         rc = 0;
330         }
331
332         return rc;
333 }
334
335 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
336                               struct mdt_object *o,
337                               struct mdt_lock_handle *lh,
338                               struct ldlm_enqueue_info *einfo, int decref)
339 {
340         if (einfo->ei_cbdata)
341                 mdt_unlock_slaves(info, o, einfo, decref);
342         mdt_object_unlock(info, o, lh, decref);
343 }
344
345 static int mdt_restripe(struct mdt_thread_info *info,
346                         struct mdt_object *parent,
347                         const struct lu_name *lname,
348                         const struct lu_fid *tfid,
349                         struct md_op_spec *spec,
350                         struct md_attr *ma)
351 {
352         struct mdt_device *mdt = info->mti_mdt;
353         struct lu_fid *fid = &info->mti_tmp_fid2;
354         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
355         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
356         struct lmv_mds_md_v1 *lmv;
357         struct mdt_object *child;
358         struct mdt_lock_handle *lhp;
359         struct mdt_lock_handle *lhc;
360         struct mdt_body *repbody;
361         int rc;
362
363         ENTRY;
364         if (!mdt->mdt_enable_dir_restripe)
365                 RETURN(-EPERM);
366
367         LASSERT(lum);
368         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
369
370         rc = mdt_version_get_check_save(info, parent, 0);
371         if (rc)
372                 RETURN(rc);
373
374         lhp = &info->mti_lh[MDT_LH_PARENT];
375         mdt_lock_pdo_init(lhp, LCK_PW, lname);
376         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
377                                    true);
378         if (rc)
379                 RETURN(rc);
380
381         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
382         if (rc)
383                 GOTO(unlock_parent, rc);
384
385         if (ma->ma_valid & MA_LMV) {
386                 /* don't allow restripe if parent dir layout is changing */
387                 lmv = &ma->ma_lmv->lmv_md_v1;
388                 if (!lmv_is_sane2(lmv))
389                         GOTO(unlock_parent, rc = -EBADF);
390
391                 if (lmv_is_layout_changing(lmv))
392                         GOTO(unlock_parent, rc = -EBUSY);
393         }
394
395         fid_zero(fid);
396         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
397         if (rc)
398                 GOTO(unlock_parent, rc);
399
400         child = mdt_object_find(info->mti_env, mdt, fid);
401         if (IS_ERR(child))
402                 GOTO(unlock_parent, rc = PTR_ERR(child));
403
404         if (!mdt_object_exists(child))
405                 GOTO(out_child, rc = -ENOENT);
406
407         if (mdt_object_remote(child)) {
408                 struct mdt_body *repbody;
409
410                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
411                 if (!repbody)
412                         GOTO(out_child, rc = -EPROTO);
413
414                 repbody->mbo_fid1 = *fid;
415                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
416                 GOTO(out_child, rc = -EREMOTE);
417         }
418
419         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
420                 GOTO(out_child, rc = -ENOTDIR);
421
422         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
423         if (rc)
424                 GOTO(out_child, rc);
425
426         /* race with migrate? */
427         if ((ma->ma_valid & MA_LMV) &&
428              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
429                 GOTO(out_child, rc = -EBUSY);
430
431         /* lock object */
432         lhc = &info->mti_lh[MDT_LH_CHILD];
433         mdt_lock_reg_init(lhc, LCK_EX);
434
435         /* enqueue object remote LOOKUP lock */
436         if (mdt_object_remote(parent)) {
437                 rc = mdt_remote_object_lock(info, parent, fid,
438                                             &lhc->mlh_rreg_lh,
439                                             lhc->mlh_rreg_mode,
440                                             MDS_INODELOCK_LOOKUP, false);
441                 if (rc != ELDLM_OK)
442                         GOTO(out_child, rc);
443         }
444
445         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
446                                     true);
447         if (rc)
448                 GOTO(unlock_child, rc);
449
450         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
451         rc = mdt_version_get_check_save(info, child, 1);
452         if (rc)
453                 GOTO(unlock_child, rc);
454
455         spin_lock(&mdt->mdt_restriper.mdr_lock);
456         if (child->mot_restriping) {
457                 /* race? */
458                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
459                 GOTO(unlock_child, rc = -EBUSY);
460         }
461         child->mot_restriping = 1;
462         spin_unlock(&mdt->mdt_restriper.mdr_lock);
463
464         *fid = *tfid;
465         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
466         if (rc)
467                 GOTO(restriping_clear, rc);
468
469         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
470         if (!repbody)
471                 GOTO(restriping_clear, rc = -EPROTO);
472
473         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
474         EXIT;
475
476 restriping_clear:
477         child->mot_restriping = 0;
478 unlock_child:
479         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
480 out_child:
481         mdt_object_put(info->mti_env, child);
482 unlock_parent:
483         mdt_object_unlock(info, parent, lhp, rc);
484
485         return rc;
486 }
487
488 /*
489  * VBR: we save three versions in reply:
490  * 0 - parent. Check that parent version is the same during replay.
491  * 1 - name. Version of 'name' if file exists with the same name or
492  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
493  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
494  * check.
495  */
496 static int mdt_create(struct mdt_thread_info *info)
497 {
498         struct mdt_device *mdt = info->mti_mdt;
499         struct mdt_object *parent;
500         struct mdt_object *child;
501         struct mdt_lock_handle *lh;
502         struct mdt_body *repbody;
503         struct md_attr *ma = &info->mti_attr;
504         struct mdt_reint_record *rr = &info->mti_rr;
505         struct md_op_spec *spec = &info->mti_spec;
506         bool restripe = false;
507         int rc;
508
509         ENTRY;
510         DEBUG_REQ(D_INODE, mdt_info_req(info),
511                   "Create ("DNAME"->"DFID") in "DFID,
512                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
513
514         if (!fid_is_md_operative(rr->rr_fid1))
515                 RETURN(-EPERM);
516
517         if (S_ISDIR(ma->ma_attr.la_mode) &&
518             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
519                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
520                 struct lu_ucred *uc = mdt_ucred(info);
521                 struct obd_export *exp = mdt_info_req(info)->rq_export;
522
523                 /* Only new clients can create remote dir( >= 2.4) and
524                  * striped dir(>= 2.6), old client will return -ENOTSUPP
525                  */
526                 if (!mdt_is_dne_client(exp))
527                         RETURN(-ENOTSUPP);
528
529                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
530                         if (!mdt_is_striped_client(exp))
531                                 RETURN(-ENOTSUPP);
532
533                         if (!mdt->mdt_enable_striped_dir)
534                                 RETURN(-EPERM);
535                 } else if (!mdt->mdt_enable_remote_dir) {
536                         RETURN(-EPERM);
537                 }
538
539                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
540                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
541                     LMV_HASH_TYPE_CRUSH)
542                         RETURN(-EPROTO);
543
544                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
545                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
546                     mdt->mdt_enable_remote_dir_gid != -1)
547                         RETURN(-EPERM);
548
549                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
550                  * this is create only, don't try restripe.
551                  */
552                 if (mdt->mdt_enable_dir_restripe &&
553                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
554                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
555                         restripe = true;
556         }
557
558         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
559
560         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
561         if (IS_ERR(parent))
562                 RETURN(PTR_ERR(parent));
563
564         if (!mdt_object_exists(parent))
565                 GOTO(put_parent, rc = -ENOENT);
566
567         /*
568          * LU-10235: check if name exists locklessly first to avoid massive
569          * lock recalls on existing directories.
570          */
571         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
572                                       &info->mti_tmp_fid1, 1);
573         if (rc == 0) {
574                 if (!restripe)
575                         GOTO(put_parent, rc = -EEXIST);
576
577                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
578                                   ma);
579         }
580
581         /* -ENOENT is expected here */
582         if (rc != -ENOENT)
583                 GOTO(put_parent, rc);
584
585         /* save version of file name for replay, it must be ENOENT here */
586         mdt_enoent_version_save(info, 1);
587
588         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
589
590         lh = &info->mti_lh[MDT_LH_PARENT];
591         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
592         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
593         if (rc)
594                 GOTO(put_parent, rc);
595
596         if (!mdt_object_remote(parent)) {
597                 rc = mdt_version_get_check_save(info, parent, 0);
598                 if (rc)
599                         GOTO(unlock_parent, rc);
600         }
601
602         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
603         if (unlikely(IS_ERR(child)))
604                 GOTO(unlock_parent, rc = PTR_ERR(child));
605
606         ma->ma_need = MA_INODE;
607         ma->ma_valid = 0;
608
609         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
610                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
611
612         /* Version of child will be updated on disk. */
613         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
614         rc = mdt_version_get_check_save(info, child, 2);
615         if (rc)
616                 GOTO(put_child, rc);
617
618         /*
619          * Do not perform lookup sanity check. We know that name does
620          * not exist.
621          */
622         info->mti_spec.sp_cr_lookup = 0;
623         info->mti_spec.sp_feat = &dt_directory_features;
624
625         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
626                         mdt_object_child(child), &info->mti_spec, ma);
627         if (rc == 0)
628                 rc = mdt_attr_get_complex(info, child, ma);
629
630         if (rc < 0)
631                 GOTO(put_child, rc);
632
633         /*
634          * On DNE, we need to eliminate dependey between 'mkdir a' and
635          * 'mkdir a/b' if b is a striped directory, to achieve this, two
636          * things are done below:
637          * 1. save child and slaves lock.
638          * 2. if the child is a striped directory, relock parent so to
639          *    compare against with COS locks to ensure parent was
640          *    committed to disk.
641          */
642         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
643                 struct mdt_lock_handle *lhc;
644                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
645                 bool cos_incompat;
646
647                 rc = mdt_object_striped(info, child);
648                 if (rc < 0)
649                         GOTO(put_child, rc);
650
651                 cos_incompat = rc;
652                 if (cos_incompat) {
653                         if (!mdt_object_remote(parent)) {
654                                 mdt_object_unlock(info, parent, lh, 1);
655                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
656                                 rc = mdt_reint_object_lock(info, parent, lh,
657                                                            MDS_INODELOCK_UPDATE,
658                                                            true);
659                                 if (rc)
660                                         GOTO(put_child, rc);
661                         }
662                 }
663
664                 lhc = &info->mti_lh[MDT_LH_CHILD];
665                 mdt_lock_handle_init(lhc);
666                 mdt_lock_reg_init(lhc, LCK_PW);
667                 rc = mdt_reint_striped_lock(info, child, lhc,
668                                             MDS_INODELOCK_UPDATE, einfo,
669                                             cos_incompat);
670                 if (rc)
671                         GOTO(put_child, rc);
672
673                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
674         }
675
676         /* Return fid & attr to client. */
677         if (ma->ma_valid & MA_INODE)
678                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
679                                    mdt_object_fid(child));
680         EXIT;
681 put_child:
682         mdt_object_put(info->mti_env, child);
683 unlock_parent:
684         mdt_object_unlock(info, parent, lh, rc);
685 put_parent:
686         mdt_object_put(info->mti_env, parent);
687         return rc;
688 }
689
690 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
691                         struct md_attr *ma)
692 {
693         struct mdt_lock_handle  *lh;
694         int do_vbr = ma->ma_attr.la_valid &
695                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
696         __u64 lockpart = MDS_INODELOCK_UPDATE;
697         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
698         bool cos_incompat;
699         int rc;
700
701         ENTRY;
702         rc = mdt_object_striped(info, mo);
703         if (rc < 0)
704                 RETURN(rc);
705
706         cos_incompat = rc;
707
708         lh = &info->mti_lh[MDT_LH_PARENT];
709         mdt_lock_reg_init(lh, LCK_PW);
710
711         /* Even though the new MDT will grant PERM lock to the old
712          * client, but the old client will almost ignore that during
713          * So it needs to revoke both LOOKUP and PERM lock here, so
714          * both new and old client can cancel the dcache
715          */
716         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
717                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
718
719         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
720                                     cos_incompat);
721         if (rc != 0)
722                 RETURN(rc);
723
724         /* all attrs are packed into mti_attr in unpack_setattr */
725         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
726                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
727
728         /* VBR: update version if attr changed are important for recovery */
729         if (do_vbr) {
730                 /* update on-disk version of changed object */
731                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
732                 rc = mdt_version_get_check_save(info, mo, 0);
733                 if (rc)
734                         GOTO(out_unlock, rc);
735         }
736
737         /* Ensure constant striping during chown(). See LU-2789. */
738         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
739                 mutex_lock(&mo->mot_lov_mutex);
740
741         /* all attrs are packed into mti_attr in unpack_setattr */
742         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
743
744         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
745                 mutex_unlock(&mo->mot_lov_mutex);
746
747         if (rc != 0)
748                 GOTO(out_unlock, rc);
749         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
750         EXIT;
751 out_unlock:
752         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
753         return rc;
754 }
755
756 /**
757  * Check HSM flags and add HS_DIRTY flag if relevant.
758  *
759  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
760  * and is not RELEASED.
761  */
762 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
763                         struct md_attr *ma)
764 {
765         struct lu_ucred *uc = mdt_ucred(info);
766         kernel_cap_t cap_saved;
767         int rc;
768
769         ENTRY;
770         /* If the file was modified, add the dirty flag */
771         ma->ma_need = MA_HSM;
772         rc = mdt_attr_get_complex(info, mo, ma);
773         if (rc) {
774                 CERROR("file attribute read error for "DFID": %d.\n",
775                         PFID(mdt_object_fid(mo)), rc);
776                 RETURN(rc);
777         }
778
779         /* If an up2date copy exists in the backend, add dirty flag */
780         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
781             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
782                 ma->ma_hsm.mh_flags |= HS_DIRTY;
783
784                 /* Bump cap so that closes from non-owner writers can
785                  * set the HSM state to dirty.
786                  */
787                 cap_saved = uc->uc_cap;
788                 cap_raise(uc->uc_cap, CAP_FOWNER);
789                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
790                 uc->uc_cap = cap_saved;
791                 if (rc)
792                         CERROR("file attribute change error for "DFID": %d\n",
793                                 PFID(mdt_object_fid(mo)), rc);
794         }
795
796         RETURN(rc);
797 }
798
/*
 * Handle a setattr reintegration request on the object named by rr_fid1.
 *
 * Outline of what the code below does:
 *  - cancels the locks described by info->mti_dlm_req, if that is set;
 *  - looks the object up and rejects missing (-ENOENT) or remote (-EREMOTE)
 *    objects;
 *  - when the size is going to change, MDS_TRUNC_KEEP_LEASE is not set and
 *    leases exist, takes and immediately drops a CW OPEN lock under
 *    mot_open_sem to revoke them;
 *  - for size changes / open-truncate: checks write access (-ETXTBSY),
 *    refuses FLR or overstriped layouts for clients lacking the matching
 *    connect flag (-EOPNOTSUPP), and updates LSOM;
 *  - either applies the inode attributes through mdt_attr_set(), or, when
 *    only a LOV/LMV EA was sent, stores it as an xattr on a directory under
 *    a PW lock;
 *  - optionally marks the file dirty for HSM, re-reads the attributes and
 *    packs them into the reply body.
 *
 * \param[in] info	thread info holding the unpacked request
 * \param[in] lhc	not read on entry; only reused as a local pointer in
 *			the lease-revoke path
 *
 * \retval 0		on success (or the mdt_fix_reply() result if the
 *			operation itself succeeded)
 * \retval negative	errno on failure
 */
799 static int mdt_reint_setattr(struct mdt_thread_info *info,
800                              struct mdt_lock_handle *lhc)
801 {
802         struct mdt_device *mdt = info->mti_mdt;
803         struct md_attr *ma = &info->mti_attr;
804         struct mdt_reint_record *rr = &info->mti_rr;
805         struct ptlrpc_request *req = mdt_info_req(info);
806         struct mdt_object *mo;
807         struct mdt_body *repbody;
808         ktime_t kstart = ktime_get();
809         int rc, rc2;
810
811         ENTRY;
812         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
813                   (unsigned int)ma->ma_attr.la_valid);
814
815         if (info->mti_dlm_req)
816                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
817
818         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
819
820         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
821         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
822         if (IS_ERR(mo))
823                 GOTO(out, rc = PTR_ERR(mo));
824
825         if (!mdt_object_exists(mo))
826                 GOTO(out_put, rc = -ENOENT);
827
828         if (mdt_object_remote(mo))
829                 GOTO(out_put, rc = -EREMOTE);
830
831         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
832         /* revoke lease lock if size is going to be changed */
833         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
834                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
835                      atomic_read(&mo->mot_lease_count) > 0)) {
836                 down_read(&mo->mot_open_sem);
837
                    /* re-check the lease count now that mot_open_sem is held */
838                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
839                         lhc = &info->mti_lh[MDT_LH_LOCAL];
840                         mdt_lock_reg_init(lhc, LCK_CW);
841
842                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
843                         if (rc != 0) {
844                                 up_read(&mo->mot_open_sem);
845                                 GOTO(out_put, rc);
846                         }
847
848                         /* revoke lease lock */
849                         mdt_object_unlock(info, mo, lhc, 1);
850                 }
851                 up_read(&mo->mot_open_sem);
852         }
853
854         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
855                 /* Check write access for the O_TRUNC case */
856                 if (mdt_write_read(mo) < 0)
857                         GOTO(out_put, rc = -ETXTBSY);
858
859                 /* LU-10286: compatibility check for FLR.
860                  * Please check the comment in mdt_finish_open() for details
861                  */
862                 if (!exp_connect_flr(info->mti_exp) ||
863                     !exp_connect_overstriping(info->mti_exp)) {
                            /* rc > 0 below means a LOV EA was read into
                             * info->mti_big_lmm; -ENODATA is tolerated
                             */
864                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
865                         if (rc < 0 && rc != -ENODATA)
866                                 GOTO(out_put, rc);
867
868                         if (!exp_connect_flr(info->mti_exp)) {
869                                 if (rc > 0 &&
870                                     mdt_lmm_is_flr(info->mti_big_lmm))
871                                         GOTO(out_put, rc = -EOPNOTSUPP);
872                         }
873
874                         if (!exp_connect_overstriping(info->mti_exp)) {
875                                 if (rc > 0 &&
876                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
877                                         GOTO(out_put, rc = -EOPNOTSUPP);
878                         }
879                 }
880
881                 /* For truncate, the file size sent from client
882                  * is believable, but the blocks are incorrect,
883                  * which makes the block size in LSOM attribute
884                  * inconsistent with the real block size.
885                  */
886                 rc = mdt_lsom_update(info, mo, true);
887                 if (rc)
888                         GOTO(out_put, rc);
889         }
890
            /* Case 1: inode attributes were sent; a LOV EA must not accompany
             * them.
             */
891         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
892                 if (ma->ma_valid & MA_LOV)
893                         GOTO(out_put, rc = -EPROTO);
894
895                 /* MDT supports FMD for regular files due to Data-on-MDT */
896                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
897                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
898                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
899                                        req->rq_xid);
900
901                         if (ma->ma_attr.la_valid & LA_MTIME) {
                                    /* a failure here is not fatal: MA_PFID is
                                     * simply left unset
                                     */
902                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
903                                 if (!rc)
904                                         ma->ma_valid |= MA_PFID;
905                         }
906                 }
907
908                 rc = mdt_attr_set(info, mo, ma);
909                 if (rc)
910                         GOTO(out_put, rc);
            /* Case 2: no inode attributes, only a LOV or LMV EA to store as an
             * xattr on a directory.
             */
911         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
912                    (ma->ma_valid & MA_INODE)) {
913                 struct lu_buf *buf = &info->mti_buf;
914                 struct lu_ucred *uc = mdt_ucred(info);
915                 struct mdt_lock_handle *lh;
916                 const char *name;
917                 __u64 lockpart = MDS_INODELOCK_XATTR;
918
919                 /* reject if either remote or striped dir is disabled */
920                 if (ma->ma_valid & MA_LMV) {
921                         if (!mdt->mdt_enable_remote_dir ||
922                             !mdt->mdt_enable_striped_dir)
923                                 GOTO(out_put, rc = -EPERM);
924
                            /* allowed for CAP_SYS_ADMIN, for the configured
                             * gid, or for anyone when that gid is -1
                             */
925                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
926                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
927                             mdt->mdt_enable_remote_dir_gid != -1)
928                                 GOTO(out_put, rc = -EPERM);
929                 }
930
931                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
932                         GOTO(out_put, rc = -ENOTDIR);
933
934                 if (ma->ma_attr.la_valid != 0)
935                         GOTO(out_put, rc = -EPROTO);
936
937                 if (ma->ma_valid & MA_LOV) {
938                         buf->lb_buf = ma->ma_lmm;
939                         buf->lb_len = ma->ma_lmm_size;
940                         name = XATTR_NAME_LOV;
941                 } else {
942                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
943
944                         buf->lb_buf = lmu;
945                         buf->lb_len = ma->ma_lmv_size;
946                         name = XATTR_NAME_DEFAULT_LMV;
947                         /* force client to update dir default layout */
948                         lockpart |= MDS_INODELOCK_LOOKUP;
949                 }
950
951                 lh = &info->mti_lh[MDT_LH_PARENT];
952                 mdt_lock_reg_init(lh, LCK_PW);
953
954                 rc = mdt_object_lock(info, mo, lh, lockpart);
955                 if (rc != 0)
956                         GOTO(out_put, rc);
957
958                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
959                                   name, 0);
960
961                 mdt_object_unlock(info, mo, lh, rc);
962                 if (rc)
963                         GOTO(out_put, rc);
964         } else {
965                 GOTO(out_put, rc = -EPROTO);
966         }
967
968         /* If file data is modified, add the dirty flag */
            /* NOTE(review): rc from mdt_add_dirty_flag() is overwritten by the
             * mdt_attr_get_complex() call below, so a failure to set the dirty
             * flag is not reported to the caller - confirm this is intended.
             */
969         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
970                 rc = mdt_add_dirty_flag(info, mo, ma);
971
            /* re-read the inode attributes to return them in the reply */
972         ma->ma_need = MA_INODE;
973         ma->ma_valid = 0;
974         rc = mdt_attr_get_complex(info, mo, ma);
975         if (rc != 0)
976                 GOTO(out_put, rc);
977
978         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
979
980         EXIT;
981 out_put:
982         mdt_object_put(info->mti_env, mo);
983 out:
            /* only successful operations are counted */
984         if (rc == 0)
985                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
986                                  ktime_us_delta(ktime_get(), kstart));
987
988         mdt_client_compatibility(info);
            /* an mdt_fix_reply() failure is reported only if nothing failed
             * earlier
             */
989         rc2 = mdt_fix_reply(info);
990         if (rc == 0)
991                 rc = rc2;
992         return rc;
993 }
994
995 static int mdt_reint_create(struct mdt_thread_info *info,
996                             struct mdt_lock_handle *lhc)
997 {
998         struct ptlrpc_request   *req = mdt_info_req(info);
999         ktime_t                 kstart = ktime_get();
1000         int                     rc;
1001
1002         ENTRY;
1003         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1004                 RETURN(err_serious(-ESTALE));
1005
1006         if (info->mti_dlm_req)
1007                 ldlm_request_cancel(mdt_info_req(info),
1008                                     info->mti_dlm_req, 0, LATF_SKIP);
1009
1010         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1011                 RETURN(-EPROTO);
1012
1013         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1014         case S_IFDIR:
1015         case S_IFREG:
1016         case S_IFLNK:
1017         case S_IFCHR:
1018         case S_IFBLK:
1019         case S_IFIFO:
1020         case S_IFSOCK:
1021                 break;
1022         default:
1023                 CERROR("%s: Unsupported mode %o\n",
1024                        mdt_obd_name(info->mti_mdt),
1025                        info->mti_attr.ma_attr.la_mode);
1026                 RETURN(err_serious(-EOPNOTSUPP));
1027         }
1028
1029         rc = mdt_create(info);
1030         if (rc == 0) {
1031                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1032                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1033                                          ktime_us_delta(ktime_get(), kstart));
1034                 else
1035                         /* Special file should stay on the same node as parent*/
1036                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1037                                          ktime_us_delta(ktime_get(), kstart));
1038         }
1039
1040         RETURN(rc);
1041 }
1042
/*
 * Handle an unlink reintegration request: remove name rr_name from the
 * parent directory rr_fid1.
 *
 * Outline of what the code below does:
 *  - cancels the locks described by info->mti_dlm_req, if that is set;
 *  - takes a PW parallel-directory lock (UPDATE bit) on the parent and looks
 *    the child up by name; if the child turns out to be striped the parent
 *    is unlocked and re-locked with cos_incompat set;
 *  - if info->mti_spec.sp_rm_entry is set, only the name entry is removed
 *    (mdo_unlink() with a NULL child); this needs a DNE client and
 *    CAP_SYS_ADMIN;
 *  - if the child is remote, the local LOOKUP lock is taken, the child FID
 *    is packed into the reply and -EREMOTE is returned;
 *  - otherwise the child is locked, unlinked under mot_lov_mutex, and the
 *    RMDIR or UNLINK counter is bumped according to the file type.
 *
 * \param[in] info	thread info holding the unpacked request
 * \param[in] lhc	not used by this handler
 *
 * \retval 0		on success
 * \retval negative	errno on failure
 */
1043 /*
1044  * VBR: save the parent version in the reply, plus the version of the child
1045  * found by name; the child's version is fetched and checked during lookup.
1046  */
1047 static int mdt_reint_unlink(struct mdt_thread_info *info,
1048                             struct mdt_lock_handle *lhc)
1049 {
1050         struct mdt_reint_record *rr = &info->mti_rr;
1051         struct ptlrpc_request *req = mdt_info_req(info);
1052         struct md_attr *ma = &info->mti_attr;
1053         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1054         struct mdt_object *mp;
1055         struct mdt_object *mc;
1056         struct mdt_lock_handle *parent_lh;
1057         struct mdt_lock_handle *child_lh;
1058         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1059         __u64 lock_ibits;
1060         bool cos_incompat = false;
1061         int no_name = 0;
1062         ktime_t kstart = ktime_get();
1063         int rc;
1064
1065         ENTRY;
1066         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1067                   PNAME(&rr->rr_name));
1068
1069         if (info->mti_dlm_req)
1070                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1071
1072         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1073                 RETURN(err_serious(-ENOENT));
1074
1075         if (!fid_is_md_operative(rr->rr_fid1))
1076                 RETURN(-EPERM);
1077
1078         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1079         if (IS_ERR(mp))
1080                 RETURN(PTR_ERR(mp));
1081
             /* a remote parent forces cos_incompat; the parent version is
              * only checked and saved for a local parent
              */
1082         if (mdt_object_remote(mp)) {
1083                 cos_incompat = true;
1084         } else {
1085                 rc = mdt_version_get_check_save(info, mp, 0);
1086                 if (rc)
1087                         GOTO(put_parent, rc);
1088         }
1089
1090         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1091         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1092 relock:
1093         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1094         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1095         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1096                                    cos_incompat);
1097         if (rc != 0)
1098                 GOTO(put_parent, rc);
1099
1100         /* lookup child object along with version checking */
1101         fid_zero(child_fid);
1102         rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
1103         if (rc != 0) {
1104                 /* The name might not be found during resend of a
1105                  * remote unlink; consider the following case.
1106                  * dir_A is a remote directory, the name entry of
1107                  * dir_A is on MDT0, the directory is on MDT1,
1108                  *
1109                  * 1. client sends unlink req to MDT1.
1110                  * 2. MDT1 sends name delete update to MDT0.
1111                  * 3. name entry is being deleted in MDT0 synchronously.
1112                  * 4. MDT1 is restarted.
1113                  * 5. client resends unlink req to MDT1. So it can not
1114                  *    find the name entry on MDT0 anymore.
1115                  * In this case, MDT1 only needs to destroy the local
1116                  * directory.
1117                  */
1118                 if (mdt_object_remote(mp) && rc == -ENOENT &&
1119                     !fid_is_zero(rr->rr_fid2) &&
1120                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1121                         no_name = 1;
1122                         *child_fid = *rr->rr_fid2;
1123                 } else {
1124                         GOTO(unlock_parent, rc);
1125                 }
1126         }
1127
1128         if (!fid_is_md_operative(child_fid))
1129                 GOTO(unlock_parent, rc = -EPERM);
1130
1131         /* We will lock the child regardless it is local or remote. No harm. */
1132         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1133         if (IS_ERR(mc))
1134                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1135
1136         if (!cos_incompat) {
1137                 rc = mdt_object_striped(info, mc);
1138                 if (rc < 0)
1139                         GOTO(put_child, rc);
1140
                     /* a striped child needs cos_incompat: drop the child
                      * reference and the parent lock, then start over
                      */
1141                 cos_incompat = rc;
1142                 if (cos_incompat) {
1143                         mdt_object_put(info->mti_env, mc);
1144                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1145                         goto relock;
1146                 }
1147         }
1148
1149         child_lh = &info->mti_lh[MDT_LH_CHILD];
1150         mdt_lock_reg_init(child_lh, LCK_EX);
             /* name-entry-only removal: the child object is not passed to
              * mdo_unlink() and no child lock is taken
              */
1151         if (info->mti_spec.sp_rm_entry) {
1152                 struct lu_ucred *uc  = mdt_ucred(info);
1153
1154                 if (!mdt_is_dne_client(req->rq_export))
1155                         /* Return -ENOTSUPP for old client */
1156                         GOTO(put_child, rc = -ENOTSUPP);
1157
1158                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1159                         GOTO(put_child, rc = -EPERM);
1160
1161                 ma->ma_need = MA_INODE;
1162                 ma->ma_valid = 0;
1163                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1164                                 NULL, &rr->rr_name, ma, no_name);
1165                 GOTO(put_child, rc);
1166         }
1167
1168         if (mdt_object_remote(mc)) {
1169                 struct mdt_body  *repbody;
1170
1171                 if (!fid_is_zero(rr->rr_fid2)) {
1172                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1173                                mdt_obd_name(info->mti_mdt),
1174                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1175                         GOTO(put_child, rc = -ENOENT);
1176                 }
1177                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1178                        mdt_obd_name(info->mti_mdt),
1179                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1180
1181                 if (!mdt_is_dne_client(req->rq_export))
1182                         /* Return -ENOTSUPP for old client */
1183                         GOTO(put_child, rc = -ENOTSUPP);
1184
1185                 /* Revoke the LOOKUP lock of the remote object granted by
1186                  * this MDT. Since the unlink will happen on another MDT,
1187                  * it will release the LOOKUP lock right away. Then What
1188                  * would happen if another client try to grab the LOOKUP
1189                  * lock at the same time with unlink XXX
1190                  */
                     /* NOTE(review): the return value of mdt_object_lock() is
                      * not checked here - confirm a failure can be ignored.
                      */
1191                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1192                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1193                 LASSERT(repbody != NULL);
1194                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1195                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1196                 GOTO(unlock_child, rc = -EREMOTE);
1197         }
1198         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1199          * this now because a running HSM restore on the child (unlink
1200          * victim) will hold the layout lock. See LU-4002.
1201          */
1202         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1203         if (mdt_object_remote(mp)) {
1204                 /* Enqueue lookup lock from parent MDT */
1205                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1206                                             &child_lh->mlh_rreg_lh,
1207                                             child_lh->mlh_rreg_mode,
1208                                             MDS_INODELOCK_LOOKUP, false);
1209                 if (rc != ELDLM_OK)
1210                         GOTO(put_child, rc);
1211
                     /* LOOKUP is now held remotely, do not take it locally */
1212                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1213         }
1214
1215         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1216                                     cos_incompat);
             /* NOTE(review): this exit skips unlock_child. When mp is remote
              * the LOOKUP lock enqueued above sits in child_lh->mlh_rreg_lh;
              * confirm mdt_reint_striped_lock() releases it on failure,
              * otherwise it is left held on this path.
              */
1217         if (rc != 0)
1218                 GOTO(put_child, rc);
1219
1220         /*
1221          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1222          * whether need MA_LOV and MA_COOKIE.
1223          */
1224         ma->ma_need = MA_INODE;
1225         ma->ma_valid = 0;
1226
1227         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1228                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1229         /* save version when object is locked */
1230         mdt_version_get_save(info, mc, 1);
1231
1232         mutex_lock(&mc->mot_lov_mutex);
1233
1234         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1235                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1236
1237         mutex_unlock(&mc->mot_lov_mutex);
1238         if (rc != 0)
1239                 GOTO(unlock_child, rc);
1240
             /* object still alive: refresh its attributes; object dying:
              * discard its Data-on-MDT data if the check asks for it
              */
1241         if (!lu_object_is_dying(&mc->mot_header)) {
1242                 rc = mdt_attr_get_complex(info, mc, ma);
1243                 if (rc)
1244                         GOTO(out_stat, rc);
1245         } else if (mdt_dom_check_for_discard(info, mc)) {
1246                 mdt_dom_discard_data(info, mc);
1247         }
1248         mdt_handle_last_unlink(info, mc, ma);
1249
1250 out_stat:
             /* account the operation by file type, if the mode is known */
1251         if (ma->ma_valid & MA_INODE) {
1252                 switch (ma->ma_attr.la_mode & S_IFMT) {
1253                 case S_IFDIR:
1254                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1255                                          ktime_us_delta(ktime_get(), kstart));
1256                         break;
1257                 case S_IFREG:
1258                 case S_IFLNK:
1259                 case S_IFCHR:
1260                 case S_IFBLK:
1261                 case S_IFIFO:
1262                 case S_IFSOCK:
1263                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1264                                          ktime_us_delta(ktime_get(), kstart));
1265                         break;
1266                 default:
1267                         LASSERTF(0, "bad file type %o unlinking\n",
1268                                 ma->ma_attr.la_mode);
1269                 }
1270         }
1271
1272         EXIT;
1273
             /* cleanup labels unwind in reverse order of acquisition */
1274 unlock_child:
1275         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1276 put_child:
1277         mdt_object_put(info->mti_env, mc);
1278 unlock_parent:
1279         mdt_object_unlock(info, mp, parent_lh, rc);
1280 put_parent:
1281         mdt_object_put(info->mti_env, mp);
1282         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1283         return rc;
1284 }
1285
/*
 * Handle a link reintegration request: add name rr_name in directory
 * rr_fid2 referring to the existing object rr_fid1.
 *
 * The target parent is locked first (PW parallel-directory lock, UPDATE
 * bit), then the source (EX, UPDATE | XATTR bits). cos_incompat is set when
 * either object is remote. For a request that is not a replay the target
 * name must not exist yet, otherwise -EEXIST is returned.
 *
 * \param[in] info	thread info holding the unpacked request
 * \param[in] lhc	not used by this handler
 *
 * \retval 0		on success
 * \retval -EPERM	if both FIDs are equal or either is not operative
 * \retval -ENOENT	if the source object does not exist
 * \retval -EEXIST	if the target name already exists (non-replay)
 * \retval negative	other errno on failure
 */
1286 /*
1287  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1288  * name.
1289  */
1290 static int mdt_reint_link(struct mdt_thread_info *info,
1291                           struct mdt_lock_handle *lhc)
1292 {
1293         struct mdt_reint_record *rr = &info->mti_rr;
1294         struct ptlrpc_request   *req = mdt_info_req(info);
1295         struct md_attr          *ma = &info->mti_attr;
1296         struct mdt_object       *ms;
1297         struct mdt_object       *mp;
1298         struct mdt_lock_handle  *lhs;
1299         struct mdt_lock_handle  *lhp;
1300         ktime_t kstart = ktime_get();
1301         bool cos_incompat;
1302         int rc;
1303
1304         ENTRY;
1305         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1306                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1307
1308         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1309                 RETURN(err_serious(-ENOENT));
1310
             /* fault injection: suppress the reply for this request */
1311         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1312             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1313                 req->rq_no_reply = 1;
1314                 RETURN(err_serious(-ENOENT));
1315         }
1316
1317         if (info->mti_dlm_req)
1318                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1319
1320         /* Invalid case so return error immediately instead of
1321          * processing it
1322          */
1323         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1324                 RETURN(-EPERM);
1325
1326         if (!fid_is_md_operative(rr->rr_fid1) ||
1327             !fid_is_md_operative(rr->rr_fid2))
1328                 RETURN(-EPERM);
1329
1330         /* step 1: find target parent dir */
1331         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1332         if (IS_ERR(mp))
1333                 RETURN(PTR_ERR(mp));
1334
1335         rc = mdt_version_get_check_save(info, mp, 0);
1336         if (rc)
1337                 GOTO(put_parent, rc);
1338
1339         /* step 2: find source */
1340         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1341         if (IS_ERR(ms))
1342                 GOTO(put_parent, rc = PTR_ERR(ms));
1343
1344         if (!mdt_object_exists(ms)) {
1345                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1346                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1347                 GOTO(put_source, rc = -ENOENT);
1348         }
1349
1350         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1351
             /* lock order: target parent first, then the source object */
1352         lhp = &info->mti_lh[MDT_LH_PARENT];
1353         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1354         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1355                                    cos_incompat);
1356         if (rc != 0)
1357                 GOTO(put_source, rc);
1358
1359         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1360
1361         lhs = &info->mti_lh[MDT_LH_CHILD];
1362         mdt_lock_reg_init(lhs, LCK_EX);
1363         rc = mdt_reint_object_lock(info, ms, lhs,
1364                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1365                                    cos_incompat);
1366         if (rc != 0)
1367                 GOTO(unlock_parent, rc);
1368
1369         /* step 3: link it */
1370         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1371                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1372
1373         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1374         rc = mdt_version_get_check_save(info, ms, 1);
1375         if (rc)
1376                 GOTO(unlock_source, rc);
1377
1378         /** check target version by name during replay */
1379         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1380                                       &info->mti_tmp_fid1, 2);
             /* -ENOENT is the expected result: the name should not exist */
1381         if (rc != 0 && rc != -ENOENT)
1382                 GOTO(unlock_source, rc);
1383         /* save version of file name for replay, it must be ENOENT here */
1384         if (!req_is_replay(mdt_info_req(info))) {
1385                 if (rc != -ENOENT) {
1386                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1387                                PNAME(&rr->rr_name));
1388                         GOTO(unlock_source, rc = -EEXIST);
1389                 }
1390                 info->mti_ver[2] = ENOENT_VERSION;
1391                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1392         }
1393
1394         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1395                       mdt_object_child(ms), &rr->rr_name, ma);
1396
1397         if (rc == 0)
1398                 mdt_counter_incr(req, LPROC_MDT_LINK,
1399                                  ktime_us_delta(ktime_get(), kstart));
1400
1401         EXIT;
             /* cleanup labels unwind in reverse order of acquisition */
1402 unlock_source:
1403         mdt_object_unlock(info, ms, lhs, rc);
1404 unlock_parent:
1405         mdt_object_unlock(info, mp, lhp, rc);
1406 put_source:
1407         mdt_object_put(info->mti_env, ms);
1408 put_parent:
1409         mdt_object_put(info->mti_env, mp);
1410         return rc;
1411 }
1412 /**
1413  * lock the part of the directory according to the hash of the name
1414  * (lh->mlh_pdo_hash) in parallel directory lock.
1415  */
1416 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1417                               struct mdt_lock_handle *lh,
1418                               struct mdt_object *obj, __u64 ibits,
1419                               bool cos_incompat)
1420 {
1421         struct ldlm_res_id *res = &info->mti_res_id;
1422         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1423         union ldlm_policy_data *policy = &info->mti_policy;
1424         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1425         int rc;
1426
1427         /*
1428          * Finish res_id initializing by name hash marking part of
1429          * directory which is taking modification.
1430          */
1431         LASSERT(lh->mlh_pdo_hash != 0);
1432         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1433         memset(policy, 0, sizeof(*policy));
1434         policy->l_inodebits.bits = ibits;
1435         if (cos_incompat &&
1436             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1437                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1438         /*
1439          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1440          * going to be sent to client. If it is - mdt_intent_policy() path will
1441          * fix it up and turn FL_LOCAL flag off.
1442          */
1443         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1444                           policy, res, dlmflags,
1445                           &info->mti_exp->exp_handle.h_cookie);
1446         return rc;
1447 }
1448
1449 /**
1450  * Get BFL lock for rename or migrate process.
1451  **/
1452 static int mdt_rename_lock(struct mdt_thread_info *info,
1453                            struct lustre_handle *lh)
1454 {
1455         int     rc;
1456
1457         ENTRY;
1458         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1459                 struct lu_fid *fid = &info->mti_tmp_fid1;
1460                 struct mdt_object *obj;
1461
1462                 /* XXX, right now, it has to use object API to
1463                  * enqueue lock cross MDT, so it will enqueue
1464                  * rename lock(with LUSTRE_BFL_FID) by root object
1465                  */
1466                 lu_root_fid(fid);
1467                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1468                 if (IS_ERR(obj))
1469                         RETURN(PTR_ERR(obj));
1470
1471                 rc = mdt_remote_object_lock(info, obj,
1472                                             &LUSTRE_BFL_FID, lh,
1473                                             LCK_EX,
1474                                             MDS_INODELOCK_UPDATE, false);
1475                 mdt_object_put(info->mti_env, obj);
1476         } else {
1477                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1478                 union ldlm_policy_data *policy = &info->mti_policy;
1479                 struct ldlm_res_id *res_id = &info->mti_res_id;
1480                 __u64 flags = 0;
1481
1482                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1483                 memset(policy, 0, sizeof(*policy));
1484                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1485                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1486                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1487                                             LDLM_IBITS, policy, LCK_EX, &flags,
1488                                             ldlm_blocking_ast,
1489                                             ldlm_completion_ast, NULL, NULL, 0,
1490                                             LVB_T_NONE,
1491                                             &info->mti_exp->exp_handle.h_cookie,
1492                                             lh);
1493                 RETURN(rc);
1494         }
1495         RETURN(rc);
1496 }
1497
1498 static void mdt_rename_unlock(struct lustre_handle *lh)
1499 {
1500         ENTRY;
1501         LASSERT(lustre_handle_is_used(lh));
1502         /* Cancel the single rename lock right away */
1503         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1504         EXIT;
1505 }
1506
1507 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1508                                                 const struct lu_fid *fid,
1509                                                 int idx)
1510 {
1511         struct mdt_object *dir;
1512         int rc;
1513
1514         ENTRY;
1515         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1516         if (IS_ERR(dir))
1517                 RETURN(dir);
1518
1519         /* check early, the real version will be saved after locking */
1520         rc = mdt_version_get_check(info, dir, idx);
1521         if (rc)
1522                 GOTO(out_put, rc);
1523
1524         if (!mdt_object_exists(dir))
1525                 GOTO(out_put, rc = -ENOENT);
1526
1527         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1528                 GOTO(out_put, rc = -ENOTDIR);
1529
1530         RETURN(dir);
1531 out_put:
1532         mdt_object_put(info->mti_env, dir);
1533         return ERR_PTR(rc);
1534 }
1535
1536 /*
1537  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1538  * herein we don't really check it, just do revoke.
1539  */
1540 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1541                                   struct mdt_object *pobj,
1542                                   struct mdt_object *obj)
1543 {
1544         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1545         int rc;
1546
1547         mdt_lock_handle_init(lh);
1548         mdt_lock_reg_init(lh, LCK_EX);
1549
1550         if (mdt_object_remote(pobj)) {
1551                 /* don't bother to check if pobj and obj are on the same MDT. */
1552                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1553                                             &lh->mlh_rreg_lh, LCK_EX,
1554                                             MDS_INODELOCK_LOOKUP, false);
1555         } else if (mdt_object_remote(obj)) {
1556                 struct ldlm_res_id *res = &info->mti_res_id;
1557                 union ldlm_policy_data *policy = &info->mti_policy;
1558                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1559                                  LDLM_FL_COS_INCOMPAT;
1560
1561                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1562                 memset(policy, 0, sizeof(*policy));
1563                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1564                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1565                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1566                                   dlmflags, NULL);
1567         } else {
1568                 /* do nothing if both are local */
1569                 return 0;
1570         }
1571
1572         if (rc != ELDLM_OK)
1573                 return rc;
1574
1575         /*
1576          * TODO, currently we don't save this lock because there is no place to
1577          * hold this lock handle, but to avoid race we need to save this lock.
1578          */
1579         mdt_object_unlock(info, NULL, lh, 1);
1580
1581         return 0;
1582 }
1583
1584 /*
1585  * An operation may take locks on the parents found in the linkEA, or on
1586  * directory stripes; each such lock is one mdt_sub_lock, kept on a per-kind list.
1587  */
1588 struct mdt_sub_lock {
1589         struct mdt_object *msl_obj; /* locked object, put by mdt_unlock_list() */
1590         struct mdt_lock_handle msl_lh; /* handle of the lock held on msl_obj */
1591         struct list_head msl_linkage; /* linkage into the owning lock list */
1592 };
1593
1594 static void mdt_unlock_list(struct mdt_thread_info *info,
1595                             struct list_head *list, int decref)
1596 {
1597         struct mdt_sub_lock *msl;
1598         struct mdt_sub_lock *tmp;
1599
1600         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1601                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1602                 list_del(&msl->msl_linkage);
1603                 OBD_FREE_PTR(msl);
1604         }
1605 }
1606
/*
 * Drop the locks taken on \a obj for migration.
 *
 * A local object is unlocked through mdt_reint_striped_unlock(); for a
 * remote object the explicitly taken stripe locks on \a slave_locks are
 * released first, then the lock on the object itself.
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                                             struct mdt_object *obj,
                                             struct mdt_lock_handle *lh,
                                             struct ldlm_enqueue_info *einfo,
                                             struct list_head *slave_locks,
                                             int decref)
{
        if (!mdt_object_remote(obj)) {
                mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
                return;
        }

        mdt_unlock_list(info, slave_locks, decref);
        mdt_object_unlock(info, obj, lh, decref);
}
1621
1622 /*
1623  * lock parents of links, and also check whether total locks don't exceed
1624  * RS_MAX_LOCKS.
1625  *
1626  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1627  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1628  * \retval      -ev negative errno upon error
1629  */
1630 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1631                                  struct mdt_object *pobj,
1632                                  const struct md_attr *ma,
1633                                  struct mdt_object *obj,
1634                                  struct mdt_lock_handle *lhp,
1635                                  struct ldlm_enqueue_info *peinfo,
1636                                  struct list_head *parent_slave_locks,
1637                                  struct list_head *link_locks)
1638 {
1639         struct mdt_device *mdt = info->mti_mdt;
1640         struct lu_buf *buf = &info->mti_big_buf;
1641         struct lu_name *lname = &info->mti_name;
1642         struct linkea_data ldata = { NULL };
1643         bool blocked = false;
1644         int local_lnkp_cnt = 0;
1645         int rc;
1646
1647         ENTRY;
1648         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1649                 RETURN(0);
1650
1651         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1652         if (buf->lb_buf == NULL)
1653                 RETURN(-ENOMEM);
1654
1655         ldata.ld_buf = buf;
1656         rc = mdt_links_read(info, obj, &ldata);
1657         if (rc) {
1658                 if (rc == -ENOENT || rc == -ENODATA)
1659                         rc = 0;
1660                 RETURN(rc);
1661         }
1662
1663         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1664              linkea_next_entry(&ldata)) {
1665                 struct mdt_object *lnkp;
1666                 struct mdt_sub_lock *msl;
1667                 struct lu_fid fid;
1668                 __u64 ibits;
1669
1670                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1671                                     &fid);
1672
1673                 /* check if it's also linked to parent */
1674                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1675                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1676                                PFID(&fid), PNAME(lname));
1677                         /* in case link is remote object, revoke LOOKUP lock */
1678                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1679                         continue;
1680                 }
1681
1682                 lnkp = NULL;
1683
1684                 /* check if it's linked to a stripe of parent */
1685                 if (ma->ma_valid & MA_LMV) {
1686                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1687                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1688                         int j = 0;
1689
1690                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1691                                 fid_le_to_cpu(stripe_fid,
1692                                               &lmv->lmv_stripe_fids[j]);
1693                                 if (lu_fid_eq(stripe_fid, &fid)) {
1694                                         CDEBUG(D_INFO, "skip stripe "DFID
1695                                                ", reovke "DNAME"\n",
1696                                                PFID(&fid), PNAME(lname));
1697                                         lnkp = mdt_object_find(info->mti_env,
1698                                                                mdt, &fid);
1699                                         if (IS_ERR(lnkp))
1700                                                 GOTO(out, rc = PTR_ERR(lnkp));
1701                                         break;
1702                                 }
1703                         }
1704
1705                         if (lnkp) {
1706                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1707                                                                    obj);
1708                                 mdt_object_put(info->mti_env, lnkp);
1709                                 continue;
1710                         }
1711                 }
1712
1713                 /* Check if it's already locked */
1714                 list_for_each_entry(msl, link_locks, msl_linkage) {
1715                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1716                                 CDEBUG(D_INFO,
1717                                        DFID" was locked, revoke "DNAME"\n",
1718                                        PFID(&fid), PNAME(lname));
1719                                 lnkp = msl->msl_obj;
1720                                 break;
1721                         }
1722                 }
1723
1724                 if (lnkp) {
1725                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1726                         continue;
1727                 }
1728
1729                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1730                        PFID(&fid), PNAME(lname));
1731
1732                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1733                 if (IS_ERR(lnkp)) {
1734                         CWARN("%s: cannot find obj "DFID": %ld\n",
1735                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1736                         continue;
1737                 }
1738
1739                 if (!mdt_object_exists(lnkp)) {
1740                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1741                               PFID(&fid), PNAME(lname));
1742                         mdt_object_put(info->mti_env, lnkp);
1743                         continue;
1744                 }
1745
1746                 if (!mdt_object_remote(lnkp))
1747                         local_lnkp_cnt++;
1748
1749                 OBD_ALLOC_PTR(msl);
1750                 if (msl == NULL)
1751                         GOTO(out, rc = -ENOMEM);
1752
1753                 /*
1754                  * we can't follow parent-child lock order like other MD
1755                  * operations, use lock_try here to avoid deadlock, if the lock
1756                  * cannot be taken, drop all locks taken, revoke the blocked
1757                  * one, and continue processing the remaining entries, and in
1758                  * the end of the loop restart from beginning.
1759                  */
1760                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1761                 ibits = 0;
1762                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1763                                          MDS_INODELOCK_UPDATE, true);
1764                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1765
1766                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1767                                PFID(&fid), PNAME(lname));
1768
1769                         mdt_unlock_list(info, link_locks, 1);
1770                         /* also unlock parent locks to avoid deadlock */
1771                         if (!blocked)
1772                                 mdt_migrate_object_unlock(info, pobj, lhp,
1773                                                           peinfo,
1774                                                           parent_slave_locks,
1775                                                           1);
1776
1777                         blocked = true;
1778
1779                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1780                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1781                                              MDS_INODELOCK_UPDATE);
1782                         if (rc) {
1783                                 mdt_object_put(info->mti_env, lnkp);
1784                                 OBD_FREE_PTR(msl);
1785                                 GOTO(out, rc);
1786                         }
1787
1788                         if (mdt_object_remote(lnkp)) {
1789                                 struct ldlm_lock *lock;
1790
1791                                 /*
1792                                  * for remote object, set lock cb_atomic,
1793                                  * so lock can be released in blocking_ast()
1794                                  * immediately, then the next lock_try will
1795                                  * have better chance of success.
1796                                  */
1797                                 lock = ldlm_handle2lock(
1798                                                 &msl->msl_lh.mlh_rreg_lh);
1799                                 LASSERT(lock != NULL);
1800                                 lock_res_and_lock(lock);
1801                                 ldlm_set_atomic_cb(lock);
1802                                 unlock_res_and_lock(lock);
1803                                 LDLM_LOCK_PUT(lock);
1804                         }
1805
1806                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1807                         OBD_FREE_PTR(msl);
1808                         continue;
1809                 }
1810
1811                 INIT_LIST_HEAD(&msl->msl_linkage);
1812                 msl->msl_obj = lnkp;
1813                 list_add_tail(&msl->msl_linkage, link_locks);
1814
1815                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1816         }
1817
1818         if (blocked)
1819                 GOTO(out, rc = -EBUSY);
1820
1821         EXIT;
1822 out:
1823         if (rc) {
1824                 mdt_unlock_list(info, link_locks, rc);
1825         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1826                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1827                        local_lnkp_cnt);
1828                 /*
1829                  * parent may have 3 local objects: master object and 2 stripes
1830                  * (if it's being migrated too); source may have 1 local objects
1831                  * as regular file; target has 1 local object.
1832                  * Note, source may have 2 local locks if it is directory but it
1833                  * can't have hardlinks, so it is not considered here.
1834                  */
1835                 rc = 1;
1836         }
1837         return rc;
1838 }
1839
1840 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1841                                   struct mdt_object *obj,
1842                                   const struct md_attr *ma,
1843                                   struct list_head *slave_locks)
1844 {
1845         struct mdt_device *mdt = info->mti_mdt;
1846         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1847         struct lu_fid *fid = &info->mti_tmp_fid1;
1848         struct mdt_object *slave;
1849         struct mdt_sub_lock *msl;
1850         int i;
1851         int rc;
1852
1853         ENTRY;
1854         LASSERT(mdt_object_remote(obj));
1855         LASSERT(ma->ma_valid & MA_LMV);
1856         LASSERT(lmv);
1857
1858         if (!lmv_is_sane(lmv))
1859                 RETURN(-EINVAL);
1860
1861         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1862                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1863
1864                 if (!fid_is_sane(fid))
1865                         continue;
1866
1867                 slave = mdt_object_find(info->mti_env, mdt, fid);
1868                 if (IS_ERR(slave))
1869                         GOTO(out, rc = PTR_ERR(slave));
1870
1871                 OBD_ALLOC_PTR(msl);
1872                 if (!msl) {
1873                         mdt_object_put(info->mti_env, slave);
1874                         GOTO(out, rc = -ENOMEM);
1875                 }
1876
1877                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1878                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1879                                            MDS_INODELOCK_UPDATE, true);
1880                 if (rc) {
1881                         OBD_FREE_PTR(msl);
1882                         mdt_object_put(info->mti_env, slave);
1883                         GOTO(out, rc);
1884                 }
1885
1886                 INIT_LIST_HEAD(&msl->msl_linkage);
1887                 msl->msl_obj = slave;
1888                 list_add_tail(&msl->msl_linkage, slave_locks);
1889         }
1890         EXIT;
1891
1892 out:
1893         if (rc)
1894                 mdt_unlock_list(info, slave_locks, rc);
1895         return rc;
1896 }
1897
1898 /* lock parent and its stripes */
1899 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1900                                    struct mdt_object *obj,
1901                                    const struct md_attr *ma,
1902                                    struct mdt_lock_handle *lh,
1903                                    struct ldlm_enqueue_info *einfo,
1904                                    struct list_head *slave_locks)
1905 {
1906         int rc;
1907
1908         if (mdt_object_remote(obj)) {
1909                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1910                                             &lh->mlh_rreg_lh, LCK_PW,
1911                                             MDS_INODELOCK_UPDATE, false);
1912                 if (rc != ELDLM_OK)
1913                         return rc;
1914
1915                 /*
1916                  * if obj is remote and striped, lock its stripes explicitly
1917                  * because it's not striped in LOD layer on this MDT.
1918                  */
1919                 if (ma->ma_valid & MA_LMV) {
1920                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1921                         if (rc)
1922                                 mdt_object_unlock(info, obj, lh, rc);
1923                 }
1924         } else {
1925                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1926                                             einfo, true);
1927         }
1928
1929         return rc;
1930 }
1931
1932 /*
1933  * in migration, object may be remote, and we need take full lock of it and its
1934  * stripes if it's directory, besides, object may be a remote object on its
1935  * parent, revoke its LOOKUP lock on where its parent is located.
1936  */
1937 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1938                                    struct mdt_object *pobj,
1939                                    struct mdt_object *obj,
1940                                    struct mdt_lock_handle *lh,
1941                                    struct ldlm_enqueue_info *einfo,
1942                                    struct list_head *slave_locks)
1943 {
1944         int rc;
1945
1946         if (mdt_object_remote(obj)) {
1947                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1948                 if (rc)
1949                         return rc;
1950
1951                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1952                                             &lh->mlh_rreg_lh, LCK_EX,
1953                                             MDS_INODELOCK_FULL, false);
1954                 if (rc != ELDLM_OK)
1955                         return rc;
1956
1957                 /*
1958                  * if obj is remote and striped, lock its stripes explicitly
1959                  * because it's not striped in LOD layer on this MDT.
1960                  */
1961                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1962                         struct md_attr *ma = &info->mti_attr;
1963
1964                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1965                         if (rc) {
1966                                 mdt_object_unlock(info, obj, lh, rc);
1967                                 return rc;
1968                         }
1969
1970                         if (ma->ma_valid & MA_LMV) {
1971                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1972                                                             slave_locks);
1973                                 if (rc)
1974                                         mdt_object_unlock(info, obj, lh, rc);
1975                         }
1976                 }
1977         } else {
1978                 if (mdt_object_remote(pobj)) {
1979                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1980                         if (rc)
1981                                 return rc;
1982                 }
1983
1984                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
1985                                             einfo, true);
1986         }
1987
1988         return rc;
1989 }
1990
1991 /*
1992  * lookup source by name, if parent is striped directory, we need to find the
1993  * corresponding stripe where source is located, and then lookup there.
1994  *
1995  * besides, if parent is migrating too, and file is already in target stripe,
1996  * this should be a redo of 'lfs migrate' on client side.
1997  */
1998 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1999                               struct mdt_object *pobj,
2000                               const struct md_attr *ma,
2001                               const struct lu_name *lname,
2002                               struct mdt_object **spobj,
2003                               struct mdt_object **sobj)
2004 {
2005         const struct lu_env *env = info->mti_env;
2006         struct lu_fid *fid = &info->mti_tmp_fid1;
2007         struct mdt_object *stripe;
2008         int rc;
2009
2010         if (ma->ma_valid & MA_LMV) {
2011                 /* if parent is striped, lookup on corresponding stripe */
2012                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2013
2014                 if (!lmv_is_sane(lmv))
2015                         return -EBADF;
2016
2017                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2018                                                   lname->ln_namelen);
2019                 if (rc < 0)
2020                         return rc;
2021
2022                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2023
2024                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2025                 if (IS_ERR(stripe))
2026                         return PTR_ERR(stripe);
2027
2028                 fid_zero(fid);
2029                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2030                                 &info->mti_spec);
2031                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2032                         /*
2033                          * if parent layout is changeing, and lookup child
2034                          * failed on source stripe, lookup again on target
2035                          * stripe, if it exists, it means previous migration
2036                          * was interrupted, and current file was migrated
2037                          * already.
2038                          */
2039                         mdt_object_put(env, stripe);
2040
2041                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2042                                                       lname->ln_namelen);
2043                         if (rc < 0)
2044                                 return rc;
2045
2046                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2047
2048                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2049                         if (IS_ERR(stripe))
2050                                 return PTR_ERR(stripe);
2051
2052                         fid_zero(fid);
2053                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2054                                         fid, &info->mti_spec);
2055                         mdt_object_put(env, stripe);
2056                         return rc ?: -EALREADY;
2057                 } else if (rc) {
2058                         mdt_object_put(env, stripe);
2059                         return rc;
2060                 }
2061         } else {
2062                 fid_zero(fid);
2063                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2064                                 &info->mti_spec);
2065                 if (rc)
2066                         return rc;
2067
2068                 stripe = pobj;
2069                 mdt_object_get(env, stripe);
2070         }
2071
2072         *spobj = stripe;
2073
2074         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2075         if (IS_ERR(*sobj)) {
2076                 mdt_object_put(env, stripe);
2077                 rc = PTR_ERR(*sobj);
2078                 *spobj = NULL;
2079                 *sobj = NULL;
2080         }
2081
2082         return rc;
2083 }
2084
2085 /* end lease and close file for regular file */
2086 static int mdd_migrate_close(struct mdt_thread_info *info,
2087                              struct mdt_object *obj)
2088 {
2089         struct close_data *data;
2090         struct mdt_body *repbody;
2091         struct ldlm_lock *lease;
2092         int rc;
2093         int rc2;
2094
2095         rc = -EPROTO;
2096         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2097                                       RCL_CLIENT) ||
2098             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2099                                       RCL_CLIENT))
2100                 goto close;
2101
2102         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2103         if (!data)
2104                 goto close;
2105
2106         rc = -ESTALE;
2107         lease = ldlm_handle2lock(&data->cd_handle);
2108         if (!lease)
2109                 goto close;
2110
2111         /* check if the lease was already canceled */
2112         lock_res_and_lock(lease);
2113         rc = ldlm_is_cancel(lease);
2114         unlock_res_and_lock(lease);
2115
2116         if (rc) {
2117                 rc = -EAGAIN;
2118                 LDLM_DEBUG(lease, DFID" lease broken",
2119                            PFID(mdt_object_fid(obj)));
2120         }
2121
2122         /*
2123          * cancel server side lease, client side counterpart should have been
2124          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2125          */
2126         ldlm_lock_cancel(lease);
2127         ldlm_reprocess_all(lease->l_resource, lease);
2128         LDLM_LOCK_PUT(lease);
2129
2130 close:
2131         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2132         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2133         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2134
2135         return rc ?: rc2;
2136 }
2137
2138 /*
2139  * migrate file in below steps:
2140  *  1. lock parent and its stripes
2141  *  2. lookup source by name
2142  *  3. lock parents of source links if source is not directory
2143  *  4. reject if source is in HSM
2144  *  5. take source open_sem and close file if source is regular file
2145  *  6. lock source and its stripes if it's directory
2146  *  7. lock target so subsequent change to it can trigger COS
2147  *  8. migrate file
2148  *  9. unlock above locks
2149  * 10. sync device if source has links
2150  */
2151 int mdt_reint_migrate(struct mdt_thread_info *info,
2152                       struct mdt_lock_handle *unused)
2153 {
2154         const struct lu_env *env = info->mti_env;
2155         struct mdt_device *mdt = info->mti_mdt;
2156         struct ptlrpc_request *req = mdt_info_req(info);
2157         struct mdt_reint_record *rr = &info->mti_rr;
2158         struct lu_ucred *uc = mdt_ucred(info);
2159         struct md_attr *ma = &info->mti_attr;
2160         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2161         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2162         struct mdt_object *pobj;
2163         struct mdt_object *spobj = NULL;
2164         struct mdt_object *sobj = NULL;
2165         struct mdt_object *tobj;
2166         struct lustre_handle rename_lh = { 0 };
2167         struct mdt_lock_handle *lhp;
2168         struct mdt_lock_handle *lhs;
2169         struct mdt_lock_handle *lht;
2170         LIST_HEAD(parent_slave_locks);
2171         LIST_HEAD(child_slave_locks);
2172         LIST_HEAD(link_locks);
2173         int lock_retries = 5;
2174         bool open_sem_locked = false;
2175         bool do_sync = false;
2176         int rc;
2177
2178         ENTRY;
2179         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2180                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2181
2182         if (info->mti_dlm_req)
2183                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2184
2185         if (!fid_is_md_operative(rr->rr_fid1) ||
2186             !fid_is_md_operative(rr->rr_fid2))
2187                 RETURN(-EPERM);
2188
2189         /* don't allow migrate . or .. */
2190         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2191                 RETURN(-EBUSY);
2192
2193         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2194                 RETURN(-EPERM);
2195
2196         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2197             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2198             mdt->mdt_enable_remote_dir_gid != -1)
2199                 RETURN(-EPERM);
2200
2201         /*
2202          * Note: do not enqueue rename lock for replay request, because
2203          * if other MDT holds rename lock, but being blocked to wait for
2204          * this MDT to finish its recovery, and the failover MDT can not
2205          * get rename lock, which will cause deadlock.
2206          *
2207          * req is NULL if this is called by directory auto-split.
2208          */
2209         if (req && !req_is_replay(req)) {
2210                 rc = mdt_rename_lock(info, &rename_lh);
2211                 if (rc != 0) {
2212                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2213                                mdt_obd_name(info->mti_mdt), rc);
2214                         RETURN(rc);
2215                 }
2216         }
2217
2218         /* pobj is master object of parent */
2219         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2220         if (IS_ERR(pobj))
2221                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2222
2223         if (req) {
2224                 rc = mdt_version_get_check(info, pobj, 0);
2225                 if (rc)
2226                         GOTO(put_parent, rc);
2227         }
2228
2229         if (!mdt_object_exists(pobj))
2230                 GOTO(put_parent, rc = -ENOENT);
2231
2232         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2233                 GOTO(put_parent, rc = -ENOTDIR);
2234
2235         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2236         if (rc)
2237                 GOTO(put_parent, rc);
2238
2239 lock_parent:
2240         /* lock parent object */
2241         lhp = &info->mti_lh[MDT_LH_PARENT];
2242         mdt_lock_reg_init(lhp, LCK_PW);
2243         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2244                                      &parent_slave_locks);
2245         if (rc)
2246                 GOTO(put_parent, rc);
2247
2248         /*
2249          * spobj is the corresponding stripe against name if pobj is striped
2250          * directory, which is the real parent, and no need to lock, because
2251          * we've taken full lock of pobj.
2252          */
2253         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2254         if (rc)
2255                 GOTO(unlock_parent, rc);
2256
2257         /* lock parents of source links, and revoke LOOKUP lock of links */
2258         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2259                                    &parent_slave_locks, &link_locks);
2260         if (rc == -EBUSY && lock_retries-- > 0) {
2261                 mdt_object_put(env, sobj);
2262                 mdt_object_put(env, spobj);
2263                 goto lock_parent;
2264         }
2265
2266         if (rc < 0)
2267                 GOTO(put_source, rc);
2268
2269         /*
2270          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2271          * with one request, if total lock count exceeds this limit, we will
2272          * drop all locks after migration, and synchronous device in the end.
2273          */
2274         do_sync = rc;
2275
2276         /* TODO: DoM migration is not supported, migrate dirent only */
2277         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2278                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2279                 if (rc)
2280                         GOTO(unlock_links, rc);
2281
2282                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2283                         info->mti_spec.sp_migrate_nsonly = 1;
2284         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2285                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2286                 if (rc)
2287                         GOTO(unlock_links, rc);
2288
2289                 /* race with restripe/auto-split? */
2290                 if ((ma->ma_valid & MA_LMV) &&
2291                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2292                         GOTO(unlock_links, rc = -EBUSY);
2293         }
2294
2295         /* if migration HSM is allowed */
2296         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2297                 ma->ma_need = MA_HSM;
2298                 ma->ma_valid = 0;
2299                 rc = mdt_attr_get_complex(info, sobj, ma);
2300                 if (rc)
2301                         GOTO(unlock_links, rc);
2302
2303                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2304                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2305         }
2306
2307         /* end lease and close file for regular file */
2308         if (info->mti_spec.sp_migrate_close) {
2309                 /* try to hold open_sem so that nobody else can open the file */
2310                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2311                         /* close anyway */
2312                         mdd_migrate_close(info, sobj);
2313                         GOTO(unlock_links, rc = -EBUSY);
2314                 } else {
2315                         open_sem_locked = true;
2316                         rc = mdd_migrate_close(info, sobj);
2317                         if (rc)
2318                                 GOTO(unlock_open_sem, rc);
2319                 }
2320         }
2321
2322         /* lock source */
2323         lhs = &info->mti_lh[MDT_LH_OLD];
2324         mdt_lock_reg_init(lhs, LCK_EX);
2325         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2326                                      &child_slave_locks);
2327         if (rc)
2328                 GOTO(unlock_open_sem, rc);
2329
2330         /* lock target */
2331         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2332         if (IS_ERR(tobj))
2333                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2334
2335         lht = &info->mti_lh[MDT_LH_NEW];
2336         mdt_lock_reg_init(lht, LCK_EX);
2337         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2338         if (rc)
2339                 GOTO(put_target, rc);
2340
2341         /* Don't do lookup sanity check. We know name doesn't exist. */
2342         info->mti_spec.sp_cr_lookup = 0;
2343         info->mti_spec.sp_feat = &dt_directory_features;
2344
2345         rc = mdo_migrate(env, mdt_object_child(pobj),
2346                          mdt_object_child(sobj), &rr->rr_name,
2347                          mdt_object_child(tobj),
2348                          &info->mti_spec, ma);
2349         if (!rc)
2350                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2351                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2352         EXIT;
2353
2354         mdt_object_unlock(info, tobj, lht, rc);
2355 put_target:
2356         mdt_object_put(env, tobj);
2357 unlock_source:
2358         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2359                                   &child_slave_locks, rc);
2360 unlock_open_sem:
2361         if (open_sem_locked)
2362                 up_write(&sobj->mot_open_sem);
2363 unlock_links:
2364         /* if we've got too many locks to save into RPC,
2365          * then just commit before the locks are released
2366          */
2367         if (!rc && do_sync)
2368                 mdt_device_sync(env, mdt);
2369         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2370 put_source:
2371         mdt_object_put(env, sobj);
2372         mdt_object_put(env, spobj);
2373 unlock_parent:
2374         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2375                                   &parent_slave_locks, rc);
2376 put_parent:
2377         mdt_object_put(env, pobj);
2378 unlock_rename:
2379         if (lustre_handle_is_used(&rename_lh))
2380                 mdt_rename_unlock(&rename_lh);
2381
2382         return rc;
2383 }
2384
2385 static int mdt_object_lock_save(struct mdt_thread_info *info,
2386                                 struct mdt_object *dir,
2387                                 struct mdt_lock_handle *lh,
2388                                 int idx, bool cos_incompat)
2389 {
2390         int rc;
2391
2392         /* we lock the target dir if it is local */
2393         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2394                                    cos_incompat);
2395         if (rc != 0)
2396                 return rc;
2397
2398         /* get and save correct version after locking */
2399         mdt_version_get_save(info, dir, idx);
2400         return 0;
2401 }
2402
2403 /*
2404  * determine lock order of sobj and tobj
2405  *
2406  * there are two situations we need to lock tobj before sobj:
2407  * 1. sobj is child of tobj
2408  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2409  *    larger than that of tobj
2410  *
2411  * \retval      1 lock tobj before sobj
2412  * \retval      0 lock sobj before tobj
2413  * \retval      -ev negative errno upon error
2414  */
2415 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2416                                            struct mdt_object *sobj,
2417                                            struct mdt_object *tobj)
2418 {
2419         struct md_attr *ma = &info->mti_attr;
2420         struct lu_fid *spfid = &info->mti_tmp_fid1;
2421         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2422         struct lmv_mds_md_v1 *lmv;
2423         __u32 sindex;
2424         __u32 tindex;
2425         int rc;
2426
2427         /* sobj and tobj are the same */
2428         if (sobj == tobj)
2429                 return 0;
2430
2431         if (fid_is_root(mdt_object_fid(sobj)))
2432                 return 0;
2433
2434         if (fid_is_root(mdt_object_fid(tobj)))
2435                 return 1;
2436
2437         /* check whether sobj is child of tobj */
2438         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2439                            mdt_object_fid(tobj));
2440         if (rc < 0)
2441                 return rc;
2442
2443         if (rc == 1)
2444                 return 1;
2445
2446         /* check whether sobj and tobj are children of the same parent */
2447         rc = mdt_attr_get_pfid(info, sobj, spfid);
2448         if (rc)
2449                 return rc;
2450
2451         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2452         if (rc)
2453                 return rc;
2454
2455         if (!lu_fid_eq(spfid, tpfid))
2456                 return 0;
2457
2458         /* check whether sobj and tobj are sibling stripes */
2459         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2460         if (rc)
2461                 return rc;
2462
2463         if (!(ma->ma_valid & MA_LMV))
2464                 return 0;
2465
2466         lmv = &ma->ma_lmv->lmv_md_v1;
2467         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2468                 return 0;
2469         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2470
2471         ma->ma_valid = 0;
2472         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2473         if (rc)
2474                 return rc;
2475
2476         if (!(ma->ma_valid & MA_LMV))
2477                 return -ENODATA;
2478
2479         lmv = &ma->ma_lmv->lmv_md_v1;
2480         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2481                 return -EINVAL;
2482         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2483
2484         /* check stripe index of sobj and tobj */
2485         if (sindex == tindex)
2486                 return -EINVAL;
2487
2488         return sindex < tindex ? 0 : 1;
2489 }
2490
2491 /*
2492  * lock rename source object.
2493  *
2494  * Both source and source parent may be remote, and source may be a remote
2495  * object on source parent, to avoid overriding lock handle, store remote
2496  * LOOKUP lock separately in @lhr.
2497  *
2498  * \retval      0 on success
2499  * \retval      -ev negative errno upon error
2500  */
2501 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2502                                   struct mdt_object *parent,
2503                                   struct mdt_object *child,
2504                                   struct mdt_lock_handle *lhc,
2505                                   struct mdt_lock_handle *lhr,
2506                                   __u64 ibits,
2507                                   bool cos_incompat)
2508 {
2509         int rc;
2510
2511         rc = mdt_is_remote_object(info, parent, child);
2512         if (rc < 0)
2513                 return rc;
2514
2515         if (rc) {
2516                 /* enqueue remote LOOKUP lock from the parent MDT */
2517                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2518
2519                 if (mdt_object_remote(parent)) {
2520                         rc = mdt_remote_object_lock(info, parent,
2521                                                     mdt_object_fid(child),
2522                                                     &lhr->mlh_rreg_lh,
2523                                                     lhr->mlh_rreg_mode,
2524                                                     rmt_ibits, false);
2525                         if (rc != ELDLM_OK)
2526                                 return rc;
2527                 } else {
2528                         LASSERT(mdt_object_remote(child));
2529                         rc = mdt_object_local_lock(info, child, lhr,
2530                                                    &rmt_ibits, 0, true);
2531                         if (rc < 0)
2532                                 return rc;
2533                 }
2534
2535                 ibits &= ~MDS_INODELOCK_LOOKUP;
2536         }
2537
2538         if (mdt_object_remote(child)) {
2539                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2540                                             &lhc->mlh_rreg_lh,
2541                                             lhc->mlh_rreg_mode,
2542                                             ibits, false);
2543                 if (rc == ELDLM_OK)
2544                         rc = 0;
2545         } else {
2546                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2547                                            cos_incompat);
2548         }
2549
2550         if (!rc)
2551                 mdt_object_unlock(info, child, lhr, rc);
2552
2553         return rc;
2554 }
2555
/*
 * Handle a rename request: rename rr_name under the source parent rr_fid1
 * to rr_tgt_name under the target parent rr_fid2.
 *
 * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
 * 2 - srcdir child; 3 - tgtdir child.
 * Update on disk version of srcdir child.
 *
 * Outline:
 * - find both parents; if rr_fid1 equals rr_fid2 the same object is used
 *   for both, with an extra reference taken on it
 * - for a non-replay request, take the rename lock when the parents differ
 * - lock the parents in the order chosen by
 *   mdt_rename_determine_lock_order()
 * - look up and check the source child; if it turns out to be remote while
 *   the parents were locked with cos_incompat == false, unlock the parents
 *   and lock them again with cos_incompat == true
 * - look up the target child; if it exists, lock source and target child,
 *   otherwise (-ENOENT) lock the source child only
 * - call mdo_rename() and release everything in reverse order
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] unused	not referenced by this function
 *
 * \retval	0 on success, also when source and target name already
 *		resolve to the same FID (nothing is renamed in that case)
 * \retval	non-zero error code on failure
 */
static int mdt_reint_rename(struct mdt_thread_info *info,
                            struct mdt_lock_handle *unused)
{
        struct mdt_device *mdt = info->mti_mdt;
        struct mdt_reint_record *rr = &info->mti_rr;
        struct md_attr *ma = &info->mti_attr;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct mdt_object *msrcdir = NULL;
        struct mdt_object *mtgtdir = NULL;
        struct mdt_object *mold;
        struct mdt_object *mnew = NULL;
        struct lustre_handle rename_lh = { 0 };
        struct mdt_lock_handle *lh_srcdirp;
        struct mdt_lock_handle *lh_tgtdirp;
        struct mdt_lock_handle *lh_oldp = NULL;
        struct mdt_lock_handle *lh_rmt = NULL;
        struct mdt_lock_handle *lh_newp = NULL;
        struct lu_fid *old_fid = &info->mti_tmp_fid1;
        struct lu_fid *new_fid = &info->mti_tmp_fid2;
        __u64 lock_ibits;
        bool reverse = false, discard = false;
        bool cos_incompat;
        ktime_t kstart = ktime_get();
        int rc;

        ENTRY;
        DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
                  PFID(rr->rr_fid1), PNAME(&rr->rr_name),
                  PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        if (!fid_is_md_operative(rr->rr_fid1) ||
            !fid_is_md_operative(rr->rr_fid2))
                RETURN(-EPERM);

        /* find both parents. */
        msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
        if (IS_ERR(msrcdir))
                RETURN(PTR_ERR(msrcdir));

        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);

        if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
                /* same parent: share the object, but hold a second reference
                 * so that both puts at the end of the function are balanced
                 */
                mtgtdir = msrcdir;
                mdt_object_get(info->mti_env, mtgtdir);
        } else {
                mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
                if (IS_ERR(mtgtdir))
                        GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
        }

        /*
         * Note: do not enqueue rename lock for replay request, because
         * if other MDT holds rename lock, but being blocked to wait for
         * this MDT to finish its recovery, and the failover MDT can not
         * get rename lock, which will cause deadlock.
         */
        if (!req_is_replay(req)) {
                /*
                 * Normally rename RPC is handled on the MDT with the target
                 * directory (if target exists, it's on the MDT with the
                 * target), if the source directory is remote, it's a hint that
                 * source is remote too (this may not be true, but it won't
                 * cause any issue), return -EXDEV early to avoid taking
                 * rename_lock.
                 */
                if (!mdt->mdt_enable_remote_rename &&
                    mdt_object_remote(msrcdir))
                        GOTO(out_put_tgtdir, rc = -EXDEV);

                /* This might be further relaxed in the future for regular file
                 * renames in different source and target parents. Start with
                 * only same-directory renames for simplicity and because this
                 * is by far the most common use case.
                 */
                if (msrcdir != mtgtdir) {
                        rc = mdt_rename_lock(info, &rename_lh);
                        if (rc != 0) {
                                CERROR("%s: cannot lock for rename: rc = %d\n",
                                       mdt_obd_name(mdt), rc);
                                GOTO(out_put_tgtdir, rc);
                        }
                } else {
                        CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
                               mdt_obd_name(mdt), PFID(rr->rr_fid1),
                               PNAME(&rr->rr_name));
                }
        }

        /* a positive result means the target parent is locked first */
        rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
        if (rc < 0)
                GOTO(out_unlock_rename, rc);

        reverse = rc;

        /* source needs to be looked up after locking source parent, otherwise
         * this rename may race with unlink source, and cause rename hang, see
         * sanityn.sh 55b, so check parents first, if later we found source is
         * remote, relock parents.
         */
        cos_incompat = (mdt_object_remote(msrcdir) ||
                        mdt_object_remote(mtgtdir));

        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);

        /* lock parents in the proper order. */
        lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
        lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];

        OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
        OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
relock:
        /* re-entered from below with cos_incompat set to true, after both
         * parent locks have been dropped
         */
        mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
        mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);

        if (reverse) {
                rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
                                          cos_incompat);
                if (rc)
                        GOTO(out_unlock_rename, rc);

                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);

                rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
                                          cos_incompat);
                if (rc != 0) {
                        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
                        GOTO(out_unlock_rename, rc);
                }
        } else {
                rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
                                          cos_incompat);
                if (rc)
                        GOTO(out_unlock_rename, rc);

                OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);

                if (mtgtdir != msrcdir) {
                        rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
                                                  cos_incompat);
                } else if (!mdt_object_remote(mtgtdir) &&
                           lh_srcdirp->mlh_pdo_hash !=
                           lh_tgtdirp->mlh_pdo_hash) {
                        /* same local parent, but the two names have
                         * different pdo hashes: lock the hash of the target
                         * name as well
                         */
                        rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
                                                MDS_INODELOCK_UPDATE,
                                                cos_incompat);
                        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
                }
                if (rc != 0) {
                        mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
                        GOTO(out_unlock_rename, rc);
                }
        }

        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
        OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);

        /* find mold object. */
        fid_zero(old_fid);
        rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
        if (rc != 0)
                GOTO(out_unlock_parents, rc);

        /* the source must not be one of the two parents */
        if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
                GOTO(out_unlock_parents, rc = -EINVAL);

        if (!fid_is_md_operative(old_fid))
                GOTO(out_unlock_parents, rc = -EPERM);

        mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
        if (IS_ERR(mold))
                GOTO(out_unlock_parents, rc = PTR_ERR(mold));

        if (!mdt_object_exists(mold)) {
                LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                &mold->mot_obj,
                                "object does not exist");
                GOTO(out_put_old, rc = -ENOENT);
        }

        if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
                GOTO(out_put_old, rc = -EXDEV);

        /* Check if @mtgtdir is subdir of @mold, before locking child
         * to avoid reverse locking.
         */
        if (mtgtdir != msrcdir) {
                rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
                                   old_fid);
                if (rc) {
                        if (rc == 1)
                                rc = -EINVAL;
                        GOTO(out_put_old, rc);
                }
        }

        tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
        /* save version after locking */
        mdt_version_get_save(info, mold, 2);

        /* source is remote but the parents were locked with
         * cos_incompat == false: drop the source and both parent locks and
         * start over from relock with cos_incompat == true
         */
        if (!cos_incompat && mdt_object_remote(mold)) {
                cos_incompat = true;
                mdt_object_put(info->mti_env, mold);
                mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
                mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
                goto relock;
        }

        /* find mnew object:
         * mnew target object may not exist now
         * lookup with version checking
         */
        fid_zero(new_fid);
        rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
                                      3);
        if (rc == 0) {
                /* the new_fid should have been filled at this moment */
                /* source and target are the same object: nothing to do,
                 * rc is 0 here
                 */
                if (lu_fid_eq(old_fid, new_fid))
                        GOTO(out_put_old, rc);

                if (lu_fid_eq(new_fid, rr->rr_fid1) ||
                    lu_fid_eq(new_fid, rr->rr_fid2))
                        GOTO(out_put_old, rc = -EINVAL);

                if (!fid_is_md_operative(new_fid))
                        GOTO(out_put_old, rc = -EPERM);

                mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
                if (IS_ERR(mnew))
                        GOTO(out_put_old, rc = PTR_ERR(mnew));

                if (!mdt_object_exists(mnew)) {
                        LU_OBJECT_DEBUG(D_INODE, info->mti_env,
                                        &mnew->mot_obj,
                                        "object does not exist");
                        GOTO(out_put_new, rc = -ENOENT);
                }

                if (mdt_object_remote(mnew)) {
                        struct mdt_body  *repbody;

                        /* Always send rename req to the target child MDT */
                        /* the FID of the remote target is returned in the
                         * reply body together with -EXDEV
                         */
                        repbody = req_capsule_server_get(info->mti_pill,
                                                         &RMF_MDT_BODY);
                        LASSERT(repbody != NULL);
                        repbody->mbo_fid1 = *new_fid;
                        repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
                        GOTO(out_put_new, rc = -EXDEV);
                }
                /* Before locking the target dir, check we do not replace
                 * a dir with a non-dir, otherwise it may deadlock with
                 * link op which tries to create a link in this dir
                 * back to this non-dir.
                 */
                if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
                    !S_ISDIR(lu_object_attr(&mold->mot_obj)))
                        GOTO(out_put_new, rc = -EISDIR);

                lh_oldp = &info->mti_lh[MDT_LH_OLD];
                lh_rmt = &info->mti_lh[MDT_LH_RMT];
                mdt_lock_reg_init(lh_oldp, LCK_EX);
                mdt_lock_reg_init(lh_rmt, LCK_EX);
                lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
                                            lh_rmt, lock_ibits, cos_incompat);
                /* NOTE(review): only negative values are treated as failure
                 * here, while the -ENOENT branch below checks rc != 0 -
                 * confirm mdt_rename_source_lock() cannot return a positive
                 * error code. This error path also skips out_unlock_old, so
                 * lh_oldp and lh_rmt are not unlocked by this function.
                 */
                if (rc < 0)
                        GOTO(out_put_new, rc);

                /* Check if @msrcdir is subdir of @mnew, before locking child
                 * to avoid reverse locking.
                 */
                if (mtgtdir != msrcdir) {
                        rc = mdo_is_subdir(info->mti_env,
                                           mdt_object_child(msrcdir), new_fid);
                        if (rc) {
                                if (rc == 1)
                                        rc = -EINVAL;
                                GOTO(out_unlock_old, rc);
                        }
                }

                /* We used to acquire MDS_INODELOCK_FULL here but we
                 * can't do this now because a running HSM restore on
                 * the rename onto victim will hold the layout
                 * lock. See LU-4002.
                 */

                lh_newp = &info->mti_lh[MDT_LH_NEW];
                mdt_lock_reg_init(lh_newp, LCK_EX);
                lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                if (mdt_object_remote(mtgtdir)) {
                        /* target parent is remote: take the LOOKUP bit
                         * through the remote handle of lh_newp and lock the
                         * remaining bits locally below
                         */
                        rc = mdt_remote_object_lock(info, mtgtdir,
                                                    mdt_object_fid(mnew),
                                                    &lh_newp->mlh_rreg_lh,
                                                    lh_newp->mlh_rreg_mode,
                                                    MDS_INODELOCK_LOOKUP,
                                                    false);
                        if (rc != ELDLM_OK)
                                GOTO(out_unlock_old, rc);

                        lock_ibits &= ~MDS_INODELOCK_LOOKUP;
                }
                rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
                                           cos_incompat);
                if (rc != 0)
                        GOTO(out_unlock_new, rc);

                /* get and save version after locking */
                mdt_version_get_save(info, mnew, 3);
        } else if (rc != -ENOENT) {
                GOTO(out_put_old, rc);
        } else {
                /* target name does not exist: only the source is locked */
                lh_oldp = &info->mti_lh[MDT_LH_OLD];
                lh_rmt = &info->mti_lh[MDT_LH_RMT];
                mdt_lock_reg_init(lh_oldp, LCK_EX);
                mdt_lock_reg_init(lh_rmt, LCK_EX);
                lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
                rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
                                            lh_rmt, lock_ibits, cos_incompat);
                if (rc != 0)
                        GOTO(out_put_old, rc);

                mdt_enoent_version_save(info, 3);
        }

        /* step 5: rename it */
        mdt_reint_init_ma(info, ma);

        mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
                       OBD_FAIL_MDS_REINT_RENAME_WRITE);

        /* mot_lov_mutex of an existing target is held across mdo_rename() */
        if (mnew != NULL)
                mutex_lock(&mnew->mot_lov_mutex);

        rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
                        mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
                        mnew != NULL ? mdt_object_child(mnew) : NULL,
                        &rr->rr_tgt_name, ma);

        if (mnew != NULL)
                mutex_unlock(&mnew->mot_lov_mutex);

        /* handle last link of tgt object */
        if (rc == 0) {
                mdt_counter_incr(req, LPROC_MDT_RENAME,
                                 ktime_us_delta(ktime_get(), kstart));
                if (mnew) {
                        mdt_handle_last_unlink(info, mnew, ma);
                        discard = mdt_dom_check_for_discard(info, mnew);
                }
                mdt_rename_counter_tally(info, info->mti_mdt, req,
                                         msrcdir, mtgtdir,
                                         ktime_us_delta(ktime_get(), kstart));
        }

        EXIT;
out_unlock_new:
        if (mnew != NULL)
                mdt_object_unlock(info, mnew, lh_newp, rc);
out_unlock_old:
        mdt_object_unlock(info, NULL, lh_rmt, rc);
        mdt_object_unlock(info, mold, lh_oldp, rc);
out_put_new:
        /* when discard is set, mnew is still needed after the unwind and is
         * put at the very end
         */
        if (mnew && !discard)
                mdt_object_put(info->mti_env, mnew);
out_put_old:
        mdt_object_put(info->mti_env, mold);
out_unlock_parents:
        mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
        mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
out_unlock_rename:
        if (lustre_handle_is_used(&rename_lh))
                mdt_rename_unlock(&rename_lh);
out_put_tgtdir:
        mdt_object_put(info->mti_env, mtgtdir);
out_put_srcdir:
        mdt_object_put(info->mti_env, msrcdir);

        /* The DoM discard can be done right in the place above where it is
         * assigned, meanwhile it is done here after rename unlock due to
         * compatibility with old clients, for them the discard blocks
         * the main thread until completion. Check LU-11359 for details.
         */
        if (discard) {
                mdt_dom_discard_data(info, mnew);
                mdt_object_put(info->mti_env, mnew);
        }
        return rc;
}
2952
/*
 * Handle an FLR file resync request for the file rr_fid1.
 *
 * The object has to exist, be a regular file and be local. The lease lock is
 * looked up from rr_lease_handle, then mot_open_sem is taken for write
 * (trylock only) and the lease is checked for cancellation before
 * mdt_layout_change() is called with MD_LAYOUT_RESYNC and the mirror id from
 * rr_mirror_id. On success the inode attributes are packed into the reply
 * body.
 *
 * mdt_client_compatibility() and mdt_fix_reply() are called on every path;
 * the result of mdt_fix_reply() is returned only if nothing failed earlier.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	the value passed in is not used, the pointer is
 *			overwritten with &info->mti_lh[MDT_LH_LOCAL]
 *
 * \retval	0 on success
 * \retval	-ENOENT the object does not exist
 * \retval	-EINVAL the object is not a regular file
 * \retval	-EREMOTE the object is remote
 * \retval	-ESTALE no lock found for rr_lease_handle
 * \retval	-EBUSY mot_open_sem could not be taken, or the lease is
 *		being cancelled
 * \retval	other non-zero error code from the functions called
 */
static int mdt_reint_resync(struct mdt_thread_info *info,
                            struct mdt_lock_handle *lhc)
{
        struct mdt_reint_record *rr = &info->mti_rr;
        struct ptlrpc_request *req = mdt_info_req(info);
        struct md_attr *ma = &info->mti_attr;
        struct mdt_object *mo;
        struct ldlm_lock *lease;
        struct mdt_body *repbody;
        struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
        bool lease_broken;
        int rc, rc2;

        ENTRY;
        DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));

        if (info->mti_dlm_req)
                ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);

        mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
        if (IS_ERR(mo))
                GOTO(out, rc = PTR_ERR(mo));

        if (!mdt_object_exists(mo))
                GOTO(out_obj, rc = -ENOENT);

        if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
                GOTO(out_obj, rc = -EINVAL);

        if (mdt_object_remote(mo))
                GOTO(out_obj, rc = -EREMOTE);

        /* the reference on the lease lock is dropped at out_put_lease */
        lease = ldlm_handle2lock(rr->rr_lease_handle);
        if (lease == NULL)
                GOTO(out_obj, rc = -ESTALE);

        /* It's really necessary to grab open_sem and check if the lease lock
         * has been lost. There would exist a concurrent writer coming in and
         * generating some dirty data in memory cache, the writeback would fail
         * after the layout version is increased by MDS_REINT_RESYNC RPC.
         */
        if (!down_write_trylock(&mo->mot_open_sem))
                GOTO(out_put_lease, rc = -EBUSY);

        lock_res_and_lock(lease);
        lease_broken = ldlm_is_cancel(lease);
        unlock_res_and_lock(lease);
        if (lease_broken)
                GOTO(out_unlock, rc = -EBUSY);

        /* the file has not been opened by anyone else since we took the
         * lease.
         */
        layout.mlc_opc = MD_LAYOUT_RESYNC;
        lhc = &info->mti_lh[MDT_LH_LOCAL];
        rc = mdt_layout_change(info, mo, lhc, &layout);
        if (rc)
                GOTO(out_unlock, rc);

        mdt_object_unlock(info, mo, lhc, 0);

        /* return the inode attributes to the client */
        ma->ma_need = MA_INODE;
        ma->ma_valid = 0;
        rc = mdt_attr_get_complex(info, mo, ma);
        if (rc != 0)
                GOTO(out_unlock, rc);

        repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
        mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));

        EXIT;
out_unlock:
        up_write(&mo->mot_open_sem);
out_put_lease:
        LDLM_LOCK_PUT(lease);
out_obj:
        mdt_object_put(info->mti_env, mo);
out:
        mdt_client_compatibility(info);
        rc2 = mdt_fix_reply(info);
        /* an earlier error takes precedence over the mdt_fix_reply() result */
        if (rc == 0)
                rc = rc2;
        return rc;
}
3035
3036 struct mdt_reinter {
3037         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3038         enum lprocfs_extra_opc mr_extra_opc;
3039 };
3040
3041 static const struct mdt_reinter mdt_reinters[] = {
3042         [REINT_SETATTR] = {
3043                 .mr_handler = &mdt_reint_setattr,
3044                 .mr_extra_opc = MDS_REINT_SETATTR,
3045         },
3046         [REINT_CREATE] = {
3047                 .mr_handler = &mdt_reint_create,
3048                 .mr_extra_opc = MDS_REINT_CREATE,
3049         },
3050         [REINT_LINK] = {
3051                 .mr_handler = &mdt_reint_link,
3052                 .mr_extra_opc = MDS_REINT_LINK,
3053         },
3054         [REINT_UNLINK] = {
3055                 .mr_handler = &mdt_reint_unlink,
3056                 .mr_extra_opc = MDS_REINT_UNLINK,
3057         },
3058         [REINT_RENAME] = {
3059                 .mr_handler = &mdt_reint_rename,
3060                 .mr_extra_opc = MDS_REINT_RENAME,
3061         },
3062         [REINT_OPEN] = {
3063                 .mr_handler = &mdt_reint_open,
3064                 .mr_extra_opc = MDS_REINT_OPEN,
3065         },
3066         [REINT_SETXATTR] = {
3067                 .mr_handler = &mdt_reint_setxattr,
3068                 .mr_extra_opc = MDS_REINT_SETXATTR,
3069         },
3070         [REINT_RMENTRY] = {
3071                 .mr_handler = &mdt_reint_unlink,
3072                 .mr_extra_opc = MDS_REINT_UNLINK,
3073         },
3074         [REINT_MIGRATE] = {
3075                 .mr_handler = &mdt_reint_migrate,
3076                 .mr_extra_opc = MDS_REINT_RENAME,
3077         },
3078         [REINT_RESYNC] = {
3079                 .mr_handler = &mdt_reint_resync,
3080                 .mr_extra_opc = MDS_REINT_RESYNC,
3081         },
3082 };
3083
3084 int mdt_reint_rec(struct mdt_thread_info *info,
3085                   struct mdt_lock_handle *lhc)
3086 {
3087         const struct mdt_reinter *mr;
3088         int rc;
3089
3090         ENTRY;
3091         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3092                 RETURN(-EPROTO);
3093
3094         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3095         if (mr->mr_handler == NULL)
3096                 RETURN(-EPROTO);
3097
3098         rc = (*mr->mr_handler)(info, lhc);
3099
3100         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3101                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3102
3103         RETURN(rc);
3104 }