From b7a30a6a46143b5bdfb81f929fda5465019e66a0 Mon Sep 17 00:00:00 2001 From: yury Date: Wed, 1 Nov 2006 19:44:13 +0000 Subject: [PATCH] - fixes in split about using correct byte order; - in cmm_split_try() init la_size used in split info messages; - cleanups and debug in llite close thread related stuff; - fixes in mdd about link() operation. Fixed deadlock for case when tgt and src is same object. Fixed lost error code after insert name which is possibly existing. Cleanups; - in mdt_reint_open() use MDT_CROSS_LOCK for opened file in case it is located on remote MDT (by huanghua); - in mdt_attr_set() fixed wrong goto to label out; - cleanups in mdt_reint_link(), mdt_reint_unlink(), mdt_reint_rename() (by huanghua); - fixed lock mode in md_reint_link(). It should LCK_EX instead of LCK_PW; - fixed deadlock in mdt_reint_link() in case src and dst are same object; - use MDT_CROSS_LOCK for src in mdt_reint_link() as it maybe located on remote MDT (by huanghua); - used MDT_CROSS_LOCK for in mdt_reint_rename() as name may be located on remote MDT (by huanghua); - fixed -ENODATA case in mdt_getxattr_pack_reply(). It should cause err_serious() with errors in console. Cleanups; --- lustre/cmm/cmm_internal.h | 1 + lustre/cmm/cmm_split.c | 58 +++++++++++++++++------------- lustre/include/lustre/lustre_idl.h | 2 +- lustre/llite/llite_close.c | 16 +++++---- lustre/mdd/mdd_dir.c | 73 +++++++++++++++++++++++--------------- lustre/mdt/mdt_handler.c | 2 +- lustre/mdt/mdt_open.c | 2 +- lustre/mdt/mdt_reint.c | 71 ++++++++++++++++++++---------------- lustre/mdt/mdt_xattr.c | 37 ++++++++++--------- 9 files changed, 152 insertions(+), 110 deletions(-) diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index b8d0983..1d3357d 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -111,6 +111,7 @@ struct cmr_object { struct cmm_thread_info { struct md_attr cmi_ma; struct lu_buf cmi_buf; + struct lu_fid cmi_fid; /* used for le/cpu convertions */ char cmi_xattr_buf[LUSTRE_POSIX_ACL_MAX_SIZE]; }; diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index d6bd3e2..5d052f8 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -55,9 +55,8 @@ enum { int cmm_split_check(const struct lu_env *env, struct md_object *mp, const char *name) { - struct cml_object *clo = md2cml_obj(mp); - struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mp)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + struct cml_object *clo = md2cml_obj(mp); int rc; ENTRY; @@ -197,8 +196,8 @@ int cmm_split_expect(const struct lu_env *env, struct md_object *mo, } /* - * Assumption: ma_valid = 0 here, we only need get - * inode and lmv_size for this get_attr + * Assumption: ma_valid = 0 here, we only need get inode and lmv_size + * for this get_attr. */ LASSERT(ma->ma_valid == 0); ma->ma_need = MA_INODE | MA_LMV; @@ -370,22 +369,26 @@ cleanup: return rc; } -/* Remove one entry from local MDT. */ +/* + * Remove one entry from local MDT. Do not corrupt byte order in page, it will + * be sent to remote MDT. + */ static int cmm_split_remove_entry(const struct lu_env *env, struct md_object *mo, struct lu_dirent *ent) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct cmm_object *obj; - char *name; int is_dir, rc; + char *name; ENTRY; - if (!strncmp(ent->lde_name, ".", ent->lde_namelen) || - !strncmp(ent->lde_name, "..", ent->lde_namelen)) + if (!strncmp(ent->lde_name, ".", le16_to_cpu(ent->lde_namelen)) || + !strncmp(ent->lde_name, "..", le16_to_cpu(ent->lde_namelen))) RETURN(0); - obj = cmm_object_find(env, cmm, &ent->lde_fid); + fid_le_to_cpu(&cmm_env_info(env)->cmi_fid, &ent->lde_fid); + obj = cmm_object_find(env, cmm, &cmm_env_info(env)->cmi_fid); if (IS_ERR(obj)) RETURN(PTR_ERR(obj)); @@ -400,14 +403,14 @@ static int cmm_split_remove_entry(const struct lu_env *env, */ is_dir = 1; - OBD_ALLOC(name, ent->lde_namelen + 1); + OBD_ALLOC(name, le16_to_cpu(ent->lde_namelen) + 1); if (!name) GOTO(cleanup, rc = -ENOMEM); - memcpy(name, ent->lde_name, ent->lde_namelen); + memcpy(name, ent->lde_name, le16_to_cpu(ent->lde_namelen)); rc = mdo_name_remove(env, md_object_next(mo), name, is_dir); - OBD_FREE(name, ent->lde_namelen + 1); + OBD_FREE(name, le16_to_cpu(ent->lde_namelen) + 1); if (rc) GOTO(cleanup, rc); @@ -417,8 +420,10 @@ static int cmm_split_remove_entry(const struct lu_env *env, * use the highest bit of the hash to indicate that (because we do not * use highest bit of hash). */ - if (is_dir) - ent->lde_hash |= MAX_HASH_HIGHEST_BIT; + if (is_dir) { + ent->lde_hash = le32_to_cpu(ent->lde_hash); + ent->lde_hash = cpu_to_le32(ent->lde_hash | MAX_HASH_HIGHEST_BIT); + } EXIT; cleanup: cmm_object_put(env, obj); @@ -443,25 +448,26 @@ static int cmm_split_remove_page(const struct lu_env *env, dp = page_address(rdpg->rp_pages[0]); *len = 0; for (ent = lu_dirent_start(dp); - ent != NULL && ent->lde_hash < hash_end; + ent != NULL && le32_to_cpu(ent->lde_hash) < hash_end; ent = lu_dirent_next(ent)) { rc = cmm_split_remove_entry(env, mo, ent); if (rc) { /* - * XXX Error handler to insert remove name back, - * currently we assumed it will success anyway - * in verfication test. + * XXX: Error handler to insert remove name back, + * currently we assumed it will success anyway in + * verfication test. */ - CWARN("Can not del %*.*s rc %d\n", ent->lde_namelen, - ent->lde_namelen, ent->lde_name, rc); + CWARN("Can not del %*.*s rc %d\n", le16_to_cpu(ent->lde_namelen), + le16_to_cpu(ent->lde_namelen), ent->lde_name, rc); GOTO(unmap, rc); } - if (ent->lde_reclen == 0) + if (le16_to_cpu(ent->lde_reclen) == 0) /* * This is the last ent, whose rec size set to 0 * so recompute here */ - *len += (sizeof *ent + le16_to_cpu(ent->lde_namelen) + + *len += (sizeof(*ent) + + le16_to_cpu(ent->lde_namelen) + 3) & ~3; else *len += le16_to_cpu(ent->lde_reclen); @@ -553,10 +559,10 @@ static int cmm_split_process_stripe(const struct lu_env *env, kmap(rdpg->rp_pages[0]); ldp = page_address(rdpg->rp_pages[0]); - if (ldp->ldp_hash_end >= end) + if (le32_to_cpu(ldp->ldp_hash_end) >= end) done = 1; - rdpg->rp_hash = ldp->ldp_hash_end; + rdpg->rp_hash = le32_to_cpu(ldp->ldp_hash_end); kunmap(rdpg->rp_pages[0]); } while (!done); @@ -631,7 +637,7 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo) struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; int rc = 0, split; - __u64 la_size = 0; + __u64 la_size; struct lu_buf *buf; ENTRY; @@ -647,6 +653,8 @@ int cmm_split_try(const struct lu_env *env, struct md_object *mo) /* No split is needed, caller may proceed with create. */ RETURN(0); } + + la_size = ma->ma_attr.la_size; /* Split should be done now, let's do it. */ CWARN("Dir "DFID" is going to split\n", diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 04b6e71..8f0b9c9 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -309,7 +309,7 @@ static inline struct lu_dirent *lu_dirent_next(struct lu_dirent *ent) { struct lu_dirent *next; - if (ent->lde_reclen != 0) + if (le16_to_cpu(ent->lde_reclen) != 0) next = ((void *)ent) + le16_to_cpu(ent->lde_reclen); else next = NULL; diff --git a/lustre/llite/llite_close.c b/lustre/llite/llite_close.c index 8a36eda..9c74070 100644 --- a/lustre/llite/llite_close.c +++ b/lustre/llite/llite_close.c @@ -89,8 +89,8 @@ void ll_queue_done_writing(struct inode *inode, unsigned long flags) inode->i_ino, inode->i_generation); list_add_tail(&lli->lli_close_list, &lcq->lcq_head); } else { - CWARN("Inode %d is already queued for done writing!\n", - inode->i_ino); + CWARN("Inode %lu/%u is already queued for done writing!\n", + inode->i_ino, inode->i_generation); } wake_up(&lcq->lcq_waitq); spin_unlock(&lcq->lcq_lock); @@ -119,7 +119,7 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data, inode = igrab(inode); LASSERT(inode); - goto out; + GOTO(out, 0); } CDEBUG(D_INODE, "Epoch "LPU64" closed on "DFID"\n", @@ -136,14 +136,14 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data, /* Pack Size-on-MDS inode attributes only if they has changed */ if (!(lli->lli_flags & LLIF_SOM_DIRTY)) { spin_unlock(&lli->lli_lock); - goto out; + GOTO(out, 0); } /* There is already 1 pending DONE_WRITE, do not create another * one -- close epoch with no attribute change. */ if (lli->lli_flags & LLIF_EPOCH_PENDING) { spin_unlock(&lli->lli_lock); - goto out; + GOTO(out, 0); } } @@ -157,8 +157,9 @@ void ll_epoch_close(struct inode *inode, struct md_op_data *op_data, op_data->attr.ia_valid |= ATTR_MTIME_SET | ATTR_CTIME_SET | ATTR_SIZE | ATTR_BLOCKS; } -out: EXIT; +out: + return; } int ll_sizeonmds_update(struct inode *inode, struct lustre_handle *fh) @@ -276,10 +277,13 @@ static int ll_close_thread(void *arg) break; inode = ll_info2i(lli); + CDEBUG(D_INFO, "done_writting for inode %lu/%u\n", + inode->i_ino, inode->i_generation); ll_done_writing(inode); iput(inode); } + CDEBUG(D_INFO, "ll_close exiting\n"); complete(&lcq->lcq_comp); RETURN(0); } diff --git a/lustre/mdd/mdd_dir.c b/lustre/mdd/mdd_dir.c index 7e76c2b..4530819 100644 --- a/lustre/mdd/mdd_dir.c +++ b/lustre/mdd/mdd_dir.c @@ -176,10 +176,9 @@ static int mdd_is_subdir(const struct lu_env *env, RETURN(rc); } -/*Check whether it may create the cobj under the pobj*/ -static int mdd_may_create(const struct lu_env *env, - struct mdd_object *pobj, struct mdd_object *cobj, - int need_check) +/* Check whether it may create the cobj under the pobj */ +static int mdd_may_create(const struct lu_env *env, struct mdd_object *pobj, + struct mdd_object *cobj, int need_check, int lock) { int rc = 0; ENTRY; @@ -190,11 +189,16 @@ static int mdd_may_create(const struct lu_env *env, if (mdd_is_dead_obj(pobj)) RETURN(-ENOENT); - /*check pobj may create or not*/ - if (need_check) - rc = mdd_permission_internal_locked(env, pobj, - MAY_WRITE | MAY_EXEC); - + if (need_check) { + if (lock) { + rc = mdd_permission_internal_locked(env, pobj, + (MAY_WRITE | + MAY_EXEC)); + } else { + rc = mdd_permission_internal(env, pobj, (MAY_WRITE | + MAY_EXEC)); + } + } RETURN(rc); } @@ -256,7 +260,7 @@ static int mdd_may_delete(const struct lu_env *env, RETURN(-EBUSY); } else if (S_ISDIR(mdd_object_type(cobj))) { - RETURN(-EISDIR); + RETURN(-EISDIR); } if (pobj) { @@ -281,15 +285,20 @@ int mdd_link_sanity_check(const struct lu_env *env, struct mdd_object *tgt_obj, ENTRY; if (tgt_obj) { - rc = mdd_may_create(env, tgt_obj, NULL, 1); + /* + * Lock only if tgt and src not same object. This is because + * mdd_link() already locked src and we try to lock it again we + * have a problem. + */ + rc = mdd_may_create(env, tgt_obj, NULL, 1, (src_obj != tgt_obj)); if (rc) RETURN(rc); } - if (S_ISDIR(mdd_object_type(src_obj))) + if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj)) RETURN(-EPERM); - if (mdd_is_immutable(src_obj) || mdd_is_append(src_obj)) + if (S_ISDIR(mdd_object_type(src_obj))) RETURN(-EPERM); RETURN(rc); @@ -382,16 +391,18 @@ static int __mdd_index_insert_only(const struct lu_env *env, const char *name, struct thandle *th, struct lustre_capa *capa) { - int rc; struct dt_object *next = mdd_object_child(pobj); + int rc; ENTRY; - if (dt_try_as_dir(env, next)) + if (dt_try_as_dir(env, next)) { rc = next->do_index_ops->dio_insert(env, next, - __mdd_fid_rec(env, lf), - (const struct dt_key *)name, th, capa); - else + __mdd_fid_rec(env, lf), + (const struct dt_key *)name, + th, capa); + } else { rc = -ENOTDIR; + } RETURN(rc); } @@ -399,12 +410,12 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, struct md_object *src_obj, const char *name, struct md_attr *ma) { + struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; struct mdd_object *mdd_tobj = md2mdd_obj(tgt_obj); struct mdd_object *mdd_sobj = md2mdd_obj(src_obj); struct mdd_device *mdd = mdo2mdd(src_obj); - struct lu_attr *la_copy = &mdd_env_info(env)->mti_la_for_fix; - struct thandle *handle; struct dynlock_handle *dlh; + struct thandle *handle; int rc; ENTRY; @@ -420,24 +431,26 @@ static int mdd_link(const struct lu_env *env, struct md_object *tgt_obj, rc = mdd_link_sanity_check(env, mdd_tobj, mdd_sobj); if (rc) - GOTO(out, rc); + GOTO(out_unlock, rc); rc = __mdd_index_insert_only(env, mdd_tobj, mdo2fid(mdd_sobj), name, handle, mdd_object_capa(env, mdd_tobj)); - if (rc == 0) - mdd_ref_add_internal(env, mdd_sobj, handle); + if (rc) + GOTO(out_unlock, rc); + + mdd_ref_add_internal(env, mdd_sobj, handle); *la_copy = ma->ma_attr; la_copy->la_valid = LA_CTIME; rc = mdd_attr_set_internal(env, mdd_sobj, la_copy, handle, 0); if (rc) - GOTO(out, rc); + GOTO(out_unlock, rc); la_copy->la_valid = LA_CTIME | LA_MTIME; rc = mdd_attr_set_internal_locked(env, mdd_tobj, la_copy, handle, 0); -out: +out_unlock: mdd_write_unlock(env, mdd_sobj); mdd_pdo_write_unlock(env, mdd_tobj, dlh); out_trans: @@ -664,11 +677,12 @@ static int mdd_name_insert(const struct lu_env *env, rc = __mdd_index_insert(env, mdd_obj, fid, name, isdir, handle, BYPASS_CAPA); + EXIT; out_unlock: mdd_pdo_write_unlock(env, mdd_obj, dlh); out_trans: mdd_trans_stop(env, mdo2mdd(pobj), rc, handle); - RETURN(rc); + return rc; } /* @@ -732,6 +746,7 @@ out_trans: mdd_trans_stop(env, mdd, rc, handle); RETURN(rc); } + static int mdd_rt_sanity_check(const struct lu_env *env, struct mdd_object *tgt_pobj, struct mdd_object *tobj, @@ -752,7 +767,7 @@ static int mdd_rt_sanity_check(const struct lu_env *env, mdd_dir_is_empty(env, tobj)) RETURN(-ENOTEMPTY); } else { - rc = mdd_may_create(env, tgt_pobj, NULL, 1); + rc = mdd_may_create(env, tgt_pobj, NULL, 1, 1); } RETURN(rc); @@ -898,7 +913,7 @@ __mdd_lookup(const struct lu_env *env, struct md_object *pobj, struct dt_object *dir = mdd_object_child(mdd_obj); struct dt_rec *rec = (struct dt_rec *)fid; const struct dt_key *key = (const struct dt_key *)name; - struct timeval start; + struct timeval start; int rc; ENTRY; @@ -1293,7 +1308,7 @@ static int mdd_rename_sanity_check(const struct lu_env *env, if (!tobj) { rc = mdd_may_create(env, tgt_pobj, NULL, - (src_pobj != tgt_pobj)); + (src_pobj != tgt_pobj), 1); } else { mdd_read_lock(env, tobj); rc = mdd_may_delete(env, tgt_pobj, tobj, src_is_dir, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index 3614a55..98c7529 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -818,7 +818,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out, rc); } - /*step 1: lock parent */ + /* step 1: lock parent */ lhp = &info->mti_lh[MDT_LH_PARENT]; mdt_lock_pdo_init(lhp, LCK_PR, name, namelen); rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE, diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 9927f56..fc83a3a 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -889,7 +889,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP, - MDT_LOCAL_LOCK); + MDT_CROSS_LOCK); } repbody->fid1 = *mdt_object_fid(child); repbody->valid |= (OBD_MD_FLID | OBD_MD_MDS); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index 1413d96..2a6a25f 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -165,17 +165,17 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) rc = mdt_object_lock(info, mo, lh, lockpart, MDT_LOCAL_LOCK); if (rc != 0) - GOTO(out, rc); + RETURN(rc); } /* Setattrs are syncronized through dlm lock taken above. If another * epoch started, its attributes may be already flushed on disk, * skip setattr. */ if (som_update && (info->mti_epoch->ioepoch != mo->mot_ioepoch)) - GOTO(out, rc = 0); + GOTO(out_unlock, rc = 0); if (lu_object_assert_not_exists(&mo->mot_obj.mo_lu)) - GOTO(out, rc = -ENOENT); + GOTO(out_unlock, rc = -ENOENT); /* all attrs are packed into mti_attr in unpack_setattr */ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, @@ -184,21 +184,20 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) /* all attrs are packed into mti_attr in unpack_setattr */ rc = mo_attr_set(info->mti_env, mdt_object_child(mo), ma); if (rc != 0) - GOTO(out, rc); + GOTO(out_unlock, rc); /* Re-enable SIZEONMDS. */ if (som_update) { CDEBUG(D_INODE, "Closing epoch "LPU64" on "DFID". Count %d\n", mo->mot_ioepoch, PFID(mdt_object_fid(mo)), mo->mot_epochcount); - mdt_sizeonmds_enable(info, mo); } EXIT; -out: +out_unlock: mdt_object_unlock(info, mo, lh, rc); - return(rc); + return rc; } static int mdt_reint_setattr(struct mdt_thread_info *info, @@ -407,7 +406,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (rc != 0) GOTO(out_unlock_parent, rc); - /* we will lock the child regardless it is local or remote. No harm. */ + /* We will lock the child regardless it is local or remote. No harm. */ mc = mdt_object_find(info->mti_env, info->mti_mdt, child_fid); if (IS_ERR(mc)) GOTO(out_unlock_parent, rc = PTR_ERR(mc)); @@ -415,8 +414,10 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_lock_reg_init(child_lh, LCK_EX); rc = mdt_object_lock(info, mc, child_lh, MDS_INODELOCK_FULL, MDT_CROSS_LOCK); - if (rc != 0) - GOTO(out_put_child, rc); + if (rc != 0) { + mdt_object_put(info->mti_env, mc); + GOTO(out_unlock_parent, rc); + } mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, OBD_FAIL_MDS_REINT_UNLINK_WRITE); @@ -430,16 +431,11 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, mdt_set_capainfo(info, 1, child_fid, BYPASS_CAPA); rc = mdo_unlink(info->mti_env, mdt_object_child(mp), mdt_object_child(mc), rr->rr_name, ma); - if (rc) - GOTO(out_unlock_child, rc); - - mdt_handle_last_unlink(info, mc, ma); + if (rc == 0) + mdt_handle_last_unlink(info, mc, ma); EXIT; -out_unlock_child: - mdt_object_unlock(info, mc, child_lh, rc); -out_put_child: - mdt_object_put(info->mti_env, mc); + mdt_object_unlock_put(info, mc, child_lh, rc); out_unlock_parent: mdt_object_unlock_put(info, mp, parent_lh, rc); out: @@ -483,7 +479,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 1: find & lock the target parent dir */ lhp = &info->mti_lh[MDT_LH_PARENT]; - mdt_lock_pdo_init(lhp, LCK_PW, rr->rr_name, + mdt_lock_pdo_init(lhp, LCK_EX, rr->rr_name, rr->rr_namelen); mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE); @@ -493,14 +489,22 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; mdt_lock_reg_init(lhs, LCK_EX); - ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); - if (IS_ERR(ms)) - GOTO(out_unlock_parent, rc = PTR_ERR(ms)); - rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE, - MDT_CROSS_LOCK); - if (rc != 0) - GOTO(out_unlock_source, rc); + if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { + mdt_object_get(info->mti_env, mp); + ms = mp; + } else { + ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); + if (IS_ERR(ms)) + GOTO(out_unlock_parent, rc = PTR_ERR(ms)); + + rc = mdt_object_lock(info, ms, lhs, MDS_INODELOCK_UPDATE, + MDT_CROSS_LOCK); + if (rc != 0) { + mdt_object_put(info->mti_env, ms); + GOTO(out_unlock_parent, rc); + } + } /* step 3: link it */ mdt_fail_write(info->mti_env, info->mti_mdt->mdt_bottom, @@ -510,7 +514,6 @@ static int mdt_reint_link(struct mdt_thread_info *info, mdt_object_child(ms), rr->rr_name, ma); EXIT; -out_unlock_source: mdt_object_unlock_put(info, ms, lhs, rc); out_unlock_parent: mdt_object_unlock_put(info, mp, lhp, rc); @@ -752,13 +755,19 @@ static int mdt_reint_rename(struct mdt_thread_info *info, if (lu_fid_eq(old_fid, rr->rr_fid1) || lu_fid_eq(old_fid, rr->rr_fid2)) GOTO(out_unlock_target, rc = -EINVAL); - lh_oldp = &info->mti_lh[MDT_LH_OLD]; - mdt_lock_reg_init(lh_oldp, LCK_EX); - mold = mdt_object_find_lock(info, old_fid, lh_oldp, - MDS_INODELOCK_LOOKUP); + mold = mdt_object_find(info->mti_env, info->mti_mdt, old_fid); if (IS_ERR(mold)) GOTO(out_unlock_target, rc = PTR_ERR(mold)); + lh_oldp = &info->mti_lh[MDT_LH_OLD]; + mdt_lock_reg_init(lh_oldp, LCK_EX); + rc = mdt_object_lock(info, mold, lh_oldp, MDS_INODELOCK_LOOKUP, + MDT_CROSS_LOCK); + if (rc != 0) { + mdt_object_put(info->mti_env, mold); + GOTO(out_unlock_target, rc); + } + /* step 4: find & lock the new object. */ /* new target object may not exist now */ rc = mdo_lookup(info->mti_env, mdt_object_child(mtgtdir), diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index cd207d2..c2949db 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -53,20 +53,21 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) __u64 valid = info->mti_body->valid; static const char user_string[] = "user."; int size, rc; - + ENTRY; + if (MDT_FAIL_CHECK(OBD_FAIL_MDS_GETXATTR_PACK)) - return -ENOMEM; + RETURN(-ENOMEM); /* Determine how many bytes we need */ if ((valid & OBD_MD_FLXATTR) == OBD_MD_FLXATTR) { xattr_name = req_capsule_client_get(pill, &RMF_NAME); if (!xattr_name) - return -EFAULT; + RETURN(-EFAULT); if (!(req->rq_export->exp_connect_flags & OBD_CONNECT_XATTR) && !strncmp(xattr_name, user_string, sizeof(user_string) - 1)) - return -EOPNOTSUPP; - + RETURN(-EOPNOTSUPP); + if (!strcmp(xattr_name, XATTR_NAME_LUSTRE_ACL)) size = RMTACL_SIZE_MAX; else @@ -78,19 +79,22 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) mdt_object_child(info->mti_object), &LU_BUF_NULL); } else { - CERROR("valid bits: "LPX64"\n", info->mti_body->valid); - return -EINVAL; + CERROR("Valid bits: "LPX64"\n", info->mti_body->valid); + RETURN(-EINVAL); } if (size < 0) { - if (size != -ENODATA && size != -EOPNOTSUPP) - CERROR("get EA size error: %d\n", size); - return size; + if (size == -ENODATA) + size = 0; + else if (size != -EOPNOTSUPP) { + CERROR("Error geting EA size: %d\n", size); + RETURN(size); + } } if (info->mti_body->eadatasize != 0 && info->mti_body->eadatasize < size) - return -ERANGE; + RETURN(-ERANGE); req_capsule_set_size(pill, &RMF_EADATA, RCL_SERVER, min_t(int, size, info->mti_body->eadatasize)); @@ -98,10 +102,10 @@ static int mdt_getxattr_pack_reply(struct mdt_thread_info * info) rc = req_capsule_pack(pill); if (rc) { LASSERT(rc < 0); - return rc; + RETURN(rc); } - return size; + RETURN(size); } static int do_remote_getfacl(struct mdt_thread_info *info, @@ -152,13 +156,13 @@ int mdt_getxattr(struct mdt_thread_info *info) RETURN(err_serious(easize)); repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); - LASSERT(repbody); + LASSERT(repbody != NULL); rc = mdt_init_ucred(info, reqbody); if (rc) RETURN(rc); - /* no need further getxattr */ + /* No need further getxattr. */ if (easize == 0 || reqbody->eadatasize == 0) GOTO(out, rc = easize); @@ -192,13 +196,14 @@ int mdt_getxattr(struct mdt_thread_info *info) } else LBUG(); + EXIT; out: if (rc >= 0) { repbody->eadatasize = rc; rc = 0; } mdt_exit_ucred(info); - RETURN(rc); + return rc; } /* return EADATA length to the caller. negative value means error */ -- 1.8.3.1