4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2011, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
31 * lustre/mdt/mdt_reint.c
33 * Lustre Metadata Target (mdt) reintegration routines
35 * Author: Peter Braam <braam@clusterfs.com>
36 * Author: Andreas Dilger <adilger@clusterfs.com>
37 * Author: Phil Schwan <phil@clusterfs.com>
38 * Author: Huang Hua <huanghua@clusterfs.com>
39 * Author: Yury Umanets <umka@clusterfs.com>
42 #define DEBUG_SUBSYSTEM S_MDS
44 #include <lprocfs_status.h>
45 #include "mdt_internal.h"
46 #include <lustre_lmv.h>
47 #include <lustre_crypto.h>
/*
 * Prepare a md_attr for a reint operation: request inode attributes only
 * by setting ma->ma_need to MA_INODE.
 */
49 static inline void mdt_reint_init_ma(struct mdt_thread_info *info,
52 ma->ma_need = MA_INODE;
57 * Get version of object by fid.
59 * Return real version or ENOENT_VERSION if object doesn't exist
/*
 * Store the version of object @o in *@version.
 *
 * dt_version_get() is consulted only when the object exists, is not remote
 * and its FID is not an OBF FID (fid_is_obf()); ENOENT_VERSION is the other
 * value assigned here. The resulting FID/version pair is logged at D_INODE.
 */
61 static void mdt_obj_version_get(struct mdt_thread_info *info,
62 struct mdt_object *o, __u64 *version)
66 if (mdt_object_exists(o) && !mdt_object_remote(o) &&
67 !fid_is_obf(mdt_object_fid(o)))
68 *version = dt_version_get(info->mti_env, mdt_obj2dt(o));
70 *version = ENOENT_VERSION;
71 CDEBUG(D_INODE, "FID "DFID" version is %#llx\n",
72 PFID(mdt_object_fid(o)), *version);
76 * Check version is correct.
78 * Should be called only during replay.
/*
 * Compare @version with the pre-operation version stored in slot @idx of
 * the request message (lustre_msg_get_versions(req->rq_reqmsg)).
 *
 * The export's VBR connect flag is tested first (exp_connect_vbr()). The
 * request is asserted to be a replay and @idx is asserted to be below
 * PTLRPC_NUM_VERSIONS. If the request carries no version array, or the
 * stored version differs from @version, exp_vbr_failed is set on the export
 * under exp_lock.
 *
 * NOTE(review): the value returned on each path should be confirmed against
 * the return statements of this function.
 */
80 static int mdt_version_check(struct ptlrpc_request *req,
81 __u64 version, int idx)
83 __u64 *pre_ver = lustre_msg_get_versions(req->rq_reqmsg);
86 if (!exp_connect_vbr(req->rq_export))
89 LASSERT(req_is_replay(req));
90 /** VBR: version is checked always because costs nothing */
91 LASSERT(idx < PTLRPC_NUM_VERSIONS);
92 /** Sanity check for malformed buffers */
93 if (pre_ver == NULL) {
94 CERROR("No versions in request buffer\n");
95 spin_lock(&req->rq_export->exp_lock);
96 req->rq_export->exp_vbr_failed = 1;
97 spin_unlock(&req->rq_export->exp_lock);
99 } else if (pre_ver[idx] != version) {
100 CDEBUG(D_INODE, "Version mismatch %#llx != %#llx\n",
101 pre_ver[idx], version);
102 spin_lock(&req->rq_export->exp_lock);
103 req->rq_export->exp_vbr_failed = 1;
104 spin_unlock(&req->rq_export->exp_lock);
111 * Save pre-versions in reply.
/*
 * Write @version into slot idx of the version array carried by the reply
 * message (lustre_msg_get_versions(req->rq_repmsg)).
 *
 * The export's VBR connect flag is tested first. The request is asserted
 * not to be a replay and the reply message is asserted to be allocated.
 */
113 static void mdt_version_save(struct ptlrpc_request *req, __u64 version,
118 if (!exp_connect_vbr(req->rq_export))
121 LASSERT(!req_is_replay(req));
122 LASSERT(req->rq_repmsg != NULL);
123 reply_ver = lustre_msg_get_versions(req->rq_repmsg);
125 reply_ver[idx] = version;
129 * Save enoent version, it is needed when it is obvious that object doesn't
130 * exist, e.g. child during create.
/*
 * For a non-replay request, record ENOENT_VERSION in info->mti_ver[idx] and
 * store that value in the reply through mdt_version_save().
 */
132 static void mdt_enoent_version_save(struct mdt_thread_info *info, int idx)
134 /* save version of file name for replay, it must be ENOENT here */
135 if (!req_is_replay(mdt_info_req(info))) {
136 info->mti_ver[idx] = ENOENT_VERSION;
137 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
142 * Get version from disk and save in reply buffer.
144 * Versions are saved in reply only during normal operations not replays.
/*
 * For a non-replay request, read the version of @mto into
 * info->mti_ver[idx] (mdt_obj_version_get()) and store it in the reply
 * (mdt_version_save()). Nothing is read or saved for a replay.
 */
146 void mdt_version_get_save(struct mdt_thread_info *info,
147 struct mdt_object *mto, int idx)
149 /* don't save versions during replay */
150 if (!req_is_replay(mdt_info_req(info))) {
151 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
152 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
157 * Get version from disk and check it, no save in reply.
/*
 * For a replay request, read the version of @mto into info->mti_ver[idx]
 * and return the result of mdt_version_check() for that slot. Non-replay
 * requests are filtered out by the req_is_replay() test before any version
 * is read.
 */
159 int mdt_version_get_check(struct mdt_thread_info *info,
160 struct mdt_object *mto, int idx)
162 /* only check versions during replay */
163 if (!req_is_replay(mdt_info_req(info)))
166 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
167 return mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
171 * Get version from disk and check if recovery or just save.
/*
 * Read the version of @mto into info->mti_ver[idx], then either verify it
 * or record it: a replay request is verified with mdt_version_check() and
 * the outcome kept in rc.
 *
 * NOTE(review): the mdt_version_save() call is presumably the non-replay
 * branch of the same test; confirm against the branch structure.
 */
173 int mdt_version_get_check_save(struct mdt_thread_info *info,
174 struct mdt_object *mto, int idx)
178 mdt_obj_version_get(info, mto, &info->mti_ver[idx]);
179 if (req_is_replay(mdt_info_req(info)))
180 rc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx],
183 mdt_version_save(mdt_info_req(info), info->mti_ver[idx], idx);
188 * Lookup with version checking.
190 * This checks version of 'name'. Many reint functions use 'name' for child not
191 * FID, therefore we need to get object by name and check its version.
/*
 * Look up @lname in directory @p with mdo_lookup(), storing the result in
 * @fid, and during replay verify the version found under that name.
 *
 * During replay, info->mti_ver[idx] is set either to ENOENT_VERSION or to
 * the version of the object located through mdt_object_find() on @fid; the
 * temporary reference on that object is dropped right after the version is
 * read. The slot is then checked with mdt_version_check().
 *
 * Returns the version-check result when it is non-zero, otherwise the
 * lookup result.
 */
193 int mdt_lookup_version_check(struct mdt_thread_info *info,
194 struct mdt_object *p,
195 const struct lu_name *lname,
196 struct lu_fid *fid, int idx)
200 rc = mdo_lookup(info->mti_env, mdt_object_child(p), lname, fid,
202 /* Check version only during replay */
203 if (!req_is_replay(mdt_info_req(info)))
206 info->mti_ver[idx] = ENOENT_VERSION;
208 struct mdt_object *child;
210 child = mdt_object_find(info->mti_env, info->mti_mdt, fid);
211 if (likely(!IS_ERR(child))) {
212 mdt_obj_version_get(info, child, &info->mti_ver[idx]);
213 mdt_object_put(info->mti_env, child);
216 vbrc = mdt_version_check(mdt_info_req(info), info->mti_ver[idx], idx);
217 return vbrc ? vbrc : rc;
/*
 * Release the slave-stripe locks recorded in einfo->ei_cbdata.
 *
 * @obj is asserted to be a directory and the handle array to be non-NULL.
 * The thread's MDT_LH_LOCAL lock handle is reused as scratch: for each entry
 * i of the handle array the handle is copied into that scratch handle
 * (mlh_rreg_lh when bit i is set in ha_map; mlh_reg_lh is the other
 * destination), released with mdt_object_unlock() and its cookie cleared.
 *
 * Returns the result of mo_object_unlock() on the child md object.
 */
221 static int mdt_unlock_slaves(struct mdt_thread_info *mti,
222 struct mdt_object *obj,
223 struct ldlm_enqueue_info *einfo,
226 union ldlm_policy_data *policy = &mti->mti_policy;
227 struct mdt_lock_handle *lh = &mti->mti_lh[MDT_LH_LOCAL];
228 struct lustre_handle_array *slave_locks = einfo->ei_cbdata;
231 LASSERT(S_ISDIR(obj->mot_header.loh_attr));
232 LASSERT(slave_locks);
234 memset(policy, 0, sizeof(*policy));
235 policy->l_inodebits.bits = einfo->ei_inodebits;
236 mdt_lock_handle_init(lh);
237 mdt_lock_reg_init(lh, einfo->ei_mode);
238 for (i = 0; i < slave_locks->ha_count; i++) {
239 if (test_bit(i, (void *)slave_locks->ha_map))
240 lh->mlh_rreg_lh = slave_locks->ha_handles[i];
242 lh->mlh_reg_lh = slave_locks->ha_handles[i];
243 mdt_object_unlock(mti, NULL, lh, decref);
244 slave_locks->ha_handles[i].cookie = 0ull;
247 return mo_object_unlock(mti->mti_env, mdt_object_child(obj), einfo,
/*
 * Probe whether @obj carries the xattr queried below on its bottom-device
 * slice.
 *
 * The object's type is tested first with S_ISDIR(). The bottom slice is
 * found with lu_object_find_slice() and queried with dt_xattr_get() using
 * LU_BUF_NULL as the buffer; the slice reference is dropped before
 * returning.
 *
 * Returns 1 when dt_xattr_get() returned a positive value, 0 when it
 * returned -ENODATA, otherwise its return value unchanged; PTR_ERR() of the
 * slice lookup if that lookup failed.
 */
251 static inline int mdt_object_striped(struct mdt_thread_info *mti,
252 struct mdt_object *obj)
254 struct lu_device *bottom_dev;
255 struct lu_object *bottom_obj;
258 if (!S_ISDIR(obj->mot_header.loh_attr))
261 /* getxattr from bottom obj to avoid reading in shard FIDs */
262 bottom_dev = dt2lu_dev(mti->mti_mdt->mdt_bottom);
263 bottom_obj = lu_object_find_slice(mti->mti_env, bottom_dev,
264 mdt_object_fid(obj), NULL);
265 if (IS_ERR(bottom_obj))
266 return PTR_ERR(bottom_obj);
268 rc = dt_xattr_get(mti->mti_env, lu2dt(bottom_obj), &LU_BUF_NULL,
270 lu_object_put(mti->mti_env, bottom_obj);
272 return (rc > 0) ? 1 : (rc == -ENODATA) ? 0 : rc;
276 * Lock slave stripes if necessary, the lock handles of slave stripes
277 * will be stored in einfo->ei_cbdata.
/*
 * Fill @einfo for an LDLM_IBITS enqueue of mode @mode and bits @ibits on
 * this MDT's namespace, reset the thread's policy data to those bits, and
 * return the result of mo_object_lock() on the child md object.
 *
 * @obj is asserted to be a directory. The blocking callbacks are
 * mdt_remote_blocking_ast (ei_cb_bl) and mdt_blocking_ast (ei_cb_local_bl);
 * the completion callback is ldlm_completion_ast. ei_enq_slave and
 * ei_req_slot are both set to 1.
 */
279 static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj,
280 enum ldlm_mode mode, __u64 ibits,
281 struct ldlm_enqueue_info *einfo)
283 union ldlm_policy_data *policy = &mti->mti_policy;
285 LASSERT(S_ISDIR(obj->mot_header.loh_attr));
287 einfo->ei_type = LDLM_IBITS;
288 einfo->ei_mode = mode;
289 einfo->ei_cb_bl = mdt_remote_blocking_ast;
290 einfo->ei_cb_local_bl = mdt_blocking_ast;
291 einfo->ei_cb_cp = ldlm_completion_ast;
292 einfo->ei_enq_slave = 1;
293 einfo->ei_namespace = mti->mti_mdt->mdt_namespace;
294 einfo->ei_inodebits = ibits;
295 einfo->ei_req_slot = 1;
296 memset(policy, 0, sizeof(*policy));
297 policy->l_inodebits.bits = ibits;
299 return mo_object_lock(mti->mti_env, mdt_object_child(obj), NULL, einfo,
/*
 * Lock local object @o and, where applicable, its slave stripes.
 *
 * @o is asserted not to be remote and @einfo is zeroed. The sequence is:
 * mdt_reint_object_lock() on @o, then mdt_object_striped(), then
 * mdt_lock_slaves() using the mode already recorded in lh->mlh_reg_mode.
 * The lock on @o is released again with mdt_object_unlock() on the two
 * failure paths that follow the striped check and the slave lock.
 *
 * NOTE(review): the exact conditions guarding each step, and the handling
 * of -EIO under OBD_FAIL_LFSCK_BAD_SLAVE_NAME, should be confirmed against
 * the full branch structure.
 */
303 int mdt_reint_striped_lock(struct mdt_thread_info *info,
304 struct mdt_object *o,
305 struct mdt_lock_handle *lh,
307 struct ldlm_enqueue_info *einfo,
312 LASSERT(!mdt_object_remote(o));
314 memset(einfo, 0, sizeof(*einfo));
316 rc = mdt_reint_object_lock(info, o, lh, ibits, cos_incompat);
320 rc = mdt_object_striped(info, o);
323 mdt_object_unlock(info, o, lh, rc);
327 rc = mdt_lock_slaves(info, o, lh->mlh_reg_mode, ibits, einfo);
329 mdt_object_unlock(info, o, lh, rc);
330 if (rc == -EIO && OBD_FAIL_CHECK(OBD_FAIL_LFSCK_BAD_SLAVE_NAME))
/*
 * Counterpart of mdt_reint_striped_lock(): release slave-stripe locks when
 * einfo->ei_cbdata is set (mdt_unlock_slaves()), then release the lock on
 * @o itself. @decref is forwarded to both unlock calls.
 */
337 void mdt_reint_striped_unlock(struct mdt_thread_info *info,
338 struct mdt_object *o,
339 struct mdt_lock_handle *lh,
340 struct ldlm_enqueue_info *einfo, int decref)
342 if (einfo->ei_cbdata)
343 mdt_unlock_slaves(info, o, einfo, decref);
344 mdt_object_unlock(info, o, lh, decref);
/*
 * Restripe the existing directory named @lname under @parent.
 *
 * Steps, in order:
 *  - test mdt_enable_dir_restripe and set LMV_HASH_FLAG_FIXED in the
 *    user-supplied LMV hash type;
 *  - VBR: check/save the parent version in slot 0;
 *  - lock the parent (PDO, LCK_PW, MDS_INODELOCK_UPDATE) and read its LMV:
 *    -EBADF if the LMV is not sane, -EBUSY if its layout is changing;
 *  - look the name up with version check (slot 1) and find the child:
 *    -ENOENT if it does not exist, -ENOTDIR if it is not a directory;
 *    for a remote child the reply body is filled with its FID
 *    (OBD_MD_FLID | OBD_MD_MDS) and -EREMOTE is returned;
 *  - read the child's LMV, -EBUSY if it is migrating;
 *  - lock the child in LCK_EX (a remote LOOKUP lock first when the parent is
 *    remote, then a striped lock with MDS_INODELOCK_FULL) and check/save its
 *    version in slot 1;
 *  - mark the child as restriping under mdt_restriper.mdr_lock, -EBUSY if
 *    it already is;
 *  - call mdt_restripe_internal() and pack the attributes into the reply.
 *
 * The cleanup path clears mot_restriping, releases the child's striped
 * lock, drops the child reference and releases the parent lock.
 */
347 static int mdt_restripe(struct mdt_thread_info *info,
348 struct mdt_object *parent,
349 const struct lu_name *lname,
350 const struct lu_fid *tfid,
351 struct md_op_spec *spec,
354 struct mdt_device *mdt = info->mti_mdt;
355 struct lu_fid *fid = &info->mti_tmp_fid2;
356 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
357 struct lmv_user_md *lum = spec->u.sp_ea.eadata;
358 struct lmv_mds_md_v1 *lmv;
359 struct mdt_object *child;
360 struct mdt_lock_handle *lhp;
361 struct mdt_lock_handle *lhc;
362 struct mdt_body *repbody;
366 if (!mdt->mdt_enable_dir_restripe)
370 lum->lum_hash_type |= cpu_to_le32(LMV_HASH_FLAG_FIXED);
372 rc = mdt_version_get_check_save(info, parent, 0);
376 lhp = &info->mti_lh[MDT_LH_PARENT];
377 mdt_lock_pdo_init(lhp, LCK_PW, lname);
378 rc = mdt_reint_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE,
383 rc = mdt_stripe_get(info, parent, ma, XATTR_NAME_LMV);
385 GOTO(unlock_parent, rc);
387 if (ma->ma_valid & MA_LMV) {
388 /* don't allow restripe if parent dir layout is changing */
389 lmv = &ma->ma_lmv->lmv_md_v1;
390 if (!lmv_is_sane2(lmv))
391 GOTO(unlock_parent, rc = -EBADF);
393 if (lmv_is_layout_changing(lmv))
394 GOTO(unlock_parent, rc = -EBUSY);
398 rc = mdt_lookup_version_check(info, parent, lname, fid, 1);
400 GOTO(unlock_parent, rc);
402 child = mdt_object_find(info->mti_env, mdt, fid);
404 GOTO(unlock_parent, rc = PTR_ERR(child));
406 if (!mdt_object_exists(child))
407 GOTO(out_child, rc = -ENOENT);
/* remote child: hand its FID back to the client and return -EREMOTE */
409 if (mdt_object_remote(child)) {
410 struct mdt_body *repbody;
412 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
414 GOTO(out_child, rc = -EPROTO);
416 repbody->mbo_fid1 = *fid;
417 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
418 GOTO(out_child, rc = -EREMOTE);
421 if (!S_ISDIR(lu_object_attr(&child->mot_obj)))
422 GOTO(out_child, rc = -ENOTDIR);
424 rc = mdt_stripe_get(info, child, ma, XATTR_NAME_LMV);
428 /* race with migrate? */
429 if ((ma->ma_valid & MA_LMV) &&
430 lmv_is_migrating(&ma->ma_lmv->lmv_md_v1))
431 GOTO(out_child, rc = -EBUSY);
434 lhc = &info->mti_lh[MDT_LH_CHILD];
435 mdt_lock_reg_init(lhc, LCK_EX);
437 /* enqueue object remote LOOKUP lock */
438 if (mdt_object_remote(parent)) {
439 rc = mdt_remote_object_lock(info, parent, fid,
442 MDS_INODELOCK_LOOKUP, false);
447 rc = mdt_reint_striped_lock(info, child, lhc, MDS_INODELOCK_FULL, einfo,
450 GOTO(unlock_child, rc);
452 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
453 rc = mdt_version_get_check_save(info, child, 1);
455 GOTO(unlock_child, rc);
/* mot_restriping is tested and set under mdr_lock */
457 spin_lock(&mdt->mdt_restriper.mdr_lock);
458 if (child->mot_restriping) {
460 spin_unlock(&mdt->mdt_restriper.mdr_lock);
461 GOTO(unlock_child, rc = -EBUSY);
463 child->mot_restriping = 1;
464 spin_unlock(&mdt->mdt_restriper.mdr_lock);
467 rc = mdt_restripe_internal(info, parent, child, lname, fid, spec, ma);
469 GOTO(restriping_clear, rc);
471 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
473 GOTO(restriping_clear, rc = -EPROTO);
475 mdt_pack_attr2body(info, repbody, &ma->ma_attr, fid);
479 child->mot_restriping = 0;
481 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
483 mdt_object_put(info->mti_env, child);
485 mdt_object_unlock(info, parent, lhp, rc);
491 * VBR: we save three versions in reply:
492 * 0 - parent. Check that parent version is the same during replay.
493 * 1 - name. Version of 'name' if file exists with the same name or
494 * ENOENT_VERSION, it is needed because file may appear due to missed replays.
495 * 2 - child. Version of child by FID. Must be ENOENT. It is mostly sanity
/*
 * Create the object rr_fid2 under parent rr_fid1 with name rr_name.
 *
 * Outline:
 *  - reject a parent FID that is not md-operative;
 *  - for a directory created with a user LMV EA, apply client-capability
 *    and tunable checks (DNE / striped-dir support, mdt_enable_striped_dir,
 *    mdt_enable_remote_dir, OBD_CONNECT2_CRUSH, CAP_SYS_ADMIN or
 *    mdt_enable_remote_dir_gid) and decide whether an existing directory
 *    may be restriped instead;
 *  - find the parent (-ENOENT if it does not exist) and look the name up
 *    without a lock; -EEXIST and mdt_restripe() are the visible outcomes
 *    when the name is found;
 *  - record ENOENT_VERSION for the name (slot 1), lock the parent (PDO,
 *    LCK_PW, MDS_INODELOCK_UPDATE) and check/save the version of a local
 *    parent (slot 0);
 *  - allocate the child with mdt_object_new(), check/save its version
 *    (slot 2) and call mdo_create(), then mdt_attr_get_complex();
 *  - when mdt_slc_is_enabled() and the new object is a directory, run the
 *    striped-directory relock sequence described in the body;
 *  - pack FID and attributes into the reply if MA_INODE is valid.
 *
 * Cleanup drops the child reference, releases the parent lock and drops
 * the parent reference.
 */
498 static int mdt_create(struct mdt_thread_info *info)
500 struct mdt_device *mdt = info->mti_mdt;
501 struct mdt_object *parent;
502 struct mdt_object *child;
503 struct mdt_lock_handle *lh;
504 struct mdt_body *repbody;
505 struct md_attr *ma = &info->mti_attr;
506 struct mdt_reint_record *rr = &info->mti_rr;
507 struct md_op_spec *spec = &info->mti_spec;
508 bool restripe = false;
512 DEBUG_REQ(D_INODE, mdt_info_req(info),
513 "Create ("DNAME"->"DFID") in "DFID,
514 PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1));
516 if (!fid_is_md_operative(rr->rr_fid1))
519 if (S_ISDIR(ma->ma_attr.la_mode) &&
520 spec->u.sp_ea.eadata != NULL && spec->u.sp_ea.eadatalen != 0) {
521 const struct lmv_user_md *lum = spec->u.sp_ea.eadata;
522 struct lu_ucred *uc = mdt_ucred(info);
523 struct obd_export *exp = mdt_info_req(info)->rq_export;
525 /* Only new clients can create remote dir( >= 2.4) and
526 * striped dir(>= 2.6), old client will return -ENOTSUPP
528 if (!mdt_is_dne_client(exp))
531 if (le32_to_cpu(lum->lum_stripe_count) > 1) {
532 if (!mdt_is_striped_client(exp))
535 if (!mdt->mdt_enable_striped_dir)
537 } else if (!mdt->mdt_enable_remote_dir) {
541 if ((!(exp_connect_flags2(exp) & OBD_CONNECT2_CRUSH)) &&
542 (le32_to_cpu(lum->lum_hash_type) & LMV_HASH_TYPE_MASK) ==
546 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
547 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
548 mdt->mdt_enable_remote_dir_gid != -1)
551 /* restripe if later found dir exists, MDS_OPEN_CREAT means
552 * this is create only, don't try restripe.
554 if (mdt->mdt_enable_dir_restripe &&
555 le32_to_cpu(lum->lum_stripe_offset) == LMV_OFFSET_DEFAULT &&
556 !(spec->sp_cr_flags & MDS_OPEN_CREAT))
560 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
562 parent = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
564 RETURN(PTR_ERR(parent));
566 if (!mdt_object_exists(parent))
567 GOTO(put_parent, rc = -ENOENT);
570 * LU-10235: check if name exists locklessly first to avoid massive
571 * lock recalls on existing directories.
573 rc = mdt_lookup_version_check(info, parent, &rr->rr_name,
574 &info->mti_tmp_fid1, 1);
577 GOTO(put_parent, rc = -EEXIST);
579 rc = mdt_restripe(info, parent, &rr->rr_name, rr->rr_fid2, spec,
583 /* -ENOENT is expected here */
585 GOTO(put_parent, rc);
587 /* save version of file name for replay, it must be ENOENT here */
588 mdt_enoent_version_save(info, 1);
590 OBD_RACE(OBD_FAIL_MDS_CREATE_RACE);
/* lock the parent for this name: PDO lock, LCK_PW, UPDATE bit */
592 lh = &info->mti_lh[MDT_LH_PARENT];
593 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
594 rc = mdt_object_lock(info, parent, lh, MDS_INODELOCK_UPDATE);
596 GOTO(put_parent, rc);
/* VBR slot 0 (parent) is only handled for a local parent */
598 if (!mdt_object_remote(parent)) {
599 rc = mdt_version_get_check_save(info, parent, 0);
601 GOTO(unlock_parent, rc);
604 child = mdt_object_new(info->mti_env, mdt, rr->rr_fid2);
605 if (unlikely(IS_ERR(child)))
606 GOTO(unlock_parent, rc = PTR_ERR(child));
608 ma->ma_need = MA_INODE;
611 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
612 OBD_FAIL_MDS_REINT_CREATE_WRITE);
614 /* Version of child will be updated on disk. */
615 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child));
616 rc = mdt_version_get_check_save(info, child, 2);
621 * Do not perform lookup sanity check. We know that name does
624 info->mti_spec.sp_cr_lookup = 0;
625 info->mti_spec.sp_feat = &dt_directory_features;
627 rc = mdo_create(info->mti_env, mdt_object_child(parent), &rr->rr_name,
628 mdt_object_child(child), &info->mti_spec, ma);
630 rc = mdt_attr_get_complex(info, child, ma);
636 * On DNE, we need to eliminate dependey between 'mkdir a' and
637 * 'mkdir a/b' if b is a striped directory, to achieve this, two
638 * things are done below:
639 * 1. save child and slaves lock.
640 * 2. if the child is a striped directory, relock parent so to
641 * compare against with COS locks to ensure parent was
644 if (mdt_slc_is_enabled(mdt) && S_ISDIR(ma->ma_attr.la_mode)) {
645 struct mdt_lock_handle *lhc;
646 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
649 rc = mdt_object_striped(info, child);
655 if (!mdt_object_remote(parent)) {
656 mdt_object_unlock(info, parent, lh, 1);
657 mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name);
658 rc = mdt_reint_object_lock(info, parent, lh,
659 MDS_INODELOCK_UPDATE,
666 lhc = &info->mti_lh[MDT_LH_CHILD];
667 mdt_lock_handle_init(lhc);
668 mdt_lock_reg_init(lhc, LCK_PW);
669 rc = mdt_reint_striped_lock(info, child, lhc,
670 MDS_INODELOCK_UPDATE, einfo,
675 mdt_reint_striped_unlock(info, child, lhc, einfo, rc);
678 /* Return fid & attr to client. */
679 if (ma->ma_valid & MA_INODE)
680 mdt_pack_attr2body(info, repbody, &ma->ma_attr,
681 mdt_object_fid(child));
684 mdt_object_put(info->mti_env, child);
686 mdt_object_unlock(info, parent, lh, rc);
688 mdt_object_put(info->mti_env, parent);
/*
 * Apply the attributes in @ma to object @mo under a PW lock.
 *
 * The lock always covers MDS_INODELOCK_UPDATE; LOOKUP and PERM bits are
 * added when mode, uid or gid change. The lock is taken through
 * mdt_reint_striped_lock() and released through mdt_reint_striped_unlock().
 *
 * do_vbr is non-zero when any of mode, uid, gid, projid or flags changes;
 * the on-disk version update and the slot-0 version check/save belong to
 * that case (see the VBR comment in the body).
 *
 * mot_lov_mutex is held around mo_attr_set() when uid, gid or projid
 * changes. mdt_dom_obj_lvb_update() is called after the attribute update.
 */
692 static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo,
695 struct mdt_lock_handle *lh;
696 int do_vbr = ma->ma_attr.la_valid &
697 (LA_MODE | LA_UID | LA_GID | LA_PROJID | LA_FLAGS);
698 __u64 lockpart = MDS_INODELOCK_UPDATE;
699 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
704 rc = mdt_object_striped(info, mo);
710 lh = &info->mti_lh[MDT_LH_PARENT];
711 mdt_lock_reg_init(lh, LCK_PW);
713 /* Even though the new MDT will grant PERM lock to the old
714 * client, but the old client will almost ignore that during
715 * So it needs to revoke both LOOKUP and PERM lock here, so
716 * both new and old client can cancel the dcache
718 if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID))
719 lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM;
721 rc = mdt_reint_striped_lock(info, mo, lh, lockpart, einfo,
726 /* all attrs are packed into mti_attr in unpack_setattr */
727 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
728 OBD_FAIL_MDS_REINT_SETATTR_WRITE);
730 /* VBR: update version if attr changed are important for recovery */
732 /* update on-disk version of changed object */
733 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo));
734 rc = mdt_version_get_check_save(info, mo, 0);
736 GOTO(out_unlock, rc);
739 /* Ensure constant striping during chown(). See LU-2789. */
740 if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
741 mutex_lock(&mo->mot_lov_mutex);
743 /* all attrs are packed into mti_attr in unpack_setattr */
744 rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma);
746 if (ma->ma_attr.la_valid & (LA_UID|LA_GID|LA_PROJID))
747 mutex_unlock(&mo->mot_lov_mutex);
750 GOTO(out_unlock, rc);
751 mdt_dom_obj_lvb_update(info->mti_env, mo, NULL, false);
754 mdt_reint_striped_unlock(info, mo, lh, einfo, rc);
759 * Check HSM flags and add HS_DIRTY flag if relevant.
761 * A file could be set dirty only if it has a copy in the backend (HS_EXISTS)
762 * and is not RELEASED.
/*
 * Mark @mo as HS_DIRTY in its HSM state when that is relevant.
 *
 * The HSM attributes are read with mdt_attr_get_complex() (ma_need =
 * MA_HSM); a read failure is logged with CERROR. HS_DIRTY is added only
 * when MA_HSM is valid, HS_EXISTS is set and neither HS_DIRTY nor
 * HS_RELEASED is set. CAP_FOWNER is raised in the caller's credentials for
 * the duration of mdt_hsm_attr_set() and the saved capability set is
 * restored immediately afterwards. A failure of the update is logged with
 * CERROR.
 */
764 int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo,
767 struct lu_ucred *uc = mdt_ucred(info);
768 kernel_cap_t cap_saved;
772 /* If the file was modified, add the dirty flag */
773 ma->ma_need = MA_HSM;
774 rc = mdt_attr_get_complex(info, mo, ma);
776 CERROR("file attribute read error for "DFID": %d.\n",
777 PFID(mdt_object_fid(mo)), rc);
781 /* If an up2date copy exists in the backend, add dirty flag */
782 if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS)
783 && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) {
784 ma->ma_hsm.mh_flags |= HS_DIRTY;
786 /* Bump cap so that closes from non-owner writers can
787 * set the HSM state to dirty.
789 cap_saved = uc->uc_cap;
790 cap_raise(uc->uc_cap, CAP_FOWNER);
791 rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm);
792 uc->uc_cap = cap_saved;
794 CERROR("file attribute change error for "DFID": %d\n",
795 PFID(mdt_object_fid(mo)), rc);
/*
 * Reint handler for setattr on rr_fid1.
 *
 * Outline:
 *  - cancel any locks packed in the request (ldlm_request_cancel());
 *  - find the object: -ENOENT if it does not exist, -EREMOTE if remote;
 *  - when the size changes, MDS_TRUNC_KEEP_LEASE is not set and leases
 *    exist, take and immediately drop a CW MDS_INODELOCK_OPEN lock under
 *    mot_open_sem to revoke them;
 *  - for size changes / MRF_OPEN_TRUNC: -ETXTBSY if mdt_write_read() fails,
 *    -EOPNOTSUPP when the client lacks FLR or overstriping support and the
 *    file's layout uses it, then mdt_lsom_update();
 *  - attribute path (MA_INODE valid with la_valid set): -EPROTO if MA_LOV
 *    is also valid; FMD update for regular files with time changes; then
 *    mdt_attr_set();
 *  - layout path (MA_LOV or MA_LMV valid with MA_INODE): set the LOV xattr
 *    or the default LMV xattr under a PW lock, with extra LOOKUP-lock
 *    handling for the default LMV case;
 *  - anything else: -EPROTO;
 *  - optionally add the HSM dirty flag (MDS_DATA_MODIFIED), re-read the
 *    inode attributes and pack them into the reply.
 *
 * The setattr counter is updated with the elapsed time, and
 * mdt_client_compatibility() and mdt_fix_reply() are called before
 * returning.
 *
 * NOTE(review): @lhc is reassigned to the thread's MDT_LH_LOCAL handle in
 * the lease-revocation branch; the incoming value is not read in the lines
 * shown.
 */
801 static int mdt_reint_setattr(struct mdt_thread_info *info,
802 struct mdt_lock_handle *lhc)
804 struct mdt_device *mdt = info->mti_mdt;
805 struct md_attr *ma = &info->mti_attr;
806 struct mdt_reint_record *rr = &info->mti_rr;
807 struct ptlrpc_request *req = mdt_info_req(info);
808 struct mdt_object *mo;
809 struct mdt_body *repbody;
810 ktime_t kstart = ktime_get();
814 DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1),
815 (unsigned int)ma->ma_attr.la_valid);
817 if (info->mti_dlm_req)
818 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
820 OBD_RACE(OBD_FAIL_PTLRPC_RESEND_RACE);
822 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
823 mo = mdt_object_find(info->mti_env, mdt, rr->rr_fid1);
825 GOTO(out, rc = PTR_ERR(mo));
827 if (!mdt_object_exists(mo))
828 GOTO(out_put, rc = -ENOENT);
830 if (mdt_object_remote(mo))
831 GOTO(out_put, rc = -EREMOTE);
833 ma->ma_enable_chprojid_gid = mdt->mdt_enable_chprojid_gid;
834 /* revoke lease lock if size is going to be changed */
835 if (unlikely(ma->ma_attr.la_valid & LA_SIZE &&
836 !(ma->ma_attr_flags & MDS_TRUNC_KEEP_LEASE) &&
837 atomic_read(&mo->mot_lease_count) > 0)) {
838 down_read(&mo->mot_open_sem);
840 if (atomic_read(&mo->mot_lease_count) > 0) { /* lease exists */
841 lhc = &info->mti_lh[MDT_LH_LOCAL];
842 mdt_lock_reg_init(lhc, LCK_CW);
844 rc = mdt_object_lock(info, mo, lhc, MDS_INODELOCK_OPEN);
846 up_read(&mo->mot_open_sem);
850 /* revoke lease lock */
851 mdt_object_unlock(info, mo, lhc, 1);
853 up_read(&mo->mot_open_sem);
856 if (ma->ma_attr.la_valid & LA_SIZE || rr->rr_flags & MRF_OPEN_TRUNC) {
857 /* Check write access for the O_TRUNC case */
858 if (mdt_write_read(mo) < 0)
859 GOTO(out_put, rc = -ETXTBSY);
861 /* LU-10286: compatibility check for FLR.
862 * Please check the comment in mdt_finish_open() for details
864 if (!exp_connect_flr(info->mti_exp) ||
865 !exp_connect_overstriping(info->mti_exp)) {
866 rc = mdt_big_xattr_get(info, mo, XATTR_NAME_LOV);
867 if (rc < 0 && rc != -ENODATA)
870 if (!exp_connect_flr(info->mti_exp)) {
872 mdt_lmm_is_flr(info->mti_big_lmm))
873 GOTO(out_put, rc = -EOPNOTSUPP);
876 if (!exp_connect_overstriping(info->mti_exp)) {
878 mdt_lmm_is_overstriping(info->mti_big_lmm))
879 GOTO(out_put, rc = -EOPNOTSUPP);
883 /* For truncate, the file size sent from client
884 * is believable, but the blocks are incorrect,
885 * which makes the block size in LSOM attribute
886 * inconsistent with the real block size.
888 rc = mdt_lsom_update(info, mo, true);
893 if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) {
894 if (ma->ma_valid & MA_LOV)
895 GOTO(out_put, rc = -EPROTO);
897 /* MDT supports FMD for regular files due to Data-on-MDT */
898 if (S_ISREG(lu_object_attr(&mo->mot_obj)) &&
899 ma->ma_attr.la_valid & (LA_ATIME | LA_MTIME | LA_CTIME)) {
900 tgt_fmd_update(info->mti_exp, mdt_object_fid(mo),
903 if (ma->ma_attr.la_valid & LA_MTIME) {
904 rc = mdt_attr_get_pfid(info, mo, &ma->ma_pfid);
906 ma->ma_valid |= MA_PFID;
910 rc = mdt_attr_set(info, mo, ma);
/* layout path: set the LOV xattr or the default LMV xattr on mo */
913 } else if ((ma->ma_valid & (MA_LOV | MA_LMV)) &&
914 (ma->ma_valid & MA_INODE)) {
915 struct lu_buf *buf = &info->mti_buf;
916 struct lu_ucred *uc = mdt_ucred(info);
917 struct mdt_lock_handle *lh;
919 __u64 lockpart = MDS_INODELOCK_XATTR;
921 /* reject if either remote or striped dir is disabled */
922 if (ma->ma_valid & MA_LMV) {
923 if (!mdt->mdt_enable_remote_dir ||
924 !mdt->mdt_enable_striped_dir)
925 GOTO(out_put, rc = -EPERM);
927 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
928 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
929 mdt->mdt_enable_remote_dir_gid != -1)
930 GOTO(out_put, rc = -EPERM);
933 if (!S_ISDIR(lu_object_attr(&mo->mot_obj)))
934 GOTO(out_put, rc = -ENOTDIR);
936 if (ma->ma_attr.la_valid != 0)
937 GOTO(out_put, rc = -EPROTO);
939 lh = &info->mti_lh[MDT_LH_PARENT];
940 mdt_lock_reg_init(lh, LCK_PW);
942 if (ma->ma_valid & MA_LOV) {
943 buf->lb_buf = ma->ma_lmm;
944 buf->lb_len = ma->ma_lmm_size;
945 name = XATTR_NAME_LOV;
947 struct lmv_user_md *lmu = &ma->ma_lmv->lmv_user_md;
948 struct lu_fid *pfid = &info->mti_tmp_fid1;
949 struct lu_name *pname = &info->mti_name;
950 const char dotdot[] = "..";
951 struct mdt_object *pobj;
954 buf->lb_len = ma->ma_lmv_size;
955 name = XATTR_NAME_DEFAULT_LMV;
957 if (fid_is_root(rr->rr_fid1)) {
958 lockpart |= MDS_INODELOCK_LOOKUP;
960 /* force client to update dir default layout */
/* NOTE(review): sizeof(dotdot) is 3 because it counts the trailing NUL */
/* of ".."; confirm that the lookup below expects that as ln_namelen. */
962 pname->ln_name = dotdot;
963 pname->ln_namelen = sizeof(dotdot);
964 rc = mdo_lookup(info->mti_env,
965 mdt_object_child(mo), pname,
970 pobj = mdt_object_find(info->mti_env, mdt,
973 GOTO(out_put, rc = PTR_ERR(pobj));
975 if (mdt_object_remote(pobj))
976 rc = mdt_remote_object_lock(info, pobj,
978 &lh->mlh_rreg_lh, LCK_EX,
979 MDS_INODELOCK_LOOKUP, false);
981 lockpart |= MDS_INODELOCK_LOOKUP;
983 mdt_object_put(info->mti_env, pobj);
990 rc = mdt_object_lock(info, mo, lh, lockpart);
994 rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf,
997 mdt_object_unlock(info, mo, lh, rc);
1001 GOTO(out_put, rc = -EPROTO);
1004 /* If file data is modified, add the dirty flag */
1005 if (ma->ma_attr_flags & MDS_DATA_MODIFIED)
1006 rc = mdt_add_dirty_flag(info, mo, ma);
/* re-read the inode attributes so the reply carries the updated values */
1008 ma->ma_need = MA_INODE;
1010 rc = mdt_attr_get_complex(info, mo, ma);
1014 mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
1018 mdt_object_put(info->mti_env, mo);
1021 mdt_counter_incr(req, LPROC_MDT_SETATTR,
1022 ktime_us_delta(ktime_get(), kstart));
1024 mdt_client_compatibility(info);
1025 rc2 = mdt_fix_reply(info);
/*
 * Reint handler for create: validate the request, call mdt_create() and
 * account the elapsed time.
 *
 * OBD_FAIL_MDS_REINT_CREATE injects err_serious(-ESTALE). Locks packed in
 * the request are cancelled, the name is validated with lu_name_is_valid()
 * and the file type (la_mode & S_IFMT) is dispatched on; an unsupported
 * type is logged and yields err_serious(-EOPNOTSUPP). After mdt_create(),
 * the time is counted under LPROC_MDT_MKDIR for directories;
 * LPROC_MDT_MKNOD is the other counter used.
 *
 * NOTE(review): @lhc is not referenced in the lines shown.
 */
1031 static int mdt_reint_create(struct mdt_thread_info *info,
1032 struct mdt_lock_handle *lhc)
1034 struct ptlrpc_request *req = mdt_info_req(info);
1035 ktime_t kstart = ktime_get();
1039 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE))
1040 RETURN(err_serious(-ESTALE));
1042 if (info->mti_dlm_req)
1043 ldlm_request_cancel(mdt_info_req(info),
1044 info->mti_dlm_req, 0, LATF_SKIP);
1046 if (!lu_name_is_valid(&info->mti_rr.rr_name))
1049 switch (info->mti_attr.ma_attr.la_mode & S_IFMT) {
1059 CERROR("%s: Unsupported mode %o\n",
1060 mdt_obd_name(info->mti_mdt),
1061 info->mti_attr.ma_attr.la_mode);
1062 RETURN(err_serious(-EOPNOTSUPP));
1065 rc = mdt_create(info);
1067 if ((info->mti_attr.ma_attr.la_mode & S_IFMT) == S_IFDIR)
1068 mdt_counter_incr(req, LPROC_MDT_MKDIR,
1069 ktime_us_delta(ktime_get(), kstart));
1071 /* Special file should stay on the same node as parent*/
1072 mdt_counter_incr(req, LPROC_MDT_MKNOD,
1073 ktime_us_delta(ktime_get(), kstart));
1080 * VBR: save the parent version and the child version (obtained by name) in
1081 * the reply. The child version is fetched and checked during its lookup. If
/*
 * Reint handler for unlink of rr_name under parent rr_fid1.
 *
 * Outline:
 *  - cancel locks packed in the request; OBD_FAIL_MDS_REINT_UNLINK injects
 *    err_serious(-ENOENT); reject a parent FID that is not md-operative;
 *  - find the parent; a remote parent sets cos_incompat, and slot 0 is
 *    version checked/saved via mdt_version_get_check_save();
 *  - lock the parent (PDO, LCK_PW, MDS_INODELOCK_UPDATE);
 *  - determine the child FID: taken from rr_fid2 when MDS_OP_WITH_FID is
 *    set, otherwise by name lookup with version check; a resent remote
 *    unlink whose name is already gone falls back to rr_fid2 (see the
 *    comment in the body);
 *  - -EPERM if the child FID is not md-operative; find the child; with
 *    MDS_OP_WITH_FID resolve the real name via find_name_matching_hash();
 *  - sp_rm_entry: requires a DNE client and CAP_SYS_ADMIN, then removes
 *    only the name entry (mdo_unlink() with a NULL child);
 *  - remote child: take a LOOKUP lock on it, return its FID in the reply
 *    and fail with -EREMOTE (-ENOENT when rr_fid2 is set, -ENOTSUPP for
 *    clients without DNE support);
 *  - local child: lock LOOKUP|UPDATE (LOOKUP through the parent's MDT when
 *    the parent is remote), save the child version in slot 1, and call
 *    mdo_unlink() under mot_lov_mutex;
 *  - afterwards re-read attributes unless the object is dying, possibly
 *    discard DoM data, call mdt_handle_last_unlink(), and account the time
 *    under LPROC_MDT_RMDIR or LPROC_MDT_UNLINK by file type.
 *
 * Cleanup releases the child's striped lock, frees mti_big_buf when
 * MDS_OP_WITH_FID was used and the buffer is allocated, drops the child
 * reference, releases the parent lock and drops the parent reference.
 *
 * NOTE(review): @lhc is not referenced in the lines shown.
 */
1083 static int mdt_reint_unlink(struct mdt_thread_info *info,
1084 struct mdt_lock_handle *lhc)
1086 struct mdt_reint_record *rr = &info->mti_rr;
1087 struct ptlrpc_request *req = mdt_info_req(info);
1088 struct md_attr *ma = &info->mti_attr;
1089 struct lu_fid *child_fid = &info->mti_tmp_fid1;
1090 struct mdt_object *mp;
1091 struct mdt_object *mc;
1092 struct mdt_lock_handle *parent_lh;
1093 struct mdt_lock_handle *child_lh;
1094 struct ldlm_enqueue_info *einfo = &info->mti_einfo[0];
1096 bool cos_incompat = false;
1098 ktime_t kstart = ktime_get();
1102 DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1),
1103 PNAME(&rr->rr_name));
1105 if (info->mti_dlm_req)
1106 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1108 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK))
1109 RETURN(err_serious(-ENOENT));
1111 if (!fid_is_md_operative(rr->rr_fid1))
1114 mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1116 RETURN(PTR_ERR(mp));
1118 if (mdt_object_remote(mp)) {
1119 cos_incompat = true;
1121 rc = mdt_version_get_check_save(info, mp, 0);
1123 GOTO(put_parent, rc);
1126 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
1127 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
1129 parent_lh = &info->mti_lh[MDT_LH_PARENT];
1130 mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name);
1131 rc = mdt_reint_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE,
1134 GOTO(put_parent, rc);
1136 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1137 *child_fid = *rr->rr_fid2;
1139 /* lookup child object along with version checking */
1140 fid_zero(child_fid);
1141 rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid,
1144 /* Name might not be able to find during resend of
1145 * remote unlink, considering following case.
1146 * dir_A is a remote directory, the name entry of
1147 * dir_A is on MDT0, the directory is on MDT1,
1149 * 1. client sends unlink req to MDT1.
1150 * 2. MDT1 sends name delete update to MDT0.
1151 * 3. name entry is being deleted in MDT0 synchronously.
1152 * 4. MDT1 is restarted.
1153 * 5. client resends unlink req to MDT1. So it can not
1154 * find the name entry on MDT0 anymore.
1155 * In this case, MDT1 only needs to destroy the local
1158 if (mdt_object_remote(mp) && rc == -ENOENT &&
1159 !fid_is_zero(rr->rr_fid2) &&
1160 lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
1162 *child_fid = *rr->rr_fid2;
1164 GOTO(unlock_parent, rc);
1169 if (!fid_is_md_operative(child_fid))
1170 GOTO(unlock_parent, rc = -EPERM);
1172 /* We will lock the child regardless it is local or remote. No harm. */
1173 mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid);
1175 GOTO(unlock_parent, rc = PTR_ERR(mc));
1177 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID) {
1178 /* In this case, child fid is embedded in the request, and we do
1179 * not have a proper name as rr_name contains an encoded
1180 * hash. So find name that matches provided hash.
1182 if (!find_name_matching_hash(info, &rr->rr_name,
1184 GOTO(put_child, rc = -ENOENT);
1187 if (!cos_incompat) {
1188 rc = mdt_object_striped(info, mc);
1190 GOTO(put_child, rc);
1194 mdt_object_put(info->mti_env, mc);
1195 mdt_object_unlock(info, mp, parent_lh, -EAGAIN);
1200 child_lh = &info->mti_lh[MDT_LH_CHILD];
1201 mdt_lock_reg_init(child_lh, LCK_EX);
1202 if (info->mti_spec.sp_rm_entry) {
1203 struct lu_ucred *uc = mdt_ucred(info);
1205 if (!mdt_is_dne_client(req->rq_export))
1206 /* Return -ENOTSUPP for old client */
1207 GOTO(put_child, rc = -ENOTSUPP);
1209 if (!cap_raised(uc->uc_cap, CAP_SYS_ADMIN))
1210 GOTO(put_child, rc = -EPERM);
1212 ma->ma_need = MA_INODE;
1214 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1215 NULL, &rr->rr_name, ma, no_name);
1216 GOTO(put_child, rc);
/* remote child: this MDT does not unlink it, the client gets -EREMOTE */
1219 if (mdt_object_remote(mc)) {
1220 struct mdt_body *repbody;
1222 if (!fid_is_zero(rr->rr_fid2)) {
1223 CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n",
1224 mdt_obd_name(info->mti_mdt),
1225 PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1226 GOTO(put_child, rc = -ENOENT);
1228 CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n",
1229 mdt_obd_name(info->mti_mdt),
1230 PNAME(&rr->rr_name), PFID(mdt_object_fid(mc)));
1232 if (!mdt_is_dne_client(req->rq_export))
1233 /* Return -ENOTSUPP for old client */
1234 GOTO(put_child, rc = -ENOTSUPP);
1236 /* Revoke the LOOKUP lock of the remote object granted by
1237 * this MDT. Since the unlink will happen on another MDT,
1238 * it will release the LOOKUP lock right away. Then What
1239 * would happen if another client try to grab the LOOKUP
1240 * lock at the same time with unlink XXX
1242 mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_LOOKUP);
1243 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
1244 LASSERT(repbody != NULL);
1245 repbody->mbo_fid1 = *mdt_object_fid(mc);
1246 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
1247 GOTO(unlock_child, rc = -EREMOTE);
1249 /* We used to acquire MDS_INODELOCK_FULL here but we can't do
1250 * this now because a running HSM restore on the child (unlink
1251 * victim) will hold the layout lock. See LU-4002.
1253 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
1254 if (mdt_object_remote(mp)) {
1255 /* Enqueue lookup lock from parent MDT */
1256 rc = mdt_remote_object_lock(info, mp, mdt_object_fid(mc),
1257 &child_lh->mlh_rreg_lh,
1258 child_lh->mlh_rreg_mode,
1259 MDS_INODELOCK_LOOKUP, false);
1261 GOTO(put_child, rc);
1263 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
/* NOTE(review): the mdt_object_lock() result in the remote-child branch */
/* above is discarded; confirm that ignoring a lock failure is intended. */
1266 rc = mdt_reint_striped_lock(info, mc, child_lh, lock_ibits, einfo,
1269 GOTO(put_child, rc);
1272 * Now we can only make sure we need MA_INODE, in mdd layer, will check
1273 * whether need MA_LOV and MA_COOKIE.
1275 ma->ma_need = MA_INODE;
1278 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1279 OBD_FAIL_MDS_REINT_UNLINK_WRITE);
1280 /* save version when object is locked */
1281 mdt_version_get_save(info, mc, 1);
/* mot_lov_mutex is held across the mdo_unlink() call */
1283 mutex_lock(&mc->mot_lov_mutex);
1285 rc = mdo_unlink(info->mti_env, mdt_object_child(mp),
1286 mdt_object_child(mc), &rr->rr_name, ma, no_name);
1288 mutex_unlock(&mc->mot_lov_mutex);
1290 GOTO(unlock_child, rc);
1292 if (!lu_object_is_dying(&mc->mot_header)) {
1293 rc = mdt_attr_get_complex(info, mc, ma);
1296 } else if (mdt_dom_check_for_discard(info, mc)) {
1297 mdt_dom_discard_data(info, mc);
1299 mdt_handle_last_unlink(info, mc, ma);
1302 if (ma->ma_valid & MA_INODE) {
1303 switch (ma->ma_attr.la_mode & S_IFMT) {
1305 mdt_counter_incr(req, LPROC_MDT_RMDIR,
1306 ktime_us_delta(ktime_get(), kstart));
1314 mdt_counter_incr(req, LPROC_MDT_UNLINK,
1315 ktime_us_delta(ktime_get(), kstart));
1318 LASSERTF(0, "bad file type %o unlinking\n",
1319 ma->ma_attr.la_mode);
1326 mdt_reint_striped_unlock(info, mc, child_lh, einfo, rc);
1328 if (info->mti_spec.sp_cr_flags & MDS_OP_WITH_FID &&
1329 info->mti_big_buf.lb_buf)
1330 lu_buf_free(&info->mti_big_buf);
1331 mdt_object_put(info->mti_env, mc);
1333 mdt_object_unlock(info, mp, parent_lh, rc);
1335 mdt_object_put(info->mti_env, mp);
1336 CFS_RACE_WAKEUP(OBD_FAIL_OBD_ZERO_NLINK_RACE);
1341 * VBR: save versions in reply: 0 - parent; 1 - child by fid; 2 - target by
/*
 * Handle a hard-link reintegration request: add the name rr->rr_name in the
 * directory rr->rr_fid2, referring to the existing object rr->rr_fid1.
 *
 * Visible flow: find the target parent and the source, lock the parent
 * (PDO lock, LCK_PW on the name) and then the source (LCK_EX on the UPDATE
 * and XATTR bits), run the VBR version checks for slots 0, 1 and 2, call
 * mdo_link(), account the operation under LPROC_MDT_LINK, and finally drop
 * the locks and object references in reverse order of acquisition.
 *
 * \param[in] info	thread info holding the unpacked request in mti_rr
 * \param[in] lhc	lock handle; not referenced in the visible body
 *
 * NOTE(review): several guard conditions, labels and the final return are
 * not visible in this listing; confirm the exact error codes of the early
 * exits against the complete source.
 */
1344 static int mdt_reint_link(struct mdt_thread_info *info,
1345 struct mdt_lock_handle *lhc)
1347 struct mdt_reint_record *rr = &info->mti_rr;
1348 struct ptlrpc_request *req = mdt_info_req(info);
1349 struct md_attr *ma = &info->mti_attr;
1350 struct mdt_object *ms;
1351 struct mdt_object *mp;
1352 struct mdt_lock_handle *lhs;
1353 struct mdt_lock_handle *lhp;
1354 ktime_t kstart = ktime_get();
1359 DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME,
1360 PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name));
/* fault-injection hooks: fail the request with a serious -ENOENT */
1362 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK))
1363 RETURN(err_serious(-ENOENT));
1365 if (OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_RESEND_RACE) ||
1366 OBD_FAIL_PRECHECK(OBD_FAIL_PTLRPC_ENQ_RESEND)) {
1367 req->rq_no_reply = 1;
1368 RETURN(err_serious(-ENOENT));
/* process the lock cancels that were packed together with the request */
1371 if (info->mti_dlm_req)
1372 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
1374 /* Invalid case so return error immediately instead of
1377 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2))
1380 if (!fid_is_md_operative(rr->rr_fid1) ||
1381 !fid_is_md_operative(rr->rr_fid2))
1384 /* step 1: find target parent dir */
1385 mp = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid2);
1387 RETURN(PTR_ERR(mp));
/* VBR slot 0: version of the target parent */
1389 rc = mdt_version_get_check_save(info, mp, 0);
1391 GOTO(put_parent, rc);
1393 /* step 2: find source */
1394 ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
1396 GOTO(put_parent, rc = PTR_ERR(ms));
1398 if (!mdt_object_exists(ms)) {
1399 CDEBUG(D_INFO, "%s: "DFID" does not exist.\n",
1400 mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1));
1401 GOTO(put_source, rc = -ENOENT);
/* cos_incompat is set as soon as either the parent or the source is remote */
1404 cos_incompat = (mdt_object_remote(mp) || mdt_object_remote(ms));
1406 OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
/* parent first: PDO lock in PW mode covering only the new name */
1408 lhp = &info->mti_lh[MDT_LH_PARENT];
1409 mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name);
1410 rc = mdt_reint_object_lock(info, mp, lhp, MDS_INODELOCK_UPDATE,
1413 GOTO(put_source, rc);
1415 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
/* then the source: exclusive lock on its UPDATE and XATTR bits */
1417 lhs = &info->mti_lh[MDT_LH_CHILD];
1418 mdt_lock_reg_init(lhs, LCK_EX);
1419 rc = mdt_reint_object_lock(info, ms, lhs,
1420 MDS_INODELOCK_UPDATE | MDS_INODELOCK_XATTR,
1423 GOTO(unlock_parent, rc);
1425 /* step 3: link it */
1426 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
1427 OBD_FAIL_MDS_REINT_LINK_WRITE);
/* VBR slot 1: version of the source object */
1429 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms));
1430 rc = mdt_version_get_check_save(info, ms, 1);
1432 GOTO(unlock_source, rc);
1434 /** check target version by name during replay */
1435 rc = mdt_lookup_version_check(info, mp, &rr->rr_name,
1436 &info->mti_tmp_fid1, 2);
/* -ENOENT is the expected answer here: the new name must not exist yet */
1437 if (rc != 0 && rc != -ENOENT)
1438 GOTO(unlock_source, rc);
1439 /* save version of file name for replay, it must be ENOENT here */
1440 if (!req_is_replay(mdt_info_req(info))) {
1441 if (rc != -ENOENT) {
1442 CDEBUG(D_INFO, "link target "DNAME" existed!\n",
1443 PNAME(&rr->rr_name));
1444 GOTO(unlock_source, rc = -EEXIST);
1446 info->mti_ver[2] = ENOENT_VERSION;
1447 mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2);
/* all checks passed: insert the new name into the parent directory */
1450 rc = mdo_link(info->mti_env, mdt_object_child(mp),
1451 mdt_object_child(ms), &rr->rr_name, ma);
/* NOTE(review): the condition guarding this accounting is not visible here */
1454 mdt_counter_incr(req, LPROC_MDT_LINK,
1455 ktime_us_delta(ktime_get(), kstart));
/* cleanup: release in reverse order of acquisition */
1459 mdt_object_unlock(info, ms, lhs, rc);
1461 mdt_object_unlock(info, mp, lhp, rc);
1463 mdt_object_put(info->mti_env, ms);
1465 mdt_object_put(info->mti_env, mp);
1469 * lock the part of the directory according to the hash of the name
1470 * (lh->mlh_pdo_hash) in parallel directory lock.
/*
 * Take the slice of a parallel directory (PDO) lock on \a obj that covers
 * the name hash stored in lh->mlh_pdo_hash.
 *
 * The resource name is built from the object FID plus that hash, the
 * inodebits policy is set to \a ibits, and the lock is requested through
 * mdt_fid_lock() in mode lh->mlh_reg_mode into lh->mlh_reg_lh.
 *
 * \param[in] info	thread info; its mti_res_id and mti_policy members
 *			are overwritten by this call
 * \param[in] lh	lock handle; mlh_pdo_hash must be non-zero
 * \param[in] obj	directory object being locked
 * \param[in] ibits	inode bits to lock
 *
 * NOTE(review): the last parameter and the return statement are not visible
 * in this listing; presumably the mdt_fid_lock() result is returned.
 */
1472 static int mdt_pdir_hash_lock(struct mdt_thread_info *info,
1473 struct mdt_lock_handle *lh,
1474 struct mdt_object *obj, __u64 ibits,
1477 struct ldlm_res_id *res = &info->mti_res_id;
1478 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1479 union ldlm_policy_data *policy = &info->mti_policy;
1480 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1484 * Finish res_id initializing by name hash marking part of
1485 * directory which is taking modification.
1487 LASSERT(lh->mlh_pdo_hash != 0);
1488 fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res);
1489 memset(policy, 0, sizeof(*policy));
1490 policy->l_inodebits.bits = ibits;
/*
 * Write-mode (PW or EX) locks additionally get LDLM_FL_COS_INCOMPAT.
 * NOTE(review): the leading part of this condition is not visible here.
 */
1492 (lh->mlh_reg_mode == LCK_PW || lh->mlh_reg_mode == LCK_EX))
1493 dlmflags |= LDLM_FL_COS_INCOMPAT;
1495 * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is
1496 * going to be sent to client. If it is - mdt_intent_policy() path will
1497 * fix it up and turn FL_LOCAL flag off.
1499 rc = mdt_fid_lock(info->mti_env, ns, &lh->mlh_reg_lh, lh->mlh_reg_mode,
1500 policy, res, dlmflags,
1501 &info->mti_exp->exp_handle.h_cookie);
1506 * Get BFL lock for rename or migrate process.
/*
 * Acquire the filesystem-wide rename lock (resource LUSTRE_BFL_FID) into
 * \a lh.
 *
 * Two paths are visible:
 *  - when the sequence site node id of this MDT is not 0, an object is
 *    looked up through info->mti_tmp_fid1 and the lock is enqueued with
 *    mdt_remote_object_lock() on the UPDATE bit; the object reference is
 *    dropped right after the enqueue;
 *  - otherwise the lock is enqueued locally with ldlm_cli_enqueue_local()
 *    as an LCK_EX inodebits lock on the UPDATE bit, flagged
 *    LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB.
 *
 * \param[in]  info	thread info; mti_tmp_fid1, mti_policy and mti_res_id
 *			are used as work areas
 * \param[out] lh	handle that receives the lock
 *
 * NOTE(review): the statement that fills mti_tmp_fid1, some enqueue
 * arguments and the return statement are not visible in this listing.
 */
1508 static int mdt_rename_lock(struct mdt_thread_info *info,
1509 struct lustre_handle *lh)
1514 if (mdt_seq_site(info->mti_mdt)->ss_node_id != 0) {
1515 struct lu_fid *fid = &info->mti_tmp_fid1;
1516 struct mdt_object *obj;
1518 /* XXX, right now, it has to use object API to
1519 * enqueue lock cross MDT, so it will enqueue
1520 * rename lock(with LUSTRE_BFL_FID) by root object
1523 obj = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1525 RETURN(PTR_ERR(obj));
1527 rc = mdt_remote_object_lock(info, obj,
1528 &LUSTRE_BFL_FID, lh,
1530 MDS_INODELOCK_UPDATE, false);
1531 mdt_object_put(info->mti_env, obj);
1533 struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace;
1534 union ldlm_policy_data *policy = &info->mti_policy;
1535 struct ldlm_res_id *res_id = &info->mti_res_id;
1538 fid_build_reg_res_name(&LUSTRE_BFL_FID, res_id);
1539 memset(policy, 0, sizeof(*policy));
1540 policy->l_inodebits.bits = MDS_INODELOCK_UPDATE;
1541 flags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB;
1542 rc = ldlm_cli_enqueue_local(info->mti_env, ns, res_id,
1543 LDLM_IBITS, policy, LCK_EX, &flags,
1545 ldlm_completion_ast, NULL, NULL, 0,
1547 &info->mti_exp->exp_handle.h_cookie,
/*
 * Release a rename lock previously stored in \a lh.
 *
 * The handle must be in use (asserted). The LCK_EX reference is dropped and
 * the lock is cancelled in the same call.
 *
 * \param[in] lh	handle of the rename lock
 */
1554 static void mdt_rename_unlock(struct lustre_handle *lh)
1557 LASSERT(lustre_handle_is_used(lh));
1558 /* Cancel the single rename lock right away */
1559 ldlm_lock_decref_and_cancel(lh, LCK_EX);
/*
 * Look up a parent directory by FID and validate it before it is locked.
 *
 * After the lookup the version is checked early for VBR slot \a idx, then
 * the object must exist (-ENOENT otherwise) and be a directory (-ENOTDIR
 * otherwise). On those failures the reference taken by the lookup is
 * dropped again.
 *
 * \param[in] info	thread info
 * \param[in] fid	FID of the directory to find
 *
 * NOTE(review): the declaration of idx and the return statements are not
 * visible in this listing; presumably the object is returned on success
 * and an error pointer on failure.
 */
1563 static struct mdt_object *mdt_parent_find_check(struct mdt_thread_info *info,
1564 const struct lu_fid *fid,
1567 struct mdt_object *dir;
1571 dir = mdt_object_find(info->mti_env, info->mti_mdt, fid);
1575 /* check early, the real version will be saved after locking */
1576 rc = mdt_version_get_check(info, dir, idx);
1580 if (!mdt_object_exists(dir))
1581 GOTO(out_put, rc = -ENOENT);
1583 if (!S_ISDIR(lu_object_attr(&dir->mot_obj)))
1584 GOTO(out_put, rc = -ENOTDIR);
/* failure path: give back the reference taken by mdt_object_find() */
1588 mdt_object_put(info->mti_env, dir);
1593 * in case obj is remote obj on its parent, revoke LOOKUP lock,
1594 * herein we don't really check it, just do revoke.
/*
 * Revoke the LOOKUP lock of \a obj by taking it in LCK_EX mode and then
 * releasing it immediately.
 *
 * The MDT_LH_LOCAL slot of info->mti_lh is used as the temporary handle:
 *  - if \a pobj is remote, the lock is enqueued through
 *    mdt_remote_object_lock() on the FID of \a obj;
 *  - else if \a obj is remote, a local LOOKUP lock is taken on the FID of
 *    \a obj with LDLM_FL_COS_INCOMPAT set;
 *  - if both are local nothing is locked.
 *
 * \param[in] info	thread info; MDT_LH_LOCAL handle, mti_res_id and
 *			mti_policy are overwritten
 * \param[in] pobj	parent of \a obj
 * \param[in] obj	object whose LOOKUP lock is revoked
 *
 * NOTE(review): the error checks and return statements are not visible in
 * this listing.
 */
1596 int mdt_revoke_remote_lookup_lock(struct mdt_thread_info *info,
1597 struct mdt_object *pobj,
1598 struct mdt_object *obj)
1600 struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_LOCAL];
1603 mdt_lock_handle_init(lh);
1604 mdt_lock_reg_init(lh, LCK_EX);
1606 if (mdt_object_remote(pobj)) {
1607 /* don't bother to check if pobj and obj are on the same MDT. */
1608 rc = mdt_remote_object_lock(info, pobj, mdt_object_fid(obj),
1609 &lh->mlh_rreg_lh, LCK_EX,
1610 MDS_INODELOCK_LOOKUP, false);
1611 } else if (mdt_object_remote(obj)) {
1612 struct ldlm_res_id *res = &info->mti_res_id;
1613 union ldlm_policy_data *policy = &info->mti_policy;
1614 __u64 dlmflags = LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB |
1615 LDLM_FL_COS_INCOMPAT;
1617 fid_build_reg_res_name(mdt_object_fid(obj), res);
1618 memset(policy, 0, sizeof(*policy));
1619 policy->l_inodebits.bits = MDS_INODELOCK_LOOKUP;
1620 rc = mdt_fid_lock(info->mti_env, info->mti_mdt->mdt_namespace,
1621 &lh->mlh_reg_lh, LCK_EX, policy, res,
1624 /* do nothing if both are local */
1632 * TODO, currently we don't save this lock because there is no place to
1633 * hold this lock handle, but to avoid race we need to save this lock.
/* the lock is dropped right away: taking it was only meant to revoke */
1635 mdt_object_unlock(info, NULL, lh, 1);
1641 * operation may take locks of linkea, or directory stripes, group them in
/*
 * One entry of a list of auxiliary locks taken during an operation: the
 * locked object, the handle of the lock held on it, and the list linkage.
 */
1644 struct mdt_sub_lock {
/* object the lock was taken on; unlocked and put by mdt_unlock_list() */
1645 struct mdt_object *msl_obj;
/* handle of the lock held on msl_obj */
1646 struct mdt_lock_handle msl_lh;
/* linkage into the owning list of sub locks */
1647 struct list_head msl_linkage;
/*
 * Release every sub lock on \a list.
 *
 * Each entry has its object unlocked and put, and is then unlinked from the
 * list. The safe iterator is used because entries are removed while
 * walking.
 *
 * \param[in] info	thread info
 * \param[in] list	list of struct mdt_sub_lock entries
 * \param[in] decref	passed through to mdt_object_unlock_put()
 *
 * NOTE(review): what happens to the entry memory after list_del() is not
 * visible in this listing.
 */
1650 static void mdt_unlock_list(struct mdt_thread_info *info,
1651 struct list_head *list, int decref)
1653 struct mdt_sub_lock *msl;
1654 struct mdt_sub_lock *tmp;
1656 list_for_each_entry_safe(msl, tmp, list, msl_linkage) {
1657 mdt_object_unlock_put(info, msl->msl_obj, &msl->msl_lh, decref);
1658 list_del(&msl->msl_linkage);
/*
 * Unlock an object locked for migration.
 *
 * For a remote object the explicitly taken stripe locks on \a slave_locks
 * are released first and then the lock in \a lh; the other visible call
 * releases the lock through mdt_reint_striped_unlock() with \a einfo.
 *
 * \param[in] info		thread info
 * \param[in] obj		object to unlock
 * \param[in] lh		lock handle of \a obj
 * \param[in] einfo		enqueue info handed to the striped unlock
 * \param[in] slave_locks	list of stripe sub locks (remote case)
 *
 * NOTE(review): the declaration of decref and the else keyword are not
 * visible in this listing.
 */
1663 static inline void mdt_migrate_object_unlock(struct mdt_thread_info *info,
1664 struct mdt_object *obj,
1665 struct mdt_lock_handle *lh,
1666 struct ldlm_enqueue_info *einfo,
1667 struct list_head *slave_locks,
1670 if (mdt_object_remote(obj)) {
1671 mdt_unlock_list(info, slave_locks, decref);
1672 mdt_object_unlock(info, obj, lh, decref);
1674 mdt_reint_striped_unlock(info, obj, lh, einfo, decref);
1679 * lock parents of links, and also check whether total locks don't exceed
1682 * \retval 0 on success, and locks can be saved in ptlrpc_reply_stat
1683 * \retval 1 on success, but total lock count may exceed RS_MAX_LOCKS
1684 * \retval -ev negative errno upon error
/*
 * Lock the parent directories named in the link xattr of \a obj and revoke
 * the LOOKUP lock of each link.
 *
 * Every (parent FID, name) entry read by mdt_links_read() is handled as
 * follows:
 *  - parent equals \a pobj: only the LOOKUP lock is revoked;
 *  - parent is a stripe of \a pobj (when \a ma carries MA_LMV): the stripe
 *    is looked up, the LOOKUP lock is revoked, the stripe is put;
 *  - parent is already on \a link_locks: only the LOOKUP lock is revoked;
 *  - otherwise the parent is looked up, skipped if it does not exist, and
 *    a PDO PW lock on the name is tried without blocking. If the UPDATE
 *    bit is not granted, all link locks and the locks of \a pobj are
 *    dropped, the contended lock is taken in blocking mode and released
 *    again. If it is granted, the entry is appended to \a link_locks and
 *    the LOOKUP lock is revoked.
 *
 * \param[in] info			thread info; mti_big_buf, mti_name
 *					and mti_tmp_fid1 are overwritten
 * \param[in] pobj			parent of \a obj in this operation
 * \param[in] ma			attributes of \a pobj (MA_LMV is read)
 * \param[in] obj			object whose link parents are locked
 * \param[in] lhp			lock handle of \a pobj
 * \param[in] peinfo			enqueue info of \a pobj
 * \param[in] parent_slave_locks	stripe locks of \a pobj
 * \param[in,out] link_locks		list receiving the locks taken here
 *
 * NOTE(review): many guard conditions, the allocation of the sub lock and
 * the return statements are not visible in this listing; -EBUSY and
 * -ENOMEM are the explicit error codes that can be seen.
 */
1686 static int mdt_link_parents_lock(struct mdt_thread_info *info,
1687 struct mdt_object *pobj,
1688 const struct md_attr *ma,
1689 struct mdt_object *obj,
1690 struct mdt_lock_handle *lhp,
1691 struct ldlm_enqueue_info *peinfo,
1692 struct list_head *parent_slave_locks,
1693 struct list_head *link_locks)
1695 struct mdt_device *mdt = info->mti_mdt;
1696 struct lu_buf *buf = &info->mti_big_buf;
1697 struct lu_name *lname = &info->mti_name;
1698 struct linkea_data ldata = { NULL };
1699 bool blocked = false;
1700 int local_lnkp_cnt = 0;
/* NOTE(review): the action taken for a directory is not visible here */
1704 if (S_ISDIR(lu_object_attr(&obj->mot_obj)))
/* make sure the per-thread big buffer can hold MAX_LINKEA_SIZE bytes */
1707 buf = lu_buf_check_and_alloc(buf, MAX_LINKEA_SIZE);
1708 if (buf->lb_buf == NULL)
1712 rc = mdt_links_read(info, obj, &ldata);
1714 if (rc == -ENOENT || rc == -ENODATA)
/* walk every link entry; the loop stops as soon as rc becomes non-zero */
1719 for (linkea_first_entry(&ldata); ldata.ld_lee && !rc;
1720 linkea_next_entry(&ldata)) {
1721 struct mdt_object *lnkp;
1722 struct mdt_sub_lock *msl;
1726 linkea_entry_unpack(ldata.ld_lee, &ldata.ld_reclen, lname,
1729 /* check if it's also linked to parent */
1730 if (lu_fid_eq(mdt_object_fid(pobj), &fid)) {
1731 CDEBUG(D_INFO, "skip parent "DFID", reovke "DNAME"\n",
1732 PFID(&fid), PNAME(lname));
1733 /* in case link is remote object, revoke LOOKUP lock */
1734 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
1740 /* check if it's linked to a stripe of parent */
1741 if (ma->ma_valid & MA_LMV) {
1742 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1743 struct lu_fid *stripe_fid = &info->mti_tmp_fid1;
1746 for (; j < le32_to_cpu(lmv->lmv_stripe_count); j++) {
1747 fid_le_to_cpu(stripe_fid,
1748 &lmv->lmv_stripe_fids[j]);
1749 if (lu_fid_eq(stripe_fid, &fid)) {
1750 CDEBUG(D_INFO, "skip stripe "DFID
1751 ", reovke "DNAME"\n",
1752 PFID(&fid), PNAME(lname));
1753 lnkp = mdt_object_find(info->mti_env,
1756 GOTO(out, rc = PTR_ERR(lnkp));
1762 rc = mdt_revoke_remote_lookup_lock(info, lnkp,
1764 mdt_object_put(info->mti_env, lnkp);
1769 /* Check if it's already locked */
1770 list_for_each_entry(msl, link_locks, msl_linkage) {
1771 if (lu_fid_eq(mdt_object_fid(msl->msl_obj), &fid)) {
1773 DFID" was locked, revoke "DNAME"\n",
1774 PFID(&fid), PNAME(lname));
1775 lnkp = msl->msl_obj;
1781 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
1785 CDEBUG(D_INFO, "lock "DFID":"DNAME"\n",
1786 PFID(&fid), PNAME(lname));
1788 lnkp = mdt_object_find(info->mti_env, mdt, &fid);
1790 CWARN("%s: cannot find obj "DFID": %ld\n",
1791 mdt_obd_name(mdt), PFID(&fid), PTR_ERR(lnkp));
1795 if (!mdt_object_exists(lnkp)) {
1796 CDEBUG(D_INFO, DFID" doesn't exist, skip "DNAME"\n",
1797 PFID(&fid), PNAME(lname));
1798 mdt_object_put(info->mti_env, lnkp);
/* NOTE(review): presumably counts local link parents; body not visible */
1802 if (!mdt_object_remote(lnkp))
1807 GOTO(out, rc = -ENOMEM);
1810 * we can't follow parent-child lock order like other MD
1811 * operations, use lock_try here to avoid deadlock, if the lock
1812 * cannot be taken, drop all locks taken, revoke the blocked
1813 * one, and continue processing the remaining entries, and in
1814 * the end of the loop restart from beginning.
1816 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
/* non-blocking attempt; ibits is checked below for the UPDATE bit */
1818 rc = mdt_object_lock_try(info, lnkp, &msl->msl_lh, &ibits,
1819 MDS_INODELOCK_UPDATE, true);
1820 if (!(ibits & MDS_INODELOCK_UPDATE)) {
1822 CDEBUG(D_INFO, "busy lock on "DFID" "DNAME"\n",
1823 PFID(&fid), PNAME(lname));
1825 mdt_unlock_list(info, link_locks, 1);
1826 /* also unlock parent locks to avoid deadlock */
1828 mdt_migrate_object_unlock(info, pobj, lhp,
/* blocking enqueue of the contended lock; it is released again below */
1835 mdt_lock_pdo_init(&msl->msl_lh, LCK_PW, lname);
1836 rc = mdt_object_lock(info, lnkp, &msl->msl_lh,
1837 MDS_INODELOCK_UPDATE);
1839 mdt_object_put(info->mti_env, lnkp);
1844 if (mdt_object_remote(lnkp)) {
1845 struct ldlm_lock *lock;
1848 * for remote object, set lock cb_atomic,
1849 * so lock can be released in blocking_ast()
1850 * immediately, then the next lock_try will
1851 * have better chance of success.
1853 lock = ldlm_handle2lock(
1854 &msl->msl_lh.mlh_rreg_lh);
1855 LASSERT(lock != NULL);
1856 lock_res_and_lock(lock);
1857 ldlm_set_atomic_cb(lock);
1858 unlock_res_and_lock(lock);
1859 LDLM_LOCK_PUT(lock);
1862 mdt_object_unlock_put(info, lnkp, &msl->msl_lh, 1);
/* lock granted: remember it on link_locks, then revoke the LOOKUP lock */
1867 INIT_LIST_HEAD(&msl->msl_linkage);
1868 msl->msl_obj = lnkp;
1869 list_add_tail(&msl->msl_linkage, link_locks);
1871 rc = mdt_revoke_remote_lookup_lock(info, lnkp, obj);
/* NOTE(review): presumably taken when blocked is set; guard not visible */
1875 GOTO(out, rc = -EBUSY);
1880 mdt_unlock_list(info, link_locks, rc);
1881 } else if (local_lnkp_cnt > RS_MAX_LOCKS - 5) {
1882 CDEBUG(D_INFO, "Too many links (%d), sync operations\n",
1885 * parent may have 3 local objects: master object and 2 stripes
1886 * (if it's being migrated too); source may have 1 local objects
1887 * as regular file; target has 1 local object.
1888 * Note, source may have 2 local locks if it is directory but it
1889 * can't have hardlinks, so it is not considered here.
/*
 * Lock every stripe of a remote striped directory explicitly.
 *
 * \a obj must be remote and \a ma must carry MA_LMV (both asserted). For
 * each stripe FID in the LMV the stripe object is looked up, an LCK_EX lock
 * on the UPDATE bit is taken through mdt_reint_object_lock(), and a sub
 * lock entry is appended to \a slave_locks.
 *
 * \param[in] info		thread info; mti_tmp_fid1 is overwritten
 * \param[in] obj		remote striped directory
 * \param[in] ma		attributes of \a obj holding the LMV
 * \param[in,out] slave_locks	list receiving one entry per locked stripe
 *
 * NOTE(review): the handling of an insane LMV or stripe FID, the sub lock
 * allocation and the return statement are not visible in this listing.
 * The final mdt_unlock_list() call presumably belongs to the error path.
 */
1896 static int mdt_lock_remote_slaves(struct mdt_thread_info *info,
1897 struct mdt_object *obj,
1898 const struct md_attr *ma,
1899 struct list_head *slave_locks)
1901 struct mdt_device *mdt = info->mti_mdt;
1902 const struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
1903 struct lu_fid *fid = &info->mti_tmp_fid1;
1904 struct mdt_object *slave;
1905 struct mdt_sub_lock *msl;
1910 LASSERT(mdt_object_remote(obj));
1911 LASSERT(ma->ma_valid & MA_LMV);
1914 if (!lmv_is_sane(lmv))
/* the stripe count and stripe FIDs are stored little-endian in the LMV */
1917 for (i = 0; i < le32_to_cpu(lmv->lmv_stripe_count); i++) {
1918 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[i]);
1920 if (!fid_is_sane(fid))
1923 slave = mdt_object_find(info->mti_env, mdt, fid);
1925 GOTO(out, rc = PTR_ERR(slave));
/* failing to get a sub lock entry: drop the stripe reference first */
1929 mdt_object_put(info->mti_env, slave);
1930 GOTO(out, rc = -ENOMEM);
1933 mdt_lock_reg_init(&msl->msl_lh, LCK_EX);
1934 rc = mdt_reint_object_lock(info, slave, &msl->msl_lh,
1935 MDS_INODELOCK_UPDATE, true);
1938 mdt_object_put(info->mti_env, slave);
1942 INIT_LIST_HEAD(&msl->msl_linkage);
1943 msl->msl_obj = slave;
1944 list_add_tail(&msl->msl_linkage, slave_locks);
1950 mdt_unlock_list(info, slave_locks, rc);
1954 /* lock parent and its stripes */
/*
 * Lock the parent directory of a migration together with its stripes.
 *
 * For a remote parent an LCK_PW lock on the UPDATE bit is enqueued through
 * mdt_remote_object_lock(); when \a ma carries MA_LMV the stripes are then
 * locked one by one with mdt_lock_remote_slaves(), and the parent lock is
 * released again on the visible unlock call. The other visible path locks
 * the UPDATE bit through mdt_reint_striped_lock().
 *
 * \param[in] info		thread info
 * \param[in] obj		parent directory (master object)
 * \param[in] ma		attributes of \a obj (MA_LMV is read)
 * \param[in] lh		lock handle of \a obj
 * \param[in] einfo		enqueue info for the striped lock
 * \param[in,out] slave_locks	list receiving remote stripe locks
 *
 * NOTE(review): the error checks, the else keyword and the return
 * statements are not visible in this listing.
 */
1955 static int mdt_migrate_parent_lock(struct mdt_thread_info *info,
1956 struct mdt_object *obj,
1957 const struct md_attr *ma,
1958 struct mdt_lock_handle *lh,
1959 struct ldlm_enqueue_info *einfo,
1960 struct list_head *slave_locks)
1964 if (mdt_object_remote(obj)) {
1965 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
1966 &lh->mlh_rreg_lh, LCK_PW,
1967 MDS_INODELOCK_UPDATE, false);
1972 * if obj is remote and striped, lock its stripes explicitly
1973 * because it's not striped in LOD layer on this MDT.
1975 if (ma->ma_valid & MA_LMV) {
1976 rc = mdt_lock_remote_slaves(info, obj, ma, slave_locks);
/* NOTE(review): presumably only reached when locking the stripes failed */
1978 mdt_object_unlock(info, obj, lh, rc);
1981 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_UPDATE,
1989 * in migration, object may be remote, and we need to take full lock of it and its
1990 * stripes if it's directory, besides, object may be a remote object on its
1991 * parent, revoke its LOOKUP lock on where its parent is located.
/*
 * Take a full lock on the object being migrated and on its stripes.
 *
 * Remote object: its LOOKUP lock is revoked first, then an LCK_EX lock on
 * MDS_INODELOCK_FULL is enqueued through mdt_remote_object_lock(). If the
 * object is a directory its LMV is read with mdt_stripe_get() and, when
 * MA_LMV is valid, each stripe is locked with mdt_lock_remote_slaves().
 * The visible mdt_object_unlock() calls give the object lock back.
 *
 * Other visible path: the LOOKUP lock is revoked when \a pobj is remote,
 * and MDS_INODELOCK_FULL is locked through mdt_reint_striped_lock().
 *
 * \param[in] info		thread info; mti_attr is overwritten for
 *				directories
 * \param[in] pobj		parent of \a obj
 * \param[in] obj		object being migrated
 * \param[in] lh		lock handle of \a obj
 * \param[in] einfo		enqueue info for the striped lock
 * \param[in,out] slave_locks	list receiving remote stripe locks
 *
 * NOTE(review): the error checks, the else keyword and the return
 * statements are not visible in this listing.
 */
1993 static int mdt_migrate_object_lock(struct mdt_thread_info *info,
1994 struct mdt_object *pobj,
1995 struct mdt_object *obj,
1996 struct mdt_lock_handle *lh,
1997 struct ldlm_enqueue_info *einfo,
1998 struct list_head *slave_locks)
2002 if (mdt_object_remote(obj)) {
2003 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2007 rc = mdt_remote_object_lock(info, obj, mdt_object_fid(obj),
2008 &lh->mlh_rreg_lh, LCK_EX,
2009 MDS_INODELOCK_FULL, false);
2014 * if obj is remote and striped, lock its stripes explicitly
2015 * because it's not striped in LOD layer on this MDT.
2017 if (S_ISDIR(lu_object_attr(&obj->mot_obj))) {
2018 struct md_attr *ma = &info->mti_attr;
2020 rc = mdt_stripe_get(info, obj, ma, XATTR_NAME_LMV);
/* NOTE(review): presumably only reached when reading the LMV failed */
2022 mdt_object_unlock(info, obj, lh, rc);
2026 if (ma->ma_valid & MA_LMV) {
2027 rc = mdt_lock_remote_slaves(info, obj, ma,
/* NOTE(review): presumably only reached when locking the stripes failed */
2030 mdt_object_unlock(info, obj, lh, rc);
2034 if (mdt_object_remote(pobj)) {
2035 rc = mdt_revoke_remote_lookup_lock(info, pobj, obj);
2040 rc = mdt_reint_striped_lock(info, obj, lh, MDS_INODELOCK_FULL,
2048 * lookup source by name, if parent is striped directory, we need to find the
2049 * corresponding stripe where source is located, and then lookup there.
2051 * besides, if parent is migrating too, and file is already in target stripe,
2052 * this should be a redo of 'lfs migrate' on client side.
/*
 * Resolve \a lname under \a pobj to the source object of a migration.
 *
 * When \a ma carries MA_LMV the name is mapped to a stripe index with
 * lmv_name_to_stripe_index_old(), the stripe is looked up and the name is
 * searched there. If that lookup returns -ENOENT while the layout is
 * changing, the name is mapped again with lmv_name_to_stripe_index() and
 * searched on that stripe; this second attempt returns the lookup error,
 * or -EALREADY when the name is found. The other visible lookup searches
 * \a pobj itself.
 *
 * The FID found is left in info->mti_tmp_fid1 and the source object is
 * looked up from it into \a sobj; if that fails the stripe reference is
 * dropped.
 *
 * \param[in]  info	thread info; mti_tmp_fid1 is overwritten
 * \param[in]  pobj	parent directory (master object)
 * \param[in]  ma	attributes of \a pobj (MA_LMV is read)
 * \param[in]  lname	name to look up
 * \param[out] spobj	NOTE(review): its assignment is not visible here
 * \param[out] sobj	source object found by FID
 *
 * NOTE(review): several error checks and the final return are not visible
 * in this listing.
 */
2054 static int mdt_migrate_lookup(struct mdt_thread_info *info,
2055 struct mdt_object *pobj,
2056 const struct md_attr *ma,
2057 const struct lu_name *lname,
2058 struct mdt_object **spobj,
2059 struct mdt_object **sobj)
2061 const struct lu_env *env = info->mti_env;
2062 struct lu_fid *fid = &info->mti_tmp_fid1;
2063 struct mdt_object *stripe;
2066 if (ma->ma_valid & MA_LMV) {
2067 /* if parent is striped, lookup on corresponding stripe */
2068 struct lmv_mds_md_v1 *lmv = &ma->ma_lmv->lmv_md_v1;
2070 if (!lmv_is_sane(lmv))
2073 rc = lmv_name_to_stripe_index_old(lmv, lname->ln_name,
/* rc is used as the stripe index from here on */
2078 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2080 stripe = mdt_object_find(env, info->mti_mdt, fid);
2082 return PTR_ERR(stripe);
2085 rc = mdo_lookup(env, mdt_object_child(stripe), lname, fid,
2087 if (rc == -ENOENT && lmv_is_layout_changing(lmv)) {
2089 * if parent layout is changeing, and lookup child
2090 * failed on source stripe, lookup again on target
2091 * stripe, if it exists, it means previous migration
2092 * was interrupted, and current file was migrated
2095 mdt_object_put(env, stripe);
2097 rc = lmv_name_to_stripe_index(lmv, lname->ln_name,
2102 fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[rc]);
2104 stripe = mdt_object_find(env, info->mti_mdt, fid);
2106 return PTR_ERR(stripe);
2109 rc = mdo_lookup(env, mdt_object_child(stripe), lname,
2110 fid, &info->mti_spec);
2111 mdt_object_put(env, stripe);
/* a successful lookup (rc == 0) is reported as -EALREADY */
2112 return rc ?: -EALREADY;
2114 mdt_object_put(env, stripe);
2119 rc = mdo_lookup(env, mdt_object_child(pobj), lname, fid,
2125 mdt_object_get(env, stripe);
2130 *sobj = mdt_object_find(env, info->mti_mdt, fid);
2131 if (IS_ERR(*sobj)) {
2132 mdt_object_put(env, stripe);
2133 rc = PTR_ERR(*sobj);
2141 /* end lease and close file for regular file */
/*
 * End the lease on \a obj and close the file as part of a migrate request.
 *
 * The request must carry the RMF_MDT_EPOCH and RMF_CLOSE_DATA fields. The
 * lease lock is resolved from the cd_handle member of the close data and
 * tested for cancellation under the resource lock. The server side lease
 * is then cancelled and its resource reprocessed. Finally the file is
 * closed with mdt_close_internal() and OBD_MD_CLOSE_INTENT_EXECED is set
 * in the reply body.
 *
 * \param[in] info	thread info of the migrate request
 * \param[in] obj	object the lease belongs to
 *
 * NOTE(review): the error checks, the way rc and rc2 are combined and the
 * return statement are not visible in this listing.
 */
2142 static int mdd_migrate_close(struct mdt_thread_info *info,
2143 struct mdt_object *obj)
2145 struct close_data *data;
2146 struct mdt_body *repbody;
2147 struct ldlm_lock *lease;
2152 if (!req_capsule_field_present(info->mti_pill, &RMF_MDT_EPOCH,
2154 !req_capsule_field_present(info->mti_pill, &RMF_CLOSE_DATA,
2158 data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
2163 lease = ldlm_handle2lock(&data->cd_handle);
2167 /* check if the lease was already canceled */
2168 lock_res_and_lock(lease);
2169 rc = ldlm_is_cancel(lease);
2170 unlock_res_and_lock(lease);
2174 LDLM_DEBUG(lease, DFID" lease broken",
2175 PFID(mdt_object_fid(obj)));
2179 * cancel server side lease, client side counterpart should have been
2180 * cancelled, it's okay to cancel it now as we've held mot_open_sem.
2182 ldlm_lock_cancel(lease);
2183 ldlm_reprocess_all(lease->l_resource,
2184 lease->l_policy_data.l_inodebits.bits);
/* drop the reference obtained from ldlm_handle2lock() */
2185 LDLM_LOCK_PUT(lease);
2188 rc2 = mdt_close_internal(info, mdt_info_req(info), NULL);
2189 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
2190 repbody->mbo_valid |= OBD_MD_CLOSE_INTENT_EXECED;
2196 * migrate file in below steps:
2197 * 1. lock parent and its stripes
2198 * 2. lookup source by name
2199 * 3. lock parents of source links if source is not directory
2200 * 4. reject if source is in HSM
2201 * 5. take source open_sem and close file if source is regular file
2202 * 6. lock source and its stripes if it's directory
2203 * 7. lock target so subsequent change to it can trigger COS
2205 * 9. unlock above locks
2206 * 10. sync device if source has links
/*
 * Migrate the entry rr->rr_name of directory rr->rr_fid1 to the target
 * object rr->rr_fid2.
 *
 * Locks are taken in this order: rename lock (skipped for replay and when
 * there is no request), parent and its stripes, parents of the source
 * links, source open_sem (when sp_migrate_close is set), source and its
 * stripes, target. The migration itself is done by mdo_migrate() and
 * everything is released in reverse order afterwards.
 *
 * A -EBUSY result from mdt_link_parents_lock() drops the source references
 * and is retried while lock_retries (initially 5) is not exhausted.
 *
 * \param[in] info	thread info holding the unpacked request in mti_rr
 * \param[in] unused	lock handle; not referenced in the visible body
 *
 * NOTE(review): many guard conditions, labels, the error codes of the
 * early argument checks and the return statement are not visible in this
 * listing.
 */
2208 int mdt_reint_migrate(struct mdt_thread_info *info,
2209 struct mdt_lock_handle *unused)
2211 const struct lu_env *env = info->mti_env;
2212 struct mdt_device *mdt = info->mti_mdt;
2213 struct ptlrpc_request *req = mdt_info_req(info);
2214 struct mdt_reint_record *rr = &info->mti_rr;
2215 struct lu_ucred *uc = mdt_ucred(info);
2216 struct md_attr *ma = &info->mti_attr;
2217 struct ldlm_enqueue_info *peinfo = &info->mti_einfo[0];
2218 struct ldlm_enqueue_info *seinfo = &info->mti_einfo[1];
2219 struct mdt_object *pobj;
2220 struct mdt_object *spobj = NULL;
2221 struct mdt_object *sobj = NULL;
2222 struct mdt_object *tobj;
2223 struct lustre_handle rename_lh = { 0 };
2224 struct mdt_lock_handle *lhp;
2225 struct mdt_lock_handle *lhs;
2226 struct mdt_lock_handle *lht;
2227 LIST_HEAD(parent_slave_locks);
2228 LIST_HEAD(child_slave_locks);
2229 LIST_HEAD(link_locks);
2230 int lock_retries = 5;
2231 bool open_sem_locked = false;
2232 bool do_sync = false;
2236 CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1),
2237 PNAME(&rr->rr_name), PFID(rr->rr_fid2));
/* process the lock cancels that were packed together with the request */
2239 if (info->mti_dlm_req)
2240 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
2242 if (!fid_is_md_operative(rr->rr_fid1) ||
2243 !fid_is_md_operative(rr->rr_fid2))
2246 /* don't allow migrate . or .. */
2247 if (lu_name_is_dot_or_dotdot(&rr->rr_name))
/* both remote directories and directory migration must be enabled */
2250 if (!mdt->mdt_enable_remote_dir || !mdt->mdt_enable_dir_migration)
/*
 * Permission check: a caller without CAP_SYS_ADMIN is rejected when its
 * gid differs from mdt_enable_remote_dir_gid and that setting is not -1.
 */
2253 if (uc && !cap_raised(uc->uc_cap, CAP_SYS_ADMIN) &&
2254 uc->uc_gid != mdt->mdt_enable_remote_dir_gid &&
2255 mdt->mdt_enable_remote_dir_gid != -1)
2259 * Note: do not enqueue rename lock for replay request, because
2260 * if other MDT holds rename lock, but being blocked to wait for
2261 * this MDT to finish its recovery, and the failover MDT can not
2262 * get rename lock, which will cause deadlock.
2264 * req is NULL if this is called by directory auto-split.
2266 if (req && !req_is_replay(req)) {
2267 rc = mdt_rename_lock(info, &rename_lh);
2269 CERROR("%s: can't lock FS for rename: rc = %d\n",
2270 mdt_obd_name(info->mti_mdt), rc);
2275 /* pobj is master object of parent */
2276 pobj = mdt_object_find(env, mdt, rr->rr_fid1);
2278 GOTO(unlock_rename, rc = PTR_ERR(pobj));
2281 rc = mdt_version_get_check(info, pobj, 0);
2283 GOTO(put_parent, rc);
2286 if (!mdt_object_exists(pobj))
2287 GOTO(put_parent, rc = -ENOENT);
2289 if (!S_ISDIR(lu_object_attr(&pobj->mot_obj)))
2290 GOTO(put_parent, rc = -ENOTDIR);
/* read the parent LMV so that striped parents can be handled below */
2292 rc = mdt_stripe_get(info, pobj, ma, XATTR_NAME_LMV);
2294 GOTO(put_parent, rc);
2297 /* lock parent object */
2298 lhp = &info->mti_lh[MDT_LH_PARENT];
2299 mdt_lock_reg_init(lhp, LCK_PW);
2300 rc = mdt_migrate_parent_lock(info, pobj, ma, lhp, peinfo,
2301 &parent_slave_locks);
2303 GOTO(put_parent, rc);
2306 * spobj is the corresponding stripe against name if pobj is striped
2307 * directory, which is the real parent, and no need to lock, because
2308 * we've taken full lock of pobj.
2310 rc = mdt_migrate_lookup(info, pobj, ma, &rr->rr_name, &spobj, &sobj);
2312 GOTO(unlock_parent, rc);
2314 /* lock parents of source links, and revoke LOOKUP lock of links */
2315 rc = mdt_link_parents_lock(info, pobj, ma, sobj, lhp, peinfo,
2316 &parent_slave_locks, &link_locks);
/* contention on a link parent: drop the source references before retrying */
2317 if (rc == -EBUSY && lock_retries-- > 0) {
2318 mdt_object_put(env, sobj);
2319 mdt_object_put(env, spobj);
2324 GOTO(put_source, rc);
2327 * RS_MAX_LOCKS is the limit of number of locks that can be saved along
2328 * with one request, if total lock count exceeds this limit, we will
2329 * drop all locks after migration, and synchronous device in the end.
2333 /* TODO: DoM migration is not supported, migrate dirent only */
2334 if (S_ISREG(lu_object_attr(&sobj->mot_obj))) {
2335 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LOV);
2337 GOTO(unlock_links, rc);
/* a non-zero DoM stripe size in the layout restricts this to the namespace */
2339 if (ma->ma_valid & MA_LOV && mdt_lmm_dom_stripesize(ma->ma_lmm))
2340 info->mti_spec.sp_migrate_nsonly = 1;
2341 } else if (S_ISDIR(lu_object_attr(&sobj->mot_obj))) {
2342 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2344 GOTO(unlock_links, rc);
2346 /* race with restripe/auto-split? */
2347 if ((ma->ma_valid & MA_LMV) &&
2348 lmv_is_restriping(&ma->ma_lmv->lmv_md_v1))
2349 GOTO(unlock_links, rc = -EBUSY);
2352 /* if migration HSM is allowed */
2353 if (!mdt->mdt_opts.mo_migrate_hsm_allowed) {
2354 ma->ma_need = MA_HSM;
2356 rc = mdt_attr_get_complex(info, sobj, ma);
2358 GOTO(unlock_links, rc);
/* any HSM flag set on the source makes the migration unsupported */
2360 if ((ma->ma_valid & MA_HSM) && ma->ma_hsm.mh_flags != 0)
2361 GOTO(unlock_links, rc = -EOPNOTSUPP);
2364 /* end lease and close file for regular file */
2365 if (info->mti_spec.sp_migrate_close) {
2366 /* try to hold open_sem so that nobody else can open the file */
2367 if (!down_write_trylock(&sobj->mot_open_sem)) {
2369 mdd_migrate_close(info, sobj);
2370 GOTO(unlock_links, rc = -EBUSY);
2372 open_sem_locked = true;
2373 rc = mdd_migrate_close(info, sobj);
2375 GOTO(unlock_open_sem, rc);
/* lock the source (and its stripes) exclusively */
2380 lhs = &info->mti_lh[MDT_LH_OLD];
2381 mdt_lock_reg_init(lhs, LCK_EX);
2382 rc = mdt_migrate_object_lock(info, spobj, sobj, lhs, seinfo,
2383 &child_slave_locks);
2385 GOTO(unlock_open_sem, rc);
/* find and fully lock the target object */
2388 tobj = mdt_object_find(env, mdt, rr->rr_fid2);
2390 GOTO(unlock_source, rc = PTR_ERR(tobj));
2392 lht = &info->mti_lh[MDT_LH_NEW];
2393 mdt_lock_reg_init(lht, LCK_EX);
2394 rc = mdt_reint_object_lock(info, tobj, lht, MDS_INODELOCK_FULL, true);
2396 GOTO(put_target, rc);
2398 /* Don't do lookup sanity check. We know name doesn't exist. */
2399 info->mti_spec.sp_cr_lookup = 0;
2400 info->mti_spec.sp_feat = &dt_directory_features;
2402 rc = mdo_migrate(env, mdt_object_child(pobj),
2403 mdt_object_child(sobj), &rr->rr_name,
2404 mdt_object_child(tobj),
2405 &info->mti_spec, ma);
/* NOTE(review): the condition guarding this accounting is not visible here */
2407 lprocfs_counter_incr(mdt->mdt_lu_dev.ld_obd->obd_md_stats,
2408 LPROC_MDT_MIGRATE + LPROC_MD_LAST_OPC);
/* cleanup: release in reverse order of acquisition */
2411 mdt_object_unlock(info, tobj, lht, rc);
2413 mdt_object_put(env, tobj);
2415 mdt_migrate_object_unlock(info, sobj, lhs, seinfo,
2416 &child_slave_locks, rc);
2418 if (open_sem_locked)
2419 up_write(&sobj->mot_open_sem);
2421 /* if we've got too many locks to save into RPC,
2422 * then just commit before the locks are released
2425 mdt_device_sync(env, mdt);
2426 mdt_unlock_list(info, &link_locks, do_sync ? 1 : rc);
2428 mdt_object_put(env, sobj);
2429 mdt_object_put(env, spobj);
2431 mdt_migrate_object_unlock(info, pobj, lhp, peinfo,
2432 &parent_slave_locks, rc);
2434 mdt_object_put(env, pobj);
2436 if (lustre_handle_is_used(&rename_lh))
2437 mdt_rename_unlock(&rename_lh);
/*
 * Lock the UPDATE bit of a directory and record its version for VBR.
 *
 * The lock is taken with mdt_reint_object_lock() into \a lh; afterwards the
 * version of \a dir is read and saved in slot \a idx.
 *
 * \param[in] info		thread info
 * \param[in] dir		directory to lock
 * \param[in] lh		lock handle to use
 * \param[in] idx		VBR version slot to fill
 * \param[in] cos_incompat	NOTE(review): its use is on a line that is
 *				not visible in this listing
 *
 * NOTE(review): the error check after the lock call and the return
 * statements are not visible either.
 */
2442 static int mdt_object_lock_save(struct mdt_thread_info *info,
2443 struct mdt_object *dir,
2444 struct mdt_lock_handle *lh,
2445 int idx, bool cos_incompat)
2449 /* we lock the target dir if it is local */
2450 rc = mdt_reint_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE,
2455 /* get and save correct version after locking */
2456 mdt_version_get_save(info, dir, idx);
2461 * determine lock order of sobj and tobj
2463 * there are two situations we need to lock tobj before sobj:
2464 * 1. sobj is child of tobj
2465 * 2. sobj and tobj are stripes of a directory, and stripe index of sobj is
2466 * larger than that of tobj
2468 * \retval 1 lock tobj before sobj
2469 * \retval 0 lock sobj before tobj
2470 * \retval -ev negative errno upon error
/*
 * Decide which of \a sobj and \a tobj has to be locked first.
 *
 * The visible checks run in this order: identical objects, either object
 * being the root, \a sobj being a subdirectory of \a tobj, both objects
 * having the same parent FID, and finally both objects being stripes
 * (LMV_MAGIC_STRIPE) whose lmv_master_mdt_index values are compared. The
 * last visible statement returns 0 when the index of \a sobj is smaller
 * and 1 otherwise.
 *
 * \param[in] info	thread info; mti_attr, mti_tmp_fid1 and mti_tmp_fid2
 *			are overwritten
 * \param[in] sobj	source object
 * \param[in] tobj	target object
 *
 * NOTE(review): the outcome of every earlier check is on lines that are
 * not visible in this listing.
 */
2472 static int mdt_rename_determine_lock_order(struct mdt_thread_info *info,
2473 struct mdt_object *sobj,
2474 struct mdt_object *tobj)
2476 struct md_attr *ma = &info->mti_attr;
2477 struct lu_fid *spfid = &info->mti_tmp_fid1;
2478 struct lu_fid *tpfid = &info->mti_tmp_fid2;
2479 struct lmv_mds_md_v1 *lmv;
2484 /* sobj and tobj are the same */
2488 if (fid_is_root(mdt_object_fid(sobj)))
2491 if (fid_is_root(mdt_object_fid(tobj)))
2494 /* check whether sobj is child of tobj */
2495 rc = mdo_is_subdir(info->mti_env, mdt_object_child(sobj),
2496 mdt_object_fid(tobj));
2503 /* check whether sobj and tobj are children of the same parent */
2504 rc = mdt_attr_get_pfid(info, sobj, spfid);
2508 rc = mdt_attr_get_pfid(info, tobj, tpfid);
2512 if (!lu_fid_eq(spfid, tpfid))
2515 /* check whether sobj and tobj are sibling stripes */
2516 rc = mdt_stripe_get(info, sobj, ma, XATTR_NAME_LMV);
2520 if (!(ma->ma_valid & MA_LMV))
2523 lmv = &ma->ma_lmv->lmv_md_v1;
2524 if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
/* sindex is read now because ma is reused for tobj just below */
2526 sindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2529 rc = mdt_stripe_get(info, tobj, ma, XATTR_NAME_LMV);
2533 if (!(ma->ma_valid & MA_LMV))
2536 lmv = &ma->ma_lmv->lmv_md_v1;
2537 if (!(le32_to_cpu(lmv->lmv_magic) & LMV_MAGIC_STRIPE))
2539 tindex = le32_to_cpu(lmv->lmv_master_mdt_index);
2541 /* check stripe index of sobj and tobj */
2542 if (sindex == tindex)
/* lower stripe index is locked first: 0 means sobj first, 1 tobj first */
2545 return sindex < tindex ? 0 : 1;
2549 * lock rename source object.
2551 * Both source and source parent may be remote, and source may be a remote
2552 * object on source parent, to avoid overriding lock handle, store remote
2553 * LOOKUP lock separately in @lhr.
2555 * \retval 0 on success
2556 * \retval -ev negative errno upon error
/*
 * Lock the source object of a rename.
 *
 * mdt_is_remote_object() is consulted first. In the branch that handles a
 * remote child the LOOKUP lock is obtained separately: through
 * mdt_remote_object_lock() when \a parent is remote, otherwise through
 * mdt_object_local_lock() into \a lhr (the child is asserted to be remote
 * in that case), and the LOOKUP bit is removed from ibits. The remaining
 * bits are then locked either with mdt_remote_object_lock() for a remote
 * child or with mdt_reint_object_lock() into \a lhc. The last visible call
 * releases the lock held in \a lhr.
 *
 * \param[in] info	thread info
 * \param[in] parent	parent directory of the source
 * \param[in] child	source object
 * \param[in] lhc	lock handle for the source object
 * \param[in] lhr	lock handle for the separate remote LOOKUP lock
 *
 * NOTE(review): the remaining parameters (ibits is used but its
 * declaration is not shown), the error checks and the return statements
 * are not visible in this listing; the final unlock presumably belongs to
 * the error path.
 */
2558 static int mdt_rename_source_lock(struct mdt_thread_info *info,
2559 struct mdt_object *parent,
2560 struct mdt_object *child,
2561 struct mdt_lock_handle *lhc,
2562 struct mdt_lock_handle *lhr,
2568 rc = mdt_is_remote_object(info, parent, child);
2573 /* enqueue remote LOOKUP lock from the parent MDT */
2574 __u64 rmt_ibits = MDS_INODELOCK_LOOKUP;
2576 if (mdt_object_remote(parent)) {
2577 rc = mdt_remote_object_lock(info, parent,
2578 mdt_object_fid(child),
2585 LASSERT(mdt_object_remote(child));
2586 rc = mdt_object_local_lock(info, child, lhr,
2587 &rmt_ibits, 0, true);
/* the LOOKUP bit is covered by the lock above, so drop it from ibits */
2592 ibits &= ~MDS_INODELOCK_LOOKUP;
2595 if (mdt_object_remote(child)) {
2596 rc = mdt_remote_object_lock(info, child, mdt_object_fid(child),
2603 rc = mdt_reint_object_lock(info, child, lhc, ibits,
2608 mdt_object_unlock(info, child, lhr, rc);
2614 * VBR: rename versions in reply: 0 - srcdir parent; 1 - tgtdir parent;
2615 * 2 - srcdir child; 3 - tgtdir child.
2616 * Update on disk version of srcdir child.
/*
 * NOTE(review): this listing drops short lines (braces, else keywords, some
 * local declarations, most checks on rc and every goto label), so the
 * annotations below cover only what is visible.
 *
 * Visible flow: find both parent directories, take the rename lock when the
 * parents differ (skipped for replay requests), lock the parents in the
 * order chosen by mdt_rename_determine_lock_order(), look up and lock the
 * source, look up and lock the target if one exists, call mdo_rename(),
 * then release locks and references in reverse order.
 */
2618 static int mdt_reint_rename(struct mdt_thread_info *info,
2619 struct mdt_lock_handle *unused)
2621 struct mdt_device *mdt = info->mti_mdt;
2622 struct mdt_reint_record *rr = &info->mti_rr;
2623 struct md_attr *ma = &info->mti_attr;
2624 struct ptlrpc_request *req = mdt_info_req(info);
2625 struct mdt_object *msrcdir = NULL;
2626 struct mdt_object *mtgtdir = NULL;
2627 struct mdt_object *mold;
2628 struct mdt_object *mnew = NULL;
/* stays unused (see lustre_handle_is_used at cleanup) if no rename lock */
2629 struct lustre_handle rename_lh = { 0 };
2630 struct mdt_lock_handle *lh_srcdirp;
2631 struct mdt_lock_handle *lh_tgtdirp;
2632 struct mdt_lock_handle *lh_oldp = NULL;
2633 struct mdt_lock_handle *lh_rmt = NULL;
2634 struct mdt_lock_handle *lh_newp = NULL;
/* scratch FIDs in info that receive the looked-up source and target FIDs */
2635 struct lu_fid *old_fid = &info->mti_tmp_fid1;
2636 struct lu_fid *new_fid = &info->mti_tmp_fid2;
2638 bool reverse = false, discard = false;
/* start time for the rename latency counters updated near the end */
2640 ktime_t kstart = ktime_get();
2644 DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME,
2645 PFID(rr->rr_fid1), PNAME(&rr->rr_name),
2646 PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name));
/* if the request carries an LDLM request, process its cancels first */
2648 if (info->mti_dlm_req)
2649 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
/* both parent FIDs must pass fid_is_md_operative; the error is omitted here */
2651 if (!fid_is_md_operative(rr->rr_fid1) ||
2652 !fid_is_md_operative(rr->rr_fid2))
2655 /* find both parents. */
2656 msrcdir = mdt_parent_find_check(info, rr->rr_fid1, 0);
2657 if (IS_ERR(msrcdir))
2658 RETURN(PTR_ERR(msrcdir));
2660 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5);
/*
 * Same parent FID for source and target: an extra reference is taken on
 * mtgtdir (presumably set to msrcdir on an omitted line) so that both
 * pointers can be put independently at cleanup.
 */
2662 if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) {
2664 mdt_object_get(info->mti_env, mtgtdir);
2666 mtgtdir = mdt_parent_find_check(info, rr->rr_fid2, 1);
2667 if (IS_ERR(mtgtdir))
2668 GOTO(out_put_srcdir, rc = PTR_ERR(mtgtdir));
2672 * Note: do not enqueue rename lock for replay request, because
2673 * if other MDT holds rename lock, but being blocked to wait for
2674 * this MDT to finish its recovery, and the failover MDT can not
2675 * get rename lock, which will cause deadlock.
2677 if (!req_is_replay(req)) {
2679 * Normally rename RPC is handled on the MDT with the target
2680 * directory (if target exists, it's on the MDT with the
2681 * target), if the source directory is remote, it's a hint that
2682 * source is remote too (this may not be true, but it won't
2683 * cause any issue), return -EXDEV early to avoid taking
2686 if (!mdt->mdt_enable_remote_rename &&
2687 mdt_object_remote(msrcdir))
2688 GOTO(out_put_tgtdir, rc = -EXDEV);
2690 /* This might be further relaxed in the future for regular file
2691 * renames in different source and target parents. Start with
2692 * only same-directory renames for simplicity and because this
2693 * is by far the most the common use case.
2695 if (msrcdir != mtgtdir) {
2696 rc = mdt_rename_lock(info, &rename_lh);
2698 CERROR("%s: cannot lock for rename: rc = %d\n",
2699 mdt_obd_name(mdt), rc);
2700 GOTO(out_put_tgtdir, rc);
2703 CDEBUG(D_INFO, "%s: samedir rename "DFID"/"DNAME"\n",
2704 mdt_obd_name(mdt), PFID(rr->rr_fid1),
2705 PNAME(&rr->rr_name));
2709 rc = mdt_rename_determine_lock_order(info, msrcdir, mtgtdir);
2711 GOTO(out_unlock_rename, rc);
2715 /* source needs to be looked up after locking source parent, otherwise
2716 * this rename may race with unlink source, and cause rename hang, see
2717 * sanityn.sh 55b, so check parents first, if later we found source is
2718 * remote, relock parents.
2720 cos_incompat = (mdt_object_remote(msrcdir) ||
2721 mdt_object_remote(mtgtdir));
2723 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2725 /* lock parents in the proper order. */
2726 lh_srcdirp = &info->mti_lh[MDT_LH_PARENT];
2727 lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD];
2729 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN);
2730 OBD_RACE(OBD_FAIL_MDS_REINT_OPEN2);
2732 mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name);
2733 mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name);
2736 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2739 GOTO(out_unlock_rename, rc);
2741 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2743 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2746 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2747 GOTO(out_unlock_rename, rc);
2750 rc = mdt_object_lock_save(info, msrcdir, lh_srcdirp, 0,
2753 GOTO(out_unlock_rename, rc);
2755 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5);
2757 if (mtgtdir != msrcdir) {
2758 rc = mdt_object_lock_save(info, mtgtdir, lh_tgtdirp, 1,
2760 } else if (!mdt_object_remote(mtgtdir) &&
2761 lh_srcdirp->mlh_pdo_hash !=
2762 lh_tgtdirp->mlh_pdo_hash) {
2763 rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir,
2764 MDS_INODELOCK_UPDATE,
2766 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10);
2769 mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
2770 GOTO(out_unlock_rename, rc);
2774 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5);
2775 OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5);
2777 /* find mold object. */
2779 rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2);
2781 GOTO(out_unlock_parents, rc);
2783 if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2))
2784 GOTO(out_unlock_parents, rc = -EINVAL);
2786 if (!fid_is_md_operative(old_fid))
2787 GOTO(out_unlock_parents, rc = -EPERM);
2789 mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid);
2791 GOTO(out_unlock_parents, rc = PTR_ERR(mold));
2793 if (!mdt_object_exists(mold)) {
2794 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2796 "object does not exist");
2797 GOTO(out_put_old, rc = -ENOENT);
2800 if (mdt_object_remote(mold) && !mdt->mdt_enable_remote_rename)
2801 GOTO(out_put_old, rc = -EXDEV);
2803 /* Check if @mtgtdir is subdir of @mold, before locking child
2804 * to avoid reverse locking.
2806 if (mtgtdir != msrcdir) {
2807 rc = mdo_is_subdir(info->mti_env, mdt_object_child(mtgtdir),
2812 GOTO(out_put_old, rc);
2816 tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold));
2817 /* save version after locking */
2818 mdt_version_get_save(info, mold, 2);
2820 if (!cos_incompat && mdt_object_remote(mold)) {
2821 cos_incompat = true;
2822 mdt_object_put(info->mti_env, mold);
2823 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, -EAGAIN);
2824 mdt_object_unlock(info, msrcdir, lh_srcdirp, -EAGAIN);
2828 /* find mnew object:
2829 * mnew target object may not exist now
2830 * lookup with version checking
2833 rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid,
2836 /* the new_fid should have been filled at this moment */
2837 if (lu_fid_eq(old_fid, new_fid))
2838 GOTO(out_put_old, rc);
2840 if (lu_fid_eq(new_fid, rr->rr_fid1) ||
2841 lu_fid_eq(new_fid, rr->rr_fid2))
2842 GOTO(out_put_old, rc = -EINVAL);
2844 if (!fid_is_md_operative(new_fid))
2845 GOTO(out_put_old, rc = -EPERM);
2847 mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid);
2849 GOTO(out_put_old, rc = PTR_ERR(mnew));
2851 if (!mdt_object_exists(mnew)) {
2852 LU_OBJECT_DEBUG(D_INODE, info->mti_env,
2854 "object does not exist");
2855 GOTO(out_put_new, rc = -ENOENT);
2858 if (mdt_object_remote(mnew)) {
2859 struct mdt_body *repbody;
2861 /* Always send rename req to the target child MDT */
2862 repbody = req_capsule_server_get(info->mti_pill,
2864 LASSERT(repbody != NULL);
2865 repbody->mbo_fid1 = *new_fid;
2866 repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS);
2867 GOTO(out_put_new, rc = -EXDEV);
2869 /* Before locking the target dir, check we do not replace
2870 * a dir with a non-dir, otherwise it may deadlock with
2871 * link op which tries to create a link in this dir
2872 * back to this non-dir.
2874 if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) &&
2875 !S_ISDIR(lu_object_attr(&mold->mot_obj)))
2876 GOTO(out_put_new, rc = -EISDIR);
2878 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2879 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2880 mdt_lock_reg_init(lh_oldp, LCK_EX);
2881 mdt_lock_reg_init(lh_rmt, LCK_EX);
2882 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2883 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2884 lh_rmt, lock_ibits, cos_incompat);
2886 GOTO(out_put_new, rc);
2888 /* Check if @msrcdir is subdir of @mnew, before locking child
2889 * to avoid reverse locking.
2891 if (mtgtdir != msrcdir) {
2892 rc = mdo_is_subdir(info->mti_env,
2893 mdt_object_child(msrcdir), new_fid);
2897 GOTO(out_unlock_old, rc);
2901 /* We used to acquire MDS_INODELOCK_FULL here but we
2902 * can't do this now because a running HSM restore on
2903 * the rename onto victim will hold the layout
2904 * lock. See LU-4002.
2907 lh_newp = &info->mti_lh[MDT_LH_NEW];
2908 mdt_lock_reg_init(lh_newp, LCK_EX);
2909 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE;
2910 if (mdt_object_remote(mtgtdir)) {
2911 rc = mdt_remote_object_lock(info, mtgtdir,
2912 mdt_object_fid(mnew),
2913 &lh_newp->mlh_rreg_lh,
2914 lh_newp->mlh_rreg_mode,
2915 MDS_INODELOCK_LOOKUP,
2918 GOTO(out_unlock_old, rc);
2920 lock_ibits &= ~MDS_INODELOCK_LOOKUP;
2922 rc = mdt_reint_object_lock(info, mnew, lh_newp, lock_ibits,
2925 GOTO(out_unlock_new, rc);
2927 /* get and save version after locking */
2928 mdt_version_get_save(info, mnew, 3);
/*
 * Target lookup failed: anything other than -ENOENT aborts the rename.
 * With -ENOENT there is no target, so only the source is locked below and
 * mdt_enoent_version_save() fills version slot 3 instead of a real version.
 */
2929 } else if (rc != -ENOENT) {
2930 GOTO(out_put_old, rc);
2932 lh_oldp = &info->mti_lh[MDT_LH_OLD];
2933 lh_rmt = &info->mti_lh[MDT_LH_RMT];
2934 mdt_lock_reg_init(lh_oldp, LCK_EX);
2935 mdt_lock_reg_init(lh_rmt, LCK_EX);
2936 lock_ibits = MDS_INODELOCK_LOOKUP | MDS_INODELOCK_XATTR;
2937 rc = mdt_rename_source_lock(info, msrcdir, mold, lh_oldp,
2938 lh_rmt, lock_ibits, cos_incompat);
2940 GOTO(out_put_old, rc);
2942 mdt_enoent_version_save(info, 3);
2945 /* step 5: rename it */
2946 mdt_reint_init_ma(info, ma);
2948 mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom,
2949 OBD_FAIL_MDS_REINT_RENAME_WRITE);
/*
 * mot_lov_mutex of the target is held across mdo_rename().  mnew can be
 * NULL (see the NULL test in the call), so the lock and unlock are
 * presumably guarded by a check on omitted lines.
 */
2952 mutex_lock(&mnew->mot_lov_mutex);
2954 rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir),
2955 mdt_object_child(mtgtdir), old_fid, &rr->rr_name,
2956 mnew != NULL ? mdt_object_child(mnew) : NULL,
2957 &rr->rr_tgt_name, ma);
2960 mutex_unlock(&mnew->mot_lov_mutex);
2962 /* handle last link of tgt object */
2964 mdt_counter_incr(req, LPROC_MDT_RENAME,
2965 ktime_us_delta(ktime_get(), kstart));
2967 mdt_handle_last_unlink(info, mnew, ma);
/*
 * When discard is set, the mnew reference is not dropped with the other
 * objects below; it is kept until mdt_dom_discard_data() has run at the
 * very end of the function.
 */
2968 discard = mdt_dom_check_for_discard(info, mnew);
2970 mdt_rename_counter_tally(info, info->mti_mdt, req,
2972 ktime_us_delta(ktime_get(), kstart));
/*
 * Cleanup runs in reverse order of acquisition: target lock, remote LOOKUP
 * lock, source lock, child references, parent locks, rename lock, parent
 * references.  The goto labels named in the GOTO calls above sit on lines
 * omitted from this listing.
 */
2978 mdt_object_unlock(info, mnew, lh_newp, rc);
2980 mdt_object_unlock(info, NULL, lh_rmt, rc);
2981 mdt_object_unlock(info, mold, lh_oldp, rc);
2983 if (mnew && !discard)
2984 mdt_object_put(info->mti_env, mnew);
2986 mdt_object_put(info->mti_env, mold);
2988 mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc);
2989 mdt_object_unlock(info, msrcdir, lh_srcdirp, rc);
/* rename_lh is only taken for non-replay renames across two directories */
2991 if (lustre_handle_is_used(&rename_lh))
2992 mdt_rename_unlock(&rename_lh);
2994 mdt_object_put(info->mti_env, mtgtdir);
2996 mdt_object_put(info->mti_env, msrcdir);
2998 /* The DoM discard can be done right in the place above where it is
2999 * assigned, meanwhile it is done here after rename unlock due to
3000 * compatibility with old clients, for them the discard blocks
3001 * the main thread until completion. Check LU-11359 for details.
3004 mdt_dom_discard_data(info, mnew);
3005 mdt_object_put(info->mti_env, mnew);
3007 OBD_RACE(OBD_FAIL_MDS_LINK_RENAME_RACE);
/*
 * Handle a resync reint request (logged below as FLR file resync).
 *
 * NOTE(review): this listing drops short lines (braces, some local
 * declarations, most checks on rc and every goto label), so only visible
 * steps are described.
 *
 * Visible flow: find the object named by rr_fid1; reject it if it does not
 * exist (-ENOENT), is not a regular file (-EINVAL) or is remote (-EREMOTE);
 * resolve the lease lock from rr_lease_handle; try to take mot_open_sem for
 * write (-EBUSY on failure); check that the lease is not being cancelled;
 * run mdt_layout_change() with MD_LAYOUT_RESYNC; fetch the attributes and
 * pack them into the reply body.
 */
3011 static int mdt_reint_resync(struct mdt_thread_info *info,
3012 struct mdt_lock_handle *lhc)
3014 struct mdt_reint_record *rr = &info->mti_rr;
3015 struct ptlrpc_request *req = mdt_info_req(info);
3016 struct md_attr *ma = &info->mti_attr;
3017 struct mdt_object *mo;
3018 struct ldlm_lock *lease;
3019 struct mdt_body *repbody;
/* the mirror id for the layout change is taken from the request record */
3020 struct md_layout_change layout = { .mlc_mirror_id = rr->rr_mirror_id };
3025 DEBUG_REQ(D_INODE, req, DFID", FLR file resync", PFID(rr->rr_fid1));
/* if the request carries an LDLM request, process its cancels first */
3027 if (info->mti_dlm_req)
3028 ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP);
3030 mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1);
3032 GOTO(out, rc = PTR_ERR(mo));
3034 if (!mdt_object_exists(mo))
3035 GOTO(out_obj, rc = -ENOENT);
3037 if (!S_ISREG(lu_object_attr(&mo->mot_obj)))
3038 GOTO(out_obj, rc = -EINVAL);
3040 if (mdt_object_remote(mo))
3041 GOTO(out_obj, rc = -EREMOTE);
/* -ESTALE below is presumably returned when the handle resolves to no lock */
3043 lease = ldlm_handle2lock(rr->rr_lease_handle);
3045 GOTO(out_obj, rc = -ESTALE);
3047 /* It's really necessary to grab open_sem and check if the lease lock
3048 * has been lost. There would exist a concurrent writer coming in and
3049 * generating some dirty data in memory cache, the writeback would fail
3050 * after the layout version is increased by MDS_REINT_RESYNC RPC.
3052 if (!down_write_trylock(&mo->mot_open_sem))
3053 GOTO(out_put_lease, rc = -EBUSY);
3055 lock_res_and_lock(lease);
3056 lease_broken = ldlm_is_cancel(lease);
3057 unlock_res_and_lock(lease);
3059 GOTO(out_unlock, rc = -EBUSY);
3061 /* the file has yet opened by anyone else after we took the lease. */
3062 layout.mlc_opc = MD_LAYOUT_RESYNC;
/* the lhc argument is overwritten: info->mti_lh[MDT_LH_LOCAL] is used */
3063 lhc = &info->mti_lh[MDT_LH_LOCAL];
3064 rc = mdt_layout_change(info, mo, lhc, &layout);
3066 GOTO(out_unlock, rc);
/* presumably releases the lock that mdt_layout_change left held in lhc */
3068 mdt_object_unlock(info, mo, lhc, 0);
/* only inode attributes are requested for the reply */
3070 ma->ma_need = MA_INODE;
3072 rc = mdt_attr_get_complex(info, mo, ma);
3074 GOTO(out_unlock, rc);
3076 repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY);
3077 mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo));
/* cleanup in reverse order: open_sem, lease reference, object reference */
3081 up_write(&mo->mot_open_sem);
3083 LDLM_LOCK_PUT(lease);
3085 mdt_object_put(info->mti_env, mo);
3087 mdt_client_compatibility(info);
/* how rc2 is combined with rc is on lines omitted from this listing */
3088 rc2 = mdt_fix_reply(info);
/*
 * One dispatch entry of the reint handler table: the function that services
 * the request and the extra stats opcode accounted for it.
 */
3094 struct mdt_reinter {
/* called by mdt_reint_rec() with the request context and a lock handle */
3095 int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *);
/* added to PTLRPC_LAST_CNTR to select the service counter to increment */
3096 enum lprocfs_extra_opc mr_extra_opc;
/*
 * Handler table indexed by the reint opcode (mdt_reint_rec() indexes it with
 * rr_opcode).  NOTE(review): most array designators are on lines omitted
 * from this listing; only [REINT_SETXATTR] is visible, so the opcode of the
 * other entries has to be confirmed against the full file.
 */
3099 static const struct mdt_reinter mdt_reinters[] = {
3101 .mr_handler = &mdt_reint_setattr,
3102 .mr_extra_opc = MDS_REINT_SETATTR,
3105 .mr_handler = &mdt_reint_create,
3106 .mr_extra_opc = MDS_REINT_CREATE,
3109 .mr_handler = &mdt_reint_link,
3110 .mr_extra_opc = MDS_REINT_LINK,
3113 .mr_handler = &mdt_reint_unlink,
3114 .mr_extra_opc = MDS_REINT_UNLINK,
3117 .mr_handler = &mdt_reint_rename,
3118 .mr_extra_opc = MDS_REINT_RENAME,
3121 .mr_handler = &mdt_reint_open,
3122 .mr_extra_opc = MDS_REINT_OPEN,
3124 [REINT_SETXATTR] = {
3125 .mr_handler = &mdt_reint_setxattr,
3126 .mr_extra_opc = MDS_REINT_SETXATTR,
/* a second opcode shares the unlink handler and the unlink stats opcode */
3129 .mr_handler = &mdt_reint_unlink,
3130 .mr_extra_opc = MDS_REINT_UNLINK,
/* migrate has its own handler but is counted under the rename stats opcode */
3133 .mr_handler = &mdt_reint_migrate,
3134 .mr_extra_opc = MDS_REINT_RENAME,
3137 .mr_handler = &mdt_reint_resync,
3138 .mr_extra_opc = MDS_REINT_RESYNC,
/*
 * Dispatch a reint record to the handler registered for its opcode in
 * mdt_reinters[] and account the request in the service statistics.
 *
 * NOTE(review): the error returns of the two checks below and the final
 * return are on lines omitted from this listing.
 */
3142 int mdt_reint_rec(struct mdt_thread_info *info,
3143 struct mdt_lock_handle *lhc)
3145 const struct mdt_reinter *mr;
/* the opcode must index inside the handler table */
3149 if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters)))
3152 mr = &mdt_reinters[info->mti_rr.rr_opcode];
/* a table slot without a handler is rejected as well */
3153 if (mr->mr_handler == NULL)
3156 rc = (*mr->mr_handler)(info, lhc);
/* bump the service counter selected by the extra opcode of this entry */
3158 lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats,
3159 PTLRPC_LAST_CNTR + mr->mr_extra_opc);