Whamcloud - gitweb
LU-12848 tests: link succeded to an ophan remote object
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47
48 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
49                                      struct md_attr *ma)
50 {
51         ma->ma_need = MA_INODE;
52         ma->ma_valid = 0;
53 }
54
55 /**
56  * Get version of object by fid.
57  *
58  * Return real version or ENOENT_VERSION if object doesn't exist
59  */
60 static void mdt_obj_version_get(struct mdt_thread_info *info,
61                                 struct mdt_object *o, __u64 *version)
62 {
63         LASSERT(o);
64
65         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
66             !fid_is_obf(mdt_object_fid(o)))
67                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
68         else
69                 *version = ENOENT_VERSION;
70         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
71                PFID(mdt_object_fid(o)), *version);
72 }
73
74 /**
75  * Check version is correct.
76  *
77  * Should be called only during replay.
78  */
79 static int mdt_version_check(struct ptlrpc_request *req,
80                              __u64 version, int idx)
81 {
82         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
83
84         ENTRY;
85         if (!exp_connect_vbr(req->rq_export))
86                 RETURN(0);
87
88         LASSERT(req_is_replay(req));
89         /** VBR: version is checked always because costs nothing */
90         LASSERT(idx < PTLRPC_NUM_VERSIONS);
91         /** Sanity check for malformed buffers */
92         if (pre_ver == NULL) {
93                 CERROR("No versions in request buffer\n");
94                 spin_lock(&req->rq_export->exp_lock);
95                 req->rq_export->exp_vbr_failed = 1;
96                 spin_unlock(&req->rq_export->exp_lock);
97                 RETURN(-EOVERFLOW);
98         } else if (pre_ver[idx] != version) {
99                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
100                        pre_ver[idx], version);
101                 spin_lock(&req->rq_export->exp_lock);
102                 req->rq_export->exp_vbr_failed = 1;
103                 spin_unlock(&req->rq_export->exp_lock);
104                 RETURN(-EOVERFLOW);
105         }
106         RETURN(0);
107 }
108
109 /**
110  * Save pre-versions in reply.
111  */
112 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
113                              int idx)
114 {
115         __u64 *reply_ver;
116
117         if (!exp_connect_vbr(req->rq_export))
118                 return;
119
120         LASSERT(!req_is_replay(req));
121         LASSERT(req->rq_repmsg != NULL);
122         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
123         if (reply_ver)
124                 reply_ver[idx] = version;
125 }
126
127 /**
128  * Save enoent version, it is needed when it is obvious that object doesn't
129  * exist, e.g. child during create.
130  */
131 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
132 {
133         /* save version of file name for replay, it must be ENOENT here */
134         if (!req_is_replay(mdt_info_req(info))) {
135                 info->mti_ver[idx] = ENOENT_VERSION;
136                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
137         }
138 }
139
140 /**
141  * Get version from disk and save in reply buffer.
142  *
143  * Versions are saved in reply only during normal operations not replays.
144  */
145 void mdt_version_get_save(struct mdt_thread_info *info,
146                           struct mdt_object *mto, int idx)
147 {
148         /* don't save versions during replay */
149         if (!req_is_replay(mdt_info_req(info))) {
150                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
151                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
152         }
153 }
154
155 /**
156  * Get version from disk and check it, no save in reply.
157  */
158 int mdt_version_get_check(struct mdt_thread_info *info,
159                           struct mdt_object *mto, int idx)
160 {
161         /* only check versions during replay */
162         if (!req_is_replay(mdt_info_req(info)))
163                 return 0;
164
165         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
166         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
167 }
168
169 /**
170  * Get version from disk and check if recovery or just save.
171  */
172 int mdt_version_get_check_save(struct mdt_thread_info *info,
173                                struct mdt_object *mto, int idx)
174 {
175         int rc = 0;
176
177         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
178         if (req_is_replay(mdt_info_req(info)))
179                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
180                                        idx);
181         else
182                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
183         return rc;
184 }
185
186 /**
187  * Lookup with version checking.
188  *
189  * This checks version of 'name'. Many reint functions uses 'name' for child not
190  * FID, therefore we need to get object by name and check its version.
191  */
192 int mdt_lookup_version_check(struct mdt_thread_info *info,
193                              struct mdt_object *p,
194                              const struct lu_name *lname,
195                              struct lu_fid *fid, int idx)
196 {
197         int rc, vbrc;
198
199         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
200                         &info->mti_spec);
201         /* Check version only during replay */
202         if (!req_is_replay(mdt_info_req(info)))
203                 return rc;
204
205         info->mti_ver[idx] = ENOENT_VERSION;
206         if (rc == 0) {
207                 struct mdt_object *child;
208
209                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
210                 if (likely(!IS_ERR(child))) {
211                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
212                         mdt_object_put(info->mti_env, child);
213                 }
214         }
215         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
216         return vbrc ? vbrc : rc;
217
218 }
219
220 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
221                              struct mdt_object *obj,
222                              struct ldlm_enqueue_info *einfo,
223                              int decref)
224 {
225         union ldlm_policy_data *policy = &mti->mti_policy;
226         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
227         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
228         int i;
229
230         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
231         LASSERT(slave_locks);
232
233         memset(policy, 0, sizeof(*policy));
234         policy->l_inodebits.bits = einfo->ei_inodebits;
235         mdt_lock_handle_init(lh);
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < slave_locks->ha_count; i++) {
238                 if (test_bit(i, (void *)slave_locks->ha_map))
239                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 slave_locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
279                            enum ldlm_mode mode, __u64 ibits,
280                            struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285
286         einfo->ei_type = LDLM_IBITS;
287         einfo->ei_mode = mode;
288         einfo->ei_cb_bl = mdt_remote_blocking_ast;
289         einfo->ei_cb_local_bl = mdt_blocking_ast;
290         einfo->ei_cb_cp = ldlm_completion_ast;
291         einfo->ei_enq_slave = 1;
292         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
293         einfo->ei_inodebits = ibits;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 int mdt_reint_striped_lock(struct mdt_thread_info *info,
302                            struct mdt_object *o,
303                            struct mdt_lock_handle *lh,
304                            __u64 ibits,
305                            struct ldlm_enqueue_info *einfo,
306                            bool cos_incompat)
307 {
308         int rc;
309
310         LASSERT(!mdt_object_remote(o));
311
312         memset(einfo, 0, sizeof(*einfo));
313
314         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
315         if (rc)
316                 return rc;
317
318         rc = mdt_object_striped(info, o);
319         if (rc != 1) {
320                 if (rc < 0)
321                         mdt_object_unlock(info, o, lh, rc);
322                 return rc;
323         }
324
325         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
326         if (rc) {
327                 mdt_object_unlock(info, o, lh, rc);
328                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
329                         rc = 0;
330         }
331
332         return rc;
333 }
334
335 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
336                               struct mdt_object *o,
337                               struct mdt_lock_handle *lh,
338                               struct ldlm_enqueue_info *einfo, int decref)
339 {
340         if (einfo->ei_cbdata)
341                 mdt_unlock_slaves(info, o, einfo, decref);
342         mdt_object_unlock(info, o, lh, decref);
343 }
344
345 static int mdt_restripe(struct mdt_thread_info *info,
346                         struct mdt_object *parent,
347                         const struct lu_name *lname,
348                         const struct lu_fid *tfid,
349                         struct md_op_spec *spec,
350                         struct md_attr *ma)
351 {
352         struct mdt_device *mdt = info->mti_mdt;
353         struct lu_fid *fid = &info->mti_tmp_fid2;
354         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
355         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
356         struct lmv_mds_md_v1 *lmv;
357         struct mdt_object *child;
358         struct mdt_lock_handle *lhp;
359         struct mdt_lock_handle *lhc;
360         struct mdt_body *repbody;
361         int rc;
362
363         ENTRY;
364         if (!mdt->mdt_enable_dir_restripe)
365                 RETURN(-EPERM);
366
367         LASSERT(lum);
368         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
369
370         rc = mdt_version_get_check_save(info, parent, 0);
371         if (rc)
372                 RETURN(rc);
373
374         lhp = &info->mti_lh[MDT_LH_PARENT];
375         mdt_lock_pdo_init(lhp, LCK_PW, lname);
376         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
377                                    true);
378         if (rc)
379                 RETURN(rc);
380
381         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
382         if (rc)
383                 GOTO(unlock_parent, rc);
384
385         if (ma->ma_valid & MA_LMV) {
386                 /* don't allow restripe if parent dir layout is changing */
387                 lmv = &ma->ma_lmv->lmv_md_v1;
388                 if (!lmv_is_sane2(lmv))
389                         GOTO(unlock_parent, rc = -EBADF);
390
391                 if (lmv_is_layout_changing(lmv))
392                         GOTO(unlock_parent, rc = -EBUSY);
393         }
394
395         fid_zero(fid);
396         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
397         if (rc)
398                 GOTO(unlock_parent, rc);
399
400         child = mdt_object_find(info->mti_env, mdt, fid);
401         if (IS_ERR(child))
402                 GOTO(unlock_parent, rc = PTR_ERR(child));
403
404         if (!mdt_object_exists(child))
405                 GOTO(out_child, rc = -ENOENT);
406
407         if (mdt_object_remote(child)) {
408                 struct mdt_body *repbody;
409
410                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
411                 if (!repbody)
412                         GOTO(out_child, rc = -EPROTO);
413
414                 repbody->mbo_fid1 = *fid;
415                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
416                 GOTO(out_child, rc = -EREMOTE);
417         }
418
419         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
420                 GOTO(out_child, rc = -ENOTDIR);
421
422         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
423         if (rc)
424                 GOTO(out_child, rc);
425
426         /* race with migrate? */
427         if ((ma->ma_valid & MA_LMV) &&
428              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
429                 GOTO(out_child, rc = -EBUSY);
430
431         /* lock object */
432         lhc = &info->mti_lh[MDT_LH_CHILD];
433         mdt_lock_reg_init(lhc, LCK_EX);
434
435         /* enqueue object remote LOOKUP lock */
436         if (mdt_object_remote(parent)) {
437                 rc = mdt_remote_object_lock(info, parent, fid,
438                                             &lhc->mlh_rreg_lh,
439                                             lhc->mlh_rreg_mode,
440                                             MDS_INODELOCK_LOOKUP, false);
441                 if (rc != ELDLM_OK)
442                         GOTO(out_child, rc);
443         }
444
445         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
446                                     true);
447         if (rc)
448                 GOTO(unlock_child, rc);
449
450         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
451         rc = mdt_version_get_check_save(info, child, 1);
452         if (rc)
453                 GOTO(unlock_child, rc);
454
455         spin_lock(&mdt->mdt_restriper.mdr_lock);
456         if (child->mot_restriping) {
457                 /* race? */
458                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
459                 GOTO(unlock_child, rc = -EBUSY);
460         }
461         child->mot_restriping = 1;
462         spin_unlock(&mdt->mdt_restriper.mdr_lock);
463
464         *fid = *tfid;
465         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
466         if (rc)
467                 GOTO(restriping_clear, rc);
468
469         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
470         if (!repbody)
471                 GOTO(restriping_clear, rc = -EPROTO);
472
473         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
474         EXIT;
475
476 restriping_clear:
477         child->mot_restriping = 0;
478 unlock_child:
479         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
480 out_child:
481         mdt_object_put(info->mti_env, child);
482 unlock_parent:
483         mdt_object_unlock(info, parent, lhp, rc);
484
485         return rc;
486 }
487
488 /*
489  * VBR: we save three versions in reply:
490  * 0 - parent. Check that parent version is the same during replay.
491  * 1 - name. Version of 'name' if file exists with the same name or
492  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
493  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
494  * check.
495  */
496 static int mdt_create(struct mdt_thread_info *info)
497 {
498         struct mdt_device *mdt = info->mti_mdt;
499         struct mdt_object *parent;
500         struct mdt_object *child;
501         struct mdt_lock_handle *lh;
502         struct mdt_body *repbody;
503         struct md_attr *ma = &info->mti_attr;
504         struct mdt_reint_record *rr = &info->mti_rr;
505         struct md_op_spec *spec = &info->mti_spec;
506         bool restripe = false;
507         int rc;
508
509         ENTRY;
510         DEBUG_REQ(D_INODE, mdt_info_req(info),
511                   "Create ("DNAME"->"DFID") in "DFID,
512                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
513
514         if (!fid_is_md_operative(rr->rr_fid1))
515                 RETURN(-EPERM);
516
517         if (S_ISDIR(ma->ma_attr.la_mode) &&
518             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
519                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
520                 struct lu_ucred *uc = mdt_ucred(info);
521                 struct obd_export *exp = mdt_info_req(info)->rq_export;
522
523                 /* Only new clients can create remote dir( >= 2.4) and
524                  * striped dir(>= 2.6), old client will return -ENOTSUPP
525                  */
526                 if (!mdt_is_dne_client(exp))
527                         RETURN(-ENOTSUPP);
528
529                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
530                         if (!mdt_is_striped_client(exp))
531                                 RETURN(-ENOTSUPP);
532
533                         if (!mdt->mdt_enable_striped_dir)
534                                 RETURN(-EPERM);
535                 } else if (!mdt->mdt_enable_remote_dir) {
536                         RETURN(-EPERM);
537                 }
538
539                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
540                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
541                     LMV_HASH_TYPE_CRUSH)
542                         RETURN(-EPROTO);
543
544                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
545                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
546                     mdt->mdt_enable_remote_dir_gid != -1)
547                         RETURN(-EPERM);
548
549                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
550                  * this is create only, don't try restripe.
551                  */
552                 if (mdt->mdt_enable_dir_restripe &&
553                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
554                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
555                         restripe = true;
556         }
557
558         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
559
560         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
561         if (IS_ERR(parent))
562                 RETURN(PTR_ERR(parent));
563
564         if (!mdt_object_exists(parent))
565                 GOTO(put_parent, rc = -ENOENT);
566
567         /*
568          * LU-10235: check if name exists locklessly first to avoid massive
569          * lock recalls on existing directories.
570          */
571         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
572                                       &info->mti_tmp_fid1, 1);
573         if (rc == 0) {
574                 if (!restripe)
575                         GOTO(put_parent, rc = -EEXIST);
576
577                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
578                                   ma);
579         }
580
581         /* -ENOENT is expected here */
582         if (rc != -ENOENT)
583                 GOTO(put_parent, rc);
584
585         /* save version of file name for replay, it must be ENOENT here */
586         mdt_enoent_version_save(info, 1);
587
588         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
589
590         lh = &info->mti_lh[MDT_LH_PARENT];
591         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
592         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
593         if (rc)
594                 GOTO(put_parent, rc);
595
596         if (!mdt_object_remote(parent)) {
597                 rc = mdt_version_get_check_save(info, parent, 0);
598                 if (rc)
599                         GOTO(unlock_parent, rc);
600         }
601
602         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
603         if (unlikely(IS_ERR(child)))
604                 GOTO(unlock_parent, rc = PTR_ERR(child));
605
606         ma->ma_need = MA_INODE;
607         ma->ma_valid = 0;
608
609         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
610                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
611
612         /* Version of child will be updated on disk. */
613         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
614         rc = mdt_version_get_check_save(info, child, 2);
615         if (rc)
616                 GOTO(put_child, rc);
617
618         /*
619          * Do not perform lookup sanity check. We know that name does
620          * not exist.
621          */
622         info->mti_spec.sp_cr_lookup = 0;
623         info->mti_spec.sp_feat = &dt_directory_features;
624
625         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
626                         mdt_object_child(child), &info->mti_spec, ma);
627         if (rc == 0)
628                 rc = mdt_attr_get_complex(info, child, ma);
629
630         if (rc < 0)
631                 GOTO(put_child, rc);
632
633         /*
634          * On DNE, we need to eliminate dependey between 'mkdir a' and
635          * 'mkdir a/b' if b is a striped directory, to achieve this, two
636          * things are done below:
637          * 1. save child and slaves lock.
638          * 2. if the child is a striped directory, relock parent so to
639          *    compare against with COS locks to ensure parent was
640          *    committed to disk.
641          */
642         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
643                 struct mdt_lock_handle *lhc;
644                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
645                 bool cos_incompat;
646
647                 rc = mdt_object_striped(info, child);
648                 if (rc < 0)
649                         GOTO(put_child, rc);
650
651                 cos_incompat = rc;
652                 if (cos_incompat) {
653                         if (!mdt_object_remote(parent)) {
654                                 mdt_object_unlock(info, parent, lh, 1);
655                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
656                                 rc = mdt_reint_object_lock(info, parent, lh,
657                                                            MDS_INODELOCK_UPDATE,
658                                                            true);
659                                 if (rc)
660                                         GOTO(put_child, rc);
661                         }
662                 }
663
664                 lhc = &info->mti_lh[MDT_LH_CHILD];
665                 mdt_lock_handle_init(lhc);
666                 mdt_lock_reg_init(lhc, LCK_PW);
667                 rc = mdt_reint_striped_lock(info, child, lhc,
668                                             MDS_INODELOCK_UPDATE, einfo,
669                                             cos_incompat);
670                 if (rc)
671                         GOTO(put_child, rc);
672
673                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
674         }
675
676         /* Return fid & attr to client. */
677         if (ma->ma_valid & MA_INODE)
678                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
679                                    mdt_object_fid(child));
680         EXIT;
681 put_child:
682         mdt_object_put(info->mti_env, child);
683 unlock_parent:
684         mdt_object_unlock(info, parent, lh, rc);
685 put_parent:
686         mdt_object_put(info->mti_env, parent);
687         return rc;
688 }
689
690 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
691                         struct md_attr *ma)
692 {
693         struct mdt_lock_handle  *lh;
694         int do_vbr = ma->ma_attr.la_valid &
695                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
696         __u64 lockpart = MDS_INODELOCK_UPDATE;
697         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
698         bool cos_incompat;
699         int rc;
700
701         ENTRY;
702         rc = mdt_object_striped(info, mo);
703         if (rc < 0)
704                 RETURN(rc);
705
706         cos_incompat = rc;
707
708         lh = &info->mti_lh[MDT_LH_PARENT];
709         mdt_lock_reg_init(lh, LCK_PW);
710
711         /* Even though the new MDT will grant PERM lock to the old
712          * client, but the old client will almost ignore that during
713          * So it needs to revoke both LOOKUP and PERM lock here, so
714          * both new and old client can cancel the dcache
715          */
716         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
717                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
718
719         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
720                                     cos_incompat);
721         if (rc != 0)
722                 RETURN(rc);
723
724         /* all attrs are packed into mti_attr in unpack_setattr */
725         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
726                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
727
728         /* VBR: update version if attr changed are important for recovery */
729         if (do_vbr) {
730                 /* update on-disk version of changed object */
731                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
732                 rc = mdt_version_get_check_save(info, mo, 0);
733                 if (rc)
734                         GOTO(out_unlock, rc);
735         }
736
737         /* Ensure constant striping during chown(). See LU-2789. */
738         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
739                 mutex_lock(&mo->mot_lov_mutex);
740
741         /* all attrs are packed into mti_attr in unpack_setattr */
742         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
743
744         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
745                 mutex_unlock(&mo->mot_lov_mutex);
746
747         if (rc != 0)
748                 GOTO(out_unlock, rc);
749         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
750         EXIT;
751 out_unlock:
752         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
753         return rc;
754 }
755
756 /**
757  * Check HSM flags and add HS_DIRTY flag if relevant.
758  *
759  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
760  * and is not RELEASED.
761  */
762 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
763                         struct md_attr *ma)
764 {
765         struct lu_ucred *uc = mdt_ucred(info);
766         kernel_cap_t cap_saved;
767         int rc;
768
769         ENTRY;
770         /* If the file was modified, add the dirty flag */
771         ma->ma_need = MA_HSM;
772         rc = mdt_attr_get_complex(info, mo, ma);
773         if (rc) {
774                 CERROR("file attribute read error for "DFID": %d.\n",
775                         PFID(mdt_object_fid(mo)), rc);
776                 RETURN(rc);
777         }
778
779         /* If an up2date copy exists in the backend, add dirty flag */
780         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
781             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
782                 ma->ma_hsm.mh_flags |= HS_DIRTY;
783
784                 /* Bump cap so that closes from non-owner writers can
785                  * set the HSM state to dirty.
786                  */
787                 cap_saved = uc->uc_cap;
788                 cap_raise(uc->uc_cap, CAP_FOWNER);
789                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
790                 uc->uc_cap = cap_saved;
791                 if (rc)
792                         CERROR("file attribute change error for "DFID": %d\n",
793                                 PFID(mdt_object_fid(mo)), rc);
794         }
795
796         RETURN(rc);
797 }
798
799 static int mdt_reint_setattr(struct mdt_thread_info *info,
800                              struct mdt_lock_handle *lhc)
801 {
802         struct mdt_device *mdt = info->mti_mdt;
803         struct md_attr *ma = &info->mti_attr;
804         struct mdt_reint_record *rr = &info->mti_rr;
805         struct ptlrpc_request *req = mdt_info_req(info);
806         struct mdt_object *mo;
807         struct mdt_body *repbody;
808         ktime_t kstart = ktime_get();
809         int rc, rc2;
810
811         ENTRY;
812         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
813                   (unsigned int)ma->ma_attr.la_valid);
814
815         if (info->mti_dlm_req)
816                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
817
818         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
819
820         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
821         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
822         if (IS_ERR(mo))
823                 GOTO(out, rc = PTR_ERR(mo));
824
825         if (!mdt_object_exists(mo))
826                 GOTO(out_put, rc = -ENOENT);
827
828         if (mdt_object_remote(mo))
829                 GOTO(out_put, rc = -EREMOTE);
830
831         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
832         /* revoke lease lock if size is going to be changed */
833         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
834                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
835                      atomic_read(&mo->mot_lease_count) > 0)) {
836                 down_read(&mo->mot_open_sem);
837
838                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
839                         lhc = &info->mti_lh[MDT_LH_LOCAL];
840                         mdt_lock_reg_init(lhc, LCK_CW);
841
842                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
843                         if (rc != 0) {
844                                 up_read(&mo->mot_open_sem);
845                                 GOTO(out_put, rc);
846                         }
847
848                         /* revoke lease lock */
849                         mdt_object_unlock(info, mo, lhc, 1);
850                 }
851                 up_read(&mo->mot_open_sem);
852         }
853
854         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
855                 /* Check write access for the O_TRUNC case */
856                 if (mdt_write_read(mo) < 0)
857                         GOTO(out_put, rc = -ETXTBSY);
858
859                 /* LU-10286: compatibility check for FLR.
860                  * Please check the comment in mdt_finish_open() for details
861                  */
862                 if (!exp_connect_flr(info->mti_exp) ||
863                     !exp_connect_overstriping(info->mti_exp)) {
864                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
865                         if (rc < 0 && rc != -ENODATA)
866                                 GOTO(out_put, rc);
867
868                         if (!exp_connect_flr(info->mti_exp)) {
869                                 if (rc > 0 &&
870                                     mdt_lmm_is_flr(info->mti_big_lmm))
871                                         GOTO(out_put, rc = -EOPNOTSUPP);
872                         }
873
874                         if (!exp_connect_overstriping(info->mti_exp)) {
875                                 if (rc > 0 &&
876                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
877                                         GOTO(out_put, rc = -EOPNOTSUPP);
878                         }
879                 }
880
881                 /* For truncate, the file size sent from client
882                  * is believable, but the blocks are incorrect,
883                  * which makes the block size in LSOM attribute
884                  * inconsisent with the real block size.
885                  */
886                 rc = mdt_lsom_update(info, mo, true);
887                 if (rc)
888                         GOTO(out_put, rc);
889         }
890
891         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
892                 if (ma->ma_valid & MA_LOV)
893                         GOTO(out_put, rc = -EPROTO);
894
895                 /* MDT supports FMD for regular files due to Data-on-MDT */
896                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
897                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
898                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
899                                        req->rq_xid);
900
901                         if (ma->ma_attr.la_valid & LA_MTIME) {
902                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
903                                 if (!rc)
904                                         ma->ma_valid |= MA_PFID;
905                         }
906                 }
907
908                 rc = mdt_attr_set(info, mo, ma);
909                 if (rc)
910                         GOTO(out_put, rc);
911         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
912                    (ma->ma_valid & MA_INODE)) {
913                 struct lu_buf *buf = &info->mti_buf;
914                 struct lu_ucred *uc = mdt_ucred(info);
915                 struct mdt_lock_handle *lh;
916                 const char *name;
917                 __u64 lockpart = MDS_INODELOCK_XATTR;
918
919                 /* reject if either remote or striped dir is disabled */
920                 if (ma->ma_valid & MA_LMV) {
921                         if (!mdt->mdt_enable_remote_dir ||
922                             !mdt->mdt_enable_striped_dir)
923                                 GOTO(out_put, rc = -EPERM);
924
925                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
926                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
927                             mdt->mdt_enable_remote_dir_gid != -1)
928                                 GOTO(out_put, rc = -EPERM);
929                 }
930
931                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
932                         GOTO(out_put, rc = -ENOTDIR);
933
934                 if (ma->ma_attr.la_valid != 0)
935                         GOTO(out_put, rc = -EPROTO);
936
937                 if (ma->ma_valid & MA_LOV) {
938                         buf->lb_buf = ma->ma_lmm;
939                         buf->lb_len = ma->ma_lmm_size;
940                         name = XATTR_NAME_LOV;
941                 } else {
942                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
943
944                         buf->lb_buf = lmu;
945                         buf->lb_len = ma->ma_lmv_size;
946                         name = XATTR_NAME_DEFAULT_LMV;
947                         /* force client to update dir default layout */
948                         lockpart |= MDS_INODELOCK_LOOKUP;
949                 }
950
951                 lh = &info->mti_lh[MDT_LH_PARENT];
952                 mdt_lock_reg_init(lh, LCK_PW);
953
954                 rc = mdt_object_lock(info, mo, lh, lockpart);
955                 if (rc != 0)
956                         GOTO(out_put, rc);
957
958                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
959                                   name, 0);
960
961                 mdt_object_unlock(info, mo, lh, rc);
962                 if (rc)
963                         GOTO(out_put, rc);
964         } else {
965                 GOTO(out_put, rc = -EPROTO);
966         }
967
968         /* If file data is modified, add the dirty flag */
969         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
970                 rc = mdt_add_dirty_flag(info, mo, ma);
971
972         ma->ma_need = MA_INODE;
973         ma->ma_valid = 0;
974         rc = mdt_attr_get_complex(info, mo, ma);
975         if (rc != 0)
976                 GOTO(out_put, rc);
977
978         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
979
980         EXIT;
981 out_put:
982         mdt_object_put(info->mti_env, mo);
983 out:
984         if (rc == 0)
985                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
986                                  ktime_us_delta(ktime_get(), kstart));
987
988         mdt_client_compatibility(info);
989         rc2 = mdt_fix_reply(info);
990         if (rc == 0)
991                 rc = rc2;
992         return rc;
993 }
994
995 static int mdt_reint_create(struct mdt_thread_info *info,
996                             struct mdt_lock_handle *lhc)
997 {
998         struct ptlrpc_request   *req = mdt_info_req(info);
999         ktime_t                 kstart = ktime_get();
1000         int                     rc;
1001
1002         ENTRY;
1003         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1004                 RETURN(err_serious(-ESTALE));
1005
1006         if (info->mti_dlm_req)
1007                 ldlm_request_cancel(mdt_info_req(info),
1008                                     info->mti_dlm_req, 0, LATF_SKIP);
1009
1010         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1011                 RETURN(-EPROTO);
1012
1013         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1014         case S_IFDIR:
1015         case S_IFREG:
1016         case S_IFLNK:
1017         case S_IFCHR:
1018         case S_IFBLK:
1019         case S_IFIFO:
1020         case S_IFSOCK:
1021                 break;
1022         default:
1023                 CERROR("%s: Unsupported mode %o\n",
1024                        mdt_obd_name(info->mti_mdt),
1025                        info->mti_attr.ma_attr.la_mode);
1026                 RETURN(err_serious(-EOPNOTSUPP));
1027         }
1028
1029         rc = mdt_create(info);
1030         if (rc == 0) {
1031                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1032                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1033                                          ktime_us_delta(ktime_get(), kstart));
1034                 else
1035                         /* Special file should stay on the same node as parent*/
1036                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1037                                          ktime_us_delta(ktime_get(), kstart));
1038         }
1039
1040         RETURN(rc);
1041 }
1042
1043 /*
1044  * VBR: save parent version in reply and child version getting by its name.
1045  * Version of child is getting and checking during its lookup. If
1046  */
1047 static int mdt_reint_unlink(struct mdt_thread_info *info,
1048                             struct mdt_lock_handle *lhc)
1049 {
1050         struct mdt_reint_record *rr = &info->mti_rr;
1051         struct ptlrpc_request *req = mdt_info_req(info);
1052         struct md_attr *ma = &info->mti_attr;
1053         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1054         struct mdt_object *mp;
1055         struct mdt_object *mc;
1056         struct mdt_lock_handle *parent_lh;
1057         struct mdt_lock_handle *child_lh;
1058         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1059         __u64 lock_ibits;
1060         bool cos_incompat = false;
1061         int no_name = 0;
1062         ktime_t kstart = ktime_get();
1063         int rc;
1064
1065         ENTRY;
1066         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1067                   PNAME(&rr->rr_name));
1068
1069         if (info->mti_dlm_req)
1070                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1071
1072         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1073                 RETURN(err_serious(-ENOENT));
1074
1075         if (!fid_is_md_operative(rr->rr_fid1))
1076                 RETURN(-EPERM);
1077
1078         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1079         if (IS_ERR(mp))
1080                 RETURN(PTR_ERR(mp));
1081
1082         if (mdt_object_remote(mp)) {
1083                 cos_incompat = true;
1084         } else {
1085                 rc = mdt_version_get_check_save(info, mp, 0);
1086                 if (rc)
1087                         GOTO(put_parent, rc);
1088         }
1089
1090         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1091         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1092 relock:
1093         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1094         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1095         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1096                                    cos_incompat);
1097         if (rc != 0)
1098                 GOTO(put_parent, rc);
1099
1100         /* lookup child object along with version checking */
1101         fid_zero(child_fid);
1102         rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1);
1103         if (rc != 0) {
1104                 /* Name might not be able to find during resend of
1105                  * remote unlink, considering following case.
1106                  * dir_A is a remote directory, the name entry of
1107                  * dir_A is on MDT0, the directory is on MDT1,
1108                  *
1109                  * 1. client sends unlink req to MDT1.
1110                  * 2. MDT1 sends name delete update to MDT0.
1111                  * 3. name entry is being deleted in MDT0 synchronously.
1112                  * 4. MDT1 is restarted.
1113                  * 5. client resends unlink req to MDT1. So it can not
1114                  *    find the name entry on MDT0 anymore.
1115                  * In this case, MDT1 only needs to destory the local
1116                  * directory.
1117                  */
1118                 if (mdt_object_remote(mp) && rc == -ENOENT &&
1119                     !fid_is_zero(rr->rr_fid2) &&
1120                     lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1121                         no_name = 1;
1122                         *child_fid = *rr->rr_fid2;
1123                 } else {
1124                         GOTO(unlock_parent, rc);
1125                 }
1126         }
1127
1128         if (!fid_is_md_operative(child_fid))
1129                 GOTO(unlock_parent, rc = -EPERM);
1130
1131         /* We will lock the child regardless it is local or remote. No harm. */
1132         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1133         if (IS_ERR(mc))
1134                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1135
1136         if (!cos_incompat) {
1137                 rc = mdt_object_striped(info, mc);
1138                 if (rc < 0)
1139                         GOTO(put_child, rc);
1140
1141                 cos_incompat = rc;
1142                 if (cos_incompat) {
1143                         mdt_object_put(info->mti_env, mc);
1144                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1145                         goto relock;
1146                 }
1147         }
1148
1149         child_lh = &info->mti_lh[MDT_LH_CHILD];
1150         mdt_lock_reg_init(child_lh, LCK_EX);
1151         if (info->mti_spec.sp_rm_entry) {
1152                 struct lu_ucred *uc  = mdt_ucred(info);
1153
1154                 if (!mdt_is_dne_client(req->rq_export))
1155                         /* Return -ENOTSUPP for old client */
1156                         GOTO(put_child, rc = -ENOTSUPP);
1157
1158                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1159                         GOTO(put_child, rc = -EPERM);
1160
1161                 ma->ma_need = MA_INODE;
1162                 ma->ma_valid = 0;
1163                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1164                                 NULL, &rr->rr_name, ma, no_name);
1165                 GOTO(put_child, rc);
1166         }
1167
1168         if (mdt_object_remote(mc)) {
1169                 struct mdt_body  *repbody;
1170
1171                 if (!fid_is_zero(rr->rr_fid2)) {
1172                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1173                                mdt_obd_name(info->mti_mdt),
1174                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1175                         GOTO(put_child, rc = -ENOENT);
1176                 }
1177                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1178                        mdt_obd_name(info->mti_mdt),
1179                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1180
1181                 if (!mdt_is_dne_client(req->rq_export))
1182                         /* Return -ENOTSUPP for old client */
1183                         GOTO(put_child, rc = -ENOTSUPP);
1184
1185                 /* Revoke the LOOKUP lock of the remote object granted by
1186                  * this MDT. Since the unlink will happen on another MDT,
1187                  * it will release the LOOKUP lock right away. Then What
1188                  * would happen if another client try to grab the LOOKUP
1189                  * lock at the same time with unlink XXX
1190                  */
1191                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1192                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1193                 LASSERT(repbody != NULL);
1194                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1195                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1196                 GOTO(unlock_child, rc = -EREMOTE);
1197         }
1198         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1199          * this now because a running HSM restore on the child (unlink
1200          * victim) will hold the layout lock. See LU-4002.
1201          */
1202         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1203         if (mdt_object_remote(mp)) {
1204                 /* Enqueue lookup lock from parent MDT */
1205                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1206                                             &child_lh->mlh_rreg_lh,
1207                                             child_lh->mlh_rreg_mode,
1208                                             MDS_INODELOCK_LOOKUP, false);
1209                 if (rc != ELDLM_OK)
1210                         GOTO(put_child, rc);
1211
1212                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1213         }
1214
1215         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1216                                     cos_incompat);
1217         if (rc != 0)
1218                 GOTO(put_child, rc);
1219
1220         /*
1221          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1222          * whether need MA_LOV and MA_COOKIE.
1223          */
1224         ma->ma_need = MA_INODE;
1225         ma->ma_valid = 0;
1226
1227         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1228                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1229         /* save version when object is locked */
1230         mdt_version_get_save(info, mc, 1);
1231
1232         mutex_lock(&mc->mot_lov_mutex);
1233
1234         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1235                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1236
1237         mutex_unlock(&mc->mot_lov_mutex);
1238         if (rc != 0)
1239                 GOTO(unlock_child, rc);
1240
1241         if (!lu_object_is_dying(&mc->mot_header)) {
1242                 rc = mdt_attr_get_complex(info, mc, ma);
1243                 if (rc)
1244                         GOTO(out_stat, rc);
1245         } else if (mdt_dom_check_for_discard(info, mc)) {
1246                 mdt_dom_discard_data(info, mc);
1247         }
1248         mdt_handle_last_unlink(info, mc, ma);
1249
1250 out_stat:
1251         if (ma->ma_valid & MA_INODE) {
1252                 switch (ma->ma_attr.la_mode & S_IFMT) {
1253                 case S_IFDIR:
1254                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1255                                          ktime_us_delta(ktime_get(), kstart));
1256                         break;
1257                 case S_IFREG:
1258                 case S_IFLNK:
1259                 case S_IFCHR:
1260                 case S_IFBLK:
1261                 case S_IFIFO:
1262                 case S_IFSOCK:
1263                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1264                                          ktime_us_delta(ktime_get(), kstart));
1265                         break;
1266                 default:
1267                         LASSERTF(0, "bad file type %o unlinking\n",
1268                                 ma->ma_attr.la_mode);
1269                 }
1270         }
1271
1272         EXIT;
1273
1274 unlock_child:
1275         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1276 put_child:
1277         mdt_object_put(info->mti_env, mc);
1278 unlock_parent:
1279         mdt_object_unlock(info, mp, parent_lh, rc);
1280 put_parent:
1281         mdt_object_put(info->mti_env, mp);
1282         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1283         return rc;
1284 }
1285
1286 /*
1287  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1288  * name.
1289  */
1290 static int mdt_reint_link(struct mdt_thread_info *info,
1291                           struct mdt_lock_handle *lhc)
1292 {
1293         struct mdt_reint_record *rr = &info->mti_rr;
1294         struct ptlrpc_request   *req = mdt_info_req(info);
1295         struct md_attr          *ma = &info->mti_attr;
1296         struct mdt_object       *ms;
1297         struct mdt_object       *mp;
1298         struct mdt_lock_handle  *lhs;
1299         struct mdt_lock_handle  *lhp;
1300         ktime_t kstart = ktime_get();
1301         bool cos_incompat;
1302         int rc;
1303
1304         ENTRY;
1305         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1306                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1307
1308         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1309                 RETURN(err_serious(-ENOENT));
1310
1311         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1312             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1313                 req->rq_no_reply = 1;
1314                 RETURN(err_serious(-ENOENT));
1315         }
1316
1317         if (info->mti_dlm_req)
1318                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1319
1320         /* Invalid case so return error immediately instead of
1321          * processing it
1322          */
1323         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1324                 RETURN(-EPERM);
1325
1326         if (!fid_is_md_operative(rr->rr_fid1) ||
1327             !fid_is_md_operative(rr->rr_fid2))
1328                 RETURN(-EPERM);
1329
1330         /* step 1: find target parent dir */
1331         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1332         if (IS_ERR(mp))
1333                 RETURN(PTR_ERR(mp));
1334
1335         rc = mdt_version_get_check_save(info, mp, 0);
1336         if (rc)
1337                 GOTO(put_parent, rc);
1338
1339         /* step 2: find source */
1340         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1341         if (IS_ERR(ms))
1342                 GOTO(put_parent, rc = PTR_ERR(ms));
1343
1344         if (!mdt_object_exists(ms)) {
1345                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1346                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1347                 GOTO(put_source, rc = -ENOENT);
1348         }
1349
1350         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1351
1352         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1353
1354         lhp = &info->mti_lh[MDT_LH_PARENT];
1355         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1356         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1357                                    cos_incompat);
1358         if (rc != 0)
1359                 GOTO(put_source, rc);
1360
1361         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1362
1363         lhs = &info->mti_lh[MDT_LH_CHILD];
1364         mdt_lock_reg_init(lhs, LCK_EX);
1365         rc = mdt_reint_object_lock(info, ms, lhs,
1366                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1367                                    cos_incompat);
1368         if (rc != 0)
1369                 GOTO(unlock_parent, rc);
1370
1371         /* step 3: link it */
1372         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1373                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1374
1375         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1376         rc = mdt_version_get_check_save(info, ms, 1);
1377         if (rc)
1378                 GOTO(unlock_source, rc);
1379
1380         /** check target version by name during replay */
1381         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1382                                       &info->mti_tmp_fid1, 2);
1383         if (rc != 0 && rc != -ENOENT)
1384                 GOTO(unlock_source, rc);
1385         /* save version of file name for replay, it must be ENOENT here */
1386         if (!req_is_replay(mdt_info_req(info))) {
1387                 if (rc != -ENOENT) {
1388                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1389                                PNAME(&rr->rr_name));
1390                         GOTO(unlock_source, rc = -EEXIST);
1391                 }
1392                 info->mti_ver[2] = ENOENT_VERSION;
1393                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1394         }
1395
1396         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1397                       mdt_object_child(ms), &rr->rr_name, ma);
1398
1399         if (rc == 0)
1400                 mdt_counter_incr(req, LPROC_MDT_LINK,
1401                                  ktime_us_delta(ktime_get(), kstart));
1402
1403         EXIT;
1404 unlock_source:
1405         mdt_object_unlock(info, ms, lhs, rc);
1406 unlock_parent:
1407         mdt_object_unlock(info, mp, lhp, rc);
1408 put_source:
1409         mdt_object_put(info->mti_env, ms);
1410 put_parent:
1411         mdt_object_put(info->mti_env, mp);
1412         return rc;
1413 }
1414 /**
1415  * lock the part of the directory according to the hash of the name
1416  * (lh->mlh_pdo_hash) in parallel directory lock.
1417  */
1418 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1419                               struct mdt_lock_handle *lh,
1420                               struct mdt_object *obj, __u64 ibits,
1421                               bool cos_incompat)
1422 {
1423         struct ldlm_res_id *res = &info->mti_res_id;
1424         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1425         union ldlm_policy_data *policy = &info->mti_policy;
1426         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1427         int rc;
1428
1429         /*
1430          * Finish res_id initializing by name hash marking part of
1431          * directory which is taking modification.
1432          */
1433         LASSERT(lh->mlh_pdo_hash != 0);
1434         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1435         memset(policy, 0, sizeof(*policy));
1436         policy->l_inodebits.bits = ibits;
1437         if (cos_incompat &&
1438             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1439                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1440         /*
1441          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1442          * going to be sent to client. If it is - mdt_intent_policy() path will
1443          * fix it up and turn FL_LOCAL flag off.
1444          */
1445         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1446                           policy, res, dlmflags,
1447                           &info->mti_exp->exp_handle.h_cookie);
1448         return rc;
1449 }
1450
1451 /**
1452  * Get BFL lock for rename or migrate process.
1453  **/
1454 static int mdt_rename_lock(struct mdt_thread_info *info,
1455                            struct lustre_handle *lh)
1456 {
1457         int     rc;
1458
1459         ENTRY;
1460         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1461                 struct lu_fid *fid = &info->mti_tmp_fid1;
1462                 struct mdt_object *obj;
1463
1464                 /* XXX, right now, it has to use object API to
1465                  * enqueue lock cross MDT, so it will enqueue
1466                  * rename lock(with LUSTRE_BFL_FID) by root object
1467                  */
1468                 lu_root_fid(fid);
1469                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1470                 if (IS_ERR(obj))
1471                         RETURN(PTR_ERR(obj));
1472
1473                 rc = mdt_remote_object_lock(info, obj,
1474                                             &LUSTRE_BFL_FID, lh,
1475                                             LCK_EX,
1476                                             MDS_INODELOCK_UPDATE, false);
1477                 mdt_object_put(info->mti_env, obj);
1478         } else {
1479                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1480                 union ldlm_policy_data *policy = &info->mti_policy;
1481                 struct ldlm_res_id *res_id = &info->mti_res_id;
1482                 __u64 flags = 0;
1483
1484                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1485                 memset(policy, 0, sizeof(*policy));
1486                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1487                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1488                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1489                                             LDLM_IBITS, policy, LCK_EX, &flags,
1490                                             ldlm_blocking_ast,
1491                                             ldlm_completion_ast, NULL, NULL, 0,
1492                                             LVB_T_NONE,
1493                                             &info->mti_exp->exp_handle.h_cookie,
1494                                             lh);
1495                 RETURN(rc);
1496         }
1497         RETURN(rc);
1498 }
1499
1500 static void mdt_rename_unlock(struct lustre_handle *lh)
1501 {
1502         ENTRY;
1503         LASSERT(lustre_handle_is_used(lh));
1504         /* Cancel the single rename lock right away */
1505         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1506         EXIT;
1507 }
1508
1509 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1510                                                 const struct lu_fid *fid,
1511                                                 int idx)
1512 {
1513         struct mdt_object *dir;
1514         int rc;
1515
1516         ENTRY;
1517         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1518         if (IS_ERR(dir))
1519                 RETURN(dir);
1520
1521         /* check early, the real version will be saved after locking */
1522         rc = mdt_version_get_check(info, dir, idx);
1523         if (rc)
1524                 GOTO(out_put, rc);
1525
1526         if (!mdt_object_exists(dir))
1527                 GOTO(out_put, rc = -ENOENT);
1528
1529         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1530                 GOTO(out_put, rc = -ENOTDIR);
1531
1532         RETURN(dir);
1533 out_put:
1534         mdt_object_put(info->mti_env, dir);
1535         return ERR_PTR(rc);
1536 }
1537
1538 /*
1539  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1540  * herein we don't really check it, just do revoke.
1541  */
1542 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1543                                   struct mdt_object *pobj,
1544                                   struct mdt_object *obj)
1545 {
1546         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1547         int rc;
1548
1549         mdt_lock_handle_init(lh);
1550         mdt_lock_reg_init(lh, LCK_EX);
1551
1552         if (mdt_object_remote(pobj)) {
1553                 /* don't bother to check if pobj and obj are on the same MDT. */
1554                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1555                                             &lh->mlh_rreg_lh, LCK_EX,
1556                                             MDS_INODELOCK_LOOKUP, false);
1557         } else if (mdt_object_remote(obj)) {
1558                 struct ldlm_res_id *res = &info->mti_res_id;
1559                 union ldlm_policy_data *policy = &info->mti_policy;
1560                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1561                                  LDLM_FL_COS_INCOMPAT;
1562
1563                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1564                 memset(policy, 0, sizeof(*policy));
1565                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1566                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1567                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1568                                   dlmflags, NULL);
1569         } else {
1570                 /* do nothing if both are local */
1571                 return 0;
1572         }
1573
1574         if (rc != ELDLM_OK)
1575                 return rc;
1576
1577         /*
1578          * TODO, currently we don't save this lock because there is no place to
1579          * hold this lock handle, but to avoid race we need to save this lock.
1580          */
1581         mdt_object_unlock(info, NULL, lh, 1);
1582
1583         return 0;
1584 }
1585
/*
 * An operation may take locks on the parents recorded in linkea, or on
 * directory stripes. Each such lock is tracked by one mdt_sub_lock, and the
 * sub locks are grouped on separate lists according to their purpose.
 */
