Whamcloud - gitweb
6090ff0ed250a19855c5ce6da62ddd3ab5083e47
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222                              struct mdt_object *obj,
223                              struct ldlm_enqueue_info *einfo,
224                              int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(slave_locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_handle_init(lh);
237         mdt_lock_reg_init(lh, einfo->ei_mode);
238         for (i = 0; i < slave_locks->ha_count; i++) {
239                 if (test_bit(i, (void *)slave_locks->ha_map))
240                         lh->mlh_rreg_lh = slave_locks->ha_handles[i];
241                 else
242                         lh->mlh_reg_lh = slave_locks->ha_handles[i];
243                 mdt_object_unlock(mti, NULL, lh, decref);
244                 slave_locks->ha_handles[i].cookie = 0ull;
245         }
246
247         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
248                                 policy);
249 }
250
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252                                      struct mdt_object *obj)
253 {
254         struct lu_device *bottom_dev;
255         struct lu_object *bottom_obj;
256         int rc;
257
258         if (!S_ISDIR(obj->mot_header.loh_attr))
259                 return 0;
260
261         /* getxattr from bottom obj to avoid reading in shard FIDs */
262         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264                                           mdt_object_fid(obj), NULL);
265         if (IS_ERR(bottom_obj))
266                 return PTR_ERR(bottom_obj);
267
268         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269                           XATTR_NAME_LMV);
270         lu_object_put(mti->mti_env, bottom_obj);
271
272         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
273 }
274
275 /**
276  * Lock slave stripes if necessary, the lock handles of slave stripes
277  * will be stored in einfo->ei_cbdata.
278  **/
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280                            enum ldlm_mode mode, __u64 ibits,
281                            struct ldlm_enqueue_info *einfo)
282 {
283         union ldlm_policy_data *policy = &mti->mti_policy;
284
285         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
286
287         einfo->ei_type = LDLM_IBITS;
288         einfo->ei_mode = mode;
289         einfo->ei_cb_bl = mdt_remote_blocking_ast;
290         einfo->ei_cb_local_bl = mdt_blocking_ast;
291         einfo->ei_cb_cp = ldlm_completion_ast;
292         einfo->ei_enq_slave = 1;
293         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294         einfo->ei_inodebits = ibits;
295         memset(policy, 0, sizeof(*policy));
296         policy->l_inodebits.bits = ibits;
297
298         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
299                               policy);
300 }
301
302 int mdt_reint_striped_lock(struct mdt_thread_info *info,
303                            struct mdt_object *o,
304                            struct mdt_lock_handle *lh,
305                            __u64 ibits,
306                            struct ldlm_enqueue_info *einfo,
307                            bool cos_incompat)
308 {
309         int rc;
310
311         LASSERT(!mdt_object_remote(o));
312
313         memset(einfo, 0, sizeof(*einfo));
314
315         rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
316         if (rc)
317                 return rc;
318
319         rc = mdt_object_striped(info, o);
320         if (rc != 1) {
321                 if (rc < 0)
322                         mdt_object_unlock(info, o, lh, rc);
323                 return rc;
324         }
325
326         rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
327         if (rc) {
328                 mdt_object_unlock(info, o, lh, rc);
329                 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
330                         rc = 0;
331         }
332
333         return rc;
334 }
335
336 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
337                               struct mdt_object *o,
338                               struct mdt_lock_handle *lh,
339                               struct ldlm_enqueue_info *einfo, int decref)
340 {
341         if (einfo->ei_cbdata)
342                 mdt_unlock_slaves(info, o, einfo, decref);
343         mdt_object_unlock(info, o, lh, decref);
344 }
345
346 static int mdt_restripe(struct mdt_thread_info *info,
347                         struct mdt_object *parent,
348                         const struct lu_name *lname,
349                         const struct lu_fid *tfid,
350                         struct md_op_spec *spec,
351                         struct md_attr *ma)
352 {
353         struct mdt_device *mdt = info->mti_mdt;
354         struct lu_fid *fid = &info->mti_tmp_fid2;
355         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
356         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
357         struct lmv_mds_md_v1 *lmv;
358         struct mdt_object *child;
359         struct mdt_lock_handle *lhp;
360         struct mdt_lock_handle *lhc;
361         struct mdt_body *repbody;
362         int rc;
363
364         ENTRY;
365         if (!mdt->mdt_enable_dir_restripe)
366                 RETURN(-EPERM);
367
368         LASSERT(lum);
369         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
370
371         rc = mdt_version_get_check_save(info, parent, 0);
372         if (rc)
373                 RETURN(rc);
374
375         lhp = &info->mti_lh[MDT_LH_PARENT];
376         mdt_lock_pdo_init(lhp, LCK_PW, lname);
377         rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
378                                    true);
379         if (rc)
380                 RETURN(rc);
381
382         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
383         if (rc)
384                 GOTO(unlock_parent, rc);
385
386         if (ma->ma_valid & MA_LMV) {
387                 /* don't allow restripe if parent dir layout is changing */
388                 lmv = &ma->ma_lmv->lmv_md_v1;
389                 if (!lmv_is_sane2(lmv))
390                         GOTO(unlock_parent, rc = -EBADF);
391
392                 if (lmv_is_layout_changing(lmv))
393                         GOTO(unlock_parent, rc = -EBUSY);
394         }
395
396         fid_zero(fid);
397         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
398         if (rc)
399                 GOTO(unlock_parent, rc);
400
401         child = mdt_object_find(info->mti_env, mdt, fid);
402         if (IS_ERR(child))
403                 GOTO(unlock_parent, rc = PTR_ERR(child));
404
405         if (!mdt_object_exists(child))
406                 GOTO(out_child, rc = -ENOENT);
407
408         if (mdt_object_remote(child)) {
409                 struct mdt_body *repbody;
410
411                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
412                 if (!repbody)
413                         GOTO(out_child, rc = -EPROTO);
414
415                 repbody->mbo_fid1 = *fid;
416                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
417                 GOTO(out_child, rc = -EREMOTE);
418         }
419
420         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
421                 GOTO(out_child, rc = -ENOTDIR);
422
423         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
424         if (rc)
425                 GOTO(out_child, rc);
426
427         /* race with migrate? */
428         if ((ma->ma_valid & MA_LMV) &&
429              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
430                 GOTO(out_child, rc = -EBUSY);
431
432         /* lock object */
433         lhc = &info->mti_lh[MDT_LH_CHILD];
434         mdt_lock_reg_init(lhc, LCK_EX);
435
436         /* enqueue object remote LOOKUP lock */
437         if (mdt_object_remote(parent)) {
438                 rc = mdt_remote_object_lock(info, parent, fid,
439                                             &lhc->mlh_rreg_lh,
440                                             lhc->mlh_rreg_mode,
441                                             MDS_INODELOCK_LOOKUP, false);
442                 if (rc != ELDLM_OK)
443                         GOTO(out_child, rc);
444         }
445
446         rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
447                                     true);
448         if (rc)
449                 GOTO(unlock_child, rc);
450
451         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
452         rc = mdt_version_get_check_save(info, child, 1);
453         if (rc)
454                 GOTO(unlock_child, rc);
455
456         spin_lock(&mdt->mdt_restriper.mdr_lock);
457         if (child->mot_restriping) {
458                 /* race? */
459                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
460                 GOTO(unlock_child, rc = -EBUSY);
461         }
462         child->mot_restriping = 1;
463         spin_unlock(&mdt->mdt_restriper.mdr_lock);
464
465         *fid = *tfid;
466         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
467         if (rc)
468                 GOTO(restriping_clear, rc);
469
470         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
471         if (!repbody)
472                 GOTO(restriping_clear, rc = -EPROTO);
473
474         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
475         EXIT;
476
477 restriping_clear:
478         child->mot_restriping = 0;
479 unlock_child:
480         mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
481 out_child:
482         mdt_object_put(info->mti_env, child);
483 unlock_parent:
484         mdt_object_unlock(info, parent, lhp, rc);
485
486         return rc;
487 }
488
489 /*
490  * VBR: we save three versions in reply:
491  * 0 - parent. Check that parent version is the same during replay.
492  * 1 - name. Version of 'name' if file exists with the same name or
493  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
494  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
495  * check.
496  */
497 static int mdt_create(struct mdt_thread_info *info)
498 {
499         struct mdt_device *mdt = info->mti_mdt;
500         struct mdt_object *parent;
501         struct mdt_object *child;
502         struct mdt_lock_handle *lh;
503         struct mdt_body *repbody;
504         struct md_attr *ma = &info->mti_attr;
505         struct mdt_reint_record *rr = &info->mti_rr;
506         struct md_op_spec *spec = &info->mti_spec;
507         bool restripe = false;
508         int rc;
509
510         ENTRY;
511         DEBUG_REQ(D_INODE, mdt_info_req(info),
512                   "Create ("DNAME"->"DFID") in "DFID,
513                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
514
515         if (!fid_is_md_operative(rr->rr_fid1))
516                 RETURN(-EPERM);
517
518         if (S_ISDIR(ma->ma_attr.la_mode) &&
519             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
520                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
521                 struct lu_ucred *uc = mdt_ucred(info);
522                 struct obd_export *exp = mdt_info_req(info)->rq_export;
523
524                 /* Only new clients can create remote dir( >= 2.4) and
525                  * striped dir(>= 2.6), old client will return -ENOTSUPP
526                  */
527                 if (!mdt_is_dne_client(exp))
528                         RETURN(-ENOTSUPP);
529
530                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
531                         if (!mdt_is_striped_client(exp))
532                                 RETURN(-ENOTSUPP);
533
534                         if (!mdt->mdt_enable_striped_dir)
535                                 RETURN(-EPERM);
536                 } else if (!mdt->mdt_enable_remote_dir) {
537                         RETURN(-EPERM);
538                 }
539
540                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
541                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
542                     LMV_HASH_TYPE_CRUSH)
543                         RETURN(-EPROTO);
544
545                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
546                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
547                     mdt->mdt_enable_remote_dir_gid != -1)
548                         RETURN(-EPERM);
549
550                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
551                  * this is create only, don't try restripe.
552                  */
553                 if (mdt->mdt_enable_dir_restripe &&
554                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
555                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
556                         restripe = true;
557         }
558
559         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
560
561         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
562         if (IS_ERR(parent))
563                 RETURN(PTR_ERR(parent));
564
565         if (!mdt_object_exists(parent))
566                 GOTO(put_parent, rc = -ENOENT);
567
568         /*
569          * LU-10235: check if name exists locklessly first to avoid massive
570          * lock recalls on existing directories.
571          */
572         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
573                                       &info->mti_tmp_fid1, 1);
574         if (rc == 0) {
575                 if (!restripe)
576                         GOTO(put_parent, rc = -EEXIST);
577
578                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
579                                   ma);
580         }
581
582         /* -ENOENT is expected here */
583         if (rc != -ENOENT)
584                 GOTO(put_parent, rc);
585
586         /* save version of file name for replay, it must be ENOENT here */
587         mdt_enoent_version_save(info, 1);
588
589         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
590
591         lh = &info->mti_lh[MDT_LH_PARENT];
592         mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
593         rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
594         if (rc)
595                 GOTO(put_parent, rc);
596
597         if (!mdt_object_remote(parent)) {
598                 rc = mdt_version_get_check_save(info, parent, 0);
599                 if (rc)
600                         GOTO(unlock_parent, rc);
601         }
602
603         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
604         if (unlikely(IS_ERR(child)))
605                 GOTO(unlock_parent, rc = PTR_ERR(child));
606
607         ma->ma_need = MA_INODE;
608         ma->ma_valid = 0;
609
610         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
611                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
612
613         /* Version of child will be updated on disk. */
614         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
615         rc = mdt_version_get_check_save(info, child, 2);
616         if (rc)
617                 GOTO(put_child, rc);
618
619         /*
620          * Do not perform lookup sanity check. We know that name does
621          * not exist.
622          */
623         info->mti_spec.sp_cr_lookup = 0;
624         info->mti_spec.sp_feat = &dt_directory_features;
625
626         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
627                         mdt_object_child(child), &info->mti_spec, ma);
628         if (rc == 0)
629                 rc = mdt_attr_get_complex(info, child, ma);
630
631         if (rc < 0)
632                 GOTO(put_child, rc);
633
634         /*
635          * On DNE, we need to eliminate dependey between 'mkdir a' and
636          * 'mkdir a/b' if b is a striped directory, to achieve this, two
637          * things are done below:
638          * 1. save child and slaves lock.
639          * 2. if the child is a striped directory, relock parent so to
640          *    compare against with COS locks to ensure parent was
641          *    committed to disk.
642          */
643         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
644                 struct mdt_lock_handle *lhc;
645                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
646                 bool cos_incompat;
647
648                 rc = mdt_object_striped(info, child);
649                 if (rc < 0)
650                         GOTO(put_child, rc);
651
652                 cos_incompat = rc;
653                 if (cos_incompat) {
654                         if (!mdt_object_remote(parent)) {
655                                 mdt_object_unlock(info, parent, lh, 1);
656                                 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
657                                 rc = mdt_reint_object_lock(info, parent, lh,
658                                                            MDS_INODELOCK_UPDATE,
659                                                            true);
660                                 if (rc)
661                                         GOTO(put_child, rc);
662                         }
663                 }
664
665                 lhc = &info->mti_lh[MDT_LH_CHILD];
666                 mdt_lock_handle_init(lhc);
667                 mdt_lock_reg_init(lhc, LCK_PW);
668                 rc = mdt_reint_striped_lock(info, child, lhc,
669                                             MDS_INODELOCK_UPDATE, einfo,
670                                             cos_incompat);
671                 if (rc)
672                         GOTO(put_child, rc);
673
674                 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
675         }
676
677         /* Return fid & attr to client. */
678         if (ma->ma_valid & MA_INODE)
679                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
680                                    mdt_object_fid(child));
681         EXIT;
682 put_child:
683         mdt_object_put(info->mti_env, child);
684 unlock_parent:
685         mdt_object_unlock(info, parent, lh, rc);
686 put_parent:
687         mdt_object_put(info->mti_env, parent);
688         return rc;
689 }
690
691 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
692                         struct md_attr *ma)
693 {
694         struct mdt_lock_handle  *lh;
695         int do_vbr = ma->ma_attr.la_valid &
696                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
697         __u64 lockpart = MDS_INODELOCK_UPDATE;
698         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
699         bool cos_incompat;
700         int rc;
701
702         ENTRY;
703         rc = mdt_object_striped(info, mo);
704         if (rc < 0)
705                 RETURN(rc);
706
707         cos_incompat = rc;
708
709         lh = &info->mti_lh[MDT_LH_PARENT];
710         mdt_lock_reg_init(lh, LCK_PW);
711
712         /* Even though the new MDT will grant PERM lock to the old
713          * client, but the old client will almost ignore that during
714          * So it needs to revoke both LOOKUP and PERM lock here, so
715          * both new and old client can cancel the dcache
716          */
717         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
718                 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
719
720         rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
721                                     cos_incompat);
722         if (rc != 0)
723                 RETURN(rc);
724
725         /* all attrs are packed into mti_attr in unpack_setattr */
726         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
727                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
728
729         /* VBR: update version if attr changed are important for recovery */
730         if (do_vbr) {
731                 /* update on-disk version of changed object */
732                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
733                 rc = mdt_version_get_check_save(info, mo, 0);
734                 if (rc)
735                         GOTO(out_unlock, rc);
736         }
737
738         /* Ensure constant striping during chown(). See LU-2789. */
739         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
740                 mutex_lock(&mo->mot_lov_mutex);
741
742         /* all attrs are packed into mti_attr in unpack_setattr */
743         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
744
745         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
746                 mutex_unlock(&mo->mot_lov_mutex);
747
748         if (rc != 0)
749                 GOTO(out_unlock, rc);
750         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
751         EXIT;
752 out_unlock:
753         mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
754         return rc;
755 }
756
757 /**
758  * Check HSM flags and add HS_DIRTY flag if relevant.
759  *
760  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
761  * and is not RELEASED.
762  */
763 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
764                         struct md_attr *ma)
765 {
766         struct lu_ucred *uc = mdt_ucred(info);
767         kernel_cap_t cap_saved;
768         int rc;
769
770         ENTRY;
771         /* If the file was modified, add the dirty flag */
772         ma->ma_need = MA_HSM;
773         rc = mdt_attr_get_complex(info, mo, ma);
774         if (rc) {
775                 CERROR("file attribute read error for "DFID": %d.\n",
776                         PFID(mdt_object_fid(mo)), rc);
777                 RETURN(rc);
778         }
779
780         /* If an up2date copy exists in the backend, add dirty flag */
781         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
782             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
783                 ma->ma_hsm.mh_flags |= HS_DIRTY;
784
785                 /* Bump cap so that closes from non-owner writers can
786                  * set the HSM state to dirty.
787                  */
788                 cap_saved = uc->uc_cap;
789                 cap_raise(uc->uc_cap, CAP_FOWNER);
790                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
791                 uc->uc_cap = cap_saved;
792                 if (rc)
793                         CERROR("file attribute change error for "DFID": %d\n",
794                                 PFID(mdt_object_fid(mo)), rc);
795         }
796
797         RETURN(rc);
798 }
799
/*
 * Handle a setattr reint request for the object named by rr->rr_fid1.
 *
 * Visible flow:
 *  - call ldlm_request_cancel() if the request carries mti_dlm_req;
 *  - find the object; fail with -ENOENT if it does not exist and with
 *    -EREMOTE if it is remote;
 *  - when LA_SIZE is set, leases are held on the object and the client did
 *    not pass MDS_TRUNC_KEEP_LEASE, take and immediately drop a CW OPEN
 *    lock in order to revoke the leases;
 *  - for LA_SIZE / MRF_OPEN_TRUNC: check write access (-ETXTBSY), refuse
 *    FLR / overstriped layouts for exports lacking the matching connect
 *    flag (-EOPNOTSUPP), and update LSOM;
 *  - apply either the inode attributes (mdt_attr_set()) or, for a
 *    directory, a LOV / default-LMV xattr under a PW lock;
 *  - optionally add the HSM dirty flag, then re-read the attributes and
 *    pack them into the reply body.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	lock handle supplied by the caller; its incoming value
 *			is never read here, the variable is only re-pointed at
 *			mti_lh[MDT_LH_LOCAL] for the lease revocation
 *
 * \retval 0		success
 * \retval negative	errno of the failing step, or the result of
 *			mdt_fix_reply() when everything before it succeeded
 */
