Whamcloud - gitweb
LU-11047 mdt: standardize mdt object locking
[fs/lustre-release.git] / lustre / mdt / mdt_reint.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/mdt/mdt_reint.c
32  *
33  * Lustre Metadata Target (mdt) reintegration routines
34  *
35  * Author: Peter Braam <braam@clusterfs.com>
36  * Author: Andreas Dilger <adilger@clusterfs.com>
37  * Author: Phil Schwan <phil@clusterfs.com>
38  * Author: Huang Hua <huanghua@clusterfs.com>
39  * Author: Yury Umanets <umka@clusterfs.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_MDS
43
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
48
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
50                                      struct md_attr *ma)
51 {
52         ma->ma_need = MA_INODE;
53         ma->ma_valid = 0;
54 }
55
56 /**
57  * Get version of object by fid.
58  *
59  * Return real version or ENOENT_VERSION if object doesn't exist
60  */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62                                 struct mdt_object *o, __u64 *version)
63 {
64         LASSERT(o);
65
66         if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67             !fid_is_obf(mdt_object_fid(o)))
68                 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
69         else
70                 *version = ENOENT_VERSION;
71         CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72                PFID(mdt_object_fid(o)), *version);
73 }
74
75 /**
76  * Check version is correct.
77  *
78  * Should be called only during replay.
79  */
80 static int mdt_version_check(struct ptlrpc_request *req,
81                              __u64 version, int idx)
82 {
83         __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
84
85         ENTRY;
86         if (!exp_connect_vbr(req->rq_export))
87                 RETURN(0);
88
89         LASSERT(req_is_replay(req));
90         /** VBR: version is checked always because costs nothing */
91         LASSERT(idx < PTLRPC_NUM_VERSIONS);
92         /** Sanity check for malformed buffers */
93         if (pre_ver == NULL) {
94                 CERROR("No versions in request buffer\n");
95                 spin_lock(&req->rq_export->exp_lock);
96                 req->rq_export->exp_vbr_failed = 1;
97                 spin_unlock(&req->rq_export->exp_lock);
98                 RETURN(-EOVERFLOW);
99         } else if (pre_ver[idx] != version) {
100                 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101                        pre_ver[idx], version);
102                 spin_lock(&req->rq_export->exp_lock);
103                 req->rq_export->exp_vbr_failed = 1;
104                 spin_unlock(&req->rq_export->exp_lock);
105                 RETURN(-EOVERFLOW);
106         }
107         RETURN(0);
108 }
109
110 /**
111  * Save pre-versions in reply.
112  */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
114                              int idx)
115 {
116         __u64 *reply_ver;
117
118         if (!exp_connect_vbr(req->rq_export))
119                 return;
120
121         LASSERT(!req_is_replay(req));
122         LASSERT(req->rq_repmsg != NULL);
123         reply_ver = lustre_msg_get_versions(req->rq_repmsg);
124         if (reply_ver)
125                 reply_ver[idx] = version;
126 }
127
128 /**
129  * Save enoent version, it is needed when it is obvious that object doesn't
130  * exist, e.g. child during create.
131  */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
133 {
134         /* save version of file name for replay, it must be ENOENT here */
135         if (!req_is_replay(mdt_info_req(info))) {
136                 info->mti_ver[idx] = ENOENT_VERSION;
137                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
138         }
139 }
140
141 /**
142  * Get version from disk and save in reply buffer.
143  *
144  * Versions are saved in reply only during normal operations not replays.
145  */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147                           struct mdt_object *mto, int idx)
148 {
149         /* don't save versions during replay */
150         if (!req_is_replay(mdt_info_req(info))) {
151                 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
153         }
154 }
155
156 /**
157  * Get version from disk and check it, no save in reply.
158  */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160                           struct mdt_object *mto, int idx)
161 {
162         /* only check versions during replay */
163         if (!req_is_replay(mdt_info_req(info)))
164                 return 0;
165
166         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167         return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
168 }
169
170 /**
171  * Get version from disk and check if recovery or just save.
172  */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174                                struct mdt_object *mto, int idx)
175 {
176         int rc = 0;
177
178         mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179         if (req_is_replay(mdt_info_req(info)))
180                 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
181                                        idx);
182         else
183                 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
184         return rc;
185 }
186
187 /**
188  * Lookup with version checking.
189  *
190  * This checks version of 'name'. Many reint functions uses 'name' for child not
191  * FID, therefore we need to get object by name and check its version.
192  */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194                              struct mdt_object *p,
195                              const struct lu_name *lname,
196                              struct lu_fid *fid, int idx)
197 {
198         int rc, vbrc;
199
200         rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
201                         &info->mti_spec);
202         /* Check version only during replay */
203         if (!req_is_replay(mdt_info_req(info)))
204                 return rc;
205
206         info->mti_ver[idx] = ENOENT_VERSION;
207         if (rc == 0) {
208                 struct mdt_object *child;
209
210                 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211                 if (likely(!IS_ERR(child))) {
212                         mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213                         mdt_object_put(info->mti_env, child);
214                 }
215         }
216         vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217         return vbrc ? vbrc : rc;
218
219 }
220
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222                               struct mdt_object *obj,
223                               struct ldlm_enqueue_info *einfo,
224                               int decref)
225 {
226         union ldlm_policy_data *policy = &mti->mti_policy;
227         struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228         struct lustre_handle_array *locks = einfo->ei_cbdata;
229         int i;
230
231         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232         LASSERT(locks);
233
234         memset(policy, 0, sizeof(*policy));
235         policy->l_inodebits.bits = einfo->ei_inodebits;
236         mdt_lock_reg_init(lh, einfo->ei_mode);
237         for (i = 0; i < locks->ha_count; i++) {
238                 if (test_bit(i, (void *)locks->ha_map))
239                         lh->mlh_rreg_lh = locks->ha_handles[i];
240                 else
241                         lh->mlh_reg_lh = locks->ha_handles[i];
242                 mdt_object_unlock(mti, NULL, lh, decref);
243                 locks->ha_handles[i].cookie = 0ull;
244         }
245
246         return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
247                                 policy);
248 }
249
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251                                      struct mdt_object *obj)
252 {
253         struct lu_device *bottom_dev;
254         struct lu_object *bottom_obj;
255         int rc;
256
257         if (!S_ISDIR(obj->mot_header.loh_attr))
258                 return 0;
259
260         /* getxattr from bottom obj to avoid reading in shard FIDs */
261         bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262         bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263                                           mdt_object_fid(obj), NULL);
264         if (IS_ERR(bottom_obj))
265                 return PTR_ERR(bottom_obj);
266
267         rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
268                           XATTR_NAME_LMV);
269         lu_object_put(mti->mti_env, bottom_obj);
270
271         return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
272 }
273
274 /**
275  * Lock slave stripes if necessary, the lock handles of slave stripes
276  * will be stored in einfo->ei_cbdata.
277  **/
278 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
279                             enum ldlm_mode mode, __u64 ibits,
280                             struct ldlm_enqueue_info *einfo)
281 {
282         union ldlm_policy_data *policy = &mti->mti_policy;
283
284         LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285         einfo->ei_type = LDLM_IBITS;
286         einfo->ei_mode = mode;
287         einfo->ei_cb_bl = mdt_remote_blocking_ast;
288         einfo->ei_cb_local_bl = mdt_blocking_ast;
289         einfo->ei_cb_cp = ldlm_completion_ast;
290         einfo->ei_enq_slave = 1;
291         einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
292         einfo->ei_inodebits = ibits;
293         einfo->ei_req_slot = 1;
294         memset(policy, 0, sizeof(*policy));
295         policy->l_inodebits.bits = ibits;
296
297         return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
298                               policy);
299 }
300
301 /** lock object, and stripes if it's a striped directory
302  *
303  * object should be local, this is called in operations which modify both object
304  * and stripes.
305  *
306  * \param info          struct mdt_thread_info
307  * \param parent        parent object, if it's NULL, find parent by mdo_lookup()
308  * \param child         child object
309  * \param lh            lock handle
310  * \param einfo         struct ldlm_enqueue_info
311  * \param ibits         MDS inode lock bits
312  * \param mode          lock mode
313  * \param cos_incompat  DNE COS incompatible
314  *
315  * \retval              0 on success, -ev on error.
316  */
317 int mdt_object_stripes_lock(struct mdt_thread_info *info,
318                             struct mdt_object *parent,
319                             struct mdt_object *child,
320                             struct mdt_lock_handle *lh,
321                             struct ldlm_enqueue_info *einfo, __u64 ibits,
322                             enum ldlm_mode mode, bool cos_incompat)
323 {
324         int rc;
325
326         ENTRY;
327         /* according to the protocol, child should be local, is request sent to
328          * wrong MDT.
329          */
330         if (mdt_object_remote(child)) {
331                 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
332                        mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
333                        -EREMOTE);
334                 RETURN(-EREMOTE);
335         }
336
337         memset(einfo, 0, sizeof(*einfo));
338         if (ibits & MDS_INODELOCK_LOOKUP) {
339                 LASSERT(parent);
340                 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
341                                            mode, cos_incompat);
342         } else {
343                 rc = mdt_object_lock(info, child, lh, ibits, mode,
344                                      cos_incompat);
345         }
346         if (rc)
347                 RETURN(rc);
348
349         if (S_ISDIR(child->mot_header.loh_attr)) {
350                 rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
351                 if (rc) {
352                         mdt_object_unlock(info, child, lh, rc);
353                         if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
354                             rc == -EIO)
355                                 rc = 0;
356                 }
357         }
358
359         RETURN(rc);
360 }
361
362 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
363                               struct mdt_object *obj,
364                               struct mdt_lock_handle *lh,
365                               struct ldlm_enqueue_info *einfo, int decref)
366 {
367         /* this is checked in mdt_object_stripes_lock() */
368         LASSERT(!mdt_object_remote(obj));
369         if (einfo->ei_cbdata)
370                 mdt_stripes_unlock(info, obj, einfo, decref);
371         mdt_object_unlock(info, obj, lh, decref);
372 }
373
374 static int mdt_restripe(struct mdt_thread_info *info,
375                         struct mdt_object *parent,
376                         const struct lu_name *lname,
377                         const struct lu_fid *tfid,
378                         struct md_op_spec *spec,
379                         struct md_attr *ma)
380 {
381         struct mdt_device *mdt = info->mti_mdt;
382         struct lu_fid *fid = &info->mti_tmp_fid2;
383         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
384         struct lmv_user_md *lum = spec->u.sp_ea.eadata;
385         struct lu_ucred *uc = mdt_ucred(info);
386         struct lmv_mds_md_v1 *lmv;
387         struct mdt_object *child;
388         struct mdt_lock_handle *lhp;
389         struct mdt_lock_handle *lhc;
390         struct mdt_body *repbody;
391         int rc;
392
393         ENTRY;
394
395         /* we want rbac roles to have precedence over any other
396          * permission or capability checks
397          */
398         if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
399                 RETURN(-EPERM);
400
401         LASSERT(lum);
402         lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
403
404         rc = mdt_version_get_check_save(info, parent, 0);
405         if (rc)
406                 RETURN(rc);
407
408         lhp = &info->mti_lh[MDT_LH_PARENT];
409         rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
410         if (rc)
411                 RETURN(rc);
412
413         rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
414         if (rc)
415                 GOTO(unlock_parent, rc);
416
417         if (ma->ma_valid & MA_LMV) {
418                 /* don't allow restripe if parent dir layout is changing */
419                 lmv = &ma->ma_lmv->lmv_md_v1;
420                 if (!lmv_is_sane2(lmv))
421                         GOTO(unlock_parent, rc = -EBADF);
422
423                 if (lmv_is_layout_changing(lmv))
424                         GOTO(unlock_parent, rc = -EBUSY);
425         }
426
427         fid_zero(fid);
428         rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
429         if (rc)
430                 GOTO(unlock_parent, rc);
431
432         child = mdt_object_find(info->mti_env, mdt, fid);
433         if (IS_ERR(child))
434                 GOTO(unlock_parent, rc = PTR_ERR(child));
435
436         if (!mdt_object_exists(child))
437                 GOTO(out_child, rc = -ENOENT);
438
439         if (mdt_object_remote(child)) {
440                 struct mdt_body *repbody;
441
442                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
443                 if (!repbody)
444                         GOTO(out_child, rc = -EPROTO);
445
446                 repbody->mbo_fid1 = *fid;
447                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
448                 GOTO(out_child, rc = -EREMOTE);
449         }
450
451         if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
452                 GOTO(out_child, rc = -ENOTDIR);
453
454         rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
455         if (rc)
456                 GOTO(out_child, rc);
457
458         /* race with migrate? */
459         if ((ma->ma_valid & MA_LMV) &&
460              lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
461                 GOTO(out_child, rc = -EBUSY);
462
463         /* lock object */
464         lhc = &info->mti_lh[MDT_LH_CHILD];
465         rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
466                                      MDS_INODELOCK_FULL, LCK_PW, true);
467         if (rc)
468                 GOTO(unlock_child, rc);
469
470         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
471         rc = mdt_version_get_check_save(info, child, 1);
472         if (rc)
473                 GOTO(unlock_child, rc);
474
475         spin_lock(&mdt->mdt_restriper.mdr_lock);
476         if (child->mot_restriping) {
477                 /* race? */
478                 spin_unlock(&mdt->mdt_restriper.mdr_lock);
479                 GOTO(unlock_child, rc = -EBUSY);
480         }
481         child->mot_restriping = 1;
482         spin_unlock(&mdt->mdt_restriper.mdr_lock);
483
484         *fid = *tfid;
485         rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
486         if (rc)
487                 GOTO(restriping_clear, rc);
488
489         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
490         if (!repbody)
491                 GOTO(restriping_clear, rc = -EPROTO);
492
493         mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
494         EXIT;
495
496 restriping_clear:
497         child->mot_restriping = 0;
498 unlock_child:
499         mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
500 out_child:
501         mdt_object_put(info->mti_env, child);
502 unlock_parent:
503         mdt_object_unlock(info, parent, lhp, rc);
504
505         return rc;
506 }
507
508 /*
509  * VBR: we save three versions in reply:
510  * 0 - parent. Check that parent version is the same during replay.
511  * 1 - name. Version of 'name' if file exists with the same name or
512  * ENOENT_VERSION, it is needed because file may appear due to missed replays.
513  * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
514  * check.
515  */
516 static int mdt_create(struct mdt_thread_info *info)
517 {
518         struct mdt_device *mdt = info->mti_mdt;
519         struct mdt_object *parent;
520         struct mdt_object *child;
521         struct mdt_lock_handle *lh;
522         struct mdt_body *repbody;
523         struct md_attr *ma = &info->mti_attr;
524         struct mdt_reint_record *rr = &info->mti_rr;
525         struct md_op_spec *spec = &info->mti_spec;
526         struct lu_ucred *uc = mdt_ucred(info);
527         bool restripe = false;
528         int rc;
529
530         ENTRY;
531         DEBUG_REQ(D_INODE, mdt_info_req(info),
532                   "Create ("DNAME"->"DFID") in "DFID,
533                   PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
534
535         if (!fid_is_md_operative(rr->rr_fid1))
536                 RETURN(-EPERM);
537
538         if (S_ISDIR(ma->ma_attr.la_mode) &&
539             spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
540                 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
541                 struct obd_export *exp = mdt_info_req(info)->rq_export;
542
543                 /* Only new clients can create remote dir( >= 2.4) and
544                  * striped dir(>= 2.6), old client will return -ENOTSUPP
545                  */
546                 if (!mdt_is_dne_client(exp))
547                         RETURN(-ENOTSUPP);
548
549                 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
550                         if (!mdt_is_striped_client(exp))
551                                 RETURN(-ENOTSUPP);
552
553                         if (!mdt->mdt_enable_striped_dir)
554                                 RETURN(-EPERM);
555                 } else if (!mdt->mdt_enable_remote_dir) {
556                         RETURN(-EPERM);
557                 }
558
559                 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
560                     (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
561                     LMV_HASH_TYPE_CRUSH)
562                         RETURN(-EPROTO);
563
564                 /* we want rbac roles to have precedence over any other
565                  * permission or capability checks
566                  */
567                 if (!uc->uc_rbac_dne_ops ||
568                     (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
569                      uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
570                      mdt->mdt_enable_remote_dir_gid != -1))
571                         RETURN(-EPERM);
572
573                 /* restripe if later found dir exists, MDS_OPEN_CREAT means
574                  * this is create only, don't try restripe.
575                  */
576                 if (mdt->mdt_enable_dir_restripe &&
577                     le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
578                     !(spec->sp_cr_flags & MDS_OPEN_CREAT))
579                         restripe = true;
580         }
581
582         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
583
584         parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
585         if (IS_ERR(parent))
586                 RETURN(PTR_ERR(parent));
587
588         if (!mdt_object_exists(parent))
589                 GOTO(put_parent, rc = -ENOENT);
590
591         rc = mdt_check_enc(info, parent);
592         if (rc)
593                 GOTO(put_parent, rc);
594
595         if (!uc->uc_rbac_fscrypt_admin &&
596             parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
597                 GOTO(put_parent, rc = -EPERM);
598
599         /*
600          * LU-10235: check if name exists locklessly first to avoid massive
601          * lock recalls on existing directories.
602          */
603         rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
604                                       &info->mti_tmp_fid1, 1);
605         if (rc == 0) {
606                 if (!restripe)
607                         GOTO(put_parent, rc = -EEXIST);
608
609                 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
610                                   ma);
611         }
612
613         /* -ENOENT is expected here */
614         if (rc != -ENOENT)
615                 GOTO(put_parent, rc);
616
617         /* save version of file name for replay, it must be ENOENT here */
618         mdt_enoent_version_save(info, 1);
619
620         OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
621
622         lh = &info->mti_lh[MDT_LH_PARENT];
623         rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
624         if (rc)
625                 GOTO(put_parent, rc);
626
627         if (!mdt_object_remote(parent)) {
628                 rc = mdt_version_get_check_save(info, parent, 0);
629                 if (rc)
630                         GOTO(unlock_parent, rc);
631         }
632
633         child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
634         if (unlikely(IS_ERR(child)))
635                 GOTO(unlock_parent, rc = PTR_ERR(child));
636
637         ma->ma_need = MA_INODE;
638         ma->ma_valid = 0;
639
640         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
641                         OBD_FAIL_MDS_REINT_CREATE_WRITE);
642
643         /* Version of child will be updated on disk. */
644         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
645         rc = mdt_version_get_check_save(info, child, 2);
646         if (rc)
647                 GOTO(put_child, rc);
648
649         if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
650             (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
651              strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
652                      rr->rr_name.ln_namelen) == 0))
653                 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
654
655         /*
656          * Do not perform lookup sanity check. We know that name does
657          * not exist.
658          */
659         info->mti_spec.sp_cr_lookup = 0;
660         if (mdt_object_remote(parent))
661                 info->mti_spec.sp_cr_lookup = 1;
662         info->mti_spec.sp_feat = &dt_directory_features;
663
664         rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
665                         mdt_object_child(child), &info->mti_spec, ma);
666         if (rc == 0)
667                 rc = mdt_attr_get_complex(info, child, ma);
668
669         if (rc < 0)
670                 GOTO(put_child, rc);
671
672         /*
673          * On DNE, we need to eliminate dependey between 'mkdir a' and
674          * 'mkdir a/b' if b is a striped directory, to achieve this, two
675          * things are done below:
676          * 1. save child and slaves lock.
677          * 2. if the child is a striped directory, relock parent so to
678          *    compare against with COS locks to ensure parent was
679          *    committed to disk.
680          */
681         if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
682                 struct mdt_lock_handle *lhc;
683                 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
684                 bool cos_incompat;
685
686                 rc = mdt_object_striped(info, child);
687                 if (rc < 0)
688                         GOTO(put_child, rc);
689
690                 cos_incompat = rc;
691                 if (cos_incompat) {
692                         if (!mdt_object_remote(parent)) {
693                                 mdt_object_unlock(info, parent, lh, 1);
694                                 rc = mdt_parent_lock(info, parent, lh,
695                                                      &rr->rr_name, LCK_PW,
696                                                      true);
697                                 if (rc)
698                                         GOTO(put_child, rc);
699                         }
700                 }
701
702                 lhc = &info->mti_lh[MDT_LH_CHILD];
703                 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
704                                              MDS_INODELOCK_UPDATE, LCK_PW,
705                                              cos_incompat);
706                 if (rc)
707                         GOTO(put_child, rc);
708
709                 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
710         }
711
712         /* Return fid & attr to client. */
713         if (ma->ma_valid & MA_INODE)
714                 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
715                                    mdt_object_fid(child));
716         EXIT;
717 put_child:
718         mdt_object_put(info->mti_env, child);
719 unlock_parent:
720         mdt_object_unlock(info, parent, lh, rc);
721 put_parent:
722         mdt_object_put(info->mti_env, parent);
723         return rc;
724 }
725
726 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
727                         struct md_attr *ma)
728 {
729         struct mdt_lock_handle  *lh;
730         int do_vbr = ma->ma_attr.la_valid &
731                         (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
732         __u64 lockpart = MDS_INODELOCK_UPDATE;
733         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
734         bool cos_incompat;
735         int rc;
736
737         ENTRY;
738         rc = mdt_object_striped(info, mo);
739         if (rc < 0)
740                 RETURN(rc);
741         cos_incompat = rc;
742
743         if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
744                 lockpart |= MDS_INODELOCK_PERM;
745         /* Clear xattr cache on clients, so the virtual project ID xattr
746          * can get the new project ID
747          */
748         if (ma->ma_attr.la_valid & LA_PROJID)
749                 lockpart |= MDS_INODELOCK_XATTR;
750
751         lh = &info->mti_lh[MDT_LH_PARENT];
752         rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
753                                      LCK_PW, cos_incompat);
754         if (rc != 0)
755                 RETURN(rc);
756
757         /* all attrs are packed into mti_attr in unpack_setattr */
758         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
759                        OBD_FAIL_MDS_REINT_SETATTR_WRITE);
760
761         /* VBR: update version if attr changed are important for recovery */
762         if (do_vbr) {
763                 /* update on-disk version of changed object */
764                 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
765                 rc = mdt_version_get_check_save(info, mo, 0);
766                 if (rc)
767                         GOTO(out_unlock, rc);
768         }
769
770         /* Ensure constant striping during chown(). See LU-2789. */
771         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
772                 mutex_lock(&mo->mot_lov_mutex);
773
774         /* all attrs are packed into mti_attr in unpack_setattr */
775         rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
776
777         if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
778                 mutex_unlock(&mo->mot_lov_mutex);
779
780         if (rc != 0)
781                 GOTO(out_unlock, rc);
782         mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
783         EXIT;
784 out_unlock:
785         mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
786         return rc;
787 }
788
789 /**
790  * Check HSM flags and add HS_DIRTY flag if relevant.
791  *
792  * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
793  * and is not RELEASED.
794  */
795 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
796                         struct md_attr *ma)
797 {
798         struct lu_ucred *uc = mdt_ucred(info);
799         kernel_cap_t cap_saved;
800         int rc;
801
802         ENTRY;
803         /* If the file was modified, add the dirty flag */
804         ma->ma_need = MA_HSM;
805         rc = mdt_attr_get_complex(info, mo, ma);
806         if (rc) {
807                 CERROR("file attribute read error for "DFID": %d.\n",
808                         PFID(mdt_object_fid(mo)), rc);
809                 RETURN(rc);
810         }
811
812         /* If an up2date copy exists in the backend, add dirty flag */
813         if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
814             && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
815                 ma->ma_hsm.mh_flags |= HS_DIRTY;
816
817                 /* Bump cap so that closes from non-owner writers can
818                  * set the HSM state to dirty.
819                  */
820                 cap_saved = uc->uc_cap;
821                 cap_raise(uc->uc_cap, CAP_FOWNER);
822                 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
823                 uc->uc_cap = cap_saved;
824                 if (rc)
825                         CERROR("file attribute change error for "DFID": %d\n",
826                                 PFID(mdt_object_fid(mo)), rc);
827         }
828
829         RETURN(rc);
830 }
831
832 static int mdt_reint_setattr(struct mdt_thread_info *info,
833                              struct mdt_lock_handle *lhc)
834 {
835         struct mdt_device *mdt = info->mti_mdt;
836         struct md_attr *ma = &info->mti_attr;
837         struct mdt_reint_record *rr = &info->mti_rr;
838         struct ptlrpc_request *req = mdt_info_req(info);
839         struct mdt_object *mo;
840         struct mdt_body *repbody;
841         ktime_t kstart = ktime_get();
842         int rc;
843
844         ENTRY;
845         DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
846                   (unsigned int)ma->ma_attr.la_valid);
847
848         if (info->mti_dlm_req)
849                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
850
851         OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
852
853         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
854         mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
855         if (IS_ERR(mo))
856                 GOTO(out, rc = PTR_ERR(mo));
857
858         if (!mdt_object_exists(mo))
859                 GOTO(out_put, rc = -ENOENT);
860
861         if (mdt_object_remote(mo))
862                 GOTO(out_put, rc = -EREMOTE);
863
864         ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
865         /* revoke lease lock if size is going to be changed */
866         if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
867                      !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
868                      atomic_read(&mo->mot_lease_count) > 0)) {
869                 down_read(&mo->mot_open_sem);
870
871                 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
872                         lhc = &info->mti_lh[MDT_LH_LOCAL];
873                         rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
874                                              LCK_CW, false);
875                         if (rc != 0) {
876                                 up_read(&mo->mot_open_sem);
877                                 GOTO(out_put, rc);
878                         }
879
880                         /* revoke lease lock */
881                         mdt_object_unlock(info, mo, lhc, 1);
882                 }
883                 up_read(&mo->mot_open_sem);
884         }
885
886         if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
887                 /* Check write access for the O_TRUNC case */
888                 if (mdt_write_read(mo) < 0)
889                         GOTO(out_put, rc = -ETXTBSY);
890
891                 /* LU-10286: compatibility check for FLR.
892                  * Please check the comment in mdt_finish_open() for details
893                  */
894                 if (!exp_connect_flr(info->mti_exp) ||
895                     !exp_connect_overstriping(info->mti_exp)) {
896                         rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
897                         if (rc < 0 && rc != -ENODATA)
898                                 GOTO(out_put, rc);
899
900                         if (!exp_connect_flr(info->mti_exp)) {
901                                 if (rc > 0 &&
902                                     mdt_lmm_is_flr(info->mti_big_lmm))
903                                         GOTO(out_put, rc = -EOPNOTSUPP);
904                         }
905
906                         if (!exp_connect_overstriping(info->mti_exp)) {
907                                 if (rc > 0 &&
908                                     mdt_lmm_is_overstriping(info->mti_big_lmm))
909                                         GOTO(out_put, rc = -EOPNOTSUPP);
910                         }
911                 }
912
913                 /* For truncate, the file size sent from client
914                  * is believable, but the blocks are incorrect,
915                  * which makes the block size in LSOM attribute
916                  * inconsisent with the real block size.
917                  */
918                 rc = mdt_lsom_update(info, mo, true);
919                 if (rc)
920                         GOTO(out_put, rc);
921         }
922
923         if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
924                 if (ma->ma_valid & MA_LOV)
925                         GOTO(out_put, rc = -EPROTO);
926
927                 /* MDT supports FMD for regular files due to Data-on-MDT */
928                 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
929                     ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
930                         tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
931                                        req->rq_xid);
932
933                         if (ma->ma_attr.la_valid & LA_MTIME) {
934                                 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
935                                 if (!rc)
936                                         ma->ma_valid |= MA_PFID;
937                         }
938                 }
939
940                 rc = mdt_attr_set(info, mo, ma);
941                 if (rc)
942                         GOTO(out_put, rc);
943         } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
944                    (ma->ma_valid & MA_INODE)) {
945                 struct lu_buf *buf = &info->mti_buf;
946                 struct lu_ucred *uc = mdt_ucred(info);
947                 struct mdt_lock_handle *lh;
948                 const char *name;
949
950                 /* reject if either remote or striped dir is disabled */
951                 if (ma->ma_valid & MA_LMV) {
952                         if (!mdt->mdt_enable_remote_dir ||
953                             !mdt->mdt_enable_striped_dir)
954                                 GOTO(out_put, rc = -EPERM);
955
956                         /* we want rbac roles to have precedence over any other
957                          * permission or capability checks
958                          */
959                         if (!uc->uc_rbac_dne_ops ||
960                             (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
961                              uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
962                              mdt->mdt_enable_remote_dir_gid != -1))
963                                 GOTO(out_put, rc = -EPERM);
964                 }
965
966                 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
967                         GOTO(out_put, rc = -ENOTDIR);
968
969                 if (ma->ma_attr.la_valid != 0)
970                         GOTO(out_put, rc = -EPROTO);
971
972                 lh = &info->mti_lh[MDT_LH_PARENT];
973                 if (ma->ma_valid & MA_LOV) {
974                         buf->lb_buf = ma->ma_lmm;
975                         buf->lb_len = ma->ma_lmm_size;
976                         name = XATTR_NAME_LOV;
977                         rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
978                                              LCK_PW, false);
979                 } else {
980                         buf->lb_buf = &ma->ma_lmv->lmv_user_md;
981                         buf->lb_len = ma->ma_lmv_size;
982                         name = XATTR_NAME_DEFAULT_LMV;
983
984                         if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
985                                 rc = mdt_object_lock(info, mo, lh,
986                                                      MDS_INODELOCK_XATTR |
987                                                      MDS_INODELOCK_LOOKUP,
988                                                      LCK_PW, false);
989                         } else {
990                                 struct lu_fid *pfid = &info->mti_tmp_fid1;
991                                 struct lu_name *pname = &info->mti_name;
992                                 const char dotdot[] = "..";
993                                 struct mdt_object *pobj;
994
995                                 fid_zero(pfid);
996                                 pname->ln_name = dotdot;
997                                 pname->ln_namelen = sizeof(dotdot);
998                                 rc = mdo_lookup(info->mti_env,
999                                                 mdt_object_child(mo), pname,
1000                                                 pfid, NULL);
1001                                 if (rc)
1002                                         GOTO(out_put, rc);
1003
1004                                 pobj = mdt_object_find(info->mti_env,
1005                                                        info->mti_mdt, pfid);
1006                                 if (IS_ERR(pobj))
1007                                         GOTO(out_put, rc = PTR_ERR(pobj));
1008
1009                                 rc = mdt_object_check_lock(info, pobj, mo, lh,
1010                                                            MDS_INODELOCK_XATTR |
1011                                                            MDS_INODELOCK_LOOKUP,
1012                                                            LCK_PW, false);
1013                                 mdt_object_put(info->mti_env, pobj);
1014                         }
1015                 }
1016
1017                 if (rc != 0)
1018                         GOTO(out_put, rc);
1019
1020                 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1021                                   name, 0);
1022
1023                 mdt_object_unlock(info, mo, lh, rc);
1024                 if (rc)
1025                         GOTO(out_put, rc);
1026         } else {
1027                 GOTO(out_put, rc = -EPROTO);
1028         }
1029
1030         /* If file data is modified, add the dirty flag */
1031         if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1032                 rc = mdt_add_dirty_flag(info, mo, ma);
1033
1034         ma->ma_need = MA_INODE;
1035         ma->ma_valid = 0;
1036         rc = mdt_attr_get_complex(info, mo, ma);
1037         if (rc != 0)
1038                 GOTO(out_put, rc);
1039
1040         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1041
1042         EXIT;
1043 out_put:
1044         mdt_object_put(info->mti_env, mo);
1045 out:
1046         if (rc == 0)
1047                 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1048                                  ktime_us_delta(ktime_get(), kstart));
1049
1050         mdt_client_compatibility(info);
1051         return rc;
1052 }
1053
1054 static int mdt_reint_create(struct mdt_thread_info *info,
1055                             struct mdt_lock_handle *lhc)
1056 {
1057         struct ptlrpc_request   *req = mdt_info_req(info);
1058         ktime_t                 kstart = ktime_get();
1059         int                     rc;
1060
1061         ENTRY;
1062         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1063                 RETURN(err_serious(-ESTALE));
1064
1065         if (info->mti_dlm_req)
1066                 ldlm_request_cancel(mdt_info_req(info),
1067                                     info->mti_dlm_req, 0, LATF_SKIP);
1068
1069         if (!lu_name_is_valid(&info->mti_rr.rr_name))
1070                 RETURN(-EPROTO);
1071
1072         switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1073         case S_IFDIR:
1074         case S_IFREG:
1075         case S_IFLNK:
1076         case S_IFCHR:
1077         case S_IFBLK:
1078         case S_IFIFO:
1079         case S_IFSOCK:
1080                 break;
1081         default:
1082                 CERROR("%s: Unsupported mode %o\n",
1083                        mdt_obd_name(info->mti_mdt),
1084                        info->mti_attr.ma_attr.la_mode);
1085                 RETURN(err_serious(-EOPNOTSUPP));
1086         }
1087
1088         rc = mdt_create(info);
1089         if (rc == 0) {
1090                 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1091                         mdt_counter_incr(req, LPROC_MDT_MKDIR,
1092                                          ktime_us_delta(ktime_get(), kstart));
1093                 else
1094                         /* Special file should stay on the same node as parent*/
1095                         mdt_counter_incr(req, LPROC_MDT_MKNOD,
1096                                          ktime_us_delta(ktime_get(), kstart));
1097         }
1098
1099         RETURN(rc);
1100 }
1101
1102 /*
1103  * Handle an unlink reintegration request: remove the name rr_name from the
1104  * parent rr_fid1 and unlink the child object that the name refers to.
      *
      * VBR: the version of a local parent is checked and saved at index 0.
      * The child version is checked at index 1 while it is looked up by name
      * and saved at index 1 again once the child is locked.
      *
      * info carries the unpacked request and the lock handle slots used here
      * (MDT_LH_PARENT, MDT_LH_CHILD); lhc is not used by this function.
      *
      * Returns 0 on success, -EREMOTE when the child is a remote object (its
      * FID is then returned in the reply body), or another negative errno.
1105  */
1106 static int mdt_reint_unlink(struct mdt_thread_info *info,
1107                             struct mdt_lock_handle *lhc)
1108 {
1109         struct mdt_reint_record *rr = &info->mti_rr;
1110         struct ptlrpc_request *req = mdt_info_req(info);
1111         struct md_attr *ma = &info->mti_attr;
1112         struct lu_fid *child_fid = &info->mti_tmp_fid1;
1113         struct mdt_object *mp;
1114         struct mdt_object *mc;
1115         struct mdt_lock_handle *parent_lh;
1116         struct mdt_lock_handle *child_lh;
1117         struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1118         struct lu_ucred *uc  = mdt_ucred(info);
1119         bool cos_incompat = false;
1120         int no_name = 0;
1121         ktime_t kstart = ktime_get();
1122         int rc;
1123
1124         ENTRY;
1125         DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1126                   PNAME(&rr->rr_name));
1127
1128         if (info->mti_dlm_req)
1129                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1130
1131         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1132                 RETURN(err_serious(-ENOENT));
1133
1134         if (!fid_is_md_operative(rr->rr_fid1))
1135                 RETURN(-EPERM);
1136
1137         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1138         if (IS_ERR(mp))
1139                 RETURN(PTR_ERR(mp));
1140
             /* a remote parent forces cos_incompat; only a local parent has
              * its version checked and saved (index 0)
              */
