4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_reint.c
33 * Lustre Metadata Target (mdt) reintegration routines
35 * Author: Peter Braam <braam@clusterfs.com>
36 * Author: Andreas Dilger <adilger@clusterfs.com>
37 * Author: Phil Schwan <phil@clusterfs.com>
38 * Author: Huang Hua <huanghua@clusterfs.com>
39 * Author: Yury Umanets <umka@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_MDS
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
/**
 * Prepare the md_attr used by a reint operation.
 *
 * The only statement visible here sets ma->ma_need to MA_INODE, presumably
 * so that the inode attributes are the ones requested from lower layers.
 * NOTE(review): the rest of the parameter list and any further
 * initialisation are not shown in this listing -- confirm before relying
 * on other fields being reset.
 */
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
52 ma->ma_need = MA_INODE;
57 * Get version of object by fid.
59 * Return real version or ENOENT_VERSION if object doesn't exist
/**
 * Store the version of object \a o in \a *version.
 *
 * When the object exists, is not remote and its FID does not satisfy
 * fid_is_obf(), the version is read with dt_version_get().  The second
 * assignment stores ENOENT_VERSION (presumably the "else" branch of the
 * test above -- the keyword itself is not visible here).  The resulting
 * value is logged at D_INODE level together with the object's FID.
 *
 * \param info    thread info, supplies the lu_env (mti_env)
 * \param o       object whose version is wanted
 * \param version output: version read, or ENOENT_VERSION
 */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62 struct mdt_object *o, __u64 *version)
66 if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67 !fid_is_obf(mdt_object_fid(o)))
68 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
70 *version = ENOENT_VERSION;
71 CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72 PFID(mdt_object_fid(o)), *version);
76 * Check version is correct.
78 * Should be called only during replay.
/**
 * Compare \a version with the pre-operation version recorded by the client.
 *
 * The expected versions are taken from the request message with
 * lustre_msg_get_versions(); slot \a idx is compared against \a version.
 * The export's VBR connect flag is tested first (the action taken when it
 * is not set is not visible here).  The function asserts that the request
 * is a replay and that \a idx is below PTLRPC_NUM_VERSIONS.
 *
 * If the request carries no version buffer, or the recorded version
 * differs from \a version, exp_vbr_failed is set on the export while
 * holding exp_lock.
 *
 * NOTE(review): the return statements are not shown -- confirm which
 * error code is returned on a missing buffer or a mismatch.
 *
 * \param req     replayed request
 * \param version current version of the object
 * \param idx     index of the version slot to compare
 */
80 static int mdt_version_check(struct ptlrpc_request *req,
81 __u64 version, int idx)
83 __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
86 if (!exp_connect_vbr(req->rq_export))
89 LASSERT(req_is_replay(req));
90 /** VBR: version is checked always because costs nothing */
91 LASSERT(idx < PTLRPC_NUM_VERSIONS);
92 /** Sanity check for malformed buffers */
93 if (pre_ver == NULL) {
94 CERROR("No versions in request buffer\n");
95 spin_lock(&req->rq_export->exp_lock);
96 req->rq_export->exp_vbr_failed = 1;
97 spin_unlock(&req->rq_export->exp_lock);
99 } else if (pre_ver[idx] != version) {
100 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101 pre_ver[idx], version);
102 spin_lock(&req->rq_export->exp_lock);
103 req->rq_export->exp_vbr_failed = 1;
104 spin_unlock(&req->rq_export->exp_lock);
111 * Save pre-versions in reply.
/**
 * Record \a version in slot idx of the reply message's version array.
 *
 * The export's VBR connect flag is tested first (the early exit itself is
 * not visible here).  The function asserts that the request is NOT a
 * replay and that a reply message has been allocated, then fetches the
 * version array of the reply with lustre_msg_get_versions() and writes
 * the slot.
 *
 * NOTE(review): any NULL check on reply_ver before the store is not
 * visible in this listing -- confirm.
 */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
118 if (!exp_connect_vbr(req->rq_export))
121 LASSERT(!req_is_replay(req));
122 LASSERT(req->rq_repmsg != NULL);
123 reply_ver = lustre_msg_get_versions(req->rq_repmsg);
125 reply_ver[idx] = version;
129 * Save enoent version, it is needed when it is obvious that object doesn't
130 * exist, e.g. child during create.
/**
 * Record ENOENT_VERSION for slot \a idx.
 *
 * Does nothing for replayed requests.  Otherwise info->mti_ver[idx] is set
 * to ENOENT_VERSION and that value is stored in the reply through
 * mdt_version_save().
 *
 * \param info thread info holding the request and the mti_ver[] array
 * \param idx  version slot to fill
 */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
134 /* save version of file name for replay, it must be ENOENT here */
135 if (!req_is_replay(mdt_info_req(info))) {
136 info->mti_ver[idx] = ENOENT_VERSION;
137 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
142 * Get version from disk and save in reply buffer.
144 * Versions are saved in reply only during normal operations not replays.
/**
 * Read the version of \a mto and store it in the reply.
 *
 * Only acts when the request is not a replay: the version is read into
 * info->mti_ver[idx] by mdt_obj_version_get() and then written to the
 * reply by mdt_version_save().
 *
 * \param info thread info holding the request and the mti_ver[] array
 * \param mto  object whose version is saved
 * \param idx  version slot to fill
 */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147 struct mdt_object *mto, int idx)
149 /* don't save versions during replay */
150 if (!req_is_replay(mdt_info_req(info))) {
151 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
157 * Get version from disk and check it, no save in reply.
/**
 * Read the version of \a mto and verify it against the replayed request.
 *
 * The request is tested for being a replay first; the value returned for
 * a non-replay request is not visible here (TODO confirm).  For a replay
 * the version is read into info->mti_ver[idx] and the result of
 * mdt_version_check() for that slot is returned.
 *
 * \param info thread info holding the request and the mti_ver[] array
 * \param mto  object whose version is checked
 * \param idx  version slot to compare
 */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160 struct mdt_object *mto, int idx)
162 /* only check versions during replay */
163 if (!req_is_replay(mdt_info_req(info)))
166 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167 return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
171 * Get version from disk and check if recovery or just save.
/**
 * Read the version of \a mto, then check it (replay) or save it.
 *
 * The version is always read into info->mti_ver[idx].  For a replayed
 * request the result of mdt_version_check() is assigned to rc.  The call
 * to mdt_version_save() is presumably the non-replay branch (the "else"
 * is not visible here -- TODO confirm).
 *
 * \param info thread info holding the request and the mti_ver[] array
 * \param mto  object whose version is handled
 * \param idx  version slot to use
 */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174 struct mdt_object *mto, int idx)
178 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179 if (req_is_replay(mdt_info_req(info)))
180 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
183 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
188 * Lookup with version checking.
190 * This checks the version of 'name'. Many reint functions use 'name' for the
191 * child rather than a FID, so we need to get the object by name and check its version.
/**
 * Look up \a lname in directory \a p and version-check the result on replay.
 *
 * The name is resolved with mdo_lookup(), which fills \a fid.  The request
 * is then tested for being a replay; what happens for a non-replay request
 * is on lines not shown here (presumably the lookup result is returned
 * as is -- TODO confirm).
 *
 * On replay, info->mti_ver[idx] is either set to ENOENT_VERSION or, when
 * the object can be found by \a fid with mdt_object_find(), filled with
 * that object's version (the reference is dropped right after).  The
 * condition selecting between the two is not visible here.  The slot is
 * then compared with mdt_version_check().
 *
 * \retval result of mdt_version_check() if it is non-zero, otherwise the
 *         result of mdo_lookup()
 */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194 struct mdt_object *p,