800 static int mdt_reint_setattr(struct mdt_thread_info *info,
801                              struct mdt_lock_handle *lhc)
802 {
803         struct mdt_device *mdt = info->mti_mdt;
804         struct md_attr *ma = &info->mti_attr;
805         struct mdt_reint_record *rr = &info->mti_rr;
806         struct ptlrpc_request *req = mdt_info_req(info);
807         struct mdt_object *mo;
808         struct mdt_body *repbody;
809         ktime_t kstart = ktime_get();
810         int rc, rc2;
811
812         ENTRY;
813         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
814                   (unsigned int)ma->ma_attr.la_valid);
815
816         if (info->mti_dlm_req)
817                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
818
819         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
820
            /* NOTE(review): repbody is not checked for NULL before it is
             * handed to mdt_pack_attr2body() at the end of this function -
             * presumably the reply buffer is always packed by then; confirm.
             */
821         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
822         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
            /* no object reference was taken, so skip the put at out_put */
823         if (IS_ERR(mo))
824                 GOTO(out, rc = PTR_ERR(mo));
825
826         if (!mdt_object_exists(mo))
827                 GOTO(out_put, rc = -ENOENT);
828
829         if (mdt_object_remote(mo))
830                 GOTO(out_put, rc = -EREMOTE);
831
832         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
833         /* revoke lease lock if size is going to be changed */
834         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
835                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
836                      atomic_read(&mo->mot_lease_count) > 0)) {
837                 down_read(&mo->mot_open_sem);
838
                    /* the lease count is tested again now that
                     * mot_open_sem is held for read
                     */
839                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
840                         lhc = &info->mti_lh[MDT_LH_LOCAL];
841                         mdt_lock_reg_init(lhc, LCK_CW);
842
843                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
844                         if (rc != 0) {
845                                 up_read(&mo->mot_open_sem);
846                                 GOTO(out_put, rc);
847                         }
848
849                         /* revoke lease lock */
850                         mdt_object_unlock(info, mo, lhc, 1);
851                 }
852                 up_read(&mo->mot_open_sem);
853         }
854
855         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
856                 /* Check write access for the O_TRUNC case */
857                 if (mdt_write_read(mo) < 0)
858                         GOTO(out_put, rc = -ETXTBSY);
859
860                 /* LU-10286: compatibility check for FLR.
861                  * Please check the comment in mdt_finish_open() for details
862                  */
863                 if (!exp_connect_flr(info->mti_exp) ||
864                     !exp_connect_overstriping(info->mti_exp)) {
                            /* -ENODATA (no LOV xattr) is tolerated; rc > 0
                             * is what enables the layout checks below
                             */
865                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
866                         if (rc < 0 && rc != -ENODATA)
867                                 GOTO(out_put, rc);
868
869                         if (!exp_connect_flr(info->mti_exp)) {
870                                 if (rc > 0 &&
871                                     mdt_lmm_is_flr(info->mti_big_lmm))
872                                         GOTO(out_put, rc = -EOPNOTSUPP);
873                         }
874
875                         if (!exp_connect_overstriping(info->mti_exp)) {
876                                 if (rc > 0 &&
877                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
878                                         GOTO(out_put, rc = -EOPNOTSUPP);
879                         }
880                 }
881
882                 /* For truncate, the file size sent from client
883                  * is believable, but the blocks are incorrect,
884                  * which makes the block size in LSOM attribute
885                  * inconsistent with the real block size.
886                  */
887                 rc = mdt_lsom_update(info, mo, true);
888                 if (rc)
889                         GOTO(out_put, rc);
890         }
891
            /* Case 1: plain inode attribute update. A LOV EA must not be
             * sent together with inode attributes (-EPROTO).
             */
892         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
893                 if (ma->ma_valid & MA_LOV)
894                         GOTO(out_put, rc = -EPROTO);
895
896                 /* MDT supports FMD for regular files due to Data-on-MDT */
897                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
898                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
899                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
900                                        req->rq_xid);
901
                            /* rc only decides whether MA_PFID is marked
                             * valid; it is overwritten by mdt_attr_set()
                             * below, so a failure here is not reported
                             */
902                         if (ma->ma_attr.la_valid & LA_MTIME) {
903                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
904                                 if (!rc)
905                                         ma->ma_valid |= MA_PFID;
906                         }
907                 }
908
909                 rc = mdt_attr_set(info, mo, ma);
910                 if (rc)
911                         GOTO(out_put, rc);
            /* Case 2: LOV or default-LMV xattr update on a directory, with
             * no inode attributes (la_valid must be 0, checked below).
             */
912         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
913                    (ma->ma_valid & MA_INODE)) {
914                 struct lu_buf *buf = &info->mti_buf;
915                 struct lu_ucred *uc = mdt_ucred(info);
916                 struct mdt_lock_handle *lh;
917                 const char *name;
918                 __u64 lockpart = MDS_INODELOCK_XATTR;
919
920                 /* reject if either remote or striped dir is disabled */
921                 if (ma->ma_valid & MA_LMV) {
922                         if (!mdt->mdt_enable_remote_dir ||
923                             !mdt->mdt_enable_striped_dir)
924                                 GOTO(out_put, rc = -EPERM);
925
                            /* allowed for CAP_SYS_ADMIN, for members of
                             * mdt_enable_remote_dir_gid, or for anyone when
                             * that gid is -1
                             */
926                         if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
927                             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
928                             mdt->mdt_enable_remote_dir_gid != -1)
929                                 GOTO(out_put, rc = -EPERM);
930                 }
931
932                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
933                         GOTO(out_put, rc = -ENOTDIR);
934
935                 if (ma->ma_attr.la_valid != 0)
936                         GOTO(out_put, rc = -EPROTO);
937
                    /* MA_LOV takes precedence when both bits are set */
938                 if (ma->ma_valid & MA_LOV) {
939                         buf->lb_buf = ma->ma_lmm;
940                         buf->lb_len = ma->ma_lmm_size;
941                         name = XATTR_NAME_LOV;
942                 } else {
943                         struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
944
945                         buf->lb_buf = lmu;
946                         buf->lb_len = ma->ma_lmv_size;
947                         name = XATTR_NAME_DEFAULT_LMV;
948                         /* force client to update dir default layout */
949                         lockpart |= MDS_INODELOCK_LOOKUP;
950                 }
951
952                 lh = &info->mti_lh[MDT_LH_PARENT];
953                 mdt_lock_reg_init(lh, LCK_PW);
954
955                 rc = mdt_object_lock(info, mo, lh, lockpart);
956                 if (rc != 0)
957                         GOTO(out_put, rc);
958
959                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
960                                   name, 0);
961
                    /* the lock is dropped before rc is examined, so it is
                     * released on both the success and the failure path
                     */
