Whamcloud - gitweb
LU-15787 sec: block enc unaware clients on enc files
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222                              struct mdt_object *obj,
223                              struct ldlm_enqueue_info *einfo,
224                              int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(slave_locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_handle_init(lh);
237         mdt_lock_reg_init(lh, einfo->ei_mode);
238         for (i = 0; i < slave_locks->ha_count; i++) {
239                 if (test_bit(i, (void *)slave_locks->ha_map))
240                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
241                 else
242                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
243                 mdt_object_unlock(mti, NULL, lh, decref);
244                 slave_locks->ha_handles[i].cookie = 0ull;
245         }
246
247         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
248                                 policy);
249 }
250
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252                                      struct mdt_object *obj)
253 {
254         struct lu_device *bottom_dev;
255         struct lu_object *bottom_obj;
256         int rc;
257
258         if (!S_ISDIR(obj->mot_header.loh_attr))
259                 return 0;
260
261         /* getxattr from bottom obj to avoid reading in shard FIDs */
262         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264                                           mdt_object_fid(obj), NULL);
265         if (IS_ERR(bottom_obj))
266                 return PTR_ERR(bottom_obj);
267
268         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269                           XATTR_NAME_LMV);
270         lu_object_put(mti->mti_env, bottom_obj);
271
272         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
273 }
274
275 /**
276  * Lock slave stripes if necessary, the lock handles of slave stripes
277  * will be stored in einfo->ei_cbdata.
278  **/
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280                            enum ldlm_mode mode, __u64 ibits,
281                            struct ldlm_enqueue_info *einfo)
282 {
283         union ldlm_policy_data *policy = &mti->mti_policy;
284
285         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
286
287         einfo->ei_type = LDLM_IBITS;
288         einfo->ei_mode = mode;
289         einfo->ei_cb_bl = mdt_remote_blocking_ast;
290         einfo->ei_cb_local_bl = mdt_blocking_ast;
291         einfo->ei_cb_cp = ldlm_completion_ast;
292         einfo->ei_enq_slave = 1;
293         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294         einfo->ei_inodebits = ibits;
295         einfo->ei_req_slot = 1;
296         memset(policy, 0, sizeof(*policy));
297         policy->l_inodebits.bits = ibits;
298
299         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
300                               policy);
301 }
302
303 int mdt_reint_striped_lock(struct mdt_thread_info *info,
304                            struct mdt_object *o,
305                            struct mdt_lock_handle *lh,
306                            __u64 ibits,
307                            struct ldlm_enqueue_info *einfo,
308                            bool cos_incompat)
309 {
310         int rc;
311
312         LASSERT(!mdt_object_remote(o));
313
314         memset(einfo, 0, sizeof(*einfo));
315
316         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
317         if (rc)
318                 return rc;
319
320         rc = mdt_object_striped(info, o);
321         if (rc != 1) {
322                 if (rc < 0)
323                         mdt_object_unlock(info, o, lh, rc);
324                 return rc;
325         }
326
327         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
328         if (rc) {
329                 mdt_object_unlock(info, o, lh, rc);
330                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
331                         rc = 0;
332         }
333
334         return rc;
335 }
336
337 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
338                               struct mdt_object *o,
339                               struct mdt_lock_handle *lh,
340                               struct ldlm_enqueue_info *einfo, int decref)
341 {
342         if (einfo->ei_cbdata)
343                 mdt_unlock_slaves(info, o, einfo, decref);
344         mdt_object_unlock(info, o, lh, decref);
345 }
346
347 static int mdt_restripe(struct mdt_thread_info *info,
348                         struct mdt_object *parent,
349                         const struct lu_name *lname,
350                         const struct lu_fid *tfid,
351                         struct md_op_spec *spec,
352                         struct md_attr *ma)
353 {
354         struct mdt_device *mdt = info->mti_mdt;
355         struct lu_fid *fid = &info->mti_tmp_fid2;
356         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
357         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358         struct lmv_mds_md_v1 *lmv;
359         struct mdt_object *child;
360         struct mdt_lock_handle *lhp;
361         struct mdt_lock_handle *lhc;
362         struct mdt_body *repbody;
363         int rc;
364
365         ENTRY;
366         if (!mdt->mdt_enable_dir_restripe)
367                 RETURN(-EPERM);
368
369         LASSERT(lum);
370         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
371
372         rc = mdt_version_get_check_save(info, parent, 0);
373         if (rc)
374                 RETURN(rc);
375
376         lhp = &info->mti_lh[MDT_LH_PARENT];
377         mdt_lock_pdo_init(lhp, LCK_PW, lname);
378         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
379                                    true);
380         if (rc)
381                 RETURN(rc);
382
383         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
384         if (rc)
385                 GOTO(unlock_parent, rc);
386
387         if (ma->ma_valid & MA_LMV) {
388                 /* don't allow restripe if parent dir layout is changing */
389                 lmv = &ma->ma_lmv->lmv_md_v1;
390                 if (!lmv_is_sane2(lmv))
391                         GOTO(unlock_parent, rc = -EBADF);
392
393                 if (lmv_is_layout_changing(lmv))
394                         GOTO(unlock_parent, rc = -EBUSY);
395         }
396
397         fid_zero(fid);
398         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
399         if (rc)
400                 GOTO(unlock_parent, rc);
401
402         child = mdt_object_find(info->mti_env, mdt, fid);
403         if (IS_ERR(child))
404                 GOTO(unlock_parent, rc = PTR_ERR(child));
405
406         if (!mdt_object_exists(child))
407                 GOTO(out_child, rc = -ENOENT);
408
409         if (mdt_object_remote(child)) {
410                 struct mdt_body *repbody;
411
412                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
413                 if (!repbody)
414                         GOTO(out_child, rc = -EPROTO);
415
416                 repbody->mbo_fid1 = *fid;
417                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
418                 GOTO(out_child, rc = -EREMOTE);
419         }
420
421         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
422                 GOTO(out_child, rc = -ENOTDIR);
423
424         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
425         if (rc)
426                 GOTO(out_child, rc);
427
428         /* race with migrate? */
429         if ((ma->ma_valid & MA_LMV) &&
430              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
431                 GOTO(out_child, rc = -EBUSY);
432
433         /* lock object */
434         lhc = &info->mti_lh[MDT_LH_CHILD];
435         mdt_lock_reg_init(lhc, LCK_EX);
436
437         /* enqueue object remote LOOKUP lock */
438         if (mdt_object_remote(parent)) {
439                 rc = mdt_remote_object_lock(info, parent, fid,
440                                             &lhc->mlh_rreg_lh,
441                                             lhc->mlh_rreg_mode,
442                                             MDS_INODELOCK_LOOKUP, false);
443                 if (rc != ELDLM_OK)
444                         GOTO(out_child, rc);
445         }
446
447         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
448                                     true);
449         if (rc)
450                 GOTO(unlock_child, rc);
451
452         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
453         rc = mdt_version_get_check_save(info, child, 1);
454         if (rc)
455                 GOTO(unlock_child, rc);
456
457         spin_lock(&mdt->mdt_restriper.mdr_lock);
458         if (child->mot_restriping) {
459                 /* race? */
460                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
461                 GOTO(unlock_child, rc = -EBUSY);
462         }
463         child->mot_restriping = 1;
464         spin_unlock(&mdt->mdt_restriper.mdr_lock);
465
466         *fid = *tfid;
467         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
468         if (rc)
469                 GOTO(restriping_clear, rc);
470
471         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
472         if (!repbody)
473                 GOTO(restriping_clear, rc = -EPROTO);
474
475         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
476         EXIT;
477
478 restriping_clear:
479         child->mot_restriping = 0;
480 unlock_child:
481         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
482 out_child:
483         mdt_object_put(info->mti_env, child);
484 unlock_parent:
485         mdt_object_unlock(info, parent, lhp, rc);
486
487         return rc;
488 }
489
490 /*
491  * VBR: we save three versions in reply:
492  * 0 - parent. Check that parent version is the same during replay.
493  * 1 - name. Version of 'name' if file exists with the same name or
494  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
495  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
496  * check.
497  */
498 static int mdt_create(struct mdt_thread_info *info)
499 {
500         struct mdt_device *mdt = info->mti_mdt;
501         struct mdt_object *parent;
502         struct mdt_object *child;
503         struct mdt_lock_handle *lh;
504         struct mdt_body *repbody;
505         struct md_attr *ma = &info->mti_attr;
506         struct mdt_reint_record *rr = &info->mti_rr;
507         struct md_op_spec *spec = &info->mti_spec;
508         bool restripe = false;
509         int rc;
510
511         ENTRY;
512         DEBUG_REQ(D_INODE, mdt_info_req(info),
513                   "Create ("DNAME"->"DFID") in "DFID,
514                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
515
516         if (!fid_is_md_operative(rr->rr_fid1))
517                 RETURN(-EPERM);
518
519         if (S_ISDIR(ma->ma_attr.la_mode) &&
520             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
521                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
522                 struct lu_ucred *uc = mdt_ucred(info);
523                 struct obd_export *exp = mdt_info_req(info)->rq_export;
524
525                 /* Only new clients can create remote dir( >= 2.4) and
526                  * striped dir(>= 2.6), old client will return -ENOTSUPP
527                  */
528                 if (!mdt_is_dne_client(exp))
529                         RETURN(-ENOTSUPP);
530
531                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
532                         if (!mdt_is_striped_client(exp))
533                                 RETURN(-ENOTSUPP);
534
535                         if (!mdt->mdt_enable_striped_dir)
536                                 RETURN(-EPERM);
537                 } else if (!mdt->mdt_enable_remote_dir) {
538                         RETURN(-EPERM);
539                 }
540
541                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
542                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
543                     LMV_HASH_TYPE_CRUSH)
544                         RETURN(-EPROTO);
545
546                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
547                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
548                     mdt->mdt_enable_remote_dir_gid != -1)
549                         RETURN(-EPERM);
550
551                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
552                  * this is create only, don't try restripe.
553                  */
554                 if (mdt->mdt_enable_dir_restripe &&
555                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
556                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
557                         restripe = true;
558         }
559
560         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
561
562         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
563         if (IS_ERR(parent))
564                 RETURN(PTR_ERR(parent));
565
566         if (!mdt_object_exists(parent))
567                 GOTO(put_parent, rc = -ENOENT);
568
569         rc = mdt_check_enc(info, parent);
570         if (rc)
571                 GOTO(put_parent, rc);
572
573         /*
574          * LU-10235: check if name exists locklessly first to avoid massive
575          * lock recalls on existing directories.
576          */
577         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
578                                       &info->mti_tmp_fid1, 1);
579         if (rc == 0) {
580                 if (!restripe)
581                         GOTO(put_parent, rc = -EEXIST);
582
583                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
584                                   ma);
585         }
586
587         /* -ENOENT is expected here */
588         if (rc != -ENOENT)
589                 GOTO(put_parent, rc);
590
591         /* save version of file name for replay, it must be ENOENT here */
592         mdt_enoent_version_save(info, 1);
593
594         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
595
596         lh = &info->mti_lh[MDT_LH_PARENT];
597         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
598         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
599         if (rc)
600                 GOTO(put_parent, rc);
601
602         if (!mdt_object_remote(parent)) {
603                 rc = mdt_version_get_check_save(info, parent, 0);
604                 if (rc)
605                         GOTO(unlock_parent, rc);
606         }
607
608         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
609         if (unlikely(IS_ERR(child)))
610                 GOTO(unlock_parent, rc = PTR_ERR(child));
611
612         ma->ma_need = MA_INODE;
613         ma->ma_valid = 0;
614
615         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
616                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
617
618         /* Version of child will be updated on disk. */
619         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
620         rc = mdt_version_get_check_save(info, child, 2);
621         if (rc)
622                 GOTO(put_child, rc);
623
624         /*
625          * Do not perform lookup sanity check. We know that name does
626          * not exist.
627          */
628         info->mti_spec.sp_cr_lookup = 0;
629         info->mti_spec.sp_feat = &dt_directory_features;
630
631         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
632                         mdt_object_child(child), &info->mti_spec, ma);
633         if (rc == 0)
634                 rc = mdt_attr_get_complex(info, child, ma);
635
636         if (rc < 0)
637                 GOTO(put_child, rc);
638
639         /*
640          * On DNE, we need to eliminate dependey between 'mkdir a' and
641          * 'mkdir a/b' if b is a striped directory, to achieve this, two
642          * things are done below:
643          * 1. save child and slaves lock.
644          * 2. if the child is a striped directory, relock parent so to
645          *    compare against with COS locks to ensure parent was
646          *    committed to disk.
647          */
648         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
649                 struct mdt_lock_handle *lhc;
650                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
651                 bool cos_incompat;
652
653                 rc = mdt_object_striped(info, child);
654                 if (rc < 0)
655                         GOTO(put_child, rc);
656
657                 cos_incompat = rc;
658                 if (cos_incompat) {
659                         if (!mdt_object_remote(parent)) {
660                                 mdt_object_unlock(info, parent, lh, 1);
661                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
662                                 rc = mdt_reint_object_lock(info, parent, lh,
663                                                            MDS_INODELOCK_UPDATE,
664                                                            true);
665                                 if (rc)
666                                         GOTO(put_child, rc);
667                         }
668                 }
669
670                 lhc = &info->mti_lh[MDT_LH_CHILD];
671                 mdt_lock_handle_init(lhc);
672                 mdt_lock_reg_init(lhc, LCK_PW);
673                 rc = mdt_reint_striped_lock(info, child, lhc,
674                                             MDS_INODELOCK_UPDATE, einfo,
675                                             cos_incompat);
676                 if (rc)
677                         GOTO(put_child, rc);
678
679                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
680         }
681
682         /* Return fid & attr to client. */
683         if (ma->ma_valid & MA_INODE)
684                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
685                                    mdt_object_fid(child));
686         EXIT;
687 put_child:
688         mdt_object_put(info->mti_env, child);
689 unlock_parent:
690         mdt_object_unlock(info, parent, lh, rc);
691 put_parent:
692         mdt_object_put(info->mti_env, parent);
693         return rc;
694 }
695
696 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
697                         struct md_attr *ma)
698 {
699         struct mdt_lock_handle  *lh;
700         int do_vbr = ma->ma_attr.la_valid &
701                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
702         __u64 lockpart = MDS_INODELOCK_UPDATE;
703         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
704         bool cos_incompat;
705         int rc;
706
707         ENTRY;
708         rc = mdt_object_striped(info, mo);
709         if (rc < 0)
710                 RETURN(rc);
711
712         cos_incompat = rc;
713
714         lh = &info->mti_lh[MDT_LH_PARENT];
715         mdt_lock_reg_init(lh, LCK_PW);
716
717         /* Even though the new MDT will grant PERM lock to the old
718          * client, but the old client will almost ignore that during
719          * So it needs to revoke both LOOKUP and PERM lock here, so
720          * both new and old client can cancel the dcache
721          */
722         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
723                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
724         /* Clear xattr cache on clients, so the virtual project ID xattr
725          * can get the new project ID
726          */
727         if (ma->ma_attr.la_valid & LA_PROJID)
728                 lockpart |= MDS_INODELOCK_XATTR;
729
730         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
731                                     cos_incompat);
732         if (rc != 0)
733                 RETURN(rc);
734
735         /* all attrs are packed into mti_attr in unpack_setattr */
736         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
737                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
738
739         /* VBR: update version if attr changed are important for recovery */
740         if (do_vbr) {
741                 /* update on-disk version of changed object */
742                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
743                 rc = mdt_version_get_check_save(info, mo, 0);
744                 if (rc)
745                         GOTO(out_unlock, rc);
746         }
747
748         /* Ensure constant striping during chown(). See LU-2789. */
749         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
750                 mutex_lock(&mo->mot_lov_mutex);
751
752         /* all attrs are packed into mti_attr in unpack_setattr */
753         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
754
755         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
756                 mutex_unlock(&mo->mot_lov_mutex);
757
758         if (rc != 0)
759                 GOTO(out_unlock, rc);
760         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
761         EXIT;
762 out_unlock:
763         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
764         return rc;
765 }
766
767 /**
768  * Check HSM flags and add HS_DIRTY flag if relevant.
769  *
770  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
771  * and is not RELEASED.
772  */
773 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
774                         struct md_attr *ma)
775 {
776         struct lu_ucred *uc = mdt_ucred(info);
777         kernel_cap_t cap_saved;
778         int rc;
779
780         ENTRY;
781         /* If the file was modified, add the dirty flag */
782         ma->ma_need = MA_HSM;
783         rc = mdt_attr_get_complex(info, mo, ma);
784         if (rc) {
785                 CERROR("file attribute read error for "DFID": %d.\n",
786                         PFID(mdt_object_fid(mo)), rc);
787                 RETURN(rc);
788         }
789
790         /* If an up2date copy exists in the backend, add dirty flag */
791         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
792             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
793                 ma->ma_hsm.mh_flags |= HS_DIRTY;
794
795                 /* Bump cap so that closes from non-owner writers can
796                  * set the HSM state to dirty.
797                  */
798                 cap_saved = uc->uc_cap;
799                 cap_raise(uc->uc_cap, CAP_FOWNER);
800                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
801                 uc->uc_cap = cap_saved;
802                 if (rc)
803                         CERROR("file attribute change error for "DFID": %d\n",
804                                 PFID(mdt_object_fid(mo)), rc);
805         }
806
807         RETURN(rc);
808 }
809
810 static int mdt_reint_setattr(struct mdt_thread_info *info,
811                              struct mdt_lock_handle *lhc)
812 {
813         struct mdt_device *mdt = info->mti_mdt;
814         struct md_attr *ma = &info->mti_attr;
815         struct mdt_reint_record *rr = &info->mti_rr;
816         struct ptlrpc_request *req = mdt_info_req(info);
817         struct mdt_object *mo;
818         struct mdt_body *repbody;
819         ktime_t kstart = ktime_get();
820         int rc, rc2;
821
822         ENTRY;
823         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
824                   (unsigned int)ma->ma_attr.la_valid);
825
826         if (info->mti_dlm_req)
827                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
828
829         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
830
831         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
832         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
833         if (IS_ERR(mo))
834                 GOTO(out, rc = PTR_ERR(mo));
835
836         if (!mdt_object_exists(mo))
837                 GOTO(out_put, rc = -ENOENT);
838
839         if (mdt_object_remote(mo))
840                 GOTO(out_put, rc = -EREMOTE);
841
842         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
843         /* revoke lease lock if size is going to be changed */
844         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
845                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
846                      atomic_read(&mo->mot_lease_count) > 0)) {
847                 down_read(&mo->mot_open_sem);
848
849                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
850                         lhc = &info->mti_lh[MDT_LH_LOCAL];
851                         mdt_lock_reg_init(lhc, LCK_CW);
852
853                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
854                         if (rc != 0) {
855                                 up_read(&mo->mot_open_sem);
856                                 GOTO(out_put, rc);
857                         }
858
859                         /* revoke lease lock */
860                         mdt_object_unlock(info, mo, lhc, 1);
861                 }
862                 up_read(&mo->mot_open_sem);
863         }
864
865         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
866                 /* Check write access for the O_TRUNC case */
867                 if (mdt_write_read(mo) < 0)
868                         GOTO(out_put, rc = -ETXTBSY);
869
870                 /* LU-10286: compatibility check for FLR.
871                  * Please check the comment in mdt_finish_open() for details
872                  */
873                 if (!exp_connect_flr(info->mti_exp) ||
874                     !exp_connect_overstriping(info->mti_exp)) {
875                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
876                         if (rc < 0 && rc != -ENODATA)
877                                 GOTO(out_put, rc);
878
879                         if (!exp_connect_flr(info->mti_exp)) {
880                                 if (rc > 0 &&
881                                     mdt_lmm_is_flr(info->mti_big_lmm))
882                                         GOTO(out_put, rc = -EOPNOTSUPP);
883                         }
884
885                         if (!exp_connect_overstriping(info->mti_exp)) {
886                                 if (rc > 0 &&
887                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
888                                         GOTO(out_put, rc = -EOPNOTSUPP);
889                         }
890                 }
891
892                 /* For truncate, the file size sent from client
893                  * is believable, but the blocks are incorrect,
894                  * which makes the block size in LSOM attribute
895                  * inconsisent with the real block size.
896                  */
897                 rc = mdt_lsom_update(info, mo, true);
898                 if (rc)
899                         GOTO(out_put, rc);
900         }
901
902         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
903                 if (ma->ma_valid & MA_LOV)
904                         GOTO(out_put, rc = -EPROTO);
905
906                 /* MDT supports FMD for regular files due to Data-on-MDT */
907                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
908                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
909                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
910                                        req->rq_xid);
911
912                         if (ma->ma_attr.la_valid & LA_MTIME) {
913                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
914                                 if (!rc)
915                                         ma->ma_valid |= MA_PFID;
916                         }
917                 }
918
919                 rc = mdt_attr_set(info, mo, ma);
920                 if (rc)
921                         GOTO(out_put, rc);
922         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
923                    (ma->ma_valid & MA_INODE)) {
924                 struct lu_buf *buf = &info->mti_buf;
925                 struct lu_ucred *uc = mdt_ucred(info);
926                 struct mdt_lock_handle *lh;
927                 const char *name;
928                 __u64 lockpart = MDS_INODELOCK_XATTR;
929
930                 /* reject if either remote or striped dir is disabled */
931                 if (ma->ma_valid & MA_LMV) {
932                         if (!mdt->mdt_enable_remote_dir ||
933                             !mdt->mdt_enable_striped_dir)
934                                 GOTO(out_put, rc = -EPERM);
935
936                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
937                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
938                             mdt->mdt_enable_remote_dir_gid != -1)
939                                 GOTO(out_put, rc = -EPERM);
940                 }
941
942                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
943                         GOTO(out_put, rc = -ENOTDIR);
944
945                 if (ma->ma_attr.la_valid != 0)
946                         GOTO(out_put, rc = -EPROTO);
947
948                 lh = &info->mti_lh[MDT_LH_PARENT];
949                 mdt_lock_reg_init(lh, LCK_PW);
950
951                 if (ma->ma_valid & MA_LOV) {
952                         buf->lb_buf = ma->ma_lmm;
953                         buf->lb_len = ma->ma_lmm_size;
954                         name = XATTR_NAME_LOV;
955                 } else {
956                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
957                         struct lu_fid *pfid = &info->mti_tmp_fid1;
958                         struct lu_name *pname = &info->mti_name;
959                         const char dotdot[] = "..";
960                         struct mdt_object *pobj;
961
962                         buf->lb_buf = lmu;
963                         buf->lb_len = ma->ma_lmv_size;
964                         name = XATTR_NAME_DEFAULT_LMV;
965
966                         if (fid_is_root(rr->rr_fid1)) {
967                                 lockpart |= MDS_INODELOCK_LOOKUP;
968                         } else {
969                                 /* force client to update dir default layout */
970                                 fid_zero(pfid);
971                                 pname->ln_name = dotdot;
972                                 pname->ln_namelen = sizeof(dotdot);
973                                 rc = mdo_lookup(info->mti_env,
974                                                 mdt_object_child(mo), pname,
975                                                 pfid, NULL);
976                                 if (rc)
977                                         GOTO(out_put, rc);
978
979                                 pobj = mdt_object_find(info->mti_env, mdt,
980                                                        pfid);
981                                 if (IS_ERR(pobj))
982                                         GOTO(out_put, rc = PTR_ERR(pobj));
983
984                                 if (mdt_object_remote(pobj))
985                                         rc = mdt_remote_object_lock(info, pobj,
986                                                 mdt_object_fid(mo),
987                                                 &lh->mlh_rreg_lh, LCK_EX,
988                                                 MDS_INODELOCK_LOOKUP, false);
989                                 else
990                                         lockpart |= MDS_INODELOCK_LOOKUP;
991
992                                 mdt_object_put(info->mti_env, pobj);
993
994                                 if (rc)
995                                         GOTO(out_put, rc);
996                         }
997                 }
998
999                 rc = mdt_object_lock(info, mo, lh, lockpart);
1000                 if (rc != 0)
1001                         GOTO(out_put, rc);
1002
1003                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1004                                   name, 0);
1005
1006                 mdt_object_unlock(info, mo, lh, rc);
1007                 if (rc)
1008                         GOTO(out_put, rc);
1009         } else {
1010                 GOTO(out_put, rc = -EPROTO);
1011         }
1012
1013         /* If file data is modified, add the dirty flag */
1014         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1015                 rc = mdt_add_dirty_flag(info, mo, ma);
1016
1017         ma->ma_need = MA_INODE;
1018         ma->ma_valid = 0;
1019         rc = mdt_attr_get_complex(info, mo, ma);
1020         if (rc != 0)
1021                 GOTO(out_put, rc);
1022
1023         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1024
1025         EXIT;
1026 out_put:
1027         mdt_object_put(info->mti_env, mo);
1028 out:
1029         if (rc == 0)
1030                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1031                                  ktime_us_delta(ktime_get(), kstart));
1032
1033         mdt_client_compatibility(info);
1034         rc2 = mdt_fix_reply(info);
1035         if (rc == 0)
1036                 rc = rc2;
1037         return rc;
1038 }
1039
1040 static int mdt_reint_create(struct mdt_thread_info *info,
1041                             struct mdt_lock_handle *lhc)
1042 {
1043         struct ptlrpc_request   *req = mdt_info_req(info);
1044         ktime_t                 kstart = ktime_get();
1045         int                     rc;
1046
1047         ENTRY;
1048         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1049                 RETURN(err_serious(-ESTALE));
1050
1051         if (info->mti_dlm_req)
1052                 ldlm_request_cancel(mdt_info_req(info),
1053                                     info->mti_dlm_req, 0, LATF_SKIP);
1054
1055         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1056                 RETURN(-EPROTO);
1057
1058         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1059         case S_IFDIR:
1060         case S_IFREG:
1061         case S_IFLNK:
1062         case S_IFCHR:
1063         case S_IFBLK:
1064         case S_IFIFO:
1065         case S_IFSOCK:
1066                 break;
1067         default:
1068                 CERROR("%s: Unsupported mode %o\n",
1069                        mdt_obd_name(info->mti_mdt),
1070                        info->mti_attr.ma_attr.la_mode);
1071                 RETURN(err_serious(-EOPNOTSUPP));
1072         }
1073
1074         rc = mdt_create(info);
1075         if (rc == 0) {
1076                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1077                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1078                                          ktime_us_delta(ktime_get(), kstart));
1079                 else
1080                         /* Special file should stay on the same node as parent*/
1081                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1082                                          ktime_us_delta(ktime_get(), kstart));
1083         }
1084
1085         RETURN(rc);
1086 }
1087
1088 /*
1089  * VBR: save parent version in reply and child version getting by its name.
1090  * Version of child is getting and checking during its lookup. If
1091  */
1092 static int mdt_reint_unlink(struct mdt_thread_info *info,
1093                             struct mdt_lock_handle *lhc)
1094 {
1095         struct mdt_reint_record *rr = &info->mti_rr;
1096         struct ptlrpc_request *req = mdt_info_req(info);
1097         struct md_attr *ma = &info->mti_attr;
1098         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1099         struct mdt_object *mp;
1100         struct mdt_object *mc;
1101         struct mdt_lock_handle *parent_lh;
1102         struct mdt_lock_handle *child_lh;
1103         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1104         __u64 lock_ibits;
1105         bool cos_incompat = false;
1106         int no_name = 0;
1107         ktime_t kstart = ktime_get();
1108         int rc;
1109
1110         ENTRY;
1111         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1112                   PNAME(&rr->rr_name));
1113
1114         if (info->mti_dlm_req)
1115                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1116
1117         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1118                 RETURN(err_serious(-ENOENT));
1119
1120         if (!fid_is_md_operative(rr->rr_fid1))
1121                 RETURN(-EPERM);
1122
1123         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1124         if (IS_ERR(mp))
1125                 RETURN(PTR_ERR(mp));
1126
1127         if (mdt_object_remote(mp)) {
1128                 cos_incompat = true;
1129         } else {
1130                 rc = mdt_version_get_check_save(info, mp, 0);
1131                 if (rc)
1132                         GOTO(put_parent, rc);
1133         }
1134
1135         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1136         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1137 relock:
1138         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1139         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1140         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1141                                    cos_incompat);
1142         if (rc != 0)
1143                 GOTO(put_parent, rc);
1144
1145         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1146                 *child_fid = *rr->rr_fid2;
1147         } else {
1148                 /* lookup child object along with version checking */
1149                 fid_zero(child_fid);
1150                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1151                                               1);
1152                 if (rc != 0) {
1153                         /* Name might not be able to find during resend of
1154                          * remote unlink, considering following case.
1155                          * dir_A is a remote directory, the name entry of
1156                          * dir_A is on MDT0, the directory is on MDT1,
1157                          *
1158                          * 1. client sends unlink req to MDT1.
1159                          * 2. MDT1 sends name delete update to MDT0.
1160                          * 3. name entry is being deleted in MDT0 synchronously.
1161                          * 4. MDT1 is restarted.
1162                          * 5. client resends unlink req to MDT1. So it can not
1163                          *    find the name entry on MDT0 anymore.
1164                          * In this case, MDT1 only needs to destory the local
1165                          * directory.
1166                          */
1167                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1168                             !fid_is_zero(rr->rr_fid2) &&
1169                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1170                                 no_name = 1;
1171                                 *child_fid = *rr->rr_fid2;
1172                         } else {
1173                                 GOTO(unlock_parent, rc);
1174                         }
1175                 }
1176         }
1177
1178         if (!fid_is_md_operative(child_fid))
1179                 GOTO(unlock_parent, rc = -EPERM);
1180
1181         /* We will lock the child regardless it is local or remote. No harm. */
1182         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1183         if (IS_ERR(mc))
1184                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1185
1186         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1187                 /* In this case, child fid is embedded in the request, and we do
1188                  * not have a proper name as rr_name contains an encoded
1189                  * hash. So find name that matches provided hash.
1190                  */
1191                 if (!find_name_matching_hash(info, &rr->rr_name,
1192                                              NULL, mc))
1193                         GOTO(put_child, rc = -ENOENT);
1194         }
1195
1196         if (!cos_incompat) {
1197                 rc = mdt_object_striped(info, mc);
1198                 if (rc < 0)
1199                         GOTO(put_child, rc);
1200
1201                 cos_incompat = rc;
1202                 if (cos_incompat) {
1203                         mdt_object_put(info->mti_env, mc);
1204                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1205                         goto relock;
1206                 }
1207         }
1208
1209         child_lh = &info->mti_lh[MDT_LH_CHILD];
1210         mdt_lock_reg_init(child_lh, LCK_EX);
1211         if (info->mti_spec.sp_rm_entry) {
1212                 struct lu_ucred *uc  = mdt_ucred(info);
1213
1214                 if (!mdt_is_dne_client(req->rq_export))
1215                         /* Return -ENOTSUPP for old client */
1216                         GOTO(put_child, rc = -ENOTSUPP);
1217
1218                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1219                         GOTO(put_child, rc = -EPERM);
1220
1221                 ma->ma_need = MA_INODE;
1222                 ma->ma_valid = 0;
1223                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1224                                 NULL, &rr->rr_name, ma, no_name);
1225                 GOTO(put_child, rc);
1226         }
1227
1228         if (mdt_object_remote(mc)) {
1229                 struct mdt_body  *repbody;
1230
1231                 if (!fid_is_zero(rr->rr_fid2)) {
1232                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1233                                mdt_obd_name(info->mti_mdt),
1234                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1235                         GOTO(put_child, rc = -ENOENT);
1236                 }
1237                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1238                        mdt_obd_name(info->mti_mdt),
1239                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1240
1241                 if (!mdt_is_dne_client(req->rq_export))
1242                         /* Return -ENOTSUPP for old client */
1243                         GOTO(put_child, rc = -ENOTSUPP);
1244
1245                 /* Revoke the LOOKUP lock of the remote object granted by
1246                  * this MDT. Since the unlink will happen on another MDT,
1247                  * it will release the LOOKUP lock right away. Then What
1248                  * would happen if another client try to grab the LOOKUP
1249                  * lock at the same time with unlink XXX
1250                  */
1251                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1252                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1253                 LASSERT(repbody != NULL);
1254                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1255                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1256                 GOTO(unlock_child, rc = -EREMOTE);
1257         }
1258         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1259          * this now because a running HSM restore on the child (unlink
1260          * victim) will hold the layout lock. See LU-4002.
1261          */
1262         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1263         if (mdt_object_remote(mp)) {
1264                 /* Enqueue lookup lock from parent MDT */
1265                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1266                                             &child_lh->mlh_rreg_lh,
1267                                             child_lh->mlh_rreg_mode,
1268                                             MDS_INODELOCK_LOOKUP, false);
1269                 if (rc != ELDLM_OK)
1270                         GOTO(put_child, rc);
1271
1272                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1273         }
1274
1275         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1276                                     cos_incompat);
1277         if (rc != 0)
1278                 GOTO(put_child, rc);
1279
1280         /*
1281          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1282          * whether need MA_LOV and MA_COOKIE.
1283          */
1284         ma->ma_need = MA_INODE;
1285         ma->ma_valid = 0;
1286
1287         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1288                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1289         /* save version when object is locked */
1290         mdt_version_get_save(info, mc, 1);
1291
1292         mutex_lock(&mc->mot_lov_mutex);
1293
1294         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1295                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1296
1297         mutex_unlock(&mc->mot_lov_mutex);
1298         if (rc != 0)
1299                 GOTO(unlock_child, rc);
1300
1301         if (!lu_object_is_dying(&mc->mot_header)) {
1302                 rc = mdt_attr_get_complex(info, mc, ma);
1303                 if (rc)
1304                         GOTO(out_stat, rc);
1305         } else if (mdt_dom_check_for_discard(info, mc)) {
1306                 mdt_dom_discard_data(info, mc);
1307         }
1308         mdt_handle_last_unlink(info, mc, ma);
1309
1310 out_stat:
1311         if (ma->ma_valid & MA_INODE) {
1312                 switch (ma->ma_attr.la_mode & S_IFMT) {
1313                 case S_IFDIR:
1314                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1315                                          ktime_us_delta(ktime_get(), kstart));
1316                         break;
1317                 case S_IFREG:
1318                 case S_IFLNK:
1319                 case S_IFCHR:
1320                 case S_IFBLK:
1321                 case S_IFIFO:
1322                 case S_IFSOCK:
1323                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1324                                          ktime_us_delta(ktime_get(), kstart));
1325                         break;
1326                 default:
1327                         LASSERTF(0, "bad file type %o unlinking\n",
1328                                 ma->ma_attr.la_mode);
1329                 }
1330         }
1331
1332         EXIT;
1333
1334 unlock_child:
1335         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1336 put_child:
1337         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1338             info->mti_big_buf.lb_buf)
1339                 lu_buf_free(&info->mti_big_buf);
1340         mdt_object_put(info->mti_env, mc);
1341 unlock_parent:
1342         mdt_object_unlock(info, mp, parent_lh, rc);
1343 put_parent:
1344         mdt_object_put(info->mti_env, mp);
1345         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1346         return rc;
1347 }
1348
1349 /*
1350  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1351  * name.
1352  */
1353 static int mdt_reint_link(struct mdt_thread_info *info,
1354                           struct mdt_lock_handle *lhc)
1355 {
1356         struct mdt_reint_record *rr = &info->mti_rr;
1357         struct ptlrpc_request   *req = mdt_info_req(info);
1358         struct md_attr          *ma = &info->mti_attr;
1359         struct mdt_object       *ms;
1360         struct mdt_object       *mp;
1361         struct mdt_lock_handle  *lhs;
1362         struct mdt_lock_handle  *lhp;
1363         ktime_t kstart = ktime_get();
1364         bool cos_incompat;
1365         int rc;
1366
1367         ENTRY;
1368         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1369                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1370
1371         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1372                 RETURN(err_serious(-ENOENT));
1373
1374         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1375             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1376                 req->rq_no_reply = 1;
1377                 RETURN(err_serious(-ENOENT));
1378         }
1379
1380         if (info->mti_dlm_req)
1381                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1382
1383         /* Invalid case so return error immediately instead of
1384          * processing it
1385          */
1386         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1387                 RETURN(-EPERM);
1388
1389         if (!fid_is_md_operative(rr->rr_fid1) ||
1390             !fid_is_md_operative(rr->rr_fid2))
1391                 RETURN(-EPERM);
1392
1393         /* step 1: find target parent dir */
1394         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1395         if (IS_ERR(mp))
1396                 RETURN(PTR_ERR(mp));
1397
1398         rc = mdt_version_get_check_save(info, mp, 0);
1399         if (rc)
1400                 GOTO(put_parent, rc);
1401
1402         rc = mdt_check_enc(info, mp);
1403         if (rc)
1404                 GOTO(put_parent, rc);
1405
1406         /* step 2: find source */
1407         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1408         if (IS_ERR(ms))
1409                 GOTO(put_parent, rc = PTR_ERR(ms));
1410
1411         if (!mdt_object_exists(ms)) {
1412                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1413                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1414                 GOTO(put_source, rc = -ENOENT);
1415         }
1416
1417         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1418
1419         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1420
1421         lhp = &info->mti_lh[MDT_LH_PARENT];
1422         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1423         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1424                                    cos_incompat);
1425         if (rc != 0)
1426                 GOTO(put_source, rc);
1427
1428         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1429
1430         lhs = &info->mti_lh[MDT_LH_CHILD];
1431         mdt_lock_reg_init(lhs, LCK_EX);
1432         rc = mdt_reint_object_lock(info, ms, lhs,
1433                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1434                                    cos_incompat);
1435         if (rc != 0)
1436                 GOTO(unlock_parent, rc);
1437
1438         /* step 3: link it */
1439         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1440                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1441
1442         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1443         rc = mdt_version_get_check_save(info, ms, 1);
1444         if (rc)
1445                 GOTO(unlock_source, rc);
1446
1447         /** check target version by name during replay */
1448         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1449                                       &info->mti_tmp_fid1, 2);
1450         if (rc != 0 && rc != -ENOENT)
1451                 GOTO(unlock_source, rc);
1452         /* save version of file name for replay, it must be ENOENT here */
1453         if (!req_is_replay(mdt_info_req(info))) {
1454                 if (rc != -ENOENT) {
1455                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1456                                PNAME(&rr->rr_name));
1457                         GOTO(unlock_source, rc = -EEXIST);
1458                 }
1459                 info->mti_ver[2] = ENOENT_VERSION;
1460                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1461         }
1462
1463         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1464                       mdt_object_child(ms), &rr->rr_name, ma);
1465
1466         if (rc == 0)
1467                 mdt_counter_incr(req, LPROC_MDT_LINK,
1468                                  ktime_us_delta(ktime_get(), kstart));
1469
1470         EXIT;
1471 unlock_source:
1472         mdt_object_unlock(info, ms, lhs, rc);
1473 unlock_parent:
1474         mdt_object_unlock(info, mp, lhp, rc);
1475 put_source:
1476         mdt_object_put(info->mti_env, ms);
1477 put_parent:
1478         mdt_object_put(info->mti_env, mp);
1479         return rc;
1480 }
1481 /**
1482  * lock the part of the directory according to the hash of the name
1483  * (lh->mlh_pdo_hash) in parallel directory lock.
1484  */
1485 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1486                               struct mdt_lock_handle *lh,
1487                               struct mdt_object *obj, __u64 ibits,
1488                               bool cos_incompat)
1489 {
1490         struct ldlm_res_id *res = &info->mti_res_id;
1491         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1492         union ldlm_policy_data *policy = &info->mti_policy;
1493         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1494         int rc;
1495
1496         /*
1497          * Finish res_id initializing by name hash marking part of
1498          * directory which is taking modification.
1499          */
1500         LASSERT(lh->mlh_pdo_hash != 0);
1501         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1502         memset(policy, 0, sizeof(*policy));
1503         policy->l_inodebits.bits = ibits;
1504         if (cos_incompat &&
1505             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1506                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1507         /*
1508          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1509          * going to be sent to client. If it is - mdt_intent_policy() path will
1510          * fix it up and turn FL_LOCAL flag off.
1511          */
1512         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1513                           policy, res, dlmflags,
1514                           &info->mti_exp->exp_handle.h_cookie);
1515         return rc;
1516 }
1517
1518 /**
1519  * Get BFL lock for rename or migrate process.
1520  **/
1521 static int mdt_rename_lock(struct mdt_thread_info *info,
1522                            struct lustre_handle *lh)
1523 {
1524         int     rc;
1525
1526         ENTRY;
1527         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1528                 struct lu_fid *fid = &info->mti_tmp_fid1;
1529                 struct mdt_object *obj;
1530
1531                 /* XXX, right now, it has to use object API to
1532                  * enqueue lock cross MDT, so it will enqueue
1533                  * rename lock(with LUSTRE_BFL_FID) by root object
1534                  */
1535                 lu_root_fid(fid);
1536                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1537                 if (IS_ERR(obj))
1538                         RETURN(PTR_ERR(obj));
1539
1540                 rc = mdt_remote_object_lock(info, obj,
1541                                             &LUSTRE_BFL_FID, lh,
1542                                             LCK_EX,
1543                                             MDS_INODELOCK_UPDATE, false);
1544                 mdt_object_put(info->mti_env, obj);
1545         } else {
1546                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1547                 union ldlm_policy_data *policy = &info->mti_policy;
1548                 struct ldlm_res_id *res_id = &info->mti_res_id;
1549                 __u64 flags = 0;
1550
1551                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1552                 memset(policy, 0, sizeof(*policy));
1553                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1554                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1555                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1556                                             LDLM_IBITS, policy, LCK_EX, &flags,
1557                                             ldlm_blocking_ast,
1558                                             ldlm_completion_ast, NULL, NULL, 0,
1559                                             LVB_T_NONE,
1560                                             &info->mti_exp->exp_handle.h_cookie,
1561                                             lh);
1562                 RETURN(rc);
1563         }
1564         RETURN(rc);
1565 }
1566
1567 static void mdt_rename_unlock(struct lustre_handle *lh)
1568 {
1569         ENTRY;
1570         LASSERT(lustre_handle_is_used(lh));
1571         /* Cancel the single rename lock right away */
1572         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1573         EXIT;
1574 }
1575
1576 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1577                                                 const struct lu_fid *fid,
1578                                                 int idx)
1579 {
1580         struct mdt_object *dir;
1581         int rc;
1582
1583         ENTRY;
1584         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1585         if (IS_ERR(dir))
1586                 RETURN(dir);
1587
1588         /* check early, the real version will be saved after locking */
1589         rc = mdt_version_get_check(info, dir, idx);
1590         if (rc)
1591                 GOTO(out_put, rc);
1592
1593         if (!mdt_object_exists(dir))
1594                 GOTO(out_put, rc = -ENOENT);
1595
1596         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1597                 GOTO(out_put, rc = -ENOTDIR);
1598
1599         RETURN(dir);
1600 out_put:
1601         mdt_object_put(info->mti_env, dir);
1602         return ERR_PTR(rc);
1603 }
1604
1605 /*
1606  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1607  * herein we don't really check it, just do revoke.
1608  */
1609 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1610                                   struct mdt_object *pobj,
1611                                   struct mdt_object *obj)
1612 {
1613         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1614         int rc;
1615
1616         mdt_lock_handle_init(lh);
1617         mdt_lock_reg_init(lh, LCK_EX);
1618
1619         if (mdt_object_remote(pobj)) {
1620                 /* don't bother to check if pobj and obj are on the same MDT. */
1621                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1622                                             &lh->mlh_rreg_lh, LCK_EX,
1623                                             MDS_INODELOCK_LOOKUP, false);
1624         } else if (mdt_object_remote(obj)) {
1625                 struct ldlm_res_id *res = &info->mti_res_id;
1626                 union ldlm_policy_data *policy = &info->mti_policy;
1627                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1628                                  LDLM_FL_COS_INCOMPAT;
1629
1630                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1631                 memset(policy, 0, sizeof(*policy));
1632                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1633                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1634                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1635                                   dlmflags, NULL);
1636         } else {
1637                 /* do nothing if both are local */
1638                 return 0;
1639         }
1640
1641         if (rc != ELDLM_OK)
1642                 return rc;
1643
1644         /*
1645          * TODO, currently we don't save this lock because there is no place to
1646          * hold this lock handle, but to avoid race we need to save this lock.
1647          */
1648         mdt_object_unlock(info, NULL, lh, 1);
1649
1650         return 0;
1651 }
1652
1653 /*
1654  * operation may takes locks of linkea, or directory stripes, group them in
1655  * different list.
1656  */
1657 struct mdt_sub_lock {
1658         struct mdt_object *msl_obj;
1659         struct mdt_lock_handle msl_lh;
1660         struct list_head msl_linkage;
1661 };
1662
1663 static void mdt_unlock_list(struct mdt_thread_info *info,
1664                             struct list_head *list, int decref)
1665 {
1666         struct mdt_sub_lock *msl;
1667         struct mdt_sub_lock *tmp;
1668
1669         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1670                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1671                 list_del(&msl->msl_linkage);
1672                 OBD_FREE_PTR(msl);
1673         }
1674 }
1675
/**
 * Unlock an object that was locked for migration, together with its stripes.
 *
 * A local object is released with mdt_reint_striped_unlock(). A remote
 * object had its stripes locked explicitly, so the slave lock list is
 * released first and then the lock on the object itself.
 *
 * \param[in] info              thread info of the current request
 * \param[in] obj               object to unlock
 * \param[in] lh                lock handle held on \a obj
 * \param[in] einfo             enqueue info, used for a local object only
 * \param[in] slave_locks       stripe locks, used for a remote object only
 * \param[in] decref            passed through to the unlock helpers
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                                             struct mdt_object *obj,
                                             struct mdt_lock_handle *lh,
                                             struct ldlm_enqueue_info *einfo,
                                             struct list_head *slave_locks,
                                             int decref)
{
        if (!mdt_object_remote(obj)) {
                mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
                return;
        }

        mdt_unlock_list(info, slave_locks, decref);
        mdt_object_unlock(info, obj, lh, decref);
}
1690
1691 /*
1692  * lock parents of links, and also check whether total locks don't exceed
1693  * RS_MAX_LOCKS.
1694  *
1695  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1696  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1697  * \retval      -ev negative errno upon error
1698  */
1699 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1700                                  struct mdt_object *pobj,
1701                                  const struct md_attr *ma,
1702                                  struct mdt_object *obj,
1703                                  struct mdt_lock_handle *lhp,
1704                                  struct ldlm_enqueue_info *peinfo,
1705                                  struct list_head *parent_slave_locks,
1706                                  struct list_head *link_locks)
1707 {
1708         struct mdt_device *mdt = info->mti_mdt;
1709         struct lu_buf *buf = &info->mti_big_buf;
1710         struct lu_name *lname = &info->mti_name;
1711         struct linkea_data ldata = { NULL };
1712         bool blocked = false;
1713         int local_lnkp_cnt = 0;
1714         int rc;
1715
1716         ENTRY;
1717         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1718                 RETURN(0);
1719
1720         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1721         if (buf->lb_buf == NULL)
1722                 RETURN(-ENOMEM);
1723
1724         ldata.ld_buf = buf;
1725         rc = mdt_links_read(info, obj, &ldata);
1726         if (rc) {
1727                 if (rc == -ENOENT || rc == -ENODATA)
1728                         rc = 0;
1729                 RETURN(rc);
1730         }
1731
1732         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1733              linkea_next_entry(&ldata)) {
1734                 struct mdt_object *lnkp;
1735                 struct mdt_sub_lock *msl;
1736                 struct lu_fid fid;
1737                 __u64 ibits;
1738
1739                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1740                                     &fid);
1741
1742                 /* check if it's also linked to parent */
1743                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1744                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1745                                PFID(&fid), PNAME(lname));
1746                         /* in case link is remote object, revoke LOOKUP lock */
1747                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1748                         continue;
1749                 }
1750
1751                 lnkp = NULL;
1752
1753                 /* check if it's linked to a stripe of parent */
1754                 if (ma->ma_valid & MA_LMV) {
1755                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1756                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1757                         int j = 0;
1758
1759                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1760                                 fid_le_to_cpu(stripe_fid,
1761                                               &lmv->lmv_stripe_fids[j]);
1762                                 if (lu_fid_eq(stripe_fid, &fid)) {
1763                                         CDEBUG(D_INFO, "skip stripe "DFID
1764                                                ", reovke "DNAME"\n",
1765                                                PFID(&fid), PNAME(lname));
1766                                         lnkp = mdt_object_find(info->mti_env,
1767                                                                mdt, &fid);
1768                                         if (IS_ERR(lnkp))
1769                                                 GOTO(out, rc = PTR_ERR(lnkp));
1770                                         break;
1771                                 }
1772                         }
1773
1774                         if (lnkp) {
1775                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1776                                                                    obj);
1777                                 mdt_object_put(info->mti_env, lnkp);
1778                                 continue;
1779                         }
1780                 }
1781
1782                 /* Check if it's already locked */
1783                 list_for_each_entry(msl, link_locks, msl_linkage) {
1784                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1785                                 CDEBUG(D_INFO,
1786                                        DFID" was locked, revoke "DNAME"\n",
1787                                        PFID(&fid), PNAME(lname));
1788                                 lnkp = msl->msl_obj;
1789                                 break;
1790                         }
1791                 }
1792
1793                 if (lnkp) {
1794                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1795                         continue;
1796                 }
1797
1798                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1799                        PFID(&fid), PNAME(lname));
1800
1801                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1802                 if (IS_ERR(lnkp)) {
1803                         CWARN("%s: cannot find obj "DFID": %ld\n",
1804                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1805                         continue;
1806                 }
1807
1808                 if (!mdt_object_exists(lnkp)) {
1809                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1810                               PFID(&fid), PNAME(lname));
1811                         mdt_object_put(info->mti_env, lnkp);
1812                         continue;
1813                 }
1814
1815                 if (!mdt_object_remote(lnkp))
1816                         local_lnkp_cnt++;
1817
1818                 OBD_ALLOC_PTR(msl);
1819                 if (msl == NULL)
1820                         GOTO(out, rc = -ENOMEM);
1821
1822                 /*
1823                  * we can't follow parent-child lock order like other MD
1824                  * operations, use lock_try here to avoid deadlock, if the lock
1825                  * cannot be taken, drop all locks taken, revoke the blocked
1826                  * one, and continue processing the remaining entries, and in
1827                  * the end of the loop restart from beginning.
1828                  */
1829                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1830                 ibits = 0;
1831                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1832                                          MDS_INODELOCK_UPDATE, true);
1833                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1834
1835                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1836                                PFID(&fid), PNAME(lname));
1837
1838                         mdt_unlock_list(info, link_locks, 1);
1839                         /* also unlock parent locks to avoid deadlock */
1840                         if (!blocked)
1841                                 mdt_migrate_object_unlock(info, pobj, lhp,
1842                                                           peinfo,
1843                                                           parent_slave_locks,
1844                                                           1);
1845
1846                         blocked = true;
1847
1848                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1849                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1850                                              MDS_INODELOCK_UPDATE);
1851                         if (rc) {
1852                                 mdt_object_put(info->mti_env, lnkp);
1853                                 OBD_FREE_PTR(msl);
1854                                 GOTO(out, rc);
1855                         }
1856
1857                         if (mdt_object_remote(lnkp)) {
1858                                 struct ldlm_lock *lock;
1859
1860                                 /*
1861                                  * for remote object, set lock cb_atomic,
1862                                  * so lock can be released in blocking_ast()
1863                                  * immediately, then the next lock_try will
1864                                  * have better chance of success.
1865                                  */
1866                                 lock = ldlm_handle2lock(
1867                                                 &msl->msl_lh.mlh_rreg_lh);
1868                                 LASSERT(lock != NULL);
1869                                 lock_res_and_lock(lock);
1870                                 ldlm_set_atomic_cb(lock);
1871                                 unlock_res_and_lock(lock);
1872                                 LDLM_LOCK_PUT(lock);
1873                         }
1874
1875                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1876                         OBD_FREE_PTR(msl);
1877                         continue;
1878                 }
1879
1880                 INIT_LIST_HEAD(&msl->msl_linkage);
1881                 msl->msl_obj = lnkp;
1882                 list_add_tail(&msl->msl_linkage, link_locks);
1883
1884                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1885         }
1886
1887         if (blocked)
1888                 GOTO(out, rc = -EBUSY);
1889
1890         EXIT;
1891 out:
1892         if (rc) {
1893                 mdt_unlock_list(info, link_locks, rc);
1894         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1895                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1896                        local_lnkp_cnt);
1897                 /*
1898                  * parent may have 3 local objects: master object and 2 stripes
1899                  * (if it's being migrated too); source may have 1 local objects
1900                  * as regular file; target has 1 local object.
1901                  * Note, source may have 2 local locks if it is directory but it
1902                  * can't have hardlinks, so it is not considered here.
1903                  */
1904                 rc = 1;
1905         }
1906         return rc;
1907 }
1908
1909 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1910                                   struct mdt_object *obj,
1911                                   const struct md_attr *ma,
1912                                   struct list_head *slave_locks)
1913 {
1914         struct mdt_device *mdt = info->mti_mdt;
1915         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1916         struct lu_fid *fid = &info->mti_tmp_fid1;
1917         struct mdt_object *slave;
1918         struct mdt_sub_lock *msl;
1919         int i;
1920         int rc;
1921
1922         ENTRY;
1923         LASSERT(mdt_object_remote(obj));
1924         LASSERT(ma->ma_valid & MA_LMV);
1925         LASSERT(lmv);
1926
1927         if (!lmv_is_sane(lmv))
1928                 RETURN(-EINVAL);
1929
1930         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1931                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1932
1933                 if (!fid_is_sane(fid))
1934                         continue;
1935
1936                 slave = mdt_object_find(info->mti_env, mdt, fid);
1937                 if (IS_ERR(slave))
1938                         GOTO(out, rc = PTR_ERR(slave));
1939
1940                 OBD_ALLOC_PTR(msl);
1941                 if (!msl) {
1942                         mdt_object_put(info->mti_env, slave);
1943                         GOTO(out, rc = -ENOMEM);
1944                 }
1945
1946                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1947                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1948                                            MDS_INODELOCK_UPDATE, true);
1949                 if (rc) {
1950                         OBD_FREE_PTR(msl);
1951                         mdt_object_put(info->mti_env, slave);
1952                         GOTO(out, rc);
1953                 }
1954
1955                 INIT_LIST_HEAD(&msl->msl_linkage);
1956                 msl->msl_obj = slave;
1957                 list_add_tail(&msl->msl_linkage, slave_locks);
1958         }
1959         EXIT;
1960
1961 out:
1962         if (rc)
1963                 mdt_unlock_list(info, slave_locks, rc);
1964         return rc;
1965 }
1966
1967 /* lock parent and its stripes */
1968 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1969                                    struct mdt_object *obj,
1970                                    const struct md_attr *ma,
1971                                    struct mdt_lock_handle *lh,
1972                                    struct ldlm_enqueue_info *einfo,
1973                                    struct list_head *slave_locks)
1974 {
1975         int rc;
1976
1977         if (mdt_object_remote(obj)) {
1978                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1979                                             &lh->mlh_rreg_lh, LCK_PW,
1980                                             MDS_INODELOCK_UPDATE, false);
1981                 if (rc != ELDLM_OK)
1982                         return rc;
1983
1984                 /*
1985                  * if obj is remote and striped, lock its stripes explicitly
1986                  * because it's not striped in LOD layer on this MDT.
1987                  */
1988                 if (ma->ma_valid & MA_LMV) {
1989                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1990                         if (rc)
1991                                 mdt_object_unlock(info, obj, lh, rc);
1992                 }
1993         } else {
1994                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1995                                             einfo, true);
1996         }
1997
1998         return rc;
1999 }
2000
2001 /*
2002  * in migration, object may be remote, and we need take full lock of it and its
2003  * stripes if it's directory, besides, object may be a remote object on its
2004  * parent, revoke its LOOKUP lock on where its parent is located.
2005  */
2006 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
2007                                    struct mdt_object *pobj,
2008                                    struct mdt_object *obj,
2009                                    struct mdt_lock_handle *lh,
2010                                    struct ldlm_enqueue_info *einfo,
2011                                    struct list_head *slave_locks)
2012 {
2013         int rc;
2014
2015         if (mdt_object_remote(obj)) {
2016                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2017                 if (rc)
2018                         return rc;
2019
2020                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
2021                                             &lh->mlh_rreg_lh, LCK_EX,
2022                                             MDS_INODELOCK_FULL, false);
2023                 if (rc != ELDLM_OK)
2024                         return rc;
2025
2026                 /*
2027                  * if obj is remote and striped, lock its stripes explicitly
2028                  * because it's not striped in LOD layer on this MDT.
2029                  */
2030                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
2031                         struct md_attr *ma = &info->mti_attr;
2032
2033                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
2034                         if (rc) {
2035                                 mdt_object_unlock(info, obj, lh, rc);
2036                                 return rc;
2037                         }
2038
2039                         if (ma->ma_valid & MA_LMV) {
2040                                 rc = mdt_lock_remote_slaves(info, obj, ma,
2041                                                             slave_locks);
2042                                 if (rc)
2043                                         mdt_object_unlock(info, obj, lh, rc);
2044                         }
2045                 }
2046         } else {
2047                 if (mdt_object_remote(pobj)) {
2048                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2049                         if (rc)
2050                                 return rc;
2051                 }
2052
2053                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2054                                             einfo, true);
2055         }
2056
2057         return rc;
2058 }
2059
2060 /*
2061  * lookup source by name, if parent is striped directory, we need to find the
2062  * corresponding stripe where source is located, and then lookup there.
2063  *
2064  * besides, if parent is migrating too, and file is already in target stripe,
2065  * this should be a redo of 'lfs migrate' on client side.
2066  */
2067 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2068                               struct mdt_object *pobj,
2069                               const struct md_attr *ma,
2070                               const struct lu_name *lname,
2071                               struct mdt_object **spobj,
2072                               struct mdt_object **sobj)
2073 {
2074         const struct lu_env *env = info->mti_env;
2075         struct lu_fid *fid = &info->mti_tmp_fid1;
2076         struct mdt_object *stripe;
2077         int rc;
2078
2079         if (ma->ma_valid & MA_LMV) {
2080                 /* if parent is striped, lookup on corresponding stripe */
2081                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2082
2083                 if (!lmv_is_sane(lmv))
2084                         return -EBADF;
2085
2086                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2087                                                   lname->ln_namelen);
2088                 if (rc < 0)
2089                         return rc;
2090
2091                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2092
2093                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2094                 if (IS_ERR(stripe))
2095                         return PTR_ERR(stripe);
2096
2097                 fid_zero(fid);
2098                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2099                                 &info->mti_spec);
2100                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2101                         /*
2102                          * if parent layout is changeing, and lookup child
2103                          * failed on source stripe, lookup again on target
2104                          * stripe, if it exists, it means previous migration
2105                          * was interrupted, and current file was migrated
2106                          * already.
2107                          */
2108                         mdt_object_put(env, stripe);
2109
2110                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2111                                                       lname->ln_namelen);
2112                         if (rc < 0)
2113                                 return rc;
2114
2115                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2116
2117                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2118                         if (IS_ERR(stripe))
2119                                 return PTR_ERR(stripe);
2120
2121                         fid_zero(fid);
2122                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2123                                         fid, &info->mti_spec);
2124                         mdt_object_put(env, stripe);
2125                         return rc ?: -EALREADY;
2126                 } else if (rc) {
2127                         mdt_object_put(env, stripe);
2128                         return rc;
2129                 }
2130         } else {
2131                 fid_zero(fid);
2132                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2133                                 &info->mti_spec);
2134                 if (rc)
2135                         return rc;
2136
2137                 stripe = pobj;
2138                 mdt_object_get(env, stripe);
2139         }
2140
2141         *spobj = stripe;
2142
2143         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2144         if (IS_ERR(*sobj)) {
2145                 mdt_object_put(env, stripe);
2146                 rc = PTR_ERR(*sobj);
2147                 *spobj = NULL;
2148                 *sobj = NULL;
2149         }
2150
2151         return rc;
2152 }
2153
2154 /* end lease and close file for regular file */
2155 static int mdd_migrate_close(struct mdt_thread_info *info,
2156                              struct mdt_object *obj)
2157 {
2158         struct close_data *data;
2159         struct mdt_body *repbody;
2160         struct ldlm_lock *lease;
2161         int rc;
2162         int rc2;
2163
2164         rc = -EPROTO;
2165         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2166                                       RCL_CLIENT) ||
2167             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2168                                       RCL_CLIENT))
2169                 goto close;
2170
2171         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2172         if (!data)
2173                 goto close;
2174
2175         rc = -ESTALE;
2176         lease = ldlm_handle2lock(&data->cd_handle);
2177         if (!lease)
2178                 goto close;
2179
2180         /* check if the lease was already canceled */
2181         lock_res_and_lock(lease);
2182         rc = ldlm_is_cancel(lease);
2183         unlock_res_and_lock(lease);
2184
2185         if (rc) {
2186                 rc = -EAGAIN;
2187                 LDLM_DEBUG(lease, DFID" lease broken",
2188                            PFID(mdt_object_fid(obj)));
2189         }
2190
2191         /*
2192          * cancel server side lease, client side counterpart should have been
2193          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2194          */
2195         ldlm_lock_cancel(lease);
2196         ldlm_reprocess_all(lease->l_resource,
2197                            lease->l_policy_data.l_inodebits.bits);
2198         LDLM_LOCK_PUT(lease);
2199
2200 close:
2201         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2202         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2203         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2204
2205         return rc ?: rc2;
2206 }
2207
2208 /*
2209  * migrate file in below steps:
2210  *  1. lock parent and its stripes
2211  *  2. lookup source by name
2212  *  3. lock parents of source links if source is not directory
2213  *  4. reject if source is in HSM
2214  *  5. take source open_sem and close file if source is regular file
2215  *  6. lock source and its stripes if it's directory
2216  *  7. lock target so subsequent change to it can trigger COS
2217  *  8. migrate file
2218  *  9. unlock above locks
2219  * 10. sync device if source has links
2220  */
2221 int mdt_reint_migrate(struct mdt_thread_info *info,
2222                       struct mdt_lock_handle *unused)
2223 {
2224         const struct lu_env *env = info->mti_env;
2225         struct mdt_device *mdt = info->mti_mdt;
2226         struct ptlrpc_request *req = mdt_info_req(info);
2227         struct mdt_reint_record *rr = &info->mti_rr;
2228         struct lu_ucred *uc = mdt_ucred(info);
2229         struct md_attr *ma = &info->mti_attr;
2230         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2231         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2232         struct mdt_object *pobj;
2233         struct mdt_object *spobj = NULL;
2234         struct mdt_object *sobj = NULL;
2235         struct mdt_object *tobj;
2236         struct lustre_handle rename_lh = { 0 };
2237         struct mdt_lock_handle *lhp;
2238         struct mdt_lock_handle *lhs;
2239         struct mdt_lock_handle *lht;
2240         LIST_HEAD(parent_slave_locks);
2241         LIST_HEAD(child_slave_locks);
2242         LIST_HEAD(link_locks);
2243         int lock_retries = 5;
2244         bool open_sem_locked = false;
2245         bool do_sync = false;
2246         int rc;
2247
2248         ENTRY;
2249         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2250                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2251
2252         if (info->mti_dlm_req)
2253                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2254
2255         if (!fid_is_md_operative(rr->rr_fid1) ||
2256             !fid_is_md_operative(rr->rr_fid2))
2257                 RETURN(-EPERM);
2258
2259         /* don't allow migrate . or .. */
2260         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2261                 RETURN(-EBUSY);
2262
2263         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2264                 RETURN(-EPERM);
2265
2266         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2267             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2268             mdt->mdt_enable_remote_dir_gid != -1)
2269                 RETURN(-EPERM);
2270
2271         /*
2272          * Note: do not enqueue rename lock for replay request, because
2273          * if other MDT holds rename lock, but being blocked to wait for
2274          * this MDT to finish its recovery, and the failover MDT can not
2275          * get rename lock, which will cause deadlock.
2276          *
2277          * req is NULL if this is called by directory auto-split.
2278          */
2279         if (req && !req_is_replay(req)) {
2280                 rc = mdt_rename_lock(info, &rename_lh);
2281                 if (rc != 0) {
2282                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2283                                mdt_obd_name(info->mti_mdt), rc);
2284                         RETURN(rc);
2285                 }
2286         }
2287
2288         /* pobj is master object of parent */
2289         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2290         if (IS_ERR(pobj))
2291                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2292
2293         if (req) {
2294                 rc = mdt_version_get_check(info, pobj, 0);
2295                 if (rc)
2296                         GOTO(put_parent, rc);
2297         }
2298
2299         if (!mdt_object_exists(pobj))
2300                 GOTO(put_parent, rc = -ENOENT);
2301
2302         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2303                 GOTO(put_parent, rc = -ENOTDIR);
2304
2305         rc = mdt_check_enc(info, pobj);
2306         if (rc)
2307                 GOTO(put_parent, rc);
2308
2309         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2310         if (rc)
2311                 GOTO(put_parent, rc);
2312
2313 lock_parent:
2314         /* lock parent object */
2315         lhp = &info->mti_lh[MDT_LH_PARENT];
2316         mdt_lock_reg_init(lhp, LCK_PW);
2317         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2318                                      &parent_slave_locks);
2319         if (rc)
2320                 GOTO(put_parent, rc);
2321
2322         /*
2323          * spobj is the corresponding stripe against name if pobj is striped
2324          * directory, which is the real parent, and no need to lock, because
2325          * we've taken full lock of pobj.
2326          */
2327         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2328         if (rc)
2329                 GOTO(unlock_parent, rc);
2330
2331         /* lock parents of source links, and revoke LOOKUP lock of links */
2332         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2333                                    &parent_slave_locks, &link_locks);
2334         if (rc == -EBUSY && lock_retries-- > 0) {
2335                 mdt_object_put(env, sobj);
2336                 mdt_object_put(env, spobj);
2337                 goto lock_parent;
2338         }
2339
2340         if (rc < 0)
2341                 GOTO(put_source, rc);
2342
2343         /*
2344          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2345          * with one request, if total lock count exceeds this limit, we will
2346          * drop all locks after migration, and synchronous device in the end.
2347          */
2348         do_sync = rc;
2349
2350         /* TODO: DoM migration is not supported, migrate dirent only */
2351         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2352                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2353                 if (rc)
2354                         GOTO(unlock_links, rc);
2355
2356                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2357                         info->mti_spec.sp_migrate_nsonly = 1;
2358         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2359                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2360                 if (rc)
2361                         GOTO(unlock_links, rc);
2362
2363                 /* race with restripe/auto-split? */
2364                 if ((ma->ma_valid & MA_LMV) &&
2365                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2366                         GOTO(unlock_links, rc = -EBUSY);
2367         }
2368
2369         /* if migration HSM is allowed */
2370         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2371                 ma->ma_need = MA_HSM;
2372                 ma->ma_valid = 0;
2373                 rc = mdt_attr_get_complex(info, sobj, ma);
2374                 if (rc)
2375                         GOTO(unlock_links, rc);
2376
2377                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2378                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2379         }
2380
2381         /* end lease and close file for regular file */
2382         if (info->mti_spec.sp_migrate_close) {
2383                 /* try to hold open_sem so that nobody else can open the file */
2384                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2385                         /* close anyway */
2386                         mdd_migrate_close(info, sobj);
2387                         GOTO(unlock_links, rc = -EBUSY);
2388                 } else {
2389                         open_sem_locked = true;
2390                         rc = mdd_migrate_close(info, sobj);
2391                         if (rc)
2392                                 GOTO(unlock_open_sem, rc);
2393                 }
2394         }
2395
2396         /* lock source */
2397         lhs = &info->mti_lh[MDT_LH_OLD];
2398         mdt_lock_reg_init(lhs, LCK_EX);
2399         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2400                                      &child_slave_locks);
2401         if (rc)
2402                 GOTO(unlock_open_sem, rc);
2403
2404         /* lock target */
2405         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2406         if (IS_ERR(tobj))
2407                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2408
2409         lht = &info->mti_lh[MDT_LH_NEW];
2410         mdt_lock_reg_init(lht, LCK_EX);
2411         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2412         if (rc)
2413                 GOTO(put_target, rc);
2414
2415         /* Don't do lookup sanity check. We know name doesn't exist. */
2416         info->mti_spec.sp_cr_lookup = 0;
2417         info->mti_spec.sp_feat = &dt_directory_features;
2418
2419         rc = mdo_migrate(env, mdt_object_child(pobj),
2420                          mdt_object_child(sobj), &rr->rr_name,
2421                          mdt_object_child(tobj),
2422                          &info->mti_spec, ma);
2423         if (!rc)
2424                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2425                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2426         EXIT;
2427
2428         mdt_object_unlock(info, tobj, lht, rc);
2429 put_target:
2430         mdt_object_put(env, tobj);
2431 unlock_source:
2432         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2433                                   &child_slave_locks, rc);
2434 unlock_open_sem:
2435         if (open_sem_locked)
2436                 up_write(&sobj->mot_open_sem);
2437 unlock_links:
2438         /* if we've got too many locks to save into RPC,
2439          * then just commit before the locks are released
2440          */
2441         if (!rc && do_sync)
2442                 mdt_device_sync(env, mdt);
2443         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2444 put_source:
2445         mdt_object_put(env, sobj);
2446         mdt_object_put(env, spobj);
2447 unlock_parent:
2448         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2449                                   &parent_slave_locks, rc);
2450 put_parent:
2451         mdt_object_put(env, pobj);
2452 unlock_rename:
2453         if (lustre_handle_is_used(&rename_lh))
2454                 mdt_rename_unlock(&rename_lh);
2455
2456         return rc;
2457 }
2458
2459 static int mdt_object_lock_save(struct mdt_thread_info *info,
2460                                 struct mdt_object *dir,
2461                                 struct mdt_lock_handle *lh,
2462                                 int idx, bool cos_incompat)
2463 {
2464         int rc;
2465
2466         /* we lock the target dir if it is local */
2467         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2468                                    cos_incompat);
2469         if (rc != 0)
2470                 return rc;
2471
2472         /* get and save correct version after locking */
2473         mdt_version_get_save(info, dir, idx);
2474         return 0;
2475 }
2476
2477 /*
2478  * determine lock order of sobj and tobj
2479  *
2480  * there are two situations we need to lock tobj before sobj:
2481  * 1. sobj is child of tobj
2482  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2483  *    larger than that of tobj
2484  *
2485  * \retval      1 lock tobj before sobj
2486  * \retval      0 lock sobj before tobj
2487  * \retval      -ev negative errno upon error
2488  */
2489 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2490                                            struct mdt_object *sobj,
2491                                            struct mdt_object *tobj)
2492 {
2493         struct md_attr *ma = &info->mti_attr;
2494         struct lu_fid *spfid = &info->mti_tmp_fid1;
2495         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2496         struct lmv_mds_md_v1 *lmv;
2497         __u32 sindex;
2498         __u32 tindex;
2499         int rc;
2500
2501         /* sobj and tobj are the same */
2502         if (sobj == tobj)
2503                 return 0;
2504
2505         if (fid_is_root(mdt_object_fid(sobj)))
2506                 return 0;
2507
2508         if (fid_is_root(mdt_object_fid(tobj)))
2509                 return 1;
2510
2511         /* check whether sobj is child of tobj */
2512         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2513                            mdt_object_fid(tobj));
2514         if (rc < 0)
2515                 return rc;
2516
2517         if (rc == 1)
2518                 return 1;
2519
2520         /* check whether sobj and tobj are children of the same parent */
2521         rc = mdt_attr_get_pfid(info, sobj, spfid);
2522         if (rc)
2523                 return rc;
2524
2525         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2526         if (rc)
2527                 return rc;
2528
2529         if (!lu_fid_eq(spfid, tpfid))
2530                 return 0;
2531
2532         /* check whether sobj and tobj are sibling stripes */
2533         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2534         if (rc)
2535                 return rc;
2536
2537         if (!(ma->ma_valid & MA_LMV))
2538                 return 0;
2539
2540         lmv = &ma->ma_lmv->lmv_md_v1;
2541         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2542                 return 0;
2543         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2544
2545         ma->ma_valid = 0;
2546         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2547         if (rc)
2548                 return rc;
2549
2550         if (!(ma->ma_valid & MA_LMV))
2551                 return -ENODATA;
2552
2553         lmv = &ma->ma_lmv->lmv_md_v1;
2554         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2555                 return -EINVAL;
2556         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2557
2558         /* check stripe index of sobj and tobj */
2559         if (sindex == tindex)
2560                 return -EINVAL;
2561
2562         return sindex < tindex ? 0 : 1;
2563 }
2564
2565 /*
2566  * lock rename source object.
2567  *
2568  * Both source and source parent may be remote, and source may be a remote
2569  * object on source parent, to avoid overriding lock handle, store remote
2570  * LOOKUP lock separately in @lhr.
2571  *
2572  * \retval      0 on success
2573  * \retval      -ev negative errno upon error
2574  */
2575 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2576                                   struct mdt_object *parent,
2577                                   struct mdt_object *child,
2578                                   struct mdt_lock_handle *lhc,
2579                                   struct mdt_lock_handle *lhr,
2580                                   __u64 ibits,
2581                                   bool cos_incompat)
2582 {
2583         int rc;
2584
2585         rc = mdt_is_remote_object(info, parent, child);
2586         if (rc < 0)
2587                 return rc;
2588
2589         if (rc) {
2590                 /* enqueue remote LOOKUP lock from the parent MDT */
2591                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2592
2593                 if (mdt_object_remote(parent)) {
2594                         rc = mdt_remote_object_lock(info, parent,
2595                                                     mdt_object_fid(child),
2596                                                     &lhr->mlh_rreg_lh,
2597                                                     lhr->mlh_rreg_mode,
2598                                                     rmt_ibits, false);
2599                         if (rc != ELDLM_OK)
2600                                 return rc;
2601                 } else {
2602                         LASSERT(mdt_object_remote(child));
2603                         rc = mdt_object_local_lock(info, child, lhr,
2604                                                    &rmt_ibits, 0, true);
2605                         if (rc < 0)
2606                                 return rc;
2607                 }
2608
2609                 ibits &= ~MDS_INODELOCK_LOOKUP;
2610         }
2611
2612         if (mdt_object_remote(child)) {
2613                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2614                                             &lhc->mlh_rreg_lh,
2615                                             lhc->mlh_rreg_mode,
2616                                             ibits, false);
2617                 if (rc == ELDLM_OK)
2618                         rc = 0;
2619         } else {
2620                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2621                                            cos_incompat);
2622         }
2623
2624         if (!rc)
2625                 mdt_object_unlock(info, child, lhr, rc);
2626
2627         return rc;
2628 }
2629
2630 /* Helper function for mdt_reint_rename so we don't need to opencode
2631  * two different order lockings
2632  */
2633 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2634                              struct mdt_object *mfirstdir,
2635                              struct mdt_lock_handle *lh_firstdirp,
2636                              struct mdt_object *mseconddir,
2637                              struct mdt_lock_handle *lh_seconddirp,
2638                              bool cos_incompat)
2639 {
2640         int rc;
2641
2642         rc = mdt_object_lock_save(info, mfirstdir, lh_firstdirp, 0,
2643                                   cos_incompat);
2644         if (rc)
2645                 return rc;
2646
2647         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2648
2649         if (mfirstdir != mseconddir) {
2650                 rc = mdt_object_lock_save(info, mseconddir, lh_seconddirp, 1,
2651                                           cos_incompat);
2652         } else if (!mdt_object_remote(mseconddir) &&
2653                    lh_firstdirp->mlh_pdo_hash !=
2654                    lh_seconddirp->mlh_pdo_hash) {
2655                 rc = mdt_pdir_hash_lock(info, lh_seconddirp, mseconddir,
2656                                         MDS_INODELOCK_UPDATE,
2657                                         cos_incompat);
2658                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2659         }
2660
2661         if (rc != 0)
2662                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2663
2664         return rc;
2665 }
2666
2667 /*
2668  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2669  * 2 - srcdir child; 3 - tgtdir child.
2670  * Update on disk version of srcdir child.
      *
      * Rename the entry rr_name in directory rr_fid1 to rr_tgt_name in
      * directory rr_fid2.  Both parents are found and locked first (order
      * chosen by mdt_rename_determine_lock_order(), or by PDO hash for a
      * local same-directory rename), then the source is looked up, then an
      * existing target if there is one, both children are locked, and
      * finally mdo_rename() is called.  The rename lock is only taken for
      * non-replay requests whose two parents are different objects.
      *
      * \param[in] info        thread info carrying the rename record
      * \param[in] unused      not referenced by this function
      *
      * \retval      0 on success, also when the target name already resolves
      *              to the source object (nothing is renamed in that case)
      * \retval      -EXDEV if an existing target is remote (its FID is put
      *              into the reply body), or if mdt_enable_remote_rename is
      *              off and the source is remote, or the source parent is
      *              remote on a non-replay request
      * \retval      other negative errno on failure
2671  */
2672 static int mdt_reint_rename(struct mdt_thread_info *info,
2673                             struct mdt_lock_handle *unused)
2674 {
2675         struct mdt_device *mdt = info->mti_mdt;
2676         struct mdt_reint_record *rr = &info->mti_rr;
2677         struct md_attr *ma = &info->mti_attr;
2678         struct ptlrpc_request *req = mdt_info_req(info);
2679         struct mdt_object *msrcdir = NULL;
2680         struct mdt_object *mtgtdir = NULL;
2681         struct mdt_object *mold;
2682         struct mdt_object *mnew = NULL;
2683         struct lustre_handle rename_lh = { 0 };
2684         struct mdt_lock_handle *lh_srcdirp;
2685         struct mdt_lock_handle *lh_tgtdirp;
2686         struct mdt_lock_handle *lh_oldp = NULL;
2687         struct mdt_lock_handle *lh_rmt = NULL;
2688         struct mdt_lock_handle *lh_newp = NULL;
2689         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2690         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2691         __u64 lock_ibits;
2692         bool reverse = false, discard = false;
2693         bool cos_incompat;
2694         ktime_t kstart = ktime_get();
2695         int rc;
2696
2697         ENTRY;
2698         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2699                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2700                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2701
2702         if (info->mti_dlm_req)
2703                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2704
2705         if (!fid_is_md_operative(rr->rr_fid1) ||
2706             !fid_is_md_operative(rr->rr_fid2))
2707                 RETURN(-EPERM);
2708
2709         /* find both parents. */
2710         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2711         if (IS_ERR(msrcdir))
2712                 RETURN(PTR_ERR(msrcdir));
2713
2714         rc = mdt_check_enc(info, msrcdir);
2715         if (rc)
2716                 GOTO(out_put_srcdir, rc);
2717
2718         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2719
             /* same parent FID: reuse the object, but take a second reference
              * because out_put_tgtdir and out_put_srcdir each drop one
              */
2720         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2721                 mtgtdir = msrcdir;
2722                 mdt_object_get(info->mti_env, mtgtdir);
2723         } else {
2724                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2725                 if (IS_ERR(mtgtdir))
2726                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2727         }
2728
2729         rc = mdt_check_enc(info, mtgtdir);
2730         if (rc)
2731                 GOTO(out_put_tgtdir, rc);
2732
2733         /*
2734          * Note: do not enqueue rename lock for replay request, because
2735          * if other MDT holds rename lock, but being blocked to wait for
2736          * this MDT to finish its recovery, and the failover MDT can not
2737          * get rename lock, which will cause deadlock.
2738          */
2739         if (!req_is_replay(req)) {
2740                 /*
2741                  * Normally rename RPC is handled on the MDT with the target
2742                  * directory (if target exists, it's on the MDT with the
2743                  * target), if the source directory is remote, it's a hint that
2744                  * source is remote too (this may not be true, but it won't
2745                  * cause any issue), return -EXDEV early to avoid taking
2746                  * rename_lock.
2747                  */
2748                 if (!mdt->mdt_enable_remote_rename &&
2749                     mdt_object_remote(msrcdir))
2750                         GOTO(out_put_tgtdir, rc = -EXDEV);
2751
2752                 /* This might be further relaxed in the future for regular file
2753                  * renames in different source and target parents. Start with
2754                  * only same-directory renames for simplicity and because this
2755                  * is by far the most the common use case.
2756                  */
2757                 if (msrcdir != mtgtdir) {
2758                         rc = mdt_rename_lock(info, &rename_lh);
2759                         if (rc != 0) {
2760                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2761                                        mdt_obd_name(mdt), rc);
2762                                 GOTO(out_put_tgtdir, rc);
2763                         }
2764                 } else {
2765                         CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
2766                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2767                                PNAME(&rr->rr_name));
2768                 }
2769         }
2770
2771         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2772         if (rc < 0)
2773                 GOTO(out_unlock_rename, rc);
             /* non-negative rc is the verdict: 1 means lock mtgtdir first */