struct mdt_sub_lock {
	/* object the lock was taken on; put when the sub lock is released */
	struct mdt_object *msl_obj;
	/* lock handle of the lock held on msl_obj */
	struct mdt_lock_handle msl_lh;
	/* linkage into the list this sub lock belongs to */
	struct list_head msl_linkage;
};
1595
1596 static void mdt_unlock_list(struct mdt_thread_info *info,
1597                             struct list_head *list, int decref)
1598 {
1599         struct mdt_sub_lock *msl;
1600         struct mdt_sub_lock *tmp;
1601
1602         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1603                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1604                 list_del(&msl->msl_linkage);
1605                 OBD_FREE_PTR(msl);
1606         }
1607 }
1608
/*
 * Unlock \a obj together with its stripes. A local object is unlocked via
 * mdt_reint_striped_unlock(); for a remote object the stripe locks kept on
 * \a slave_locks are dropped first and then the object's own lock.
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
					     struct mdt_object *obj,
					     struct mdt_lock_handle *lh,
					     struct ldlm_enqueue_info *einfo,
					     struct list_head *slave_locks,
					     int decref)
{
	if (!mdt_object_remote(obj)) {
		mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
		return;
	}

	mdt_unlock_list(info, slave_locks, decref);
	mdt_object_unlock(info, obj, lh, decref);
}
1623
1624 /*
1625  * lock parents of links, and also check whether total locks don't exceed
1626  * RS_MAX_LOCKS.
1627  *
1628  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1629  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1630  * \retval      -ev negative errno upon error
1631  */
1632 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1633                                  struct mdt_object *pobj,
1634                                  const struct md_attr *ma,
1635                                  struct mdt_object *obj,
1636                                  struct mdt_lock_handle *lhp,
1637                                  struct ldlm_enqueue_info *peinfo,
1638                                  struct list_head *parent_slave_locks,
1639                                  struct list_head *link_locks)
1640 {
1641         struct mdt_device *mdt = info->mti_mdt;
1642         struct lu_buf *buf = &info->mti_big_buf;
1643         struct lu_name *lname = &info->mti_name;
1644         struct linkea_data ldata = { NULL };
1645         bool blocked = false;
1646         int local_lnkp_cnt = 0;
1647         int rc;
1648
1649         ENTRY;
1650         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1651                 RETURN(0);
1652
1653         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1654         if (buf->lb_buf == NULL)
1655                 RETURN(-ENOMEM);
1656
1657         ldata.ld_buf = buf;
1658         rc = mdt_links_read(info, obj, &ldata);
1659         if (rc) {
1660                 if (rc == -ENOENT || rc == -ENODATA)
1661                         rc = 0;
1662                 RETURN(rc);
1663         }
1664
1665         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1666              linkea_next_entry(&ldata)) {
1667                 struct mdt_object *lnkp;
1668                 struct mdt_sub_lock *msl;
1669                 struct lu_fid fid;
1670                 __u64 ibits;
1671
1672                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1673                                     &fid);
1674
1675                 /* check if it's also linked to parent */
1676                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1677                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1678                                PFID(&fid), PNAME(lname));
1679                         /* in case link is remote object, revoke LOOKUP lock */
1680                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1681                         continue;
1682                 }
1683
1684                 lnkp = NULL;
1685
1686                 /* check if it's linked to a stripe of parent */
1687                 if (ma->ma_valid & MA_LMV) {
1688                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1689                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1690                         int j = 0;
1691
1692                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1693                                 fid_le_to_cpu(stripe_fid,
1694                                               &lmv->lmv_stripe_fids[j]);
1695                                 if (lu_fid_eq(stripe_fid, &fid)) {
1696                                         CDEBUG(D_INFO, "skip stripe "DFID
1697                                                ", reovke "DNAME"\n",
1698                                                PFID(&fid), PNAME(lname));
1699                                         lnkp = mdt_object_find(info->mti_env,
1700                                                                mdt, &fid);
1701                                         if (IS_ERR(lnkp))
1702                                                 GOTO(out, rc = PTR_ERR(lnkp));
1703                                         break;
1704                                 }
1705                         }
1706
1707                         if (lnkp) {
1708                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1709                                                                    obj);
1710                                 mdt_object_put(info->mti_env, lnkp);
1711                                 continue;
1712                         }
1713                 }
1714
1715                 /* Check if it's already locked */
1716                 list_for_each_entry(msl, link_locks, msl_linkage) {
1717                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1718                                 CDEBUG(D_INFO,
1719                                        DFID" was locked, revoke "DNAME"\n",
1720                                        PFID(&fid), PNAME(lname));
1721                                 lnkp = msl->msl_obj;
1722                                 break;
1723                         }
1724                 }
1725
1726                 if (lnkp) {
1727                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1728                         continue;
1729                 }
1730
1731                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1732                        PFID(&fid), PNAME(lname));
1733
1734                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1735                 if (IS_ERR(lnkp)) {
1736                         CWARN("%s: cannot find obj "DFID": %ld\n",
1737                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1738                         continue;
1739                 }
1740
1741                 if (!mdt_object_exists(lnkp)) {
1742                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1743                               PFID(&fid), PNAME(lname));
1744                         mdt_object_put(info->mti_env, lnkp);
1745                         continue;
1746                 }
1747
1748                 if (!mdt_object_remote(lnkp))
1749                         local_lnkp_cnt++;
1750
1751                 OBD_ALLOC_PTR(msl);
1752                 if (msl == NULL)
1753                         GOTO(out, rc = -ENOMEM);
1754
1755                 /*
1756                  * we can't follow parent-child lock order like other MD
1757                  * operations, use lock_try here to avoid deadlock, if the lock
1758                  * cannot be taken, drop all locks taken, revoke the blocked
1759                  * one, and continue processing the remaining entries, and in
1760                  * the end of the loop restart from beginning.
1761                  */
1762                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1763                 ibits = 0;
1764                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1765                                          MDS_INODELOCK_UPDATE, true);
1766                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1767
1768                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1769                                PFID(&fid), PNAME(lname));
1770
1771                         mdt_unlock_list(info, link_locks, 1);
1772                         /* also unlock parent locks to avoid deadlock */
1773                         if (!blocked)
1774                                 mdt_migrate_object_unlock(info, pobj, lhp,
1775                                                           peinfo,
1776                                                           parent_slave_locks,
1777                                                           1);
1778
1779                         blocked = true;
1780
1781                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1782                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1783                                              MDS_INODELOCK_UPDATE);
1784                         if (rc) {
1785                                 mdt_object_put(info->mti_env, lnkp);
1786                                 OBD_FREE_PTR(msl);
1787                                 GOTO(out, rc);
1788                         }
1789
1790                         if (mdt_object_remote(lnkp)) {
1791                                 struct ldlm_lock *lock;
1792
1793                                 /*
1794                                  * for remote object, set lock cb_atomic,
1795                                  * so lock can be released in blocking_ast()
1796                                  * immediately, then the next lock_try will
1797                                  * have better chance of success.
1798                                  */
1799                                 lock = ldlm_handle2lock(
1800                                                 &msl->msl_lh.mlh_rreg_lh);
1801                                 LASSERT(lock != NULL);
1802                                 lock_res_and_lock(lock);
1803                                 ldlm_set_atomic_cb(lock);
1804                                 unlock_res_and_lock(lock);
1805                                 LDLM_LOCK_PUT(lock);
1806                         }
1807
1808                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1809                         OBD_FREE_PTR(msl);
1810                         continue;
1811                 }
1812
1813                 INIT_LIST_HEAD(&msl->msl_linkage);
1814                 msl->msl_obj = lnkp;
1815                 list_add_tail(&msl->msl_linkage, link_locks);
1816
1817                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1818         }
1819
1820         if (blocked)
1821                 GOTO(out, rc = -EBUSY);
1822
1823         EXIT;
1824 out:
1825         if (rc) {
1826                 mdt_unlock_list(info, link_locks, rc);
1827         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1828                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1829                        local_lnkp_cnt);
1830                 /*
1831                  * parent may have 3 local objects: master object and 2 stripes
1832                  * (if it's being migrated too); source may have 1 local objects
1833                  * as regular file; target has 1 local object.
1834                  * Note, source may have 2 local locks if it is directory but it
1835                  * can't have hardlinks, so it is not considered here.
1836                  */
1837                 rc = 1;
1838         }
1839         return rc;
1840 }
1841
1842 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1843                                   struct mdt_object *obj,
1844                                   const struct md_attr *ma,
1845                                   struct list_head *slave_locks)
1846 {
1847         struct mdt_device *mdt = info->mti_mdt;
1848         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1849         struct lu_fid *fid = &info->mti_tmp_fid1;
1850         struct mdt_object *slave;
1851         struct mdt_sub_lock *msl;
1852         int i;
1853         int rc;
1854
1855         ENTRY;
1856         LASSERT(mdt_object_remote(obj));
1857         LASSERT(ma->ma_valid & MA_LMV);
1858         LASSERT(lmv);
1859
1860         if (!lmv_is_sane(lmv))
1861                 RETURN(-EINVAL);
1862
1863         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1864                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1865
1866                 if (!fid_is_sane(fid))
1867                         continue;
1868
1869                 slave = mdt_object_find(info->mti_env, mdt, fid);
1870                 if (IS_ERR(slave))
1871                         GOTO(out, rc = PTR_ERR(slave));
1872
1873                 OBD_ALLOC_PTR(msl);
1874                 if (!msl) {
1875                         mdt_object_put(info->mti_env, slave);
1876                         GOTO(out, rc = -ENOMEM);
1877                 }
1878
1879                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1880                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1881                                            MDS_INODELOCK_UPDATE, true);
1882                 if (rc) {
1883                         OBD_FREE_PTR(msl);
1884                         mdt_object_put(info->mti_env, slave);
1885                         GOTO(out, rc);
1886                 }
1887
1888                 INIT_LIST_HEAD(&msl->msl_linkage);
1889                 msl->msl_obj = slave;
1890                 list_add_tail(&msl->msl_linkage, slave_locks);
1891         }
1892         EXIT;
1893
1894 out:
1895         if (rc)
1896                 mdt_unlock_list(info, slave_locks, rc);
1897         return rc;
1898 }
1899
1900 /* lock parent and its stripes */
1901 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1902                                    struct mdt_object *obj,
1903                                    const struct md_attr *ma,
1904                                    struct mdt_lock_handle *lh,
1905                                    struct ldlm_enqueue_info *einfo,
1906                                    struct list_head *slave_locks)
1907 {
1908         int rc;
1909
1910         if (mdt_object_remote(obj)) {
1911                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1912                                             &lh->mlh_rreg_lh, LCK_PW,
1913                                             MDS_INODELOCK_UPDATE, false);
1914                 if (rc != ELDLM_OK)
1915                         return rc;
1916
1917                 /*
1918                  * if obj is remote and striped, lock its stripes explicitly
1919                  * because it's not striped in LOD layer on this MDT.
1920                  */
1921                 if (ma->ma_valid & MA_LMV) {
1922                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1923                         if (rc)
1924                                 mdt_object_unlock(info, obj, lh, rc);
1925                 }
1926         } else {
1927                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1928                                             einfo, true);
1929         }
1930
1931         return rc;
1932 }
1933
1934 /*
1935  * in migration, object may be remote, and we need take full lock of it and its
1936  * stripes if it's directory, besides, object may be a remote object on its
1937  * parent, revoke its LOOKUP lock on where its parent is located.
1938  */
1939 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1940                                    struct mdt_object *pobj,
1941                                    struct mdt_object *obj,
1942                                    struct mdt_lock_handle *lh,
1943                                    struct ldlm_enqueue_info *einfo,
1944                                    struct list_head *slave_locks)
1945 {
1946         int rc;
1947
1948         if (mdt_object_remote(obj)) {
1949                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1950                 if (rc)
1951                         return rc;
1952
1953                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1954                                             &lh->mlh_rreg_lh, LCK_EX,
1955                                             MDS_INODELOCK_FULL, false);
1956                 if (rc != ELDLM_OK)
1957                         return rc;
1958
1959                 /*
1960                  * if obj is remote and striped, lock its stripes explicitly
1961                  * because it's not striped in LOD layer on this MDT.
1962                  */
1963                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1964                         struct md_attr *ma = &info->mti_attr;
1965
1966                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1967                         if (rc) {
1968                                 mdt_object_unlock(info, obj, lh, rc);
1969                                 return rc;
1970                         }
1971
1972                         if (ma->ma_valid & MA_LMV) {
1973                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1974                                                             slave_locks);
1975                                 if (rc)
1976                                         mdt_object_unlock(info, obj, lh, rc);
1977                         }
1978                 }
1979         } else {
1980                 if (mdt_object_remote(pobj)) {
1981                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1982                         if (rc)
1983                                 return rc;
1984                 }
1985
1986                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
1987                                             einfo, true);
1988         }
1989
1990         return rc;
1991 }
1992
1993 /*
1994  * lookup source by name, if parent is striped directory, we need to find the
1995  * corresponding stripe where source is located, and then lookup there.
1996  *
1997  * besides, if parent is migrating too, and file is already in target stripe,
1998  * this should be a redo of 'lfs migrate' on client side.
1999  */
2000 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2001                               struct mdt_object *pobj,
2002                               const struct md_attr *ma,
2003                               const struct lu_name *lname,
2004                               struct mdt_object **spobj,
2005                               struct mdt_object **sobj)
2006 {
2007         const struct lu_env *env = info->mti_env;
2008         struct lu_fid *fid = &info->mti_tmp_fid1;
2009         struct mdt_object *stripe;
2010         int rc;
2011
2012         if (ma->ma_valid & MA_LMV) {
2013                 /* if parent is striped, lookup on corresponding stripe */
2014                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2015
2016                 if (!lmv_is_sane(lmv))
2017                         return -EBADF;
2018
2019                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2020                                                   lname->ln_namelen);
2021                 if (rc < 0)
2022                         return rc;
2023
2024                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2025
2026                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2027                 if (IS_ERR(stripe))
2028                         return PTR_ERR(stripe);
2029
2030                 fid_zero(fid);
2031                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2032                                 &info->mti_spec);
2033                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2034                         /*
2035                          * if parent layout is changeing, and lookup child
2036                          * failed on source stripe, lookup again on target
2037                          * stripe, if it exists, it means previous migration
2038                          * was interrupted, and current file was migrated
2039                          * already.
2040                          */
2041                         mdt_object_put(env, stripe);
2042
2043                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2044                                                       lname->ln_namelen);
2045                         if (rc < 0)
2046                                 return rc;
2047
2048                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2049
2050                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2051                         if (IS_ERR(stripe))
2052                                 return PTR_ERR(stripe);
2053
2054                         fid_zero(fid);
2055                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2056                                         fid, &info->mti_spec);
2057                         mdt_object_put(env, stripe);
2058                         return rc ?: -EALREADY;
2059                 } else if (rc) {
2060                         mdt_object_put(env, stripe);
2061                         return rc;
2062                 }
2063         } else {
2064                 fid_zero(fid);
2065                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2066                                 &info->mti_spec);
2067                 if (rc)
2068                         return rc;
2069
2070                 stripe = pobj;
2071                 mdt_object_get(env, stripe);
2072         }
2073
2074         *spobj = stripe;
2075
2076         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2077         if (IS_ERR(*sobj)) {
2078                 mdt_object_put(env, stripe);
2079                 rc = PTR_ERR(*sobj);
2080                 *spobj = NULL;
2081                 *sobj = NULL;
2082         }
2083
2084         return rc;
2085 }
2086
2087 /* end lease and close file for regular file */
2088 static int mdd_migrate_close(struct mdt_thread_info *info,
2089                              struct mdt_object *obj)
2090 {
2091         struct close_data *data;
2092         struct mdt_body *repbody;
2093         struct ldlm_lock *lease;
2094         int rc;
2095         int rc2;
2096
2097         rc = -EPROTO;
2098         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2099                                       RCL_CLIENT) ||
2100             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2101                                       RCL_CLIENT))
2102                 goto close;
2103
2104         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2105         if (!data)
2106                 goto close;
2107
2108         rc = -ESTALE;
2109         lease = ldlm_handle2lock(&data->cd_handle);
2110         if (!lease)
2111                 goto close;
2112
2113         /* check if the lease was already canceled */
2114         lock_res_and_lock(lease);
2115         rc = ldlm_is_cancel(lease);
2116         unlock_res_and_lock(lease);
2117
2118         if (rc) {
2119                 rc = -EAGAIN;
2120                 LDLM_DEBUG(lease, DFID" lease broken",
2121                            PFID(mdt_object_fid(obj)));
2122         }
2123
2124         /*
2125          * cancel server side lease, client side counterpart should have been
2126          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2127          */
2128         ldlm_lock_cancel(lease);
2129         ldlm_reprocess_all(lease->l_resource, lease);
2130         LDLM_LOCK_PUT(lease);
2131
2132 close:
2133         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2134         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2135         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2136
2137         return rc ?: rc2;
2138 }
2139
2140 /*
2141  * migrate file in below steps:
2142  *  1. lock parent and its stripes
2143  *  2. lookup source by name
2144  *  3. lock parents of source links if source is not directory
2145  *  4. reject if source is in HSM
2146  *  5. take source open_sem and close file if source is regular file
2147  *  6. lock source and its stripes if it's directory
2148  *  7. lock target so subsequent change to it can trigger COS
2149  *  8. migrate file
2150  *  9. unlock above locks
2151  * 10. sync device if source has links
2152  */
2153 int mdt_reint_migrate(struct mdt_thread_info *info,
2154                       struct mdt_lock_handle *unused)
2155 {
2156         const struct lu_env *env = info->mti_env;
2157         struct mdt_device *mdt = info->mti_mdt;
2158         struct ptlrpc_request *req = mdt_info_req(info);
2159         struct mdt_reint_record *rr = &info->mti_rr;
2160         struct lu_ucred *uc = mdt_ucred(info);
2161         struct md_attr *ma = &info->mti_attr;
2162         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2163         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2164         struct mdt_object *pobj;
2165         struct mdt_object *spobj = NULL;
2166         struct mdt_object *sobj = NULL;
2167         struct mdt_object *tobj;
2168         struct lustre_handle rename_lh = { 0 };
2169         struct mdt_lock_handle *lhp;
2170         struct mdt_lock_handle *lhs;
2171         struct mdt_lock_handle *lht;
2172         LIST_HEAD(parent_slave_locks);
2173         LIST_HEAD(child_slave_locks);
2174         LIST_HEAD(link_locks);
2175         int lock_retries = 5;
2176         bool open_sem_locked = false;
2177         bool do_sync = false;
2178         int rc;
2179
2180         ENTRY;
2181         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2182                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2183
2184         if (info->mti_dlm_req)
2185                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2186
2187         if (!fid_is_md_operative(rr->rr_fid1) ||
2188             !fid_is_md_operative(rr->rr_fid2))
2189                 RETURN(-EPERM);
2190
2191         /* don't allow migrate . or .. */
2192         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2193                 RETURN(-EBUSY);
2194
2195         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2196                 RETURN(-EPERM);
2197
2198         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2199             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2200             mdt->mdt_enable_remote_dir_gid != -1)
2201                 RETURN(-EPERM);
2202
2203         /*
2204          * Note: do not enqueue rename lock for replay request, because
2205          * if other MDT holds rename lock, but being blocked to wait for
2206          * this MDT to finish its recovery, and the failover MDT can not
2207          * get rename lock, which will cause deadlock.
2208          *
2209          * req is NULL if this is called by directory auto-split.
2210          */
2211         if (req && !req_is_replay(req)) {
2212                 rc = mdt_rename_lock(info, &rename_lh);
2213                 if (rc != 0) {
2214                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2215                                mdt_obd_name(info->mti_mdt), rc);
2216                         RETURN(rc);
2217                 }
2218         }
2219
2220         /* pobj is master object of parent */
2221         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2222         if (IS_ERR(pobj))
2223                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2224
2225         if (req) {
2226                 rc = mdt_version_get_check(info, pobj, 0);
2227                 if (rc)
2228                         GOTO(put_parent, rc);
2229         }
2230
2231         if (!mdt_object_exists(pobj))
2232                 GOTO(put_parent, rc = -ENOENT);
2233
2234         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2235                 GOTO(put_parent, rc = -ENOTDIR);
2236
2237         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2238         if (rc)
2239                 GOTO(put_parent, rc);
2240
2241 lock_parent:
2242         /* lock parent object */
2243         lhp = &info->mti_lh[MDT_LH_PARENT];
2244         mdt_lock_reg_init(lhp, LCK_PW);
2245         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2246                                      &parent_slave_locks);
2247         if (rc)
2248                 GOTO(put_parent, rc);
2249
2250         /*
2251          * spobj is the corresponding stripe against name if pobj is striped
2252          * directory, which is the real parent, and no need to lock, because
2253          * we've taken full lock of pobj.
2254          */
2255         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2256         if (rc)
2257                 GOTO(unlock_parent, rc);
2258
2259         /* lock parents of source links, and revoke LOOKUP lock of links */
2260         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2261                                    &parent_slave_locks, &link_locks);
2262         if (rc == -EBUSY && lock_retries-- > 0) {
2263                 mdt_object_put(env, sobj);
2264                 mdt_object_put(env, spobj);
2265                 goto lock_parent;
2266         }
2267
2268         if (rc < 0)
2269                 GOTO(put_source, rc);
2270
2271         /*
2272          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2273          * with one request, if total lock count exceeds this limit, we will
2274          * drop all locks after migration, and synchronous device in the end.
2275          */
2276         do_sync = rc;
2277
2278         /* TODO: DoM migration is not supported, migrate dirent only */
2279         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2280                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2281                 if (rc)
2282                         GOTO(unlock_links, rc);
2283
2284                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2285                         info->mti_spec.sp_migrate_nsonly = 1;
2286         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2287                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2288                 if (rc)
2289                         GOTO(unlock_links, rc);
2290
2291                 /* race with restripe/auto-split? */
2292                 if ((ma->ma_valid & MA_LMV) &&
2293                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2294                         GOTO(unlock_links, rc = -EBUSY);
2295         }
2296
2297         /* if migration HSM is allowed */
2298         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2299                 ma->ma_need = MA_HSM;
2300                 ma->ma_valid = 0;
2301                 rc = mdt_attr_get_complex(info, sobj, ma);
2302                 if (rc)
2303                         GOTO(unlock_links, rc);
2304
2305                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2306                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2307         }
2308
2309         /* end lease and close file for regular file */
2310         if (info->mti_spec.sp_migrate_close) {
2311                 /* try to hold open_sem so that nobody else can open the file */
2312                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2313                         /* close anyway */
2314                         mdd_migrate_close(info, sobj);
2315                         GOTO(unlock_links, rc = -EBUSY);
2316                 } else {
2317                         open_sem_locked = true;
2318                         rc = mdd_migrate_close(info, sobj);
2319                         if (rc)
2320                                 GOTO(unlock_open_sem, rc);
2321                 }
2322         }
2323
2324         /* lock source */
2325         lhs = &info->mti_lh[MDT_LH_OLD];
2326         mdt_lock_reg_init(lhs, LCK_EX);
2327         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2328                                      &child_slave_locks);
2329         if (rc)
2330                 GOTO(unlock_open_sem, rc);
2331
2332         /* lock target */
2333         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2334         if (IS_ERR(tobj))
2335                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2336
2337         lht = &info->mti_lh[MDT_LH_NEW];
2338         mdt_lock_reg_init(lht, LCK_EX);
2339         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2340         if (rc)
2341                 GOTO(put_target, rc);
2342
2343         /* Don't do lookup sanity check. We know name doesn't exist. */
2344         info->mti_spec.sp_cr_lookup = 0;
2345         info->mti_spec.sp_feat = &dt_directory_features;
2346
2347         rc = mdo_migrate(env, mdt_object_child(pobj),
2348                          mdt_object_child(sobj), &rr->rr_name,
2349                          mdt_object_child(tobj),
2350                          &info->mti_spec, ma);
2351         if (!rc)
2352                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2353                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2354         EXIT;
2355
2356         mdt_object_unlock(info, tobj, lht, rc);
2357 put_target:
2358         mdt_object_put(env, tobj);
2359 unlock_source:
2360         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2361                                   &child_slave_locks, rc);
2362 unlock_open_sem:
2363         if (open_sem_locked)
2364                 up_write(&sobj->mot_open_sem);
2365 unlock_links:
2366         /* if we've got too many locks to save into RPC,
2367          * then just commit before the locks are released
2368          */
2369         if (!rc && do_sync)
2370                 mdt_device_sync(env, mdt);
2371         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2372 put_source:
2373         mdt_object_put(env, sobj);
2374         mdt_object_put(env, spobj);
2375 unlock_parent:
2376         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2377                                   &parent_slave_locks, rc);
2378 put_parent:
2379         mdt_object_put(env, pobj);
2380 unlock_rename:
2381         if (lustre_handle_is_used(&rename_lh))
2382                 mdt_rename_unlock(&rename_lh);
2383
2384         return rc;
2385 }
2386
2387 static int mdt_object_lock_save(struct mdt_thread_info *info,
2388                                 struct mdt_object *dir,
2389                                 struct mdt_lock_handle *lh,
2390                                 int idx, bool cos_incompat)
2391 {
2392         int rc;
2393
2394         /* we lock the target dir if it is local */
2395         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2396                                    cos_incompat);
2397         if (rc != 0)
2398                 return rc;
2399
2400         /* get and save correct version after locking */
2401         mdt_version_get_save(info, dir, idx);
2402         return 0;
2403 }
2404
2405 /*
2406  * determine lock order of sobj and tobj
2407  *
2408  * there are two situations we need to lock tobj before sobj:
2409  * 1. sobj is child of tobj
2410  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2411  *    larger than that of tobj
2412  *
2413  * \retval      1 lock tobj before sobj
2414  * \retval      0 lock sobj before tobj
2415  * \retval      -ev negative errno upon error
2416  */
2417 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2418                                            struct mdt_object *sobj,
2419                                            struct mdt_object *tobj)
2420 {
2421         struct md_attr *ma = &info->mti_attr;
2422         struct lu_fid *spfid = &info->mti_tmp_fid1;
2423         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2424         struct lmv_mds_md_v1 *lmv;
2425         __u32 sindex;
2426         __u32 tindex;
2427         int rc;
2428
2429         /* sobj and tobj are the same */
2430         if (sobj == tobj)
2431                 return 0;
2432
2433         if (fid_is_root(mdt_object_fid(sobj)))
2434                 return 0;
2435
2436         if (fid_is_root(mdt_object_fid(tobj)))
2437                 return 1;
2438
2439         /* check whether sobj is child of tobj */
2440         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2441                            mdt_object_fid(tobj));
2442         if (rc < 0)
2443                 return rc;
2444
2445         if (rc == 1)
2446                 return 1;
2447
2448         /* check whether sobj and tobj are children of the same parent */
2449         rc = mdt_attr_get_pfid(info, sobj, spfid);
2450         if (rc)
2451                 return rc;
2452
2453         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2454         if (rc)
2455                 return rc;
2456
2457         if (!lu_fid_eq(spfid, tpfid))
2458                 return 0;
2459
2460         /* check whether sobj and tobj are sibling stripes */
2461         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2462         if (rc)
2463                 return rc;
2464
2465         if (!(ma->ma_valid & MA_LMV))
2466                 return 0;
2467
2468         lmv = &ma->ma_lmv->lmv_md_v1;
2469         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2470                 return 0;
2471         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2472
2473         ma->ma_valid = 0;
2474         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2475         if (rc)
2476                 return rc;
2477
2478         if (!(ma->ma_valid & MA_LMV))
2479                 return -ENODATA;
2480
2481         lmv = &ma->ma_lmv->lmv_md_v1;
2482         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2483                 return -EINVAL;
2484         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2485
2486         /* check stripe index of sobj and tobj */
2487         if (sindex == tindex)
2488                 return -EINVAL;
2489
2490         return sindex < tindex ? 0 : 1;
2491 }
2492
2493 /*
2494  * lock rename source object.
2495  *
2496  * Both source and source parent may be remote, and source may be a remote
2497  * object on source parent, to avoid overriding lock handle, store remote
2498  * LOOKUP lock separately in @lhr.
2499  *
2500  * \retval      0 on success
2501  * \retval      -ev negative errno upon error
2502  */
2503 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2504                                   struct mdt_object *parent,
2505                                   struct mdt_object *child,
2506                                   struct mdt_lock_handle *lhc,
2507                                   struct mdt_lock_handle *lhr,
2508                                   __u64 ibits,
2509                                   bool cos_incompat)
2510 {
2511         int rc;
2512
2513         rc = mdt_is_remote_object(info, parent, child);
2514         if (rc < 0)
2515                 return rc;
2516
2517         if (rc) {
2518                 /* enqueue remote LOOKUP lock from the parent MDT */
2519                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2520
2521                 if (mdt_object_remote(parent)) {
2522                         rc = mdt_remote_object_lock(info, parent,
2523                                                     mdt_object_fid(child),
2524                                                     &lhr->mlh_rreg_lh,
2525                                                     lhr->mlh_rreg_mode,
2526                                                     rmt_ibits, false);
2527                         if (rc != ELDLM_OK)
2528                                 return rc;
2529                 } else {
2530                         LASSERT(mdt_object_remote(child));
2531                         rc = mdt_object_local_lock(info, child, lhr,
2532                                                    &rmt_ibits, 0, true);
2533                         if (rc < 0)
2534                                 return rc;
2535                 }
2536
2537                 ibits &= ~MDS_INODELOCK_LOOKUP;
2538         }
2539
2540         if (mdt_object_remote(child)) {
2541                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2542                                             &lhc->mlh_rreg_lh,
2543                                             lhc->mlh_rreg_mode,
2544                                             ibits, false);
2545                 if (rc == ELDLM_OK)
2546                         rc = 0;
2547         } else {
2548                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2549                                            cos_incompat);
2550         }
2551
2552         if (!rc)
2553                 mdt_object_unlock(info, child, lhr, rc);
2554
2555         return rc;
2556 }
2557
2558 /*
2559  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2560  * 2 - srcdir child; 3 - tgtdir child.
2561  * Update on disk version of srcdir child.
2562  */
2563 static int mdt_reint_rename(struct mdt_thread_info *info,
2564                             struct mdt_lock_handle *unused)
2565 {
2566         struct mdt_device *mdt = info->mti_mdt;
2567         struct mdt_reint_record *rr = &info->mti_rr;
2568         struct md_attr *ma = &info->mti_attr;
2569         struct ptlrpc_request *req = mdt_info_req(info);
2570         struct mdt_object *msrcdir = NULL;
2571         struct mdt_object *mtgtdir = NULL;
2572         struct mdt_object *mold;
2573         struct mdt_object *mnew = NULL;
2574         struct lustre_handle rename_lh = { 0 };
2575         struct mdt_lock_handle *lh_srcdirp;
2576         struct mdt_lock_handle *lh_tgtdirp;
2577         struct mdt_lock_handle *lh_oldp = NULL;
2578         struct mdt_lock_handle *lh_rmt = NULL;
2579         struct mdt_lock_handle *lh_newp = NULL;
2580         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2581         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2582         __u64 lock_ibits;
2583         bool reverse = false, discard = false;
2584         bool cos_incompat;
2585         ktime_t kstart = ktime_get();
2586         int rc;
2587
2588         ENTRY;
2589         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2590                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2591                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2592
2593         if (info->mti_dlm_req)
2594                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2595
2596         if (!fid_is_md_operative(rr->rr_fid1) ||
2597             !fid_is_md_operative(rr->rr_fid2))
2598                 RETURN(-EPERM);
2599
2600         /* find both parents. */
2601         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2602         if (IS_ERR(msrcdir))
2603                 RETURN(PTR_ERR(msrcdir));
2604
2605         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2606
2607         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2608                 mtgtdir = msrcdir;
2609                 mdt_object_get(info->mti_env, mtgtdir);
2610         } else {
2611                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2612                 if (IS_ERR(mtgtdir))
2613                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2614         }
2615
2616         /*
2617          * Note: do not enqueue rename lock for replay request, because
2618          * if other MDT holds rename lock, but being blocked to wait for
2619          * this MDT to finish its recovery, and the failover MDT can not
2620          * get rename lock, which will cause deadlock.
2621          */
2622         if (!req_is_replay(req)) {
2623                 /*
2624                  * Normally rename RPC is handled on the MDT with the target
2625                  * directory (if target exists, it's on the MDT with the
2626                  * target), if the source directory is remote, it's a hint that
2627                  * source is remote too (this may not be true, but it won't
2628                  * cause any issue), return -EXDEV early to avoid taking
2629                  * rename_lock.
2630                  */
2631                 if (!mdt->mdt_enable_remote_rename &&
2632                     mdt_object_remote(msrcdir))
2633                         GOTO(out_put_tgtdir, rc = -EXDEV);
2634
2635                 /* This might be further relaxed in the future for regular file
2636                  * renames in different source and target parents. Start with
2637                  * only same-directory renames for simplicity and because this
2638                  * is by far the most the common use case.
2639                  */
2640                 if (msrcdir != mtgtdir) {
2641                         rc = mdt_rename_lock(info, &rename_lh);
2642                         if (rc != 0) {
2643                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2644                                        mdt_obd_name(mdt), rc);
2645                                 GOTO(out_put_tgtdir, rc);
2646                         }
2647                 } else {
2648                         CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
2649                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2650                                PNAME(&rr->rr_name));
2651                 }
2652         }
2653
2654         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2655         if (rc < 0)
2656                 GOTO(out_unlock_rename, rc);
2657
2658         reverse = rc;
2659
2660         /* source needs to be looked up after locking source parent, otherwise
2661          * this rename may race with unlink source, and cause rename hang, see
2662          * sanityn.sh 55b, so check parents first, if later we found source is
2663          * remote, relock parents.
2664          */
2665         cos_incompat = (mdt_object_remote(msrcdir) ||
2666                         mdt_object_remote(mtgtdir));
2667
2668         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2669
2670         /* lock parents in the proper order. */
2671         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2672         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2673
2674         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2675         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2676 relock:
2677         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2678         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2679
2680         if (reverse) {
2681                 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2682                                           cos_incompat);
2683                 if (rc)
2684                         GOTO(out_unlock_rename, rc);
2685
2686                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2687
2688                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2689                                           cos_incompat);
2690                 if (rc != 0) {
2691                         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2692                         GOTO(out_unlock_rename, rc);
2693                 }
2694         } else {
2695                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2696                                           cos_incompat);
2697                 if (rc)
2698                         GOTO(out_unlock_rename, rc);
2699
2700                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2701
2702                 if (mtgtdir != msrcdir) {
2703                         rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2704                                                   cos_incompat);
2705                 } else if (!mdt_object_remote(mtgtdir) &&
2706                            lh_srcdirp->mlh_pdo_hash !=
2707                            lh_tgtdirp->mlh_pdo_hash) {
2708                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
2709                                                 MDS_INODELOCK_UPDATE,
2710                                                 cos_incompat);
2711                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2712                 }
2713                 if (rc != 0) {
2714                         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2715                         GOTO(out_unlock_rename, rc);
2716                 }
2717         }
2718
2719         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2720         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2721
2722         /* find mold object. */
2723         fid_zero(old_fid);
2724         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2725         if (rc != 0)
2726                 GOTO(out_unlock_parents, rc);
2727
2728         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2729                 GOTO(out_unlock_parents, rc = -EINVAL);
2730
2731         if (!fid_is_md_operative(old_fid))
2732                 GOTO(out_unlock_parents, rc = -EPERM);
2733
2734         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2735         if (IS_ERR(mold))
2736                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2737
2738         if (!mdt_object_exists(mold)) {
2739                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2740                                 &mold->mot_obj,
2741                                 "object does not exist");
2742                 GOTO(out_put_old, rc = -ENOENT);
2743         }
2744
2745         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2746                 GOTO(out_put_old, rc = -EXDEV);
2747
2748         /* Check if @mtgtdir is subdir of @mold, before locking child
2749          * to avoid reverse locking.
2750          */
2751         if (mtgtdir != msrcdir) {
2752                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2753                                    old_fid);
2754                 if (rc) {
2755                         if (rc == 1)
2756                                 rc = -EINVAL;
2757                         GOTO(out_put_old, rc);
2758                 }
2759         }
2760
2761         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2762         /* save version after locking */
2763         mdt_version_get_save(info, mold, 2);
2764
2765         if (!cos_incompat && mdt_object_remote(mold)) {
2766                 cos_incompat = true;
2767                 mdt_object_put(info->mti_env, mold);
2768                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2769                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2770                 goto relock;
2771         }
2772
2773         /* find mnew object:
2774          * mnew target object may not exist now
2775          * lookup with version checking
2776          */
2777         fid_zero(new_fid);
2778         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2779                                       3);
2780         if (rc == 0) {
2781                 /* the new_fid should have been filled at this moment */
2782                 if (lu_fid_eq(old_fid, new_fid))
2783                         GOTO(out_put_old, rc);
2784
2785                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2786                     lu_fid_eq(new_fid, rr->rr_fid2))
2787                         GOTO(out_put_old, rc = -EINVAL);
2788
2789                 if (!fid_is_md_operative(new_fid))
2790                         GOTO(out_put_old, rc = -EPERM);
2791
2792                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2793                 if (IS_ERR(mnew))
2794                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2795
2796                 if (!mdt_object_exists(mnew)) {
2797                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2798                                         &mnew->mot_obj,
2799                                         "object does not exist");
2800                         GOTO(out_put_new, rc = -ENOENT);
2801                 }
2802
2803                 if (mdt_object_remote(mnew)) {
2804                         struct mdt_body  *repbody;
2805
2806                         /* Always send rename req to the target child MDT */
2807                         repbody = req_capsule_server_get(info->mti_pill,
2808                                                          &RMF_MDT_BODY);
2809                         LASSERT(repbody != NULL);
2810                         repbody->mbo_fid1 = *new_fid;
2811                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2812                         GOTO(out_put_new, rc = -EXDEV);
2813                 }
2814                 /* Before locking the target dir, check we do not replace
2815                  * a dir with a non-dir, otherwise it may deadlock with
2816                  * link op which tries to create a link in this dir
2817                  * back to this non-dir.
2818                  */
2819                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2820                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2821                         GOTO(out_put_new, rc = -EISDIR);
2822
2823                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2824                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2825                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2826                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2827                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2828                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2829                                             lh_rmt, lock_ibits, cos_incompat);
2830                 if (rc < 0)
2831                         GOTO(out_put_new, rc);
2832
2833                 /* Check if @msrcdir is subdir of @mnew, before locking child
2834                  * to avoid reverse locking.
2835                  */
2836                 if (mtgtdir != msrcdir) {
2837                         rc = mdo_is_subdir(info->mti_env,
2838                                            mdt_object_child(msrcdir), new_fid);
2839                         if (rc) {
2840                                 if (rc == 1)
2841                                         rc = -EINVAL;
2842                                 GOTO(out_unlock_old, rc);
2843                         }
2844                 }
2845
2846                 /* We used to acquire MDS_INODELOCK_FULL here but we
2847                  * can't do this now because a running HSM restore on
2848                  * the rename onto victim will hold the layout
2849                  * lock. See LU-4002.
2850                  */
2851
2852                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2853                 mdt_lock_reg_init(lh_newp, LCK_EX);
2854                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2855                 if (mdt_object_remote(mtgtdir)) {
2856                         rc = mdt_remote_object_lock(info, mtgtdir,
2857                                                     mdt_object_fid(mnew),
2858                                                     &lh_newp->mlh_rreg_lh,
2859                                                     lh_newp->mlh_rreg_mode,
2860                                                     MDS_INODELOCK_LOOKUP,
2861                                                     false);
2862                         if (rc != ELDLM_OK)
2863                                 GOTO(out_unlock_old, rc);
2864
2865                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2866                 }
2867                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2868                                            cos_incompat);
2869                 if (rc != 0)
2870                         GOTO(out_unlock_new, rc);
2871
2872                 /* get and save version after locking */
2873                 mdt_version_get_save(info, mnew, 3);
2874         } else if (rc != -ENOENT) {
2875                 GOTO(out_put_old, rc);
2876         } else {
2877                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2878                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2879                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2880                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2881                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2882                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2883                                             lh_rmt, lock_ibits, cos_incompat);
2884                 if (rc != 0)
2885                         GOTO(out_put_old, rc);
2886
2887                 mdt_enoent_version_save(info, 3);
2888         }
2889
2890         /* step 5: rename it */
2891         mdt_reint_init_ma(info, ma);
2892
2893         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2894                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2895
2896         if (mnew != NULL)
2897                 mutex_lock(&mnew->mot_lov_mutex);
2898
2899         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2900                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2901                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2902                         &rr->rr_tgt_name, ma);
2903
2904         if (mnew != NULL)
2905                 mutex_unlock(&mnew->mot_lov_mutex);
2906
2907         /* handle last link of tgt object */
2908         if (rc == 0) {
2909                 mdt_counter_incr(req, LPROC_MDT_RENAME,
2910                                  ktime_us_delta(ktime_get(), kstart));
2911                 if (mnew) {
2912                         mdt_handle_last_unlink(info, mnew, ma);
2913                         discard = mdt_dom_check_for_discard(info, mnew);
2914                 }
2915                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2916                                          msrcdir, mtgtdir,
2917                                          ktime_us_delta(ktime_get(), kstart));
2918         }
2919
2920         EXIT;
2921 out_unlock_new:
2922         if (mnew != NULL)
2923                 mdt_object_unlock(info, mnew, lh_newp, rc);
2924 out_unlock_old:
2925         mdt_object_unlock(info, NULL, lh_rmt, rc);
2926         mdt_object_unlock(info, mold, lh_oldp, rc);
2927 out_put_new:
2928         if (mnew && !discard)
2929                 mdt_object_put(info->mti_env, mnew);
2930 out_put_old:
2931         mdt_object_put(info->mti_env, mold);
2932 out_unlock_parents:
2933         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2934         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2935 out_unlock_rename:
2936         if (lustre_handle_is_used(&rename_lh))
2937                 mdt_rename_unlock(&rename_lh);
2938 out_put_tgtdir:
2939         mdt_object_put(info->mti_env, mtgtdir);
2940 out_put_srcdir:
2941         mdt_object_put(info->mti_env, msrcdir);
2942
2943         /* The DoM discard can be done right in the place above where it is
2944          * assigned, meanwhile it is done here after rename unlock due to
2945          * compatibility with old clients, for them the discard blocks
2946          * the main thread until completion. Check LU-11359 for details.
2947          */
2948         if (discard) {
2949                 mdt_dom_discard_data(info, mnew);
2950                 mdt_object_put(info->mti_env, mnew);
2951         }
2952         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2953         return rc;
2954 }
2955
2956 static int mdt_reint_resync(struct mdt_thread_info *info,
2957                             struct mdt_lock_handle *lhc)
2958 {
2959         struct mdt_reint_record *rr = &info->mti_rr;
2960         struct ptlrpc_request *req = mdt_info_req(info);
2961         struct md_attr *ma = &info->mti_attr;
2962         struct mdt_object *mo;
2963         struct ldlm_lock *lease;
2964         struct mdt_body *repbody;
2965         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2966         bool lease_broken;
2967         int rc, rc2;
2968
2969         ENTRY;
2970         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2971
2972         if (info->mti_dlm_req)
2973                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2974
2975         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2976         if (IS_ERR(mo))
2977                 GOTO(out, rc = PTR_ERR(mo));
2978
2979         if (!mdt_object_exists(mo))
2980                 GOTO(out_obj, rc = -ENOENT);
2981
2982         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2983                 GOTO(out_obj, rc = -EINVAL);
2984
2985         if (mdt_object_remote(mo))
2986                 GOTO(out_obj, rc = -EREMOTE);
2987
2988         lease = ldlm_handle2lock(rr->rr_lease_handle);
2989         if (lease == NULL)
2990                 GOTO(out_obj, rc = -ESTALE);
2991
2992         /* It's really necessary to grab open_sem and check if the lease lock
2993          * has been lost. There would exist a concurrent writer coming in and
2994          * generating some dirty data in memory cache, the writeback would fail
2995          * after the layout version is increased by MDS_REINT_RESYNC RPC.
2996          */
2997         if (!down_write_trylock(&mo->mot_open_sem))
2998                 GOTO(out_put_lease, rc = -EBUSY);
2999
3000         lock_res_and_lock(lease);
3001         lease_broken = ldlm_is_cancel(lease);
3002         unlock_res_and_lock(lease);
3003         if (lease_broken)
3004                 GOTO(out_unlock, rc = -EBUSY);
3005
3006         /* the file has yet opened by anyone else after we took the lease. */
3007         layout.mlc_opc = MD_LAYOUT_RESYNC;
3008         lhc = &info->mti_lh[MDT_LH_LOCAL];
3009         rc = mdt_layout_change(info, mo, lhc, &layout);
3010         if (rc)
3011                 GOTO(out_unlock, rc);
3012
3013         mdt_object_unlock(info, mo, lhc, 0);
3014
3015         ma->ma_need = MA_INODE;
3016         ma->ma_valid = 0;
3017         rc = mdt_attr_get_complex(info, mo, ma);
3018         if (rc != 0)
3019                 GOTO(out_unlock, rc);
3020
3021         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3022         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3023
3024         EXIT;
3025 out_unlock:
3026         up_write(&mo->mot_open_sem);
3027 out_put_lease:
3028         LDLM_LOCK_PUT(lease);
3029 out_obj:
3030         mdt_object_put(info->mti_env, mo);
3031 out:
3032         mdt_client_compatibility(info);
3033         rc2 = mdt_fix_reply(info);
3034         if (rc == 0)
3035                 rc = rc2;
3036         return rc;
3037 }
3038
3039 struct mdt_reinter {
3040         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3041         enum lprocfs_extra_opc mr_extra_opc;
3042 };
3043
3044 static const struct mdt_reinter mdt_reinters[] = {
3045         [REINT_SETATTR] = {
3046                 .mr_handler = &mdt_reint_setattr,
3047                 .mr_extra_opc = MDS_REINT_SETATTR,
3048         },
3049         [REINT_CREATE] = {
3050                 .mr_handler = &mdt_reint_create,
3051                 .mr_extra_opc = MDS_REINT_CREATE,
3052         },
3053         [REINT_LINK] = {
3054                 .mr_handler = &mdt_reint_link,
3055                 .mr_extra_opc = MDS_REINT_LINK,
3056         },
3057         [REINT_UNLINK] = {
3058                 .mr_handler = &mdt_reint_unlink,
3059                 .mr_extra_opc = MDS_REINT_UNLINK,
3060         },
3061         [REINT_RENAME] = {
3062                 .mr_handler = &mdt_reint_rename,
3063                 .mr_extra_opc = MDS_REINT_RENAME,
3064         },
3065         [REINT_OPEN] = {
3066                 .mr_handler = &mdt_reint_open,
3067                 .mr_extra_opc = MDS_REINT_OPEN,
3068         },
3069         [REINT_SETXATTR] = {
3070                 .mr_handler = &mdt_reint_setxattr,
3071                 .mr_extra_opc = MDS_REINT_SETXATTR,
3072         },
3073         [REINT_RMENTRY] = {
3074                 .mr_handler = &mdt_reint_unlink,
3075                 .mr_extra_opc = MDS_REINT_UNLINK,
3076         },
3077         [REINT_MIGRATE] = {
3078                 .mr_handler = &mdt_reint_migrate,
3079                 .mr_extra_opc = MDS_REINT_RENAME,
3080         },
3081         [REINT_RESYNC] = {
3082                 .mr_handler = &mdt_reint_resync,
3083                 .mr_extra_opc = MDS_REINT_RESYNC,
3084         },
3085 };
3086
3087 int mdt_reint_rec(struct mdt_thread_info *info,
3088                   struct mdt_lock_handle *lhc)
3089 {
3090         const struct mdt_reinter *mr;
3091         int rc;
3092
3093         ENTRY;
3094         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3095                 RETURN(-EPROTO);
3096
3097         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3098         if (mr->mr_handler == NULL)
3099                 RETURN(-EPROTO);
3100
3101         rc = (*mr->mr_handler)(info, lhc);
3102
3103         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3104                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3105
3106         RETURN(rc);
3107 }