962                 mdt_object_unlock(info, mo, lh, rc);
963                 if (rc)
964                         GOTO(out_put, rc);
            /* Neither form of request: protocol error */
965         } else {
966                 GOTO(out_put, rc = -EPROTO);
967         }
968
            /* NOTE(review): the value returned by mdt_add_dirty_flag() is
             * overwritten by mdt_attr_get_complex() just below, so a failure
             * to set the dirty flag does not fail the request - confirm that
             * this best-effort behaviour is intended.
             */
969         /* If file data is modified, add the dirty flag */
970         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
971                 rc = mdt_add_dirty_flag(info, mo, ma);
972
            /* re-read the inode attributes for the reply */
973         ma->ma_need = MA_INODE;
974         ma->ma_valid = 0;
975         rc = mdt_attr_get_complex(info, mo, ma);
976         if (rc != 0)
977                 GOTO(out_put, rc);
978
979         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
980
981         EXIT;
982 out_put:
983         mdt_object_put(info->mti_env, mo);
984 out:
            /* the setattr counter is only bumped for successful requests */
985         if (rc == 0)
986                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
987                                  ktime_us_delta(ktime_get(), kstart));
988
            /* always run on the way out; the result of mdt_fix_reply() is
             * returned only if nothing failed before it
             */
989         mdt_client_compatibility(info);
990         rc2 = mdt_fix_reply(info);
991         if (rc == 0)
992                 rc = rc2;
993         return rc;
994 }
995
996 static int mdt_reint_create(struct mdt_thread_info *info,
997                             struct mdt_lock_handle *lhc)
998 {
999         struct ptlrpc_request   *req = mdt_info_req(info);
1000         ktime_t                 kstart = ktime_get();
1001         int                     rc;
1002
1003         ENTRY;
1004         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1005                 RETURN(err_serious(-ESTALE));
1006
1007         if (info->mti_dlm_req)
1008                 ldlm_request_cancel(mdt_info_req(info),
1009                                     info->mti_dlm_req, 0, LATF_SKIP);
1010
1011         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1012                 RETURN(-EPROTO);
1013
1014         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1015         case S_IFDIR:
1016         case S_IFREG:
1017         case S_IFLNK:
1018         case S_IFCHR:
1019         case S_IFBLK:
1020         case S_IFIFO:
1021         case S_IFSOCK:
1022                 break;
1023         default:
1024                 CERROR("%s: Unsupported mode %o\n",
1025                        mdt_obd_name(info->mti_mdt),
1026                        info->mti_attr.ma_attr.la_mode);
1027                 RETURN(err_serious(-EOPNOTSUPP));
1028         }
1029
1030         rc = mdt_create(info);
1031         if (rc == 0) {
1032                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1033                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1034                                          ktime_us_delta(ktime_get(), kstart));
1035                 else
1036                         /* Special file should stay on the same node as parent*/
1037                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1038                                          ktime_us_delta(ktime_get(), kstart));
1039         }
1040
1041         RETURN(rc);
1042 }
1043
1044 /*
1045  * VBR: save parent version in reply and child version obtained by its name.
1046  * The child version is fetched and checked during its lookup.
1047  */
/*
 * Handle an unlink reint request: remove name rr->rr_name from the parent
 * directory rr->rr_fid1.
 *
 * Visible flow:
 *  - find the parent; a remote parent forces cos_incompat, a local one has
 *    its version checked and saved in slot 0;
 *  - take a PW PDO lock on the parent for the name;
 *  - obtain the child FID, either from rr_fid2 (MDS_OP_WITH_FID) or by
 *    looking the name up (version slot 1);
 *  - if the child turns out to be striped and cos_incompat was not set yet,
 *    drop the child and the parent lock and start over with cos_incompat;
 *  - sp_rm_entry: only remove the name entry (no child object is passed to
 *    mdo_unlink()); needs a DNE client and CAP_SYS_ADMIN;
 *  - remote child: take a LOOKUP lock on it, put its FID in the reply and
 *    return -EREMOTE;
 *  - otherwise lock the child, unlink it under mot_lov_mutex, handle the
 *    last-unlink processing and bump the rmdir/unlink counter.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	not used by this handler
 *
 * \retval 0		success
 * \retval -EREMOTE	the child lives on another MDT
 * \retval negative	other errno (or err_serious() value) on failure
 */
1048 static int mdt_reint_unlink(struct mdt_thread_info *info,
1049                             struct mdt_lock_handle *lhc)
1050 {
1051         struct mdt_reint_record *rr = &info->mti_rr;
1052         struct ptlrpc_request *req = mdt_info_req(info);
1053         struct md_attr *ma = &info->mti_attr;
1054         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1055         struct mdt_object *mp;
1056         struct mdt_object *mc;
1057         struct mdt_lock_handle *parent_lh;
1058         struct mdt_lock_handle *child_lh;
1059         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1060         __u64 lock_ibits;
1061         bool cos_incompat = false;
1062         int no_name = 0;
1063         ktime_t kstart = ktime_get();
1064         int rc;
1065
1066         ENTRY;
1067         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1068                   PNAME(&rr->rr_name));
1069
1070         if (info->mti_dlm_req)
1071                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1072
1073         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1074                 RETURN(err_serious(-ENOENT));
1075
1076         if (!fid_is_md_operative(rr->rr_fid1))
1077                 RETURN(-EPERM);
1078
1079         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1080         if (IS_ERR(mp))
1081                 RETURN(PTR_ERR(mp));
1082
             /* the version check is done for a local parent only */
1083         if (mdt_object_remote(mp)) {
1084                 cos_incompat = true;
1085         } else {
1086                 rc = mdt_version_get_check_save(info, mp, 0);
1087                 if (rc)
1088                         GOTO(put_parent, rc);
1089         }
1090
1091         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1092         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
             /* re-entered once from below when the child proves to be striped */
1093 relock:
1094         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1095         mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1096         rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1097                                    cos_incompat);
1098         if (rc != 0)
1099                 GOTO(put_parent, rc);
1100
1101         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1102                 *child_fid = *rr->rr_fid2;
1103         } else {
1104                 /* lookup child object along with version checking */
1105                 fid_zero(child_fid);
1106                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1107                                               1);
1108                 if (rc != 0) {
1109                         /* Name might not be able to find during resend of
1110                          * remote unlink, considering following case.
1111                          * dir_A is a remote directory, the name entry of
1112                          * dir_A is on MDT0, the directory is on MDT1,
1113                          *
1114                          * 1. client sends unlink req to MDT1.
1115                          * 2. MDT1 sends name delete update to MDT0.
1116                          * 3. name entry is being deleted in MDT0 synchronously.
1117                          * 4. MDT1 is restarted.
1118                          * 5. client resends unlink req to MDT1. So it can not
1119                          *    find the name entry on MDT0 anymore.
1120                          * In this case, MDT1 only needs to destroy the local
1121                          * directory.
1122                          */
1123                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1124                             !fid_is_zero(rr->rr_fid2) &&
1125                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1126                                 no_name = 1;
1127                                 *child_fid = *rr->rr_fid2;
1128                         } else {
1129                                 GOTO(unlock_parent, rc);
1130                         }
1131                 }
1132         }
1133
1134         if (!fid_is_md_operative(child_fid))
1135                 GOTO(unlock_parent, rc = -EPERM);
1136
1137         /* We will lock the child regardless of whether it is local or remote. No harm. */
1138         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1139         if (IS_ERR(mc))
1140                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1141
1142         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1143                 /* In this case, child fid is embedded in the request, and we do
1144                  * not have a proper name as rr_name contains an encoded
1145                  * hash. So find name that matches provided hash.
1146                  */
1147                 if (!find_name_matching_hash(info, &rr->rr_name,
1148                                              NULL, mc, false))
1149                         GOTO(put_child, rc = -ENOENT);
1150         }
1151
             /* mdt_object_striped(): negative is an error, any other value
              * is stored in cos_incompat. When it is non-zero the parent
              * lock is retaken with cos_incompat set, so both references
              * taken so far are dropped first.
              */
1152         if (!cos_incompat) {
1153                 rc = mdt_object_striped(info, mc);
1154                 if (rc < 0)
1155                         GOTO(put_child, rc);
1156
1157                 cos_incompat = rc;
1158                 if (cos_incompat) {
1159                         mdt_object_put(info->mti_env, mc);
1160                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1161                         goto relock;
1162                 }
1163         }
1164
1165         child_lh = &info->mti_lh[MDT_LH_CHILD];
1166         mdt_lock_reg_init(child_lh, LCK_EX);
             /* sp_rm_entry: remove the name entry only; no child object is
              * passed to mdo_unlink() and the child is never locked here
              */
1167         if (info->mti_spec.sp_rm_entry) {
1168                 struct lu_ucred *uc  = mdt_ucred(info);
1169
1170                 if (!mdt_is_dne_client(req->rq_export))
1171                         /* Return -ENOTSUPP for old client */
1172                         GOTO(put_child, rc = -ENOTSUPP);
1173
1174                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1175                         GOTO(put_child, rc = -EPERM);
1176
1177                 ma->ma_need = MA_INODE;
1178                 ma->ma_valid = 0;
1179                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1180                                 NULL, &rr->rr_name, ma, no_name);
1181                 GOTO(put_child, rc);
1182         }
1183
1184         if (mdt_object_remote(mc)) {
1185                 struct mdt_body  *repbody;
1186
1187                 if (!fid_is_zero(rr->rr_fid2)) {
1188                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1189                                mdt_obd_name(info->mti_mdt),
1190                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1191                         GOTO(put_child, rc = -ENOENT);
1192                 }
1193                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1194                        mdt_obd_name(info->mti_mdt),
1195                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1196
1197                 if (!mdt_is_dne_client(req->rq_export))
1198                         /* Return -ENOTSUPP for old client */
1199                         GOTO(put_child, rc = -ENOTSUPP);
1200
1201                 /* Revoke the LOOKUP lock of the remote object granted by
1202                  * this MDT. Since the unlink will happen on another MDT,
1203                  * it will release the LOOKUP lock right away. Then What
1204                  * would happen if another client try to grab the LOOKUP
1205                  * lock at the same time with unlink XXX
1206                  */
                     /* NOTE(review): the return value of mdt_object_lock()
                      * is ignored here; -EREMOTE is returned either way.
                      */
1207                 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1208                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1209                 LASSERT(repbody != NULL);
1210                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1211                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1212                 GOTO(unlock_child, rc = -EREMOTE);
1213         }
1214         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1215          * this now because a running HSM restore on the child (unlink
1216          * victim) will hold the layout lock. See LU-4002.
1217          */
1218         lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1219         if (mdt_object_remote(mp)) {
1220                 /* Enqueue lookup lock from parent MDT */
1221                 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1222                                             &child_lh->mlh_rreg_lh,
1223                                             child_lh->mlh_rreg_mode,
1224                                             MDS_INODELOCK_LOOKUP, false);
1225                 if (rc != ELDLM_OK)
1226                         GOTO(put_child, rc);
1227
                     /* LOOKUP is now covered by the remote lock above */
1228                 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
1229         }
1230
1231         rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1232                                     cos_incompat);
1233         if (rc != 0)
1234                 GOTO(put_child, rc);
1235
1236         /*
1237          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1238          * whether need MA_LOV and MA_COOKIE.
1239          */
1240         ma->ma_need = MA_INODE;
1241         ma->ma_valid = 0;
1242
1243         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1244                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1245         /* save version when object is locked */
1246         mdt_version_get_save(info, mc, 1);
1247
1248         mutex_lock(&mc->mot_lov_mutex);
1249
1250         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1251                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1252
1253         mutex_unlock(&mc->mot_lov_mutex);
1254         if (rc != 0)
1255                 GOTO(unlock_child, rc);
1256
             /* object still alive: refresh its attributes; object dying:
              * discard its data when mdt_dom_check_for_discard() says so
              */
1257         if (!lu_object_is_dying(&mc->mot_header)) {
1258                 rc = mdt_attr_get_complex(info, mc, ma);
1259                 if (rc)
1260                         GOTO(out_stat, rc);
1261         } else if (mdt_dom_check_for_discard(info, mc)) {
1262                 mdt_dom_discard_data(info, mc);
1263         }
1264         mdt_handle_last_unlink(info, mc, ma);
1265
             /* also reached with rc != 0 when mdt_attr_get_complex() failed;
              * the counter is bumped whenever ma reports valid inode attrs
              */