1141         if (mdt_object_remote(mp)) {
1142                 cos_incompat = true;
1143         } else {
1144                 rc = mdt_version_get_check_save(info, mp, 0);
1145                 if (rc)
1146                         GOTO(put_parent, rc);
1147         }
1148
             /* without the fscrypt admin role, nothing may be unlinked from a
              * parent flagged LOHA_FSCRYPT_MD
              */
1149         if (!uc->uc_rbac_fscrypt_admin &&
1150             mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1151                 GOTO(put_parent, rc = -EPERM);
1152
1153         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1154         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
             /* re-entered from below, with cos_incompat set, once the child
              * is found to need it (see the mdt_object_striped() check)
              */
1155 relock:
1156         parent_lh = &info->mti_lh[MDT_LH_PARENT];
1157         rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
1158                              cos_incompat);
1159         if (rc != 0)
1160                 GOTO(put_parent, rc);
1161
             /* sp_rm_entry: mdo_unlink() is called without a child object and
              * the function returns right after it; this needs a DNE-capable
              * client with CAP_SYS_ADMIN
              */
1162         if (info->mti_spec.sp_rm_entry) {
1163                 if (!mdt_is_dne_client(req->rq_export))
1164                         /* Return -ENOTSUPP for old client */
1165                         GOTO(unlock_parent, rc = -ENOTSUPP);
1166
1167                 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1168                         GOTO(unlock_parent, rc = -EPERM);
1169
1170                 ma->ma_need = MA_INODE;
1171                 ma->ma_valid = 0;
1172                 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1173                                 NULL, &rr->rr_name, ma, no_name);
1174                 GOTO(unlock_parent, rc);
1175         }
1176
             /* the child FID comes from the request (MDS_OP_WITH_FID) or from
              * a lookup of rr_name in the parent
              */