195 const struct lu_name *lname,
196 struct lu_fid *fid, int idx)
200 rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
202 /* Check version only during replay */
203 if (!req_is_replay(mdt_info_req(info)))
206 info->mti_ver[idx] = ENOENT_VERSION;
208 struct mdt_object *child;
210 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211 if (likely(!IS_ERR(child))) {
212 mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213 mdt_object_put(info->mti_env, child);
216 vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217 return vbrc ? vbrc : rc;
/**
 * Release the stripe locks recorded in einfo->ei_cbdata.
 *
 * einfo->ei_cbdata is read as a struct lustre_handle_array.  The object is
 * asserted to be a directory.  The thread's policy is zeroed and loaded
 * with einfo->ei_inodebits, and the MDT_LH_LOCAL lock handle is
 * initialised for einfo->ei_mode.
 *
 * For every handle i below ha_count, bit i of ha_map decides whether the
 * handle is placed in lh->mlh_rreg_lh; the assignment to lh->mlh_reg_lh is
 * presumably the alternative branch (its "else" is not visible here).
 * The handle is then released with mdt_object_unlock() using \a decref,
 * and its cookie is cleared.
 *
 * \retval result of mo_object_unlock() on the child layer object
 */
221 static int mdt_stripes_unlock(struct mdt_thread_info *mti,
222 struct mdt_object *obj,
223 struct ldlm_enqueue_info *einfo,
226 union ldlm_policy_data *policy = &mti->mti_policy;
227 struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228 struct lustre_handle_array *locks = einfo->ei_cbdata;
231 LASSERT(S_ISDIR(obj->mot_header.loh_attr));
234 memset(policy, 0, sizeof(*policy));
235 policy->l_inodebits.bits = einfo->ei_inodebits;
236 mdt_lock_reg_init(lh, einfo->ei_mode);
237 for (i = 0; i < locks->ha_count; i++) {
238 if (test_bit(i, (void *)locks->ha_map))
239 lh->mlh_rreg_lh = locks->ha_handles[i];
241 lh->mlh_reg_lh = locks->ha_handles[i];
242 mdt_object_unlock(mti, NULL, lh, decref);
243 locks->ha_handles[i].cookie = 0ull;
246 return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
/**
 * Tell whether \a obj is a striped directory by probing an xattr.
 *
 * Non-directories are handled by an early exit whose value is not visible
 * here (presumably 0 -- TODO confirm).  For a directory the slice
 * belonging to the bottom device (mdt_bottom) is looked up by FID and the
 * xattr is queried on it with a NULL buffer (LU_BUF_NULL); the xattr name
 * is on a line not shown here.  The slice reference is dropped before
 * returning.
 *
 * \retval 1        dt_xattr_get() returned a positive value
 * \retval 0        dt_xattr_get() returned -ENODATA
 * \retval negative error from lu_object_find_slice() or dt_xattr_get()
 */
250 static inline int mdt_object_striped(struct mdt_thread_info *mti,
251 struct mdt_object *obj)
253 struct lu_device *bottom_dev;
254 struct lu_object *bottom_obj;
257 if (!S_ISDIR(obj->mot_header.loh_attr))
260 /* getxattr from bottom obj to avoid reading in shard FIDs */
261 bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
262 bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
263 mdt_object_fid(obj), NULL);
264 if (IS_ERR(bottom_obj))
265 return PTR_ERR(bottom_obj);
267 rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
269 lu_object_put(mti->mti_env, bottom_obj);
271 return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
275 * Lock slave stripes if necessary, the lock handles of slave stripes
276 * will be stored in einfo->ei_cbdata.
/**
 * Fill \a einfo and the thread policy, then lock the directory's stripes.
 *
 * The object is asserted to be a directory.  \a einfo is set up for an
 * LDLM_IBITS enqueue in \a mode with \a ibits, using
 * mdt_remote_blocking_ast / mdt_blocking_ast / ldlm_completion_ast as
 * callbacks, the MDT namespace, ei_enq_slave = 1 and ei_req_slot = 1.
 * The thread's policy is zeroed and loaded with \a ibits.
 *
 * \retval result of mo_object_lock() on the child layer object
 */
278 static int mdt_stripes_lock(struct mdt_thread_info *mti, struct mdt_object *obj,
279 enum ldlm_mode mode, __u64 ibits,
280 struct ldlm_enqueue_info *einfo)
282 union ldlm_policy_data *policy = &mti->mti_policy;
284 LASSERT(S_ISDIR(obj->mot_header.loh_attr));
285 einfo->ei_type = LDLM_IBITS;
286 einfo->ei_mode = mode;
287 einfo->ei_cb_bl = mdt_remote_blocking_ast;
288 einfo->ei_cb_local_bl = mdt_blocking_ast;
289 einfo->ei_cb_cp = ldlm_completion_ast;
290 einfo->ei_enq_slave = 1;
291 einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
292 einfo->ei_inodebits = ibits;
293 einfo->ei_req_slot = 1;
294 memset(policy, 0, sizeof(*policy));
295 policy->l_inodebits.bits = ibits;
297 return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
301 /** lock object, and stripes if it's a striped directory
303 * object should be local, this is called in operations which modify both object
306 * \param info struct mdt_thread_info
307 * \param parent parent object, if it's NULL, find parent by mdo_lookup()
308 * \param child child object
309 * \param lh lock handle
310 * \param einfo struct ldlm_enqueue_info
311 * \param ibits MDS inode lock bits
312 * \param mode lock mode
313 * \param cos_incompat DNE COS incompatible
315 * \retval 0 on success, -ev on error.
/**
 * Lock \a child and, when it is a directory, its stripes as well.
 *
 * Summary of the steps visible here:
 *  - a remote \a child is reported with CERROR (the error code returned is
 *    on a line not shown);
 *  - \a einfo is zeroed;
 *  - when \a ibits contains MDS_INODELOCK_LOOKUP the lock is taken with
 *    mdt_object_check_lock() (which also receives \a parent); the call to
 *    mdt_object_lock() is presumably the alternative branch -- TODO
 *    confirm;
 *  - for a directory, mdt_stripes_lock() is called with the same mode and
 *    bits; the following mdt_object_unlock() of \a child is presumably the
 *    failure path of that call;
 *  - the OBD_FAIL_LFSCK_BAD_SLAVE_NAME fail point is consulted; its effect
 *    is not visible here.
 */
317 int mdt_object_stripes_lock(struct mdt_thread_info *info,
318 struct mdt_object *parent,
319 struct mdt_object *child,
320 struct mdt_lock_handle *lh,
321 struct ldlm_enqueue_info *einfo, __u64 ibits,
322 enum ldlm_mode mode, bool cos_incompat)
327 /* according to the protocol, child should be local, is request sent to
330 if (mdt_object_remote(child)) {
331 CERROR("%s: lock target "DFID", but it is on other MDT: rc = %d\n",
332 mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(child)),
337 memset(einfo, 0, sizeof(*einfo));
338 if (ibits & MDS_INODELOCK_LOOKUP) {
340 rc = mdt_object_check_lock(info, parent, child, lh, ibits,
343 rc = mdt_object_lock(info, child, lh, ibits, mode,
349 if (S_ISDIR(child->mot_header.loh_attr)) {
350 rc = mdt_stripes_lock(info, child, mode, ibits, einfo);
352 mdt_object_unlock(info, child, lh, rc);
353 if (OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME) &&
/**
 * Release the locks taken by mdt_object_stripes_lock().
 *
 * \a obj is asserted to be local.  If einfo->ei_cbdata is set, the stripe
 * locks are released first through mdt_stripes_unlock(); the lock on the
 * object itself, held in \a lh, is then dropped with mdt_object_unlock().
 * \a decref is passed unchanged to both calls.
 */
362 void mdt_object_stripes_unlock(struct mdt_thread_info *info,
363 struct mdt_object *obj,
364 struct mdt_lock_handle *lh,
365 struct ldlm_enqueue_info *einfo, int decref)
367 /* this is checked in mdt_object_stripes_lock() */
368 LASSERT(!mdt_object_remote(obj));
369 if (einfo->ei_cbdata)
370 mdt_stripes_unlock(info, obj, einfo, decref);
371 mdt_object_unlock(info, obj, lh, decref);
/**
 * Restripe the existing directory named \a lname under \a parent.
 *
 * Steps visible in this listing (tests such as "if (rc)" and the cleanup
 * labels are on lines not shown; the order below follows the code):
 *  - permission gate: taken when directory restripe is disabled on this
 *    MDT and the caller lacks uc_rbac_dne_ops (returned code not shown);
 *  - LMV_HASH_FLAG_FIXED is OR-ed into the user LMV's hash type;
 *  - VBR slot 0 is checked or saved for \a parent, then the parent is
 *    locked in LCK_PW mode;
 *  - the parent's LMV is read; -EBADF if it fails lmv_is_sane2(), -EBUSY
 *    if its layout is changing;
 *  - the name is looked up with version checking in slot 1 and the child
 *    object is found by FID; -ENOENT if it does not exist, -ENOTDIR if it
 *    is not a directory;
 *  - a remote child gets its FID and OBD_MD_FLID | OBD_MD_MDS packed into
 *    the reply body and the result is -EREMOTE (-EPROTO appears on the
 *    path where the reply body is presumably missing);
 *  - the child's LMV is read; -EBUSY if it is migrating;
 *  - the child and its stripes are locked with MDS_INODELOCK_FULL, LCK_PW;
 *  - the child is registered for a version update (tgt_vbr_obj_set) and
 *    VBR slot 1 is checked or saved for it;
 *  - under mdr_lock, -EBUSY if child->mot_restriping is already set,
 *    otherwise it is set to 1;
 *  - mdt_restripe_internal() does the work and the attributes are packed
 *    into the reply body (-EPROTO if the body is presumably missing);
 *  - cleanup clears mot_restriping, unlocks the child and stripes, puts
 *    the child and unlocks the parent, passing rc as the decref argument.
 */
374 static int mdt_restripe(struct mdt_thread_info *info,
375 struct mdt_object *parent,
376 const struct lu_name *lname,
377 const struct lu_fid *tfid,
378 struct md_op_spec *spec,
381 struct mdt_device *mdt = info->mti_mdt;
382 struct lu_fid *fid = &info->mti_tmp_fid2;
383 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
384 struct lmv_user_md *lum = spec->u.sp_ea.eadata;
385 struct lu_ucred *uc = mdt_ucred(info);
386 struct lmv_mds_md_v1 *lmv;
387 struct mdt_object *child;
388 struct mdt_lock_handle *lhp;
389 struct mdt_lock_handle *lhc;
390 struct mdt_body *repbody;
395 /* we want rbac roles to have precedence over any other
396 * permission or capability checks
398 if (!mdt->mdt_enable_dir_restripe && !uc->uc_rbac_dne_ops)
402 lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
404 rc = mdt_version_get_check_save(info, parent, 0);
408 lhp = &info->mti_lh[MDT_LH_PARENT];
409 rc = mdt_parent_lock(info, parent, lhp, lname, LCK_PW, true);
413 rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
415 GOTO(unlock_parent, rc);
417 if (ma->ma_valid & MA_LMV) {
418 /* don't allow restripe if parent dir layout is changing */
419 lmv = &ma->ma_lmv->lmv_md_v1;
420 if (!lmv_is_sane2(lmv))
421 GOTO(unlock_parent, rc = -EBADF);
423 if (lmv_is_layout_changing(lmv))
424 GOTO(unlock_parent, rc = -EBUSY);
428 rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
430 GOTO(unlock_parent, rc);
432 child = mdt_object_find(info->mti_env, mdt, fid);
434 GOTO(unlock_parent, rc = PTR_ERR(child));
436 if (!mdt_object_exists(child))
437 GOTO(out_child, rc = -ENOENT);
439 if (mdt_object_remote(child)) {
440 struct mdt_body *repbody;
442 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
444 GOTO(out_child, rc = -EPROTO);
446 repbody->mbo_fid1 = *fid;
447 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
448 GOTO(out_child, rc = -EREMOTE);
451 if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
452 GOTO(out_child, rc = -ENOTDIR);
454 rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
458 /* race with migrate? */
459 if ((ma->ma_valid & MA_LMV) &&
460 lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
461 GOTO(out_child, rc = -EBUSY);
464 lhc = &info->mti_lh[MDT_LH_CHILD];
465 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
466 MDS_INODELOCK_FULL, LCK_PW, true);
468 GOTO(unlock_child, rc);
470 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
471 rc = mdt_version_get_check_save(info, child, 1);
473 GOTO(unlock_child, rc);
475 spin_lock(&mdt->mdt_restriper.mdr_lock);
476 if (child->mot_restriping) {
478 spin_unlock(&mdt->mdt_restriper.mdr_lock);
479 GOTO(unlock_child, rc = -EBUSY);
481 child->mot_restriping = 1;
482 spin_unlock(&mdt->mdt_restriper.mdr_lock);
485 rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
487 GOTO(restriping_clear, rc);
489 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
491 GOTO(restriping_clear, rc = -EPROTO);
493 mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
497 child->mot_restriping = 0;
499 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
501 mdt_object_put(info->mti_env, child);
503 mdt_object_unlock(info, parent, lhp, rc);
509 * VBR: we save three versions in reply:
510 * 0 - parent. Check that parent version is the same during replay.
511 * 1 - name. Version of 'name' if file exists with the same name or
512 * ENOENT_VERSION, it is needed because file may appear due to missed replays.
513 * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
/**
 * Create the object rr_fid2 under the name rr_name in directory rr_fid1.
 *
 * Steps visible in this listing (many tests, returns and labels are on
 * lines not shown; hedged where the controlling condition is missing):
 *  - the parent FID must pass fid_is_md_operative();
 *  - for a directory created with LMV user data: the client must be a DNE
 *    client; a stripe count above 1 additionally needs a striped-dir
 *    capable client and mdt_enable_striped_dir, otherwise
 *    mdt_enable_remote_dir is required; the hash type is checked against
 *    the OBD_CONNECT2_CRUSH connect flag; the caller must have
 *    uc_rbac_dne_ops and either CAP_SYS_ADMIN, the configured remote-dir
 *    gid, or that gid being -1.  Restripe is considered (the assignment to
 *    the local flag is not shown) when dir restripe is enabled, the stripe
 *    offset is LMV_OFFSET_DEFAULT and MDS_OPEN_CREAT is not set;
 *  - the parent is found; -ENOENT if it does not exist; mdt_check_enc()
 *    is applied; -EPERM if the parent carries LOHA_FSCRYPT_MD and the
 *    caller lacks uc_rbac_fscrypt_admin;
 *  - the name is first looked up without locks (LU-10235) with version
 *    checking in slot 1; both a -EEXIST exit and a call to mdt_restripe()
 *    follow, selected by conditions not shown; -ENOENT is the expected
 *    result for a normal create;
 *  - ENOENT_VERSION is saved in slot 1, the parent is locked in LCK_PW
 *    mode, and slot 0 is checked or saved for a local parent;
 *  - the child object is instantiated with mdt_object_new(), registered
 *    for a version update and handled in slot 2;
 *  - LOHA_FSCRYPT_MD is set on the child when the parent has it or the
 *    name equals dot_fscrypt_name;
 *  - sp_cr_lookup is 0, or 1 for a remote parent; mdo_create() is called
 *    and the attributes are re-read with mdt_attr_get_complex();
 *  - when mdt_slc_is_enabled() and the new object is a directory: the
 *    child is probed with mdt_object_striped(), a local parent is
 *    unlocked and locked again (presumably only for a striped child),
 *    and the child plus its stripes are locked with MDS_INODELOCK_UPDATE
 *    in LCK_PW mode and released again;
 *  - FID and attributes are packed into the reply when MA_INODE is valid;
 *  - cleanup puts the child, unlocks the parent and puts the parent.
 */
516 static int mdt_create(struct mdt_thread_info *info)
518 struct mdt_device *mdt = info->mti_mdt;
519 struct mdt_object *parent;
520 struct mdt_object *child;
521 struct mdt_lock_handle *lh;
522 struct mdt_body *repbody;
523 struct md_attr *ma = &info->mti_attr;
524 struct mdt_reint_record *rr = &info->mti_rr;
525 struct md_op_spec *spec = &info->mti_spec;
526 struct lu_ucred *uc = mdt_ucred(info);
527 bool restripe = false;
531 DEBUG_REQ(D_INODE, mdt_info_req(info),
532 "Create ("DNAME"->"DFID") in "DFID,
533 PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
535 if (!fid_is_md_operative(rr->rr_fid1))
538 if (S_ISDIR(ma->ma_attr.la_mode) &&
539 spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
540 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
541 struct obd_export *exp = mdt_info_req(info)->rq_export;
543 /* Only new clients can create remote dir( >= 2.4) and
544 * striped dir(>= 2.6), old client will return -ENOTSUPP
546 if (!mdt_is_dne_client(exp))
549 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
550 if (!mdt_is_striped_client(exp))
553 if (!mdt->mdt_enable_striped_dir)
555 } else if (!mdt->mdt_enable_remote_dir) {
559 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
560 (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) >=
564 /* we want rbac roles to have precedence over any other
565 * permission or capability checks
567 if (!uc->uc_rbac_dne_ops ||
568 (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
569 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
570 mdt->mdt_enable_remote_dir_gid != -1))
573 /* restripe if later found dir exists, MDS_OPEN_CREAT means
574 * this is create only, don't try restripe.
576 if (mdt->mdt_enable_dir_restripe &&
577 le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
578 !(spec->sp_cr_flags & MDS_OPEN_CREAT))
582 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
584 parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
586 RETURN(PTR_ERR(parent));
588 if (!mdt_object_exists(parent))
589 GOTO(put_parent, rc = -ENOENT);
591 rc = mdt_check_enc(info, parent);
593 GOTO(put_parent, rc);
595 if (!uc->uc_rbac_fscrypt_admin &&
596 parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
597 GOTO(put_parent, rc = -EPERM);
600 * LU-10235: check if name exists locklessly first to avoid massive
601 * lock recalls on existing directories.
603 rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
604 &info->mti_tmp_fid1, 1);
607 GOTO(put_parent, rc = -EEXIST);
609 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
613 /* -ENOENT is expected here */
615 GOTO(put_parent, rc);
617 /* save version of file name for replay, it must be ENOENT here */
618 mdt_enoent_version_save(info, 1);
620 OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
622 lh = &info->mti_lh[MDT_LH_PARENT];
623 rc = mdt_parent_lock(info, parent, lh, &rr->rr_name, LCK_PW, false);
625 GOTO(put_parent, rc);
627 if (!mdt_object_remote(parent)) {
628 rc = mdt_version_get_check_save(info, parent, 0);
630 GOTO(unlock_parent, rc);
633 child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
634 if (unlikely(IS_ERR(child)))
635 GOTO(unlock_parent, rc = PTR_ERR(child));
637 ma->ma_need = MA_INODE;
640 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
641 OBD_FAIL_MDS_REINT_CREATE_WRITE);
643 /* Version of child will be updated on disk. */
644 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
645 rc = mdt_version_get_check_save(info, child, 2);
649 if (parent->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD ||
650 (rr->rr_name.ln_namelen == strlen(dot_fscrypt_name) &&
651 strncmp(rr->rr_name.ln_name, dot_fscrypt_name,
652 rr->rr_name.ln_namelen) == 0))
653 child->mot_obj.lo_header->loh_attr |= LOHA_FSCRYPT_MD;
656 * Do not perform lookup sanity check. We know that name does
659 info->mti_spec.sp_cr_lookup = 0;
660 if (mdt_object_remote(parent))
661 info->mti_spec.sp_cr_lookup = 1;
662 info->mti_spec.sp_feat = &dt_directory_features;
664 rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
665 mdt_object_child(child), &info->mti_spec, ma);
667 rc = mdt_attr_get_complex(info, child, ma);
673 * On DNE, we need to eliminate dependey between 'mkdir a' and
674 * 'mkdir a/b' if b is a striped directory, to achieve this, two
675 * things are done below:
676 * 1. save child and slaves lock.
677 * 2. if the child is a striped directory, relock parent so to
678 * compare against with COS locks to ensure parent was
681 if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
682 struct mdt_lock_handle *lhc;
683 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
686 rc = mdt_object_striped(info, child);
692 if (!mdt_object_remote(parent)) {
693 mdt_object_unlock(info, parent, lh, 1);
694 rc = mdt_parent_lock(info, parent, lh,
695 &rr->rr_name, LCK_PW,
702 lhc = &info->mti_lh[MDT_LH_CHILD];
703 rc = mdt_object_stripes_lock(info, parent, child, lhc, einfo,
704 MDS_INODELOCK_UPDATE, LCK_PW,
709 mdt_object_stripes_unlock(info, child, lhc, einfo, rc);
712 /* Return fid & attr to client. */
713 if (ma->ma_valid & MA_INODE)
714 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
715 mdt_object_fid(child));
718 mdt_object_put(info->mti_env, child);
720 mdt_object_unlock(info, parent, lh, rc);
722 mdt_object_put(info->mti_env, parent);
/**
 * Apply the attributes in \a ma to object \a mo under a PW lock.
 *
 * Steps visible in this listing:
 *  - do_vbr is non-zero when any of LA_MODE, LA_UID, LA_GID, LA_PROJID or
 *    LA_FLAGS is being changed;
 *  - mdt_object_striped() is called on \a mo (how its result is used,
 *    presumably to derive cos_incompat, is not shown -- TODO confirm);
 *  - the lock bits start as MDS_INODELOCK_UPDATE; MDS_INODELOCK_PERM is
 *    added for mode/uid/gid changes and MDS_INODELOCK_XATTR for a project
 *    ID change;
 *  - the object and its stripes are locked in LCK_PW mode using the
 *    MDT_LH_PARENT lock handle;
 *  - the object is registered for a version update and VBR slot 0 is
 *    checked or saved (presumably only when do_vbr is set);
 *  - mot_lov_mutex is held around mo_attr_set() when uid, gid or project
 *    ID changes (LU-2789);
 *  - mdt_dom_obj_lvb_update() is called after a successful update;
 *  - the locks are released with rc passed as the decref argument.
 */
726 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
729 struct mdt_lock_handle *lh;
730 int do_vbr = ma->ma_attr.la_valid &
731 (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
732 __u64 lockpart = MDS_INODELOCK_UPDATE;
733 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
738 rc = mdt_object_striped(info, mo);
743 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
744 lockpart |= MDS_INODELOCK_PERM;
745 /* Clear xattr cache on clients, so the virtual project ID xattr
746 * can get the new project ID
748 if (ma->ma_attr.la_valid & LA_PROJID)
749 lockpart |= MDS_INODELOCK_XATTR;
751 lh = &info->mti_lh[MDT_LH_PARENT];
752 rc = mdt_object_stripes_lock(info, NULL, mo, lh, einfo, lockpart,
753 LCK_PW, cos_incompat);
757 /* all attrs are packed into mti_attr in unpack_setattr */
758 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
759 OBD_FAIL_MDS_REINT_SETATTR_WRITE);
761 /* VBR: update version if attr changed are important for recovery */
763 /* update on-disk version of changed object */
764 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
765 rc = mdt_version_get_check_save(info, mo, 0);
767 GOTO(out_unlock, rc);
770 /* Ensure constant striping during chown(). See LU-2789. */
771 if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
772 mutex_lock(&mo->mot_lov_mutex);
774 /* all attrs are packed into mti_attr in unpack_setattr */
775 rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
777 if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
778 mutex_unlock(&mo->mot_lov_mutex);
781 GOTO(out_unlock, rc);
782 mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
785 mdt_object_stripes_unlock(info, mo, lh, einfo, rc);
790 * Check HSM flags and add HS_DIRTY flag if relevant.
792 * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
793 * and is not RELEASED.
/**
 * Mark \a mo as HS_DIRTY in its HSM attributes when that applies.
 *
 * The HSM attributes are fetched by setting ma->ma_need to MA_HSM and
 * calling mdt_attr_get_complex(); a failure is logged with CERROR.
 *
 * HS_DIRTY is added only when the HSM data is valid (MA_HSM), HS_EXISTS
 * is set and neither HS_DIRTY nor HS_RELEASED is set.  While the new
 * state is written with mdt_hsm_attr_set(), CAP_FOWNER is raised in the
 * caller's credentials and the previous capability set is restored right
 * afterwards.  A write failure is logged with CERROR.
 *
 * NOTE(review): the return statements are not shown -- presumably rc of
 * the last operation is returned; confirm.
 */
795 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
798 struct lu_ucred *uc = mdt_ucred(info);
799 kernel_cap_t cap_saved;
803 /* If the file was modified, add the dirty flag */
804 ma->ma_need = MA_HSM;
805 rc = mdt_attr_get_complex(info, mo, ma);
807 CERROR("file attribute read error for "DFID": %d.\n",
808 PFID(mdt_object_fid(mo)), rc);
812 /* If an up2date copy exists in the backend, add dirty flag */
813 if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
814 && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
815 ma->ma_hsm.mh_flags |= HS_DIRTY;
817 /* Bump cap so that closes from non-owner writers can
818 * set the HSM state to dirty.
820 cap_saved = uc->uc_cap;
821 cap_raise(uc->uc_cap, CAP_FOWNER);
822 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
823 uc->uc_cap = cap_saved;
825 CERROR("file attribute change error for "DFID": %d\n",
826 PFID(mdt_object_fid(mo)), rc);
/**
 * Handle a setattr reint request for the object rr_fid1.
 *
 * Steps visible in this listing (tests on rc and the labels are on lines
 * not shown):
 *  - locks named in the request's DLM part are cancelled with
 *    ldlm_request_cancel();
 *  - the object is found; -ENOENT if it does not exist, -EREMOTE if it is
 *    remote;
 *  - when the size is going to change, MDS_TRUNC_KEEP_LEASE is not set
 *    and a lease is held, an MDS_INODELOCK_OPEN lock is taken and released
 *    under mot_open_sem (read side) to revoke the lease.  Note that this
 *    path re-points the \a lhc parameter at the MDT_LH_LOCAL handle;
 *  - for a size change or MRF_OPEN_TRUNC: -ETXTBSY if mdt_write_read()
 *    fails; clients without FLR or overstriping support get -EOPNOTSUPP
 *    for files whose LOV layout uses those features (LU-10286); the LSOM
 *    is updated with mdt_lsom_update();
 *  - an attribute update (MA_INODE with a non-empty la_valid) must not
 *    carry MA_LOV (-EPROTO); for regular files with time changes
 *    tgt_fmd_update() is called; for an mtime change the parent FID is
 *    fetched; mdt_attr_set() applies the change;
 *  - a layout update (MA_LOV or MA_LMV together with MA_INODE) requires a
 *    directory (-ENOTDIR) and an empty la_valid (-EPROTO).  For MA_LMV
 *    both remote and striped dirs must be enabled and the caller needs
 *    uc_rbac_dne_ops plus CAP_SYS_ADMIN, the configured gid, or that gid
 *    being -1 (-EPERM).  The xattr (XATTR_NAME_LOV or
 *    XATTR_NAME_DEFAULT_LMV) is written with mo_xattr_set() under an
 *    XATTR lock; for the default LMV the LOOKUP bit is locked too, via
 *    the parent found through a ".." lookup unless the object is the root;
 *  - any other combination yields -EPROTO;
 *  - with MDS_DATA_MODIFIED the HSM dirty flag is added;
 *  - the attributes are re-read and packed into the reply body;
 *  - the object is put, the LPROC_MDT_SETATTR counter is updated and
 *    mdt_client_compatibility() is called.
 *
 * NOTE(review): pname->ln_namelen is set to sizeof(dotdot), which is 3
 * for the array "..", i.e. it counts the terminating NUL.  Whether the
 * consumer relies on ln_namelen is not visible here -- confirm that the
 * intended length is not 2.
 */
832 static int mdt_reint_setattr(struct mdt_thread_info *info,
833 struct mdt_lock_handle *lhc)
835 struct mdt_device *mdt = info->mti_mdt;
836 struct md_attr *ma = &info->mti_attr;
837 struct mdt_reint_record *rr = &info->mti_rr;
838 struct ptlrpc_request *req = mdt_info_req(info);
839 struct mdt_object *mo;
840 struct mdt_body *repbody;
841 ktime_t kstart = ktime_get();
845 DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
846 (unsigned int)ma->ma_attr.la_valid);
848 if (info->mti_dlm_req)
849 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
851 OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
853 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
854 mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
856 GOTO(out, rc = PTR_ERR(mo));
858 if (!mdt_object_exists(mo))
859 GOTO(out_put, rc = -ENOENT);
861 if (mdt_object_remote(mo))
862 GOTO(out_put, rc = -EREMOTE);
864 ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
865 /* revoke lease lock if size is going to be changed */
866 if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
867 !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
868 atomic_read(&mo->mot_lease_count) > 0)) {
869 down_read(&mo->mot_open_sem);
871 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
872 lhc = &info->mti_lh[MDT_LH_LOCAL];
873 rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN,
876 up_read(&mo->mot_open_sem);
880 /* revoke lease lock */
881 mdt_object_unlock(info, mo, lhc, 1);
883 up_read(&mo->mot_open_sem);
886 if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
887 /* Check write access for the O_TRUNC case */
888 if (mdt_write_read(mo) < 0)
889 GOTO(out_put, rc = -ETXTBSY);
891 /* LU-10286: compatibility check for FLR.
892 * Please check the comment in mdt_finish_open() for details
894 if (!exp_connect_flr(info->mti_exp) ||
895 !exp_connect_overstriping(info->mti_exp)) {
896 rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
897 if (rc < 0 && rc != -ENODATA)
900 if (!exp_connect_flr(info->mti_exp)) {
902 mdt_lmm_is_flr(info->mti_big_lmm))
903 GOTO(out_put, rc = -EOPNOTSUPP);
906 if (!exp_connect_overstriping(info->mti_exp)) {
908 mdt_lmm_is_overstriping(info->mti_big_lmm))
909 GOTO(out_put, rc = -EOPNOTSUPP);
913 /* For truncate, the file size sent from client
914 * is believable, but the blocks are incorrect,
915 * which makes the block size in LSOM attribute
916 * inconsisent with the real block size.
918 rc = mdt_lsom_update(info, mo, true);
923 if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
924 if (ma->ma_valid & MA_LOV)
925 GOTO(out_put, rc = -EPROTO);
927 /* MDT supports FMD for regular files due to Data-on-MDT */
928 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
929 ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
930 tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
933 if (ma->ma_attr.la_valid & LA_MTIME) {
934 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
936 ma->ma_valid |= MA_PFID;
940 rc = mdt_attr_set(info, mo, ma);
943 } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
944 (ma->ma_valid & MA_INODE)) {
945 struct lu_buf *buf = &info->mti_buf;
946 struct lu_ucred *uc = mdt_ucred(info);
947 struct mdt_lock_handle *lh;
950 /* reject if either remote or striped dir is disabled */
951 if (ma->ma_valid & MA_LMV) {
952 if (!mdt->mdt_enable_remote_dir ||
953 !mdt->mdt_enable_striped_dir)
954 GOTO(out_put, rc = -EPERM);
956 /* we want rbac roles to have precedence over any other
957 * permission or capability checks
959 if (!uc->uc_rbac_dne_ops ||
960 (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
961 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
962 mdt->mdt_enable_remote_dir_gid != -1))
963 GOTO(out_put, rc = -EPERM);
966 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
967 GOTO(out_put, rc = -ENOTDIR);
969 if (ma->ma_attr.la_valid != 0)
970 GOTO(out_put, rc = -EPROTO);
972 lh = &info->mti_lh[MDT_LH_PARENT];
973 if (ma->ma_valid & MA_LOV) {
974 buf->lb_buf = ma->ma_lmm;
975 buf->lb_len = ma->ma_lmm_size;
976 name = XATTR_NAME_LOV;
977 rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR,
980 buf->lb_buf = &ma->ma_lmv->lmv_user_md;
981 buf->lb_len = ma->ma_lmv_size;
982 name = XATTR_NAME_DEFAULT_LMV;
984 if (unlikely(fid_is_root(mdt_object_fid(mo)))) {
985 rc = mdt_object_lock(info, mo, lh,
986 MDS_INODELOCK_XATTR |
987 MDS_INODELOCK_LOOKUP,
990 struct lu_fid *pfid = &info->mti_tmp_fid1;
991 struct lu_name *pname = &info->mti_name;
992 const char dotdot[] = "..";
993 struct mdt_object *pobj;
996 pname->ln_name = dotdot;
997 pname->ln_namelen = sizeof(dotdot);
998 rc = mdo_lookup(info->mti_env,
999 mdt_object_child(mo), pname,
1004 pobj = mdt_object_find(info->mti_env,
1005 info->mti_mdt, pfid);
1007 GOTO(out_put, rc = PTR_ERR(pobj));
1009 rc = mdt_object_check_lock(info, pobj, mo, lh,
1010 MDS_INODELOCK_XATTR |
1011 MDS_INODELOCK_LOOKUP,
1013 mdt_object_put(info->mti_env, pobj);
1020 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
1023 mdt_object_unlock(info, mo, lh, rc);
1027 GOTO(out_put, rc = -EPROTO);
1030 /* If file data is modified, add the dirty flag */
1031 if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1032 rc = mdt_add_dirty_flag(info, mo, ma);
1034 ma->ma_need = MA_INODE;
1036 rc = mdt_attr_get_complex(info, mo, ma);
1040 mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1044 mdt_object_put(info->mti_env, mo);
1047 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1048 ktime_us_delta(ktime_get(), kstart));
1050 mdt_client_compatibility(info);
/**
 * Entry point for a create reint request; wraps mdt_create().
 *
 * Steps visible in this listing:
 *  - the OBD_FAIL_MDS_REINT_CREATE fail point makes the call return
 *    err_serious(-ESTALE);
 *  - locks named in the request's DLM part are cancelled;
 *  - the name in mti_rr.rr_name is validated with lu_name_is_valid() (the
 *    error returned for an invalid name is not shown);
 *  - the file type (la_mode & S_IFMT) is dispatched on; an unsupported
 *    type is logged and rejected with err_serious(-EOPNOTSUPP).  The list
 *    of accepted types is on lines not shown here;
 *  - mdt_create() does the work;
 *  - the elapsed time is accounted to LPROC_MDT_MKDIR for S_IFDIR and to
 *    LPROC_MDT_MKNOD otherwise (presumably the "else" branch; any
 *    condition on rc is not visible -- TODO confirm).
 *
 * \a lhc is not referenced on any line visible here.
 */
1054 static int mdt_reint_create(struct mdt_thread_info *info,
1055 struct mdt_lock_handle *lhc)
1057 struct ptlrpc_request *req = mdt_info_req(info);
1058 ktime_t kstart = ktime_get();
1062 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1063 RETURN(err_serious(-ESTALE));
1065 if (info->mti_dlm_req)
1066 ldlm_request_cancel(mdt_info_req(info),
1067 info->mti_dlm_req, 0, LATF_SKIP);
1069 if (!lu_name_is_valid(&info->mti_rr.rr_name))
1072 switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1082 CERROR("%s: Unsupported mode %o\n",
1083 mdt_obd_name(info->mti_mdt),
1084 info->mti_attr.ma_attr.la_mode);
1085 RETURN(err_serious(-EOPNOTSUPP));
1088 rc = mdt_create(info);
1090 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1091 mdt_counter_incr(req, LPROC_MDT_MKDIR,
1092 ktime_us_delta(ktime_get(), kstart));
1094 /* Special file should stay on the same node as parent*/
1095 mdt_counter_incr(req, LPROC_MDT_MKNOD,
1096 ktime_us_delta(ktime_get(), kstart));
1103 * VBR: save the parent version in the reply, and the child version obtained by its name.
1104 * The version of the child is obtained and checked during its lookup. If
1106 static int mdt_reint_unlink(struct mdt_thread_info *info,
1107 struct mdt_lock_handle *lhc)
1109 struct mdt_reint_record *rr = &info->mti_rr;
1110 struct ptlrpc_request *req = mdt_info_req(info);
1111 struct md_attr *ma = &info->mti_attr;
1112 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1113 struct mdt_object *mp;
1114 struct mdt_object *mc;
1115 struct mdt_lock_handle *parent_lh;
1116 struct mdt_lock_handle *child_lh;
1117 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1118 struct lu_ucred *uc = mdt_ucred(info);
1119 bool cos_incompat = false;
1121 ktime_t kstart = ktime_get();
1125 DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1126 PNAME(&rr->rr_name));
1128 if (info->mti_dlm_req)
1129 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1131 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1132 RETURN(err_serious(-ENOENT));
1134 if (!fid_is_md_operative(rr->rr_fid1))
1137 mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1139 RETURN(PTR_ERR(mp));
1141 if (mdt_object_remote(mp)) {
1142 cos_incompat = true;
1144 rc = mdt_version_get_check_save(info, mp, 0);
1146 GOTO(put_parent, rc);
1149 if (!uc->uc_rbac_fscrypt_admin &&
1150 mp->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
1151 GOTO(put_parent, rc = -EPERM);
1153 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1154 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1156 parent_lh = &info->mti_lh[MDT_LH_PARENT];
1157 rc = mdt_parent_lock(info, mp, parent_lh, &rr->rr_name, LCK_PW,
1160 GOTO(put_parent, rc);
1162 if (info->mti_spec.sp_rm_entry) {
1163 if (!mdt_is_dne_client(req->rq_export))
1164 /* Return -ENOTSUPP for old client */
1165 GOTO(unlock_parent, rc = -ENOTSUPP);
1167 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1168 GOTO(unlock_parent, rc = -EPERM);
1170 ma->ma_need = MA_INODE;
1172 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1173 NULL, &rr->rr_name, ma, no_name);
1174 GOTO(unlock_parent, rc);
1177 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1178 *child_fid = *rr->rr_fid2;
1180 /* lookup child object along with version checking */
1181 fid_zero(child_fid);
1182 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1185 /* The name might not be found during resend of a
1186 * remote unlink; consider the following case.
1187 * dir_A is a remote directory, the name entry of
1188 * dir_A is on MDT0, the directory is on MDT1,
1190 * 1. client sends unlink req to MDT1.
1191 * 2. MDT1 sends name delete update to MDT0.
1192 * 3. name entry is being deleted in MDT0 synchronously.
1193 * 4. MDT1 is restarted.
1194 * 5. client resends unlink req to MDT1. So it can not
1195 * find the name entry on MDT0 anymore.
1196 * In this case, MDT1 only needs to destroy the local
1199 if (mdt_object_remote(mp) && rc == -ENOENT &&
1200 !fid_is_zero(rr->rr_fid2) &&
1201 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1203 *child_fid = *rr->rr_fid2;
1205 GOTO(unlock_parent, rc);
1210 if (!fid_is_md_operative(child_fid))
1211 GOTO(unlock_parent, rc = -EPERM);
1213 /* We will lock the child regardless of whether it is local or remote. No harm. */
1214 mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1216 GOTO(unlock_parent, rc = PTR_ERR(mc));
1218 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1219 /* In this case, child fid is embedded in the request, and we do
1220 * not have a proper name as rr_name contains an encoded
1221 * hash. So find name that matches provided hash.
1223 if (!find_name_matching_hash(info, &rr->rr_name,
1225 GOTO(put_child, rc = -ENOENT);
1228 if (!cos_incompat) {
1229 rc = mdt_object_striped(info, mc);
1231 GOTO(put_child, rc);
1235 mdt_object_put(info->mti_env, mc);
1236 mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1241 child_lh = &info->mti_lh[MDT_LH_CHILD];
1242 if (mdt_object_remote(mc)) {
1243 struct mdt_body *repbody;
1245 if (!fid_is_zero(rr->rr_fid2)) {
1246 CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1247 mdt_obd_name(info->mti_mdt),
1248 PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1249 GOTO(put_child, rc = -ENOENT);
1251 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1252 mdt_obd_name(info->mti_mdt),
1253 PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1255 if (!mdt_is_dne_client(req->rq_export))
1256 /* Return -ENOTSUPP for old client */
1257 GOTO(put_child, rc = -ENOTSUPP);
1259 /* Revoke the LOOKUP lock of the remote object granted by
1260 * this MDT. Since the unlink will happen on another MDT,
1261 * it will release the LOOKUP lock right away. Then What
1262 * would happen if another client try to grab the LOOKUP
1263 * lock at the same time with unlink XXX
1265 rc = mdt_object_lookup_lock(info, NULL, mc, child_lh, LCK_EX,
1268 GOTO(put_child, rc);
1270 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1271 LASSERT(repbody != NULL);
1272 repbody->mbo_fid1 = *mdt_object_fid(mc);
1273 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1274 GOTO(unlock_child, rc = -EREMOTE);
1276 /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1277 * this now because a running HSM restore on the child (unlink
1278 * victim) will hold the layout lock. See LU-4002.
1280 rc = mdt_object_stripes_lock(info, mp, mc, child_lh, einfo,
1281 MDS_INODELOCK_LOOKUP |
1282 MDS_INODELOCK_UPDATE,
1283 LCK_EX, cos_incompat);
1285 GOTO(put_child, rc);
1288 * Now we can only make sure we need MA_INODE, in mdd layer, will check
1289 * whether need MA_LOV and MA_COOKIE.
1291 ma->ma_need = MA_INODE;
1294 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1295 OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1296 /* save version when object is locked */
1297 mdt_version_get_save(info, mc, 1);
1299 mutex_lock(&mc->mot_lov_mutex);
1301 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1302 mdt_object_child(mc), &rr->rr_name, ma, no_name);
1304 mutex_unlock(&mc->mot_lov_mutex);
1306 GOTO(unlock_child, rc);
1308 if (!lu_object_is_dying(&mc->mot_header)) {
1309 rc = mdt_attr_get_complex(info, mc, ma);
1312 } else if (mdt_dom_check_for_discard(info, mc)) {
1313 mdt_dom_discard_data(info, mc);
1315 mdt_handle_last_unlink(info, mc, ma);
1318 if (ma->ma_valid & MA_INODE) {
1319 switch (ma->ma_attr.la_mode & S_IFMT) {
1321 mdt_counter_incr(req, LPROC_MDT_RMDIR,
1322 ktime_us_delta(ktime_get(), kstart));
1330 mdt_counter_incr(req, LPROC_MDT_UNLINK,
1331 ktime_us_delta(ktime_get(), kstart));
1334 LASSERTF(0, "bad file type %o unlinking\n",
1335 ma->ma_attr.la_mode);
1342 mdt_object_stripes_unlock(info, mc, child_lh, einfo, rc);
1344 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1345 info->mti_big_buf.lb_buf)
1346 lu_buf_free(&info->mti_big_buf);
1347 mdt_object_put(info->mti_env, mc);
1349 mdt_object_unlock(info, mp, parent_lh, rc);
1351 mdt_object_put(info->mti_env, mp);
1352 CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1357 * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
/*
 * Handle a link request: add the name rr_name in directory rr_fid2 for the
 * already existing object rr_fid1.
 *
 * Sequence visible below:
 *  - fault-injection exits (OBD_FAIL_*), each returning
 *    err_serious(-ENOENT);
 *  - mti_dlm_req, when present, is handed to ldlm_request_cancel();
 *  - the target parent (rr_fid2) is found, its version is checked and saved
 *    in slot 0, and mdt_check_enc() is run on it;
 *  - the source (rr_fid1) is found; a non-existing source gives -ENOENT;
 *  - the parent is locked with mdt_parent_lock(LCK_PW) and the source with
 *    mdt_object_lock(UPDATE | XATTR, LCK_EX);
 *  - the source version is checked and saved in slot 1 and the name is
 *    looked up in the parent (slot 2); outside of replay any lookup result
 *    other than -ENOENT is turned into -EEXIST;
 *  - mdo_link() is called and the LPROC_MDT_LINK counter is updated;
 *  - both objects are unlocked and put, source first.
 *
 * \param[in] info	thread info holding the unpacked reint record
 * \param[in] lhc	not referenced in the lines visible here
 *
 * NOTE(review): the conditions guarding several GOTO()/RETURN() calls and
 * the counter update are not visible in this view; confirm them against the
 * complete function.
 */
1360 static int mdt_reint_link(struct mdt_thread_info *info,
1361 struct mdt_lock_handle *lhc)
1363 struct mdt_reint_record *rr = &info->mti_rr;
1364 struct ptlrpc_request *req = mdt_info_req(info);
1365 struct md_attr *ma = &info->mti_attr;
1366 struct mdt_object *ms;
1367 struct mdt_object *mp;
1368 struct mdt_lock_handle *lhs;
1369 struct mdt_lock_handle *lhp;
1370 ktime_t kstart = ktime_get();
1375 DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1376 PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
1378 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1379 RETURN(err_serious(-ENOENT));
1381 if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1382 OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1383 req->rq_no_reply = 1;
1384 RETURN(err_serious(-ENOENT));
1387 if (info->mti_dlm_req)
1388 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1390 /* Invalid case so return error immediately instead of
1393 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1396 if (!fid_is_md_operative(rr->rr_fid1) ||
1397 !fid_is_md_operative(rr->rr_fid2))
1400 /* step 1: find target parent dir */
1401 mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1403 RETURN(PTR_ERR(mp));
1405 rc = mdt_version_get_check_save(info, mp, 0);
1407 GOTO(put_parent, rc);
1409 rc = mdt_check_enc(info, mp);
1411 GOTO(put_parent, rc);
1413 /* step 2: find source */
1414 ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1416 GOTO(put_parent, rc = PTR_ERR(ms));
1418 if (!mdt_object_exists(ms)) {
1419 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1420 mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1421 GOTO(put_source, rc = -ENOENT);
/* set when either the parent or the source is a remote object */
1424 cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1426 OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
1428 lhp = &info->mti_lh[MDT_LH_PARENT];
1429 rc = mdt_parent_lock(info, mp, lhp, &rr->rr_name, LCK_PW, cos_incompat);
1431 GOTO(put_source, rc);
1433 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
1435 lhs = &info->mti_lh[MDT_LH_CHILD];
1436 rc = mdt_object_lock(info, ms, lhs,
1437 MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR, LCK_EX,
1440 GOTO(unlock_parent, rc);
1442 /* step 3: link it */
1443 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1444 OBD_FAIL_MDS_REINT_LINK_WRITE);
1446 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1447 rc = mdt_version_get_check_save(info, ms, 1);
1449 GOTO(unlock_source, rc);
1451 /** check target version by name during replay */
1452 rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1453 &info->mti_tmp_fid1, 2);
1454 if (rc != 0 && rc != -ENOENT)
1455 GOTO(unlock_source, rc);
1456 /* save version of file name for replay, it must be ENOENT here */
1457 if (!req_is_replay(mdt_info_req(info))) {
1458 if (rc != -ENOENT) {
1459 CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1460 PNAME(&rr->rr_name));
1461 GOTO(unlock_source, rc = -EEXIST);
1463 info->mti_ver[2] = ENOENT_VERSION;
1464 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
1467 rc = mdo_link(info->mti_env, mdt_object_child(mp),
1468 mdt_object_child(ms), &rr->rr_name, ma);
1471 mdt_counter_incr(req, LPROC_MDT_LINK,
1472 ktime_us_delta(ktime_get(), kstart));
1476 mdt_object_unlock(info, ms, lhs, rc);
1478 mdt_object_unlock(info, mp, lhp, rc);
1480 mdt_object_put(info->mti_env, ms);
1482 mdt_object_put(info->mti_env, mp);
1487 * Get BFL lock for rename or migrate process.
/*
 * Take the lock used to serialise rename and migrate (the BFL lock).
 *
 * The object for the FID in info->mti_tmp_fid1 is looked up, @lh is
 * initialised with mdt_lock_reg_init(lh, LCK_EX) and
 * mdt_object_lock_internal() is called with LUSTRE_BFL_FID and the
 * MDS_INODELOCK_UPDATE bit.  The object reference is dropped again right
 * after the lock call; only the handle in @lh is left for the caller.
 *
 * \param[in] info	thread info
 * \param[in,out] lh	lock handle that receives the lock; it is released
 *			with mdt_rename_unlock()
 *
 * NOTE(review): how *fid is filled in before mdt_object_find() is not
 * visible in this view -- confirm.
 */
1489 static int mdt_rename_lock(struct mdt_thread_info *info,
1490 struct mdt_lock_handle *lh)
1492 struct lu_fid *fid = &info->mti_tmp_fid1;
1493 struct mdt_object *obj;
1494 __u64 ibits = MDS_INODELOCK_UPDATE;
1499 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1501 RETURN(PTR_ERR(obj));
1503 mdt_lock_reg_init(lh, LCK_EX);
1504 rc = mdt_object_lock_internal(info, obj, &LUSTRE_BFL_FID, lh,
1505 &ibits, 0, false, false);
1506 mdt_object_put(info->mti_env, obj);
/*
 * Release the lock taken by mdt_rename_lock().
 *
 * No object is passed to mdt_object_unlock(); only the handle in @lh is
 * released, with 1 as the last argument.
 *
 * \param[in] info	thread info
 * \param[in] lh	lock handle previously filled by mdt_rename_lock()
 */
1510 static void mdt_rename_unlock(struct mdt_thread_info *info,
1511 struct mdt_lock_handle *lh)
1514 /* Cancel the single rename lock right away */
1515 mdt_object_unlock(info, NULL, lh, 1);
/*
 * Find a parent directory by FID and run the early sanity checks on it.
 *
 * After mdt_object_find() the version of the object is checked for slot
 * @idx with mdt_version_get_check() (it is not saved at this point), and
 * the object has to exist (-ENOENT otherwise) and be a directory (-ENOTDIR
 * otherwise).  The out_put path drops the reference again.
 *
 * \param[in] info	thread info
 * \param[in] fid	FID of the directory to find
 * \param[in] idx	version slot handed to mdt_version_get_check()
 *
 * Callers test the result with IS_ERR(), so failures are presumably
 * returned as ERR_PTR() values; the return statements themselves are not
 * visible in this view.
 */
1519 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1520 const struct lu_fid *fid,
1523 struct mdt_object *dir;
1527 dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1531 /* check early, the real version will be saved after locking */
1532 rc = mdt_version_get_check(info, dir, idx);
1536 if (!mdt_object_exists(dir))
1537 GOTO(out_put, rc = -ENOENT);
1539 if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1540 GOTO(out_put, rc = -ENOTDIR);
1544 mdt_object_put(info->mti_env, dir);
1549 * in case obj is remote obj on its parent, revoke LOOKUP lock,
1550 * herein we don't really check it, just do revoke.
/*
 * Revoke the LOOKUP lock of @obj by taking it briefly ourselves.
 *
 * mdt_object_lookup_lock() is called with LCK_EX on the MDT_LH_LOCAL handle
 * of the thread, and that handle is released again straight afterwards, so
 * the lock is not kept past this function.
 *
 * \param[in] info	thread info; supplies the MDT_LH_LOCAL lock handle
 * \param[in] pobj	parent object, forwarded to mdt_object_lookup_lock()
 * \param[in] obj	object whose LOOKUP lock is revoked
 */
1552 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1553 struct mdt_object *pobj,
1554 struct mdt_object *obj)
1556 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1559 rc = mdt_object_lookup_lock(info, pobj, obj, lh, LCK_EX, true);
1564 * TODO, currently we don't save this lock because there is no place to
1565 * hold this lock handle, but to avoid race we need to save this lock.
1567 mdt_object_unlock(info, NULL, lh, 1);
1573 * operation may take locks of linkea, or directory stripes, group them in
/* One lock taken as part of a larger operation, kept on a list of locks. */
1576 struct mdt_sub_lock {
1577 struct mdt_object *msl_obj; /* object the lock was taken on */
1578 struct mdt_lock_handle msl_lh; /* handle of that lock */
1579 struct list_head msl_linkage; /* entry in the list that owns this lock */
/*
 * Release every lock recorded on @list.
 *
 * For each entry the object is unlocked and put with
 * mdt_object_unlock_put() and the entry is unlinked from the list; the
 * _safe iterator is used because entries are removed during the walk.
 *
 * \param[in] info	thread info
 * \param[in] list	list of struct mdt_sub_lock, linked via msl_linkage
 * \param[in] decref	forwarded unchanged to mdt_object_unlock_put()
 *
 * NOTE(review): how the entry itself is released after list_del() is not
 * visible in this view.
 */
1582 static void mdt_unlock_list(struct mdt_thread_info *info,
1583 struct list_head *list, int decref)
1585 struct mdt_sub_lock *msl;
1586 struct mdt_sub_lock *tmp;
1588 list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1589 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1590 list_del(&msl->msl_linkage);
/*
 * Release the locks held on an object taking part in a migration.
 *
 * For a remote object the stripe locks collected on @slave_locks are
 * released first and then the lock on the object itself.  The remaining
 * call, mdt_object_stripes_unlock() with @einfo, is presumably the branch
 * for a local object (the line separating the two is not visible here).
 *
 * \param[in] info		thread info
 * \param[in] obj		object to unlock
 * \param[in] lh		lock handle of @obj
 * \param[in] einfo		enqueue info handed to
 *				mdt_object_stripes_unlock()
 * \param[in] slave_locks	stripe locks of a remote @obj
 *
 * The value called decref in the body is passed on to every unlock call.
 */
1595 static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
1596 struct mdt_object *obj,
1597 struct mdt_lock_handle *lh,
1598 struct ldlm_enqueue_info *einfo,
1599 struct list_head *slave_locks,
1602 if (mdt_object_remote(obj)) {
1603 mdt_unlock_list(info, slave_locks, decref);
1604 mdt_object_unlock(info, obj, lh, decref);
1606 mdt_object_stripes_unlock(info, obj, lh, einfo, decref);
1611 * lock parents of links, and also check whether total locks don't exceed
1614 * \retval 0 on success, and locks can be saved in ptlrpc_reply_stat
1615 * \retval 1 on success, but total lock count may exceed RS_MAX_LOCKS
1616 * \retval -ev negative errno upon error
/*
 * Walk the link entries of @obj and lock the parent of each link.
 *
 * The link entries are read with mdt_links_read() after the big buffer of
 * the thread has been sized to MAX_LINKEA_SIZE.  For every entry the name
 * and the parent FID are unpacked, then one of the following happens:
 *  - the parent is @pobj: only mdt_revoke_remote_lookup_lock() is called;
 *  - @ma carries an LMV and the parent is one of its stripes: the stripe is
 *    looked up, the lookup lock is revoked and the stripe is put again;
 *  - the parent is already on @link_locks: the lookup lock is revoked using
 *    the object recorded there;
 *  - otherwise the parent is looked up (non-existing parents are skipped),
 *    a struct mdt_sub_lock is set up for it and an UPDATE lock in LCK_PW
 *    mode is attempted with mdt_object_lock_try().  When the UPDATE bit is
 *    not granted, all locks on @link_locks and the locks of @pobj are
 *    dropped, the busy parent is locked with a plain mdt_object_lock() and
 *    released again, and the walk continues; the function then ends with
 *    -EBUSY.  A granted lock is appended to @link_locks and the lookup lock
 *    of the link is revoked.
 *
 * On the error path the locks on @link_locks are released.  The number of
 * non-remote link parents is compared with RS_MAX_LOCKS - 5 to report that
 * too many locks would have to be saved.
 *
 * \param[in] info			thread info
 * \param[in] pobj			parent being operated on
 * \param[in] ma			attributes whose LMV, when valid, is
 *					searched for the stripes to skip
 * \param[in] obj			object whose links are walked
 * \param[in] lhp			lock handle of @pobj
 * \param[in] peinfo			enqueue info of @pobj
 * \param[in] parent_slave_locks	stripe locks of @pobj
 * \param[in,out] link_locks		list receiving the link parent locks
 *
 * NOTE(review): what is done for a directory @obj, for a failed buffer
 * allocation and for -ENOENT/-ENODATA from mdt_links_read() is not visible
 * in this view; confirm against the complete function.
 */
1618 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1619 struct mdt_object *pobj,
1620 const struct md_attr *ma,
1621 struct mdt_object *obj,
1622 struct mdt_lock_handle *lhp,
1623 struct ldlm_enqueue_info *peinfo,
1624 struct list_head *parent_slave_locks,
1625 struct list_head *link_locks)
1627 struct mdt_device *mdt = info->mti_mdt;
1628 struct lu_buf *buf = &info->mti_big_buf;
1629 struct lu_name *lname = &info->mti_name;
1630 struct linkea_data ldata = { NULL };
1631 bool blocked = false;
1632 int local_lnkp_cnt = 0;
1636 if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
1639 buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1640 if (buf->lb_buf == NULL)
1644 rc = mdt_links_read(info, obj, &ldata);
1646 if (rc == -ENOENT || rc == -ENODATA)
1651 for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1652 linkea_next_entry(&ldata)) {
1653 struct mdt_object *lnkp;
1654 struct mdt_sub_lock *msl;
1658 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1661 /* check if it's also linked to parent */
1662 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1663 CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1664 PFID(&fid), PNAME(lname));
1665 /* in case link is remote object, revoke LOOKUP lock */
1666 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1672 /* check if it's linked to a stripe of parent */
1673 if (ma->ma_valid & MA_LMV) {
1674 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1675 struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1678 for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1679 fid_le_to_cpu(stripe_fid,
1680 &lmv->lmv_stripe_fids[j]);
1681 if (lu_fid_eq(stripe_fid, &fid)) {
1682 CDEBUG(D_INFO, "skip stripe "DFID
1683 ", reovke "DNAME"\n",
1684 PFID(&fid), PNAME(lname));
1685 lnkp = mdt_object_find(info->mti_env,
1688 GOTO(out, rc = PTR_ERR(lnkp));
1694 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1696 mdt_object_put(info->mti_env, lnkp);
1701 /* Check if it's already locked */
1702 list_for_each_entry(msl, link_locks, msl_linkage) {
1703 if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1705 DFID" was locked, revoke "DNAME"\n",
1706 PFID(&fid), PNAME(lname));
1707 lnkp = msl->msl_obj;
1713 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1717 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1718 PFID(&fid), PNAME(lname));
1720 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1722 CWARN("%s: cannot find obj "DFID": %ld\n",
1723 mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1727 if (!mdt_object_exists(lnkp)) {
1728 CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1729 PFID(&fid), PNAME(lname));
1730 mdt_object_put(info->mti_env, lnkp);
1734 if (!mdt_object_remote(lnkp))
1739 GOTO(out, rc = -ENOMEM);
1742 * we can't follow parent-child lock order like other MD
1743 * operations, use lock_try here to avoid deadlock, if the lock
1744 * cannot be taken, drop all locks taken, revoke the blocked
1745 * one, and continue processing the remaining entries, and in
1746 * the end of the loop restart from beginning.
1749 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1750 MDS_INODELOCK_UPDATE, LCK_PW, true);
1751 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1753 CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1754 PFID(&fid), PNAME(lname));
1756 mdt_unlock_list(info, link_locks, 1);
1757 /* also unlock parent locks to avoid deadlock */
1759 mdt_migrate_object_unlock(info, pobj, lhp,
1766 rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1767 MDS_INODELOCK_UPDATE, LCK_PW,
1770 mdt_object_put(info->mti_env, lnkp);
1775 if (mdt_object_remote(lnkp)) {
1776 struct ldlm_lock *lock;
1779 * for remote object, set lock cb_atomic,
1780 * so lock can be released in blocking_ast()
1781 * immediately, then the next lock_try will
1782 * have better chance of success.
1784 lock = ldlm_handle2lock(
1785 &msl->msl_lh.mlh_rreg_lh);
1786 LASSERT(lock != NULL);
1787 lock_res_and_lock(lock);
1788 ldlm_set_atomic_cb(lock);
1789 unlock_res_and_lock(lock);
1790 LDLM_LOCK_PUT(lock);
1793 mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
1798 INIT_LIST_HEAD(&msl->msl_linkage);
1799 msl->msl_obj = lnkp;
1800 list_add_tail(&msl->msl_linkage, link_locks);
1802 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1806 GOTO(out, rc = -EBUSY);
1811 mdt_unlock_list(info, link_locks, rc);
1812 } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1813 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1816 * parent may have 3 local objects: master object and 2 stripes
1817 * (if it's being migrated too); source may have 1 local objects
1818 * as regular file; target has 1 local object.
1819 * Note, source may have 2 local locks if it is directory but it
1820 * can't have hardlinks, so it is not considered here.
/*
 * Lock every stripe listed in the LMV of a remote striped directory.
 *
 * @obj has to be remote and @ma has to carry a valid LMV (both asserted).
 * After the lmv_is_sane() check each stripe FID is converted to CPU byte
 * order and tested with fid_is_sane(); the stripe object is then looked up,
 * a struct mdt_sub_lock is set up for it, an UPDATE lock in LCK_EX mode is
 * taken and the entry is appended to @slave_locks.  A failed lookup or
 * allocation jumps to the out path; the locks already collected on
 * @slave_locks are released there.
 *
 * \param[in] info		thread info; mti_tmp_fid1 is used as scratch
 * \param[in] obj		remote striped directory
 * \param[in] ma		attributes holding the LMV of @obj
 * \param[in,out] slave_locks	list receiving one entry per locked stripe
 *
 * NOTE(review): what is done when lmv_is_sane() or fid_is_sane() fails is
 * not visible in this view.
 */
1827 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1828 struct mdt_object *obj,
1829 const struct md_attr *ma,
1830 struct list_head *slave_locks)
1832 struct mdt_device *mdt = info->mti_mdt;
1833 const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1834 struct lu_fid *fid = &info->mti_tmp_fid1;
1835 struct mdt_object *slave;
1836 struct mdt_sub_lock *msl;
1841 LASSERT(mdt_object_remote(obj));
1842 LASSERT(ma->ma_valid & MA_LMV);
1845 if (!lmv_is_sane(lmv))
1848 for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1849 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1851 if (!fid_is_sane(fid))
1854 slave = mdt_object_find(info->mti_env, mdt, fid);
1856 GOTO(out, rc = PTR_ERR(slave));
1860 mdt_object_put(info->mti_env, slave);
1861 GOTO(out, rc = -ENOMEM);
1864 rc = mdt_object_lock(info, slave, &msl->msl_lh,
1865 MDS_INODELOCK_UPDATE, LCK_EX, true);
1868 mdt_object_put(info->mti_env, slave);
1872 INIT_LIST_HEAD(&msl->msl_linkage);
1873 msl->msl_obj = slave;
1874 list_add_tail(&msl->msl_linkage, slave_locks);
1880 mdt_unlock_list(info, slave_locks, rc);
1884 /* lock parent and its stripes */
/*
 * Take the UPDATE lock on the parent of a migration and on its stripes.
 *
 * A remote @obj is locked with mdt_object_lock() and, when @ma carries a
 * valid LMV, its stripes are locked one by one with
 * mdt_lock_remote_slaves(); the lock on @obj is released again on the
 * failure path of that call.  The other visible call,
 * mdt_object_stripes_lock() with LCK_PW and @einfo, is presumably the
 * branch for a local @obj.
 *
 * \param[in] info		thread info
 * \param[in] obj		parent object
 * \param[in] ma		attributes of @obj, checked for MA_LMV
 * \param[in,out] lh		lock handle for @obj
 * \param[in] einfo		enqueue info for mdt_object_stripes_lock()
 * \param[in,out] slave_locks	list receiving the stripe locks of a remote
 *				@obj
 */
1885 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1886 struct mdt_object *obj,
1887 const struct md_attr *ma,
1888 struct mdt_lock_handle *lh,
1889 struct ldlm_enqueue_info *einfo,
1890 struct list_head *slave_locks)
1894 if (mdt_object_remote(obj)) {
1895 rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1901 * if obj is remote and striped, lock its stripes explicitly
1902 * because it's not striped in LOD layer on this MDT.
1904 if (ma->ma_valid & MA_LMV) {
1905 rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
1907 mdt_object_unlock(info, obj, lh, rc);
1910 rc = mdt_object_stripes_lock(info, NULL, obj, lh, einfo,
1911 MDS_INODELOCK_UPDATE, LCK_PW,
1919 * in migration, object may be remote, and we need take full lock of it and its
1920 * stripes if it's directory, besides, object may be a remote object on its
1921 * parent, revoke its LOOKUP lock on where its parent is located.
/*
 * Take the full lock on the object being migrated and on its stripes.
 *
 * For a remote @obj the lookup lock is first revoked through
 * mdt_revoke_remote_lookup_lock(), then @obj is locked with
 * MDS_INODELOCK_FULL in LCK_EX mode.  When @obj is a directory its LMV is
 * read with mdt_stripe_get() into info->mti_attr and, if present, the
 * stripes are locked with mdt_lock_remote_slaves(); both of these calls are
 * followed by an unlock of @obj for their failure case.  The other visible
 * call, mdt_object_stripes_lock() with MDS_INODELOCK_FULL and LCK_EX, is
 * presumably the branch for a local @obj.
 *
 * \param[in] info		thread info
 * \param[in] pobj		parent of @obj
 * \param[in] obj		object to lock
 * \param[in,out] lh		lock handle for @obj
 * \param[in] einfo		enqueue info for mdt_object_stripes_lock()
 * \param[in,out] slave_locks	list receiving the stripe locks of a remote
 *				@obj
 */
1923 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1924 struct mdt_object *pobj,
1925 struct mdt_object *obj,
1926 struct mdt_lock_handle *lh,
1927 struct ldlm_enqueue_info *einfo,
1928 struct list_head *slave_locks)
1932 if (mdt_object_remote(obj)) {
1933 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1937 rc = mdt_object_lock(info, obj, lh, MDS_INODELOCK_FULL, LCK_EX,
1943 * if obj is remote and striped, lock its stripes explicitly
1944 * because it's not striped in LOD layer on this MDT.
1946 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
1947 struct md_attr *ma = &info->mti_attr;
1949 rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
1951 mdt_object_unlock(info, obj, lh, rc);
1955 if (ma->ma_valid & MA_LMV) {
1956 rc = mdt_lock_remote_slaves(info, obj, ma,
1959 mdt_object_unlock(info, obj, lh, rc);
1963 rc = mdt_object_stripes_lock(info, pobj, obj, lh, einfo,
1964 MDS_INODELOCK_FULL, LCK_EX, true);
1971 * lookup source by name, if parent is striped directory, we need to find the
1972 * corresponding stripe where source is located, and then lookup there.
1974 * besides, if parent is migrating too, and file is already in target stripe,
1975 * this should be a redo of 'lfs migrate' on client side.
/*
 * Look up the migration source by name and return it with its real parent.
 *
 * When @ma carries a valid LMV the name is mapped to a stripe with
 * lmv_name_to_stripe_index_old(), that stripe is looked up and the name is
 * searched there with mdo_lookup().  If this gives -ENOENT while the layout
 * is changing, the stripe is put, the name is mapped again with
 * lmv_name_to_stripe_index() and searched in that stripe; this second path
 * returns the lookup error, or -EALREADY when the name was found.
 * Without an LMV the name is searched in @pobj itself and an extra
 * reference is taken with mdt_object_get().
 *
 * The FID found is then turned into an object stored in *@sobj; if that
 * fails the reference on the parent/stripe is dropped again.
 *
 * \param[in] info	thread info; mti_tmp_fid1 is used as scratch FID
 * \param[in] pobj	master object of the parent
 * \param[in] ma	attributes of @pobj, checked for MA_LMV
 * \param[in] lname	name to look up
 * \param[out] spobj	parent or stripe the name was found in
 * \param[out] sobj	source object
 *
 * NOTE(review): the assignments to *@spobj and the handling of negative
 * stripe indexes are not visible in this view.
 */
1977 static int mdt_migrate_lookup(struct mdt_thread_info *info,
1978 struct mdt_object *pobj,
1979 const struct md_attr *ma,
1980 const struct lu_name *lname,
1981 struct mdt_object **spobj,
1982 struct mdt_object **sobj)
1984 const struct lu_env *env = info->mti_env;
1985 struct lu_fid *fid = &info->mti_tmp_fid1;
1986 struct mdt_object *stripe;
1989 if (ma->ma_valid & MA_LMV) {
1990 /* if parent is striped, lookup on corresponding stripe */
1991 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1993 if (!lmv_is_sane(lmv))
1996 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
2001 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2003 stripe = mdt_object_find(env, info->mti_mdt, fid);
2005 return PTR_ERR(stripe);
2008 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2010 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2012 * if parent layout is changeing, and lookup child
2013 * failed on source stripe, lookup again on target
2014 * stripe, if it exists, it means previous migration
2015 * was interrupted, and current file was migrated
2018 mdt_object_put(env, stripe);
2020 rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2025 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2027 stripe = mdt_object_find(env, info->mti_mdt, fid);
2029 return PTR_ERR(stripe);
2032 rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2033 fid, &info->mti_spec);
2034 mdt_object_put(env, stripe);
2035 return rc ?: -EALREADY;
2037 mdt_object_put(env, stripe);
2042 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2048 mdt_object_get(env, stripe);
2053 *sobj = mdt_object_find(env, info->mti_mdt, fid);
2054 if (IS_ERR(*sobj)) {
2055 mdt_object_put(env, stripe);
2056 rc = PTR_ERR(*sobj);
2064 /* end lease and close file for regular file */
/*
 * End the lease named in the request and close the file.
 *
 * The request has to carry both RMF_MDT_EPOCH and RMF_CLOSE_DATA.  The
 * lease lock is resolved from data->cd_handle, its cancel state is read
 * under the resource lock, and the server side lease is then cancelled
 * with ldlm_lock_cancel() followed by ldlm_reprocess_all() on its resource.
 * Afterwards mdt_close_internal() is called and OBD_MD_CLOSE_INTENT_EXECED
 * is set in the reply body.
 *
 * \param[in] info	thread info holding the request capsule
 * \param[in] obj	file being migrated; only used for the debug message
 *			in the lines visible here
 *
 * NOTE(review): how rc and rc2 are combined into the return value, and
 * what is returned when a field or the lease is missing, is not visible in
 * this view.
 */
2065 static int mdd_migrate_close(struct mdt_thread_info *info,
2066 struct mdt_object *obj)
2068 struct close_data *data;
2069 struct mdt_body *repbody;
2070 struct ldlm_lock *lease;
2075 if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2077 !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2081 data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2086 lease = ldlm_handle2lock(&data->cd_handle);
2090 /* check if the lease was already canceled */
2091 lock_res_and_lock(lease);
2092 rc = ldlm_is_cancel(lease);
2093 unlock_res_and_lock(lease);
2097 LDLM_DEBUG(lease, DFID" lease broken",
2098 PFID(mdt_object_fid(obj)));
2102 * cancel server side lease, client side counterpart should have been
2103 * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2105 ldlm_lock_cancel(lease);
2106 ldlm_reprocess_all(lease->l_resource,
2107 lease->l_policy_data.l_inodebits.bits);
2108 LDLM_LOCK_PUT(lease);
2111 rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2112 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2113 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2119 * migrate file in below steps:
2120 * 1. lock parent and its stripes
2121 * 2. lookup source by name
2122 * 3. lock parents of source links if source is not directory
2123 * 4. reject if source is in HSM
2124 * 5. take source open_sem and close file if source is regular file
2125 * 6. lock source and its stripes if it's directory
2126 * 7. lock target so subsequent change to it can trigger COS
2128 * 9. unlock above locks
2129 * 10. sync device if source has links
/*
 * Migrate the entry rr_name of directory rr_fid1 to the target rr_fid2.
 *
 * Checks made before any lock is taken: both FIDs have to pass
 * fid_is_md_operative(), the name must not be dot or dotdot, both
 * mdt_enable_remote_dir and mdt_enable_dir_migration have to be set, and
 * the caller credentials are tested (uc_rbac_dne_ops, CAP_SYS_ADMIN,
 * mdt_enable_remote_dir_gid).
 *
 * Lock handles and lists used below:
 *  - MDT_LH_RMT:    rename lock, taken only for a non-replay request;
 *  - MDT_LH_PARENT: parent, with stripe locks on parent_slave_locks;
 *  - link_locks:    parents of the other links of the source;
 *  - MDT_LH_OLD:    source, with stripe locks on child_slave_locks;
 *  - MDT_LH_NEW:    target, locked FULL in LCK_EX mode.
 *
 * A -EBUSY result from mdt_link_parents_lock() is accepted up to
 * lock_retries (5) times; the references on the source and its parent are
 * dropped in that case.  Before mdo_migrate() is called sp_cr_lookup is
 * cleared and sp_feat is set to dt_directory_features.  Everything is
 * released afterwards in the reverse order of acquisition.
 *
 * \param[in] info	thread info holding the unpacked reint record
 * \param[in] unused	not referenced in the lines visible here
 *
 * NOTE(review): the conditions in front of most GOTO() calls, the values
 * returned by the early checks and the place where do_sync is set are not
 * visible in this view; confirm against the complete function.
 */
2131 int mdt_reint_migrate(struct mdt_thread_info *info,
2132 struct mdt_lock_handle *unused)
2134 const struct lu_env *env = info->mti_env;
2135 struct mdt_device *mdt = info->mti_mdt;
2136 struct ptlrpc_request *req = mdt_info_req(info);
2137 struct mdt_reint_record *rr = &info->mti_rr;
2138 struct lu_ucred *uc = mdt_ucred(info);
2139 struct md_attr *ma = &info->mti_attr;
2140 struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2141 struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2142 struct mdt_object *pobj;
2143 struct mdt_object *spobj = NULL;
2144 struct mdt_object *sobj = NULL;
2145 struct mdt_object *tobj;
2146 struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2147 struct mdt_lock_handle *lhp;
2148 struct mdt_lock_handle *lhs;
2149 struct mdt_lock_handle *lht;
2150 LIST_HEAD(parent_slave_locks);
2151 LIST_HEAD(child_slave_locks);
2152 LIST_HEAD(link_locks);
2153 int lock_retries = 5;
2154 bool open_sem_locked = false;
2155 bool do_sync = false;
2159 CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2160 PNAME(&rr->rr_name), PFID(rr->rr_fid2));
2162 if (info->mti_dlm_req)
2163 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2165 if (!fid_is_md_operative(rr->rr_fid1) ||
2166 !fid_is_md_operative(rr->rr_fid2))
2169 /* don't allow migrate . or .. */
2170 if (lu_name_is_dot_or_dotdot(&rr->rr_name))
2173 if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
2176 /* we want rbac roles to have precedence over any other
2177 * permission or capability checks
2179 if (uc && (!uc->uc_rbac_dne_ops ||
2180 (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2181 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2182 mdt->mdt_enable_remote_dir_gid != -1)))
2186 * Note: do not enqueue rename lock for replay request, because
2187 * if other MDT holds rename lock, but being blocked to wait for
2188 * this MDT to finish its recovery, and the failover MDT can not
2189 * get rename lock, which will cause deadlock.
2191 * req is NULL if this is called by directory auto-split.
2193 if (req && !req_is_replay(req)) {
2194 rc = mdt_rename_lock(info, rename_lh);
2196 CERROR("%s: can't lock FS for rename: rc = %d\n",
2197 mdt_obd_name(info->mti_mdt), rc);
2202 /* pobj is master object of parent */
2203 pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2205 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2208 rc = mdt_version_get_check(info, pobj, 0);
2210 GOTO(put_parent, rc);
2213 if (!mdt_object_exists(pobj))
2214 GOTO(put_parent, rc = -ENOENT);
2216 if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2217 GOTO(put_parent, rc = -ENOTDIR);
2219 rc = mdt_check_enc(info, pobj);
2221 GOTO(put_parent, rc);
2223 rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2225 GOTO(put_parent, rc);
2228 /* lock parent object */
2229 lhp = &info->mti_lh[MDT_LH_PARENT];
2230 rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2231 &parent_slave_locks);
2233 GOTO(put_parent, rc);
2236 * spobj is the corresponding stripe against name if pobj is striped
2237 * directory, which is the real parent, and no need to lock, because
2238 * we've taken full lock of pobj.
2240 rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2242 GOTO(unlock_parent, rc);
2244 /* lock parents of source links, and revoke LOOKUP lock of links */
2245 rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2246 &parent_slave_locks, &link_locks);
/* -EBUSY is accepted while lock_retries lasts; source refs are dropped */
2247 if (rc == -EBUSY && lock_retries-- > 0) {
2248 mdt_object_put(env, sobj);
2249 mdt_object_put(env, spobj);
2254 GOTO(put_source, rc);
2257 * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2258 * with one request, if total lock count exceeds this limit, we will
2259 * drop all locks after migration, and synchronous device in the end.
2263 /* TODO: DoM migration is not supported, migrate dirent only */
2264 if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2265 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2267 GOTO(unlock_links, rc);
2269 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2270 info->mti_spec.sp_migrate_nsonly = 1;
2271 } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2272 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2274 GOTO(unlock_links, rc);
2276 /* race with restripe/auto-split? */
2277 if ((ma->ma_valid & MA_LMV) &&
2278 lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2279 GOTO(unlock_links, rc = -EBUSY);
2282 /* if migration HSM is allowed */
2283 if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2284 ma->ma_need = MA_HSM;
2286 rc = mdt_attr_get_complex(info, sobj, ma);
2288 GOTO(unlock_links, rc);
2290 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2291 GOTO(unlock_links, rc = -EOPNOTSUPP);
2294 /* end lease and close file for regular file */
2295 if (info->mti_spec.sp_migrate_close) {
2296 /* try to hold open_sem so that nobody else can open the file */
2297 if (!down_write_trylock(&sobj->mot_open_sem)) {
2299 mdd_migrate_close(info, sobj);
2300 GOTO(unlock_links, rc = -EBUSY);
2302 open_sem_locked = true;
2303 rc = mdd_migrate_close(info, sobj);
2305 GOTO(unlock_open_sem, rc);
2310 lhs = &info->mti_lh[MDT_LH_OLD];
2311 rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2312 &child_slave_locks);
2314 GOTO(unlock_open_sem, rc);
2317 tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2319 GOTO(unlock_source, rc = PTR_ERR(tobj));
2321 lht = &info->mti_lh[MDT_LH_NEW];
2322 rc = mdt_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, LCK_EX, true);
2324 GOTO(put_target, rc);
2326 /* Don't do lookup sanity check. We know name doesn't exist. */
2327 info->mti_spec.sp_cr_lookup = 0;
2328 info->mti_spec.sp_feat = &dt_directory_features;
2330 rc = mdo_migrate(env, mdt_object_child(pobj),
2331 mdt_object_child(sobj), &rr->rr_name,
2332 mdt_object_child(tobj),
2333 &info->mti_spec, ma);
2335 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2336 LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
2339 mdt_object_unlock(info, tobj, lht, rc);
2341 mdt_object_put(env, tobj);
2343 mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2344 &child_slave_locks, rc);
2346 if (open_sem_locked)
2347 up_write(&sobj->mot_open_sem);
2349 /* if we've got too many locks to save into RPC,
2350 * then just commit before the locks are released
2353 mdt_device_sync(env, mdt);
2354 mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2356 mdt_object_put(env, sobj);
2357 mdt_object_put(env, spobj);
2359 mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2360 &parent_slave_locks, rc);
2362 mdt_object_put(env, pobj);
2364 mdt_rename_unlock(info, rename_lh);
2370 * determine lock order of sobj and tobj
2372 * there are two situations we need to lock tobj before sobj:
2373 * 1. sobj is child of tobj
2374 * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2375 * larger than that of tobj
2377 * \retval 1 lock tobj before sobj
2378 * \retval 0 lock sobj before tobj
2379 * \retval -ev negative errno upon error
/*
 * Decide which of @sobj and @tobj has to be locked first.
 *
 * Checks made in order: identical objects, either object being the root,
 * @sobj being below @tobj (mdo_is_subdir()), both objects having the same
 * parent FID (mdt_attr_get_pfid() on each), and finally both objects being
 * stripes (LMV_MAGIC_STRIPE set in lmv_magic).  For two sibling stripes the
 * lmv_master_mdt_index values are compared: the result is 0 when the index
 * of @sobj is the smaller one and 1 when it is the larger one.
 *
 * \param[in] info	thread info; mti_attr, mti_tmp_fid1 and mti_tmp_fid2
 *			are used as scratch space
 * \param[in] sobj	source object
 * \param[in] tobj	target object
 *
 * NOTE(review): the values returned by the earlier checks, including the
 * case of equal stripe indexes, are not visible in this view.
 */
2381 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2382 struct mdt_object *sobj,
2383 struct mdt_object *tobj)
2385 struct md_attr *ma = &info->mti_attr;
2386 struct lu_fid *spfid = &info->mti_tmp_fid1;
2387 struct lu_fid *tpfid = &info->mti_tmp_fid2;
2388 struct lmv_mds_md_v1 *lmv;
2393 /* sobj and tobj are the same */
2397 if (fid_is_root(mdt_object_fid(sobj)))
2400 if (fid_is_root(mdt_object_fid(tobj)))
2403 /* check whether sobj is child of tobj */
2404 rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2405 mdt_object_fid(tobj));
2412 /* check whether sobj and tobj are children of the same parent */
2413 rc = mdt_attr_get_pfid(info, sobj, spfid);
2417 rc = mdt_attr_get_pfid(info, tobj, tpfid);
2421 if (!lu_fid_eq(spfid, tpfid))
2424 /* check whether sobj and tobj are sibling stripes */
2425 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2429 if (!(ma->ma_valid & MA_LMV))
2432 lmv = &ma->ma_lmv->lmv_md_v1;
2433 if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2435 sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2438 rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2442 if (!(ma->ma_valid & MA_LMV))
2445 lmv = &ma->ma_lmv->lmv_md_v1;
2446 if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2448 tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2450 /* check stripe index of sobj and tobj */
2451 if (sindex == tindex)
2454 return sindex < tindex ? 0 : 1;
2458 * lock rename source object.
2460 * Both source and source parent may be remote, and source may be a remote
2461 * object on source parent, to avoid overriding lock handle, store remote
2462 * LOOKUP lock separately in @lhr.
2464 * \retval 0 on success
2465 * \retval -ev negative errno upon error
/*
 * Lock the source object of a rename.
 *
 * mdt_is_remote_object() is asked about @child relative to @parent.  The
 * LOOKUP lock is taken separately into @lhr with mdt_object_lookup_lock()
 * in LCK_EX mode, and the LOOKUP bit is then removed from ibits.  The
 * remaining bits are locked on @child into @lhc in LCK_EX mode.
 *
 * \param[in] info	thread info
 * \param[in] parent	parent of the source
 * \param[in] child	source object
 * \param[in,out] lhc	lock handle for the lock on @child
 * \param[in,out] lhr	lock handle for the separate LOOKUP lock
 *
 * The body also uses ibits (lock bits to take) and cos_incompat (passed to
 * mdt_object_lock()); their declarations are not visible in this view.
 */
2467 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2468 struct mdt_object *parent,
2469 struct mdt_object *child,
2470 struct mdt_lock_handle *lhc,
2471 struct mdt_lock_handle *lhr,
2477 rc = mdt_is_remote_object(info, parent, child);
2482 rc = mdt_object_lookup_lock(info, parent, child, lhr, LCK_EX,
2487 ibits &= ~MDS_INODELOCK_LOOKUP;
2490 rc = mdt_object_lock(info, child, lhc, ibits, LCK_EX, cos_incompat);
/* a cleared LOOKUP bit means lhr holds the separate lock: drop it on error */
2491 if (rc && !(ibits & MDS_INODELOCK_LOOKUP))
2492 mdt_object_unlock(info, NULL, lhr, rc);
2497 /* Helper function for mdt_reint_rename so we don't need to opencode
2498 * two different order lockings
/*
 * Lock two parent directories in the order given by the caller.
 *
 * The first directory is locked with mdt_parent_lock() in LCK_PW mode and
 * its version is saved in slot 0.  The second directory is then handled in
 * one of two ways:
 *  - it is a different object: it is locked with mdt_parent_lock() as well;
 *  - it is the same object and not remote: a further
 *    mdt_object_pdo_lock() is taken only when the mlh_pdo_hash values of
 *    the two handles differ.
 * The version of the second directory is saved in slot 1.  The lock on the
 * first directory is released on the failure path.
 *
 * \param[in] info		thread info
 * \param[in] mfirstdir		directory locked first
 * \param[in,out] lh_firstdirp	lock handle for @mfirstdir
 * \param[in] firstname		name used for the lock on @mfirstdir
 * \param[in] mseconddir	directory locked second
 * \param[in,out] lh_seconddirp	lock handle for @mseconddir
 * \param[in] secondname	name used for the lock on @mseconddir
 *
 * The body also uses cos_incompat, whose declaration is not visible in this
 * view.
 */
2500 static int mdt_lock_two_dirs(struct mdt_thread_info *info,
2501 struct mdt_object *mfirstdir,
2502 struct mdt_lock_handle *lh_firstdirp,
2503 const struct lu_name *firstname,
2504 struct mdt_object *mseconddir,
2505 struct mdt_lock_handle *lh_seconddirp,
2506 const struct lu_name *secondname,
2511 rc = mdt_parent_lock(info, mfirstdir, lh_firstdirp, firstname, LCK_PW,
2516 mdt_version_get_save(info, mfirstdir, 0);
2517 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2519 if (mfirstdir != mseconddir) {
2520 rc = mdt_parent_lock(info, mseconddir, lh_seconddirp,
2521 secondname, LCK_PW, cos_incompat);
2522 } else if (!mdt_object_remote(mseconddir)) {
2523 if (lh_firstdirp->mlh_pdo_hash !=
2524 lh_seconddirp->mlh_pdo_hash) {
2525 rc = mdt_object_pdo_lock(info, mseconddir,
2526 lh_seconddirp, secondname,
2527 LCK_PW, false, cos_incompat);
2528 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2531 mdt_version_get_save(info, mseconddir, 1);
2534 mdt_object_unlock(info, mfirstdir, lh_firstdirp, rc);
2540 * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2541 * 2 - srcdir child; 3 - tgtdir child.
2542 * Update on disk version of srcdir child.
// NOTE(review): lines appear to be missing from this excerpt (braces,
// goto labels, short statements such as error checks). The notes added
// here cover only what the visible statements show.
//
// Handler for a rename request. Visible flow:
//  - find and check the source parent (rr_fid1) and the target parent
//    (rr_fid2);
//  - for non-replay requests, either take the rename lock or record the
//    request as a same-directory parallel rename;
//  - lock both parents in PW mode through mdt_lock_two_dirs();
//  - look up the source name, then the target name;
//  - lock the source object, and the target object when one was found;
//  - call mdo_rename(), then release locks and object references.
// The second parameter is named unused.
2544 static int mdt_reint_rename(struct mdt_thread_info *info,
2545 struct mdt_lock_handle *unused)
2547 struct mdt_device *mdt = info->mti_mdt;
2548 struct mdt_reint_record *rr = &info->mti_rr;
2549 struct md_attr *ma = &info->mti_attr;
2550 struct ptlrpc_request *req = mdt_info_req(info);
2551 struct mdt_object *msrcdir = NULL;
2552 struct mdt_object *mtgtdir = NULL;
2553 struct mdt_object *mold;
2554 struct mdt_object *mnew = NULL;
2555 struct mdt_lock_handle *rename_lh = &info->mti_lh[MDT_LH_RMT];
2556 struct mdt_lock_handle *lh_srcdirp;
2557 struct mdt_lock_handle *lh_tgtdirp;
2558 struct mdt_lock_handle *lh_oldp = NULL;
2559 struct mdt_lock_handle *lh_rmt = NULL;
2560 struct mdt_lock_handle *lh_newp = NULL;
2561 struct lu_fid *old_fid = &info->mti_tmp_fid1;
2562 struct lu_fid *new_fid = &info->mti_tmp_fid2;
2563 struct lu_ucred *uc = mdt_ucred(info);
2564 bool reverse = false, discard = false;
// Start time; used for the ktime_us_delta() passed to the counter tally.
2566 ktime_t kstart = ktime_get();
2567 enum mdt_stat_idx msi = 0;
2571 DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2572 PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2573 PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
// When the request carries a DLM request, pass it to ldlm_request_cancel().
2575 if (info->mti_dlm_req)
2576 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
// Both parent FIDs must pass fid_is_md_operative().
// NOTE(review): the action taken on failure is not visible here.
2578 if (!fid_is_md_operative(rr->rr_fid1) ||
2579 !fid_is_md_operative(rr->rr_fid2))
2582 /* find both parents. */
2583 msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2584 if (IS_ERR(msrcdir))
2585 RETURN(PTR_ERR(msrcdir));
2587 rc = mdt_check_enc(info, msrcdir);
2589 GOTO(out_put_srcdir, rc);
2591 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
// Same FID for both parents: mdt_object_get() is called on mtgtdir rather
// than doing a second lookup. NOTE(review): the line that assigns mtgtdir
// in this branch is not visible in the excerpt.
2593 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2595 mdt_object_get(info->mti_env, mtgtdir);
2597 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2598 if (IS_ERR(mtgtdir))
2599 GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2602 rc = mdt_check_enc(info, mtgtdir);
2604 GOTO(out_put_tgtdir, rc);
// When uc_rbac_fscrypt_admin is not set, a target directory whose header
// attributes include LOHA_FSCRYPT_MD is refused with -EPERM.
2606 if (!uc->uc_rbac_fscrypt_admin &&
2607 mtgtdir->mot_obj.lo_header->loh_attr & LOHA_FSCRYPT_MD)
2608 GOTO(out_put_tgtdir, rc = -EPERM);
2611 * Note: do not enqueue rename lock for replay request, because
2612 * if other MDT holds rename lock, but being blocked to wait for
2613 * this MDT to finish its recovery, and the failover MDT can not
2614 * get rename lock, which will cause deadlock.
2616 if (!req_is_replay(req)) {
2617 bool remote = mdt_object_remote(msrcdir);
2620 * Normally rename RPC is handled on the MDT with the target
2621 * directory (if target exists, it's on the MDT with the
2622 * target), if the source directory is remote, it's a hint that
2623 * source is remote too (this may not be true, but it won't
2624 * cause any issue), return -EXDEV early to avoid taking
2627 if (!mdt->mdt_enable_remote_rename && remote)
2628 GOTO(out_put_tgtdir, rc = -EXDEV);
2630 /* This might be further relaxed in the future for regular file
2631 * renames in different source and target parents. Start with
2632 * only same-directory renames for simplicity and because this
2633 * is by far the most the common use case.
2635 * Striped directories should be considered "remote".
2637 if (msrcdir != mtgtdir || remote ||
2638 (S_ISDIR(ma->ma_attr.la_mode) &&
2639 !mdt->mdt_enable_parallel_rename_dir) ||
2640 (!S_ISDIR(ma->ma_attr.la_mode) &&
2641 !mdt->mdt_enable_parallel_rename_file)) {
2642 rc = mdt_rename_lock(info, rename_lh);
2644 CERROR("%s: cannot lock for rename: rc = %d\n",
2645 mdt_obd_name(mdt), rc);
2646 GOTO(out_put_tgtdir, rc);
2649 if (S_ISDIR(ma->ma_attr.la_mode))
2650 msi = LPROC_MDT_RENAME_PAR_DIR;
2652 msi = LPROC_MDT_RENAME_PAR_FILE;
2655 "%s: samedir parallel rename "DFID"/"DNAME"\n",
2656 mdt_obd_name(mdt), PFID(rr->rr_fid1),
2657 PNAME(&rr->rr_name));
2661 rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2663 GOTO(out_unlock_rename, rc);
2666 /* source needs to be looked up after locking source parent, otherwise
2667 * this rename may race with unlink source, and cause rename hang, see
2668 * sanityn.sh 55b, so check parents first, if later we found source is
2669 * remote, relock parents.
2671 cos_incompat = (mdt_object_remote(msrcdir) ||
2672 mdt_object_remote(mtgtdir));
2674 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2676 /* lock parents in the proper order. */
2677 lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2678 lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2680 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2681 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2683 mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2684 mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2686 /* In case of same dir local rename we must sort by the hash,
2687 * otherwise a lock deadlock is possible when renaming
2688 * a to b and b to a at the same time LU-15285
2690 if (!mdt_object_remote(mtgtdir) && mtgtdir == msrcdir)
2691 reverse = lh_srcdirp->mlh_pdo_hash > lh_tgtdirp->mlh_pdo_hash;
2692 if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_MDS_PDO_LOCK)))
2696 rc = mdt_lock_two_dirs(info, mtgtdir, lh_tgtdirp,
2697 &rr->rr_tgt_name, msrcdir, lh_srcdirp,
2698 &rr->rr_name, cos_incompat);
2700 rc = mdt_lock_two_dirs(info, msrcdir, lh_srcdirp, &rr->rr_name,
2701 mtgtdir, lh_tgtdirp, &rr->rr_tgt_name,
2705 GOTO(out_unlock_rename, rc);
2707 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2708 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2710 /* find mold object. */
2712 rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2714 GOTO(out_unlock_parents, rc);
2716 if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2717 GOTO(out_unlock_parents, rc = -EINVAL);
2719 if (!fid_is_md_operative(old_fid))
2720 GOTO(out_unlock_parents, rc = -EPERM);
2722 mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2724 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2726 if (!mdt_object_exists(mold)) {
2727 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2729 "object does not exist");
2730 GOTO(out_put_old, rc = -ENOENT);
2733 if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2734 GOTO(out_put_old, rc = -EXDEV);
2736 /* Check if @mtgtdir is subdir of @mold, before locking child
2737 * to avoid reverse locking.
2739 if (mtgtdir != msrcdir) {
2740 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2745 GOTO(out_put_old, rc);
2749 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2750 /* save version after locking */
2751 mdt_version_get_save(info, mold, 2);
2753 if (!cos_incompat && mdt_object_remote(mold)) {
2754 cos_incompat = true;
2755 mdt_object_put(info->mti_env, mold);
2756 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2757 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2761 /* find mnew object:
2762 * mnew target object may not exist now
2763 * lookup with version checking
2766 rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2769 /* the new_fid should have been filled at this moment */
2770 if (lu_fid_eq(old_fid, new_fid))
2771 GOTO(out_put_old, rc);
2773 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2774 lu_fid_eq(new_fid, rr->rr_fid2))
2775 GOTO(out_put_old, rc = -EINVAL);
2777 if (!fid_is_md_operative(new_fid))
2778 GOTO(out_put_old, rc = -EPERM);
2780 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2782 GOTO(out_put_old, rc = PTR_ERR(mnew));
2784 if (!mdt_object_exists(mnew)) {
2785 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2787 "object does not exist");
2788 GOTO(out_put_new, rc = -ENOENT);
2791 if (mdt_object_remote(mnew)) {
2792 struct mdt_body *repbody;
2794 /* Always send rename req to the target child MDT */
2795 repbody = req_capsule_server_get(info->mti_pill,
2797 LASSERT(repbody != NULL);
2798 repbody->mbo_fid1 = *new_fid;
2799 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2800 GOTO(out_put_new, rc = -EXDEV);
2802 /* Before locking the target dir, check we do not replace
2803 * a dir with a non-dir, otherwise it may deadlock with
2804 * link op which tries to create a link in this dir
2805 * back to this non-dir.
2807 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2808 !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2809 GOTO(out_put_new, rc = -EISDIR);
2811 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2812 lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
2813 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2814 lh_rmt, MDS_INODELOCK_LOOKUP |
2815 MDS_INODELOCK_XATTR, cos_incompat);
2817 GOTO(out_put_new, rc);
2819 /* Check if @msrcdir is subdir of @mnew, before locking child
2820 * to avoid reverse locking.
2822 if (mtgtdir != msrcdir) {
2823 rc = mdo_is_subdir(info->mti_env,
2824 mdt_object_child(msrcdir), new_fid);
2828 GOTO(out_unlock_old, rc);
2832 /* We used to acquire MDS_INODELOCK_FULL here but we
2833 * can't do this now because a running HSM restore on
2834 * the rename onto victim will hold the layout
2835 * lock. See LU-4002.
// The existing target object is locked in LCK_EX mode with the LOOKUP and
// UPDATE inodebits.
2838 lh_newp = &info->mti_lh[MDT_LH_NEW];
2839 rc = mdt_object_check_lock(info, mtgtdir, mnew, lh_newp,
2840 MDS_INODELOCK_LOOKUP |
2841 MDS_INODELOCK_UPDATE, LCK_EX,
2844 GOTO(out_unlock_new, rc);
2846 /* get and save version after locking */
2847 mdt_version_get_save(info, mnew, 3);
// A target lookup error other than -ENOENT aborts the rename. The lines
// after the GOTO presumably form the -ENOENT branch (no target object):
// only the source is locked and mdt_enoent_version_save() is called for
// index 3. NOTE(review): the else line is not visible in this excerpt.
2848 } else if (rc != -ENOENT) {
2849 GOTO(out_put_old, rc);
2851 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2852 lh_rmt = &info->mti_lh[MDT_LH_LOOKUP];
2853 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2854 lh_rmt, MDS_INODELOCK_LOOKUP |
2855 MDS_INODELOCK_XATTR, cos_incompat);
2857 GOTO(out_put_old, rc);
2859 mdt_enoent_version_save(info, 3);
2862 /* step 5: rename it */
2863 mdt_reint_init_ma(info, ma);
2865 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2866 OBD_FAIL_MDS_REINT_RENAME_WRITE);
// NOTE(review): mnew can be NULL at this point (see the mnew != NULL test
// in the mdo_rename() call below); presumably a guard line for this
// mutex_lock and the matching mutex_unlock is missing from the excerpt.
2869 mutex_lock(&mnew->mot_lov_mutex);
2871 rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2872 mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2873 mnew != NULL ? mdt_object_child(mnew) : NULL,
2874 &rr->rr_tgt_name, ma);
2877 mutex_unlock(&mnew->mot_lov_mutex);
2879 /* handle last link of tgt object */
2882 mdt_handle_last_unlink(info, mnew, ma);
2883 discard = mdt_dom_check_for_discard(info, mnew);
2885 mdt_rename_counter_tally(info, info->mti_mdt, req,
2886 msrcdir, mtgtdir, msi,
2887 ktime_us_delta(ktime_get(), kstart));
// Cleanup, in the order visible here: target object lock, source object
// locks, object references, parent locks, rename lock, parent references.
// NOTE(review): the goto labels named by the GOTO() calls above are not
// visible in this excerpt.
2893 mdt_object_unlock(info, mnew, lh_newp, rc);
2895 mdt_object_unlock(info, NULL, lh_rmt, rc);
2896 mdt_object_unlock(info, mold, lh_oldp, rc);
// When discard is set the reference on mnew is kept here; it is dropped
// after the mdt_dom_discard_data() call further down.
2898 if (mnew && !discard)
2899 mdt_object_put(info->mti_env, mnew);
2901 mdt_object_put(info->mti_env, mold);
2903 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2904 mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2906 mdt_rename_unlock(info, rename_lh);
2908 mdt_object_put(info->mti_env, mtgtdir);
2910 mdt_object_put(info->mti_env, msrcdir);
2912 /* The DoM discard can be done right in the place above where it is
2913 * assigned, meanwhile it is done here after rename unlock due to
2914 * compatibility with old clients, for them the discard blocks
2915 * the main thread until completion. Check LU-11359 for details.
2918 mdt_dom_discard_data(info, mnew);
2919 mdt_object_put(info->mti_env, mnew);
2921 OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
// NOTE(review): lines appear to be missing from this excerpt (braces,
// goto labels, some declarations and conditions). The notes added here
// describe only the visible statements.
//
// Handler for an FLR file resync request on the object named by rr_fid1.
// Visible flow: find the object and check it exists, is a regular file
// and is not remote; resolve the lease lock from rr_lease_handle; take
// mot_open_sem for write without blocking; check the lease for
// cancellation; run mdt_layout_change() with MD_LAYOUT_RESYNC; fetch
// attributes and pack them into the reply body.
// Error codes visible below: -ENOENT, -EINVAL, -EREMOTE, -ESTALE, -EBUSY.
2925 static int mdt_reint_resync(struct mdt_thread_info *info,
2926 struct mdt_lock_handle *lhc)
2928 struct mdt_reint_record *rr = &info->mti_rr;
2929 struct ptlrpc_request *req = mdt_info_req(info);
2930 struct md_attr *ma = &info->mti_attr;
2931 struct mdt_object *mo;
2932 struct ldlm_lock *lease;
2933 struct mdt_body *repbody;
// The mirror id of the layout change is taken from the request record.
2934 struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
2939 DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
// When the request carries a DLM request, pass it to ldlm_request_cancel().
2941 if (info->mti_dlm_req)
2942 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2944 mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
2946 GOTO(out, rc = PTR_ERR(mo));
2948 if (!mdt_object_exists(mo))
2949 GOTO(out_obj, rc = -ENOENT);
2951 if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
2952 GOTO(out_obj, rc = -EINVAL);
2954 if (mdt_object_remote(mo))
2955 GOTO(out_obj, rc = -EREMOTE);
// NOTE(review): the condition guarding the -ESTALE exit is not visible;
// presumably it tests the handle lookup result.
2957 lease = ldlm_handle2lock(rr->rr_lease_handle);
2959 GOTO(out_obj, rc = -ESTALE);
2961 /* It's really necessary to grab open_sem and check if the lease lock
2962 * has been lost. There would exist a concurrent writer coming in and
2963 * generating some dirty data in memory cache, the writeback would fail
2964 * after the layout version is increased by MDS_REINT_RESYNC RPC.
// Non-blocking attempt; a failed trylock ends the request with -EBUSY.
2966 if (!down_write_trylock(&mo->mot_open_sem))
2967 GOTO(out_put_lease, rc = -EBUSY);
// ldlm_is_cancel() is sampled between lock_res_and_lock() and
// unlock_res_and_lock() on the lease.
2969 lock_res_and_lock(lease);
2970 lease_broken = ldlm_is_cancel(lease);
2971 unlock_res_and_lock(lease);
2973 GOTO(out_unlock, rc = -EBUSY);
2975 /* the file has yet opened by anyone else after we took the lease. */
2976 layout.mlc_opc = MD_LAYOUT_RESYNC;
// The lhc parameter is overwritten here with the MDT_LH_LOCAL handle of
// the thread info, so the value passed in by the caller is not used.
2977 lhc = &info->mti_lh[MDT_LH_LOCAL];
2978 rc = mdt_layout_change(info, mo, lhc, &layout);
2980 GOTO(out_unlock, rc);
2982 mdt_object_unlock(info, mo, lhc, 0);
2984 ma->ma_need = MA_INODE;
2986 rc = mdt_attr_get_complex(info, mo, ma);
2988 GOTO(out_unlock, rc);
2990 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2991 mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
// Cleanup in the order visible here: semaphore, lease reference, object
// reference, then mdt_client_compatibility().
2995 up_write(&mo->mot_open_sem);
2997 LDLM_LOCK_PUT(lease);
2999 mdt_object_put(info->mti_env, mo);
3001 mdt_client_compatibility(info);
/* One entry of the reint dispatch table: a handler and a stats opcode. */
3005 struct mdt_reinter {
/* Handler; takes the thread info and a lock handle, returns int. */
3006 int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
/* Value added to PTLRPC_LAST_CNTR when mdt_reint_rec() bumps a counter. */
3007 enum lprocfs_extra_opc mr_extra_opc;
/*
 * Dispatch table, indexed by info->mti_rr.rr_opcode in mdt_reint_rec().
 * NOTE(review): most array designators are not visible in this excerpt
 * (only [REINT_SETXATTR] is); presumably each entry has one - TODO
 * confirm against the full file.
 */
3010 static const struct mdt_reinter mdt_reinters[] = {
3012 .mr_handler = &mdt_reint_setattr,
3013 .mr_extra_opc = MDS_REINT_SETATTR,
3016 .mr_handler = &mdt_reint_create,
3017 .mr_extra_opc = MDS_REINT_CREATE,
3020 .mr_handler = &mdt_reint_link,
3021 .mr_extra_opc = MDS_REINT_LINK,
3024 .mr_handler = &mdt_reint_unlink,
3025 .mr_extra_opc = MDS_REINT_UNLINK,
3028 .mr_handler = &mdt_reint_rename,
3029 .mr_extra_opc = MDS_REINT_RENAME,
3032 .mr_handler = &mdt_reint_open,
3033 .mr_extra_opc = MDS_REINT_OPEN,
3035 [REINT_SETXATTR] = {
3036 .mr_handler = &mdt_reint_setxattr,
3037 .mr_extra_opc = MDS_REINT_SETXATTR,
/* Second entry that reuses mdt_reint_unlink and MDS_REINT_UNLINK. */
3040 .mr_handler = &mdt_reint_unlink,
3041 .mr_extra_opc = MDS_REINT_UNLINK,
/* The migrate handler is counted under MDS_REINT_RENAME. */
3044 .mr_handler = &mdt_reint_migrate,
3045 .mr_extra_opc = MDS_REINT_RENAME,
3048 .mr_handler = &mdt_reint_resync,
3049 .mr_extra_opc = MDS_REINT_RESYNC,
3053 int mdt_reint_rec(struct mdt_thread_info *info,
3054 struct mdt_lock_handle *lhc)
3056 const struct mdt_reinter *mr;
3060 if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3063 mr = &mdt_reinters[info->mti_rr.rr_opcode];
3064 if (mr->mr_handler == NULL)
3067 rc = (*mr->mr_handler)(info, lhc);
3069 lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3070 PTLRPC_LAST_CNTR + mr->mr_extra_opc);