1266 out_stat:
1267         if (ma->ma_valid & MA_INODE) {
1268                 switch (ma->ma_attr.la_mode & S_IFMT) {
1269                 case S_IFDIR:
1270                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1271                                          ktime_us_delta(ktime_get(), kstart));
1272                         break;
1273                 case S_IFREG:
1274                 case S_IFLNK:
1275                 case S_IFCHR:
1276                 case S_IFBLK:
1277                 case S_IFIFO:
1278                 case S_IFSOCK:
1279                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1280                                          ktime_us_delta(ktime_get(), kstart));
1281                         break;
1282                 default:
1283                         LASSERTF(0, "bad file type %o unlinking\n",
1284                                 ma->ma_attr.la_mode);
1285                 }
1286         }
1287
1288         EXIT;
1289
             /* unwind in reverse order of acquisition */
1290 unlock_child:
1291         mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1292 put_child:
             /* NOTE(review): presumably mti_big_buf was filled by
              * find_name_matching_hash() in the MDS_OP_WITH_FID case -
              * confirm against that helper.
              */
1293         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1294             info->mti_big_buf.lb_buf)
1295                 lu_buf_free(&info->mti_big_buf);
1296         mdt_object_put(info->mti_env, mc);
1297 unlock_parent:
1298         mdt_object_unlock(info, mp, parent_lh, rc);
1299 put_parent:
1300         mdt_object_put(info->mti_env, mp);
1301         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1302         return rc;
1303 }
1304
1305 /*
1306  * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
1307  * name.
1308  */
/*
 * Handle a link reint request: create name rr->rr_name in the target
 * parent directory rr->rr_fid2, referring to the source object rr->rr_fid1.
 *
 * The parent is locked first (PW PDO lock on the name, UPDATE bit), then
 * the source (EX, UPDATE | XATTR bits); both locks and both object
 * references are released in reverse order on the way out.
 *
 * \param[in] info	thread info of the request being handled
 * \param[in] lhc	not used by this handler
 *
 * \retval 0		success
 * \retval -EPERM	both FIDs are equal, or one is not md-operative
 * \retval -ENOENT	the source object does not exist
 * \retval -EEXIST	the target name already exists (non-replay only)
 * \retval negative	other errno (or err_serious() value) on failure
 */
1309 static int mdt_reint_link(struct mdt_thread_info *info,
1310                           struct mdt_lock_handle *lhc)
1311 {
1312         struct mdt_reint_record *rr = &info->mti_rr;
1313         struct ptlrpc_request   *req = mdt_info_req(info);
1314         struct md_attr          *ma = &info->mti_attr;
1315         struct mdt_object       *ms;
1316         struct mdt_object       *mp;
1317         struct mdt_lock_handle  *lhs;
1318         struct mdt_lock_handle  *lhp;
1319         ktime_t kstart = ktime_get();
1320         bool cos_incompat;
1321         int rc;
1322
1323         ENTRY;
1324         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1325                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1326
1327         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1328                 RETURN(err_serious(-ENOENT));
1329
             /* fault injection: fail the request and suppress the reply */
1330         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1331             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1332                 req->rq_no_reply = 1;
1333                 RETURN(err_serious(-ENOENT));
1334         }
1335
1336         if (info->mti_dlm_req)
1337                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1338
1339         /* Invalid case so return error immediately instead of
1340          * processing it
1341          */
1342         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1343                 RETURN(-EPERM);
1344
1345         if (!fid_is_md_operative(rr->rr_fid1) ||
1346             !fid_is_md_operative(rr->rr_fid2))
1347                 RETURN(-EPERM);
1348
1349         /* step 1: find target parent dir */
1350         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1351         if (IS_ERR(mp))
1352                 RETURN(PTR_ERR(mp));
1353
             /* VBR slot 0: parent */
1354         rc = mdt_version_get_check_save(info, mp, 0);
1355         if (rc)
1356                 GOTO(put_parent, rc);
1357
1358         /* step 2: find source */
1359         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1360         if (IS_ERR(ms))
1361                 GOTO(put_parent, rc = PTR_ERR(ms));
1362
1363         if (!mdt_object_exists(ms)) {
1364                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1365                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1366                 GOTO(put_source, rc = -ENOENT);
1367         }
1368
             /* set as soon as either object is remote */
1369         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1370
1371         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1372
1373         lhp = &info->mti_lh[MDT_LH_PARENT];
1374         mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1375         rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1376                                    cos_incompat);
1377         if (rc != 0)
1378                 GOTO(put_source, rc);
1379
1380         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1381
1382         lhs = &info->mti_lh[MDT_LH_CHILD];
1383         mdt_lock_reg_init(lhs, LCK_EX);
1384         rc = mdt_reint_object_lock(info, ms, lhs,
1385                                    MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1386                                    cos_incompat);
1387         if (rc != 0)
1388                 GOTO(unlock_parent, rc);
1389
1390         /* step 3: link it */
1391         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1392                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1393
             /* VBR slot 1: source, checked once the source is locked */
1394         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1395         rc = mdt_version_get_check_save(info, ms, 1);
1396         if (rc)
1397                 GOTO(unlock_source, rc);
1398
1399         /** check target version by name during replay */
1400         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1401                                       &info->mti_tmp_fid1, 2);
             /* -ENOENT is the expected outcome: the name must not exist yet */
1402         if (rc != 0 && rc != -ENOENT)
1403                 GOTO(unlock_source, rc);
1404         /* save version of file name for replay, it must be ENOENT here */
1405         if (!req_is_replay(mdt_info_req(info))) {
                     /* rc is either 0 or -ENOENT here; 0 means the name
                      * was found
                      */
1406                 if (rc != -ENOENT) {
1407                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1408                                PNAME(&rr->rr_name));
1409                         GOTO(unlock_source, rc = -EEXIST);
1410                 }
1411                 info->mti_ver[2] = ENOENT_VERSION;
1412                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1413         }
1414
1415         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1416                       mdt_object_child(ms), &rr->rr_name, ma);
1417
             /* the link counter is only bumped for successful requests */
1418         if (rc == 0)
1419                 mdt_counter_incr(req, LPROC_MDT_LINK,
1420                                  ktime_us_delta(ktime_get(), kstart));
1421
1422         EXIT;
             /* unwind in reverse order of acquisition */
1423 unlock_source:
1424         mdt_object_unlock(info, ms, lhs, rc);
1425 unlock_parent:
1426         mdt_object_unlock(info, mp, lhp, rc);
1427 put_source:
1428         mdt_object_put(info->mti_env, ms);
1429 put_parent:
1430         mdt_object_put(info->mti_env, mp);
1431         return rc;
1432 }
1433 /**
1434  * lock the part of the directory according to the hash of the name
1435  * (lh->mlh_pdo_hash) in parallel directory lock.
1436  */
1437 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1438                               struct mdt_lock_handle *lh,
1439                               struct mdt_object *obj, __u64 ibits,
1440                               bool cos_incompat)
1441 {
1442         struct ldlm_res_id *res = &info->mti_res_id;
1443         struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1444         union ldlm_policy_data *policy = &info->mti_policy;
1445         __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1446         int rc;
1447
1448         /*
1449          * Finish res_id initializing by name hash marking part of
1450          * directory which is taking modification.
1451          */
1452         LASSERT(lh->mlh_pdo_hash != 0);
1453         fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1454         memset(policy, 0, sizeof(*policy));
1455         policy->l_inodebits.bits = ibits;
1456         if (cos_incompat &&
1457             (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1458                 dlmflags |= LDLM_FL_COS_INCOMPAT;
1459         /*
1460          * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1461          * going to be sent to client. If it is - mdt_intent_policy() path will
1462          * fix it up and turn FL_LOCAL flag off.
1463          */
1464         rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1465                           policy, res, dlmflags,
1466                           &info->mti_exp->exp_handle.h_cookie);
1467         return rc;
1468 }
1469
1470 /**
1471  * Get BFL lock for rename or migrate process.
1472  **/
1473 static int mdt_rename_lock(struct mdt_thread_info *info,
1474                            struct lustre_handle *lh)
1475 {
1476         int     rc;
1477
1478         ENTRY;
1479         if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1480                 struct lu_fid *fid = &info->mti_tmp_fid1;
1481                 struct mdt_object *obj;
1482
1483                 /* XXX, right now, it has to use object API to
1484                  * enqueue lock cross MDT, so it will enqueue
1485                  * rename lock(with LUSTRE_BFL_FID) by root object
1486                  */
1487                 lu_root_fid(fid);
1488                 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1489                 if (IS_ERR(obj))
1490                         RETURN(PTR_ERR(obj));
1491
1492                 rc = mdt_remote_object_lock(info, obj,
1493                                             &LUSTRE_BFL_FID, lh,
1494                                             LCK_EX,
1495                                             MDS_INODELOCK_UPDATE, false);
1496                 mdt_object_put(info->mti_env, obj);
1497         } else {
1498                 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1499                 union ldlm_policy_data *policy = &info->mti_policy;
1500                 struct ldlm_res_id *res_id = &info->mti_res_id;
1501                 __u64 flags = 0;
1502
1503                 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1504                 memset(policy, 0, sizeof(*policy));
1505                 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1506                 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1507                 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1508                                             LDLM_IBITS, policy, LCK_EX, &flags,
1509                                             ldlm_blocking_ast,
1510                                             ldlm_completion_ast, NULL, NULL, 0,
1511                                             LVB_T_NONE,
1512                                             &info->mti_exp->exp_handle.h_cookie,
1513                                             lh);
1514                 RETURN(rc);
1515         }
1516         RETURN(rc);
1517 }
1518
1519 static void mdt_rename_unlock(struct lustre_handle *lh)
1520 {
1521         ENTRY;
1522         LASSERT(lustre_handle_is_used(lh));
1523         /* Cancel the single rename lock right away */
1524         ldlm_lock_decref_and_cancel(lh, LCK_EX);
1525         EXIT;
1526 }
1527
1528 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1529                                                 const struct lu_fid *fid,
1530                                                 int idx)
1531 {
1532         struct mdt_object *dir;
1533         int rc;
1534
1535         ENTRY;
1536         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1537         if (IS_ERR(dir))
1538                 RETURN(dir);
1539
1540         /* check early, the real version will be saved after locking */
1541         rc = mdt_version_get_check(info, dir, idx);
1542         if (rc)
1543                 GOTO(out_put, rc);
1544
1545         if (!mdt_object_exists(dir))
1546                 GOTO(out_put, rc = -ENOENT);
1547
1548         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1549                 GOTO(out_put, rc = -ENOTDIR);
1550
1551         RETURN(dir);
1552 out_put:
1553         mdt_object_put(info->mti_env, dir);
1554         return ERR_PTR(rc);
1555 }
1556
1557 /*
1558  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1559  * herein we don't really check it, just do revoke.
1560  */
1561 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1562                                   struct mdt_object *pobj,
1563                                   struct mdt_object *obj)
1564 {
1565         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1566         int rc;
1567
1568         mdt_lock_handle_init(lh);
1569         mdt_lock_reg_init(lh, LCK_EX);
1570
1571         if (mdt_object_remote(pobj)) {
1572                 /* don't bother to check if pobj and obj are on the same MDT. */
1573                 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1574                                             &lh->mlh_rreg_lh, LCK_EX,
1575                                             MDS_INODELOCK_LOOKUP, false);
1576         } else if (mdt_object_remote(obj)) {
1577                 struct ldlm_res_id *res = &info->mti_res_id;
1578                 union ldlm_policy_data *policy = &info->mti_policy;
1579                 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1580                                  LDLM_FL_COS_INCOMPAT;
1581
1582                 fid_build_reg_res_name(mdt_object_fid(obj), res);
1583                 memset(policy, 0, sizeof(*policy));
1584                 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1585                 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1586                                   &lh->mlh_reg_lh, LCK_EX, policy, res,
1587                                   dlmflags, NULL);
1588         } else {
1589                 /* do nothing if both are local */
1590                 return 0;
1591         }
1592
1593         if (rc != ELDLM_OK)
1594                 return rc;
1595
1596         /*
1597          * TODO, currently we don't save this lock because there is no place to
1598          * hold this lock handle, but to avoid race we need to save this lock.
1599          */
1600         mdt_object_unlock(info, NULL, lh, 1);
1601
1602         return 0;
1603 }
1604
/*
 * An operation may take locks on linkea parents or on directory stripes;
 * each such lock is tracked by one mdt_sub_lock, and the two kinds are kept
 * on different lists.
 */