1177         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1178                 *child_fid = *rr->rr_fid2;
1179         } else {
1180                 /* lookup child object along with version checking */
1181                 fid_zero(child_fid);
1182                 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1183                                               1);
1184                 if (rc != 0) {
1185                         /* Name might not be able to find during resend of
1186                          * remote unlink, considering following case.
1187                          * dir_A is a remote directory, the name entry of
1188                          * dir_A is on MDT0, the directory is on MDT1,
1189                          *
1190                          * 1. client sends unlink req to MDT1.
1191                          * 2. MDT1 sends name delete update to MDT0.
1192                          * 3. name entry is being deleted in MDT0 synchronously.
1193                          * 4. MDT1 is restarted.
1194                          * 5. client resends unlink req to MDT1. So it can not
1195                          *    find the name entry on MDT0 anymore.
1196                          * In this case, MDT1 only needs to destroy the local
1197                          * directory.
1198                          */
1199                         if (mdt_object_remote(mp) && rc == -ENOENT &&
1200                             !fid_is_zero(rr->rr_fid2) &&
1201                             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1202                                 no_name = 1;
1203                                 *child_fid = *rr->rr_fid2;
1204                         } else {
1205                                 GOTO(unlock_parent, rc);
1206                         }
1207                 }
1208         }
1209
1210         if (!fid_is_md_operative(child_fid))
1211                 GOTO(unlock_parent, rc = -EPERM);
1212
1213         /* We will lock the child regardless it is local or remote. No harm. */
1214         mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1215         if (IS_ERR(mc))
1216                 GOTO(unlock_parent, rc = PTR_ERR(mc));
1217
1218         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1219                 /* In this case, child fid is embedded in the request, and we do
1220                  * not have a proper name as rr_name contains an encoded
1221                  * hash. So find name that matches provided hash.
1222                  */
1223                 if (!find_name_matching_hash(info, &rr->rr_name,
1224                                              NULL, mc))
1225                         GOTO(put_child, rc = -ENOENT);
1226         }
1227
             /* A negative result from mdt_object_striped() is an error. Any
              * other non-zero result turns cos_incompat on, in which case the
              * child reference and the parent lock are dropped and the parent
              * is locked again with the new setting.
              */
1228         if (!cos_incompat) {
1229                 rc = mdt_object_striped(info, mc);
1230                 if (rc < 0)
1231                         GOTO(put_child, rc);
1232
1233                 cos_incompat = rc;
1234                 if (cos_incompat) {
1235                         mdt_object_put(info->mti_env, mc);
1236                         mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1237                         goto relock;
1238                 }
1239         }
1240
1241         child_lh = &info->mti_lh[MDT_LH_CHILD];
1242         if (mdt_object_remote(mc)) {
1243                 struct mdt_body  *repbody;
1244
                     /* a remote child is only accepted when the request did not
                      * supply a child FID
                      */
1245                 if (!fid_is_zero(rr->rr_fid2)) {
1246                         CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1247                                mdt_obd_name(info->mti_mdt),
1248                                PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1249                         GOTO(put_child, rc = -ENOENT);
1250                 }
1251                 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1252                        mdt_obd_name(info->mti_mdt),
1253                        PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1254
1255                 if (!mdt_is_dne_client(req->rq_export))
1256                         /* Return -ENOTSUPP for old client */
1257                         GOTO(put_child, rc = -ENOTSUPP);
1258
1259                 /* Revoke the LOOKUP lock of the remote object granted by
1260                  * this MDT. Since the unlink will happen on another MDT,
1261                  * it will release the LOOKUP lock right away. Then What
1262                  * would happen if another client try to grab the LOOKUP
1263                  * lock at the same time with unlink XXX
1264                  */
1265                 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
1266                                             false);
1267                 if (rc)
1268                         GOTO(put_child, rc);
1269
                     /* report the child FID in the reply and return -EREMOTE */
