From 1d8af173c4fa18c4edd3d82a04676f9c79191fc2 Mon Sep 17 00:00:00 2001 From: yury Date: Thu, 19 Oct 2006 08:07:10 +0000 Subject: [PATCH] - cleanups in cmm split a bit; - preparation stage for adding ldlm part of pdirops (not used yet, all stuff is under CONFIG_PDIROPS) --- lustre/cmm/cmm_internal.h | 18 +++- lustre/cmm/cmm_object.c | 37 +++++++- lustre/cmm/cmm_split.c | 56 ++++++----- lustre/include/lustre/lustre_idl.h | 26 ++++-- lustre/include/md_object.h | 12 +++ lustre/mdt/mdt_handler.c | 184 +++++++++++++++++++++++++++++-------- lustre/mdt/mdt_internal.h | 9 +- lustre/mdt/mdt_open.c | 14 +-- lustre/mdt/mdt_reint.c | 29 +++--- lustre/mdt/mdt_xattr.c | 2 +- 10 files changed, 284 insertions(+), 103 deletions(-) diff --git a/lustre/cmm/cmm_internal.h b/lustre/cmm/cmm_internal.h index fd1b278..b142cc6 100644 --- a/lustre/cmm/cmm_internal.h +++ b/lustre/cmm/cmm_internal.h @@ -36,6 +36,14 @@ #include #include +#define CMM_NO_SPLIT_EXPECTED 0 +#define CMM_EXPECT_SPLIT 1 +#define CMM_NOT_SPLITTABLE 2 + +enum { + CMM_SPLIT_SIZE = 64 * 1024 +}; + struct cmm_device { struct md_device cmm_md_dev; /* device flags, taken from enum cmm_flags */ @@ -137,9 +145,15 @@ struct lu_object *cmm_object_alloc(const struct lu_env *env, int cmm_upcall(const struct lu_env *env, struct md_device *md, enum md_upcall_event ev); #ifdef HAVE_SPLIT_SUPPORT + /* cmm_split.c */ -int cml_try_to_split(const struct lu_env *env, - struct md_object *mo); +int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, + const char *name); + +int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, + struct md_attr *ma); + +int cmm_try_to_split(const struct lu_env *env, struct md_object *mo); #endif #endif /* __KERNEL__ */ diff --git a/lustre/cmm/cmm_object.c b/lustre/cmm/cmm_object.c index 854289b..8294033 100644 --- a/lustre/cmm/cmm_object.c +++ b/lustre/cmm/cmm_object.c @@ -361,6 +361,7 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p, { int rc; ENTRY; + #ifdef HAVE_SPLIT_SUPPORT rc = cmm_mdsnum_check(env, mo_p, name); if (rc) @@ -371,6 +372,32 @@ static int cml_lookup(const struct lu_env *env, struct md_object *mo_p, } +static lu_mode_t cml_lock_mode(const struct lu_env *env, + struct md_object *mo, lu_mode_t lm) +{ + ENTRY; +#ifdef HAVE_SPLIT_SUPPORT + if (lm == LU_EX) { + RETURN(LU_EX); + } else if (lm == LU_PR) { + RETURN(LU_CR); + } else if (lm == LU_PW) { + struct md_attr *ma = &cmm_env_info(env)->cmi_ma; + int split; + + memset(ma, 0, sizeof(*ma)); + split = cmm_expect_splitting(env, mo, ma); + + if (split == CMM_EXPECT_SPLIT) { + RETURN(LU_EX); + } else { + RETURN(LU_CW); + } + } +#endif + RETURN(LU_MINMODE); +} + static int cml_create(const struct lu_env *env, struct md_object *mo_p, const char *child_name, struct md_object *mo_c, struct md_create_spec *spec, @@ -380,7 +407,7 @@ static int cml_create(const struct lu_env *env, ENTRY; #ifdef HAVE_SPLIT_SUPPORT - rc = cml_try_to_split(env, mo_p); + rc = cmm_try_to_split(env, mo_p); if (rc) RETURN(rc); #endif @@ -546,6 +573,7 @@ static int cmm_is_subdir(const struct lu_env *env, struct md_object *mo, static struct md_dir_operations cml_dir_ops = { .mdo_is_subdir = cmm_is_subdir, .mdo_lookup = cml_lookup, + .mdo_lock_mode = cml_lock_mode, .mdo_create = cml_create, .mdo_link = cml_link, .mdo_unlink = cml_unlink, @@ -757,6 +785,12 @@ static int cmr_lookup(const struct lu_env *env, struct md_object *mo_p, RETURN(-EREMOTE); } +static lu_mode_t cmr_lock_mode(const struct lu_env *env, + struct md_object *mo, lu_mode_t lm) +{ + RETURN(LU_MINMODE); +} + /* * All methods below are cross-ref by nature. They consist of remote call and * local operation. Due to future rollback functionality there are several @@ -904,6 +938,7 @@ static int cmr_rename_tgt(const struct lu_env *env, static struct md_dir_operations cmr_dir_ops = { .mdo_is_subdir = cmm_is_subdir, .mdo_lookup = cmr_lookup, + .mdo_lock_mode = cmr_lock_mode, .mdo_create = cmr_create, .mdo_link = cmr_link, .mdo_unlink = cmr_unlink, diff --git a/lustre/cmm/cmm_split.c b/lustre/cmm/cmm_split.c index b6e287a..ac12864 100644 --- a/lustre/cmm/cmm_split.c +++ b/lustre/cmm/cmm_split.c @@ -40,14 +40,6 @@ #include "cmm_internal.h" #include "mdc_internal.h" -#define CMM_NO_SPLIT_EXPECTED 0 -#define CMM_EXPECT_SPLIT 1 -#define CMM_NO_SPLITTABLE 2 - -enum { - SPLIT_SIZE = 64*1024 -}; - static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, ssize_t len) { @@ -60,12 +52,13 @@ static struct lu_buf *cmm_buf_get(const struct lu_env *env, void *area, } int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, - const char *name) + const char *name) { struct md_attr *ma = &cmm_env_info(env)->cmi_ma; int rc; ENTRY; - /* try to get the LMV EA size */ + + /* Try to get the LMV EA size */ memset(ma, 0, sizeof(*ma)); ma->ma_need = MA_INODE | MA_LMV; rc = mo_attr_get(env, mp, ma); @@ -79,20 +72,21 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, if (ma->ma_lmv == NULL) RETURN(-ENOMEM); - /* get LMV EA */ + /* Get LMV EA */ ma->ma_need = MA_INODE | MA_LMV; rc = mo_attr_get(env, mp, ma); if (rc) RETURN(rc); - /* skip checking the slave dirs (mea_count == 0) */ + /* Skip checking the slave dirs (mea_count == 0) */ if (ma->ma_lmv->mea_count != 0) RETURN(0); /* - * Get stripe by name to check the name belongs - * to master dir, otherwise return the -ERESTART + * Get stripe by name to check the name belongs to master dir, + * otherwise return the -ERESTART */ stripe = mea_name2idx(ma->ma_lmv, name, strlen(name)); + /* Master stripe is always 0 */ if (stripe != 0) rc = -ERESTART; @@ -102,23 +96,28 @@ int cmm_mdsnum_check(const struct lu_env *env, struct md_object *mp, RETURN(rc); } -static int cmm_expect_splitting(const struct lu_env *env, - struct md_object *mo, - struct md_attr *ma) +int cmm_expect_splitting(const struct lu_env *env, struct md_object *mo, + struct md_attr *ma) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct lu_fid *fid = NULL; int rc = CMM_EXPECT_SPLIT; ENTRY; + ma->ma_need = MA_INODE | MA_LMV; + rc = mo_attr_get(env, mo, ma); + if (rc) + GOTO(cleanup, rc = CMM_NOT_SPLITTABLE); + if (cmm->cmm_tgt_count == 0) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); - if (ma->ma_attr.la_size < SPLIT_SIZE) + if (ma->ma_attr.la_size < CMM_SPLIT_SIZE) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); if (ma->ma_lmv_size) GOTO(cleanup, rc = CMM_NO_SPLIT_EXPECTED); + OBD_ALLOC_PTR(fid); rc = cmm_child_ops(cmm)->mdo_root_get(env, cmm->cmm_child, fid); if (rc) @@ -491,7 +490,7 @@ free_rdpg: return rc; } -int cml_try_to_split(const struct lu_env *env, struct md_object *mo) +int cmm_try_to_split(const struct lu_env *env, struct md_object *mo) { struct cmm_device *cmm = cmm_obj2dev(md2cmm_obj(mo)); struct md_attr *ma = &cmm_env_info(env)->cmi_ma; @@ -500,14 +499,9 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo) ENTRY; LASSERT(S_ISDIR(lu_object_attr(&mo->mo_lu))); - memset(ma, 0, sizeof(*ma)); - ma->ma_need = MA_INODE | MA_LMV; - rc = mo_attr_get(env, mo, ma); - if (rc) - GOTO(cleanup, ma); - /* step1: checking whether the dir need to be splitted */ + /* Step1: Checking whether the dir needs to be split. */ rc = cmm_expect_splitting(env, mo, ma); if (rc != CMM_EXPECT_SPLIT) GOTO(cleanup, rc = 0); @@ -520,23 +514,25 @@ int cml_try_to_split(const struct lu_env *env, struct md_object *mo) if (rc) GOTO(cleanup, rc = 0); - /* step2: create slave objects */ + /* Step2: Create slave objects (on slave MDTs) */ rc = cmm_slaves_create(env, mo, ma); if (rc) GOTO(cleanup, ma); - /* step3: scan and split the object */ + /* Step3: Scan and split the object. */ rc = cmm_scan_and_split(env, mo, ma); if (rc) GOTO(cleanup, ma); buf = cmm_buf_get(env, ma->ma_lmv, ma->ma_lmv_size); - /* step4: set mea to the master object */ - rc = mo_xattr_set(env, md_object_next(mo), buf, MDS_LMV_MD_NAME, 0); - if (rc == -ERESTART) + /* Step4: Set mea to the master object. */ + rc = mo_xattr_set(env, md_object_next(mo), buf, + MDS_LMV_MD_NAME, 0); + if (rc == -ERESTART) { CWARN("Dir "DFID" has been split\n", PFID(lu_object_fid(&mo->mo_lu))); + } EXIT; cleanup: if (ma->ma_lmv_size && ma->ma_lmv) diff --git a/lustre/include/lustre/lustre_idl.h b/lustre/include/lustre/lustre_idl.h index 8c67dea..c23b7f8 100644 --- a/lustre/include/lustre/lustre_idl.h +++ b/lustre/include/lustre/lustre_idl.h @@ -1443,17 +1443,29 @@ extern void lustre_swab_ldlm_res_id (struct ldlm_res_id *id); /* lock types */ typedef enum { LCK_MINMODE = 0, - LCK_EX = 1, - LCK_PW = 2, - LCK_PR = 4, - LCK_CW = 8, - LCK_CR = 16, - LCK_NL = 32, - LCK_GROUP = 64, + LCK_EX = 1, + LCK_PW = 2, + LCK_PR = 4, + LCK_CW = 8, + LCK_CR = 16, + LCK_NL = 32, + LCK_GROUP = 64, LCK_MAXMODE } ldlm_mode_t; typedef enum { + LU_MINMODE = 0, + LU_EX = 1, + LU_PW = 2, + LU_PR = 4, + LU_CW = 8, + LU_CR = 16, + LU_NL = 32, + LU_GROUP = 64, + LU_MAXMODE +} lu_mode_t; + +typedef enum { LDLM_PLAIN = 10, LDLM_EXTENT = 11, LDLM_FLOCK = 12, diff --git a/lustre/include/md_object.h b/lustre/include/md_object.h index 37cf3c8..520ff21 100644 --- a/lustre/include/md_object.h +++ b/lustre/include/md_object.h @@ -196,6 +196,9 @@ struct md_dir_operations { int (*mdo_lookup)(const struct lu_env *env, struct md_object *obj, const char *name, struct lu_fid *fid); + lu_mode_t (*mdo_lock_mode)(const struct lu_env *env, struct md_object *obj, + lu_mode_t mode); + int (*mdo_create)(const struct lu_env *env, struct md_object *pobj, const char *name, struct md_object *child, struct md_create_spec *spec, @@ -454,6 +457,15 @@ static inline int mdo_lookup(const struct lu_env *env, return p->mo_dir_ops->mdo_lookup(env, p, name, f); } +static inline lu_mode_t mdo_lock_mode(const struct lu_env *env, + struct md_object *mo, + lu_mode_t lm) +{ + if (mo->mo_dir_ops->mdo_lock_mode == NULL) + return LU_MINMODE; + return mo->mo_dir_ops->mdo_lock_mode(env, mo, lm); +} + static inline int mdo_create(const struct lu_env *env, struct md_object *p, const char *child_name, diff --git a/lustre/mdt/mdt_handler.c b/lustre/mdt/mdt_handler.c index ca6ecca..ebab34b 100644 --- a/lustre/mdt/mdt_handler.c +++ b/lustre/mdt/mdt_handler.c @@ -566,10 +566,10 @@ static int mdt_raw_lookup(struct mdt_thread_info *info, const char* name, struct ldlm_reply *ldlm_rep) { - const struct mdt_body *reqbody = info->mti_body; - struct md_object *next = mdt_object_child(info->mti_object); - struct lu_fid *child_fid = &info->mti_tmp_fid1; - struct mdt_body *repbody; + struct md_object *next = mdt_object_child(info->mti_object); + const struct mdt_body *reqbody = info->mti_body; + struct lu_fid *child_fid = &info->mti_tmp_fid1; + struct mdt_body *repbody; int rc; ENTRY; @@ -614,7 +614,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, struct ldlm_lock *lock; ENTRY; - is_resent = lustre_handle_is_used(&lhc->mlh_lh); + is_resent = lustre_handle_is_used(&lhc->mlh_reg_lh); if (is_resent) LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); @@ -653,10 +653,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (is_resent) { /* Do not take lock for resent case. */ - lock = ldlm_handle2lock(&lhc->mlh_lh); + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); if (!lock) { CERROR("Invalid lock handle "LPX64"\n", - lhc->mlh_lh.cookie); + lhc->mlh_reg_lh.cookie); LBUG(); } LASSERT(fid_res_name_eq(mdt_object_fid(child), @@ -665,7 +665,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, rc = 0; } else { mdt_lock_handle_init(lhc); - lhc->mlh_mode = LCK_CR; + lhc->mlh_reg_mode = LCK_CR; /* * Object's name is on another MDS, no lookup lock is @@ -688,7 +688,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, /*step 1: lock parent */ lhp = &info->mti_lh[MDT_LH_PARENT]; - lhp->mlh_mode = LCK_CR; + lhp->mlh_reg_mode = LCK_CR; rc = mdt_object_lock(info, parent, lhp, MDS_INODELOCK_UPDATE); if (rc != 0) RETURN(rc); @@ -710,10 +710,10 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, GOTO(out_parent, rc = PTR_ERR(child)); if (is_resent) { /* Do not take lock for resent case. */ - lock = ldlm_handle2lock(&lhc->mlh_lh); + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); if (!lock) { CERROR("Invalid lock handle "LPX64"\n", - lhc->mlh_lh.cookie); + lhc->mlh_reg_lh.cookie); LBUG(); } LASSERT(fid_res_name_eq(child_fid, @@ -721,7 +721,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, LDLM_LOCK_PUT(lock); } else { mdt_lock_handle_init(lhc); - lhc->mlh_mode = LCK_CR; + lhc->mlh_reg_mode = LCK_CR; rc = mdt_object_cr_lock(info, child, lhc, child_bits); if (rc != 0) GOTO(out_child, rc); @@ -733,7 +733,7 @@ static int mdt_getattr_name_lock(struct mdt_thread_info *info, if (rc != 0) { mdt_object_unlock(info, child, lhc, 1); } else { - struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_lh); + struct ldlm_lock *lock = ldlm_handle2lock(&lhc->mlh_reg_lh); if (lock) { struct ldlm_res_id *res_id; struct mdt_body *repbody; @@ -791,9 +791,9 @@ static int mdt_getattr_name(struct mdt_thread_info *info) GOTO(out, rc); rc = mdt_getattr_name_lock(info, lhc, MDS_INODELOCK_UPDATE, NULL); - if (lustre_handle_is_used(&lhc->mlh_lh)) { - ldlm_lock_decref(&lhc->mlh_lh, lhc->mlh_mode); - lhc->mlh_lh.cookie = 0; + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { + ldlm_lock_decref(&lhc->mlh_reg_lh, lhc->mlh_reg_mode); + lhc->mlh_reg_lh.cookie = 0; } mdt_exit_ucred(info); EXIT; @@ -1409,6 +1409,112 @@ struct mdt_object *mdt_object_find(const struct lu_env *env, RETURN(m); } +static inline lu_mode_t mdt_ldlm_mode2lu_mode(ldlm_mode_t mode) +{ + switch (mode) { + case LCK_MINMODE: + return LU_MINMODE; + case LCK_EX: + return LU_EX; + case LCK_PW: + return LU_PW; + case LCK_PR: + return LU_PR; + case LCK_CW: + return LU_CW; + case LCK_CR: + return LU_CR; + case LCK_NL: + return LU_NL; + case LCK_GROUP: + return LU_GROUP; + default: + return 0; + } +} + +static inline ldlm_mode_t mdt_lu_mode2ldlm_mode(lu_mode_t mode) +{ + switch (mode) { + case LU_MINMODE: + return LCK_MINMODE; + case LU_EX: + return LCK_EX; + case LU_PW: + return LCK_PW; + case LU_PR: + return LCK_PR; + case LU_CW: + return LCK_CW; + case LU_CR: + return LCK_CR; + case LU_NL: + return LCK_NL; + case LU_GROUP: + return LCK_GROUP; + default: + return 0; + } +} + +int mdt_object_lock_mode(struct mdt_thread_info *info, + struct mdt_object *o, + struct mdt_lock_handle *lh, + ldlm_mode_t lm) +{ + ENTRY; + + lh->mlh_reg_mode = lm; + +#ifdef CONFIG_PDIROPS + { + lu_mode_t mode; + + /* + * Any dir access needs couple of locks: + * + * 1) on part of dir we gonna take lookup/modify; + * + * 2) on whole dir to protect it from concurrent splitting + * and/or to flush client's cache for readdir(). + * + * so, for a given mode and object this routine decides what + * lock mode to use for lock #2: + * + * 1) if caller's gonna lookup in dir then we need to protect + * dir from being splitted only - LCK_CR + * + * 2) if caller's gonna modify dir then we need to protect dir + * from being splitted and to flush cache - LCK_CW + * + * 3) if caller's gonna modify dir and that dir seems ready for + * splitting then we need to protect it from any type of access + * (lookup/modify/split) - LCK_EX --bzzz + */ + + /* Ask underlaying level its opinion about possible locks. */ + mode = mdo_lock_mode(info->mti_env, mdt_object_child(o), + mdt_ldlm_mode2lu_mode(lm)); + if (mode != LU_MINMODE) { + /* Lower layer said what lock mode it likes to be, use it. */ + lh->mlh_pdo_mode = mdt_lu_mode2ldlm_mode(mode); + } else { + /* + * Lower layer does not want to specify locking mode. We od it + * our selves. No special protection is needed, just flush + * client's cache on modification. + */ + if (lm == LCK_PW) + lh->mlh_pdo_mode = LCK_CW; + else + lh->mlh_pdo_mode = LCK_MINMODE; + } + } +#endif + + RETURN(0); +} + int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, __u64 ibits) { @@ -1418,8 +1524,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, int rc; ENTRY; - LASSERT(!lustre_handle_is_used(&lh->mlh_lh)); - LASSERT(lh->mlh_mode != LCK_MINMODE); + LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); + LASSERT(lh->mlh_reg_mode != LCK_MINMODE); if (mdt_object_exists(o) < 0) { LASSERT(!(ibits & MDS_INODELOCK_UPDATE)); LASSERT(ibits & MDS_INODELOCK_LOOKUP); @@ -1427,8 +1533,8 @@ int mdt_object_lock(struct mdt_thread_info *info, struct mdt_object *o, memset(policy, 0, sizeof *policy); policy->l_inodebits.bits = ibits; - rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_lh, lh->mlh_mode, - policy, res_id); + rc = fid_lock(ns, mdt_object_fid(o), &lh->mlh_reg_lh, + lh->mlh_reg_mode, policy, res_id); RETURN(rc); } @@ -1453,8 +1559,8 @@ void mdt_object_unlock(struct mdt_thread_info *info, struct mdt_object *o, struct mdt_lock_handle *lh, int decref) { struct ptlrpc_request *req = mdt_info_req(info); - struct lustre_handle *handle = &lh->mlh_lh; - ldlm_mode_t mode = lh->mlh_mode; + struct lustre_handle *handle = &lh->mlh_reg_lh; + ldlm_mode_t mode = lh->mlh_reg_mode; ENTRY; if (lustre_handle_is_used(handle)) { @@ -1794,13 +1900,15 @@ static int mdt_req_handle(struct mdt_thread_info *info, void mdt_lock_handle_init(struct mdt_lock_handle *lh) { - lh->mlh_lh.cookie = 0ull; - lh->mlh_mode = LCK_MINMODE; + lh->mlh_reg_lh.cookie = 0ull; + lh->mlh_reg_mode = LCK_MINMODE; + lh->mlh_pdo_lh.cookie = 0ull; + lh->mlh_pdo_mode = LCK_MINMODE; } void mdt_lock_handle_fini(struct mdt_lock_handle *lh) { - LASSERT(!lustre_handle_is_used(&lh->mlh_lh)); + LASSERT(!lustre_handle_is_used(&lh->mlh_reg_lh)); } /* @@ -1898,8 +2006,8 @@ static int mdt_recovery(struct mdt_thread_info *info) req->rq_xid == req_exp_last_close_xid(req)) { if (!(lustre_msg_get_flags(req->rq_reqmsg) & (MSG_RESENT | MSG_REPLAY))) { - CERROR("rq_xid "LPU64" matches last_xid, " - "expected RESENT flag\n", req->rq_xid); + DEBUG_REQ(D_WARNING, req, "rq_xid "LPU64" matches last_xid, " + "expected RESENT flag\n", req->rq_xid); LBUG(); req->rq_status = -ENOTCONN; RETURN(-ENOTCONN); @@ -2167,15 +2275,15 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, * lock. */ if (new_lock == NULL) - new_lock = ldlm_handle2lock(&lh->mlh_lh); + new_lock = ldlm_handle2lock(&lh->mlh_reg_lh); if (new_lock == NULL && (flags & LDLM_FL_INTENT_ONLY)) { - lh->mlh_lh.cookie = 0; + lh->mlh_reg_lh.cookie = 0; RETURN(0); } LASSERTF(new_lock != NULL, - "lockh "LPX64"\n", lh->mlh_lh.cookie); + "lockh "LPX64"\n", lh->mlh_reg_lh.cookie); /* * If we've already given this lock to a client once, then we should @@ -2199,7 +2307,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, */ LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - lh->mlh_lh.cookie = 0; + lh->mlh_reg_lh.cookie = 0; RETURN(ELDLM_LOCK_REPLACED); } @@ -2219,7 +2327,7 @@ int mdt_intent_lock_replace(struct mdt_thread_info *info, unlock_res_and_lock(new_lock); LDLM_LOCK_PUT(new_lock); - lh->mlh_lh.cookie = 0; + lh->mlh_reg_lh.cookie = 0; RETURN(ELDLM_LOCK_REPLACED); } @@ -2248,12 +2356,12 @@ static void mdt_intent_fixup_resent(struct req_capsule *pill, if (lock == new_lock) continue; if (lock->l_remote_handle.cookie == remote_hdl.cookie) { - lh->mlh_lh.cookie = lock->l_handle.h_cookie; - lh->mlh_mode = lock->l_granted_mode; + lh->mlh_reg_lh.cookie = lock->l_handle.h_cookie; + lh->mlh_reg_mode = lock->l_granted_mode; LDLM_DEBUG(lock, "restoring lock cookie"); DEBUG_REQ(D_HA, req, "restoring lock cookie "LPX64, - lh->mlh_lh.cookie); + lh->mlh_reg_lh.cookie); if (old_lock) *old_lock = LDLM_LOCK_GET(lock); spin_unlock(&exp->exp_ldlm_data.led_lock); @@ -2335,7 +2443,7 @@ static int mdt_intent_getattr(enum mdt_it_code opcode, ldlm_rep->lock_policy_res2 = 0; if (!mdt_get_disposition(ldlm_rep, DISP_LOOKUP_POS) || ldlm_rep->lock_policy_res2) { - lhc->mlh_lh.cookie = 0ull; + lhc->mlh_reg_lh.cookie = 0ull; GOTO(out_ucred, rc = ELDLM_LOCK_ABORTED); } @@ -2390,13 +2498,13 @@ static int mdt_intent_reint(enum mdt_it_code opcode, /* cross-ref case, the lock should be returned to the client */ if (rc == -EREMOTE) { - LASSERT(lustre_handle_is_used(&lhc->mlh_lh)); + LASSERT(lustre_handle_is_used(&lhc->mlh_reg_lh)); rep->lock_policy_res2 = 0; RETURN(mdt_intent_lock_replace(info, lockp, NULL, lhc, flags)); } rep->lock_policy_res2 = clear_serious(rc); - lhc->mlh_lh.cookie = 0ull; + lhc->mlh_reg_lh.cookie = 0ull; RETURN(ELDLM_LOCK_ABORTED); } diff --git a/lustre/mdt/mdt_internal.h b/lustre/mdt/mdt_internal.h index ad44c01..3250a7c 100644 --- a/lustre/mdt/mdt_internal.h +++ b/lustre/mdt/mdt_internal.h @@ -197,8 +197,13 @@ struct mdt_object { }; struct mdt_lock_handle { - struct lustre_handle mlh_lh; - ldlm_mode_t mlh_mode; + /* Regular lock */ + struct lustre_handle mlh_reg_lh; + ldlm_mode_t mlh_reg_mode; + + /* Pdirops lock */ + struct lustre_handle mlh_pdo_lh; + ldlm_mode_t mlh_pdo_mode; }; enum { diff --git a/lustre/mdt/mdt_open.c b/lustre/mdt/mdt_open.c index 2aadf00..cc20816 100644 --- a/lustre/mdt/mdt_open.c +++ b/lustre/mdt/mdt_open.c @@ -148,7 +148,7 @@ int mdt_epoch_open(struct mdt_thread_info *info, struct mdt_object *o) * In the later case, mdt_reint_setattr will do it. */ if (cancel && (info->mti_rr.rr_fid1 != NULL)) { struct mdt_lock_handle *lh = &info->mti_lh[MDT_LH_CHILD]; - lh->mlh_mode = LCK_EX; + lh->mlh_reg_mode = LCK_EX; rc = mdt_object_lock(info, o, lh, MDS_INODELOCK_UPDATE); if (rc == 0) mdt_object_unlock(info, o, lh, 1); @@ -734,9 +734,9 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) lh = &info->mti_lh[MDT_LH_PARENT]; if (!(create_flags & MDS_OPEN_CREAT)) - lh->mlh_mode = LCK_CR; + lh->mlh_reg_mode = LCK_CR; else - lh->mlh_mode = LCK_EX; + lh->mlh_reg_mode = LCK_EX; parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); if (IS_ERR(parent)) @@ -819,16 +819,16 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) */ LASSERT(lhc != NULL); - if (lustre_handle_is_used(&lhc->mlh_lh)) { + if (lustre_handle_is_used(&lhc->mlh_reg_lh)) { struct ldlm_lock *lock; LASSERT(lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT); - lock = ldlm_handle2lock(&lhc->mlh_lh); + lock = ldlm_handle2lock(&lhc->mlh_reg_lh); if (!lock) { CERROR("Invalid lock handle "LPX64"\n", - lhc->mlh_lh.cookie); + lhc->mlh_reg_lh.cookie); LBUG(); } LASSERT(fid_res_name_eq(mdt_object_fid(child), @@ -837,7 +837,7 @@ int mdt_reint_open(struct mdt_thread_info *info, struct mdt_lock_handle *lhc) rc = 0; } else { mdt_lock_handle_init(lhc); - lhc->mlh_mode = LCK_CR; + lhc->mlh_reg_mode = LCK_CR; rc = mdt_object_lock(info, child, lhc, MDS_INODELOCK_LOOKUP); diff --git a/lustre/mdt/mdt_reint.c b/lustre/mdt/mdt_reint.c index da49c6b..3911547 100644 --- a/lustre/mdt/mdt_reint.c +++ b/lustre/mdt/mdt_reint.c @@ -51,7 +51,7 @@ static int mdt_md_create(struct mdt_thread_info *info) repbody = req_capsule_server_get(&info->mti_pill, &RMF_MDT_BODY); lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_mode = LCK_EX; + lh->mlh_reg_mode = LCK_EX; parent = mdt_object_find_lock(info, rr->rr_fid1, lh, MDS_INODELOCK_UPDATE); @@ -152,7 +152,7 @@ int mdt_attr_set(struct mdt_thread_info *info, struct mdt_object *mo, int flags) RETURN(0); lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_mode = LCK_EX; + lh->mlh_reg_mode = LCK_EX; if (!(flags & MRF_SETATTR_LOCKED)) { __u64 lockpart = MDS_INODELOCK_UPDATE; @@ -359,7 +359,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, /* step 1: lock the parent */ parent_lh = &info->mti_lh[MDT_LH_PARENT]; - parent_lh->mlh_mode = LCK_EX; + parent_lh->mlh_reg_mode = LCK_EX; mp = mdt_object_find_lock(info, rr->rr_fid1, parent_lh, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) @@ -416,7 +416,7 @@ static int mdt_reint_unlink(struct mdt_thread_info *info, if (IS_ERR(mc)) GOTO(out_unlock_parent, rc = PTR_ERR(mc)); child_lh = &info->mti_lh[MDT_LH_CHILD]; - child_lh->mlh_mode = LCK_EX; + child_lh->mlh_reg_mode = LCK_EX; rc = mdt_object_cr_lock(info, mc, child_lh, MDS_INODELOCK_FULL); if (rc != 0) GOTO(out_put_child, rc); @@ -471,7 +471,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, if (rr->rr_name[0] == 0) { /* MDT holding name ask us to add ref. */ lhs = &info->mti_lh[MDT_LH_CHILD]; - lhs->mlh_mode = LCK_EX; + lhs->mlh_reg_mode = LCK_EX; ms = mdt_object_find_lock(info, rr->rr_fid2, lhs, MDS_INODELOCK_UPDATE); if (IS_ERR(ms)) @@ -485,7 +485,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 1: find & lock the target parent dir */ lhp = &info->mti_lh[MDT_LH_PARENT]; - lhp->mlh_mode = LCK_EX; + lhp->mlh_reg_mode = LCK_EX; mp = mdt_object_find_lock(info, rr->rr_fid2, lhp, MDS_INODELOCK_UPDATE); if (IS_ERR(mp)) @@ -493,7 +493,7 @@ static int mdt_reint_link(struct mdt_thread_info *info, /* step 2: find & lock the source */ lhs = &info->mti_lh[MDT_LH_CHILD]; - lhs->mlh_mode = LCK_EX; + lhs->mlh_reg_mode = LCK_EX; ms = mdt_object_find(info->mti_env, info->mti_mdt, rr->rr_fid1); if (IS_ERR(ms)) GOTO(out_unlock_parent, rc = PTR_ERR(ms)); @@ -539,7 +539,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) /* step 1: lookup & lock the tgt dir */ lh_tgt = &info->mti_lh[MDT_LH_CHILD]; lh_tgtdir = &info->mti_lh[MDT_LH_PARENT]; - lh_tgtdir->mlh_mode = LCK_EX; + lh_tgtdir->mlh_reg_mode = LCK_EX; mtgtdir = mdt_object_find_lock(info, rr->rr_fid1, lh_tgtdir, MDS_INODELOCK_UPDATE); if (IS_ERR(mtgtdir)) @@ -557,7 +557,7 @@ static int mdt_reint_rename_tgt(struct mdt_thread_info *info) if (lu_fid_eq(tgt_fid, rr->rr_fid2)) GOTO(out_unlock_tgtdir, rc); - lh_tgt->mlh_mode = LCK_EX; + lh_tgt->mlh_reg_mode = LCK_EX; mtgt = mdt_object_find_lock(info, tgt_fid, lh_tgt, MDS_INODELOCK_LOOKUP); @@ -703,7 +703,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /* step 1: lock the source dir */ lh_srcdirp = &info->mti_lh[MDT_LH_PARENT]; - lh_srcdirp->mlh_mode = LCK_EX; + lh_srcdirp->mlh_reg_mode = LCK_EX; msrcdir = mdt_object_find_lock(info, rr->rr_fid1, lh_srcdirp, MDS_INODELOCK_UPDATE); if (IS_ERR(msrcdir)) @@ -711,7 +711,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, /*step 2: find & lock the target dir*/ lh_tgtdirp = &info->mti_lh[MDT_LH_CHILD]; - lh_tgtdirp->mlh_mode = LCK_EX; + lh_tgtdirp->mlh_reg_mode = LCK_EX; if (lu_fid_eq(rr->rr_fid1, rr->rr_fid2)) { mdt_object_get(info->mti_env, msrcdir); mtgtdir = msrcdir; @@ -740,7 +740,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, GOTO(out_unlock_target, rc = -EINVAL); lh_oldp = &info->mti_lh[MDT_LH_OLD]; - lh_oldp->mlh_mode = LCK_EX; + lh_oldp->mlh_reg_mode = LCK_EX; mold = mdt_object_find_lock(info, old_fid, lh_oldp, MDS_INODELOCK_LOOKUP); if (IS_ERR(mold)) @@ -759,7 +759,7 @@ static int mdt_reint_rename(struct mdt_thread_info *info, lu_fid_eq(new_fid, rr->rr_fid2)) GOTO(out_unlock_old, rc = -EINVAL); - lh_newp->mlh_mode = LCK_EX; + lh_newp->mlh_reg_mode = LCK_EX; mnew = mdt_object_find(info->mti_env, info->mti_mdt, new_fid); if (IS_ERR(mnew)) GOTO(out_unlock_old, rc = PTR_ERR(mnew)); @@ -809,9 +809,8 @@ static int mdt_reint_rename(struct mdt_thread_info *info, mdt_handle_last_unlink(info, mnew, ma); out_unlock_new: - if (mnew) { + if (mnew) mdt_object_unlock_put(info, mnew, lh_newp, rc); - } out_unlock_old: mdt_object_unlock_put(info, mold, lh_oldp, rc); out_unlock_target: diff --git a/lustre/mdt/mdt_xattr.c b/lustre/mdt/mdt_xattr.c index 6ab89850..6fbf06e 100644 --- a/lustre/mdt/mdt_xattr.c +++ b/lustre/mdt/mdt_xattr.c @@ -315,7 +315,7 @@ int mdt_setxattr(struct mdt_thread_info *info) lockpart |= MDS_INODELOCK_LOOKUP; lh = &info->mti_lh[MDT_LH_PARENT]; - lh->mlh_mode = LCK_EX; + lh->mlh_reg_mode = LCK_EX; rc = mdt_object_lock(info, obj, lh, lockpart); if (rc != 0) GOTO(out, rc); -- 1.8.3.1