struct mdt_sub_lock {
	/* locked object; unlocked and put by mdt_unlock_list() */
	struct mdt_object *msl_obj;
	/* lock handle held on msl_obj */
	struct mdt_lock_handle msl_lh;
	/* linkage into the owning lock list */
	struct list_head msl_linkage;
};
1614
1615 static void mdt_unlock_list(struct mdt_thread_info *info,
1616                             struct list_head *list, int decref)
1617 {
1618         struct mdt_sub_lock *msl;
1619         struct mdt_sub_lock *tmp;
1620
1621         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1622                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1623                 list_del(&msl->msl_linkage);
1624                 OBD_FREE_PTR(msl);
1625         }
1626 }
1627
/**
 * Unlock an object locked for migration, together with its stripes.
 *
 * A local object is released through mdt_reint_striped_unlock(). For a
 * remote object the explicitly taken \a slave_locks are released first and
 * then the lock on the object itself.
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
					     struct mdt_object *obj,
					     struct mdt_lock_handle *lh,
					     struct ldlm_enqueue_info *einfo,
					     struct list_head *slave_locks,
					     int decref)
{
	if (!mdt_object_remote(obj)) {
		mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
		return;
	}

	/* remote object: stripe locks were collected on slave_locks */
	mdt_unlock_list(info, slave_locks, decref);
	mdt_object_unlock(info, obj, lh, decref);
}
1642
1643 /*
1644  * lock parents of links, and also check whether total locks don't exceed
1645  * RS_MAX_LOCKS.
1646  *
1647  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1648  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1649  * \retval      -ev negative errno upon error
1650  */
1651 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1652                                  struct mdt_object *pobj,
1653                                  const struct md_attr *ma,
1654                                  struct mdt_object *obj,
1655                                  struct mdt_lock_handle *lhp,
1656                                  struct ldlm_enqueue_info *peinfo,
1657                                  struct list_head *parent_slave_locks,
1658                                  struct list_head *link_locks)
1659 {
1660         struct mdt_device *mdt = info->mti_mdt;
1661         struct lu_buf *buf = &info->mti_big_buf;
1662         struct lu_name *lname = &info->mti_name;
1663         struct linkea_data ldata = { NULL };
1664         bool blocked = false;
1665         int local_lnkp_cnt = 0;
1666         int rc;
1667
1668         ENTRY;
1669         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1670                 RETURN(0);
1671
1672         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1673         if (buf->lb_buf == NULL)
1674                 RETURN(-ENOMEM);
1675
1676         ldata.ld_buf = buf;
1677         rc = mdt_links_read(info, obj, &ldata);
1678         if (rc) {
1679                 if (rc == -ENOENT || rc == -ENODATA)
1680                         rc = 0;
1681                 RETURN(rc);
1682         }
1683
1684         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1685              linkea_next_entry(&ldata)) {
1686                 struct mdt_object *lnkp;
1687                 struct mdt_sub_lock *msl;
1688                 struct lu_fid fid;
1689                 __u64 ibits;
1690
1691                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1692                                     &fid);
1693
1694                 /* check if it's also linked to parent */
1695                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1696                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1697                                PFID(&fid), PNAME(lname));
1698                         /* in case link is remote object, revoke LOOKUP lock */
1699                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1700                         continue;
1701                 }
1702
1703                 lnkp = NULL;
1704
1705                 /* check if it's linked to a stripe of parent */
1706                 if (ma->ma_valid & MA_LMV) {
1707                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1708                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1709                         int j = 0;
1710
1711                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1712                                 fid_le_to_cpu(stripe_fid,
1713                                               &lmv->lmv_stripe_fids[j]);
1714                                 if (lu_fid_eq(stripe_fid, &fid)) {
1715                                         CDEBUG(D_INFO, "skip stripe "DFID
1716                                                ", reovke "DNAME"\n",
1717                                                PFID(&fid), PNAME(lname));
1718                                         lnkp = mdt_object_find(info->mti_env,
1719                                                                mdt, &fid);
1720                                         if (IS_ERR(lnkp))
1721                                                 GOTO(out, rc = PTR_ERR(lnkp));
1722                                         break;
1723                                 }
1724                         }
1725
1726                         if (lnkp) {
1727                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1728                                                                    obj);
1729                                 mdt_object_put(info->mti_env, lnkp);
1730                                 continue;
1731                         }
1732                 }
1733
1734                 /* Check if it's already locked */
1735                 list_for_each_entry(msl, link_locks, msl_linkage) {
1736                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1737                                 CDEBUG(D_INFO,
1738                                        DFID" was locked, revoke "DNAME"\n",
1739                                        PFID(&fid), PNAME(lname));
1740                                 lnkp = msl->msl_obj;
1741                                 break;
1742                         }
1743                 }
1744
1745                 if (lnkp) {
1746                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1747                         continue;
1748                 }
1749
1750                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1751                        PFID(&fid), PNAME(lname));
1752
1753                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1754                 if (IS_ERR(lnkp)) {
1755                         CWARN("%s: cannot find obj "DFID": %ld\n",
1756                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1757                         continue;
1758                 }
1759
1760                 if (!mdt_object_exists(lnkp)) {
1761                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1762                               PFID(&fid), PNAME(lname));
1763                         mdt_object_put(info->mti_env, lnkp);
1764                         continue;
1765                 }
1766
1767                 if (!mdt_object_remote(lnkp))
1768                         local_lnkp_cnt++;
1769
1770                 OBD_ALLOC_PTR(msl);
1771                 if (msl == NULL)
1772                         GOTO(out, rc = -ENOMEM);
1773
1774                 /*
1775                  * we can't follow parent-child lock order like other MD
1776                  * operations, use lock_try here to avoid deadlock, if the lock
1777                  * cannot be taken, drop all locks taken, revoke the blocked
1778                  * one, and continue processing the remaining entries, and in
1779                  * the end of the loop restart from beginning.
1780                  */
1781                 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1782                 ibits = 0;
1783                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1784                                          MDS_INODELOCK_UPDATE, true);
1785                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1786
1787                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1788                                PFID(&fid), PNAME(lname));
1789
1790                         mdt_unlock_list(info, link_locks, 1);
1791                         /* also unlock parent locks to avoid deadlock */
1792                         if (!blocked)
1793                                 mdt_migrate_object_unlock(info, pobj, lhp,
1794                                                           peinfo,
1795                                                           parent_slave_locks,
1796                                                           1);
1797
1798                         blocked = true;
1799
1800                         mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1801                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1802                                              MDS_INODELOCK_UPDATE);
1803                         if (rc) {
1804                                 mdt_object_put(info->mti_env, lnkp);
1805                                 OBD_FREE_PTR(msl);
1806                                 GOTO(out, rc);
1807                         }
1808
1809                         if (mdt_object_remote(lnkp)) {
1810                                 struct ldlm_lock *lock;
1811
1812                                 /*
1813                                  * for remote object, set lock cb_atomic,
1814                                  * so lock can be released in blocking_ast()
1815                                  * immediately, then the next lock_try will
1816                                  * have better chance of success.
1817                                  */
1818                                 lock = ldlm_handle2lock(
1819                                                 &msl->msl_lh.mlh_rreg_lh);
1820                                 LASSERT(lock != NULL);
1821                                 lock_res_and_lock(lock);
1822                                 ldlm_set_atomic_cb(lock);
1823                                 unlock_res_and_lock(lock);
1824                                 LDLM_LOCK_PUT(lock);
1825                         }
1826
1827                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1828                         OBD_FREE_PTR(msl);
1829                         continue;
1830                 }
1831
1832                 INIT_LIST_HEAD(&msl->msl_linkage);
1833                 msl->msl_obj = lnkp;
1834                 list_add_tail(&msl->msl_linkage, link_locks);
1835
1836                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1837         }
1838
1839         if (blocked)
1840                 GOTO(out, rc = -EBUSY);
1841
1842         EXIT;
1843 out:
1844         if (rc) {
1845                 mdt_unlock_list(info, link_locks, rc);
1846         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1847                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1848                        local_lnkp_cnt);
1849                 /*
1850                  * parent may have 3 local objects: master object and 2 stripes
1851                  * (if it's being migrated too); source may have 1 local objects
1852                  * as regular file; target has 1 local object.
1853                  * Note, source may have 2 local locks if it is directory but it
1854                  * can't have hardlinks, so it is not considered here.
1855                  */
1856                 rc = 1;
1857         }
1858         return rc;
1859 }
1860
1861 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1862                                   struct mdt_object *obj,
1863                                   const struct md_attr *ma,
1864                                   struct list_head *slave_locks)
1865 {
1866         struct mdt_device *mdt = info->mti_mdt;
1867         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1868         struct lu_fid *fid = &info->mti_tmp_fid1;
1869         struct mdt_object *slave;
1870         struct mdt_sub_lock *msl;
1871         int i;
1872         int rc;
1873
1874         ENTRY;
1875         LASSERT(mdt_object_remote(obj));
1876         LASSERT(ma->ma_valid & MA_LMV);
1877         LASSERT(lmv);
1878
1879         if (!lmv_is_sane(lmv))
1880                 RETURN(-EINVAL);
1881
1882         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1883                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1884
1885                 if (!fid_is_sane(fid))
1886                         continue;
1887
1888                 slave = mdt_object_find(info->mti_env, mdt, fid);
1889                 if (IS_ERR(slave))
1890                         GOTO(out, rc = PTR_ERR(slave));
1891
1892                 OBD_ALLOC_PTR(msl);
1893                 if (!msl) {
1894                         mdt_object_put(info->mti_env, slave);
1895                         GOTO(out, rc = -ENOMEM);
1896                 }
1897
1898                 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1899                 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1900                                            MDS_INODELOCK_UPDATE, true);
1901                 if (rc) {
1902                         OBD_FREE_PTR(msl);
1903                         mdt_object_put(info->mti_env, slave);
1904                         GOTO(out, rc);
1905                 }
1906
1907                 INIT_LIST_HEAD(&msl->msl_linkage);
1908                 msl->msl_obj = slave;
1909                 list_add_tail(&msl->msl_linkage, slave_locks);
1910         }
1911         EXIT;
1912
1913 out:
1914         if (rc)
1915                 mdt_unlock_list(info, slave_locks, rc);
1916         return rc;
1917 }
1918
1919 /* lock parent and its stripes */
1920 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1921                                    struct mdt_object *obj,
1922                                    const struct md_attr *ma,
1923                                    struct mdt_lock_handle *lh,
1924                                    struct ldlm_enqueue_info *einfo,
1925                                    struct list_head *slave_locks)
1926 {
1927         int rc;
1928
1929         if (mdt_object_remote(obj)) {
1930                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1931                                             &lh->mlh_rreg_lh, LCK_PW,
1932                                             MDS_INODELOCK_UPDATE, false);
1933                 if (rc != ELDLM_OK)
1934                         return rc;
1935
1936                 /*
1937                  * if obj is remote and striped, lock its stripes explicitly
1938                  * because it's not striped in LOD layer on this MDT.
1939                  */
1940                 if (ma->ma_valid & MA_LMV) {
1941                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1942                         if (rc)
1943                                 mdt_object_unlock(info, obj, lh, rc);
1944                 }
1945         } else {
1946                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1947                                             einfo, true);
1948         }
1949
1950         return rc;
1951 }
1952
1953 /*
1954  * in migration, object may be remote, and we need take full lock of it and its
1955  * stripes if it's directory, besides, object may be a remote object on its
1956  * parent, revoke its LOOKUP lock on where its parent is located.
1957  */
1958 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1959                                    struct mdt_object *pobj,
1960                                    struct mdt_object *obj,
1961                                    struct mdt_lock_handle *lh,
1962                                    struct ldlm_enqueue_info *einfo,
1963                                    struct list_head *slave_locks)
1964 {
1965         int rc;
1966
1967         if (mdt_object_remote(obj)) {
1968                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1969                 if (rc)
1970                         return rc;
1971
1972                 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1973                                             &lh->mlh_rreg_lh, LCK_EX,
1974                                             MDS_INODELOCK_FULL, false);
1975                 if (rc != ELDLM_OK)
1976                         return rc;
1977
1978                 /*
1979                  * if obj is remote and striped, lock its stripes explicitly
1980                  * because it's not striped in LOD layer on this MDT.
1981                  */
1982                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1983                         struct md_attr *ma = &info->mti_attr;
1984
1985                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1986                         if (rc) {
1987                                 mdt_object_unlock(info, obj, lh, rc);
1988                                 return rc;
1989                         }
1990
1991                         if (ma->ma_valid & MA_LMV) {
1992                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1993                                                             slave_locks);
1994                                 if (rc)
1995                                         mdt_object_unlock(info, obj, lh, rc);
1996                         }
1997                 }
1998         } else {
1999                 if (mdt_object_remote(pobj)) {
2000                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2001                         if (rc)
2002                                 return rc;
2003                 }
2004
2005                 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2006                                             einfo, true);
2007         }
2008
2009         return rc;
2010 }
2011
2012 /*
2013  * lookup source by name, if parent is striped directory, we need to find the
2014  * corresponding stripe where source is located, and then lookup there.
2015  *
2016  * besides, if parent is migrating too, and file is already in target stripe,
2017  * this should be a redo of 'lfs migrate' on client side.
2018  */
2019 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2020                               struct mdt_object *pobj,
2021                               const struct md_attr *ma,
2022                               const struct lu_name *lname,
2023                               struct mdt_object **spobj,
2024                               struct mdt_object **sobj)
2025 {
2026         const struct lu_env *env = info->mti_env;
2027         struct lu_fid *fid = &info->mti_tmp_fid1;
2028         struct mdt_object *stripe;
2029         int rc;
2030
2031         if (ma->ma_valid & MA_LMV) {
2032                 /* if parent is striped, lookup on corresponding stripe */
2033                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2034
2035                 if (!lmv_is_sane(lmv))
2036                         return -EBADF;
2037
2038                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2039                                                   lname->ln_namelen);
2040                 if (rc < 0)
2041                         return rc;
2042
2043                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2044
2045                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2046                 if (IS_ERR(stripe))
2047                         return PTR_ERR(stripe);
2048
2049                 fid_zero(fid);
2050                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2051                                 &info->mti_spec);
2052                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2053                         /*
2054                          * if parent layout is changeing, and lookup child
2055                          * failed on source stripe, lookup again on target
2056                          * stripe, if it exists, it means previous migration
2057                          * was interrupted, and current file was migrated
2058                          * already.
2059                          */
2060                         mdt_object_put(env, stripe);
2061
2062                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2063                                                       lname->ln_namelen);
2064                         if (rc < 0)
2065                                 return rc;
2066
2067                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2068
2069                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2070                         if (IS_ERR(stripe))
2071                                 return PTR_ERR(stripe);
2072
2073                         fid_zero(fid);
2074                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2075                                         fid, &info->mti_spec);
2076                         mdt_object_put(env, stripe);
2077                         return rc ?: -EALREADY;
2078                 } else if (rc) {
2079                         mdt_object_put(env, stripe);
2080                         return rc;
2081                 }
2082         } else {
2083                 fid_zero(fid);
2084                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2085                                 &info->mti_spec);
2086                 if (rc)
2087                         return rc;
2088
2089                 stripe = pobj;
2090                 mdt_object_get(env, stripe);
2091         }
2092
2093         *spobj = stripe;
2094
2095         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2096         if (IS_ERR(*sobj)) {
2097                 mdt_object_put(env, stripe);
2098                 rc = PTR_ERR(*sobj);
2099                 *spobj = NULL;
2100                 *sobj = NULL;
2101         }
2102
2103         return rc;
2104 }
2105
2106 /* end lease and close file for regular file */
2107 static int mdd_migrate_close(struct mdt_thread_info *info,
2108                              struct mdt_object *obj)
2109 {
2110         struct close_data *data;
2111         struct mdt_body *repbody;
2112         struct ldlm_lock *lease;
2113         int rc;
2114         int rc2;
2115
2116         rc = -EPROTO;
2117         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2118                                       RCL_CLIENT) ||
2119             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2120                                       RCL_CLIENT))
2121                 goto close;
2122
2123         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2124         if (!data)
2125                 goto close;
2126
2127         rc = -ESTALE;
2128         lease = ldlm_handle2lock(&data->cd_handle);
2129         if (!lease)
2130                 goto close;
2131
2132         /* check if the lease was already canceled */
2133         lock_res_and_lock(lease);
2134         rc = ldlm_is_cancel(lease);
2135         unlock_res_and_lock(lease);
2136
2137         if (rc) {
2138                 rc = -EAGAIN;
2139                 LDLM_DEBUG(lease, DFID" lease broken",
2140                            PFID(mdt_object_fid(obj)));
2141         }
2142
2143         /*
2144          * cancel server side lease, client side counterpart should have been
2145          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2146          */
2147         ldlm_lock_cancel(lease);
2148         ldlm_reprocess_all(lease->l_resource, lease);
2149         LDLM_LOCK_PUT(lease);
2150
2151 close:
2152         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2153         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2154         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2155
2156         return rc ?: rc2;
2157 }
2158
2159 /*
2160  * migrate file in below steps:
2161  *  1. lock parent and its stripes
2162  *  2. lookup source by name
2163  *  3. lock parents of source links if source is not directory
2164  *  4. reject if source is in HSM
2165  *  5. take source open_sem and close file if source is regular file
2166  *  6. lock source and its stripes if it's directory
2167  *  7. lock target so subsequent change to it can trigger COS
2168  *  8. migrate file
2169  *  9. unlock above locks
2170  * 10. sync device if source has links
2171  */
2172 int mdt_reint_migrate(struct mdt_thread_info *info,
2173                       struct mdt_lock_handle *unused)
2174 {
2175         const struct lu_env *env = info->mti_env;
2176         struct mdt_device *mdt = info->mti_mdt;
2177         struct ptlrpc_request *req = mdt_info_req(info);
2178         struct mdt_reint_record *rr = &info->mti_rr;
2179         struct lu_ucred *uc = mdt_ucred(info);
2180         struct md_attr *ma = &info->mti_attr;
2181         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2182         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2183         struct mdt_object *pobj;
2184         struct mdt_object *spobj = NULL;
2185         struct mdt_object *sobj = NULL;
2186         struct mdt_object *tobj;
2187         struct lustre_handle rename_lh = { 0 };
2188         struct mdt_lock_handle *lhp;
2189         struct mdt_lock_handle *lhs;
2190         struct mdt_lock_handle *lht;
2191         LIST_HEAD(parent_slave_locks);
2192         LIST_HEAD(child_slave_locks);
2193         LIST_HEAD(link_locks);
2194         int lock_retries = 5;
2195         bool open_sem_locked = false;
2196         bool do_sync = false;
2197         int rc;
2198
2199         ENTRY;
2200         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2201                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2202
2203         if (info->mti_dlm_req)
2204                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2205
2206         if (!fid_is_md_operative(rr->rr_fid1) ||
2207             !fid_is_md_operative(rr->rr_fid2))
2208                 RETURN(-EPERM);
2209
2210         /* don't allow migrate . or .. */
2211         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2212                 RETURN(-EBUSY);
2213
2214         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2215                 RETURN(-EPERM);
2216
2217         if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2218             uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2219             mdt->mdt_enable_remote_dir_gid != -1)
2220                 RETURN(-EPERM);
2221
2222         /*
2223          * Note: do not enqueue rename lock for replay request, because
2224          * if other MDT holds rename lock, but being blocked to wait for
2225          * this MDT to finish its recovery, and the failover MDT can not
2226          * get rename lock, which will cause deadlock.
2227          *
2228          * req is NULL if this is called by directory auto-split.
2229          */
2230         if (req && !req_is_replay(req)) {
2231                 rc = mdt_rename_lock(info, &rename_lh);
2232                 if (rc != 0) {
2233                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2234                                mdt_obd_name(info->mti_mdt), rc);
2235                         RETURN(rc);
2236                 }
2237         }
2238
2239         /* pobj is master object of parent */
2240         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2241         if (IS_ERR(pobj))
2242                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2243
2244         if (req) {
2245                 rc = mdt_version_get_check(info, pobj, 0);
2246                 if (rc)
2247                         GOTO(put_parent, rc);
2248         }
2249
2250         if (!mdt_object_exists(pobj))
2251                 GOTO(put_parent, rc = -ENOENT);
2252
2253         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2254                 GOTO(put_parent, rc = -ENOTDIR);
2255
2256         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2257         if (rc)
2258                 GOTO(put_parent, rc);
2259
2260 lock_parent:
2261         /* lock parent object */
2262         lhp = &info->mti_lh[MDT_LH_PARENT];
2263         mdt_lock_reg_init(lhp, LCK_PW);
2264         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2265                                      &parent_slave_locks);
2266         if (rc)
2267                 GOTO(put_parent, rc);
2268
2269         /*
2270          * spobj is the corresponding stripe against name if pobj is striped
2271          * directory, which is the real parent, and no need to lock, because
2272          * we've taken full lock of pobj.
2273          */
2274         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2275         if (rc)
2276                 GOTO(unlock_parent, rc);
2277
2278         /* lock parents of source links, and revoke LOOKUP lock of links */
2279         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2280                                    &parent_slave_locks, &link_locks);
2281         if (rc == -EBUSY && lock_retries-- > 0) {
2282                 mdt_object_put(env, sobj);
2283                 mdt_object_put(env, spobj);
2284                 goto lock_parent;
2285         }
2286
2287         if (rc < 0)
2288                 GOTO(put_source, rc);
2289
2290         /*
2291          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2292          * with one request, if total lock count exceeds this limit, we will
2293          * drop all locks after migration, and synchronous device in the end.
2294          */
2295         do_sync = rc;
2296
2297         /* TODO: DoM migration is not supported, migrate dirent only */
2298         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2299                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2300                 if (rc)
2301                         GOTO(unlock_links, rc);
2302
2303                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2304                         info->mti_spec.sp_migrate_nsonly = 1;
2305         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2306                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2307                 if (rc)
2308                         GOTO(unlock_links, rc);
2309
2310                 /* race with restripe/auto-split? */
2311                 if ((ma->ma_valid & MA_LMV) &&
2312                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2313                         GOTO(unlock_links, rc = -EBUSY);
2314         }
2315
2316         /* if migration HSM is allowed */
2317         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2318                 ma->ma_need = MA_HSM;
2319                 ma->ma_valid = 0;
2320                 rc = mdt_attr_get_complex(info, sobj, ma);
2321                 if (rc)
2322                         GOTO(unlock_links, rc);
2323
2324                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2325                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2326         }
2327
2328         /* end lease and close file for regular file */
2329         if (info->mti_spec.sp_migrate_close) {
2330                 /* try to hold open_sem so that nobody else can open the file */
2331                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2332                         /* close anyway */
2333                         mdd_migrate_close(info, sobj);
2334                         GOTO(unlock_links, rc = -EBUSY);
2335                 } else {
2336                         open_sem_locked = true;
2337                         rc = mdd_migrate_close(info, sobj);
2338                         if (rc)
2339                                 GOTO(unlock_open_sem, rc);
2340                 }
2341         }
2342
2343         /* lock source */
2344         lhs = &info->mti_lh[MDT_LH_OLD];
2345         mdt_lock_reg_init(lhs, LCK_EX);
2346         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2347                                      &child_slave_locks);
2348         if (rc)
2349                 GOTO(unlock_open_sem, rc);
2350
2351         /* lock target */
2352         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2353         if (IS_ERR(tobj))
2354                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2355
2356         lht = &info->mti_lh[MDT_LH_NEW];
2357         mdt_lock_reg_init(lht, LCK_EX);
2358         rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2359         if (rc)
2360                 GOTO(put_target, rc);
2361
2362         /* Don't do lookup sanity check. We know name doesn't exist. */
2363         info->mti_spec.sp_cr_lookup = 0;
2364         info->mti_spec.sp_feat = &dt_directory_features;
2365
2366         rc = mdo_migrate(env, mdt_object_child(pobj),
2367                          mdt_object_child(sobj), &rr->rr_name,
2368                          mdt_object_child(tobj),
2369                          &info->mti_spec, ma);
2370         if (!rc)
2371                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2372                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2373         EXIT;
2374
2375         mdt_object_unlock(info, tobj, lht, rc);
2376 put_target:
2377         mdt_object_put(env, tobj);
2378 unlock_source:
2379         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2380                                   &child_slave_locks, rc);
2381 unlock_open_sem:
2382         if (open_sem_locked)
2383                 up_write(&sobj->mot_open_sem);
2384 unlock_links:
2385         /* if we've got too many locks to save into RPC,
2386          * then just commit before the locks are released
2387          */
2388         if (!rc && do_sync)
2389                 mdt_device_sync(env, mdt);
2390         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2391 put_source:
2392         mdt_object_put(env, sobj);
2393         mdt_object_put(env, spobj);
2394 unlock_parent:
2395         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2396                                   &parent_slave_locks, rc);
2397 put_parent:
2398         mdt_object_put(env, pobj);
2399 unlock_rename:
2400         if (lustre_handle_is_used(&rename_lh))
2401                 mdt_rename_unlock(&rename_lh);
2402
2403         return rc;
2404 }
2405
2406 static int mdt_object_lock_save(struct mdt_thread_info *info,
2407                                 struct mdt_object *dir,
2408                                 struct mdt_lock_handle *lh,
2409                                 int idx, bool cos_incompat)
2410 {
2411         int rc;
2412
2413         /* we lock the target dir if it is local */
2414         rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2415                                    cos_incompat);
2416         if (rc != 0)
2417                 return rc;
2418
2419         /* get and save correct version after locking */
2420         mdt_version_get_save(info, dir, idx);
2421         return 0;
2422 }
2423
2424 /*
2425  * determine lock order of sobj and tobj
2426  *
2427  * there are two situations we need to lock tobj before sobj:
2428  * 1. sobj is child of tobj
2429  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2430  *    larger than that of tobj
2431  *
2432  * \retval      1 lock tobj before sobj
2433  * \retval      0 lock sobj before tobj
2434  * \retval      -ev negative errno upon error
2435  */
2436 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2437                                            struct mdt_object *sobj,
2438                                            struct mdt_object *tobj)
2439 {
2440         struct md_attr *ma = &info->mti_attr;
2441         struct lu_fid *spfid = &info->mti_tmp_fid1;
2442         struct lu_fid *tpfid = &info->mti_tmp_fid2;
2443         struct lmv_mds_md_v1 *lmv;
2444         __u32 sindex;
2445         __u32 tindex;
2446         int rc;
2447
2448         /* sobj and tobj are the same */
2449         if (sobj == tobj)
2450                 return 0;
2451
2452         if (fid_is_root(mdt_object_fid(sobj)))
2453                 return 0;
2454
2455         if (fid_is_root(mdt_object_fid(tobj)))
2456                 return 1;
2457
2458         /* check whether sobj is child of tobj */
2459         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2460                            mdt_object_fid(tobj));
2461         if (rc < 0)
2462                 return rc;
2463
2464         if (rc == 1)
2465                 return 1;
2466
2467         /* check whether sobj and tobj are children of the same parent */
2468         rc = mdt_attr_get_pfid(info, sobj, spfid);
2469         if (rc)
2470                 return rc;
2471
2472         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2473         if (rc)
2474                 return rc;
2475
2476         if (!lu_fid_eq(spfid, tpfid))
2477                 return 0;
2478
2479         /* check whether sobj and tobj are sibling stripes */
2480         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2481         if (rc)
2482                 return rc;
2483
2484         if (!(ma->ma_valid & MA_LMV))
2485                 return 0;
2486
2487         lmv = &ma->ma_lmv->lmv_md_v1;
2488         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2489                 return 0;
2490         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2491
2492         ma->ma_valid = 0;
2493         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2494         if (rc)
2495                 return rc;
2496
2497         if (!(ma->ma_valid & MA_LMV))
2498                 return -ENODATA;
2499
2500         lmv = &ma->ma_lmv->lmv_md_v1;
2501         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2502                 return -EINVAL;
2503         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2504
2505         /* check stripe index of sobj and tobj */
2506         if (sindex == tindex)
2507                 return -EINVAL;
2508
2509         return sindex < tindex ? 0 : 1;
2510 }
2511
2512 /*
2513  * lock rename source object.
2514  *
2515  * Both source and source parent may be remote, and source may be a remote
2516  * object on source parent, to avoid overriding lock handle, store remote
2517  * LOOKUP lock separately in @lhr.
2518  *
2519  * \retval      0 on success
2520  * \retval      -ev negative errno upon error
2521  */
2522 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2523                                   struct mdt_object *parent,
2524                                   struct mdt_object *child,
2525                                   struct mdt_lock_handle *lhc,
2526                                   struct mdt_lock_handle *lhr,
2527                                   __u64 ibits,
2528                                   bool cos_incompat)
2529 {
2530         int rc;
2531
2532         rc = mdt_is_remote_object(info, parent, child);
2533         if (rc < 0)
2534                 return rc;
2535
2536         if (rc) {
2537                 /* enqueue remote LOOKUP lock from the parent MDT */
2538                 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2539
2540                 if (mdt_object_remote(parent)) {
2541                         rc = mdt_remote_object_lock(info, parent,
2542                                                     mdt_object_fid(child),
2543                                                     &lhr->mlh_rreg_lh,
2544                                                     lhr->mlh_rreg_mode,
2545                                                     rmt_ibits, false);
2546                         if (rc != ELDLM_OK)
2547                                 return rc;
2548                 } else {
2549                         LASSERT(mdt_object_remote(child));
2550                         rc = mdt_object_local_lock(info, child, lhr,
2551                                                    &rmt_ibits, 0, true);
2552                         if (rc < 0)
2553                                 return rc;
2554                 }
2555
2556                 ibits &= ~MDS_INODELOCK_LOOKUP;
2557         }
2558
2559         if (mdt_object_remote(child)) {
2560                 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2561                                             &lhc->mlh_rreg_lh,
2562                                             lhc->mlh_rreg_mode,
2563                                             ibits, false);
2564                 if (rc == ELDLM_OK)
2565                         rc = 0;
2566         } else {
2567                 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2568                                            cos_incompat);
2569         }
2570
2571         if (!rc)
2572                 mdt_object_unlock(info, child, lhr, rc);
2573
2574         return rc;
2575 }
2576
2577 /*
2578  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2579  * 2 - srcdir child; 3 - tgtdir child.
2580  * Update on disk version of srcdir child.
      *
      * Handle a rename of rr_name under rr_fid1 to rr_tgt_name under rr_fid2.
      * Resources are taken in this order: references on both parents, the
      * rename lock (skipped for replayed requests and for renames within one
      * directory), the two parent locks in the order chosen by
      * mdt_rename_determine_lock_order(), the source child, and finally the
      * existing target child if the target name resolves to one.  The out_*
      * labels at the end release them again.
      *
      * \param[in] info		thread info carrying the unpacked reint record
      * \param[in] unused	not referenced by this handler
      *
      * \retval		0 on success
      * \retval		non-zero error code otherwise; -EXDEV is returned with
      *			the target FID packed in the reply when the existing
      *			target object is remote
2581  */
2582 static int mdt_reint_rename(struct mdt_thread_info *info,
2583                             struct mdt_lock_handle *unused)
2584 {
2585         struct mdt_device *mdt = info->mti_mdt;
2586         struct mdt_reint_record *rr = &info->mti_rr;
2587         struct md_attr *ma = &info->mti_attr;
2588         struct ptlrpc_request *req = mdt_info_req(info);
2589         struct mdt_object *msrcdir = NULL;
2590         struct mdt_object *mtgtdir = NULL;
2591         struct mdt_object *mold;
2592         struct mdt_object *mnew = NULL;
2593         struct lustre_handle rename_lh = { 0 };
2594         struct mdt_lock_handle *lh_srcdirp;
2595         struct mdt_lock_handle *lh_tgtdirp;
2596         struct mdt_lock_handle *lh_oldp = NULL;
2597         struct mdt_lock_handle *lh_rmt = NULL;
2598         struct mdt_lock_handle *lh_newp = NULL;
2599         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2600         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2601         __u64 lock_ibits;
2602         bool reverse = false, discard = false;
2603         bool cos_incompat;
2604         ktime_t kstart = ktime_get();
2605         int rc;
2606
2607         ENTRY;
2608         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2609                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2610                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2611
2612         if (info->mti_dlm_req)
2613                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2614
2615         if (!fid_is_md_operative(rr->rr_fid1) ||
2616             !fid_is_md_operative(rr->rr_fid2))
2617                 RETURN(-EPERM);
2618
2619         /* find both parents. */
2620         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2621         if (IS_ERR(msrcdir))
2622                 RETURN(PTR_ERR(msrcdir));
2623
2624         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2625
             /* same parent FID: share the object but still hold two references,
              * because both out_put_tgtdir and out_put_srcdir drop one
              */
2626         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2627                 mtgtdir = msrcdir;
2628                 mdt_object_get(info->mti_env, mtgtdir);
2629         } else {
2630                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2631                 if (IS_ERR(mtgtdir))
2632                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2633         }
2634
2635         /*
2636          * Note: do not enqueue rename lock for replay request, because
2637          * if other MDT holds rename lock, but being blocked to wait for
2638          * this MDT to finish its recovery, and the failover MDT can not
2639          * get rename lock, which will cause deadlock.
2640          */
2641         if (!req_is_replay(req)) {
2642                 /*
2643                  * Normally rename RPC is handled on the MDT with the target
2644                  * directory (if target exists, it's on the MDT with the
2645                  * target), if the source directory is remote, it's a hint that
2646                  * source is remote too (this may not be true, but it won't
2647                  * cause any issue), return -EXDEV early to avoid taking
2648                  * rename_lock.
2649                  */
2650                 if (!mdt->mdt_enable_remote_rename &&
2651                     mdt_object_remote(msrcdir))
2652                         GOTO(out_put_tgtdir, rc = -EXDEV);
2653
2654                 /* This might be further relaxed in the future for regular file
2655                  * renames in different source and target parents. Start with
2656                  * only same-directory renames for simplicity and because this
2657                  * is by far the most common use case.
2658                  */
2659                 if (msrcdir != mtgtdir) {
2660                         rc = mdt_rename_lock(info, &rename_lh);
2661                         if (rc != 0) {
2662                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2663                                        mdt_obd_name(mdt), rc);
2664                                 GOTO(out_put_tgtdir, rc);
2665                         }
2666                 } else {
2667                         CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
2668                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2669                                PNAME(&rr->rr_name));
2670                 }
2671         }
2672
2673         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2674         if (rc < 0)
2675                 GOTO(out_unlock_rename, rc);
2676
             /* non-zero means the target parent has to be locked first */