1270                 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1271                 LASSERT(repbody != NULL);
1272                 repbody->mbo_fid1 = *mdt_object_fid(mc);
1273                 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1274                 GOTO(unlock_child, rc = -EREMOTE);
1275         }
1276         /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1277          * this now because a running HSM restore on the child (unlink
1278          * victim) will hold the layout lock. See LU-4002.
1279          */
1280         rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1281                                      MDS_INODELOCK_LOOKUP |
1282                                      MDS_INODELOCK_UPDATE,
1283                                      LCK_EX, cos_incompat);
1284         if (rc != 0)
1285                 GOTO(put_child, rc);
1286
1287         /*
1288          * Now we can only make sure we need MA_INODE, in mdd layer, will check
1289          * whether need MA_LOV and MA_COOKIE.
1290          */
1291         ma->ma_need = MA_INODE;
1292         ma->ma_valid = 0;
1293
1294         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1295                        OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1296         /* save version when object is locked */
1297         mdt_version_get_save(info, mc, 1);
1298
             /* the unlink itself runs with the child's mot_lov_mutex held */
1299         mutex_lock(&mc->mot_lov_mutex);
1300
1301         rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1302                         mdt_object_child(mc), &rr->rr_name, ma, no_name);
1303
1304         mutex_unlock(&mc->mot_lov_mutex);
1305         if (rc != 0)
1306                 GOTO(unlock_child, rc);
1307
             /* an object that is not dying has its attributes read back;
              * otherwise its Data-on-MDT data is discarded when
              * mdt_dom_check_for_discard() says so
              */
1308         if (!lu_object_is_dying(&mc->mot_header)) {
1309                 rc = mdt_attr_get_complex(info, mc, ma);
1310                 if (rc)
1311                         GOTO(out_stat, rc);
1312         } else if (mdt_dom_check_for_discard(info, mc)) {
1313                 mdt_dom_discard_data(info, mc);
1314         }
1315         mdt_handle_last_unlink(info, mc, ma);
1316
             /* account the operation as rmdir or unlink by the file type found
              * in the attributes, when those are valid
              */
1317 out_stat:
1318         if (ma->ma_valid & MA_INODE) {
1319                 switch (ma->ma_attr.la_mode & S_IFMT) {
1320                 case S_IFDIR:
1321                         mdt_counter_incr(req, LPROC_MDT_RMDIR,
1322                                          ktime_us_delta(ktime_get(), kstart));
1323                         break;
1324                 case S_IFREG:
1325                 case S_IFLNK:
1326                 case S_IFCHR:
1327                 case S_IFBLK:
1328                 case S_IFIFO:
1329                 case S_IFSOCK:
1330                         mdt_counter_incr(req, LPROC_MDT_UNLINK,
1331                                          ktime_us_delta(ktime_get(), kstart));
1332                         break;
1333                 default:
1334                         LASSERTF(0, "bad file type %o unlinking\n",
1335                                 ma->ma_attr.la_mode);
1336                 }
1337         }
1338
1339         EXIT;
1340
             /* cleanup: resources are released in reverse order of acquisition */
1341 unlock_child:
1342         mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
1343 put_child:
             /* presumably mti_big_buf was filled by find_name_matching_hash()
              * in the MDS_OP_WITH_FID case - TODO confirm
              */
1344         if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1345             info->mti_big_buf.lb_buf)
1346                 lu_buf_free(&info->mti_big_buf);
1347         mdt_object_put(info->mti_env, mc);
1348 unlock_parent:
1349         mdt_object_unlock(info, mp, parent_lh, rc);
1350 put_parent:
1351         mdt_object_put(info->mti_env, mp);
1352         CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1353         return rc;
1354 }
1355
1356 /*
1357  * Handle a link reintegration request: add the name rr_name in the target
1358  * parent rr_fid2 for the existing source object rr_fid1.
      *
      * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
      * name.
      *
      * info carries the unpacked request and the lock handle slots used here
      * (MDT_LH_PARENT, MDT_LH_CHILD); lhc is not used by this function.
      *
      * Returns 0 on success, -EPERM for identical or non-operative FIDs,
      * -ENOENT if the source does not exist, -EEXIST if the name already exists
      * (outside of replay), or another negative errno.
1359  */
1360 static int mdt_reint_link(struct mdt_thread_info *info,
1361                           struct mdt_lock_handle *lhc)
1362 {
1363         struct mdt_reint_record *rr = &info->mti_rr;
1364         struct ptlrpc_request   *req = mdt_info_req(info);
1365         struct md_attr          *ma = &info->mti_attr;
1366         struct mdt_object       *ms;
1367         struct mdt_object       *mp;
1368         struct mdt_lock_handle  *lhs;
1369         struct mdt_lock_handle  *lhp;
1370         ktime_t kstart = ktime_get();
1371         bool cos_incompat;
1372         int rc;
1373
1374         ENTRY;
1375         DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1376                   PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1377
1378         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1379                 RETURN(err_serious(-ENOENT));
1380
             /* fault injection: fail the request and suppress its reply */
1381         if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1382             OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1383                 req->rq_no_reply = 1;
1384                 RETURN(err_serious(-ENOENT));
1385         }
1386
1387         if (info->mti_dlm_req)
1388                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1389
1390         /* Invalid case so return error immediately instead of
1391          * processing it
1392          */
1393         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1394                 RETURN(-EPERM);
1395
1396         if (!fid_is_md_operative(rr->rr_fid1) ||
1397             !fid_is_md_operative(rr->rr_fid2))
1398                 RETURN(-EPERM);
1399
1400         /* step 1: find target parent dir */
1401         mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1402         if (IS_ERR(mp))
1403                 RETURN(PTR_ERR(mp));
1404
             /* VBR: check and save the parent version at index 0 */
1405         rc = mdt_version_get_check_save(info, mp, 0);
1406         if (rc)
1407                 GOTO(put_parent, rc);
1408
1409         rc = mdt_check_enc(info, mp);
1410         if (rc)
1411                 GOTO(put_parent, rc);
1412
1413         /* step 2: find source */
1414         ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1415         if (IS_ERR(ms))
1416                 GOTO(put_parent, rc = PTR_ERR(ms));
1417
1418         if (!mdt_object_exists(ms)) {
1419                 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1420                        mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1421                 GOTO(put_source, rc = -ENOENT);
1422         }
1423
             /* cos_incompat is set as soon as either object is remote */
1424         cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1425
1426         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1427
             /* lock order: the name in the parent (PW) first, then the source
              * object (EX on UPDATE | XATTR)
              */
1428         lhp = &info->mti_lh[MDT_LH_PARENT];
1429         rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
1430         if (rc != 0)
1431                 GOTO(put_source, rc);
1432
1433         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1434
1435         lhs = &info->mti_lh[MDT_LH_CHILD];
1436         rc = mdt_object_lock(info, ms, lhs,
1437                              MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
1438                              cos_incompat);
1439         if (rc != 0)
1440                 GOTO(unlock_parent, rc);
1441
1442         /* step 3: link it */
1443         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1444                         OBD_FAIL_MDS_REINT_LINK_WRITE);
1445
             /* VBR: check and save the source version at index 1 */
1446         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1447         rc = mdt_version_get_check_save(info, ms, 1);
1448         if (rc)
1449                 GOTO(unlock_source, rc);
1450
1451         /** check target version by name during replay */
1452         rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1453                                       &info->mti_tmp_fid1, 2);
1454         if (rc != 0 && rc != -ENOENT)
1455                 GOTO(unlock_source, rc);
1456         /* save version of file name for replay, it must be ENOENT here */
1457         if (!req_is_replay(mdt_info_req(info))) {
                     /* at this point rc is either 0 (name found) or -ENOENT */
1458                 if (rc != -ENOENT) {
1459                         CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1460                                PNAME(&rr->rr_name));
1461                         GOTO(unlock_source, rc = -EEXIST);
1462                 }
1463                 info->mti_ver[2] = ENOENT_VERSION;
1464                 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1465         }
1466
1467         rc = mdo_link(info->mti_env, mdt_object_child(mp),
1468                       mdt_object_child(ms), &rr->rr_name, ma);
1469
1470         if (rc == 0)
1471                 mdt_counter_incr(req, LPROC_MDT_LINK,
1472                                  ktime_us_delta(ktime_get(), kstart));
1473
1474         EXIT;
             /* cleanup: locks and references are released in reverse order */
1475 unlock_source:
1476         mdt_object_unlock(info, ms, lhs, rc);
1477 unlock_parent:
1478         mdt_object_unlock(info, mp, lhp, rc);
1479 put_source:
1480         mdt_object_put(info->mti_env, ms);
1481 put_parent:
1482         mdt_object_put(info->mti_env, mp);
1483         return rc;
1484 }
1485
1486 /**
1487  * Get BFL lock for rename or migrate process.
1488  **/
1489 static int mdt_rename_lock(struct mdt_thread_info *info,
1490                            struct mdt_lock_handle *lh)
1491 {
1492         struct lu_fid *fid = &info->mti_tmp_fid1;
1493         struct mdt_object *obj;
1494         __u64 ibits = MDS_INODELOCK_UPDATE;
1495         int rc;
1496
1497         ENTRY;
1498         lu_root_fid(fid);
1499         obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1500         if (IS_ERR(obj))
1501                 RETURN(PTR_ERR(obj));
1502
1503         mdt_lock_reg_init(lh, LCK_EX);
1504         rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1505                                       &ibits, 0, false, false);
1506         mdt_object_put(info->mti_env, obj);
1507         RETURN(rc);
1508 }
1509
1510 static void mdt_rename_unlock(struct mdt_thread_info *info,
1511                               struct mdt_lock_handle *lh)
1512 {
1513         ENTRY;
1514         /* Cancel the single rename lock right away */
1515         mdt_object_unlock(info, NULL, lh, 1);
1516         EXIT;
1517 }
1518
1519 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1520                                                 const struct lu_fid *fid,
1521                                                 int idx)
1522 {
1523         struct mdt_object *dir;
1524         int rc;
1525
1526         ENTRY;
1527         dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1528         if (IS_ERR(dir))
1529                 RETURN(dir);
1530
1531         /* check early, the real version will be saved after locking */
1532         rc = mdt_version_get_check(info, dir, idx);
1533         if (rc)
1534                 GOTO(out_put, rc);
1535
1536         if (!mdt_object_exists(dir))
1537                 GOTO(out_put, rc = -ENOENT);
1538
1539         if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1540                 GOTO(out_put, rc = -ENOTDIR);
1541
1542         RETURN(dir);
1543 out_put:
1544         mdt_object_put(info->mti_env, dir);
1545         return ERR_PTR(rc);
1546 }
1547
1548 /*
1549  * in case obj is remote obj on its parent, revoke LOOKUP lock,
1550  * herein we don't really check it, just do revoke.
1551  */
1552 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1553                                   struct mdt_object *pobj,
1554                                   struct mdt_object *obj)
1555 {
1556         struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1557         int rc;
1558
1559         rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
1560         if (rc)
1561                 return rc;
1562
1563         /*
1564          * TODO, currently we don't save this lock because there is no place to
1565          * hold this lock handle, but to avoid race we need to save this lock.
1566          */
1567         mdt_object_unlock(info, NULL, lh, 1);
1568
1569         return 0;
1570 }
1571
1572 /*
1573  * operation may takes locks of linkea, or directory stripes, group them in
1574  * different list.
1575  */
1576 struct mdt_sub_lock {
1577         struct mdt_object *msl_obj;
1578         struct mdt_lock_handle msl_lh;
1579         struct list_head msl_linkage;
1580 };
1581
1582 static void mdt_unlock_list(struct mdt_thread_info *info,
1583                             struct list_head *list, int decref)
1584 {
1585         struct mdt_sub_lock *msl;
1586         struct mdt_sub_lock *tmp;
1587
1588         list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1589                 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1590                 list_del(&msl->msl_linkage);
1591                 OBD_FREE_PTR(msl);
1592         }
1593 }
1594
/*
 * Unlock an object and its stripes.
 *
 * A local object is unlocked together with its stripes through
 * mdt_object_stripes_unlock(); for a remote object the locks collected on
 * \a slave_locks are released first and then the lock on the object itself.
 *
 * \param[in] info              thread environment
 * \param[in] obj               object to unlock
 * \param[in] lh                lock handle of \a obj
 * \param[in] einfo             enqueue info, used for a local object only
 * \param[in] slave_locks       stripe locks, used for a remote object only
 * \param[in] decref            passed through to the unlock helpers
 */
