X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_reint.c;h=c7436c8e5da04196155d2bb5f9d26970a5531466;hp=8e362f74d2ca318d01b4daebc6df65e40efd12a7;hb=78e16f885cd76bfe8e2ed3d2a15995959eee7a6f;hpb=39a9eeb26f20bb42e9d39e361f5d0a724c199137 diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 8e362f7..c7436c8 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel Corporation. + * Copyright (c) 2011, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -66,7 +66,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, if (repbody->valid & OBD_MD_FLMDSCAPA) RETURN(rc); - if (rc == 0 && info->mti_mdt->mdt_opts.mo_mds_capa && + if (rc == 0 && info->mti_mdt->mdt_lut.lut_mds_capa && exp_connect_flags(info->mti_exp) & OBD_CONNECT_MDS_CAPA) { struct lustre_capa *capa; @@ -91,7 +91,8 @@ static void mdt_obj_version_get(struct mdt_thread_info *info, struct mdt_object *o, __u64 *version) { LASSERT(o); - if (mdt_object_exists(o) > 0 && !mdt_object_obf(o)) + if (mdt_object_exists(o) && !mdt_object_remote(o) && + !fid_is_obf(mdt_object_fid(o))) *version = dt_version_get(info->mti_env, mdt_obj2dt(o)); else *version = ENOENT_VERSION; @@ -218,8 +219,8 @@ int mdt_version_get_check_save(struct mdt_thread_info *info, * FID, therefore we need to get object by name and check its version. 
*/ int mdt_lookup_version_check(struct mdt_thread_info *info, - struct mdt_object *p, struct lu_name *lname, - struct lu_fid *fid, int idx) + struct mdt_object *p, const struct lu_name *lname, + struct lu_fid *fid, int idx) { int rc, vbrc; @@ -260,26 +261,26 @@ static int mdt_md_create(struct mdt_thread_info *info) struct mdt_body *repbody; struct md_attr *ma = &info->mti_attr; struct mdt_reint_record *rr = &info->mti_rr; - struct lu_name *lname; int rc; ENTRY; - DEBUG_REQ(D_INODE, mdt_info_req(info), "Create (%s->"DFID") in "DFID, - rr->rr_name, PFID(rr->rr_fid2), PFID(rr->rr_fid1)); + DEBUG_REQ(D_INODE, mdt_info_req(info), "Create ("DNAME"->"DFID") " + "in "DFID, + PNAME(&rr->rr_name), PFID(rr->rr_fid2), PFID(rr->rr_fid1)); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1)) + RETURN(-EPERM); - lh = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh, LCK_PW, rr->rr_name, rr->rr_namelen); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + + lh = &info->mti_lh[MDT_LH_PARENT]; + mdt_lock_pdo_init(lh, LCK_PW, &rr->rr_name); parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) RETURN(PTR_ERR(parent)); - if (mdt_object_obf(parent)) - GOTO(out_put_parent, rc = -EPERM); - rc = mdt_version_get_check_save(info, parent, 0); if (rc) GOTO(out_put_parent, rc); @@ -288,9 +289,8 @@ static int mdt_md_create(struct mdt_thread_info *info) * Check child name version during replay. * During create replay a file may exist with same name. 
*/ - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); - rc = mdt_lookup_version_check(info, parent, lname, - &info->mti_tmp_fid1, 1); + rc = mdt_lookup_version_check(info, parent, &rr->rr_name, + &info->mti_tmp_fid1, 1); if (rc == 0) GOTO(out_put_parent, rc = -EEXIST); @@ -305,15 +305,21 @@ static int mdt_md_create(struct mdt_thread_info *info) if (likely(!IS_ERR(child))) { struct md_object *next = mdt_object_child(parent); - if (mdt_object_exists(child) < 0) { + if (mdt_object_remote(child)) { struct seq_server_site *ss; struct lu_ucred *uc = mdt_ucred(info); if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) { - CERROR("%s: Creating remote dir is only " - "permitted for administrator: rc = %d\n", - mdt2obd_dev(mdt)->obd_name, -EPERM); - GOTO(out_put_child, rc = -EPERM); + if (uc->uc_gid != + mdt->mdt_enable_remote_dir_gid && + mdt->mdt_enable_remote_dir_gid != -1) { + CERROR("%s: Creating remote dir is only" + " permitted for administrator or" + " set mdt_enable_remote_dir_gid:" + " rc = %d\n", + mdt_obd_name(mdt), -EPERM); + GOTO(out_put_child, rc = -EPERM); + } } ss = mdt_seq_site(mdt); @@ -322,9 +328,14 @@ static int mdt_md_create(struct mdt_thread_info *info) CERROR("%s: remote dir is only permitted on" " MDT0 or set_param" " mdt.*.enable_remote_dir=1\n", - mdt2obd_dev(mdt)->obd_name); + mdt_obd_name(mdt)); GOTO(out_put_child, rc = -EPERM); } + if (!mdt_is_dne_client(mdt_info_req(info)->rq_export)) { + /* Return -EIO for old client */ + GOTO(out_put_child, rc = -EIO); + } + } ma->ma_need = MA_INODE; ma->ma_valid = 0; @@ -337,7 +348,7 @@ static int mdt_md_create(struct mdt_thread_info *info) OBD_FAIL_MDS_REINT_CREATE_WRITE); /* Version of child will be updated on disk. 
*/ - info->mti_mos = child; + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(child)); rc = mdt_version_get_check_save(info, child, 2); if (rc) GOTO(out_put_child, rc); @@ -353,9 +364,8 @@ static int mdt_md_create(struct mdt_thread_info *info) info->mti_spec.sp_cr_lookup = 0; info->mti_spec.sp_feat = &dt_directory_features; - rc = mdo_create(info->mti_env, next, lname, - mdt_object_child(child), - &info->mti_spec, ma); + rc = mdo_create(info->mti_env, next, &rr->rr_name, + mdt_object_child(child), &info->mti_spec, ma); if (rc == 0) rc = mdt_attr_get_complex(info, child, ma); @@ -385,14 +395,18 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int rc; ENTRY; - /* attr shouldn't be set on remote object */ - LASSERT(mdt_object_exists(mo) >= 0); + /* attr shouldn't be set on remote object */ + LASSERT(!mdt_object_remote(mo)); lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); - if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) - lockpart |= MDS_INODELOCK_LOOKUP; + /* Even though the new MDT will grant PERM lock to the old + * client, but the old client will almost ignore that during + * So it needs to revoke both LOOKUP and PERM lock here, so + * both new and old client can cancel the dcache */ + if (ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID)) + lockpart |= MDS_INODELOCK_LOOKUP | MDS_INODELOCK_PERM; rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK); if (rc != 0) @@ -412,14 +426,22 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, /* VBR: update version if attr changed are important for recovery */ if (do_vbr) { /* update on-disk version of changed object */ - info->mti_mos = mo; + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mo)); rc = mdt_version_get_check_save(info, mo, 0); if (rc) GOTO(out_unlock, rc); } + /* Ensure constant striping during chown(). See LU-2789. 
*/ + if (ma->ma_attr.la_valid & (LA_UID|LA_GID)) + mutex_lock(&mo->mot_lov_mutex); + /* all attrs are packed into mti_attr in unpack_setattr */ rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma); + + if (ma->ma_attr.la_valid & (LA_UID|LA_GID)) + mutex_unlock(&mo->mot_lov_mutex); + if (rc != 0) GOTO(out_unlock, rc); @@ -446,21 +468,28 @@ int mdt_add_dirty_flag(struct mdt_thread_info *info, struct mdt_object *mo, rc = mdt_attr_get_complex(info, mo, ma); if (rc) { CERROR("file attribute read error for "DFID": %d.\n", - PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc); + PFID(mdt_object_fid(mo)), rc); RETURN(rc); } /* If an up2date copy exists in the backend, add dirty flag */ if ((ma->ma_valid & MA_HSM) && (ma->ma_hsm.mh_flags & HS_EXISTS) && !(ma->ma_hsm.mh_flags & (HS_DIRTY|HS_RELEASED))) { + struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD]; ma->ma_hsm.mh_flags |= HS_DIRTY; + + mdt_lock_reg_init(lh, LCK_PW); + rc = mdt_object_lock(info, mo, lh, MDS_INODELOCK_XATTR, + MDT_LOCAL_LOCK); + if (rc != 0) + RETURN(rc); + rc = mdt_hsm_attr_set(info, mo, &ma->ma_hsm); - if (rc) { + if (rc) CERROR("file attribute change error for "DFID": %d\n", - PFID(lu_object_fid(&mo->mot_obj.mo_lu)), rc); - RETURN(rc); - } + PFID(mdt_object_fid(mo)), rc); + mdt_object_unlock(info, mo, lh, rc); } RETURN(rc); @@ -485,14 +514,11 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0); - repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); + repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(mo)) GOTO(out, rc = PTR_ERR(mo)); - if (mdt_object_obf(mo)) - GOTO(out_put, rc = -EPERM); - /* start a log jounal handle if needed */ if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) { if ((ma->ma_attr.la_valid & LA_SIZE) || @@ -508,7 +534,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, if (rc) 
GOTO(out_put, rc); - mfd = mdt_mfd_new(); + mfd = mdt_mfd_new(med); if (mfd == NULL) { mdt_write_put(mo); GOTO(out_put, rc = -ENOMEM); @@ -536,7 +562,8 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, LASSERT(info->mti_ioepoch); spin_lock(&med->med_open_lock); - mfd = mdt_handle2mfd(info, &info->mti_ioepoch->handle); + mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle, + req_is_replay(req)); if (mfd == NULL) { spin_unlock(&med->med_open_lock); CDEBUG(D_INODE, "no handle for file close: " @@ -577,14 +604,14 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, ma->ma_need = MA_INODE; ma->ma_valid = 0; rc = mdt_attr_get_complex(info, mo, ma); - if (rc != 0) - GOTO(out_put, rc); + if (rc != 0) + GOTO(out_put, rc); - mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo)); + mdt_pack_attr2body(info, repbody, &ma->ma_attr, mdt_object_fid(mo)); - if (info->mti_mdt->mdt_opts.mo_oss_capa && + if (info->mti_mdt->mdt_lut.lut_oss_capa && exp_connect_flags(info->mti_exp) & OBD_CONNECT_OSS_CAPA && - S_ISREG(lu_object_attr(&mo->mot_obj.mo_lu)) && + S_ISREG(lu_object_attr(&mo->mot_obj)) && (ma->ma_attr.la_valid & LA_SIZE) && !som_au) { struct lustre_capa *capa; @@ -624,8 +651,10 @@ static int mdt_reint_create(struct mdt_thread_info *info, if (info->mti_dlm_req) ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); - LASSERT(info->mti_rr.rr_namelen > 0); - switch (info->mti_attr.ma_attr.la_mode & S_IFMT) { + if (!lu_name_is_valid(&info->mti_rr.rr_name)) + RETURN(-EPROTO); + + switch (info->mti_attr.ma_attr.la_mode & S_IFMT) { case S_IFDIR: mdt_counter_incr(req, LPROC_MDT_MKDIR); break; @@ -640,7 +669,7 @@ static int mdt_reint_create(struct mdt_thread_info *info, break; default: CERROR("%s: Unsupported mode %o\n", - mdt2obd_dev(info->mti_mdt)->obd_name, + mdt_obd_name(info->mti_mdt), info->mti_attr.ma_attr.la_mode); RETURN(err_serious(-EOPNOTSUPP)); } @@ -664,12 +693,12 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, struct 
mdt_object *mc; struct mdt_lock_handle *parent_lh; struct mdt_lock_handle *child_lh; - struct lu_name *lname; int rc; - ENTRY; + int no_name = 0; + ENTRY; - DEBUG_REQ(D_INODE, req, "unlink "DFID"/%s", PFID(rr->rr_fid1), - rr->rr_name); + DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1), + PNAME(&rr->rr_name)); if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0); @@ -677,6 +706,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) RETURN(err_serious(-ENOENT)); + if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1)) + RETURN(-EPERM); /* * step 1: Found the parent. */ @@ -686,12 +717,9 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, GOTO(out, rc); } - if (mdt_object_obf(mp)) - GOTO(put_parent, rc = -EPERM); - parent_lh = &info->mti_lh[MDT_LH_PARENT]; - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); - if (mdt_object_exists(mp) < 0) { + + if (mdt_object_remote(mp)) { mdt_lock_reg_init(parent_lh, LCK_EX); rc = mdt_remote_object_lock(info, mp, &parent_lh->mlh_rreg_lh, parent_lh->mlh_rreg_mode, @@ -700,8 +728,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, GOTO(put_parent, rc); } else { - mdt_lock_pdo_init(parent_lh, LCK_PW, rr->rr_name, - rr->rr_namelen); + mdt_lock_pdo_init(parent_lh, LCK_PW, &rr->rr_name); rc = mdt_object_lock(info, mp, parent_lh, MDS_INODELOCK_UPDATE, MDT_LOCAL_LOCK); if (rc) @@ -715,9 +742,34 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, /* step 2: find & lock the child */ /* lookup child object along with version checking */ fid_zero(child_fid); - rc = mdt_lookup_version_check(info, mp, lname, child_fid, 1); - if (rc != 0) - GOTO(unlock_parent, rc); + rc = mdt_lookup_version_check(info, mp, &rr->rr_name, child_fid, 1); + if (rc != 0) { + /* Name might not be able to find during resend of + * remote unlink, considering following case. 
+ * dir_A is a remote directory, the name entry of + * dir_A is on MDT0, the directory is on MDT1, + * + * 1. client sends unlink req to MDT1. + * 2. MDT1 sends name delete update to MDT0. + * 3. name entry is being deleted in MDT0 synchronously. + * 4. MDT1 is restarted. + * 5. client resends unlink req to MDT1. So it can not + * find the name entry on MDT0 anymore. + * In this case, MDT1 only needs to destroy the local + * directory. + * */ + if (mdt_object_remote(mp) && rc == -ENOENT && + !fid_is_zero(rr->rr_fid2) && + lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) { + no_name = 1; + *child_fid = *rr->rr_fid2; + } else { + GOTO(unlock_parent, rc); + } + } + + if (fid_is_obf(child_fid) || fid_is_dot_lustre(child_fid)) + GOTO(unlock_parent, rc = -EPERM); mdt_reint_init_ma(info, ma); @@ -728,18 +780,22 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, child_lh = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(child_lh, LCK_EX); - if (mdt_object_exists(mc) < 0) { + if (mdt_object_remote(mc)) { struct mdt_body *repbody; if (!fid_is_zero(rr->rr_fid2)) { - CDEBUG(D_INFO, "%s: name %s can not find "DFID"\n", - mdt2obd_dev(info->mti_mdt)->obd_name, - (char *)rr->rr_name, PFID(mdt_object_fid(mc))); - GOTO(unlock_parent, rc = -ENOENT); + CDEBUG(D_INFO, "%s: name "DNAME" cannot find "DFID"\n", + mdt_obd_name(info->mti_mdt), + PNAME(&rr->rr_name), PFID(mdt_object_fid(mc))); + GOTO(put_child, rc = -ENOENT); } - CDEBUG(D_INFO, "%s: name %s: "DFID" is another MDT\n", - mdt2obd_dev(info->mti_mdt)->obd_name, - (char *)rr->rr_name, PFID(mdt_object_fid(mc))); + CDEBUG(D_INFO, "%s: name "DNAME": "DFID" is on another MDT\n", + mdt_obd_name(info->mti_mdt), + PNAME(&rr->rr_name), PFID(mdt_object_fid(mc))); + + if (!mdt_is_dne_client(req->rq_export)) + /* Return -EIO for old client */ + GOTO(put_child, rc = -EIO); if (info->mti_spec.sp_rm_entry) { struct lu_ucred *uc = mdt_ucred(info); @@ -747,18 +803,17 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if 
(!md_capable(uc, CFS_CAP_SYS_ADMIN)) { CERROR("%s: unlink remote entry is only " "permitted for administrator: rc = %d\n", - mdt2obd_dev(info->mti_mdt)->obd_name, + mdt_obd_name(info->mti_mdt), -EPERM); - GOTO(unlock_parent, rc = -EPERM); + GOTO(put_child, rc = -EPERM); } ma->ma_need = MA_INODE; ma->ma_valid = 0; mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); rc = mdo_unlink(info->mti_env, mdt_object_child(mp), - NULL, lname, ma); - mdt_object_put(info->mti_env, mc); - GOTO(unlock_parent, rc); + NULL, &rr->rr_name, ma, no_name); + GOTO(put_child, rc); } /* Revoke the LOOKUP lock of the remote object granted by * this MDT. Since the unlink will happen on another MDT, @@ -771,40 +826,47 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, LASSERT(repbody != NULL); repbody->fid1 = *mdt_object_fid(mc); repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); - mdt_object_unlock_put(info, mc, child_lh, rc); - GOTO(unlock_parent, rc = -EREMOTE); + GOTO(unlock_child, rc = -EREMOTE); } else if (info->mti_spec.sp_rm_entry) { - CERROR("%s: lfs rmdir should not be used on local dir %s\n", - mdt2obd_dev(info->mti_mdt)->obd_name, - (char *)rr->rr_name); - mdt_object_put(info->mti_env, mc); - GOTO(unlock_parent, rc = -EPERM); + rc = -EPERM; + CDEBUG(D_INFO, "%s: no rm_entry on local dir '"DNAME"': " + "rc = %d\n", + mdt_obd_name(info->mti_mdt), PNAME(&rr->rr_name), rc); + GOTO(put_child, rc); } - rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL, - MDT_CROSS_LOCK); - if (rc != 0) { - mdt_object_put(info->mti_env, mc); - GOTO(unlock_parent, rc); - } + /* We used to acquire MDS_INODELOCK_FULL here but we can't do + * this now because a running HSM restore on the child (unlink + * victim) will hold the layout lock. See LU-4002. 
*/ + rc = mdt_object_lock(info, mc, child_lh, + MDS_INODELOCK_LOOKUP | MDS_INODELOCK_UPDATE, + MDT_CROSS_LOCK); + if (rc != 0) + GOTO(put_child, rc); mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_UNLINK_WRITE); /* save version when object is locked */ mdt_version_get_save(info, mc, 1); - /* - * Now we can only make sure we need MA_INODE, in mdd layer, will check - * whether need MA_LOV and MA_COOKIE. - */ - ma->ma_need = MA_INODE; - ma->ma_valid = 0; - mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); - rc = mdo_unlink(info->mti_env, mdt_object_child(mp), - mdt_object_child(mc), lname, ma); + /* + * Now we can only make sure we need MA_INODE, in mdd layer, will check + * whether need MA_LOV and MA_COOKIE. + */ + ma->ma_need = MA_INODE; + ma->ma_valid = 0; + mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); + + mutex_lock(&mc->mot_lov_mutex); + + rc = mdo_unlink(info->mti_env, mdt_object_child(mp), + mdt_object_child(mc), &rr->rr_name, ma, no_name); + + mutex_unlock(&mc->mot_lov_mutex); + if (rc == 0 && !lu_object_is_dying(&mc->mot_header)) rc = mdt_attr_get_complex(info, mc, ma); - if (rc == 0) - mdt_handle_last_unlink(info, mc, ma); + if (rc == 0) + mdt_handle_last_unlink(info, mc, ma); if (ma->ma_valid & MA_INODE) { switch (ma->ma_attr.la_mode & S_IFMT) { @@ -826,8 +888,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, } EXIT; - - mdt_object_unlock_put(info, mc, child_lh, rc); +unlock_child: + mdt_object_unlock(info, mc, child_lh, rc); +put_child: + mdt_object_put(info->mti_env, mc); unlock_parent: mdt_object_unlock(info, mp, parent_lh, rc); put_parent: @@ -850,12 +914,11 @@ static int mdt_reint_link(struct mdt_thread_info *info, struct mdt_object *mp; struct mdt_lock_handle *lhs; struct mdt_lock_handle *lhp; - struct lu_name *lname; int rc; ENTRY; - DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/%s", - PFID(rr->rr_fid1), PFID(rr->rr_fid2), rr->rr_name); + DEBUG_REQ(D_INODE, req, "link "DFID" to "DFID"/"DNAME, + 
PFID(rr->rr_fid1), PFID(rr->rr_fid2), PNAME(&rr->rr_name)); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) RETURN(err_serious(-ENOENT)); @@ -868,18 +931,18 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) RETURN(-EPERM); + if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) || + fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2)) + RETURN(-EPERM); + /* step 1: find & lock the target parent dir */ lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name, - rr->rr_namelen); + mdt_lock_pdo_init(lhp, LCK_PW, &rr->rr_name); mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) RETURN(PTR_ERR(mp)); - if (mdt_object_obf(mp)) - GOTO(out_unlock_parent, rc = -EPERM); - rc = mdt_version_get_check_save(info, mp, 0); if (rc) GOTO(out_unlock_parent, rc); @@ -892,15 +955,23 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (IS_ERR(ms)) GOTO(out_unlock_parent, rc = PTR_ERR(ms)); - if (mdt_object_exists(ms) < 0) { + if (!mdt_object_exists(ms)) { mdt_object_put(info->mti_env, ms); - CERROR("Target directory "DFID" is on another MDT\n", - PFID(rr->rr_fid1)); + CDEBUG(D_INFO, "%s: "DFID" does not exist.\n", + mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1)); + GOTO(out_unlock_parent, rc = -ENOENT); + } + + if (mdt_object_remote(ms)) { + mdt_object_put(info->mti_env, ms); + CERROR("%s: source inode "DFID" on remote MDT from "DFID"\n", + mdt_obd_name(info->mti_mdt), PFID(rr->rr_fid1), + PFID(rr->rr_fid2)); GOTO(out_unlock_parent, rc = -EXDEV); } - rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE, - MDT_CROSS_LOCK); + rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE | + MDS_INODELOCK_XATTR, MDT_CROSS_LOCK); if (rc != 0) { mdt_object_put(info->mti_env, ms); GOTO(out_unlock_parent, rc); @@ -910,24 +981,29 @@ static int mdt_reint_link(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, 
OBD_FAIL_MDS_REINT_LINK_WRITE); - info->mti_mos = ms; + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(ms)); rc = mdt_version_get_check_save(info, ms, 1); if (rc) GOTO(out_unlock_child, rc); - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); /** check target version by name during replay */ - rc = mdt_lookup_version_check(info, mp, lname, &info->mti_tmp_fid1, 2); + rc = mdt_lookup_version_check(info, mp, &rr->rr_name, + &info->mti_tmp_fid1, 2); if (rc != 0 && rc != -ENOENT) GOTO(out_unlock_child, rc); /* save version of file name for replay, it must be ENOENT here */ if (!req_is_replay(mdt_info_req(info))) { + if (rc != -ENOENT) { + CDEBUG(D_INFO, "link target "DNAME" existed!\n", + PNAME(&rr->rr_name)); + GOTO(out_unlock_child, rc = -EEXIST); + } info->mti_ver[2] = ENOENT_VERSION; mdt_version_save(mdt_info_req(info), info->mti_ver[2], 2); } - rc = mdo_link(info->mti_env, mdt_object_child(mp), - mdt_object_child(ms), lname, ma); + rc = mdo_link(info->mti_env, mdt_object_child(mp), + mdt_object_child(ms), &rr->rr_name, ma); if (rc == 0) mdt_counter_incr(req, LPROC_MDT_LINK); @@ -944,31 +1020,31 @@ out_unlock_parent: * (lh->mlh_pdo_hash) in parallel directory lock. */ static int mdt_pdir_hash_lock(struct mdt_thread_info *info, - struct mdt_lock_handle *lh, - struct mdt_object *obj, __u64 ibits) + struct mdt_lock_handle *lh, + struct mdt_object *obj, __u64 ibits) { - struct ldlm_res_id *res_id = &info->mti_res_id; - struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; - ldlm_policy_data_t *policy = &info->mti_policy; - int rc; + struct ldlm_res_id *res = &info->mti_res_id; + struct ldlm_namespace *ns = info->mti_mdt->mdt_namespace; + ldlm_policy_data_t *policy = &info->mti_policy; + int rc; - /* - * Finish res_id initializing by name hash marking part of - * directory which is taking modification. 
- */ - LASSERT(lh->mlh_pdo_hash != 0); - fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res_id); - memset(policy, 0, sizeof(*policy)); - policy->l_inodebits.bits = ibits; - /* - * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is - * going to be sent to client. If it is - mdt_intent_policy() path will - * fix it up and turn FL_LOCAL flag off. - */ - rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, - res_id, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, - &info->mti_exp->exp_handle.h_cookie); - return rc; + /* + * Finish res_id initializing by name hash marking part of + * directory which is taking modification. + */ + LASSERT(lh->mlh_pdo_hash != 0); + fid_build_pdo_res_name(mdt_object_fid(obj), lh->mlh_pdo_hash, res); + memset(policy, 0, sizeof(*policy)); + policy->l_inodebits.bits = ibits; + /* + * Use LDLM_FL_LOCAL_ONLY for this lock. We do not know yet if it is + * going to be sent to client. If it is - mdt_intent_policy() path will + * fix it up and turn FL_LOCAL flag off. + */ + rc = mdt_fid_lock(ns, &lh->mlh_reg_lh, lh->mlh_reg_mode, policy, + res, LDLM_FL_LOCAL_ONLY | LDLM_FL_ATOMIC_CB, + &info->mti_exp->exp_handle.h_cookie); + return rc; } static int mdt_rename_lock(struct mdt_thread_info *info, @@ -985,7 +1061,7 @@ static int mdt_rename_lock(struct mdt_thread_info *info, memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = MDS_INODELOCK_UPDATE; -#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 4, 53, 0) +#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(2, 5, 53, 0) /* In phase I, we will not do cross-rename, so local BFL lock would * be enough */ @@ -1055,6 +1131,17 @@ static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid) * 2 - src child; 3 - tgt child. * Update on disk version of src child. */ +/** + * For DNE phase I, only these renames are allowed + * mv src_p/src_c tgt_p/tgt_c + * 1. src_p/src_c/tgt_p/tgt_c are in the same MDT. + * 2. 
src_p and tgt_p are the same directory, and tgt_c does not + * exist. In this case, all modifications will happen + * in the MDT where the source parent is, only one remote + * update is needed, i.e. set c_time/m_time on the child. + * And tgt_c will still be in the same MDT as the original + * src_c. + */ static int mdt_reint_rename(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) { @@ -1072,17 +1159,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info, struct lu_fid *old_fid = &info->mti_tmp_fid1; struct lu_fid *new_fid = &info->mti_tmp_fid2; struct lustre_handle rename_lh = { 0 }; - struct lu_name slname = { 0 }; - struct lu_name *lname; int rc; ENTRY; if (info->mti_dlm_req) ldlm_request_cancel(req, info->mti_dlm_req, 0); - DEBUG_REQ(D_INODE, req, "rename "DFID"/%s to "DFID"/%s", - PFID(rr->rr_fid1), rr->rr_name, - PFID(rr->rr_fid2), rr->rr_tgt); + DEBUG_REQ(D_INODE, req, "rename "DFID"/"DNAME" to "DFID"/"DNAME, + PFID(rr->rr_fid1), PNAME(&rr->rr_name), + PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); + + if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) || + fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2)) + RETURN(-EPERM); rc = mdt_rename_lock(info, &rename_lh); if (rc) { @@ -1094,24 +1183,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /* step 1: lock the source dir. */ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lh_srcdirp, LCK_PW, rr->rr_name, - rr->rr_namelen); + mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, MDS_INODELOCK_UPDATE); if (IS_ERR(msrcdir)) GOTO(out_rename_lock, rc = PTR_ERR(msrcdir)); - if (mdt_object_obf(msrcdir)) - GOTO(out_unlock_source, rc = -EPERM); - rc = mdt_version_get_check_save(info, msrcdir, 0); if (rc) GOTO(out_unlock_source, rc); /* step 2: find & lock the target dir. 
*/ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; - mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, rr->rr_tgt, - rr->rr_tgtlen); + mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { mdt_object_get(info->mti_env, msrcdir); mtgtdir = msrcdir; @@ -1128,79 +1212,68 @@ static int mdt_reint_rename(struct mdt_thread_info *info, if (IS_ERR(mtgtdir)) GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir)); - if (mdt_object_obf(mtgtdir)) - GOTO(out_put_target, rc = -EPERM); - /* check early, the real version will be saved after locking */ rc = mdt_version_get_check(info, mtgtdir, 1); if (rc) GOTO(out_put_target, rc); - rc = mdt_object_exists(mtgtdir); - if (rc == 0) { - GOTO(out_put_target, rc = -ESTALE); - } else if (rc > 0) { - /* we lock the target dir if it is local */ - rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp, - MDS_INODELOCK_UPDATE, - MDT_LOCAL_LOCK); - if (rc != 0) - GOTO(out_put_target, rc); - /* get and save correct version after locking */ - mdt_version_get_save(info, mtgtdir, 1); - } else if (rc < 0) { - CERROR("Source dir "DFID" target dir "DFID + if (unlikely(mdt_object_remote(mtgtdir))) { + CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID "on different MDTs\n", PFID(rr->rr_fid1), PFID(rr->rr_fid2)); GOTO(out_put_target, rc = -EXDEV); + } else { + if (likely(mdt_object_exists(mtgtdir))) { + /* we lock the target dir if it is local */ + rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp, + MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); + if (rc != 0) + GOTO(out_put_target, rc); + /* get and save correct version after locking */ + mdt_version_get_save(info, mtgtdir, 1); + } else { + GOTO(out_put_target, rc = -ESTALE); + } } - } + } - /* step 3: find & lock the old object. 
*/ - lname = mdt_name(info->mti_env, (char *)rr->rr_name, rr->rr_namelen); - mdt_name_copy(&slname, lname); - fid_zero(old_fid); - rc = mdt_lookup_version_check(info, msrcdir, &slname, old_fid, 2); - if (rc != 0) - GOTO(out_unlock_target, rc); + /* step 3: find & lock the old object. */ + fid_zero(old_fid); + rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2); + if (rc != 0) + GOTO(out_unlock_target, rc); + + if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2)) + GOTO(out_unlock_target, rc = -EINVAL); - if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2)) - GOTO(out_unlock_target, rc = -EINVAL); + if (fid_is_obf(old_fid) || fid_is_dot_lustre(old_fid)) + GOTO(out_unlock_target, rc = -EPERM); mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid); if (IS_ERR(mold)) GOTO(out_unlock_target, rc = PTR_ERR(mold)); - if (mdt_object_exists(mold) < 0) { - mdt_object_put(info->mti_env, mold); - CERROR("Source child "DFID" is on another MDT\n", PFID(old_fid)); - GOTO(out_unlock_target, rc = -EXDEV); - } - - if (mdt_object_obf(mold)) { - mdt_object_put(info->mti_env, mold); - GOTO(out_unlock_target, rc = -EPERM); - } lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); - rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP, - MDT_CROSS_LOCK); + rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_XATTR, MDT_CROSS_LOCK); if (rc != 0) { mdt_object_put(info->mti_env, mold); GOTO(out_unlock_target, rc); } - info->mti_mos = mold; + tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold)); /* save version after locking */ mdt_version_get_save(info, mold, 2); mdt_set_capainfo(info, 2, old_fid, BYPASS_CAPA); /* step 4: find & lock the new object. 
*/ /* new target object may not exist now */ - lname = mdt_name(info->mti_env, (char *)rr->rr_tgt, rr->rr_tgtlen); /* lookup with version checking */ fid_zero(new_fid); - rc = mdt_lookup_version_check(info, mtgtdir, lname, new_fid, 3); + rc = mdt_lookup_version_check(info, mtgtdir, &rr->rr_tgt_name, new_fid, + 3); if (rc == 0) { /* the new_fid should have been filled at this moment */ if (lu_fid_eq(old_fid, new_fid)) @@ -1210,25 +1283,35 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lu_fid_eq(new_fid, rr->rr_fid2)) GOTO(out_unlock_old, rc = -EINVAL); + if (fid_is_obf(new_fid) || fid_is_dot_lustre(new_fid)) + GOTO(out_unlock_old, rc = -EPERM); + + if (mdt_object_remote(mold)) { + CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n", + PFID(old_fid)); + GOTO(out_unlock_old, rc = -EXDEV); + } + mdt_lock_reg_init(lh_newp, LCK_EX); mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid); if (IS_ERR(mnew)) GOTO(out_unlock_old, rc = PTR_ERR(mnew)); - if (mdt_object_obf(mnew)) { - mdt_object_put(info->mti_env, mnew); - GOTO(out_unlock_old, rc = -EPERM); - } - - if (mdt_object_exists(mnew) < 0) { + if (mdt_object_remote(mnew)) { mdt_object_put(info->mti_env, mnew); - CERROR("Source child "DFID" is on another MDT\n", + CDEBUG(D_INFO, "src child "DFID" is on another MDT\n", PFID(new_fid)); GOTO(out_unlock_old, rc = -EXDEV); } - rc = mdt_object_lock(info, mnew, lh_newp, - MDS_INODELOCK_FULL, MDT_CROSS_LOCK); + /* We used to acquire MDS_INODELOCK_FULL here but we + * can't do this now because a running HSM restore on + * the rename onto victim will hold the layout + * lock. See LU-4002. 
*/ + rc = mdt_object_lock(info, mnew, lh_newp, + MDS_INODELOCK_LOOKUP | + MDS_INODELOCK_UPDATE, + MDT_CROSS_LOCK); if (rc != 0) { mdt_object_put(info->mti_env, mnew); GOTO(out_unlock_old, rc); @@ -1239,7 +1322,14 @@ static int mdt_reint_rename(struct mdt_thread_info *info, } else if (rc != -EREMOTE && rc != -ENOENT) { GOTO(out_unlock_old, rc); } else { - mdt_enoent_version_save(info, 3); + /* If mnew does not exist and mold is a remote directory, + * the rename is only allowed within the same parent directory */ + if (mtgtdir != msrcdir && mdt_object_remote(mold)) { + CDEBUG(D_INFO, "Src child "DFID" is on another MDT\n", + PFID(old_fid)); + GOTO(out_unlock_old, rc = -EXDEV); + } + mdt_enoent_version_save(info, 3); } /* step 5: rename it */ @@ -1248,22 +1338,27 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_RENAME_WRITE); + /* Check if @dst is subdir of @src. */ + rc = mdt_rename_sanity(info, old_fid); + if (rc) + GOTO(out_unlock_new, rc); - /* Check if @dst is subdir of @src. */ - rc = mdt_rename_sanity(info, old_fid); - if (rc) - GOTO(out_unlock_new, rc); + if (mnew != NULL) + mutex_lock(&mnew->mot_lov_mutex); - rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir), - mdt_object_child(mtgtdir), old_fid, &slname, - (mnew ? mdt_object_child(mnew) : NULL), - lname, ma); + rc = mdo_rename(info->mti_env, mdt_object_child(msrcdir), + mdt_object_child(mtgtdir), old_fid, &rr->rr_name, + mnew != NULL ? mdt_object_child(mnew) : NULL, + &rr->rr_tgt_name, ma); - /* handle last link of tgt object */ - if (rc == 0) { + if (mnew != NULL) + mutex_unlock(&mnew->mot_lov_mutex); + + /* handle last link of tgt object */ + if (rc == 0) { mdt_counter_incr(req, LPROC_MDT_RENAME); - if (mnew) - mdt_handle_last_unlink(info, mnew, ma); + if (mnew) + mdt_handle_last_unlink(info, mnew, ma); mdt_rename_counter_tally(info, info->mti_mdt, req, msrcdir, mtgtdir);