2677         reverse = rc;
2678
2679         /* source needs to be looked up after locking source parent, otherwise
2680          * this rename may race with unlink source, and cause rename hang, see
2681          * sanityn.sh 55b, so check parents first, if later we found source is
2682          * remote, relock parents.
2683          */
2684         cos_incompat = (mdt_object_remote(msrcdir) ||
2685                         mdt_object_remote(mtgtdir));
2686
2687         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2688
2689         /* lock parents in the proper order. */
2690         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2691         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2692
2693         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2694         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2695 relock:
2696         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2697         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2698
2699         if (reverse) {
2700                 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2701                                           cos_incompat);
2702                 if (rc)
2703                         GOTO(out_unlock_rename, rc);
2704
2705                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2706
2707                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2708                                           cos_incompat);
2709                 if (rc != 0) {
2710                         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2711                         GOTO(out_unlock_rename, rc);
2712                 }
2713         } else {
2714                 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2715                                           cos_incompat);
2716                 if (rc)
2717                         GOTO(out_unlock_rename, rc);
2718
2719                 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2720
                     /* a second object lock is needed only for a distinct target
                      * parent; within one local directory only the PDO hash lock
                      * of the target name is added when the two hashes differ
                      */
2721                 if (mtgtdir != msrcdir) {
2722                         rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2723                                                   cos_incompat);
2724                 } else if (!mdt_object_remote(mtgtdir) &&
2725                            lh_srcdirp->mlh_pdo_hash !=
2726                            lh_tgtdirp->mlh_pdo_hash) {
2727                         rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
2728                                                 MDS_INODELOCK_UPDATE,
2729                                                 cos_incompat);
2730                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2731                 }
2732                 if (rc != 0) {
2733                         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2734                         GOTO(out_unlock_rename, rc);
2735                 }
2736         }
2737
2738         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2739         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2740
2741         /* find mold object. */
2742         fid_zero(old_fid);
2743         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2744         if (rc != 0)
2745                 GOTO(out_unlock_parents, rc);
2746
             /* the source must not be one of the two parent directories */