static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
                                             struct mdt_object *obj,
                                             struct mdt_lock_handle *lh,
                                             struct ldlm_enqueue_info *einfo,
                                             struct list_head *slave_locks,
                                             int decref)
{
        if (!mdt_object_remote(obj)) {
                mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
                return;
        }

        mdt_unlock_list(info, slave_locks, decref);
        mdt_object_unlock(info, obj, lh, decref);
}
1609
1610 /*
1611  * lock parents of links, and also check whether total locks don't exceed
1612  * RS_MAX_LOCKS.
1613  *
1614  * \retval      0 on success, and locks can be saved in ptlrpc_reply_stat
1615  * \retval      1 on success, but total lock count may exceed RS_MAX_LOCKS
1616  * \retval      -ev negative errno upon error
1617  */
1618 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1619                                  struct mdt_object *pobj,
1620                                  const struct md_attr *ma,
1621                                  struct mdt_object *obj,
1622                                  struct mdt_lock_handle *lhp,
1623                                  struct ldlm_enqueue_info *peinfo,
1624                                  struct list_head *parent_slave_locks,
1625                                  struct list_head *link_locks)
1626 {
1627         struct mdt_device *mdt = info->mti_mdt;
1628         struct lu_buf *buf = &info->mti_big_buf;
1629         struct lu_name *lname = &info->mti_name;
1630         struct linkea_data ldata = { NULL };
1631         bool blocked = false;
1632         int local_lnkp_cnt = 0;
1633         int rc;
1634
1635         ENTRY;
1636         if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1637                 RETURN(0);
1638
1639         buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1640         if (buf->lb_buf == NULL)
1641                 RETURN(-ENOMEM);
1642
1643         ldata.ld_buf = buf;
1644         rc = mdt_links_read(info, obj, &ldata);
1645         if (rc) {
1646                 if (rc == -ENOENT || rc == -ENODATA)
1647                         rc = 0;
1648                 RETURN(rc);
1649         }
1650
1651         for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1652              linkea_next_entry(&ldata)) {
1653                 struct mdt_object *lnkp;
1654                 struct mdt_sub_lock *msl;
1655                 struct lu_fid fid;
1656                 __u64 ibits;
1657
1658                 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1659                                     &fid);
1660
1661                 /* check if it's also linked to parent */
1662                 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1663                         CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1664                                PFID(&fid), PNAME(lname));
1665                         /* in case link is remote object, revoke LOOKUP lock */
1666                         rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1667                         continue;
1668                 }
1669
1670                 lnkp = NULL;
1671
1672                 /* check if it's linked to a stripe of parent */
1673                 if (ma->ma_valid & MA_LMV) {
1674                         struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1675                         struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1676                         int j = 0;
1677
1678                         for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1679                                 fid_le_to_cpu(stripe_fid,
1680                                               &lmv->lmv_stripe_fids[j]);
1681                                 if (lu_fid_eq(stripe_fid, &fid)) {
1682                                         CDEBUG(D_INFO, "skip stripe "DFID
1683                                                ", reovke "DNAME"\n",
1684                                                PFID(&fid), PNAME(lname));
1685                                         lnkp = mdt_object_find(info->mti_env,
1686                                                                mdt, &fid);
1687                                         if (IS_ERR(lnkp))
1688                                                 GOTO(out, rc = PTR_ERR(lnkp));
1689                                         break;
1690                                 }
1691                         }
1692
1693                         if (lnkp) {
1694                                 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1695                                                                    obj);
1696                                 mdt_object_put(info->mti_env, lnkp);
1697                                 continue;
1698                         }
1699                 }
1700
1701                 /* Check if it's already locked */
1702                 list_for_each_entry(msl, link_locks, msl_linkage) {
1703                         if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1704                                 CDEBUG(D_INFO,
1705                                        DFID" was locked, revoke "DNAME"\n",
1706                                        PFID(&fid), PNAME(lname));
1707                                 lnkp = msl->msl_obj;
1708                                 break;
1709                         }
1710                 }
1711
1712                 if (lnkp) {
1713                         rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1714                         continue;
1715                 }
1716
1717                 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1718                        PFID(&fid), PNAME(lname));
1719
1720                 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1721                 if (IS_ERR(lnkp)) {
1722                         CWARN("%s: cannot find obj "DFID": %ld\n",
1723                               mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1724                         continue;
1725                 }
1726
1727                 if (!mdt_object_exists(lnkp)) {
1728                         CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1729                               PFID(&fid), PNAME(lname));
1730                         mdt_object_put(info->mti_env, lnkp);
1731                         continue;
1732                 }
1733
1734                 if (!mdt_object_remote(lnkp))
1735                         local_lnkp_cnt++;
1736
1737                 OBD_ALLOC_PTR(msl);
1738                 if (msl == NULL)
1739                         GOTO(out, rc = -ENOMEM);
1740
1741                 /*
1742                  * we can't follow parent-child lock order like other MD
1743                  * operations, use lock_try here to avoid deadlock, if the lock
1744                  * cannot be taken, drop all locks taken, revoke the blocked
1745                  * one, and continue processing the remaining entries, and in
1746                  * the end of the loop restart from beginning.
1747                  */
1748                 ibits = 0;
1749                 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1750                                          MDS_INODELOCK_UPDATE, LCK_PW, true);
1751                 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1752
1753                         CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1754                                PFID(&fid), PNAME(lname));
1755
1756                         mdt_unlock_list(info, link_locks, 1);
1757                         /* also unlock parent locks to avoid deadlock */
1758                         if (!blocked)
1759                                 mdt_migrate_object_unlock(info, pobj, lhp,
1760                                                           peinfo,
1761                                                           parent_slave_locks,
1762                                                           1);
1763
1764                         blocked = true;
1765
1766                         rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1767                                              MDS_INODELOCK_UPDATE, LCK_PW,
1768                                              true);
1769                         if (rc) {
1770                                 mdt_object_put(info->mti_env, lnkp);
1771                                 OBD_FREE_PTR(msl);
1772                                 GOTO(out, rc);
1773                         }
1774
1775                         if (mdt_object_remote(lnkp)) {
1776                                 struct ldlm_lock *lock;
1777
1778                                 /*
1779                                  * for remote object, set lock cb_atomic,
1780                                  * so lock can be released in blocking_ast()
1781                                  * immediately, then the next lock_try will
1782                                  * have better chance of success.
1783                                  */
1784                                 lock = ldlm_handle2lock(
1785                                                 &msl->msl_lh.mlh_rreg_lh);
1786                                 LASSERT(lock != NULL);
1787                                 lock_res_and_lock(lock);
1788                                 ldlm_set_atomic_cb(lock);
1789                                 unlock_res_and_lock(lock);
1790                                 LDLM_LOCK_PUT(lock);
1791                         }
1792
1793                         mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1794                         OBD_FREE_PTR(msl);
1795                         continue;
1796                 }
1797
1798                 INIT_LIST_HEAD(&msl->msl_linkage);
1799                 msl->msl_obj = lnkp;
1800                 list_add_tail(&msl->msl_linkage, link_locks);
1801
1802                 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1803         }
1804
1805         if (blocked)
1806                 GOTO(out, rc = -EBUSY);
1807
1808         EXIT;
1809 out:
1810         if (rc) {
1811                 mdt_unlock_list(info, link_locks, rc);
1812         } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1813                 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1814                        local_lnkp_cnt);
1815                 /*
1816                  * parent may have 3 local objects: master object and 2 stripes
1817                  * (if it's being migrated too); source may have 1 local objects
1818                  * as regular file; target has 1 local object.
1819                  * Note, source may have 2 local locks if it is directory but it
1820                  * can't have hardlinks, so it is not considered here.
1821                  */
1822                 rc = 1;
1823         }
1824         return rc;
1825 }
1826
1827 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1828                                   struct mdt_object *obj,
1829                                   const struct md_attr *ma,
1830                                   struct list_head *slave_locks)
1831 {
1832         struct mdt_device *mdt = info->mti_mdt;
1833         const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1834         struct lu_fid *fid = &info->mti_tmp_fid1;
1835         struct mdt_object *slave;
1836         struct mdt_sub_lock *msl;
1837         int i;
1838         int rc;
1839
1840         ENTRY;
1841         LASSERT(mdt_object_remote(obj));
1842         LASSERT(ma->ma_valid & MA_LMV);
1843         LASSERT(lmv);
1844
1845         if (!lmv_is_sane(lmv))
1846                 RETURN(-EINVAL);
1847
1848         for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1849                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1850
1851                 if (!fid_is_sane(fid))
1852                         continue;
1853
1854                 slave = mdt_object_find(info->mti_env, mdt, fid);
1855                 if (IS_ERR(slave))
1856                         GOTO(out, rc = PTR_ERR(slave));
1857
1858                 OBD_ALLOC_PTR(msl);
1859                 if (!msl) {
1860                         mdt_object_put(info->mti_env, slave);
1861                         GOTO(out, rc = -ENOMEM);
1862                 }
1863
1864                 rc = mdt_object_lock(info, slave, &msl->msl_lh,
1865                                      MDS_INODELOCK_UPDATE, LCK_EX, true);
1866                 if (rc) {
1867                         OBD_FREE_PTR(msl);
1868                         mdt_object_put(info->mti_env, slave);
1869                         GOTO(out, rc);
1870                 }
1871
1872                 INIT_LIST_HEAD(&msl->msl_linkage);
1873                 msl->msl_obj = slave;
1874                 list_add_tail(&msl->msl_linkage, slave_locks);
1875         }
1876         EXIT;
1877
1878 out:
1879         if (rc)
1880                 mdt_unlock_list(info, slave_locks, rc);
1881         return rc;
1882 }
1883
1884 /* lock parent and its stripes */
1885 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1886                                    struct mdt_object *obj,
1887                                    const struct md_attr *ma,
1888                                    struct mdt_lock_handle *lh,
1889                                    struct ldlm_enqueue_info *einfo,
1890                                    struct list_head *slave_locks)
1891 {
1892         int rc;
1893
1894         if (mdt_object_remote(obj)) {
1895                 rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1896                                      LCK_PW, true);
1897                 if (rc)
1898                         return rc;
1899
1900                 /*
1901                  * if obj is remote and striped, lock its stripes explicitly
1902                  * because it's not striped in LOD layer on this MDT.
1903                  */
1904                 if (ma->ma_valid & MA_LMV) {
1905                         rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1906                         if (rc)
1907                                 mdt_object_unlock(info, obj, lh, rc);
1908                 }
1909         } else {
1910                 rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
1911                                              MDS_INODELOCK_UPDATE, LCK_PW,
1912                                              true);
1913         }
1914
1915         return rc;
1916 }
1917
1918 /*
1919  * in migration, object may be remote, and we need take full lock of it and its
1920  * stripes if it's directory, besides, object may be a remote object on its
1921  * parent, revoke its LOOKUP lock on where its parent is located.
1922  */
1923 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1924                                    struct mdt_object *pobj,
1925                                    struct mdt_object *obj,
1926                                    struct mdt_lock_handle *lh,
1927                                    struct ldlm_enqueue_info *einfo,
1928                                    struct list_head *slave_locks)
1929 {
1930         int rc;
1931
1932         if (mdt_object_remote(obj)) {
1933                 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1934                 if (rc)
1935                         return rc;
1936
1937                 rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
1938                                      true);
1939                 if (rc)
1940                         return rc;
1941
1942                 /*
1943                  * if obj is remote and striped, lock its stripes explicitly
1944                  * because it's not striped in LOD layer on this MDT.
1945                  */
1946                 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1947                         struct md_attr *ma = &info->mti_attr;
1948
1949                         rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1950                         if (rc) {
1951                                 mdt_object_unlock(info, obj, lh, rc);
1952                                 return rc;
1953                         }
1954
1955                         if (ma->ma_valid & MA_LMV) {
1956                                 rc = mdt_lock_remote_slaves(info, obj, ma,
1957                                                             slave_locks);
1958                                 if (rc)
1959                                         mdt_object_unlock(info, obj, lh, rc);
1960                         }
1961                 }
1962         } else {
1963                 rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
1964                                              MDS_INODELOCK_FULL, LCK_EX, true);
1965         }
1966
1967         return rc;
1968 }
1969
1970 /*
1971  * lookup source by name, if parent is striped directory, we need to find the
1972  * corresponding stripe where source is located, and then lookup there.
1973  *
1974  * besides, if parent is migrating too, and file is already in target stripe,
1975  * this should be a redo of 'lfs migrate' on client side.
1976  */
1977 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1978                               struct mdt_object *pobj,
1979                               const struct md_attr *ma,
1980                               const struct lu_name *lname,
1981                               struct mdt_object **spobj,
1982                               struct mdt_object **sobj)
1983 {
1984         const struct lu_env *env = info->mti_env;
1985         struct lu_fid *fid = &info->mti_tmp_fid1;
1986         struct mdt_object *stripe;
1987         int rc;
1988
1989         if (ma->ma_valid & MA_LMV) {
1990                 /* if parent is striped, lookup on corresponding stripe */
1991                 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1992
1993                 if (!lmv_is_sane(lmv))
1994                         return -EBADF;
1995
1996                 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
1997                                                   lname->ln_namelen);
1998                 if (rc < 0)
1999                         return rc;
2000
2001                 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2002
2003                 stripe = mdt_object_find(env, info->mti_mdt, fid);
2004                 if (IS_ERR(stripe))
2005                         return PTR_ERR(stripe);
2006
2007                 fid_zero(fid);
2008                 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2009                                 &info->mti_spec);
2010                 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2011                         /*
2012                          * if parent layout is changeing, and lookup child
2013                          * failed on source stripe, lookup again on target
2014                          * stripe, if it exists, it means previous migration
2015                          * was interrupted, and current file was migrated
2016                          * already.
2017                          */
2018                         mdt_object_put(env, stripe);
2019
2020                         rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2021                                                       lname->ln_namelen);
2022                         if (rc < 0)
2023                                 return rc;
2024
2025                         fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2026
2027                         stripe = mdt_object_find(env, info->mti_mdt, fid);
2028                         if (IS_ERR(stripe))
2029                                 return PTR_ERR(stripe);
2030
2031                         fid_zero(fid);
2032                         rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2033                                         fid, &info->mti_spec);
2034                         mdt_object_put(env, stripe);
2035                         return rc ?: -EALREADY;
2036                 } else if (rc) {
2037                         mdt_object_put(env, stripe);
2038                         return rc;
2039                 }
2040         } else {
2041                 fid_zero(fid);
2042                 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2043                                 &info->mti_spec);
2044                 if (rc)
2045                         return rc;
2046
2047                 stripe = pobj;
2048                 mdt_object_get(env, stripe);
2049         }
2050
2051         *spobj = stripe;
2052
2053         *sobj = mdt_object_find(env, info->mti_mdt, fid);
2054         if (IS_ERR(*sobj)) {
2055                 mdt_object_put(env, stripe);
2056                 rc = PTR_ERR(*sobj);
2057                 *spobj = NULL;
2058                 *sobj = NULL;
2059         }
2060
2061         return rc;
2062 }
2063
2064 /* end lease and close file for regular file */
2065 static int mdd_migrate_close(struct mdt_thread_info *info,
2066                              struct mdt_object *obj)
2067 {
2068         struct close_data *data;
2069         struct mdt_body *repbody;
2070         struct ldlm_lock *lease;
2071         int rc;
2072         int rc2;
2073
2074         rc = -EPROTO;
2075         if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2076                                       RCL_CLIENT) ||
2077             !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2078                                       RCL_CLIENT))
2079                 goto close;
2080
2081         data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2082         if (!data)
2083                 goto close;
2084
2085         rc = -ESTALE;
2086         lease = ldlm_handle2lock(&data->cd_handle);
2087         if (!lease)
2088                 goto close;
2089
2090         /* check if the lease was already canceled */
2091         lock_res_and_lock(lease);
2092         rc = ldlm_is_cancel(lease);
2093         unlock_res_and_lock(lease);
2094
2095         if (rc) {
2096                 rc = -EAGAIN;
2097                 LDLM_DEBUG(lease, DFID" lease broken",
2098                            PFID(mdt_object_fid(obj)));
2099         }
2100
2101         /*
2102          * cancel server side lease, client side counterpart should have been
2103          * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2104          */
2105         ldlm_lock_cancel(lease);
2106         ldlm_reprocess_all(lease->l_resource,
2107                            lease->l_policy_data.l_inodebits.bits);
2108         LDLM_LOCK_PUT(lease);
2109
2110 close:
2111         rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2112         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2113         repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2114
2115         return rc ?: rc2;
2116 }
2117
2118 /*
2119  * migrate file in below steps:
2120  *  1. lock parent and its stripes
2121  *  2. lookup source by name
2122  *  3. lock parents of source links if source is not directory
2123  *  4. reject if source is in HSM
2124  *  5. take source open_sem and close file if source is regular file
2125  *  6. lock source and its stripes if it's directory
2126  *  7. lock target so subsequent change to it can trigger COS
2127  *  8. migrate file
2128  *  9. unlock above locks
2129  * 10. sync device if source has links
2130  */
2131 int mdt_reint_migrate(struct mdt_thread_info *info,
2132                       struct mdt_lock_handle *unused)
2133 {
2134         const struct lu_env *env = info->mti_env;
2135         struct mdt_device *mdt = info->mti_mdt;
2136         struct ptlrpc_request *req = mdt_info_req(info);
2137         struct mdt_reint_record *rr = &info->mti_rr;
2138         struct lu_ucred *uc = mdt_ucred(info);
2139         struct md_attr *ma = &info->mti_attr;
2140         struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2141         struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2142         struct mdt_object *pobj;
2143         struct mdt_object *spobj = NULL;
2144         struct mdt_object *sobj = NULL;
2145         struct mdt_object *tobj;
2146         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2147         struct mdt_lock_handle *lhp;
2148         struct mdt_lock_handle *lhs;
2149         struct mdt_lock_handle *lht;
2150         LIST_HEAD(parent_slave_locks);
2151         LIST_HEAD(child_slave_locks);
2152         LIST_HEAD(link_locks);
2153         int lock_retries = 5;
2154         bool open_sem_locked = false;
2155         bool do_sync = false;
2156         int rc;
2157
2158         ENTRY;
2159         CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2160                PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2161
2162         if (info->mti_dlm_req)
2163                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2164
2165         if (!fid_is_md_operative(rr->rr_fid1) ||
2166             !fid_is_md_operative(rr->rr_fid2))
2167                 RETURN(-EPERM);
2168
2169         /* don't allow migrate . or .. */
2170         if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2171                 RETURN(-EBUSY);
2172
2173         if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2174                 RETURN(-EPERM);
2175
2176         /* we want rbac roles to have precedence over any other
2177          * permission or capability checks
2178          */
2179         if (uc && (!uc->uc_rbac_dne_ops ||
2180                    (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2181                     uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2182                     mdt->mdt_enable_remote_dir_gid != -1)))
2183                 RETURN(-EPERM);
2184
2185         /*
2186          * Note: do not enqueue rename lock for replay request, because
2187          * if other MDT holds rename lock, but being blocked to wait for
2188          * this MDT to finish its recovery, and the failover MDT can not
2189          * get rename lock, which will cause deadlock.
2190          *
2191          * req is NULL if this is called by directory auto-split.
2192          */
2193         if (req && !req_is_replay(req)) {
2194                 rc = mdt_rename_lock(info, rename_lh);
2195                 if (rc != 0) {
2196                         CERROR("%s: can't lock FS for rename: rc = %d\n",
2197                                mdt_obd_name(info->mti_mdt), rc);
2198                         RETURN(rc);
2199                 }
2200         }
2201
2202         /* pobj is master object of parent */
2203         pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2204         if (IS_ERR(pobj))
2205                 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2206
2207         if (req) {
2208                 rc = mdt_version_get_check(info, pobj, 0);
2209                 if (rc)
2210                         GOTO(put_parent, rc);
2211         }
2212
2213         if (!mdt_object_exists(pobj))
2214                 GOTO(put_parent, rc = -ENOENT);
2215
2216         if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2217                 GOTO(put_parent, rc = -ENOTDIR);
2218
2219         rc = mdt_check_enc(info, pobj);
2220         if (rc)
2221                 GOTO(put_parent, rc);
2222
2223         rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2224         if (rc)
2225                 GOTO(put_parent, rc);
2226
2227 lock_parent:
2228         /* lock parent object */
2229         lhp = &info->mti_lh[MDT_LH_PARENT];
2230         rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2231                                      &parent_slave_locks);
2232         if (rc)
2233                 GOTO(put_parent, rc);
2234
2235         /*
2236          * spobj is the corresponding stripe against name if pobj is striped
2237          * directory, which is the real parent, and no need to lock, because
2238          * we've taken full lock of pobj.
2239          */
2240         rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2241         if (rc)
2242                 GOTO(unlock_parent, rc);
2243
2244         /* lock parents of source links, and revoke LOOKUP lock of links */
2245         rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2246                                    &parent_slave_locks, &link_locks);
2247         if (rc == -EBUSY && lock_retries-- > 0) {
2248                 mdt_object_put(env, sobj);
2249                 mdt_object_put(env, spobj);
2250                 goto lock_parent;
2251         }
2252
2253         if (rc < 0)
2254                 GOTO(put_source, rc);
2255
2256         /*
2257          * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2258          * with one request, if total lock count exceeds this limit, we will
2259          * drop all locks after migration, and synchronous device in the end.
2260          */
2261         do_sync = rc;
2262
2263         /* TODO: DoM migration is not supported, migrate dirent only */
2264         if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2265                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2266                 if (rc)
2267                         GOTO(unlock_links, rc);
2268
2269                 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2270                         info->mti_spec.sp_migrate_nsonly = 1;
2271         } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2272                 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2273                 if (rc)
2274                         GOTO(unlock_links, rc);
2275
2276                 /* race with restripe/auto-split? */
2277                 if ((ma->ma_valid & MA_LMV) &&
2278                     lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2279                         GOTO(unlock_links, rc = -EBUSY);
2280         }
2281
2282         /* if migration HSM is allowed */
2283         if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2284                 ma->ma_need = MA_HSM;
2285                 ma->ma_valid = 0;
2286                 rc = mdt_attr_get_complex(info, sobj, ma);
2287                 if (rc)
2288                         GOTO(unlock_links, rc);
2289
2290                 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2291                         GOTO(unlock_links, rc = -EOPNOTSUPP);
2292         }
2293
2294         /* end lease and close file for regular file */
2295         if (info->mti_spec.sp_migrate_close) {
2296                 /* try to hold open_sem so that nobody else can open the file */
2297                 if (!down_write_trylock(&sobj->mot_open_sem)) {
2298                         /* close anyway */
2299                         mdd_migrate_close(info, sobj);
2300                         GOTO(unlock_links, rc = -EBUSY);
2301                 } else {
2302                         open_sem_locked = true;
2303                         rc = mdd_migrate_close(info, sobj);
2304                         if (rc)
2305                                 GOTO(unlock_open_sem, rc);
2306                 }
2307         }
2308
2309         /* lock source */
2310         lhs = &info->mti_lh[MDT_LH_OLD];
2311         rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2312                                      &child_slave_locks);
2313         if (rc)
2314                 GOTO(unlock_open_sem, rc);
2315
2316         /* lock target */
2317         tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2318         if (IS_ERR(tobj))
2319                 GOTO(unlock_source, rc = PTR_ERR(tobj));
2320
2321         lht = &info->mti_lh[MDT_LH_NEW];
2322         rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
2323         if (rc)
2324                 GOTO(put_target, rc);
2325
2326         /* Don't do lookup sanity check. We know name doesn't exist. */
2327         info->mti_spec.sp_cr_lookup = 0;
2328         info->mti_spec.sp_feat = &dt_directory_features;
2329
2330         rc = mdo_migrate(env, mdt_object_child(pobj),
2331                          mdt_object_child(sobj), &rr->rr_name,
2332                          mdt_object_child(tobj),
2333                          &info->mti_spec, ma);
2334         if (!rc)
2335                 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2336                                      LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2337         EXIT;
2338
2339         mdt_object_unlock(info, tobj, lht, rc);
2340 put_target:
2341         mdt_object_put(env, tobj);
2342 unlock_source:
2343         mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2344                                   &child_slave_locks, rc);
2345 unlock_open_sem:
2346         if (open_sem_locked)
2347                 up_write(&sobj->mot_open_sem);
2348 unlock_links:
2349         /* if we've got too many locks to save into RPC,
2350          * then just commit before the locks are released
2351          */
2352         if (!rc && do_sync)
2353                 mdt_device_sync(env, mdt);
2354         mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2355 put_source:
2356         mdt_object_put(env, sobj);
2357         mdt_object_put(env, spobj);
2358 unlock_parent:
2359         mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2360                                   &parent_slave_locks, rc);
2361 put_parent:
2362         mdt_object_put(env, pobj);
2363 unlock_rename:
2364         mdt_rename_unlock(info, rename_lh);
2365
2366         return rc;
2367 }
2368
2369 /*
2370  * Determine the lock order of sobj and tobj.
2371  *
2372  * There are two situations in which tobj must be locked before sobj:
2373  * 1. sobj is child of tobj
2374  * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2375  *    larger than that of tobj
2376  *
      * Shortcuts taken before any attribute is read: identical objects and a
      * root sobj give 0, a root tobj gives 1. After the subdir test, objects
      * whose parent FIDs differ give 0, as does an sobj that has no LMV or
      * whose LMV magic does not carry LMV_MAGIC_STRIPE.
      *
      * Side effects: info->mti_tmp_fid1 and info->mti_tmp_fid2 are handed to
      * mdt_attr_get_pfid() as the parent-FID buffers, and info->mti_attr is
      * reused for both mdt_stripe_get() calls (its ma_valid is cleared in
      * between), so callers should not expect the previous contents of these
      * scratch fields to survive the call.
      *
      * \param[in] info     thread info; supplies the env and scratch buffers
      * \param[in] sobj     first object (mdt_reint_rename passes the source dir)
      * \param[in] tobj     second object (mdt_reint_rename passes the target dir)
      *
2377  * \retval      1 lock tobj before sobj
2378  * \retval      0 lock sobj before tobj
2379  * \retval      -ev negative errno upon error
      *              (-ENODATA when sobj is a stripe but tobj has no LMV,
      *              -EINVAL when tobj lacks the stripe magic or both stripe
      *              indices are equal, otherwise whatever a helper returned)
2380  */
2381 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2382                                            struct mdt_object *sobj,
2383                                            struct mdt_object *tobj)
2384 {
2385         struct md_attr *ma = &info->mti_attr;
2386         struct lu_fid *spfid = &info->mti_tmp_fid1;     /* parent FID of sobj */
2387         struct lu_fid *tpfid = &info->mti_tmp_fid2;     /* parent FID of tobj */
2388         struct lmv_mds_md_v1 *lmv;
2389         __u32 sindex;
2390         __u32 tindex;
2391         int rc;
2392
2393         /* sobj and tobj are the same */
2394         if (sobj == tobj)
2395                 return 0;
2396
2397         if (fid_is_root(mdt_object_fid(sobj)))
2398                 return 0;
2399
2400         if (fid_is_root(mdt_object_fid(tobj)))
2401                 return 1;
2402
2403         /* check whether sobj is child of tobj */
2404         rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2405                            mdt_object_fid(tobj));
2406         if (rc < 0)
2407                 return rc;
2408
2409         if (rc == 1)
2410                 return 1;
2411
2412         /* check whether sobj and tobj are children of the same parent */
2413         rc = mdt_attr_get_pfid(info, sobj, spfid);
2414         if (rc)
2415                 return rc;
2416
2417         rc = mdt_attr_get_pfid(info, tobj, tpfid);
2418         if (rc)
2419                 return rc;
2420
2421         if (!lu_fid_eq(spfid, tpfid))
2422                 return 0;
2423
2424         /* check whether sobj and tobj are sibling stripes */
2425         rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2426         if (rc)
2427                 return rc;
2428
2429         if (!(ma->ma_valid & MA_LMV))   /* sobj has no LMV: default order */
2430                 return 0;
2431
2432         lmv = &ma->ma_lmv->lmv_md_v1;
2433         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2434                 return 0;
2435         sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2436
2437         ma->ma_valid = 0;       /* so the MA_LMV test below is about tobj only */
2438         rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2439         if (rc)
2440                 return rc;
2441
2442         /* sobj is a stripe here, so a sibling without LMV is an error */
2443         if (!(ma->ma_valid & MA_LMV))
2443                 return -ENODATA;
2444
2445         lmv = &ma->ma_lmv->lmv_md_v1;
2446         if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2447                 return -EINVAL;
2448         tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2449
2450         /* check stripe index of sobj and tobj */
2451         if (sindex == tindex)   /* two distinct objects with one index */
2452                 return -EINVAL;
2453
2454         return sindex < tindex ? 0 : 1;
2455 }
2456
2457 /*
2458  * Lock the rename source object.
2459  *
2460  * Both source and source parent may be remote, and source may be a remote
2461  * object on source parent. To avoid overwriting a lock handle, the remote
2462  * LOOKUP lock is stored separately in @lhr.
2463  *
      * When mdt_is_remote_object() reports 1 for (parent, child), the LOOKUP
      * lock is taken through mdt_object_lookup_lock() into @lhr and the LOOKUP
      * bit is removed from @ibits before @child itself is locked into @lhc.
      * All locks are requested in LCK_EX mode.
      *
      * \param[in] info          thread info
      * \param[in] parent        parent directory of the source
      * \param[in] child         source object to lock
      * \param[in] lhc           handle receiving the lock on @child
      * \param[in] lhr           handle receiving the separate LOOKUP lock
      * \param[in] ibits         inode lock bits wanted on @child
      * \param[in] cos_incompat  passed through to the locking helpers
      *
2464  * \retval      0 on success
2465  * \retval      -ev negative errno upon error
2466  */
2467 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2468                                   struct mdt_object *parent,
2469                                   struct mdt_object *child,
2470                                   struct mdt_lock_handle *lhc,
2471                                   struct mdt_lock_handle *lhr,
2472                                   __u64 ibits,
2473                                   bool cos_incompat)
2474 {
2475         int rc;
2476
2477         rc = mdt_is_remote_object(info, parent, child);
2478         if (rc < 0)
2479                 return rc;
2480
2481         if (rc == 1) {
2482                 rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
2483                                             cos_incompat);
2484                 if (rc)
2485                         return rc;
2486
2487                 ibits &= ~MDS_INODELOCK_LOOKUP; /* LOOKUP is now held in lhr */
2488         }
2489
2490         rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
2491         /* On failure, release lhr when the LOOKUP bit is absent from ibits.
2491          * NOTE(review): that equals "the separate LOOKUP lock was taken"
2491          * only if callers always pass MDS_INODELOCK_LOOKUP in ibits; both
2491          * callers in mdt_reint_rename do.
2491          */
2491         if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
2492                 mdt_object_unlock(info, NULL, lhr, rc);
2493
2494         return rc;
2495 }
2496
2497 /* Helper function for mdt_reint_rename so we don't need to opencode
2498  * two different order lockings
      *
      * Locks @mfirstdir with mdt_parent_lock() in LCK_PW mode, then deals with
      * @mseconddir:
      *  - different object: it is locked the same way;
      *  - same object and not remote: a second PDO lock is taken only when the
      *    two handles carry different mlh_pdo_hash values;
      *  - same object and remote: nothing more is locked.
      * If the second step fails, the lock on @mfirstdir is released before
      * returning.
      *
      * NOTE(review): versions are saved in slot 0 for @mfirstdir and slot 1 for
      * @mseconddir, i.e. by lock order. mdt_reint_rename passes the target
      * directory first when it reverses the order, while its header comment
      * describes slot 0 as the source parent; confirm this is intended.
      * NOTE(review): slot 1 is saved even when the second lock failed.
      *
      * \retval      0 on success
      * \retval      -ev negative errno from the locking helpers
2499  */
2500 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2501                              struct mdt_object *mfirstdir,
2502                              struct mdt_lock_handle *lh_firstdirp,
2503                              const struct lu_name *firstname,
2504                              struct mdt_object *mseconddir,
2505                              struct mdt_lock_handle *lh_seconddirp,
2506                              const struct lu_name *secondname,
2507                              bool cos_incompat)
2508 {
2509         int rc;
2510
2511         rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
2512                              cos_incompat);
2513         if (rc)
2514                 return rc;
2515
2516         mdt_version_get_save(info, mfirstdir, 0);
2517         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);       /* fault-injection hook */
2518
2519         if (mfirstdir != mseconddir) {
2520                 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2521                                      secondname, LCK_PW, cos_incompat);
2522         } else if (!mdt_object_remote(mseconddir)) {
2523                 /* same local directory: lock the second name only if its
2523                  * PDO hash differs from the one already locked
2523                  */
2523                 if (lh_firstdirp->mlh_pdo_hash !=
2524                     lh_seconddirp->mlh_pdo_hash) {
2525                         rc = mdt_object_pdo_lock(info, mseconddir,
2526                                                  lh_seconddirp, secondname,
2527                                                  LCK_PW, false, cos_incompat);
2528                         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2529                 }
2530         }
2531         mdt_version_get_save(info, mseconddir, 1);
2532
2533         if (rc != 0)    /* second step failed: undo the first lock */
2534                 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2535
2536         return rc;
2537 }
2538
2539 /*
      * Handle a rename request: rename rr_name in the directory rr_fid1 to
      * rr_tgt_name in the directory rr_fid2 through mdo_rename().
      *
      * Outline, in execution order:
      *  - cancel the locks listed in the request's DLM cancel buffer, if any;
      *  - find both parent directories and run the encryption checks on them;
      *  - unless the request is a replay, take the rename lock; it is skipped
      *    for a rename within one local directory when parallel rename is
      *    enabled for the file type found in ma->ma_attr.la_mode;
      *  - lock both parents, ordered by mdt_rename_determine_lock_order(), or
      *    by PDO hash for a rename within one local directory;
      *  - look up and validate the source; if it is remote and cos_incompat
      *    was not set yet, drop the parent locks and start over at "relock";
      *  - look up the target name: if it exists, validate it and lock source
      *    and then target, otherwise lock the source only;
      *  - call mdo_rename(); on success handle the last unlink of a replaced
      *    target and tally the rename statistics;
      *  - unwind through the label ladder at the end, and only afterwards run
      *    the DoM data discard for a replaced target.
      *
      * \param[in] info     thread info carrying the unpacked rename record
      * \param[in] unused   lock handle required by the reint handler signature;
      *                     it is not referenced in this function
      *
      * \retval      0 on success; also returned, without renaming anything,
      *              when both names already resolve to the same FID
      * \retval      -ev negative errno upon error
      *
2540  * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2541  * 2 - srcdir child; 3 - tgtdir child.
2542  * Update on disk version of srcdir child.
2543  */
2544 static int mdt_reint_rename(struct mdt_thread_info *info,
2545                             struct mdt_lock_handle *unused)
2546 {
2547         struct mdt_device *mdt = info->mti_mdt;
2548         struct mdt_reint_record *rr = &info->mti_rr;
2549         struct md_attr *ma = &info->mti_attr;
2550         struct ptlrpc_request *req = mdt_info_req(info);
2551         struct mdt_object *msrcdir = NULL;
2552         struct mdt_object *mtgtdir = NULL;
2553         struct mdt_object *mold;
2554         struct mdt_object *mnew = NULL;
2555         struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2556         struct mdt_lock_handle *lh_srcdirp;
2557         struct mdt_lock_handle *lh_tgtdirp;
2558         struct mdt_lock_handle *lh_oldp = NULL;
2559         struct mdt_lock_handle *lh_rmt = NULL;
2560         struct mdt_lock_handle *lh_newp = NULL;
2561         struct lu_fid *old_fid = &info->mti_tmp_fid1;
2562         struct lu_fid *new_fid = &info->mti_tmp_fid2;
2563         struct lu_ucred *uc = mdt_ucred(info);
2564         bool reverse = false, discard = false;
2565         bool cos_incompat;
2566         ktime_t kstart = ktime_get();   /* start time for the stats tally */
2567         enum mdt_stat_idx msi = 0;      /* set only on the parallel-rename path */
2568         int rc;
2569
2570         ENTRY;
2571         DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2572                   PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2573                   PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
2574
2575         if (info->mti_dlm_req)
2576                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2577
2578         if (!fid_is_md_operative(rr->rr_fid1) ||
2579             !fid_is_md_operative(rr->rr_fid2))
2580                 RETURN(-EPERM);
2581
2582         /* find both parents. */
2583         msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2584         if (IS_ERR(msrcdir))
2585                 RETURN(PTR_ERR(msrcdir));
2586
2587         rc = mdt_check_enc(info, msrcdir);
2588         if (rc)
2589                 GOTO(out_put_srcdir, rc);
2590
2591         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
2592
2593         if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2594                 /* same directory: share the object, with its own reference */
2594                 mtgtdir = msrcdir;
2595                 mdt_object_get(info->mti_env, mtgtdir);
2596         } else {
2597                 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2598                 if (IS_ERR(mtgtdir))
2599                         GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2600         }
2601
2602         rc = mdt_check_enc(info, mtgtdir);
2603         if (rc)
2604                 GOTO(out_put_tgtdir, rc);
2605
2606         if (!uc->uc_rbac_fscrypt_admin &&
2607             mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2608                 GOTO(out_put_tgtdir, rc = -EPERM);
2609
2610         /*
2611          * Note: do not enqueue the rename lock for a replay request: if
2612          * another MDT holds the rename lock while blocked waiting for
2613          * this MDT to finish its recovery, the failover MDT cannot get
2614          * the rename lock, which would cause a deadlock.
2615          */
2616         if (!req_is_replay(req)) {
2617                 bool remote = mdt_object_remote(msrcdir);
2618
2619                 /*
2620                  * Normally rename RPC is handled on the MDT with the target
2621                  * directory (if target exists, it's on the MDT with the
2622                  * target), if the source directory is remote, it's a hint that
2623                  * source is remote too (this may not be true, but it won't
2624                  * cause any issue), return -EXDEV early to avoid taking
2625                  * rename_lock.
2626                  */
2627                 if (!mdt->mdt_enable_remote_rename && remote)
2628                         GOTO(out_put_tgtdir, rc = -EXDEV);
2629
2630                 /* This might be further relaxed in the future for regular file
2631                  * renames in different source and target parents. Start with
2632                  * only same-directory renames for simplicity and because this
2633                  * is by far the most common use case.
2634                  *
2635                  * Striped directories should be considered "remote".
2636                  */
2637                 if (msrcdir != mtgtdir || remote ||
2638                     (S_ISDIR(ma->ma_attr.la_mode) &&
2639                      !mdt->mdt_enable_parallel_rename_dir) ||
2640                     (!S_ISDIR(ma->ma_attr.la_mode) &&
2641                      !mdt->mdt_enable_parallel_rename_file)) {
2642                         rc = mdt_rename_lock(info, rename_lh);
2643                         if (rc != 0) {
2644                                 CERROR("%s: cannot lock for rename: rc = %d\n",
2645                                        mdt_obd_name(mdt), rc);
2646                                 GOTO(out_put_tgtdir, rc);
2647                         }
2648                 } else {
2649                         /* parallel rename: no rename lock, pick the stats slot */
2649                         if (S_ISDIR(ma->ma_attr.la_mode))
2650                                 msi = LPROC_MDT_RENAME_PAR_DIR;
2651                         else
2652                                 msi = LPROC_MDT_RENAME_PAR_FILE;
2653
2654                         CDEBUG(D_INFO,
2655                                "%s: samedir parallel rename "DFID"/"DNAME"\n",
2656                                mdt_obd_name(mdt), PFID(rr->rr_fid1),
2657                                PNAME(&rr->rr_name));
2658                 }
2659         }
2660
2661         rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2662         if (rc < 0)
2663                 GOTO(out_unlock_rename, rc);
2664         reverse = rc;   /* 1: lock the target directory first */
2665
2666         /* The source needs to be looked up after locking the source parent,
2667          * otherwise this rename may race with an unlink of the source and
2668          * hang (see sanityn.sh 55b). So check the parents first; if the
2669          * source later turns out to be remote, relock the parents.
2670          */
2671         cos_incompat = (mdt_object_remote(msrcdir) ||
2672                         mdt_object_remote(mtgtdir));
2673
2674         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2675
2676         /* lock parents in the proper order. */
2677         lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2678         lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2679
2680         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2681         OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2682 relock:
2683         mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2684         mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2685
2686         /* In case of same dir local rename we must sort by the hash,
2687          * otherwise a lock deadlock is possible when renaming
2688          * a to b and b to a at the same time (see LU-15285).
2689          */
2690         if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2691                 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2692         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2693                 reverse = 0;    /* fault injection: force source-first order */
2694
2695         if (reverse)
2696                 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2697                                        &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2698                                        &rr->rr_name, cos_incompat);
2699         else
2700                 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2701                                        mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
2702                                        cos_incompat);
2703
2704         if (rc != 0)
2705                 GOTO(out_unlock_rename, rc);
2706
2707         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2708         OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2709
2710         /* find mold object. */
2711         fid_zero(old_fid);
2712         rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2713         if (rc != 0)
2714                 GOTO(out_unlock_parents, rc);
2715
2716         /* the source must not be one of the two parent directories */
2716         if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2717                 GOTO(out_unlock_parents, rc = -EINVAL);
2718
2719         if (!fid_is_md_operative(old_fid))
2720                 GOTO(out_unlock_parents, rc = -EPERM);
2721
2722         mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2723         if (IS_ERR(mold))
2724                 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2725
2726         if (!mdt_object_exists(mold)) {
2727                 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2728                                 &mold->mot_obj,
2729                                 "object does not exist");
2730                 GOTO(out_put_old, rc = -ENOENT);
2731         }
2732
2733         if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2734                 GOTO(out_put_old, rc = -EXDEV);
2735
2736         /* Check if @mtgtdir is subdir of @mold, before locking child
2737          * to avoid reverse locking.
2738          */
2739         if (mtgtdir != msrcdir) {
2740                 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2741                                    old_fid);
2742                 if (rc) {
2743                         if (rc == 1)    /* is a subdir: reject the rename */
2744                                 rc = -EINVAL;
2745                         GOTO(out_put_old, rc);
2746                 }
2747         }
2748
2749         tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2750         /* save version after locking */
2751         mdt_version_get_save(info, mold, 2);
2752
2753         /* The source is remote but the parents were locked without
2753          * cos_incompat: drop the source reference and both parent locks,
2753          * then retake the parent locks with cos_incompat set.
2753          */
2753         if (!cos_incompat && mdt_object_remote(mold)) {
2754                 cos_incompat = true;
2755                 mdt_object_put(info->mti_env, mold);
2756                 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2757                 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2758                 goto relock;
2759         }
2760
2761         /* find mnew object:
2762          * mnew target object may not exist now
2763          * lookup with version checking
2764          */
2765         fid_zero(new_fid);
2766         rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2767                                       3);
2768         if (rc == 0) {
2769                 /* the new_fid should have been filled at this moment */
2770                 if (lu_fid_eq(old_fid, new_fid))
2771                         GOTO(out_put_old, rc);  /* same object: rc is 0 here */
2772
2773                 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2774                     lu_fid_eq(new_fid, rr->rr_fid2))
2775                         GOTO(out_put_old, rc = -EINVAL);
2776
2777                 if (!fid_is_md_operative(new_fid))
2778                         GOTO(out_put_old, rc = -EPERM);
2779
2780                 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2781                 if (IS_ERR(mnew))
2782                         GOTO(out_put_old, rc = PTR_ERR(mnew));
2783
2784                 if (!mdt_object_exists(mnew)) {
2785                         LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2786                                         &mnew->mot_obj,
2787                                         "object does not exist");
2788                         GOTO(out_put_new, rc = -ENOENT);
2789                 }
2790
2791                 if (mdt_object_remote(mnew)) {
2792                         struct mdt_body  *repbody;
2793
2794                         /* Always send rename req to the target child MDT */
2795                         repbody = req_capsule_server_get(info->mti_pill,
2796                                                          &RMF_MDT_BODY);
2797                         LASSERT(repbody != NULL);
2798                         repbody->mbo_fid1 = *new_fid;
2799                         repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2800                         GOTO(out_put_new, rc = -EXDEV);
2801                 }
2802                 /* Before locking the target dir, check we do not replace
2803                  * a dir with a non-dir, otherwise it may deadlock with
2804                  * link op which tries to create a link in this dir
2805                  * back to this non-dir.
2806                  */
2807                 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2808                     !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2809                         GOTO(out_put_new, rc = -EISDIR);
2810
2811                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2812                 lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
2813                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2814                                             lh_rmt, MDS_INODELOCK_LOOKUP |
2815                                             MDS_INODELOCK_XATTR, cos_incompat);
2816                 if (rc < 0)
2817                         GOTO(out_put_new, rc);
2818
2819                 /* Check if @msrcdir is subdir of @mnew, before locking child
2820                  * to avoid reverse locking.
2821                  */
2822                 if (mtgtdir != msrcdir) {
2823                         rc = mdo_is_subdir(info->mti_env,
2824                                            mdt_object_child(msrcdir), new_fid);
2825                         if (rc) {
2826                                 if (rc == 1)
2827                                         rc = -EINVAL;
2828                                 GOTO(out_unlock_old, rc);
2829                         }
2830                 }
2831
2832                 /* We used to acquire MDS_INODELOCK_FULL here but we
2833                  * can't do this now because a running HSM restore on
2834                  * the rename onto victim will hold the layout
2835                  * lock. See LU-4002.
2836                  */
2837
2838                 lh_newp = &info->mti_lh[MDT_LH_NEW];
2839                 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2840                                            MDS_INODELOCK_LOOKUP |
2841                                            MDS_INODELOCK_UPDATE, LCK_EX,
2842                                            cos_incompat);
2843                 if (rc != 0)
2844                         GOTO(out_unlock_new, rc);
2845
2846                 /* get and save version after locking */
2847                 mdt_version_get_save(info, mnew, 3);
2848         } else if (rc != -ENOENT) {
2849                 GOTO(out_put_old, rc);
2850         } else {
2851                 /* target name does not exist: only the source is locked */
2851                 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2852                 lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
2853                 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2854                                             lh_rmt, MDS_INODELOCK_LOOKUP |
2855                                             MDS_INODELOCK_XATTR, cos_incompat);
2856                 if (rc != 0)
2857                         GOTO(out_put_old, rc);
2858
2859                 mdt_enoent_version_save(info, 3);
2860         }
2861
2862         /* step 5: rename it */
2863         mdt_reint_init_ma(info, ma);
2864
2865         mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2866                        OBD_FAIL_MDS_REINT_RENAME_WRITE);
2867
2868         /* mot_lov_mutex of an existing target is held across mdo_rename() */
2868         if (mnew != NULL)
2869                 mutex_lock(&mnew->mot_lov_mutex);
2870
2871         rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2872                         mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2873                         mnew != NULL ? mdt_object_child(mnew) : NULL,
2874                         &rr->rr_tgt_name, ma);
2875
2876         if (mnew != NULL)
2877                 mutex_unlock(&mnew->mot_lov_mutex);
2878
2879         /* handle last link of tgt object */
2880         if (rc == 0) {
2881                 if (mnew) {
2882                         mdt_handle_last_unlink(info, mnew, ma);
2883                         discard = mdt_dom_check_for_discard(info, mnew);
2884                 }
2885                 mdt_rename_counter_tally(info, info->mti_mdt, req,
2886                                          msrcdir, mtgtdir, msi,
2887                                          ktime_us_delta(ktime_get(), kstart));
2888         }
2889
2890         EXIT;
2891 out_unlock_new:         /* target lock, only when a target object exists */
2892         if (mnew != NULL)
2893                 mdt_object_unlock(info, mnew, lh_newp, rc);
2894 out_unlock_old:         /* separate LOOKUP lock, then the source lock */
2895         mdt_object_unlock(info, NULL, lh_rmt, rc);
2896         mdt_object_unlock(info, mold, lh_oldp, rc);
2897 out_put_new:            /* when discard is set the reference is kept for below */
2898         if (mnew && !discard)
2899                 mdt_object_put(info->mti_env, mnew);
2900 out_put_old:
2901         mdt_object_put(info->mti_env, mold);
2902 out_unlock_parents:
2903         mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2904         mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2905 out_unlock_rename:      /* also reached on paths that never took the lock */
2906         mdt_rename_unlock(info, rename_lh);
2907 out_put_tgtdir:
2908         mdt_object_put(info->mti_env, mtgtdir);
2909 out_put_srcdir:
2910         mdt_object_put(info->mti_env, msrcdir);
2911
2912         /* The DoM discard could be done right where it is assigned above;
2913          * it is done here, after the rename unlock, for compatibility with
2914          * old clients, for which the discard blocks the main thread until
2915          * completion. Check LU-11359 for details.
2916          */
2917         if (discard) {
2918                 mdt_dom_discard_data(info, mnew);
2919                 mdt_object_put(info->mti_env, mnew);
2920         }
2921         OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
2922         return rc;
2923 }
2924
/*
 * Handle an FLR file resync request for the object rr_fid1.
 *
 * The object must exist, be a regular file and be local, and the lease
 * named by rr_lease_handle must still resolve to a lock. With mot_open_sem
 * write-held (trylock only) and the lease not marked cancelled, the layout
 * is changed with MD_LAYOUT_RESYNC for rr_mirror_id, and the object's inode
 * attributes are packed into the reply body.
 *
 * \param[in] info     thread info carrying the unpacked resync record
 * \param[in] lhc      incoming value is not used; the variable is pointed at
 *                     info->mti_lh[MDT_LH_LOCAL] before the layout change
 *
 * \retval      0 on success
 * \retval      -ENOENT the object does not exist
 * \retval      -EINVAL the object is not a regular file
 * \retval      -EREMOTE the object is remote
 * \retval      -ESTALE the lease handle does not resolve to a lock
 * \retval      -EBUSY mot_open_sem is contended or the lease is cancelled
 * \retval      -ev other negative errno from the helpers
 *
 * mdt_client_compatibility() is called on every exit path.
 */