2774         reverse = rc;
2775
2776         /* source needs to be looked up after locking source parent, otherwise
2777          * this rename may race with unlink source, and cause rename hang, see
2778          * sanityn.sh 55b, so check parents first, if later we found source is
2779          * remote, relock parents.
2780          */
2781         cos_incompat = (mdt_object_remote(msrcdir) ||
2782                         mdt_object_remote(mtgtdir));
2783
2784         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2785
2786         /* lock parents in the proper order. */
2787         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2788         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2789
2790         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2791         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2792 relock:
2793         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2794         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2795
2796         /* In case of same dir local rename we must sort by the hash,
2797          * otherwise a lock deadlock is possible when renaming
2798          * a to b and b to a at the same time LU-15285
2799          */
2800         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2801                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2802         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2803                 reverse = 0;
2804
2805         if (reverse)
2806                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp, msrcdir,
2807                                        lh_srcdirp, cos_incompat);
2808         else
2809                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, mtgtdir,
2810                                        lh_tgtdirp, cos_incompat);
2811
2812         if (rc != 0)
2813                 GOTO(out_unlock_rename, rc);
2814
2815         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2816         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2817
2818         /* find mold object. */
2819         fid_zero(old_fid);
2820         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2821         if (rc != 0)
2822                 GOTO(out_unlock_parents, rc);
2823
2824         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2825                 GOTO(out_unlock_parents, rc = -EINVAL);
2826
2827         if (!fid_is_md_operative(old_fid))
2828                 GOTO(out_unlock_parents, rc = -EPERM);
2829
2830         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2831         if (IS_ERR(mold))
2832                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2833
2834         if (!mdt_object_exists(mold)) {
2835                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2836                                 &mold->mot_obj,
2837                                 "object does not exist");
2838                 GOTO(out_put_old, rc = -ENOENT);
2839         }
2840
2841         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2842                 GOTO(out_put_old, rc = -EXDEV);
2843
2844         /* Check if @mtgtdir is subdir of @mold, before locking child
2845          * to avoid reverse locking.
2846          */
2847         if (mtgtdir != msrcdir) {
2848                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2849                                    old_fid);
2850                 if (rc) {
2851                         if (rc == 1)
2852                                 rc = -EINVAL;
2853                         GOTO(out_put_old, rc);
2854                 }
2855         }
2856
2857         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2858         /* save version after locking */
2859         mdt_version_get_save(info, mold, 2);
2860
             /* source turned out to be remote while the parents were locked
              * with cos_incompat == false: drop the source reference and both
              * parent locks and redo the locking from "relock".  cos_incompat
              * is true afterwards, so this branch is taken at most once.
              */