2747         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2748                 GOTO(out_unlock_parents, rc = -EINVAL);
2749
2750         if (!fid_is_md_operative(old_fid))
2751                 GOTO(out_unlock_parents, rc = -EPERM);
2752
2753         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2754         if (IS_ERR(mold))
2755                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2756
2757         if (!mdt_object_exists(mold)) {
2758                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2759                                 &mold->mot_obj,
2760                                 "object does not exist");
2761                 GOTO(out_put_old, rc = -ENOENT);
2762         }
2763
2764         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2765                 GOTO(out_put_old, rc = -EXDEV);
2766
2767         /* Check if @mtgtdir is subdir of @mold, before locking child
2768          * to avoid reverse locking.
2769          */
2770         if (mtgtdir != msrcdir) {
2771                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2772                                    old_fid);
2773                 if (rc) {
2774                         if (rc == 1)
2775                                 rc = -EINVAL;
2776                         GOTO(out_put_old, rc);
2777                 }
2778         }
2779
2780         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2781         /* save version after locking */
2782         mdt_version_get_save(info, mold, 2);
2783
             /* the source turned out to be remote while the parents were locked
              * with cos_incompat unset: drop the source and both parent locks
              * and retake the parent locks with cos_incompat set
              */
2784         if (!cos_incompat && mdt_object_remote(mold)) {
2785                 cos_incompat = true;
2786                 mdt_object_put(info->mti_env, mold);
2787                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2788                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2789                 goto relock;
2790         }
2791
2792         /* find mnew object:
2793          * mnew target object may not exist now
2794          * lookup with version checking
2795          */
2796         fid_zero(new_fid);
2797         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2798                                       3);
2799         if (rc == 0) {
2800                 /* the new_fid should have been filled at this moment */
                     /* both names resolve to the same object: leave with rc == 0
                      * without calling mdo_rename()
                      */
2801                 if (lu_fid_eq(old_fid, new_fid))
2802                         GOTO(out_put_old, rc);
2803
2804                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2805                     lu_fid_eq(new_fid, rr->rr_fid2))
2806                         GOTO(out_put_old, rc = -EINVAL);
2807
2808                 if (!fid_is_md_operative(new_fid))
2809                         GOTO(out_put_old, rc = -EPERM);
2810
2811                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2812                 if (IS_ERR(mnew))
2813                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2814
2815                 if (!mdt_object_exists(mnew)) {
2816                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2817                                         &mnew->mot_obj,
2818                                         "object does not exist");
2819                         GOTO(out_put_new, rc = -ENOENT);
2820                 }
2821
2822                 if (mdt_object_remote(mnew)) {
2823                         struct mdt_body  *repbody;
2824
2825                         /* Always send rename req to the target child MDT */
2826                         repbody = req_capsule_server_get(info->mti_pill,
2827                                                          &RMF_MDT_BODY);
2828                         LASSERT(repbody != NULL);
2829                         repbody->mbo_fid1 = *new_fid;
2830                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2831                         GOTO(out_put_new, rc = -EXDEV);
2832                 }
2833                 /* Before locking the target dir, check we do not replace
2834                  * a dir with a non-dir, otherwise it may deadlock with
2835                  * link op which tries to create a link in this dir
2836                  * back to this non-dir.
2837                  */
2838                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2839                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2840                         GOTO(out_put_new, rc = -EISDIR);
2841
2842                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2843                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2844                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2845                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2846                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2847                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2848                                             lh_rmt, lock_ibits, cos_incompat);
2849                 if (rc < 0)
2850                         GOTO(out_put_new, rc);
2851
2852                 /* Check if @msrcdir is subdir of @mnew, before locking child
2853                  * to avoid reverse locking.
2854                  */
2855                 if (mtgtdir != msrcdir) {
2856                         rc = mdo_is_subdir(info->mti_env,
2857                                            mdt_object_child(msrcdir), new_fid);
2858                         if (rc) {
2859                                 if (rc == 1)
2860                                         rc = -EINVAL;
2861                                 GOTO(out_unlock_old, rc);
2862                         }
2863                 }
2864
2865                 /* We used to acquire MDS_INODELOCK_FULL here but we
2866                  * can't do this now because a running HSM restore on
2867                  * the rename onto victim will hold the layout
2868                  * lock. See LU-4002.
2869                  */
2870
2871                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2872                 mdt_lock_reg_init(lh_newp, LCK_EX);
2873                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
                     /* with a remote target parent the LOOKUP bit is enqueued
                      * through that parent, and only the rest is taken below
                      */