2925 static int mdt_reint_resync(struct mdt_thread_info *info,
2926                             struct mdt_lock_handle *lhc)
2927 {
2928         struct mdt_reint_record *rr = &info->mti_rr;
2929         struct ptlrpc_request *req = mdt_info_req(info);
2930         struct md_attr *ma = &info->mti_attr;
2931         struct mdt_object *mo;
2932         struct ldlm_lock *lease;
2933         struct mdt_body *repbody;
2934         struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2935         bool lease_broken;
2936         int rc;
2937
2938         ENTRY;
2939         DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
2940
2941         if (info->mti_dlm_req)
2942                 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2943
2944         mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2945         if (IS_ERR(mo))
2946                 GOTO(out, rc = PTR_ERR(mo));
2947
2948         if (!mdt_object_exists(mo))
2949                 GOTO(out_obj, rc = -ENOENT);
2950
2951         if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2952                 GOTO(out_obj, rc = -EINVAL);
2953
2954         if (mdt_object_remote(mo))
2955                 GOTO(out_obj, rc = -EREMOTE);
2956
2957         lease = ldlm_handle2lock(rr->rr_lease_handle);
2958         if (lease == NULL)
2959                 GOTO(out_obj, rc = -ESTALE);
2960
2961         /* It's really necessary to grab open_sem and check if the lease lock
2962          * has been lost. A concurrent writer could otherwise come in and
2963          * generate dirty data in the memory cache, and the writeback would fail
2964          * after the layout version is increased by MDS_REINT_RESYNC RPC.
2965          */
2966         if (!down_write_trylock(&mo->mot_open_sem))
2967                 GOTO(out_put_lease, rc = -EBUSY);
2968
2969         lock_res_and_lock(lease);
2970         lease_broken = ldlm_is_cancel(lease);
2971         unlock_res_and_lock(lease);
2972         if (lease_broken)
2973                 GOTO(out_unlock, rc = -EBUSY);
2974
2975         /* The lease is intact; presumably the file has not been opened by
2975          * anyone else since the lease was taken.
2975          */
2976         layout.mlc_opc = MD_LAYOUT_RESYNC;
2977         lhc = &info->mti_lh[MDT_LH_LOCAL];
2978         rc = mdt_layout_change(info, mo, lhc, &layout);
2979         if (rc)
2980                 GOTO(out_unlock, rc);
2981
2982         mdt_object_unlock(info, mo, lhc, 0);
2983
2984         ma->ma_need = MA_INODE;
2985         ma->ma_valid = 0;
2986         rc = mdt_attr_get_complex(info, mo, ma);
2987         if (rc != 0)
2988                 GOTO(out_unlock, rc);
2989
2990         /* NOTE(review): repbody is used without a NULL check - confirm the
2990          * reply buffer is always present for this request type.
2990          */
2990         repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2991         mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
2992
2993         EXIT;
2994 out_unlock:
2995         up_write(&mo->mot_open_sem);
2996 out_put_lease:
2997         LDLM_LOCK_PUT(lease);
2998 out_obj:
2999         mdt_object_put(info->mti_env, mo);
3000 out:
3001         mdt_client_compatibility(info);
3002         return rc;
3003 }
3004
/* One entry of the reint dispatch table used by mdt_reint_rec(). */
3005 struct mdt_reinter {
             /* handler called with the thread info and the caller's lock handle */
3006         int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
             /* added to PTLRPC_LAST_CNTR to select the service stats counter */
3007         enum lprocfs_extra_opc mr_extra_opc;
3008 };
3009
/*
 * Reint dispatch table, indexed by the REINT_* opcode of the request.
 *
 * Opcodes below the array size that have no initializer here keep a NULL
 * mr_handler, which mdt_reint_rec() rejects with -EPROTO.
 */
