X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fmdt%2Fmdt_reint.c;h=62538bcce68dbe841b1d3dcb6ed8f2a7f27a7668;hp=6c657dac4bcf98883c5d7f253980955465b27482;hb=ad1810a2dbea1eed5e8b5feb55bdf915a545feb3;hpb=742d4ee68aaf3407ac4b8f86c9bddbece4beaa6e diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 6c657da..62538bc 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -27,7 +27,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2013, Intel Corporation. + * Copyright (c) 2011, 2014, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -46,6 +46,7 @@ #define DEBUG_SUBSYSTEM S_MDS +#include #include "mdt_internal.h" #include @@ -64,7 +65,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, /* for cross-ref mkdir, mds capa has been fetched from remote obj, then * we won't go to below*/ - if (repbody->valid & OBD_MD_FLMDSCAPA) + if (repbody->mbo_valid & OBD_MD_FLMDSCAPA) RETURN(rc); if (rc == 0 && info->mti_mdt->mdt_lut.lut_mds_capa && @@ -77,7 +78,7 @@ static int mdt_create_pack_capa(struct mdt_thread_info *info, int rc, rc = mo_capa_get(info->mti_env, mdt_object_child(object), capa, 0); if (rc == 0) - repbody->valid |= OBD_MD_FLMDSCAPA; + repbody->mbo_valid |= OBD_MD_FLMDSCAPA; } RETURN(rc); @@ -219,9 +220,10 @@ int mdt_version_get_check_save(struct mdt_thread_info *info, * This checks version of 'name'. Many reint functions uses 'name' for child not * FID, therefore we need to get object by name and check its version. */ -int mdt_lookup_version_check(struct mdt_thread_info *info, - struct mdt_object *p, const struct lu_name *lname, - struct lu_fid *fid, int idx) +static int mdt_lookup_version_check(struct mdt_thread_info *info, + struct mdt_object *p, + const struct lu_name *lname, + struct lu_fid *fid, int idx) { int rc, vbrc; @@ -245,6 +247,84 @@ int mdt_lookup_version_check(struct mdt_thread_info *info, } +/** + * mdt_remote_permission: Check whether the remote operation is permitted, + * + * Before we implement async cross-MDT updates (DNE phase 2). There are a few + * limitations here: + * + * 1.Only sysadmin can create remote directory and striped directory and + * migrate directory now, unless + * lctl set_param mdt.*.enable_remote_dir_gid=allow_gid. + * 2.Remote directory can only be created on MDT0, unless + * lctl set_param mdt.*.enable_remote_dir = 1 + * 3.Only new clients can access remote dir( >= 2.4) and striped dir(>= 2.6), + * old client will return -ENOTSUPP. + * + * XXX these check are only needed for remote synchronization, once async + * update is supported, these check will be removed. + * + * param[in]info: execution environment. + * param[in]parent: the directory of this operation. + * param[in]child: the child of this operation. + * + * retval = 0 remote operation is allowed. + * < 0 remote operation is denied. + */ +static int mdt_remote_permission(struct mdt_thread_info *info, + struct mdt_object *parent, + struct mdt_object *child) +{ + struct mdt_device *mdt = info->mti_mdt; + struct lu_ucred *uc = mdt_ucred(info); + struct md_op_spec *spec = &info->mti_spec; + struct lu_attr *attr = &info->mti_attr.ma_attr; + struct obd_export *exp = mdt_info_req(info)->rq_export; + + /* Only check create remote directory, striped directory and + * migration */ + if (mdt_object_remote(parent) == 0 && mdt_object_remote(child) == 0 && + !(S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL && + spec->u.sp_ea.eadatalen != 0) && + info->mti_rr.rr_opcode != REINT_MIGRATE) + return 0; + + if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) { + if (uc->uc_gid != mdt->mdt_enable_remote_dir_gid && + mdt->mdt_enable_remote_dir_gid != -1) + return -EPERM; + } + + if (mdt->mdt_enable_remote_dir == 0) { + struct seq_server_site *ss = mdt_seq_site(mdt); + struct lu_seq_range range = { 0 }; + int rc; + + fld_range_set_type(&range, LU_SEQ_RANGE_MDT); + rc = fld_server_lookup(info->mti_env, ss->ss_server_fld, + fid_seq(mdt_object_fid(parent)), &range); + if (rc != 0) + return rc; + + if (range.lsr_index != 0) + return -EPERM; + } + + if (!mdt_is_dne_client(exp)) + return -ENOTSUPP; + + if (S_ISDIR(attr->la_mode) && spec->u.sp_ea.eadata != NULL && + spec->u.sp_ea.eadatalen != 0) { + const struct lmv_user_md *lum = spec->u.sp_ea.eadata; + + if (le32_to_cpu(lum->lum_stripe_count) > 1 && + !mdt_is_striped_client(exp)) + return -ENOTSUPP; + } + + return 0; +} + /* * VBR: we save three versions in reply: * 0 - parent. Check that parent version is the same during replay. @@ -314,39 +394,11 @@ static int mdt_md_create(struct mdt_thread_info *info) if (likely(!IS_ERR(child))) { struct md_object *next = mdt_object_child(parent); - if (mdt_object_remote(child)) { - struct seq_server_site *ss; - struct lu_ucred *uc = mdt_ucred(info); - - if (!md_capable(uc, CFS_CAP_SYS_ADMIN)) { - if (uc->uc_gid != - mdt->mdt_enable_remote_dir_gid && - mdt->mdt_enable_remote_dir_gid != -1) { - CERROR("%s: Creating remote dir is only" - " permitted for administrator or" - " set mdt_enable_remote_dir_gid:" - " rc = %d\n", - mdt_obd_name(mdt), -EPERM); - GOTO(out_put_child, rc = -EPERM); - } - } - - ss = mdt_seq_site(mdt); - if (ss->ss_node_id != 0 && - mdt->mdt_enable_remote_dir == 0) { - CERROR("%s: remote dir is only permitted on" - " MDT0 or set_param" - " mdt.*.enable_remote_dir=1\n", - mdt_obd_name(mdt)); - GOTO(out_put_child, rc = -EPERM); - } - if (!mdt_is_dne_client(mdt_info_req(info)->rq_export)) { - /* Return -EIO for old client */ - GOTO(out_put_child, rc = -EIO); - } + rc = mdt_remote_permission(info, parent, child); + if (rc != 0) + GOTO(out_put_child, rc); - } - ma->ma_need = MA_INODE; + ma->ma_need = MA_INODE; ma->ma_valid = 0; /* capa for cross-ref will be stored here */ ma->ma_capa = req_capsule_server_get(info->mti_pill, @@ -378,18 +430,19 @@ static int mdt_md_create(struct mdt_thread_info *info) if (rc == 0) rc = mdt_attr_get_complex(info, child, ma); - if (rc == 0) { - /* Return fid & attr to client. */ - if (ma->ma_valid & MA_INODE) - mdt_pack_attr2body(info, repbody, &ma->ma_attr, - mdt_object_fid(child)); - } + if (rc == 0) { + /* Return fid & attr to client. */ + if (ma->ma_valid & MA_INODE) + mdt_pack_attr2body(info, repbody, &ma->ma_attr, + mdt_object_fid(child)); + } out_put_child: - mdt_object_put(info->mti_env, child); - } else { - rc = PTR_ERR(child); - } - mdt_create_pack_capa(info, rc, child, repbody); + mdt_create_pack_capa(info, rc, child, repbody); + mdt_object_put(info->mti_env, child); + } else { + rc = PTR_ERR(child); + mdt_create_pack_capa(info, rc, NULL, repbody); + } unlock_parent: mdt_object_unlock(info, parent, lh, rc); put_parent: @@ -399,6 +452,8 @@ put_parent: static int mdt_unlock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, __u64 ibits, + struct mdt_lock_handle *s0_lh, + struct mdt_object *s0_obj, struct ldlm_enqueue_info *einfo) { ldlm_policy_data_t *policy = &mti->mti_policy; @@ -408,6 +463,12 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, if (!S_ISDIR(obj->mot_header.loh_attr)) RETURN(0); + /* Unlock stripe 0 */ + if (s0_lh != NULL && lustre_handle_is_used(&s0_lh->mlh_reg_lh)) { + LASSERT(s0_obj != NULL); + mdt_object_unlock_put(mti, s0_obj, s0_lh, 1); + } + memset(policy, 0, sizeof(*policy)); policy->l_inodebits.bits = ibits; @@ -422,15 +483,51 @@ static int mdt_unlock_slaves(struct mdt_thread_info *mti, **/ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, ldlm_mode_t mode, __u64 ibits, + struct mdt_lock_handle *s0_lh, + struct mdt_object **s0_objp, struct ldlm_enqueue_info *einfo) { ldlm_policy_data_t *policy = &mti->mti_policy; - int rc; + struct lu_buf *buf = &mti->mti_buf; + struct lmv_mds_md_v1 *lmv; + struct lu_fid *fid = &mti->mti_tmp_fid1; + int rc; ENTRY; if (!S_ISDIR(obj->mot_header.loh_attr)) RETURN(0); + buf->lb_buf = mti->mti_xattr_buf; + buf->lb_len = sizeof(mti->mti_xattr_buf); + rc = mo_xattr_get(mti->mti_env, mdt_object_child(obj), buf, + XATTR_NAME_LMV); + if (rc == -ERANGE) { + rc = mdt_big_xattr_get(mti, obj, XATTR_NAME_LMV); + if (rc > 0) { + buf->lb_buf = mti->mti_big_lmm; + buf->lb_len = mti->mti_big_lmmsize; + } + } + + if (rc == -ENODATA || rc == -ENOENT) + RETURN(0); + + if (rc <= 0) + RETURN(rc); + + lmv = buf->lb_buf; + if (le32_to_cpu(lmv->lmv_magic) != LMV_MAGIC_V1) + RETURN(-EINVAL); + + /* Sigh, 0_stripe and master object are different + * object, though they are in the same MDT, to avoid + * adding osd_object_lock here, so we will enqueue the + * stripe0 lock in MDT0 for now */ + fid_le_to_cpu(fid, &lmv->lmv_stripe_fids[0]); + *s0_objp = mdt_object_find_lock(mti, fid, s0_lh, ibits); + if (IS_ERR(*s0_objp)) + RETURN(PTR_ERR(*s0_objp)); + memset(einfo, 0, sizeof(*einfo)); einfo->ei_type = LDLM_IBITS; einfo->ei_mode = mode; @@ -445,18 +542,17 @@ static int mdt_lock_slaves(struct mdt_thread_info *mti, struct mdt_object *obj, RETURN(rc); } -int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, - struct md_attr *ma, int flags) +static int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, + struct md_attr *ma) { - struct mdt_lock_handle *lh; - int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); - __u64 lockpart = MDS_INODELOCK_UPDATE; + struct mdt_lock_handle *lh; + int do_vbr = ma->ma_attr.la_valid & (LA_MODE|LA_UID|LA_GID|LA_FLAGS); + __u64 lockpart = MDS_INODELOCK_UPDATE; struct ldlm_enqueue_info *einfo = &info->mti_einfo; - int rc; - ENTRY; - - /* attr shouldn't be set on remote object */ - LASSERT(!mdt_object_remote(mo)); + struct mdt_lock_handle *s0_lh; + struct mdt_object *s0_obj = NULL; + int rc; + ENTRY; lh = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_reg_init(lh, LCK_PW); @@ -472,13 +568,12 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, if (rc != 0) RETURN(rc); - rc = mdt_lock_slaves(info, mo, LCK_EX, lockpart, einfo); + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_reg_init(s0_lh, LCK_PW); + rc = mdt_lock_slaves(info, mo, LCK_PW, lockpart, s0_lh, &s0_obj, einfo); if (rc != 0) GOTO(out_unlock, rc); - if (mdt_object_exists(mo) == 0) - GOTO(out_unlock, rc = -ENOENT); - /* all attrs are packed into mti_attr in unpack_setattr */ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_SETATTR_WRITE); @@ -511,7 +606,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, EXIT; out_unlock: - mdt_unlock_slaves(info, mo, lockpart, einfo); + mdt_unlock_slaves(info, mo, lockpart, s0_lh, s0_obj, einfo); mdt_object_unlock(info, mo, lh, rc); return rc; } @@ -576,14 +671,20 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "setattr "DFID" %x", PFID(rr->rr_fid1), (unsigned int)ma->ma_attr.la_valid); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); mo = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(mo)) GOTO(out, rc = PTR_ERR(mo)); + if (!mdt_object_exists(mo)) + GOTO(out_put, rc = -ENOENT); + + if (mdt_object_remote(mo)) + GOTO(out_put, rc = -EREMOTE); + /* start a log jounal handle if needed */ if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) { if ((ma->ma_attr.la_valid & LA_SIZE) || @@ -606,7 +707,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, } mdt_ioepoch_open(info, mo, 0); - repbody->ioepoch = mo->mot_ioepoch; + repbody->mbo_ioepoch = mo->mot_ioepoch; mdt_object_get(info->mti_env, mo); mdt_mfd_set_mode(mfd, MDS_FMODE_TRUNC); @@ -614,17 +715,17 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, mfd->mfd_xid = req->rq_xid; spin_lock(&med->med_open_lock); - cfs_list_add(&mfd->mfd_list, &med->med_open_head); + list_add(&mfd->mfd_list, &med->med_open_head); spin_unlock(&med->med_open_lock); - repbody->handle.cookie = mfd->mfd_handle.h_cookie; + repbody->mbo_handle.cookie = mfd->mfd_handle.h_cookie; } som_au = info->mti_ioepoch && info->mti_ioepoch->flags & MF_SOM_CHANGE; if (som_au) { /* SOM Attribute update case. Find the proper mfd and update * SOM attributes on the proper object. */ - LASSERT(mdt_conn_flags(info) & OBD_CONNECT_SOM); - LASSERT(info->mti_ioepoch); + if (!(mdt_conn_flags(info) & OBD_CONNECT_SOM)) + GOTO(out_put, rc = -EPROTO); spin_lock(&med->med_open_lock); mfd = mdt_handle2mfd(med, &info->mti_ioepoch->handle, @@ -637,22 +738,29 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, info->mti_ioepoch->handle.cookie); GOTO(out_put, rc = -ESTALE); } - LASSERT(mfd->mfd_mode == MDS_FMODE_SOM); - LASSERT(!(info->mti_ioepoch->flags & MF_EPOCH_CLOSE)); - class_handle_unhash(&mfd->mfd_handle); - cfs_list_del_init(&mfd->mfd_list); + if (mfd->mfd_mode != MDS_FMODE_SOM || + (info->mti_ioepoch->flags & MF_EPOCH_CLOSE)) + GOTO(out_put, rc = -EPROTO); + + class_handle_unhash(&mfd->mfd_handle); + list_del_init(&mfd->mfd_list); spin_unlock(&med->med_open_lock); mdt_mfd_close(info, mfd); } else if ((ma->ma_valid & MA_INODE) && ma->ma_attr.la_valid) { - LASSERT((ma->ma_valid & MA_LOV) == 0); - rc = mdt_attr_set(info, mo, ma, rr->rr_flags); + if (ma->ma_valid & MA_LOV) + GOTO(out_put, rc = -EPROTO); + + rc = mdt_attr_set(info, mo, ma); if (rc) GOTO(out_put, rc); } else if ((ma->ma_valid & MA_LOV) && (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; - LASSERT(ma->ma_attr.la_valid == 0); + + if (ma->ma_attr.la_valid != 0) + GOTO(out_put, rc = -EPROTO); + buf->lb_buf = ma->ma_lmm; buf->lb_len = ma->ma_lmm_size; rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), @@ -662,15 +770,18 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, } else if ((ma->ma_valid & MA_LMV) && (ma->ma_valid & MA_INODE)) { struct lu_buf *buf = &info->mti_buf; - LASSERT(ma->ma_attr.la_valid == 0); + if (ma->ma_attr.la_valid != 0) + GOTO(out_put, rc = -EPROTO); + buf->lb_buf = ma->ma_lmv; buf->lb_len = ma->ma_lmv_size; rc = mo_xattr_set(info->mti_env, mdt_object_child(mo), buf, XATTR_NAME_DEFAULT_LMV, 0); if (rc) GOTO(out_put, rc); - } else - LBUG(); + } else { + GOTO(out_put, rc = -EPROTO); + } /* If file data is modified, add the dirty flag */ if (ma->ma_attr_flags & MDS_DATA_MODIFIED) @@ -696,7 +807,7 @@ static int mdt_reint_setattr(struct mdt_thread_info *info, rc = mo_capa_get(info->mti_env, mdt_object_child(mo), capa, 0); if (rc) GOTO(out_put, rc); - repbody->valid |= OBD_MD_FLOSSCAPA; + repbody->mbo_valid |= OBD_MD_FLOSSCAPA; } EXIT; @@ -723,8 +834,9 @@ static int mdt_reint_create(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_CREATE)) RETURN(err_serious(-ESTALE)); - if (info->mti_dlm_req) - ldlm_request_cancel(mdt_info_req(info), info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(mdt_info_req(info), + info->mti_dlm_req, 0, LATF_SKIP); if (!lu_name_is_valid(&info->mti_rr.rr_name)) RETURN(-EPROTO); @@ -769,6 +881,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, struct mdt_lock_handle *parent_lh; struct mdt_lock_handle *child_lh; struct ldlm_enqueue_info *einfo = &info->mti_einfo; + struct mdt_lock_handle *s0_lh = NULL; + struct mdt_object *s0_obj = NULL; int rc; int no_name = 0; ENTRY; @@ -776,8 +890,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, DEBUG_REQ(D_INODE, req, "unlink "DFID"/"DNAME"", PFID(rr->rr_fid1), PNAME(&rr->rr_name)); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_UNLINK)) RETURN(err_serious(-ENOENT)); @@ -860,8 +974,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, PNAME(&rr->rr_name), PFID(mdt_object_fid(mc))); if (!mdt_is_dne_client(req->rq_export)) - /* Return -EIO for old client */ - GOTO(put_child, rc = -EIO); + /* Return -ENOTSUPP for old client */ + GOTO(put_child, rc = -ENOTSUPP); if (info->mti_spec.sp_rm_entry) { struct lu_ucred *uc = mdt_ucred(info); @@ -890,8 +1004,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, MDT_CROSS_LOCK); repbody = req_capsule_server_get(info->mti_pill, &RMF_MDT_BODY); LASSERT(repbody != NULL); - repbody->fid1 = *mdt_object_fid(mc); - repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); + repbody->mbo_fid1 = *mdt_object_fid(mc); + repbody->mbo_valid |= (OBD_MD_FLID | OBD_MD_MDS); GOTO(unlock_child, rc = -EREMOTE); } else if (info->mti_spec.sp_rm_entry) { rc = -EPERM; @@ -916,7 +1030,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, ma->ma_need = MA_INODE; ma->ma_valid = 0; - rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, einfo); + s0_lh = &info->mti_lh[MDT_LH_LOCAL]; + mdt_lock_reg_init(s0_lh, LCK_EX); + rc = mdt_lock_slaves(info, mc, LCK_EX, MDS_INODELOCK_UPDATE, s0_lh, + &s0_obj, einfo); if (rc != 0) GOTO(unlock_child, rc); @@ -961,13 +1078,8 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, EXIT; unlock_child: - mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, einfo); + mdt_unlock_slaves(info, mc, MDS_INODELOCK_UPDATE, s0_lh, s0_obj, einfo); mdt_object_unlock(info, mc, child_lh, rc); - - /* Since we do not need reply md striped dir info to client, so - * reset mti_big_lmm_used to avoid confusing mdt_fix_reply */ - if (info->mti_big_lmm_used) - info->mti_big_lmm_used = 0; put_child: mdt_object_put(info->mti_env, mc); unlock_parent: @@ -1001,8 +1113,8 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (OBD_FAIL_CHECK(OBD_FAIL_MDS_REINT_LINK)) RETURN(err_serious(-ENOENT)); - if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + if (info->mti_dlm_req) + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); /* Invalid case so return error immediately instead of * processing it */ @@ -1025,6 +1137,8 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (rc) GOTO(out_unlock_parent, rc); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(lhs, LCK_EX); @@ -1195,38 +1309,36 @@ static void mdt_rename_unlock(struct lustre_handle *lh) * target. Source should not be ancestor of target dir. May be other rename * checks can be moved here later. */ -static int mdt_rename_sanity(struct mdt_thread_info *info, struct lu_fid *fid) +static int mdt_is_subdir(struct mdt_thread_info *info, + struct mdt_object *dir, + const struct lu_fid *fid) { - struct mdt_reint_record *rr = &info->mti_rr; - struct lu_fid dst_fid = *rr->rr_fid2; - struct mdt_object *dst; + struct lu_fid dir_fid = dir->mot_header.loh_fid; int rc = 0; ENTRY; /* If the source and target are in the same directory, they can not * be parent/child relationship, so subdir check is not needed */ - if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) + if (lu_fid_eq(&dir_fid, fid)) return 0; - do { - LASSERT(fid_is_sane(&dst_fid)); - dst = mdt_object_find(info->mti_env, info->mti_mdt, &dst_fid); - if (!IS_ERR(dst)) { - rc = mdo_is_subdir(info->mti_env, - mdt_object_child(dst), fid, - &dst_fid); - mdt_object_put(info->mti_env, dst); - if (rc != -EREMOTE && rc < 0) { - CERROR("Failed mdo_is_subdir(), rc %d\n", rc); - } else { - /* check the found fid */ - if (lu_fid_eq(&dst_fid, fid)) - rc = -EINVAL; - } - } else { - rc = PTR_ERR(dst); - } - } while (rc == -EREMOTE); + if (!mdt_object_exists(dir)) + RETURN(-ENOENT); + + rc = mdo_is_subdir(info->mti_env, mdt_object_child(dir), + fid, &dir_fid); + if (rc < 0) { + CERROR("%s: failed subdir check in "DFID" for "DFID + ": rc = %d\n", mdt_obd_name(info->mti_mdt), + PFID(&dir_fid), PFID(fid), rc); + /* Return EINVAL only if a parent is the @fid */ + if (rc == -EINVAL) + rc = -EIO; + } else { + /* check the found fid */ + if (lu_fid_eq(&dir_fid, fid)) + rc = -EINVAL; + } RETURN(rc); } @@ -1329,7 +1441,7 @@ static int mdt_lock_objects_in_linkea(struct mdt_thread_info *info, GOTO(out, rc); } - CFS_INIT_LIST_HEAD(&mll->mll_list); + INIT_LIST_HEAD(&mll->mll_list); mll->mll_obj = mdt_pobj; list_add_tail(&mll->mll_list, lock_list); } @@ -1358,6 +1470,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, CDEBUG(D_INODE, "migrate "DFID"/"DNAME" to "DFID"\n", PFID(rr->rr_fid1), PNAME(&rr->rr_name), PFID(rr->rr_fid2)); + /* 1: lock the source dir. */ msrcdir = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(msrcdir)) { @@ -1412,8 +1525,12 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, GOTO(out_put_child, rc = -EPERM); } + rc = mdt_remote_permission(info, msrcdir, mold); + if (rc != 0) + GOTO(out_put_child, rc); + /* 3: iterate the linkea of the object and lock all of the objects */ - CFS_INIT_LIST_HEAD(&lock_list); + INIT_LIST_HEAD(&lock_list); rc = mdt_lock_objects_in_linkea(info, mold, msrcdir, &lock_list); if (rc != 0) GOTO(out_put_child, rc); @@ -1440,7 +1557,7 @@ static int mdt_reint_migrate_internal(struct mdt_thread_info *info, lmv_le_to_cpu(ma->ma_lmv, ma->ma_lmv); lmm1 = &ma->ma_lmv->lmv_md_v1; - if (lmm1->lmv_magic != LMV_MAGIC_MIGRATE) { + if (!(lmm1->lmv_hash_type & LMV_HASH_FLAG_MIGRATION)) { CERROR("%s: can not migrate striped dir "DFID ": rc = %d\n", mdt_obd_name(info->mti_mdt), PFID(mdt_object_fid(mold)), -EPERM); @@ -1517,6 +1634,141 @@ out_put_parent: RETURN(rc); } +static struct mdt_object *mdt_object_find_check(struct mdt_thread_info *info, + const struct lu_fid *fid, + int idx) +{ + struct mdt_object *dir; + int rc; + ENTRY; + + dir = mdt_object_find(info->mti_env, info->mti_mdt, fid); + if (IS_ERR(dir)) + RETURN(dir); + + /* check early, the real version will be saved after locking */ + rc = mdt_version_get_check(info, dir, idx); + if (rc) + GOTO(out_put, rc); + + RETURN(dir); +out_put: + mdt_object_put(info->mti_env, dir); + return ERR_PTR(rc); +} + +static int mdt_object_lock_save(struct mdt_thread_info *info, + struct mdt_object *dir, + struct mdt_lock_handle *lh, + int idx) +{ + int rc; + + /* we lock the target dir if it is local */ + rc = mdt_object_lock(info, dir, lh, MDS_INODELOCK_UPDATE, + MDT_LOCAL_LOCK); + if (rc != 0) + return rc; + + /* get and save correct version after locking */ + mdt_version_get_save(info, dir, idx); + return 0; +} + + +static int mdt_rename_parents_lock(struct mdt_thread_info *info, + struct mdt_object **srcp, + struct mdt_object **tgtp) +{ + struct mdt_reint_record *rr = &info->mti_rr; + const struct lu_fid *fid_src = rr->rr_fid1; + const struct lu_fid *fid_tgt = rr->rr_fid2; + struct mdt_lock_handle *lh_src = &info->mti_lh[MDT_LH_PARENT]; + struct mdt_lock_handle *lh_tgt = &info->mti_lh[MDT_LH_CHILD]; + struct mdt_object *src; + struct mdt_object *tgt; + int reverse = 0; + int rc; + ENTRY; + + /* find both parents. */ + src = mdt_object_find_check(info, fid_src, 0); + if (IS_ERR(src)) + RETURN(PTR_ERR(src)); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME3, 5); + + if (lu_fid_eq(fid_src, fid_tgt)) { + tgt = src; + mdt_object_get(info->mti_env, tgt); + } else { + /* Check if the @src is not a child of the @tgt, otherwise a + * reverse locking must take place. */ + rc = mdt_is_subdir(info, src, fid_tgt); + if (rc == -EINVAL) + reverse = 1; + else if (rc) + GOTO(err_src_put, rc); + + tgt = mdt_object_find_check(info, fid_tgt, 1); + if (IS_ERR(tgt)) + GOTO(err_src_put, rc = PTR_ERR(tgt)); + + if (unlikely(mdt_object_remote(tgt))) { + CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID + "on different MDTs\n", PFID(fid_src), + PFID(fid_tgt)); + GOTO(err_tgt_put, rc = -EXDEV); + } + } + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); + + /* lock parents in the proper order. */ + if (reverse) { + rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); + if (rc) + GOTO(err_tgt_put, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); + + rc = mdt_object_lock_save(info, src, lh_src, 0); + } else { + rc = mdt_object_lock_save(info, src, lh_src, 0); + if (rc) + GOTO(err_tgt_put, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME, 5); + + if (tgt != src) + rc = mdt_object_lock_save(info, tgt, lh_tgt, 1); + else if (lh_src->mlh_pdo_hash != lh_tgt->mlh_pdo_hash) { + rc = mdt_pdir_hash_lock(info, lh_tgt, tgt, + MDS_INODELOCK_UPDATE); + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10); + } + } + if (rc) + GOTO(err_unlock, rc); + + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME4, 5); + + *srcp = src; + *tgtp = tgt; + RETURN(0); + +err_unlock: + /* The order does not matter as the handle is checked inside, + * as well as not used handle. */ + mdt_object_unlock(info, src, lh_src, rc); + mdt_object_unlock(info, tgt, lh_tgt, rc); +err_tgt_put: + mdt_object_put(info->mti_env, tgt); +err_src_put: + mdt_object_put(info->mti_env, src); + RETURN(rc); +} + /* * VBR: rename versions in reply: 0 - src parent; 1 - tgt parent; * 2 - src child; 3 - tgt child. @@ -1539,8 +1791,8 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, struct mdt_reint_record *rr = &info->mti_rr; struct md_attr *ma = &info->mti_attr; struct ptlrpc_request *req = mdt_info_req(info); - struct mdt_object *msrcdir; - struct mdt_object *mtgtdir; + struct mdt_object *msrcdir = NULL; + struct mdt_object *mtgtdir = NULL; struct mdt_object *mold; struct mdt_object *mnew = NULL; struct mdt_lock_handle *lh_srcdirp; @@ -1556,78 +1808,39 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, PFID(rr->rr_fid1), PNAME(&rr->rr_name), PFID(rr->rr_fid2), PNAME(&rr->rr_tgt_name)); - /* step 1: lock the source dir. */ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lh_srcdirp, LCK_PW, &rr->rr_name); - msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, - MDS_INODELOCK_UPDATE); - if (IS_ERR(msrcdir)) - RETURN(PTR_ERR(msrcdir)); - - rc = mdt_version_get_check_save(info, msrcdir, 0); - if (rc) - GOTO(out_unlock_source, rc); - - /* step 2: find & lock the target dir. */ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_pdo_init(lh_tgtdirp, LCK_PW, &rr->rr_tgt_name); - if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { - mdt_object_get(info->mti_env, msrcdir); - mtgtdir = msrcdir; - if (lh_tgtdirp->mlh_pdo_hash != lh_srcdirp->mlh_pdo_hash) { - rc = mdt_pdir_hash_lock(info, lh_tgtdirp, mtgtdir, - MDS_INODELOCK_UPDATE); - if (rc != 0) - GOTO(out_unlock_source, rc); - OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_PDO_LOCK2, 10); - } - } else { - mtgtdir = mdt_object_find(info->mti_env, info->mti_mdt, - rr->rr_fid2); - if (IS_ERR(mtgtdir)) - GOTO(out_unlock_source, rc = PTR_ERR(mtgtdir)); - /* check early, the real version will be saved after locking */ - rc = mdt_version_get_check(info, mtgtdir, 1); - if (rc) - GOTO(out_put_target, rc); + /* step 1&2: lock the source and target dirs. */ + rc = mdt_rename_parents_lock(info, &msrcdir, &mtgtdir); + if (rc) + RETURN(rc); - if (unlikely(mdt_object_remote(mtgtdir))) { - CDEBUG(D_INFO, "Source dir "DFID" target dir "DFID - "on different MDTs\n", PFID(rr->rr_fid1), - PFID(rr->rr_fid2)); - GOTO(out_put_target, rc = -EXDEV); - } else { - if (likely(mdt_object_exists(mtgtdir))) { - /* we lock the target dir if it is local */ - rc = mdt_object_lock(info, mtgtdir, lh_tgtdirp, - MDS_INODELOCK_UPDATE, - MDT_LOCAL_LOCK); - if (rc != 0) - GOTO(out_put_target, rc); - /* get and save correct version after locking */ - mdt_version_get_save(info, mtgtdir, 1); - } else { - GOTO(out_put_target, rc = -ESTALE); - } - } - } + OBD_FAIL_TIMEOUT(OBD_FAIL_MDS_RENAME2, 5); /* step 3: find & lock the old object. */ fid_zero(old_fid); rc = mdt_lookup_version_check(info, msrcdir, &rr->rr_name, old_fid, 2); if (rc != 0) - GOTO(out_unlock_target, rc); + GOTO(out_unlock_parents, rc); if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2)) - GOTO(out_unlock_target, rc = -EINVAL); + GOTO(out_unlock_parents, rc = -EINVAL); if (!fid_is_md_operative(old_fid)) - GOTO(out_unlock_target, rc = -EPERM); + GOTO(out_unlock_parents, rc = -EPERM); mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid); if (IS_ERR(mold)) - GOTO(out_unlock_target, rc = PTR_ERR(mold)); + GOTO(out_unlock_parents, rc = PTR_ERR(mold)); + + /* Check if @mtgtdir is subdir of @mold, before locking child + * to avoid reverse locking. */ + rc = mdt_is_subdir(info, mtgtdir, old_fid); + if (rc) + GOTO(out_put_old, rc); tgt_vbr_obj_set(info->mti_env, mdt_obj2dt(mold)); /* save version after locking */ @@ -1668,6 +1881,14 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, GOTO(out_put_new, rc = -EXDEV); } + /* Before locking the target dir, check we do not replace + * a dir with a non-dir, otherwise it may deadlock with + * link op which tries to create a link in this dir + * back to this non-dir. */ + if (S_ISDIR(lu_object_attr(&mnew->mot_obj)) && + !S_ISDIR(lu_object_attr(&mold->mot_obj))) + GOTO(out_put_new, rc = -EISDIR); + lh_oldp = &info->mti_lh[MDT_LH_OLD]; mdt_lock_reg_init(lh_oldp, LCK_EX); rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP | @@ -1675,6 +1896,12 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, if (rc != 0) GOTO(out_put_new, rc); + /* Check if @msrcdir is subdir of @mnew, before locking child + * to avoid reverse locking. */ + rc = mdt_is_subdir(info, msrcdir, new_fid); + if (rc) + GOTO(out_unlock_old, rc); + /* We used to acquire MDS_INODELOCK_FULL here but we * can't do this now because a running HSM restore on * the rename onto victim will hold the layout @@ -1719,11 +1946,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_RENAME_WRITE); - /* Check if @dst is subdir of @src. */ - rc = mdt_rename_sanity(info, old_fid); - if (rc) - GOTO(out_unlock_new, rc); - if (mnew != NULL) mutex_lock(&mnew->mot_lov_mutex); @@ -1746,7 +1968,6 @@ static int mdt_reint_rename_internal(struct mdt_thread_info *info, } EXIT; -out_unlock_new: if (mnew != NULL) mdt_object_unlock(info, mnew, lh_newp, rc); out_unlock_old: @@ -1756,11 +1977,8 @@ out_put_new: mdt_object_put(info->mti_env, mnew); out_put_old: mdt_object_put(info->mti_env, mold); -out_unlock_target: - mdt_object_unlock(info, mtgtdir, lh_tgtdirp, rc); -out_put_target: - mdt_object_put(info->mti_env, mtgtdir); -out_unlock_source: +out_unlock_parents: + mdt_object_unlock_put(info, mtgtdir, lh_tgtdirp, rc); mdt_object_unlock_put(info, msrcdir, lh_srcdirp, rc); return rc; } @@ -1776,10 +1994,10 @@ static int mdt_reint_rename_or_migrate(struct mdt_thread_info *info, ENTRY; if (info->mti_dlm_req) - ldlm_request_cancel(req, info->mti_dlm_req, 0); + ldlm_request_cancel(req, info->mti_dlm_req, 0, LATF_SKIP); - if (fid_is_obf(rr->rr_fid1) || fid_is_dot_lustre(rr->rr_fid1) || - fid_is_obf(rr->rr_fid2) || fid_is_dot_lustre(rr->rr_fid2)) + if (!fid_is_md_operative(rr->rr_fid1) || + !fid_is_md_operative(rr->rr_fid2)) RETURN(-EPERM); rc = mdt_rename_lock(info, &rename_lh, rename_lock); @@ -1812,28 +2030,68 @@ static int mdt_reint_migrate(struct mdt_thread_info *info, return mdt_reint_rename_or_migrate(info, lhc, MRL_MIGRATE); } -typedef int (*mdt_reinter)(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc); - -static mdt_reinter reinters[REINT_MAX] = { - [REINT_SETATTR] = mdt_reint_setattr, - [REINT_CREATE] = mdt_reint_create, - [REINT_LINK] = mdt_reint_link, - [REINT_UNLINK] = mdt_reint_unlink, - [REINT_RENAME] = mdt_reint_rename, - [REINT_OPEN] = mdt_reint_open, - [REINT_SETXATTR] = mdt_reint_setxattr, - [REINT_RMENTRY] = mdt_reint_unlink, - [REINT_MIGRATE] = mdt_reint_migrate, +struct mdt_reinter { + int (*mr_handler)(struct mdt_thread_info *, struct mdt_lock_handle *); + enum lprocfs_extra_opc mr_extra_opc; +}; + +static const struct mdt_reinter mdt_reinters[] = { + [REINT_SETATTR] = { + .mr_handler = &mdt_reint_setattr, + .mr_extra_opc = MDS_REINT_SETATTR, + }, + [REINT_CREATE] = { + .mr_handler = &mdt_reint_create, + .mr_extra_opc = MDS_REINT_CREATE, + }, + [REINT_LINK] = { + .mr_handler = &mdt_reint_link, + .mr_extra_opc = MDS_REINT_LINK, + }, + [REINT_UNLINK] = { + .mr_handler = &mdt_reint_unlink, + .mr_extra_opc = MDS_REINT_UNLINK, + }, + [REINT_RENAME] = { + .mr_handler = &mdt_reint_rename, + .mr_extra_opc = MDS_REINT_RENAME, + }, + [REINT_OPEN] = { + .mr_handler = &mdt_reint_open, + .mr_extra_opc = MDS_REINT_OPEN, + }, + [REINT_SETXATTR] = { + .mr_handler = &mdt_reint_setxattr, + .mr_extra_opc = MDS_REINT_SETXATTR, + }, + [REINT_RMENTRY] = { + .mr_handler = &mdt_reint_unlink, + .mr_extra_opc = MDS_REINT_UNLINK, + }, + [REINT_MIGRATE] = { + .mr_handler = &mdt_reint_migrate, + .mr_extra_opc = MDS_REINT_RENAME, + }, }; int mdt_reint_rec(struct mdt_thread_info *info, - struct mdt_lock_handle *lhc) + struct mdt_lock_handle *lhc) { - int rc; - ENTRY; + const struct mdt_reinter *mr; + int rc; + ENTRY; - rc = reinters[info->mti_rr.rr_opcode](info, lhc); + if (!(info->mti_rr.rr_opcode < ARRAY_SIZE(mdt_reinters))) + RETURN(-EPROTO); - RETURN(rc); + mr = &mdt_reinters[info->mti_rr.rr_opcode]; + if (mr->mr_handler == NULL) + RETURN(-EPROTO); + + rc = (*mr->mr_handler)(info, lhc); + + lprocfs_counter_incr(ptlrpc_req2svc(mdt_info_req(info))->srv_stats, + PTLRPC_LAST_CNTR + mr->mr_extra_opc); + + RETURN(rc); }