2874                 if (mdt_object_remote(mtgtdir)) {
2875                         rc = mdt_remote_object_lock(info, mtgtdir,
2876                                                     mdt_object_fid(mnew),
2877                                                     &lh_newp->mlh_rreg_lh,
2878                                                     lh_newp->mlh_rreg_mode,
2879                                                     MDS_INODELOCK_LOOKUP,
2880                                                     false);
2881                         if (rc != ELDLM_OK)
2882                                 GOTO(out_unlock_old, rc);
2883
2884                         lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2885                 }
2886                 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2887                                            cos_incompat);
2888                 if (rc != 0)
2889                         GOTO(out_unlock_new, rc);
2890
2891                 /* get and save version after locking */
2892                 mdt_version_get_save(info, mnew, 3);
2893         } else if (rc != -ENOENT) {
2894                 GOTO(out_put_old, rc);
2895         } else {
                     /* the target name does not exist: only the source is locked */
2896                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2897                 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2898                 mdt_lock_reg_init(lh_oldp, LCK_EX);
2899                 mdt_lock_reg_init(lh_rmt, LCK_EX);
2900                 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2901                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2902                                             lh_rmt, lock_ibits, cos_incompat);
2903                 if (rc != 0)
2904                         GOTO(out_put_old, rc);
2905
2906                 mdt_enoent_version_save(info, 3);
2907         }
2908
2909         /* step 5: rename it */
2910         mdt_reint_init_ma(info, ma);
2911
2912         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2913                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2914
2915         if (mnew != NULL)
2916                 mutex_lock(&mnew->mot_lov_mutex);
2917
2918         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2919                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2920                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2921                         &rr->rr_tgt_name, ma);
2922
2923         if (mnew != NULL)
2924                 mutex_unlock(&mnew->mot_lov_mutex);
2925
2926         /* handle last link of tgt object */
2927         if (rc == 0) {
2928                 mdt_counter_incr(req, LPROC_MDT_RENAME,
2929                                  ktime_us_delta(ktime_get(), kstart));
2930                 if (mnew) {
2931                         mdt_handle_last_unlink(info, mnew, ma);
2932                         discard = mdt_dom_check_for_discard(info, mnew);
2933                 }
2934                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2935                                          msrcdir, mtgtdir,
2936                                          ktime_us_delta(ktime_get(), kstart));
2937         }
2938
2939         EXIT;
2940 out_unlock_new:
2941         if (mnew != NULL)
2942                 mdt_object_unlock(info, mnew, lh_newp, rc);
2943 out_unlock_old:
2944         mdt_object_unlock(info, NULL, lh_rmt, rc);
2945         mdt_object_unlock(info, mold, lh_oldp, rc);
2946 out_put_new:
             /* when discard is set the reference on mnew is kept until the
              * mdt_dom_discard_data() call at the end of the function
              */
2947         if (mnew && !discard)
2948                 mdt_object_put(info->mti_env, mnew);
2949 out_put_old:
2950         mdt_object_put(info->mti_env, mold);
2951 out_unlock_parents:
2952         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2953         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2954 out_unlock_rename:
2955         if (lustre_handle_is_used(&rename_lh))
2956                 mdt_rename_unlock(&rename_lh);
2957 out_put_tgtdir:
2958         mdt_object_put(info->mti_env, mtgtdir);
2959 out_put_srcdir:
2960         mdt_object_put(info->mti_env, msrcdir);
2961
2962         /* The DoM discard can be done right in the place above where it is
2963          * assigned, meanwhile it is done here after rename unlock due to
2964          * compatibility with old clients, for them the discard blocks
2965          * the main thread until completion. Check LU-11359 for details.
2966          */
2967         if (discard) {
2968                 mdt_dom_discard_data(info, mnew);
2969                 mdt_object_put(info->mti_env, mnew);
2970         }
2971         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2972         return rc;
2973 }
2974
2975 static int mdt_reint_resync(struct mdt_thread_info *info,
2976                             struct mdt_lock_handle *lhc)
2977 {
2978         struct mdt_reint_record *rr = &info->mti_rr;
2979         struct ptlrpc_request *req = mdt_info_req(info);
2980         struct md_attr *ma = &info->mti_attr;
2981         struct mdt_object *mo;
2982         struct ldlm_lock *lease;
2983         struct mdt_body *repbody;
2984         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2985         bool lease_broken;
2986         int rc, rc2;
2987
2988         ENTRY;
2989         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2990
2991         if (info->mti_dlm_req)
2992                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2993
2994         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2995         if (IS_ERR(mo))
2996                 GOTO(out, rc = PTR_ERR(mo));
2997
2998         if (!mdt_object_exists(mo))
2999                 GOTO(out_obj, rc = -ENOENT);
3000
3001         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3002                 GOTO(out_obj, rc = -EINVAL);
3003
3004         if (mdt_object_remote(mo))
3005                 GOTO(out_obj, rc = -EREMOTE);
3006
3007         lease = ldlm_handle2lock(rr->rr_lease_handle);
3008         if (lease == NULL)
3009                 GOTO(out_obj, rc = -ESTALE);
3010
3011         /* It's really necessary to grab open_sem and check if the lease lock
3012          * has been lost. There would exist a concurrent writer coming in and
3013          * generating some dirty data in memory cache, the writeback would fail
3014          * after the layout version is increased by MDS_REINT_RESYNC RPC.
3015          */
3016         if (!down_write_trylock(&mo->mot_open_sem))
3017                 GOTO(out_put_lease, rc = -EBUSY);
3018
3019         lock_res_and_lock(lease);
3020         lease_broken = ldlm_is_cancel(lease);
3021         unlock_res_and_lock(lease);
3022         if (lease_broken)
3023                 GOTO(out_unlock, rc = -EBUSY);
3024
3025         /* the file has yet opened by anyone else after we took the lease. */
3026         layout.mlc_opc = MD_LAYOUT_RESYNC;
3027         lhc = &info->mti_lh[MDT_LH_LOCAL];
3028         rc = mdt_layout_change(info, mo, lhc, &layout);
3029         if (rc)
3030                 GOTO(out_unlock, rc);
3031
3032         mdt_object_unlock(info, mo, lhc, 0);
3033
3034         ma->ma_need = MA_INODE;
3035         ma->ma_valid = 0;
3036         rc = mdt_attr_get_complex(info, mo, ma);
3037         if (rc != 0)
3038                 GOTO(out_unlock, rc);
3039
3040         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3041         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
3042
3043         EXIT;
3044 out_unlock:
3045         up_write(&mo->mot_open_sem);
3046 out_put_lease:
3047         LDLM_LOCK_PUT(lease);
3048 out_obj:
3049         mdt_object_put(info->mti_env, mo);
3050 out:
3051         mdt_client_compatibility(info);
3052         rc2 = mdt_fix_reply(info);
3053         if (rc == 0)
3054                 rc = rc2;
3055         return rc;
3056 }
3057
3058 struct mdt_reinter {
3059         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
3060         enum lprocfs_extra_opc mr_extra_opc;
3061 };
3062
3063 static const struct mdt_reinter mdt_reinters[] = {
3064         [REINT_SETATTR] = {
3065                 .mr_handler = &mdt_reint_setattr,
3066                 .mr_extra_opc = MDS_REINT_SETATTR,
3067         },
3068         [REINT_CREATE] = {
3069                 .mr_handler = &mdt_reint_create,
3070                 .mr_extra_opc = MDS_REINT_CREATE,
3071         },
3072         [REINT_LINK] = {
3073                 .mr_handler = &mdt_reint_link,
3074                 .mr_extra_opc = MDS_REINT_LINK,
3075         },
3076         [REINT_UNLINK] = {
3077                 .mr_handler = &mdt_reint_unlink,
3078                 .mr_extra_opc = MDS_REINT_UNLINK,
3079         },
3080         [REINT_RENAME] = {
3081                 .mr_handler = &mdt_reint_rename,
3082                 .mr_extra_opc = MDS_REINT_RENAME,
3083         },
3084         [REINT_OPEN] = {
3085                 .mr_handler = &mdt_reint_open,
3086                 .mr_extra_opc = MDS_REINT_OPEN,
3087         },
3088         [REINT_SETXATTR] = {
3089                 .mr_handler = &mdt_reint_setxattr,
3090                 .mr_extra_opc = MDS_REINT_SETXATTR,
3091         },
3092         [REINT_RMENTRY] = {
3093                 .mr_handler = &mdt_reint_unlink,
3094                 .mr_extra_opc = MDS_REINT_UNLINK,
3095         },
3096         [REINT_MIGRATE] = {
3097                 .mr_handler = &mdt_reint_migrate,
3098                 .mr_extra_opc = MDS_REINT_RENAME,
3099         },
3100         [REINT_RESYNC] = {
3101                 .mr_handler = &mdt_reint_resync,
3102                 .mr_extra_opc = MDS_REINT_RESYNC,
3103         },
3104 };
3105
3106 int mdt_reint_rec(struct mdt_thread_info *info,
3107                   struct mdt_lock_handle *lhc)
3108 {
3109         const struct mdt_reinter *mr;
3110         int rc;
3111
3112         ENTRY;
3113         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3114                 RETURN(-EPROTO);
3115
3116         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3117         if (mr->mr_handler == NULL)
3118                 RETURN(-EPROTO);
3119
3120         rc = (*mr->mr_handler)(info, lhc);
3121
3122         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3123                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3124
3125         RETURN(rc);
3126 }