3010 static const struct mdt_reinter mdt_reinters[] = {
3011         [REINT_SETATTR] = {
3012                 .mr_handler = &mdt_reint_setattr,
3013                 .mr_extra_opc = MDS_REINT_SETATTR,
3014         },
3015         [REINT_CREATE] = {
3016                 .mr_handler = &mdt_reint_create,
3017                 .mr_extra_opc = MDS_REINT_CREATE,
3018         },
3019         [REINT_LINK] = {
3020                 .mr_handler = &mdt_reint_link,
3021                 .mr_extra_opc = MDS_REINT_LINK,
3022         },
3023         [REINT_UNLINK] = {
3024                 .mr_handler = &mdt_reint_unlink,
3025                 .mr_extra_opc = MDS_REINT_UNLINK,
3026         },
3027         [REINT_RENAME] = {
3028                 .mr_handler = &mdt_reint_rename,
3029                 .mr_extra_opc = MDS_REINT_RENAME,
3030         },
3031         [REINT_OPEN] = {
3032                 .mr_handler = &mdt_reint_open,
3033                 .mr_extra_opc = MDS_REINT_OPEN,
3034         },
3035         [REINT_SETXATTR] = {
3036                 .mr_handler = &mdt_reint_setxattr,
3037                 .mr_extra_opc = MDS_REINT_SETXATTR,
3038         },
3039         [REINT_RMENTRY] = {     /* shares the unlink handler and counter */
3040                 .mr_handler = &mdt_reint_unlink,
3041                 .mr_extra_opc = MDS_REINT_UNLINK,
3042         },
3043         [REINT_MIGRATE] = {     /* counted under the rename opcode */
3044                 .mr_handler = &mdt_reint_migrate,
3045                 .mr_extra_opc = MDS_REINT_RENAME,
3046         },
3047         [REINT_RESYNC] = {
3048                 .mr_handler = &mdt_reint_resync,
3049                 .mr_extra_opc = MDS_REINT_RESYNC,
3050         },
3051 };
3052
/*
 * Dispatch an unpacked reint record to its handler in mdt_reinters[].
 *
 * \param[in] info     thread info; info->mti_rr.rr_opcode selects the handler
 * \param[in] lhc      lock handle passed through to the handler unchanged
 *
 * \retval      -EPROTO the opcode is outside the table or has no handler;
 *              no counter is updated in that case
 * \retval      otherwise the handler's return value
 */
3053 int mdt_reint_rec(struct mdt_thread_info *info,
3054                   struct mdt_lock_handle *lhc)
3055 {
3056         const struct mdt_reinter *mr;
3057         int rc;
3058
3059         ENTRY;
3060         if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3061                 RETURN(-EPROTO);
3062
3063         mr = &mdt_reinters[info->mti_rr.rr_opcode];
3064         if (mr->mr_handler == NULL)
3065                 RETURN(-EPROTO);
3066
3067         rc = (*mr->mr_handler)(info, lhc);
3068
3069         /* counted once per dispatched request, whether or not it succeeded */
3069         lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3070                              PTLRPC_LAST_CNTR + mr->mr_extra_opc);
3071
3072         RETURN(rc);
3073 }