2861         if (!cos_incompat && mdt_object_remote(mold)) {
2862                 cos_incompat = true;
2863                 mdt_object_put(info->mti_env, mold);
2864                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2865                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2866                 goto relock;
2867         }
2868
2869         /* find mnew object:
2870          * mnew target object may not exist now
2871          * lookup with version checking
2872          */
2873         fid_zero(new_fid);
2874         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2875                                       3);
2876         if (rc == 0) {
2877                 /* the new_fid should have been filled at this moment */
                     /* target name already refers to the source object:
                      * leave with rc == 0 without renaming anything
                      */
2878                 if (lu_fid_eq(old_fid, new_fid))
2879                         GOTO(out_put_old, rc);
2880
2881                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2882                     lu_fid_eq(new_fid, rr->rr_fid2))
2883                         GOTO(out_put_old, rc = -EINVAL);
2884
2885                 if (!fid_is_md_operative(new_fid))
2886                         GOTO(out_put_old, rc = -EPERM);
2887
2888                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2889                 if (IS_ERR(mnew))
2890                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2891
2892                 if (!mdt_object_exists(mnew)) {
2893                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2894                                         &mnew->mot_obj,
2895                                         "object does not exist");
2896                         GOTO(out_put_new, rc = -ENOENT);
2897                 }
2898
2899                 if (mdt_object_remote(mnew)) {
2900                         struct mdt_body  *repbody;
2901
2902                         /* Always send rename req to the target child MDT */
2903                         repbody = req_capsule_server_get(info->mti_pill,
2904                                                          &RMF_MDT_BODY);
2905                         LASSERT(repbody != NULL);
2906                         repbody->mbo_fid1 = *new_fid;
2907                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2908                         GOTO(out_put_new, rc = -EXDEV);
2909                 }
2910                 /* Before locking the target dir, check we do not replace
2911                  * a dir with a non-dir, otherwise it may deadlock with
2912                  * link op which tries to create a link in this dir
2913                  * back to this non-dir.
2914                  */
2915                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2916                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2917                         GOTO(out_put_new, rc = -EISDIR);
2918
2919                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2920                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2921                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2922                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2923                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2924                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2925                                             lh_rmt, lock_ibits, cos_incompat);
2926                 if (rc < 0)
2927                         GOTO(out_put_new, rc);
2928
2929                 /* Check if @msrcdir is subdir of @mnew, before locking child
2930                  * to avoid reverse locking.
2931                  */
2932                 if (mtgtdir != msrcdir) {
2933                         rc = mdo_is_subdir(info->mti_env,
2934                                            mdt_object_child(msrcdir), new_fid);
2935                         if (rc) {
2936                                 if (rc == 1)
2937                                         rc = -EINVAL;
2938                                 GOTO(out_unlock_old, rc);
2939                         }
2940                 }
2941
2942                 /* We used to acquire MDS_INODELOCK_FULL here but we
2943                  * can't do this now because a running HSM restore on
2944                  * the rename onto victim will hold the layout
2945                  * lock. See LU-4002.
2946                  */
2947
2948                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2949                 mdt_lock_reg_init(lh_newp, LCK_EX);
2950                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2951                 if (mdt_object_remote(mtgtdir)) {
2952                         rc = mdt_remote_object_lock(info, mtgtdir,
2953                                                     mdt_object_fid(mnew),
2954                                                     &lh_newp->mlh_rreg_lh,
2955                                                     lh_newp->mlh_rreg_mode,
2956                                                     MDS_INODELOCK_LOOKUP,
2957                                                     false);
2958                         if (rc != ELDLM_OK)
2959                                 GOTO(out_unlock_old, rc);
2960
2961                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2962                 }
2963                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2964                                            cos_incompat);
2965                 if (rc != 0)
2966                         GOTO(out_unlock_new, rc);
2967
2968                 /* get and save version after locking */
2969                 mdt_version_get_save(info, mnew, 3);
2970         } else if (rc != -ENOENT) {
2971                 GOTO(out_put_old, rc);
2972         } else {
                     /* no target entry (-ENOENT): only the source is locked */
2973                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2974                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2975                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2976                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2977                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2978                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2979                                             lh_rmt, lock_ibits, cos_incompat);
2980                 if (rc != 0)
2981                         GOTO(out_put_old, rc);
2982
2983                 mdt_enoent_version_save(info, 3);
2984         }
2985
2986         /* step 5: rename it */
2987         mdt_reint_init_ma(info, ma);
2988
2989         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2990                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2991
2992         if (mnew != NULL)
2993                 mutex_lock(&mnew->mot_lov_mutex);
2994
2995         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2996                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2997                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2998                         &rr->rr_tgt_name, ma);
2999
3000         if (mnew != NULL)
3001                 mutex_unlock(&mnew->mot_lov_mutex);
3002
3003         /* handle last link of tgt object */
3004         if (rc == 0) {
3005                 mdt_counter_incr(req, LPROC_MDT_RENAME,
3006                                  ktime_us_delta(ktime_get(), kstart));
3007                 if (mnew) {
3008                         mdt_handle_last_unlink(info, mnew, ma);
3009                         discard = mdt_dom_check_for_discard(info, mnew);
3010                 }
3011                 mdt_rename_counter_tally(info, info->mti_mdt, req,
3012                                          msrcdir, mtgtdir,
3013                                          ktime_us_delta(ktime_get(), kstart));
3014         }
3015
3016         EXIT;
3017 out_unlock_new:
3018         if (mnew != NULL)
3019                 mdt_object_unlock(info, mnew, lh_newp, rc);
3020 out_unlock_old:
3021         mdt_object_unlock(info, NULL, lh_rmt, rc);
3022         mdt_object_unlock(info, mold, lh_oldp, rc);
3023 out_put_new:
             /* when discard is set the reference on mnew is kept until the
              * mdt_dom_discard_data() call at the end of this function
              */
