Whamcloud - gitweb
LU-16062 ldlm: improve bl_timeout for prolong
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222                              struct mdt_object *obj,
223                              struct ldlm_enqueue_info *einfo,
224                              int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(slave_locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_handle_init(lh);
237         mdt_lock_reg_init(lh, einfo->ei_mode);
238         for (i = 0; i < slave_locks->ha_count; i++) {
239                 if (test_bit(i, (void *)slave_locks->ha_map))
240                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
241                 else
242                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
243                 mdt_object_unlock(mti, NULL, lh, decref);
244                 slave_locks->ha_handles[i].cookie = 0ull;
245         }
246
247         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
248                                 policy);
249 }
250
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252                                      struct mdt_object *obj)
253 {
254         struct lu_device *bottom_dev;
255         struct lu_object *bottom_obj;
256         int rc;
257
258         if (!S_ISDIR(obj->mot_header.loh_attr))
259                 return 0;
260
261         /* getxattr from bottom obj to avoid reading in shard FIDs */
262         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264                                           mdt_object_fid(obj), NULL);
265         if (IS_ERR(bottom_obj))
266                 return PTR_ERR(bottom_obj);
267
268         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269                           XATTR_NAME_LMV);
270         lu_object_put(mti->mti_env, bottom_obj);
271
272         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
273 }
274
275 /**
276  * Lock slave stripes if necessary, the lock handles of slave stripes
277  * will be stored in einfo->ei_cbdata.
278  **/
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280                            enum ldlm_mode mode, __u64 ibits,
281                            struct ldlm_enqueue_info *einfo)
282 {
283         union ldlm_policy_data *policy = &mti->mti_policy;
284
285         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
286
287         einfo->ei_type = LDLM_IBITS;
288         einfo->ei_mode = mode;
289         einfo->ei_cb_bl = mdt_remote_blocking_ast;
290         einfo->ei_cb_local_bl = mdt_blocking_ast;
291         einfo->ei_cb_cp = ldlm_completion_ast;
292         einfo->ei_enq_slave = 1;
293         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294         einfo->ei_inodebits = ibits;
295         einfo->ei_req_slot = 1;
296         memset(policy, 0, sizeof(*policy));
297         policy->l_inodebits.bits = ibits;
298
299         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
300                               policy);
301 }
302
303 int mdt_reint_striped_lock(struct mdt_thread_info *info,
304                            struct mdt_object *o,
305                            struct mdt_lock_handle *lh,
306                            __u64 ibits,
307                            struct ldlm_enqueue_info *einfo,
308                            bool cos_incompat)
309 {
310         int rc;
311
312         LASSERT(!mdt_object_remote(o));
313
314         memset(einfo, 0, sizeof(*einfo));
315
316         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
317         if (rc)
318                 return rc;
319
320         rc = mdt_object_striped(info, o);
321         if (rc != 1) {
322                 if (rc < 0)
323                         mdt_object_unlock(info, o, lh, rc);
324                 return rc;
325         }
326
327         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
328         if (rc) {
329                 mdt_object_unlock(info, o, lh, rc);
330                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
331                         rc = 0;
332         }
333
334         return rc;
335 }
336
337 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *o,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_unlock_slaves(info, o, einfo, decref);
344         mdt_object_unlock(info, o, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lmv_mds_md_v1 *lmv;
359         struct mdt_object *child;
360         struct mdt_lock_handle *lhp;
361         struct mdt_lock_handle *lhc;
362         struct mdt_body *repbody;
363         int rc;
364
365         ENTRY;
366         if (!mdt->mdt_enable_dir_restripe)
367                 RETURN(-EPERM);
368
369         LASSERT(lum);
370         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
371
372         rc = mdt_version_get_check_save(info, parent, 0);
373         if (rc)
374                 RETURN(rc);
375
376         lhp = &info->mti_lh[MDT_LH_PARENT];
377         mdt_lock_pdo_init(lhp, LCK_PW, lname);
378         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
379                                    true);
380         if (rc)
381                 RETURN(rc);
382
383         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
384         if (rc)
385                 GOTO(unlock_parent, rc);
386
387         if (ma->ma_valid & MA_LMV) {
388                 /* don't allow restripe if parent dir layout is changing */
389                 lmv = &ma->ma_lmv->lmv_md_v1;
390                 if (!lmv_is_sane2(lmv))
391                         GOTO(unlock_parent, rc = -EBADF);
392
393                 if (lmv_is_layout_changing(lmv))
394                         GOTO(unlock_parent, rc = -EBUSY);
395         }
396
397         fid_zero(fid);
398         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
399         if (rc)
400                 GOTO(unlock_parent, rc);
401
402         child = mdt_object_find(info->mti_env, mdt, fid);
403         if (IS_ERR(child))
404                 GOTO(unlock_parent, rc = PTR_ERR(child));
405
406         if (!mdt_object_exists(child))
407                 GOTO(out_child, rc = -ENOENT);
408
409         if (mdt_object_remote(child)) {
410                 struct mdt_body *repbody;
411
412                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
413                 if (!repbody)
414                         GOTO(out_child, rc = -EPROTO);
415
416                 repbody->mbo_fid1 = *fid;
417                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
418                 GOTO(out_child, rc = -EREMOTE);
419         }
420
421         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
422                 GOTO(out_child, rc = -ENOTDIR);
423
424         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
425         if (rc)
426                 GOTO(out_child, rc);
427
428         /* race with migrate? */
429         if ((ma->ma_valid & MA_LMV) &&
430              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
431                 GOTO(out_child, rc = -EBUSY);
432
433         /* lock object */
434         lhc = &info->mti_lh[MDT_LH_CHILD];
435         mdt_lock_reg_init(lhc, LCK_EX);
436
437         /* enqueue object remote LOOKUP lock */
438         if (mdt_object_remote(parent)) {
439                 rc = mdt_remote_object_lock(info, parent, fid,
440                                             &lhc->mlh_rreg_lh,
441                                             lhc->mlh_rreg_mode,
442                                             MDS_INODELOCK_LOOKUP, false);
443                 if (rc != ELDLM_OK)
444                         GOTO(out_child, rc);
445         }
446
447         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
448                                     true);
449         if (rc)
450                 GOTO(unlock_child, rc);
451
452         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
453         rc = mdt_version_get_check_save(info, child, 1);
454         if (rc)
455                 GOTO(unlock_child, rc);
456
457         spin_lock(&mdt->mdt_restriper.mdr_lock);
458         if (child->mot_restriping) {
459                 /* race? */
460                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
461                 GOTO(unlock_child, rc = -EBUSY);
462         }
463         child->mot_restriping = 1;
464         spin_unlock(&mdt->mdt_restriper.mdr_lock);
465
466         *fid = *tfid;
467         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
468         if (rc)
469                 GOTO(restriping_clear, rc);
470
471         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
472         if (!repbody)
473                 GOTO(restriping_clear, rc = -EPROTO);
474
475         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
476         EXIT;
477
478 restriping_clear:
479         child->mot_restriping = 0;
480 unlock_child:
481         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
482 out_child:
483         mdt_object_put(info->mti_env, child);
484 unlock_parent:
485         mdt_object_unlock(info, parent, lhp, rc);
486
487         return rc;
488 }
489
490 /*
491  * VBR: we save three versions in reply:
492  * 0 - parent. Check that parent version is the same during replay.
493  * 1 - name. Version of 'name' if file exists with the same name or
494  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
495  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
496  * check.
497  */
498 static int mdt_create(struct mdt_thread_info *info)
499 {
500         struct mdt_device *mdt = info->mti_mdt;
501         struct mdt_object *parent;
502         struct mdt_object *child;
503         struct mdt_lock_handle *lh;
504         struct mdt_body *repbody;
505         struct md_attr *ma = &info->mti_attr;
506         struct mdt_reint_record *rr = &info->mti_rr;
507         struct md_op_spec *spec = &info->mti_spec;
508         bool restripe = false;
509         int rc;
510
511         ENTRY;
512         DEBUG_REQ(D_INODE, mdt_info_req(info),
513                   "Create ("DNAME"->"DFID") in "DFID,
514                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
515
516         if (!fid_is_md_operative(rr->rr_fid1))
517                 RETURN(-EPERM);
518
519         if (S_ISDIR(ma->ma_attr.la_mode) &&
520             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
521                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
522                 struct lu_ucred *uc = mdt_ucred(info);
523                 struct obd_export *exp = mdt_info_req(info)->rq_export;
524
525                 /* Only new clients can create remote dir( >= 2.4) and
526                  * striped dir(>= 2.6), old client will return -ENOTSUPP
527                  */
528                 if (!mdt_is_dne_client(exp))
529                         RETURN(-ENOTSUPP);
530
531                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
532                         if (!mdt_is_striped_client(exp))
533                                 RETURN(-ENOTSUPP);
534
535                         if (!mdt->mdt_enable_striped_dir)
536                                 RETURN(-EPERM);
537                 } else if (!mdt->mdt_enable_remote_dir) {
538                         RETURN(-EPERM);
539                 }
540
541                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
542                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
543                     LMV_HASH_TYPE_CRUSH)
544                         RETURN(-EPROTO);
545
546                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
547                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
548                     mdt->mdt_enable_remote_dir_gid != -1)
549                         RETURN(-EPERM);
550
551                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
552                  * this is create only, don't try restripe.
553                  */
554                 if (mdt->mdt_enable_dir_restripe &&
555                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
556                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
557                         restripe = true;
558         }
559
560         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
561
562         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
563         if (IS_ERR(parent))
564                 RETURN(PTR_ERR(parent));
565
566         if (!mdt_object_exists(parent))
567                 GOTO(put_parent, rc = -ENOENT);
568
569         rc = mdt_check_enc(info, parent);
570         if (rc)
571                 GOTO(put_parent, rc);
572
573         /*
574          * LU-10235: check if name exists locklessly first to avoid massive
575          * lock recalls on existing directories.
576          */
577         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
578                                       &info->mti_tmp_fid1, 1);
579         if (rc == 0) {
580                 if (!restripe)
581                         GOTO(put_parent, rc = -EEXIST);
582
583                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
584                                   ma);
585         }
586
587         /* -ENOENT is expected here */
588         if (rc != -ENOENT)
589                 GOTO(put_parent, rc);
590
591         /* save version of file name for replay, it must be ENOENT here */
592         mdt_enoent_version_save(info, 1);
593
594         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
595
596         lh = &info->mti_lh[MDT_LH_PARENT];
597         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
598         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
599         if (rc)
600                 GOTO(put_parent, rc);
601
602         if (!mdt_object_remote(parent)) {
603                 rc = mdt_version_get_check_save(info, parent, 0);
604                 if (rc)
605                         GOTO(unlock_parent, rc);
606         }
607
608         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
609         if (unlikely(IS_ERR(child)))
610                 GOTO(unlock_parent, rc = PTR_ERR(child));
611
612         ma->ma_need = MA_INODE;
613         ma->ma_valid = 0;
614
615         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
616                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
617
618         /* Version of child will be updated on disk. */
619         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
620         rc = mdt_version_get_check_save(info, child, 2);
621         if (rc)
622                 GOTO(put_child, rc);
623
624         /*
625          * Do not perform lookup sanity check. We know that name does
626          * not exist.
627          */
628         info->mti_spec.sp_cr_lookup = 0;
629         if (mdt_object_remote(parent))
630                 info->mti_spec.sp_cr_lookup = 1;
631         info->mti_spec.sp_feat = &dt_directory_features;
632
633         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
634                         mdt_object_child(child), &info->mti_spec, ma);
635         if (rc == 0)
636                 rc = mdt_attr_get_complex(info, child, ma);
637
638         if (rc < 0)
639                 GOTO(put_child, rc);
640
641         /*
642          * On DNE, we need to eliminate dependey between 'mkdir a' and
643          * 'mkdir a/b' if b is a striped directory, to achieve this, two
644          * things are done below:
645          * 1. save child and slaves lock.
646          * 2. if the child is a striped directory, relock parent so to
647          *    compare against with COS locks to ensure parent was
648          *    committed to disk.
649          */
650         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
651                 struct mdt_lock_handle *lhc;
652                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
653                 bool cos_incompat;
654
655                 rc = mdt_object_striped(info, child);
656                 if (rc < 0)
657                         GOTO(put_child, rc);
658
659                 cos_incompat = rc;
660                 if (cos_incompat) {
661                         if (!mdt_object_remote(parent)) {
662                                 mdt_object_unlock(info, parent, lh, 1);
663                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
664                                 rc = mdt_reint_object_lock(info, parent, lh,
665                                                            MDS_INODELOCK_UPDATE,
666                                                            true);
667                                 if (rc)
668                                         GOTO(put_child, rc);
669                         }
670                 }
671
672                 lhc = &info->mti_lh[MDT_LH_CHILD];
673                 mdt_lock_handle_init(lhc);
674                 mdt_lock_reg_init(lhc, LCK_PW);
675                 rc = mdt_reint_striped_lock(info, child, lhc,
676                                             MDS_INODELOCK_UPDATE, einfo,
677                                             cos_incompat);
678                 if (rc)
679                         GOTO(put_child, rc);
680
681                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
682         }
683
684         /* Return fid & attr to client. */
685         if (ma->ma_valid & MA_INODE)
686                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
687                                    mdt_object_fid(child));
688         EXIT;
689 put_child:
690         mdt_object_put(info->mti_env, child);
691 unlock_parent:
692         mdt_object_unlock(info, parent, lh, rc);
693 put_parent:
694         mdt_object_put(info->mti_env, parent);
695         return rc;
696 }
697
698 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
699                         struct md_attr *ma)
700 {
701         struct mdt_lock_handle  *lh;
702         int do_vbr = ma->ma_attr.la_valid &
703                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
704         __u64 lockpart = MDS_INODELOCK_UPDATE;
705         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
706         bool cos_incompat;
707         int rc;
708
709         ENTRY;
710         rc = mdt_object_striped(info, mo);
711         if (rc < 0)
712                 RETURN(rc);
713
714         cos_incompat = rc;
715
716         lh = &info->mti_lh[MDT_LH_PARENT];
717         mdt_lock_reg_init(lh, LCK_PW);
718
719         /* Even though the new MDT will grant PERM lock to the old
720          * client, but the old client will almost ignore that during
721          * So it needs to revoke both LOOKUP and PERM lock here, so
722          * both new and old client can cancel the dcache
723          */
724         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
725                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
726         /* Clear xattr cache on clients, so the virtual project ID xattr
727          * can get the new project ID
728          */
729         if (ma->ma_attr.la_valid & LA_PROJID)
730                 lockpart |= MDS_INODELOCK_XATTR;
731
732         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
733                                     cos_incompat);
734         if (rc != 0)
735                 RETURN(rc);
736
737         /* all attrs are packed into mti_attr in unpack_setattr */
738         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
739                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
740
741         /* VBR: update version if attr changed are important for recovery */
742         if (do_vbr) {
743                 /* update on-disk version of changed object */
744                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
745                 rc = mdt_version_get_check_save(info, mo, 0);
746                 if (rc)
747                         GOTO(out_unlock, rc);
748         }
749
750         /* Ensure constant striping during chown(). See LU-2789. */
751         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
752                 mutex_lock(&mo->mot_lov_mutex);
753
754         /* all attrs are packed into mti_attr in unpack_setattr */
755         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
756
757         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
758                 mutex_unlock(&mo->mot_lov_mutex);
759
760         if (rc != 0)
761                 GOTO(out_unlock, rc);
762         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
763         EXIT;
764 out_unlock:
765         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
766         return rc;
767 }
768
769 /**
770  * Check HSM flags and add HS_DIRTY flag if relevant.
771  *
772  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
773  * and is not RELEASED.
774  */
775 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
776                         struct md_attr *ma)
777 {
778         struct lu_ucred *uc = mdt_ucred(info);
779         kernel_cap_t cap_saved;
780         int rc;
781
782         ENTRY;
783         /* If the file was modified, add the dirty flag */
784         ma->ma_need = MA_HSM;
785         rc = mdt_attr_get_complex(info, mo, ma);
786         if (rc) {
787                 CERROR("file attribute read error for "DFID": %d.\n",
788                         PFID(mdt_object_fid(mo)), rc);
789                 RETURN(rc);
790         }
791
792         /* If an up2date copy exists in the backend, add dirty flag */
793         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
794             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
795                 ma->ma_hsm.mh_flags |= HS_DIRTY;
796
797                 /* Bump cap so that closes from non-owner writers can
798                  * set the HSM state to dirty.
799                  */
800                 cap_saved = uc->uc_cap;
801                 cap_raise(uc->uc_cap, CAP_FOWNER);
802                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
803                 uc->uc_cap = cap_saved;
804                 if (rc)
805                         CERROR("file attribute change error for "DFID": %d\n",
806                                 PFID(mdt_object_fid(mo)), rc);
807         }
808
809         RETURN(rc);
810 }
811
812 static int mdt_reint_setattr(struct mdt_thread_info *info,
813                              struct mdt_lock_handle *lhc)
814 {
815         struct mdt_device *mdt = info->mti_mdt;
816         struct md_attr *ma = &info->mti_attr;
817         struct mdt_reint_record *rr = &info->mti_rr;
818         struct ptlrpc_request *req = mdt_info_req(info);
819         struct mdt_object *mo;
820         struct mdt_body *repbody;
821         ktime_t kstart = ktime_get();
822         int rc;
823
824         ENTRY;
825         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
826                   (unsigned int)ma->ma_attr.la_valid);
827
828         if (info->mti_dlm_req)
829                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
830
831         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
832
833         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
834         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
835         if (IS_ERR(mo))
836                 GOTO(out, rc = PTR_ERR(mo));
837
838         if (!mdt_object_exists(mo))
839                 GOTO(out_put, rc = -ENOENT);
840
841         if (mdt_object_remote(mo))
842                 GOTO(out_put, rc = -EREMOTE);
843
844         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
845         /* revoke lease lock if size is going to be changed */
846         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
847                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
848                      atomic_read(&mo->mot_lease_count) > 0)) {
849                 down_read(&mo->mot_open_sem);
850
851                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
852                         lhc = &info->mti_lh[MDT_LH_LOCAL];
853                         mdt_lock_reg_init(lhc, LCK_CW);
854
855                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
856                         if (rc != 0) {
857                                 up_read(&mo->mot_open_sem);
858                                 GOTO(out_put, rc);
859                         }
860
861                         /* revoke lease lock */
862                         mdt_object_unlock(info, mo, lhc, 1);
863                 }
864                 up_read(&mo->mot_open_sem);
865         }
866
867         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
868                 /* Check write access for the O_TRUNC case */
869                 if (mdt_write_read(mo) < 0)
870                         GOTO(out_put, rc = -ETXTBSY);
871
872                 /* LU-10286: compatibility check for FLR.
873                  * Please check the comment in mdt_finish_open() for details
874                  */
875                 if (!exp_connect_flr(info->mti_exp) ||
876                     !exp_connect_overstriping(info->mti_exp)) {
877                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
878                         if (rc < 0 && rc != -ENODATA)
879                                 GOTO(out_put, rc);
880
881                         if (!exp_connect_flr(info->mti_exp)) {
882                                 if (rc > 0 &&
883                                     mdt_lmm_is_flr(info->mti_big_lmm))
884                                         GOTO(out_put, rc = -EOPNOTSUPP);
885                         }
886
887                         if (!exp_connect_overstriping(info->mti_exp)) {
888                                 if (rc > 0 &&
889                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
890                                         GOTO(out_put, rc = -EOPNOTSUPP);
891                         }
892                 }
893
894                 /* For truncate, the file size sent from client
895                  * is believable, but the blocks are incorrect,
896                  * which makes the block size in LSOM attribute
897                  * inconsisent with the real block size.
898                  */
899                 rc = mdt_lsom_update(info, mo, true);
900                 if (rc)
901                         GOTO(out_put, rc);
902         }
903
904         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
905                 if (ma->ma_valid & MA_LOV)
906                         GOTO(out_put, rc = -EPROTO);
907
908                 /* MDT supports FMD for regular files due to Data-on-MDT */
909                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
910                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
911                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
912                                        req->rq_xid);
913
914                         if (ma->ma_attr.la_valid & LA_MTIME) {
915                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
916                                 if (!rc)
917                                         ma->ma_valid |= MA_PFID;
918                         }
919                 }
920
921                 rc = mdt_attr_set(info, mo, ma);
922                 if (rc)
923                         GOTO(out_put, rc);
924         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
925                    (ma->ma_valid & MA_INODE)) {
926                 struct lu_buf *buf = &info->mti_buf;
927                 struct lu_ucred *uc = mdt_ucred(info);
928                 struct mdt_lock_handle *lh;
929                 const char *name;
930                 __u64 lockpart = MDS_INODELOCK_XATTR;
931
932                 /* reject if either remote or striped dir is disabled */
933                 if (ma->ma_valid & MA_LMV) {
934                         if (!mdt->mdt_enable_remote_dir ||
935                             !mdt->mdt_enable_striped_dir)
936                                 GOTO(out_put, rc = -EPERM);
937
938                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
939                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
940                             mdt->mdt_enable_remote_dir_gid != -1)
941                                 GOTO(out_put, rc = -EPERM);
942                 }
943
944                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
945                         GOTO(out_put, rc = -ENOTDIR);
946
947                 if (ma->ma_attr.la_valid != 0)
948                         GOTO(out_put, rc = -EPROTO);
949
950                 lh = &info->mti_lh[MDT_LH_PARENT];
951                 mdt_lock_reg_init(lh, LCK_PW);
952
953                 if (ma->ma_valid & MA_LOV) {
954                         buf->lb_buf = ma->ma_lmm;
955                         buf->lb_len = ma->ma_lmm_size;
956                         name = XATTR_NAME_LOV;
957                 } else {
958                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
959                         struct lu_fid *pfid = &info->mti_tmp_fid1;
960                         struct lu_name *pname = &info->mti_name;
961                         const char dotdot[] = "..";
962                         struct mdt_object *pobj;
963
964                         buf->lb_buf = lmu;
965                         buf->lb_len = ma->ma_lmv_size;
966                         name = XATTR_NAME_DEFAULT_LMV;
967
968                         if (fid_is_root(rr->rr_fid1)) {
969                                 lockpart |= MDS_INODELOCK_LOOKUP;
970                         } else {
971                                 /* force client to update dir default layout */
972                                 fid_zero(pfid);
973                                 pname->ln_name = dotdot;
974                                 pname->ln_namelen = sizeof(dotdot);
975                                 rc = mdo_lookup(info->mti_env,
976                                                 mdt_object_child(mo), pname,
977                                                 pfid, NULL);
978                                 if (rc)
979                                         GOTO(out_put, rc);
980
981                                 pobj = mdt_object_find(info->mti_env, mdt,
982                                                        pfid);
983                                 if (IS_ERR(pobj))
984                                         GOTO(out_put, rc = PTR_ERR(pobj));
985
986                                 if (mdt_object_remote(pobj))
987                                         rc = mdt_remote_object_lock(info, pobj,
988                                                 mdt_object_fid(mo),
989                                                 &lh->mlh_rreg_lh, LCK_EX,
990                                                 MDS_INODELOCK_LOOKUP, false);
991                                 else
992                                         lockpart |= MDS_INODELOCK_LOOKUP;
993
994                                 mdt_object_put(info->mti_env, pobj);
995
996                                 if (rc)
997                                         GOTO(out_put, rc);
998                         }
999                 }
1000
1001                 rc = mdt_object_lock(info, mo, lh, lockpart);
1002                 if (rc != 0)
1003                         GOTO(out_put, rc);
1004
1005                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1006                                   name, 0);
1007
1008                 mdt_object_unlock(info, mo, lh, rc);
1009                 if (rc)
1010                         GOTO(out_put, rc);
1011         } else {
1012                 GOTO(out_put, rc = -EPROTO);
1013         }
1014
1015         /* If file data is modified, add the dirty flag */
1016         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1017                 rc = mdt_add_dirty_flag(info, mo, ma);
1018
1019         ma->ma_need = MA_INODE;
1020         ma->ma_valid = 0;
1021         rc = mdt_attr_get_complex(info, mo, ma);
1022         if (rc != 0)
1023                 GOTO(out_put, rc);
1024
1025         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1026
1027         EXIT;
1028 out_put:
1029         mdt_object_put(info->mti_env, mo);
1030 out:
1031         if (rc == 0)
1032                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1033                                  ktime_us_delta(ktime_get(), kstart));
1034
1035         mdt_client_compatibility(info);
1036         return rc;
1037 }
1038
1039 static int mdt_reint_create(struct mdt_thread_info *info,
1040                             struct mdt_lock_handle *lhc)
1041 {
1042         struct ptlrpc_request   *req = mdt_info_req(info);
1043         ktime_t                 kstart = ktime_get();
1044         int                     rc;
1045
1046         ENTRY;
1047         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1048                 RETURN(err_serious(-ESTALE));
1049
1050         if (info->mti_dlm_req)
1051                 ldlm_request_cancel(mdt_info_req(info),
1052                                     info->mti_dlm_req, 0, LATF_SKIP);
1053
1054         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1055                 RETURN(-EPROTO);
1056
1057         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1058         case S_IFDIR:
1059         case S_IFREG:
1060         case S_IFLNK:
1061         case S_IFCHR:
1062         case S_IFBLK:
1063         case S_IFIFO:
1064         case S_IFSOCK:
1065                 break;
1066         default:
1067                 CERROR("%s: Unsupported mode %o\n",
1068                        mdt_obd_name(info->mti_mdt),
1069                        info->mti_attr.ma_attr.la_mode);
1070                 RETURN(err_serious(-EOPNOTSUPP));
1071         }
1072
1073         rc = mdt_create(info);
1074         if (rc == 0) {
1075                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1076                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1077                                          ktime_us_delta(ktime_get(), kstart));
1078                 else
1079                         /* Special file should stay on the same node as parent*/
1080                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1081                                          ktime_us_delta(ktime_get(), kstart));
1082         }
1083
1084         RETURN(rc);
1085 }
1086
1087 /*
1088  * VBR: save parent version in reply and child version getting by its name.
1089  * Version of child is getting and checking during its lookup. If
1090  */
1091 static int mdt_reint_unlink(struct mdt_thread_info *info,
1092                             struct mdt_lock_handle *lhc)
1093 {
1094         struct mdt_reint_record *rr = &info->mti_rr;
1095         struct ptlrpc_request *req = mdt_info_req(info);
1096         struct md_attr *ma = &info->mti_attr;
1097         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1098         struct mdt_object *mp;
1099         struct mdt_object *mc;
1100         struct mdt_lock_handle *parent_lh;
1101         struct mdt_lock_handle *child_lh;
1102         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1103         __u64 lock_ibits;
1104         bool cos_incompat = false;
1105         int no_name = 0;
1106         ktime_t kstart = ktime_get();
1107         int rc;
1108
1109         ENTRY;
1110         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1111                   PNAME(&rr->rr_name));
1112
1113         if (info->mti_dlm_req)
1114                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1115
1116         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1117                 RETURN(err_serious(-ENOENT));
1118
1119         if (!fid_is_md_operative(rr->rr_fid1))
1120                 RETURN(-EPERM);
1121
1122         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1123         if (IS_ERR(mp))
1124                 RETURN(PTR_ERR(mp));
1125
1126         if (mdt_object_remote(mp)) {
1127                 cos_incompat = true;
1128         } else {
1129                 rc = mdt_version_get_check_save(info, mp, 0);
1130                 if (rc)
1131                         GOTO(put_parent, rc);
1132         }
1133
1134         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1135         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1136 relock:
1137         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1138         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1139         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1140                                    cos_incompat);
1141         if (rc != 0)
1142                 GOTO(put_parent, rc);
1143
1144         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1145                 *child_fid = *rr->rr_fid2;
1146         } else {
1147                 /* lookup child object along with version checking */
1148                 fid_zero(child_fid);
1149                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1150                                               1);
1151                 if (rc != 0) {
1152                         /* Name might not be able to find during resend of
1153                          * remote unlink, considering following case.
1154                          * dir_A is a remote directory, the name entry of
1155                          * dir_A is on MDT0, the directory is on MDT1,
1156                          *
1157                          * 1. client sends unlink req to MDT1.
1158                          * 2. MDT1 sends name delete update to MDT0.
1159                          * 3. name entry is being deleted in MDT0 synchronously.
1160                          * 4. MDT1 is restarted.
1161                          * 5. client resends unlink req to MDT1. So it can not
1162                          *    find the name entry on MDT0 anymore.
1163                          * In this case, MDT1 only needs to destory the local
1164                          * directory.
1165                          */
1166                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1167                             !fid_is_zero(rr->rr_fid2) &&
1168                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1169                                 no_name = 1;
1170                                 *child_fid = *rr->rr_fid2;
1171                         } else {
1172                                 GOTO(unlock_parent, rc);
1173                         }
1174                 }
1175         }
1176
1177         if (!fid_is_md_operative(child_fid))
1178                 GOTO(unlock_parent, rc = -EPERM);
1179
1180         /* We will lock the child regardless it is local or remote. No harm. */
1181         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1182         if (IS_ERR(mc))
1183                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1184
1185         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1186                 /* In this case, child fid is embedded in the request, and we do
1187                  * not have a proper name as rr_name contains an encoded
1188                  * hash. So find name that matches provided hash.
1189                  */
1190                 if (!find_name_matching_hash(info, &rr->rr_name,
1191                                              NULL, mc))
1192                         GOTO(put_child, rc = -ENOENT);
1193         }
1194
1195         if (!cos_incompat) {
1196                 rc = mdt_object_striped(info, mc);
1197                 if (rc < 0)
1198                         GOTO(put_child, rc);
1199
1200                 cos_incompat = rc;
1201                 if (cos_incompat) {
1202                         mdt_object_put(info->mti_env, mc);
1203                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1204                         goto relock;
1205                 }
1206         }
1207
1208         child_lh = &info->mti_lh[MDT_LH_CHILD];
1209         mdt_lock_reg_init(child_lh, LCK_EX);
1210         if (info->mti_spec.sp_rm_entry) {
1211                 struct lu_ucred *uc  = mdt_ucred(info);
1212
1213                 if (!mdt_is_dne_client(req->rq_export))
1214                         /* Return -ENOTSUPP for old client */
1215                         GOTO(put_child, rc = -ENOTSUPP);
1216
1217                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1218                         GOTO(put_child, rc = -EPERM);
1219
1220                 ma->ma_need = MA_INODE;
1221                 ma->ma_valid = 0;
1222                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1223                                 NULL, &rr->rr_name, ma, no_name);
1224                 GOTO(put_child, rc);
1225         }
1226
1227         if (mdt_object_remote(mc)) {
1228                 struct mdt_body  *repbody;
1229
1230                 if (!fid_is_zero(rr->rr_fid2)) {
1231                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1232                                mdt_obd_name(info->mti_mdt),
1233                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1234                         GOTO(put_child, rc = -ENOENT);
1235                 }
1236                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1237                        mdt_obd_name(info->mti_mdt),
1238                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1239
1240                 if (!mdt_is_dne_client(req->rq_export))
1241                         /* Return -ENOTSUPP for old client */
1242                         GOTO(put_child, rc = -ENOTSUPP);
1243
1244                 /* Revoke the LOOKUP lock of the remote object granted by
1245                  * this MDT. Since the unlink will happen on another MDT,
1246                  * it will release the LOOKUP lock right away. Then What
1247                  * would happen if another client try to grab the LOOKUP
1248                  * lock at the same time with unlink XXX
1249                  */
1250                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1251                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1252                 LASSERT(repbody != NULL);
1253                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1254                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1255                 GOTO(unlock_child, rc = -EREMOTE);
1256         }
1257         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1258          * this now because a running HSM restore on the child (unlink
1259          * victim) will hold the layout lock. See LU-4002.
1260          */
1261         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1262         if (mdt_object_remote(mp)) {
1263                 /* Enqueue lookup lock from parent MDT */
1264                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1265                                             &child_lh->mlh_rreg_lh,
1266                                             child_lh->mlh_rreg_mode,
1267                                             MDS_INODELOCK_LOOKUP, false);
1268                 if (rc != ELDLM_OK)
1269                         GOTO(put_child, rc);
1270
1271                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1272         }
1273
1274         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1275                                     cos_incompat);
1276         if (rc != 0)
1277                 GOTO(put_child, rc);
1278
1279         /*
1280          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1281          * whether need MA_LOV and MA_COOKIE.
1282          */
1283         ma->ma_need = MA_INODE;
1284         ma->ma_valid = 0;
1285
1286         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1287                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1288         /* save version when object is locked */
1289         mdt_version_get_save(info, mc, 1);
1290
1291         mutex_lock(&mc->mot_lov_mutex);
1292
1293         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1294                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1295
1296         mutex_unlock(&mc->mot_lov_mutex);
1297         if (rc != 0)
1298                 GOTO(unlock_child, rc);
1299
1300         if (!lu_object_is_dying(&mc->mot_header)) {
1301                 rc = mdt_attr_get_complex(info, mc, ma);
1302                 if (rc)
1303                         GOTO(out_stat, rc);
1304         } else if (mdt_dom_check_for_discard(info, mc)) {
1305                 mdt_dom_discard_data(info, mc);
1306         }
1307         mdt_handle_last_unlink(info, mc, ma);
1308
1309 out_stat:
1310         if (ma->ma_valid & MA_INODE) {
1311                 switch (ma->ma_attr.la_mode & S_IFMT) {
1312                 case S_IFDIR:
1313                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1314                                          ktime_us_delta(ktime_get(), kstart));
1315                         break;
1316                 case S_IFREG:
1317                 case S_IFLNK:
1318                 case S_IFCHR:
1319                 case S_IFBLK:
1320                 case S_IFIFO:
1321                 case S_IFSOCK:
1322                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1323                                          ktime_us_delta(ktime_get(), kstart));
1324                         break;
1325                 default:
1326                         LASSERTF(0, "bad file type %o unlinking\n",
1327                                 ma->ma_attr.la_mode);
1328                 }
1329         }
1330
1331         EXIT;
1332
1333 unlock_child:
1334         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1335 put_child:
1336         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1337             info->mti_big_buf.lb_buf)
1338                 lu_buf_free(&info->mti_big_buf);
1339         mdt_object_put(info->mti_env, mc);
1340 unlock_parent:
1341         mdt_object_unlock(info, mp, parent_lh, rc);
1342 put_parent:
1343         mdt_object_put(info->mti_env, mp);
1344         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1345         return rc;
1346 }
1347
1348 /*
1349  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1350  * name.
1351  */
1352 static int mdt_reint_link(struct mdt_thread_info *info,
1353                           struct mdt_lock_handle *lhc)
1354 {
1355         struct mdt_reint_record *rr = &info->mti_rr;
1356         struct ptlrpc_request   *req = mdt_info_req(info);
1357         struct md_attr          *ma = &info->mti_attr;
1358         struct mdt_object       *ms;
1359         struct mdt_object       *mp;
1360         struct mdt_lock_handle  *lhs;
1361         struct mdt_lock_handle  *lhp;
1362         ktime_t kstart = ktime_get();
1363         bool cos_incompat;
1364         int rc;
1365
1366         ENTRY;
1367         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1368                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1369
1370         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1371                 RETURN(err_serious(-ENOENT));
1372
1373         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1374             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1375                 req->rq_no_reply = 1;
1376                 RETURN(err_serious(-ENOENT));
1377         }
1378
1379         if (info->mti_dlm_req)
1380                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1381
1382         /* Invalid case so return error immediately instead of
1383          * processing it
1384          */
1385         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1386                 RETURN(-EPERM);
1387
1388         if (!fid_is_md_operative(rr->rr_fid1) ||
1389             !fid_is_md_operative(rr->rr_fid2))
1390                 RETURN(-EPERM);
1391
1392         /* step 1: find target parent dir */
1393         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1394         if (IS_ERR(mp))
1395                 RETURN(PTR_ERR(mp));
1396
1397         rc = mdt_version_get_check_save(info, mp, 0);
1398         if (rc)
1399                 GOTO(put_parent, rc);
1400
1401         rc = mdt_check_enc(info, mp);
1402         if (rc)
1403                 GOTO(put_parent, rc);
1404
1405         /* step 2: find source */
1406         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1407         if (IS_ERR(ms))
1408                 GOTO(put_parent, rc = PTR_ERR(ms));
1409
1410         if (!mdt_object_exists(ms)) {
1411                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1412                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1413                 GOTO(put_source, rc = -ENOENT);
1414         }
1415
1416         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1417
1418         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1419
1420         lhp = &info->mti_lh[MDT_LH_PARENT];
1421         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1422         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1423                                    cos_incompat);
1424         if (rc != 0)
1425                 GOTO(put_source, rc);
1426
1427         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1428
1429         lhs = &info->mti_lh[MDT_LH_CHILD];
1430         mdt_lock_reg_init(lhs, LCK_EX);
1431         rc = mdt_reint_object_lock(info, ms, lhs,
1432                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1433                                    cos_incompat);
1434         if (rc != 0)
1435                 GOTO(unlock_parent, rc);
1436
1437         /* step 3: link it */
1438         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1439                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1440
1441         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1442         rc = mdt_version_get_check_save(info, ms, 1);
1443         if (rc)
1444                 GOTO(unlock_source, rc);
1445
1446         /** check target version by name during replay */
1447         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1448                                       &info->mti_tmp_fid1, 2);
1449         if (rc != 0 && rc != -ENOENT)
1450                 GOTO(unlock_source, rc);
1451         /* save version of file name for replay, it must be ENOENT here */
1452         if (!req_is_replay(mdt_info_req(info))) {
1453                 if (rc != -ENOENT) {
1454                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1455                                PNAME(&rr->rr_name));
1456                         GOTO(unlock_source, rc = -EEXIST);
1457                 }
1458                 info->mti_ver[2] = ENOENT_VERSION;
1459                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1460         }
1461
1462         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1463                       mdt_object_child(ms), &rr->rr_name, ma);
1464
1465         if (rc == 0)
1466                 mdt_counter_incr(req, LPROC_MDT_LINK,
1467                                  ktime_us_delta(ktime_get(), kstart));
1468
1469         EXIT;
1470 unlock_source:
1471         mdt_object_unlock(info, ms, lhs, rc);
1472 unlock_parent:
1473         mdt_object_unlock(info, mp, lhp, rc);
1474 put_source:
1475         mdt_object_put(info->mti_env, ms);
1476 put_parent:
1477         mdt_object_put(info->mti_env, mp);
1478         return rc;
1479 }
1480 /**
1481  * lock the part of the directory according to the hash of the name
1482  * (lh->mlh_pdo_hash) in parallel directory lock.
1483  */
1484 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1485                               struct mdt_lock_handle *lh,
1486                               struct mdt_object *obj, __u64 ibits,
1487                               bool cos_incompat)
1488 {
1489         struct ldlm_res_id *res = &info->mti_res_id;
1490         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1491         union ldlm_policy_data *policy = &info->mti_policy;
1492         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1493         int rc;
1494
1495         /*
1496          * Finish res_id initializing by name hash marking part of
1497          * directory which is taking modification.
1498          */
1499         LASSERT(lh->mlh_pdo_hash != 0);
1500         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1501         memset(policy, 0, sizeof(*policy));
1502         policy->l_inodebits.bits = ibits;
1503         if (cos_incompat &&
1504             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1505                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1506         /*
1507          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1508          * going to be sent to client. If it is - mdt_intent_policy() path will
1509          * fix it up and turn FL_LOCAL flag off.
1510          */
1511         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1512                           policy, res, dlmflags,
1513                           &info->mti_exp->exp_handle.h_cookie);
1514         return rc;
1515 }
1516
1517 /**
1518  * Get BFL lock for rename or migrate process.
1519  **/
1520 static int mdt_rename_lock(struct mdt_thread_info *info,
1521                            struct lustre_handle *lh)
1522 {
1523         int     rc;
1524
1525         ENTRY;
1526         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1527                 struct lu_fid *fid = &info->mti_tmp_fid1;
1528                 struct mdt_object *obj;
1529
1530                 /* XXX, right now, it has to use object API to
1531                  * enqueue lock cross MDT, so it will enqueue
1532                  * rename lock(with LUSTRE_BFL_FID) by root object
1533                  */
1534                 lu_root_fid(fid);
1535                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1536                 if (IS_ERR(obj))
1537                         RETURN(PTR_ERR(obj));
1538
1539                 rc = mdt_remote_object_lock(info, obj,
1540                                             &LUSTRE_BFL_FID, lh,
1541                                             LCK_EX,
1542                                             MDS_INODELOCK_UPDATE, false);
1543                 mdt_object_put(info->mti_env, obj);
1544         } else {
1545                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1546                 union ldlm_policy_data *policy = &info->mti_policy;
1547                 struct ldlm_res_id *res_id = &info->mti_res_id;
1548                 __u64 flags = 0;
1549
1550                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1551                 memset(policy, 0, sizeof(*policy));
1552                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1553                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1554                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1555                                             LDLM_IBITS, policy, LCK_EX, &flags,
1556                                             ldlm_blocking_ast,
1557                                             ldlm_completion_ast, NULL, NULL, 0,
1558                                             LVB_T_NONE,
1559                                             &info->mti_exp->exp_handle.h_cookie,
1560                                             lh);
1561                 RETURN(rc);
1562         }
1563         RETURN(rc);
1564 }
1565
1566 static void mdt_rename_unlock(struct lustre_handle *lh)
1567 {
1568         ENTRY;
1569         LASSERT(lustre_handle_is_used(lh));
1570         /* Cancel the single rename lock right away */
1571         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1572         EXIT;
1573 }
1574
1575 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1576                                                 const struct lu_fid *fid,
1577                                                 int idx)
1578 {
1579         struct mdt_object *dir;
1580         int rc;
1581
1582         ENTRY;
1583         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1584         if (IS_ERR(dir))
1585                 RETURN(dir);
1586
1587         /* check early, the real version will be saved after locking */
1588         rc = mdt_version_get_check(info, dir, idx);
1589         if (rc)
1590                 GOTO(out_put, rc);
1591
1592         if (!mdt_object_exists(dir))
1593                 GOTO(out_put, rc = -ENOENT);
1594
1595         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1596                 GOTO(out_put, rc = -ENOTDIR);
1597
1598         RETURN(dir);
1599 out_put:
1600         mdt_object_put(info->mti_env, dir);
1601         return ERR_PTR(rc);
1602 }
1603
1604 /*
1605  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1606  * herein we don't really check it, just do revoke.
1607  */
1608 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1609                                   struct mdt_object *pobj,
1610                                   struct mdt_object *obj)
1611 {
1612         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1613         int rc;
1614
1615         mdt_lock_handle_init(lh);
1616         mdt_lock_reg_init(lh, LCK_EX);
1617
1618         if (mdt_object_remote(pobj)) {
1619                 /* don't bother to check if pobj and obj are on the same MDT. */
1620                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1621                                             &lh->mlh_rreg_lh, LCK_EX,
1622                                             MDS_INODELOCK_LOOKUP, false);
1623         } else if (mdt_object_remote(obj)) {
1624                 struct ldlm_res_id *res = &info->mti_res_id;
1625                 union ldlm_policy_data *policy = &info->mti_policy;
1626                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1627                                  LDLM_FL_COS_INCOMPAT;
1628
1629                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1630                 memset(policy, 0, sizeof(*policy));
1631                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1632                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1633                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1634                                   dlmflags, NULL);
1635         } else {
1636                 /* do nothing if both are local */
1637                 return 0;
1638         }
1639
1640         if (rc != ELDLM_OK)
1641                 return rc;
1642
1643         /*
1644          * TODO, currently we don't save this lock because there is no place to
1645          * hold this lock handle, but to avoid race we need to save this lock.
1646          */
1647         mdt_object_unlock(info, NULL, lh, 1);
1648
1649         return 0;
1650 }
1651
1652 /*
1653  * operation may takes locks of linkea, or directory stripes, group them in
1654  * different list.
1655  */
1656 struct mdt_sub_lock {
1657         struct mdt_object *msl_obj;
1658         struct mdt_lock_handle msl_lh;
1659         struct list_head msl_linkage;
1660 };
1661
1662 static void mdt_unlock_list(struct mdt_thread_info *info,
1663                             struct list_head *list, int decref)
1664 {
1665         struct mdt_sub_lock *msl;
1666         struct mdt_sub_lock *tmp;
1667
1668         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1669                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1670                 list_del(&msl->msl_linkage);
1671                 OBD_FREE_PTR(msl);
1672         }
1673 }
1674
1675 static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
1676                                              struct mdt_object *obj,
1677                                              struct mdt_lock_handle *lh,
1678                                              struct ldlm_enqueue_info *einfo,
1679                                              struct list_head *slave_locks,
1680                                              int decref)
1681 {
1682         if (mdt_object_remote(obj)) {
1683                 mdt_unlock_list(info, slave_locks, decref);
1684                 mdt_object_unlock(info, obj, lh, decref);
1685         } else {
1686                 mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
1687         }
1688 }
1689
1690 /*
1691  * lock parents of links, and also check whether total locks don't exceed
1692  * RS_MAX_LOCKS.
1693  *
1694  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1695  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1696  * \retval      -ev negative errno upon error
1697  */
1698 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1699                                  struct mdt_object *pobj,
1700                                  const struct md_attr *ma,
1701                                  struct mdt_object *obj,
1702                                  struct mdt_lock_handle *lhp,
1703                                  struct ldlm_enqueue_info *peinfo,
1704                                  struct list_head *parent_slave_locks,
1705                                  struct list_head *link_locks)
1706 {
1707         struct mdt_device *mdt = info->mti_mdt;
1708         struct lu_buf *buf = &info->mti_big_buf;
1709         struct lu_name *lname = &info->mti_name;
1710         struct linkea_data ldata = { NULL };
1711         bool blocked = false;
1712         int local_lnkp_cnt = 0;
1713         int rc;
1714
1715         ENTRY;
1716         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1717                 RETURN(0);
1718
1719         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1720         if (buf->lb_buf == NULL)
1721                 RETURN(-ENOMEM);
1722
1723         ldata.ld_buf = buf;
1724         rc = mdt_links_read(info, obj, &ldata);
1725         if (rc) {
1726                 if (rc == -ENOENT || rc == -ENODATA)
1727                         rc = 0;
1728                 RETURN(rc);
1729         }
1730
1731         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1732              linkea_next_entry(&ldata)) {
1733                 struct mdt_object *lnkp;
1734                 struct mdt_sub_lock *msl;
1735                 struct lu_fid fid;
1736                 __u64 ibits;
1737
1738                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1739                                     &fid);
1740
1741                 /* check if it's also linked to parent */
1742                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1743                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1744                                PFID(&fid), PNAME(lname));
1745                         /* in case link is remote object, revoke LOOKUP lock */
1746                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1747                         continue;
1748                 }
1749
1750                 lnkp = NULL;
1751
1752                 /* check if it's linked to a stripe of parent */
1753                 if (ma->ma_valid & MA_LMV) {
1754                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1755                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1756                         int j = 0;
1757
1758                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1759                                 fid_le_to_cpu(stripe_fid,
1760                                               &lmv->lmv_stripe_fids[j]);
1761                                 if (lu_fid_eq(stripe_fid, &fid)) {
1762                                         CDEBUG(D_INFO, "skip stripe "DFID
1763                                                ", reovke "DNAME"\n",
1764                                                PFID(&fid), PNAME(lname));
1765                                         lnkp = mdt_object_find(info->mti_env,
1766                                                                mdt, &fid);
1767                                         if (IS_ERR(lnkp))
1768                                                 GOTO(out, rc = PTR_ERR(lnkp));
1769                                         break;
1770                                 }
1771                         }
1772
1773                         if (lnkp) {
1774                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1775                                                                    obj);
1776                                 mdt_object_put(info->mti_env, lnkp);
1777                                 continue;
1778                         }
1779                 }
1780
1781                 /* Check if it's already locked */
1782                 list_for_each_entry(msl, link_locks, msl_linkage) {
1783                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1784                                 CDEBUG(D_INFO,
1785                                        DFID" was locked, revoke "DNAME"\n",
1786                                        PFID(&fid), PNAME(lname));
1787                                 lnkp = msl->msl_obj;
1788                                 break;
1789                         }
1790                 }
1791
1792                 if (lnkp) {
1793                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1794                         continue;
1795                 }
1796
1797                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1798                        PFID(&fid), PNAME(lname));
1799
1800                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1801                 if (IS_ERR(lnkp)) {
1802                         CWARN("%s: cannot find obj "DFID": %ld\n",
1803                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1804                         continue;
1805                 }
1806
1807                 if (!mdt_object_exists(lnkp)) {
1808                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1809                               PFID(&fid), PNAME(lname));
1810                         mdt_object_put(info->mti_env, lnkp);
1811                         continue;
1812                 }
1813
1814                 if (!mdt_object_remote(lnkp))
1815                         local_lnkp_cnt++;
1816
1817                 OBD_ALLOC_PTR(msl);
1818                 if (msl == NULL)
1819                         GOTO(out, rc = -ENOMEM);
1820
1821                 /*
1822                  * we can't follow parent-child lock order like other MD
1823                  * operations, use lock_try here to avoid deadlock, if the lock
1824                  * cannot be taken, drop all locks taken, revoke the blocked
1825                  * one, and continue processing the remaining entries, and in
1826                  * the end of the loop restart from beginning.
1827                  */
1828                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1829                 ibits = 0;
1830                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1831                                          MDS_INODELOCK_UPDATE, true);
1832                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1833
1834                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1835                                PFID(&fid), PNAME(lname));
1836
1837                         mdt_unlock_list(info, link_locks, 1);
1838                         /* also unlock parent locks to avoid deadlock */
1839                         if (!blocked)
1840                                 mdt_migrate_object_unlock(info, pobj, lhp,
1841                                                           peinfo,
1842                                                           parent_slave_locks,
1843                                                           1);
1844
1845                         blocked = true;
1846
1847                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1848                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1849                                              MDS_INODELOCK_UPDATE);
1850                         if (rc) {
1851                                 mdt_object_put(info->mti_env, lnkp);
1852                                 OBD_FREE_PTR(msl);
1853                                 GOTO(out, rc);
1854                         }
1855
1856                         if (mdt_object_remote(lnkp)) {
1857                                 struct ldlm_lock *lock;
1858
1859                                 /*
1860                                  * for remote object, set lock cb_atomic,
1861                                  * so lock can be released in blocking_ast()
1862                                  * immediately, then the next lock_try will
1863                                  * have better chance of success.
1864                                  */
1865                                 lock = ldlm_handle2lock(
1866                                                 &msl->msl_lh.mlh_rreg_lh);
1867                                 LASSERT(lock != NULL);
1868                                 lock_res_and_lock(lock);
1869                                 ldlm_set_atomic_cb(lock);
1870                                 unlock_res_and_lock(lock);
1871                                 LDLM_LOCK_PUT(lock);
1872                         }
1873
1874                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1875                         OBD_FREE_PTR(msl);
1876                         continue;
1877                 }
1878
1879                 INIT_LIST_HEAD(&msl->msl_linkage);
1880                 msl->msl_obj = lnkp;
1881                 list_add_tail(&msl->msl_linkage, link_locks);
1882
1883                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1884         }
1885
1886         if (blocked)
1887                 GOTO(out, rc = -EBUSY);
1888
1889         EXIT;
1890 out:
1891         if (rc) {
1892                 mdt_unlock_list(info, link_locks, rc);
1893         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1894                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1895                        local_lnkp_cnt);
1896                 /*
1897                  * parent may have 3 local objects: master object and 2 stripes
1898                  * (if it's being migrated too); source may have 1 local objects
1899                  * as regular file; target has 1 local object.
1900                  * Note, source may have 2 local locks if it is directory but it
1901                  * can't have hardlinks, so it is not considered here.
1902                  */
1903                 rc = 1;
1904         }
1905         return rc;
1906 }
1907
1908 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1909                                   struct mdt_object *obj,
1910                                   const struct md_attr *ma,
1911                                   struct list_head *slave_locks)
1912 {
1913         struct mdt_device *mdt = info->mti_mdt;
1914         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1915         struct lu_fid *fid = &info->mti_tmp_fid1;
1916         struct mdt_object *slave;
1917         struct mdt_sub_lock *msl;
1918         int i;
1919         int rc;
1920
1921         ENTRY;
1922         LASSERT(mdt_object_remote(obj));
1923         LASSERT(ma->ma_valid & MA_LMV);
1924         LASSERT(lmv);
1925
1926         if (!lmv_is_sane(lmv))
1927                 RETURN(-EINVAL);
1928
1929         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1930                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1931
1932                 if (!fid_is_sane(fid))
1933                         continue;
1934
1935                 slave = mdt_object_find(info->mti_env, mdt, fid);
1936                 if (IS_ERR(slave))
1937                         GOTO(out, rc = PTR_ERR(slave));
1938
1939                 OBD_ALLOC_PTR(msl);
1940                 if (!msl) {
1941                         mdt_object_put(info->mti_env, slave);
1942                         GOTO(out, rc = -ENOMEM);
1943                 }
1944
1945                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1946                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1947                                            MDS_INODELOCK_UPDATE, true);
1948                 if (rc) {
1949                         OBD_FREE_PTR(msl);
1950                         mdt_object_put(info->mti_env, slave);
1951                         GOTO(out, rc);
1952                 }
1953
1954                 INIT_LIST_HEAD(&msl->msl_linkage);
1955                 msl->msl_obj = slave;
1956                 list_add_tail(&msl->msl_linkage, slave_locks);
1957         }
1958         EXIT;
1959
1960 out:
1961         if (rc)
1962                 mdt_unlock_list(info, slave_locks, rc);
1963         return rc;
1964 }
1965
1966 /* lock parent and its stripes */
1967 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1968                                    struct mdt_object *obj,
1969                                    const struct md_attr *ma,
1970                                    struct mdt_lock_handle *lh,
1971                                    struct ldlm_enqueue_info *einfo,
1972                                    struct list_head *slave_locks)
1973 {
1974         int rc;
1975
1976         if (mdt_object_remote(obj)) {
1977                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1978                                             &lh->mlh_rreg_lh, LCK_PW,
1979                                             MDS_INODELOCK_UPDATE, false);
1980                 if (rc != ELDLM_OK)
1981                         return rc;
1982
1983                 /*
1984                  * if obj is remote and striped, lock its stripes explicitly
1985                  * because it's not striped in LOD layer on this MDT.
1986                  */
1987                 if (ma->ma_valid & MA_LMV) {
1988                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1989                         if (rc)
1990                                 mdt_object_unlock(info, obj, lh, rc);
1991                 }
1992         } else {
1993                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1994                                             einfo, true);
1995         }
1996
1997         return rc;
1998 }
1999
2000 /*
2001  * in migration, object may be remote, and we need take full lock of it and its
2002  * stripes if it's directory, besides, object may be a remote object on its
2003  * parent, revoke its LOOKUP lock on where its parent is located.
2004  */
2005 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
2006                                    struct mdt_object *pobj,
2007                                    struct mdt_object *obj,
2008                                    struct mdt_lock_handle *lh,
2009                                    struct ldlm_enqueue_info *einfo,
2010                                    struct list_head *slave_locks)
2011 {
2012         int rc;
2013
2014         if (mdt_object_remote(obj)) {
2015                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2016                 if (rc)
2017                         return rc;
2018
2019                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
2020                                             &lh->mlh_rreg_lh, LCK_EX,
2021                                             MDS_INODELOCK_FULL, false);
2022                 if (rc != ELDLM_OK)
2023                         return rc;
2024
2025                 /*
2026                  * if obj is remote and striped, lock its stripes explicitly
2027                  * because it's not striped in LOD layer on this MDT.
2028                  */
2029                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
2030                         struct md_attr *ma = &info->mti_attr;
2031
2032                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
2033                         if (rc) {
2034                                 mdt_object_unlock(info, obj, lh, rc);
2035                                 return rc;
2036                         }
2037
2038                         if (ma->ma_valid & MA_LMV) {
2039                                 rc = mdt_lock_remote_slaves(info, obj, ma,
2040                                                             slave_locks);
2041                                 if (rc)
2042                                         mdt_object_unlock(info, obj, lh, rc);
2043                         }
2044                 }
2045         } else {
2046                 if (mdt_object_remote(pobj)) {
2047                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2048                         if (rc)
2049                                 return rc;
2050                 }
2051
2052                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2053                                             einfo, true);
2054         }
2055
2056         return rc;
2057 }
2058
2059 /*
2060  * lookup source by name, if parent is striped directory, we need to find the
2061  * corresponding stripe where source is located, and then lookup there.
2062  *
2063  * besides, if parent is migrating too, and file is already in target stripe,
2064  * this should be a redo of 'lfs migrate' on client side.
2065  */
2066 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2067                               struct mdt_object *pobj,
2068                               const struct md_attr *ma,
2069                               const struct lu_name *lname,
2070                               struct mdt_object **spobj,
2071                               struct mdt_object **sobj)
2072 {
2073         const struct lu_env *env = info->mti_env;
2074         struct lu_fid *fid = &info->mti_tmp_fid1;
2075         struct mdt_object *stripe;
2076         int rc;
2077
2078         if (ma->ma_valid & MA_LMV) {
2079                 /* if parent is striped, lookup on corresponding stripe */
2080                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2081
2082                 if (!lmv_is_sane(lmv))
2083                         return -EBADF;
2084
2085                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2086                                                   lname->ln_namelen);
2087                 if (rc < 0)
2088                         return rc;
2089
2090                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2091
2092                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2093                 if (IS_ERR(stripe))
2094                         return PTR_ERR(stripe);
2095
2096                 fid_zero(fid);
2097                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2098                                 &info->mti_spec);
2099                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2100                         /*
2101                          * if parent layout is changeing, and lookup child
2102                          * failed on source stripe, lookup again on target
2103                          * stripe, if it exists, it means previous migration
2104                          * was interrupted, and current file was migrated
2105                          * already.
2106                          */
2107                         mdt_object_put(env, stripe);
2108
2109                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2110                                                       lname->ln_namelen);
2111                         if (rc < 0)
2112                                 return rc;
2113
2114                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2115
2116                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2117                         if (IS_ERR(stripe))
2118                                 return PTR_ERR(stripe);
2119
2120                         fid_zero(fid);
2121                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2122                                         fid, &info->mti_spec);
2123                         mdt_object_put(env, stripe);
2124                         return rc ?: -EALREADY;
2125                 } else if (rc) {
2126                         mdt_object_put(env, stripe);
2127                         return rc;
2128                 }
2129         } else {
2130                 fid_zero(fid);
2131                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2132                                 &info->mti_spec);
2133                 if (rc)
2134                         return rc;
2135
2136                 stripe = pobj;
2137                 mdt_object_get(env, stripe);
2138         }
2139
2140         *spobj = stripe;
2141
2142         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2143         if (IS_ERR(*sobj)) {
2144                 mdt_object_put(env, stripe);
2145                 rc = PTR_ERR(*sobj);
2146                 *spobj = NULL;
2147                 *sobj = NULL;
2148         }
2149
2150         return rc;
2151 }
2152
2153 /* end lease and close file for regular file */
2154 static int mdd_migrate_close(struct mdt_thread_info *info,
2155                              struct mdt_object *obj)
2156 {
2157         struct close_data *data;
2158         struct mdt_body *repbody;
2159         struct ldlm_lock *lease;
2160         int rc;
2161         int rc2;
2162
2163         rc = -EPROTO;
2164         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2165                                       RCL_CLIENT) ||
2166             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2167                                       RCL_CLIENT))
2168                 goto close;
2169
2170         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2171         if (!data)
2172                 goto close;
2173
2174         rc = -ESTALE;
2175         lease = ldlm_handle2lock(&data->cd_handle);
2176         if (!lease)
2177                 goto close;
2178
2179         /* check if the lease was already canceled */
2180         lock_res_and_lock(lease);
2181         rc = ldlm_is_cancel(lease);
2182         unlock_res_and_lock(lease);
2183
2184         if (rc) {
2185                 rc = -EAGAIN;
2186                 LDLM_DEBUG(lease, DFID" lease broken",
2187                            PFID(mdt_object_fid(obj)));
2188         }
2189
2190         /*
2191          * cancel server side lease, client side counterpart should have been
2192          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2193          */
2194         ldlm_lock_cancel(lease);
2195         ldlm_reprocess_all(lease->l_resource,
2196                            lease->l_policy_data.l_inodebits.bits);
2197         LDLM_LOCK_PUT(lease);
2198
2199 close:
2200         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2201         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2202         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2203
2204         return rc ?: rc2;
2205 }
2206
2207 /*
2208  * migrate file in below steps:
2209  *  1. lock parent and its stripes
2210  *  2. lookup source by name
2211  *  3. lock parents of source links if source is not directory
2212  *  4. reject if source is in HSM
2213  *  5. take source open_sem and close file if source is regular file
2214  *  6. lock source and its stripes if it's directory
2215  *  7. lock target so subsequent change to it can trigger COS
2216  *  8. migrate file
2217  *  9. unlock above locks
2218  * 10. sync device if source has links
2219  */
2220 int mdt_reint_migrate(struct mdt_thread_info *info,
2221                       struct mdt_lock_handle *unused)
2222 {
2223         const struct lu_env *env = info->mti_env;
2224         struct mdt_device *mdt = info->mti_mdt;
2225         struct ptlrpc_request *req = mdt_info_req(info);
2226         struct mdt_reint_record *rr = &info->mti_rr;
2227         struct lu_ucred *uc = mdt_ucred(info);
2228         struct md_attr *ma = &info->mti_attr;
2229         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2230         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2231         struct mdt_object *pobj;
2232         struct mdt_object *spobj = NULL;
2233         struct mdt_object *sobj = NULL;
2234         struct mdt_object *tobj;
2235         struct lustre_handle rename_lh = { 0 };
2236         struct mdt_lock_handle *lhp;
2237         struct mdt_lock_handle *lhs;
2238         struct mdt_lock_handle *lht;
2239         LIST_HEAD(parent_slave_locks);
2240         LIST_HEAD(child_slave_locks);
2241         LIST_HEAD(link_locks);
2242         int lock_retries = 5;
2243         bool open_sem_locked = false;
2244         bool do_sync = false;
2245         int rc;
2246
2247         ENTRY;
2248         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2249                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2250
2251         if (info->mti_dlm_req)
2252                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2253
2254         if (!fid_is_md_operative(rr->rr_fid1) ||
2255             !fid_is_md_operative(rr->rr_fid2))
2256                 RETURN(-EPERM);
2257
2258         /* don't allow migrate . or .. */
2259         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2260                 RETURN(-EBUSY);
2261
2262         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2263                 RETURN(-EPERM);
2264
2265         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2266             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2267             mdt->mdt_enable_remote_dir_gid != -1)
2268                 RETURN(-EPERM);
2269
2270         /*
2271          * Note: do not enqueue rename lock for replay request, because
2272          * if other MDT holds rename lock, but being blocked to wait for
2273          * this MDT to finish its recovery, and the failover MDT can not
2274          * get rename lock, which will cause deadlock.
2275          *
2276          * req is NULL if this is called by directory auto-split.
2277          */
2278         if (req && !req_is_replay(req)) {
2279                 rc = mdt_rename_lock(info, &rename_lh);
2280                 if (rc != 0) {
2281                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2282                                mdt_obd_name(info->mti_mdt), rc);
2283                         RETURN(rc);
2284                 }
2285         }
2286
2287         /* pobj is master object of parent */
2288         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2289         if (IS_ERR(pobj))
2290                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2291
2292         if (req) {
2293                 rc = mdt_version_get_check(info, pobj, 0);
2294                 if (rc)
2295                         GOTO(put_parent, rc);
2296         }
2297
2298         if (!mdt_object_exists(pobj))
2299                 GOTO(put_parent, rc = -ENOENT);
2300
2301         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2302                 GOTO(put_parent, rc = -ENOTDIR);
2303
2304         rc = mdt_check_enc(info, pobj);
2305         if (rc)
2306                 GOTO(put_parent, rc);
2307
2308         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2309         if (rc)
2310                 GOTO(put_parent, rc);
2311
2312 lock_parent:
2313         /* lock parent object */
2314         lhp = &info->mti_lh[MDT_LH_PARENT];
2315         mdt_lock_reg_init(lhp, LCK_PW);
2316         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2317                                      &parent_slave_locks);
2318         if (rc)
2319                 GOTO(put_parent, rc);
2320
2321         /*
2322          * spobj is the corresponding stripe against name if pobj is striped
2323          * directory, which is the real parent, and no need to lock, because
2324          * we've taken full lock of pobj.
2325          */
2326         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2327         if (rc)
2328                 GOTO(unlock_parent, rc);
2329
2330         /* lock parents of source links, and revoke LOOKUP lock of links */
2331         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2332                                    &parent_slave_locks, &link_locks);
2333         if (rc == -EBUSY && lock_retries-- > 0) {
2334                 mdt_object_put(env, sobj);
2335                 mdt_object_put(env, spobj);
2336                 goto lock_parent;
2337         }
2338
2339         if (rc < 0)
2340                 GOTO(put_source, rc);
2341
2342         /*
2343          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2344          * with one request, if total lock count exceeds this limit, we will
2345          * drop all locks after migration, and synchronous device in the end.
2346          */
2347         do_sync = rc;
2348
2349         /* TODO: DoM migration is not supported, migrate dirent only */
2350         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2351                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2352                 if (rc)
2353                         GOTO(unlock_links, rc);
2354
2355                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2356                         info->mti_spec.sp_migrate_nsonly = 1;
2357         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2358                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2359                 if (rc)
2360                         GOTO(unlock_links, rc);
2361
2362                 /* race with restripe/auto-split? */
2363                 if ((ma->ma_valid & MA_LMV) &&
2364                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2365                         GOTO(unlock_links, rc = -EBUSY);
2366         }
2367
2368         /* if migration HSM is allowed */
2369         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2370                 ma->ma_need = MA_HSM;
2371                 ma->ma_valid = 0;
2372                 rc = mdt_attr_get_complex(info, sobj, ma);
2373                 if (rc)
2374                         GOTO(unlock_links, rc);
2375
2376                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2377                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2378         }
2379
2380         /* end lease and close file for regular file */
2381         if (info->mti_spec.sp_migrate_close) {
2382                 /* try to hold open_sem so that nobody else can open the file */
2383                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2384                         /* close anyway */
2385                         mdd_migrate_close(info, sobj);
2386                         GOTO(unlock_links, rc = -EBUSY);
2387                 } else {
2388                         open_sem_locked = true;
2389                         rc = mdd_migrate_close(info, sobj);
2390                         if (rc)
2391                                 GOTO(unlock_open_sem, rc);
2392                 }
2393         }
2394
2395         /* lock source */
2396         lhs = &info->mti_lh[MDT_LH_OLD];
2397         mdt_lock_reg_init(lhs, LCK_EX);
2398         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2399                                      &child_slave_locks);
2400         if (rc)
2401                 GOTO(unlock_open_sem, rc);
2402
2403         /* lock target */
2404         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2405         if (IS_ERR(tobj))
2406                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2407
2408         lht = &info->mti_lh[MDT_LH_NEW];
2409         mdt_lock_reg_init(lht, LCK_EX);
2410         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2411         if (rc)
2412                 GOTO(put_target, rc);
2413
2414         /* Don't do lookup sanity check. We know name doesn't exist. */
2415         info->mti_spec.sp_cr_lookup = 0;
2416         info->mti_spec.sp_feat = &dt_directory_features;
2417
2418         rc = mdo_migrate(env, mdt_object_child(pobj),
2419                          mdt_object_child(sobj), &rr->rr_name,
2420                          mdt_object_child(tobj),
2421                          &info->mti_spec, ma);
2422         if (!rc)
2423                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2424                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2425         EXIT;
2426
2427         mdt_object_unlock(info, tobj, lht, rc);
2428 put_target:
2429         mdt_object_put(env, tobj);
2430 unlock_source:
2431         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2432                                   &child_slave_locks, rc);
2433 unlock_open_sem:
2434         if (open_sem_locked)
2435                 up_write(&sobj->mot_open_sem);
2436 unlock_links:
2437         /* if we've got too many locks to save into RPC,
2438          * then just commit before the locks are released
2439          */
2440         if (!rc && do_sync)
2441                 mdt_device_sync(env, mdt);
2442         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2443 put_source:
2444         mdt_object_put(env, sobj);
2445         mdt_object_put(env, spobj);
2446 unlock_parent:
2447         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2448                                   &parent_slave_locks, rc);
2449 put_parent:
2450         mdt_object_put(env, pobj);
2451 unlock_rename:
2452         if (lustre_handle_is_used(&rename_lh))
2453                 mdt_rename_unlock(&rename_lh);
2454
2455         return rc;
2456 }
2457
2458 static int mdt_object_lock_save(struct mdt_thread_info *info,
2459                                 struct mdt_object *dir,
2460                                 struct mdt_lock_handle *lh,
2461                                 int idx, bool cos_incompat)
2462 {
2463         int rc;
2464
2465         /* we lock the target dir if it is local */
2466         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2467                                    cos_incompat);
2468         if (rc != 0)
2469                 return rc;
2470
2471         /* get and save correct version after locking */
2472         mdt_version_get_save(info, dir, idx);
2473         return 0;
2474 }
2475
2476 /*
2477  * determine lock order of sobj and tobj
2478  *
2479  * there are two situations we need to lock tobj before sobj:
2480  * 1. sobj is child of tobj
2481  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2482  *    larger than that of tobj
2483  *
2484  * \retval      1 lock tobj before sobj
2485  * \retval      0 lock sobj before tobj
2486  * \retval      -ev negative errno upon error
2487  */
2488 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2489                                            struct mdt_object *sobj,
2490                                            struct mdt_object *tobj)
2491 {
2492         struct md_attr *ma = &info->mti_attr;
2493         struct lu_fid *spfid = &info->mti_tmp_fid1;
2494         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2495         struct lmv_mds_md_v1 *lmv;
2496         __u32 sindex;
2497         __u32 tindex;
2498         int rc;
2499
2500         /* sobj and tobj are the same */
2501         if (sobj == tobj)
2502                 return 0;
2503
2504         if (fid_is_root(mdt_object_fid(sobj)))
2505                 return 0;
2506
2507         if (fid_is_root(mdt_object_fid(tobj)))
2508                 return 1;
2509
2510         /* check whether sobj is child of tobj */
2511         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2512                            mdt_object_fid(tobj));
2513         if (rc < 0)
2514                 return rc;
2515
2516         if (rc == 1)
2517                 return 1;
2518
2519         /* check whether sobj and tobj are children of the same parent */
2520         rc = mdt_attr_get_pfid(info, sobj, spfid);
2521         if (rc)
2522                 return rc;
2523
2524         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2525         if (rc)
2526                 return rc;
2527
2528         if (!lu_fid_eq(spfid, tpfid))
2529                 return 0;
2530
2531         /* check whether sobj and tobj are sibling stripes */
2532         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2533         if (rc)
2534                 return rc;
2535
2536         if (!(ma->ma_valid & MA_LMV))
2537                 return 0;
2538
2539         lmv = &ma->ma_lmv->lmv_md_v1;
2540         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2541                 return 0;
2542         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2543
2544         ma->ma_valid = 0;
2545         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2546         if (rc)
2547                 return rc;
2548
2549         if (!(ma->ma_valid & MA_LMV))
2550                 return -ENODATA;
2551
2552         lmv = &ma->ma_lmv->lmv_md_v1;
2553         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2554                 return -EINVAL;
2555         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2556
2557         /* check stripe index of sobj and tobj */
2558         if (sindex == tindex)
2559                 return -EINVAL;
2560
2561         return sindex < tindex ? 0 : 1;
2562 }
2563
2564 /*
2565  * lock rename source object.
2566  *
2567  * Both source and source parent may be remote, and source may be a remote
2568  * object on source parent, to avoid overriding lock handle, store remote
2569  * LOOKUP lock separately in @lhr.
2570  *
2571  * \retval      0 on success
2572  * \retval      -ev negative errno upon error
2573  */
2574 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2575                                   struct mdt_object *parent,
2576                                   struct mdt_object *child,
2577                                   struct mdt_lock_handle *lhc,
2578                                   struct mdt_lock_handle *lhr,
2579                                   __u64 ibits,
2580                                   bool cos_incompat)
2581 {
2582         int rc;
2583
2584         rc = mdt_is_remote_object(info, parent, child);
2585         if (rc < 0)
2586                 return rc;
2587
2588         if (rc) {
2589                 /* enqueue remote LOOKUP lock from the parent MDT */
2590                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2591
2592                 if (mdt_object_remote(parent)) {
2593                         rc = mdt_remote_object_lock(info, parent,
2594                                                     mdt_object_fid(child),
2595                                                     &lhr->mlh_rreg_lh,
2596                                                     lhr->mlh_rreg_mode,
2597                                                     rmt_ibits, false);
2598                         if (rc != ELDLM_OK)
2599                                 return rc;
2600                 } else {
2601                         LASSERT(mdt_object_remote(child));
2602                         rc = mdt_object_local_lock(info, child, lhr,
2603                                                    &rmt_ibits, 0, true);
2604                         if (rc < 0)
2605                                 return rc;
2606                 }
2607
2608                 ibits &= ~MDS_INODELOCK_LOOKUP;
2609         }
2610
2611         if (mdt_object_remote(child)) {
2612                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2613                                             &lhc->mlh_rreg_lh,
2614                                             lhc->mlh_rreg_mode,
2615                                             ibits, false);
2616                 if (rc == ELDLM_OK)
2617                         rc = 0;
2618         } else {
2619                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2620                                            cos_incompat);
2621         }
2622
2623         if (!rc)
2624                 mdt_object_unlock(info, child, lhr, rc);
2625
2626         return rc;
2627 }
2628
2629 /* Helper function for mdt_reint_rename so we don't need to opencode
2630  * two different order lockings
2631  */
2632 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2633                              struct mdt_object *mfirstdir,
2634                              struct mdt_lock_handle *lh_firstdirp,
2635                              struct mdt_object *mseconddir,
2636                              struct mdt_lock_handle *lh_seconddirp,
2637                              bool cos_incompat)
2638 {
2639         int rc;
2640
2641         rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
2642                                   cos_incompat);
2643         if (rc)
2644                 return rc;
2645
2646         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2647
2648         if (mfirstdir != mseconddir) {
2649                 rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
2650                                           cos_incompat);
2651         } else if (!mdt_object_remote(mseconddir) &&
2652                    lh_firstdirp->mlh_pdo_hash !=
2653                    lh_seconddirp->mlh_pdo_hash) {
2654                 rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
2655                                         MDS_INODELOCK_UPDATE,
2656                                         cos_incompat);
2657                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2658         }
2659
2660         if (rc != 0)
2661                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2662
2663         return rc;
2664 }
2665
2666 /*
2667  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2668  * 2 - srcdir child; 3 - tgtdir child.
2669  * Update on disk version of srcdir child.
2670  */
2671 static int mdt_reint_rename(struct mdt_thread_info *info,
2672                             struct mdt_lock_handle *unused)
2673 {
2674         struct mdt_device *mdt = info->mti_mdt;
2675         struct mdt_reint_record *rr = &info->mti_rr;
2676         struct md_attr *ma = &info->mti_attr;
2677         struct ptlrpc_request *req = mdt_info_req(info);
2678         struct mdt_object *msrcdir = NULL;
2679         struct mdt_object *mtgtdir = NULL;
2680         struct mdt_object *mold;
2681         struct mdt_object *mnew = NULL;
2682         struct lustre_handle rename_lh = { 0 };
2683         struct mdt_lock_handle *lh_srcdirp;
2684         struct mdt_lock_handle *lh_tgtdirp;
2685         struct mdt_lock_handle *lh_oldp = NULL;
2686         struct mdt_lock_handle *lh_rmt = NULL;
2687         struct mdt_lock_handle *lh_newp = NULL;
2688         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2689         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2690         __u64 lock_ibits;
2691         bool reverse = false, discard = false;
2692         bool cos_incompat;
2693         ktime_t kstart = ktime_get();
2694         enum mdt_stat_idx msi = 0;
2695         int rc;
2696
2697         ENTRY;
2698         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2699                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2700                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2701
2702         if (info->mti_dlm_req)
2703                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2704
2705         if (!fid_is_md_operative(rr->rr_fid1) ||
2706             !fid_is_md_operative(rr->rr_fid2))
2707                 RETURN(-EPERM);
2708
2709         /* find both parents. */
2710         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2711         if (IS_ERR(msrcdir))
2712                 RETURN(PTR_ERR(msrcdir));
2713
2714         rc = mdt_check_enc(info, msrcdir);
2715         if (rc)
2716                 GOTO(out_put_srcdir, rc);
2717
2718         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2719
2720         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2721                 mtgtdir = msrcdir;
2722                 mdt_object_get(info->mti_env, mtgtdir);
2723         } else {
2724                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2725                 if (IS_ERR(mtgtdir))
2726                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2727         }
2728
2729         rc = mdt_check_enc(info, mtgtdir);
2730         if (rc)
2731                 GOTO(out_put_tgtdir, rc);
2732
2733         /*
2734          * Note: do not enqueue rename lock for replay request, because
2735          * if other MDT holds rename lock, but being blocked to wait for
2736          * this MDT to finish its recovery, and the failover MDT can not
2737          * get rename lock, which will cause deadlock.
2738          */
2739         if (!req_is_replay(req)) {
2740                 bool remote = mdt_object_remote(msrcdir);
2741
2742                 /*
2743                  * Normally rename RPC is handled on the MDT with the target
2744                  * directory (if target exists, it's on the MDT with the
2745                  * target), if the source directory is remote, it's a hint that
2746                  * source is remote too (this may not be true, but it won't
2747                  * cause any issue), return -EXDEV early to avoid taking
2748                  * rename_lock.
2749                  */
2750                 if (!mdt->mdt_enable_remote_rename && remote)
2751                         GOTO(out_put_tgtdir, rc = -EXDEV);
2752
2753                 /* This might be further relaxed in the future for regular file
2754                  * renames in different source and target parents. Start with
2755                  * only same-directory renames for simplicity and because this
2756                  * is by far the most the common use case.
2757                  *
2758                  * Striped directories should be considered "remote".
2759                  */
2760                 if (msrcdir != mtgtdir || remote ||
2761                     (S_ISDIR(ma->ma_attr.la_mode) &&
2762                      !mdt->mdt_enable_parallel_rename_dir) ||
2763                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2764                      !mdt->mdt_enable_parallel_rename_file)) {
2765                         rc = mdt_rename_lock(info, &rename_lh);
2766                         if (rc != 0) {
2767                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2768                                        mdt_obd_name(mdt), rc);
2769                                 GOTO(out_put_tgtdir, rc);
2770                         }
2771                 } else {
2772                         if (S_ISDIR(ma->ma_attr.la_mode))
2773                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2774                         else
2775                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2776
2777                         CDEBUG(D_INFO,
2778                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2779                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2780                                PNAME(&rr->rr_name));
2781                 }
2782         }
2783
2784         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2785         if (rc < 0)
2786                 GOTO(out_unlock_rename, rc);
2787         reverse = rc;
2788
2789         /* source needs to be looked up after locking source parent, otherwise
2790          * this rename may race with unlink source, and cause rename hang, see
2791          * sanityn.sh 55b, so check parents first, if later we found source is
2792          * remote, relock parents.
2793          */
2794         cos_incompat = (mdt_object_remote(msrcdir) ||
2795                         mdt_object_remote(mtgtdir));
2796
2797         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2798
2799         /* lock parents in the proper order. */
2800         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2801         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2802
2803         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2804         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2805 relock:
2806         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2807         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2808
2809         /* In case of same dir local rename we must sort by the hash,
2810          * otherwise a lock deadlock is possible when renaming
2811          * a to b and b to a at the same time LU-15285
2812          */
2813         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2814                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2815         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2816                 reverse = 0;
2817
2818         if (reverse)
2819                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
2820                                        lh_srcdirp, cos_incompat);
2821         else
2822                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
2823                                        lh_tgtdirp, cos_incompat);
2824
2825         if (rc != 0)
2826                 GOTO(out_unlock_rename, rc);
2827
2828         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2829         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2830
2831         /* find mold object. */
2832         fid_zero(old_fid);
2833         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2834         if (rc != 0)
2835                 GOTO(out_unlock_parents, rc);
2836
2837         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2838                 GOTO(out_unlock_parents, rc = -EINVAL);
2839
2840         if (!fid_is_md_operative(old_fid))
2841                 GOTO(out_unlock_parents, rc = -EPERM);
2842
2843         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2844         if (IS_ERR(mold))
2845                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2846
2847         if (!mdt_object_exists(mold)) {
2848                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2849                                 &mold->mot_obj,
2850                                 "object does not exist");
2851                 GOTO(out_put_old, rc = -ENOENT);
2852         }
2853
2854         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2855                 GOTO(out_put_old, rc = -EXDEV);
2856
2857         /* Check if @mtgtdir is subdir of @mold, before locking child
2858          * to avoid reverse locking.
2859          */
2860         if (mtgtdir != msrcdir) {
2861                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2862                                    old_fid);
2863                 if (rc) {
2864                         if (rc == 1)
2865                                 rc = -EINVAL;
2866                         GOTO(out_put_old, rc);
2867                 }
2868         }
2869
2870         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2871         /* save version after locking */
2872         mdt_version_get_save(info, mold, 2);
2873
2874         if (!cos_incompat && mdt_object_remote(mold)) {
2875                 cos_incompat = true;
2876                 mdt_object_put(info->mti_env, mold);
2877                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2878                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2879                 goto relock;
2880         }
2881
2882         /* find mnew object:
2883          * mnew target object may not exist now
2884          * lookup with version checking
2885          */
2886         fid_zero(new_fid);
2887         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2888                                       3);
2889         if (rc == 0) {
2890                 /* the new_fid should have been filled at this moment */
2891                 if (lu_fid_eq(old_fid, new_fid))
2892                         GOTO(out_put_old, rc);
2893
2894                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2895                     lu_fid_eq(new_fid, rr->rr_fid2))
2896                         GOTO(out_put_old, rc = -EINVAL);
2897
2898                 if (!fid_is_md_operative(new_fid))
2899                         GOTO(out_put_old, rc = -EPERM);
2900
2901                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2902                 if (IS_ERR(mnew))
2903                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2904
2905                 if (!mdt_object_exists(mnew)) {
2906                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2907                                         &mnew->mot_obj,
2908                                         "object does not exist");
2909                         GOTO(out_put_new, rc = -ENOENT);
2910                 }
2911
2912                 if (mdt_object_remote(mnew)) {
2913                         struct mdt_body  *repbody;
2914
2915                         /* Always send rename req to the target child MDT */
2916                         repbody = req_capsule_server_get(info->mti_pill,
2917                                                          &RMF_MDT_BODY);
2918                         LASSERT(repbody != NULL);
2919                         repbody->mbo_fid1 = *new_fid;
2920                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2921                         GOTO(out_put_new, rc = -EXDEV);
2922                 }
2923                 /* Before locking the target dir, check we do not replace
2924                  * a dir with a non-dir, otherwise it may deadlock with
2925                  * link op which tries to create a link in this dir
2926                  * back to this non-dir.
2927                  */
2928                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2929                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2930                         GOTO(out_put_new, rc = -EISDIR);
2931
2932                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2933                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2934                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2935                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2936                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2937                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2938                                             lh_rmt, lock_ibits, cos_incompat);
2939                 if (rc < 0)
2940                         GOTO(out_put_new, rc);
2941
2942                 /* Check if @msrcdir is subdir of @mnew, before locking child
2943                  * to avoid reverse locking.
2944                  */
2945                 if (mtgtdir != msrcdir) {
2946                         rc = mdo_is_subdir(info->mti_env,
2947                                            mdt_object_child(msrcdir), new_fid);
2948                         if (rc) {
2949                                 if (rc == 1)
2950                                         rc = -EINVAL;
2951                                 GOTO(out_unlock_old, rc);
2952                         }
2953                 }
2954
2955                 /* We used to acquire MDS_INODELOCK_FULL here but we
2956                  * can't do this now because a running HSM restore on
2957                  * the rename onto victim will hold the layout
2958                  * lock. See LU-4002.
2959                  */
2960
2961                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2962                 mdt_lock_reg_init(lh_newp, LCK_EX);
2963                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2964                 if (mdt_object_remote(mtgtdir)) {
2965                         rc = mdt_remote_object_lock(info, mtgtdir,
2966                                                     mdt_object_fid(mnew),
2967                                                     &lh_newp->mlh_rreg_lh,
2968                                                     lh_newp->mlh_rreg_mode,
2969                                                     MDS_INODELOCK_LOOKUP,
2970                                                     false);
2971                         if (rc != ELDLM_OK)
2972                                 GOTO(out_unlock_old, rc);
2973
2974                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2975                 }
2976                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2977                                            cos_incompat);
2978                 if (rc != 0)
2979                         GOTO(out_unlock_new, rc);
2980
2981                 /* get and save version after locking */
2982                 mdt_version_get_save(info, mnew, 3);
2983         } else if (rc != -ENOENT) {
2984                 GOTO(out_put_old, rc);
2985         } else {
2986                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2987                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2988                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2989                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2990                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2991                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2992                                             lh_rmt, lock_ibits, cos_incompat);
2993                 if (rc != 0)
2994                         GOTO(out_put_old, rc);
2995
2996                 mdt_enoent_version_save(info, 3);
2997         }
2998
2999         /* step 5: rename it */
3000         mdt_reint_init_ma(info, ma);
3001
3002         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
3003                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
3004
3005         if (mnew != NULL)
3006                 mutex_lock(&mnew->mot_lov_mutex);
3007
3008         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
3009                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
3010                         mnew != NULL ? mdt_object_child(mnew) : NULL,
3011                         &rr->rr_tgt_name, ma);
3012
3013         if (mnew != NULL)
3014                 mutex_unlock(&mnew->mot_lov_mutex);
3015
3016         /* handle last link of tgt object */
3017         if (rc == 0) {
3018                 if (mnew) {
3019                         mdt_handle_last_unlink(info, mnew, ma);
3020                         discard = mdt_dom_check_for_discard(info, mnew);
3021                 }
3022                 mdt_rename_counter_tally(info, info->mti_mdt, req,
3023                                          msrcdir, mtgtdir, msi,
3024                                          ktime_us_delta(ktime_get(), kstart));
3025         }
3026
3027         EXIT;
3028 out_unlock_new:
3029         if (mnew != NULL)
3030                 mdt_object_unlock(info, mnew, lh_newp, rc);
3031 out_unlock_old:
3032         mdt_object_unlock(info, NULL, lh_rmt, rc);
3033         mdt_object_unlock(info, mold, lh_oldp, rc);
3034 out_put_new:
3035         if (mnew && !discard)
3036                 mdt_object_put(info->mti_env, mnew);
3037 out_put_old:
3038         mdt_object_put(info->mti_env, mold);
3039 out_unlock_parents:
3040         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
3041         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
3042 out_unlock_rename:
3043         if (lustre_handle_is_used(&rename_lh))
3044                 mdt_rename_unlock(&rename_lh);
3045 out_put_tgtdir:
3046         mdt_object_put(info->mti_env, mtgtdir);
3047 out_put_srcdir:
3048         mdt_object_put(info->mti_env, msrcdir);
3049
3050         /* The DoM discard can be done right in the place above where it is
3051          * assigned, meanwhile it is done here after rename unlock due to
3052          * compatibility with old clients, for them the discard blocks
3053          * the main thread until completion. Check LU-11359 for details.
3054          */
3055         if (discard) {
3056                 mdt_dom_discard_data(info, mnew);
3057                 mdt_object_put(info->mti_env, mnew);
3058         }
3059         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3060         return rc;
3061 }
3062
3063 static int mdt_reint_resync(struct mdt_thread_info *info,
3064                             struct mdt_lock_handle *lhc)
3065 {
3066         struct mdt_reint_record *rr = &info->mti_rr;
3067         struct ptlrpc_request *req = mdt_info_req(info);
3068         struct md_attr *ma = &info->mti_attr;
3069         struct mdt_object *mo;
3070         struct ldlm_lock *lease;
3071         struct mdt_body *repbody;
3072         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3073         bool lease_broken;
3074         int rc;
3075
3076         ENTRY;
3077         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3078
3079         if (info->mti_dlm_req)
3080                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3081
3082         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3083         if (IS_ERR(mo))
3084                 GOTO(out, rc = PTR_ERR(mo));
3085
3086         if (!mdt_object_exists(mo))
3087                 GOTO(out_obj, rc = -ENOENT);
3088
3089         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3090                 GOTO(out_obj, rc = -EINVAL);
3091
3092         if (mdt_object_remote(mo))
3093                 GOTO(out_obj, rc = -EREMOTE);
3094
3095         lease = ldlm_handle2lock(rr->rr_lease_handle);
3096         if (lease == NULL)
3097                 GOTO(out_obj, rc = -ESTALE);
3098
3099         /* It's really necessary to grab open_sem and check if the lease lock
3100          * has been lost. There would exist a concurrent writer coming in and
3101          * generating some dirty data in memory cache, the writeback would fail
3102          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3103          */
3104         if (!down_write_trylock(&mo->mot_open_sem))
3105                 GOTO(out_put_lease, rc = -EBUSY);
3106
3107         lock_res_and_lock(lease);
3108         lease_broken = ldlm_is_cancel(lease);
3109         unlock_res_and_lock(lease);
3110         if (lease_broken)
3111                 GOTO(out_unlock, rc = -EBUSY);
3112
3113         /* the file has yet opened by anyone else after we took the lease. */
3114         layout.mlc_opc = MD_LAYOUT_RESYNC;
3115         lhc = &info->mti_lh[MDT_LH_LOCAL];
3116         rc = mdt_layout_change(info, mo, lhc, &layout);
3117         if (rc)
3118                 GOTO(out_unlock, rc);
3119
3120         mdt_object_unlock(info, mo, lhc, 0);
3121
3122         ma->ma_need = MA_INODE;
3123         ma->ma_valid = 0;
3124         rc = mdt_attr_get_complex(info, mo, ma);
3125         if (rc != 0)
3126                 GOTO(out_unlock, rc);
3127
3128         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3129         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3130
3131         EXIT;
3132 out_unlock:
3133         up_write(&mo->mot_open_sem);
3134 out_put_lease:
3135         LDLM_LOCK_PUT(lease);
3136 out_obj:
3137         mdt_object_put(info->mti_env, mo);
3138 out:
3139         mdt_client_compatibility(info);
3140         return rc;
3141 }
3142
3143 struct mdt_reinter {
3144         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3145         enum lprocfs_extra_opc mr_extra_opc;
3146 };
3147
3148 static const struct mdt_reinter mdt_reinters[] = {
3149         [REINT_SETATTR] = {
3150                 .mr_handler = &mdt_reint_setattr,
3151                 .mr_extra_opc = MDS_REINT_SETATTR,
3152         },
3153         [REINT_CREATE] = {
3154                 .mr_handler = &mdt_reint_create,
3155                 .mr_extra_opc = MDS_REINT_CREATE,
3156         },
3157         [REINT_LINK] = {
3158                 .mr_handler = &mdt_reint_link,
3159                 .mr_extra_opc = MDS_REINT_LINK,
3160         },
3161         [REINT_UNLINK] = {
3162                 .mr_handler = &mdt_reint_unlink,
3163                 .mr_extra_opc = MDS_REINT_UNLINK,
3164         },
3165         [REINT_RENAME] = {
3166                 .mr_handler = &mdt_reint_rename,
3167                 .mr_extra_opc = MDS_REINT_RENAME,
3168         },
3169         [REINT_OPEN] = {
3170                 .mr_handler = &mdt_reint_open,
3171                 .mr_extra_opc = MDS_REINT_OPEN,
3172         },
3173         [REINT_SETXATTR] = {
3174                 .mr_handler = &mdt_reint_setxattr,
3175                 .mr_extra_opc = MDS_REINT_SETXATTR,
3176         },
3177         [REINT_RMENTRY] = {
3178                 .mr_handler = &mdt_reint_unlink,
3179                 .mr_extra_opc = MDS_REINT_UNLINK,
3180         },
3181         [REINT_MIGRATE] = {
3182                 .mr_handler = &mdt_reint_migrate,
3183                 .mr_extra_opc = MDS_REINT_RENAME,
3184         },
3185         [REINT_RESYNC] = {
3186                 .mr_handler = &mdt_reint_resync,
3187                 .mr_extra_opc = MDS_REINT_RESYNC,
3188         },
3189 };
3190
3191 int mdt_reint_rec(struct mdt_thread_info *info,
3192                   struct mdt_lock_handle *lhc)
3193 {
3194         const struct mdt_reinter *mr;
3195         int rc;
3196
3197         ENTRY;
3198         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3199                 RETURN(-EPROTO);
3200
3201         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3202         if (mr->mr_handler == NULL)
3203                 RETURN(-EPROTO);
3204
3205         rc = (*mr->mr_handler)(info, lhc);
3206
3207         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3208                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3209
3210         RETURN(rc);
3211 }