3024         if (mnew && !discard)
3025                 mdt_object_put(info->mti_env, mnew);
3026 out_put_old:
3027         mdt_object_put(info->mti_env, mold);
3028 out_unlock_parents:
3029         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
3030         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
3031 out_unlock_rename:
3032         if (lustre_handle_is_used(&rename_lh))
3033                 mdt_rename_unlock(&rename_lh);
3034 out_put_tgtdir:
3035         mdt_object_put(info->mti_env, mtgtdir);
3036 out_put_srcdir:
3037         mdt_object_put(info->mti_env, msrcdir);
3038
3039         /* The DoM discard can be done right in the place above where it is
3040          * assigned, meanwhile it is done here after rename unlock due to
3041          * compatibility with old clients, for them the discard blocks
3042          * the main thread until completion. Check LU-11359 for details.
3043          */
3044         if (discard) {
3045                 mdt_dom_discard_data(info, mnew);
3046                 mdt_object_put(info->mti_env, mnew);
3047         }
3048         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
3049         return rc;
3050 }
3051
3052 static int mdt_reint_resync(struct mdt_thread_info *info,
3053                             struct mdt_lock_handle *lhc)
3054 {
3055         struct mdt_reint_record *rr = &info->mti_rr;
3056         struct ptlrpc_request *req = mdt_info_req(info);
3057         struct md_attr *ma = &info->mti_attr;
3058         struct mdt_object *mo;
3059         struct ldlm_lock *lease;
3060         struct mdt_body *repbody;
3061         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3062         bool lease_broken;
3063         int rc, rc2;
3064
3065         ENTRY;
3066         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
3067
3068         if (info->mti_dlm_req)
3069                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3070
3071         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3072         if (IS_ERR(mo))
3073                 GOTO(out, rc = PTR_ERR(mo));
3074
3075         if (!mdt_object_exists(mo))
3076                 GOTO(out_obj, rc = -ENOENT);
3077
3078         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3079                 GOTO(out_obj, rc = -EINVAL);
3080
3081         if (mdt_object_remote(mo))
3082                 GOTO(out_obj, rc = -EREMOTE);
3083
3084         lease = ldlm_handle2lock(rr->rr_lease_handle);
3085         if (lease == NULL)
3086                 GOTO(out_obj, rc = -ESTALE);
3087
3088         /* It's really necessary to grab open_sem and check if the lease lock
3089          * has been lost. There would exist a concurrent writer coming in and
3090          * generating some dirty data in memory cache, the writeback would fail
3091          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3092          */
3093         if (!down_write_trylock(&mo->mot_open_sem))
3094                 GOTO(out_put_lease, rc = -EBUSY);
3095
3096         lock_res_and_lock(lease);
3097         lease_broken = ldlm_is_cancel(lease);
3098         unlock_res_and_lock(lease);
3099         if (lease_broken)
3100                 GOTO(out_unlock, rc = -EBUSY);
3101
3102         /* the file has yet opened by anyone else after we took the lease. */
3103         layout.mlc_opc = MD_LAYOUT_RESYNC;
3104         lhc = &info->mti_lh[MDT_LH_LOCAL];
3105         rc = mdt_layout_change(info, mo, lhc, &layout);
3106         if (rc)
3107                 GOTO(out_unlock, rc);
3108
3109         mdt_object_unlock(info, mo, lhc, 0);
3110
3111         ma->ma_need = MA_INODE;
3112         ma->ma_valid = 0;
3113         rc = mdt_attr_get_complex(info, mo, ma);
3114         if (rc != 0)
3115                 GOTO(out_unlock, rc);
3116
3117         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3118         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3119
3120         EXIT;
3121 out_unlock:
3122         up_write(&mo->mot_open_sem);
3123 out_put_lease:
3124         LDLM_LOCK_PUT(lease);
3125 out_obj:
3126         mdt_object_put(info->mti_env, mo);
3127 out:
3128         mdt_client_compatibility(info);
3129         rc2 = mdt_fix_reply(info);
3130         if (rc == 0)
3131                 rc = rc2;
3132         return rc;
3133 }
3134
3135 struct mdt_reinter {
3136         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3137         enum lprocfs_extra_opc mr_extra_opc;
3138 };
3139
3140 static const struct mdt_reinter mdt_reinters[] = {
3141         [REINT_SETATTR] = {
3142                 .mr_handler = &mdt_reint_setattr,
3143                 .mr_extra_opc = MDS_REINT_SETATTR,
3144         },
3145         [REINT_CREATE] = {
3146                 .mr_handler = &mdt_reint_create,
3147                 .mr_extra_opc = MDS_REINT_CREATE,
3148         },
3149         [REINT_LINK] = {
3150                 .mr_handler = &mdt_reint_link,
3151                 .mr_extra_opc = MDS_REINT_LINK,
3152         },
3153         [REINT_UNLINK] = {
3154                 .mr_handler = &mdt_reint_unlink,
3155                 .mr_extra_opc = MDS_REINT_UNLINK,
3156         },
3157         [REINT_RENAME] = {
3158                 .mr_handler = &mdt_reint_rename,
3159                 .mr_extra_opc = MDS_REINT_RENAME,
3160         },
3161         [REINT_OPEN] = {
3162                 .mr_handler = &mdt_reint_open,
3163                 .mr_extra_opc = MDS_REINT_OPEN,
3164         },
3165         [REINT_SETXATTR] = {
3166                 .mr_handler = &mdt_reint_setxattr,
3167                 .mr_extra_opc = MDS_REINT_SETXATTR,
3168         },
3169         [REINT_RMENTRY] = {
3170                 .mr_handler = &mdt_reint_unlink,
3171                 .mr_extra_opc = MDS_REINT_UNLINK,
3172         },
3173         [REINT_MIGRATE] = {
3174                 .mr_handler = &mdt_reint_migrate,
3175                 .mr_extra_opc = MDS_REINT_RENAME,
3176         },
3177         [REINT_RESYNC] = {
3178                 .mr_handler = &mdt_reint_resync,
3179                 .mr_extra_opc = MDS_REINT_RESYNC,
3180         },
3181 };
3182
3183 int mdt_reint_rec(struct mdt_thread_info *info,
3184                   struct mdt_lock_handle *lhc)
3185 {
3186         const struct mdt_reinter *mr;
3187         int rc;
3188
3189         ENTRY;
3190         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3191                 RETURN(-EPROTO);
3192
3193         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3194         if (mr->mr_handler == NULL)
3195                 RETURN(-EPROTO);
3196
3197         rc = (*mr->mr_handler)(info, lhc);
3198
3199         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3200                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3201
3202         RETURN(rc);
3203 }