From bec818434c27bb390b4c8866e73d1afb0dd9e884 Mon Sep 17 00:00:00 2001 From: Lai Siyao Date: Wed, 17 Aug 2011 01:31:42 -0700 Subject: [PATCH] LU-146 mds_open() may deadlock * Get open lock inside mds_get_parent_child_locked() to avoid deadlock. * Never get open lock if child is newly created to avoid deadlock. Signed-off-by: Lai Siyao Change-Id: I1a1dadafef364f36ad3b892ba7a2d669f6027f10 Reviewed-on: http://review.whamcloud.com/1259 Tested-by: Hudson Tested-by: Maloo Reviewed-by: Oleg Drokin Reviewed-by: Johann Lombardi --- lustre/mds/handler.c | 3 ++- lustre/mds/mds_internal.h | 3 ++- lustre/mds/mds_open.c | 62 ++++++----------------------------------------- lustre/mds/mds_reint.c | 31 ++++++++++++++++++++++-- 4 files changed, 40 insertions(+), 59 deletions(-) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 6225881..1288132 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -950,7 +950,8 @@ static int mds_getattr_lock(struct ptlrpc_request *req, int offset, MDS_INODELOCK_UPDATE, name, namesize, child_lockh, &dchild, - LCK_CR, child_part); + LCK_CR, child_part, + IT_GETATTR, 0); } else { /* For revalidate by fid we always take UPDATE lock */ dchild = mds_fid2locked_dentry(obd, &body->fid2, NULL, diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index 7c8979b..5d3c336 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -161,7 +161,8 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, - __u64 child_lockpart); + __u64 child_lockpart, + int it_op, __u32 flags); int mds_lock_new_child(struct obd_device *obd, struct inode *inode, struct lustre_handle *child_lockh); int mds_osc_setattr_async(struct obd_device *obd, struct inode *inode, diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 8ff4860..b05a9d3 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -178,7 +178,6 @@ static int mds_get_write_access(struct mds_obd *mds, struct inode *inode, RETURN(-ETXTBSY); } - if (MDS_FILTERDATA(inode) && MDS_FILTERDATA(inode)->io_epoch != 0) { CDEBUG(D_INODE, "continuing MDS epoch "LPU64" for ino %lu/%u\n", MDS_FILTERDATA(inode)->io_epoch, inode->i_ino, @@ -997,27 +996,6 @@ int mds_lock_new_child(struct obd_device *obd, struct inode *inode, RETURN(rc); } -static int noinline mds_get_open_lock(struct obd_device *obd, - struct dentry *dchild, - int child_mode, - struct lustre_handle *child_lockh) -{ - ldlm_policy_data_t policy = { - .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN } - }; - struct ldlm_res_id child_res_id = { - .name[0] = dchild->d_inode->i_ino, - .name[1] = dchild->d_inode->i_generation, - }; - int lock_flags = 0; - - return ldlm_cli_enqueue_local(obd->obd_namespace, &child_res_id, - LDLM_IBITS, &policy, child_mode, - &lock_flags, ldlm_blocking_ast, - ldlm_completion_ast, NULL, NULL, - 0, NULL, child_lockh); -} - int mds_open(struct mds_update_record *rec, int offset, struct ptlrpc_request *req, struct lustre_handle *child_lockh) { @@ -1041,7 +1019,7 @@ int mds_open(struct mds_update_record *rec, int offset, /* Always returning LOOKUP lock if open succesful to guard dentry on client. */ int quota_pending[2] = {0, 0}; - int use_parent, need_open_lock; + int use_parent; unsigned int gid = current_fsgid(); ENTRY; @@ -1143,9 +1121,6 @@ int mds_open(struct mds_update_record *rec, int offset, (rec->ur_flags & MDS_OPEN_LOCK) && (rec->ur_namelen == 1)) || (rec->ur_flags & MDS_OPEN_JOIN_FILE); - need_open_lock = !(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) && - (rec->ur_flags & MDS_OPEN_LOCK); - /* Try to lock both parent and child first. If child is not found, * return only locked parent. This is enough to prevent other * threads from changing this directory until creation is finished. */ @@ -1156,11 +1131,11 @@ int mds_open(struct mds_update_record *rec, int offset, MDS_INODELOCK_UPDATE, use_parent ? NULL : rec->ur_name, rec->ur_namelen, - (rec->ur_flags & MDS_OPEN_LOCK) ? - child_lockh : NULL, + child_lockh, &dchild, child_mode, MDS_INODELOCK_LOOKUP | - MDS_INODELOCK_OPEN); + MDS_INODELOCK_OPEN, + IT_OPEN, rec->ur_flags); if (rc) { if (rc != -ENOENT) { @@ -1308,13 +1283,12 @@ int mds_open(struct mds_update_record *rec, int offset, } else { acc_mode = accmode(dchild->d_inode, rec->ur_flags); /* Child previously existed so the lookup and lock is already - * done, so no further locking is needed. */ + * done. */ /* for nfs and join - we need two locks for same fid, but * with different mode */ - if (need_open_lock && !use_parent) { + if (!(lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) && + (rec->ur_flags & MDS_OPEN_LOCK) && !use_parent) ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK); - need_open_lock = 0; - } } LASSERTF(!mds_inode_is_orphan(dchild->d_inode), @@ -1376,28 +1350,6 @@ found_child: GOTO(cleanup, rc = -EAGAIN); } - if (need_open_lock) { - rc = mds_get_open_lock(obd, dchild, child_mode, child_lockh); - if (rc != ELDLM_OK) - GOTO(cleanup, rc); - /* Let mds_intent_policy know that we have a lock to return */ - ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK); - } else if (!(rec->ur_flags & MDS_OPEN_LOCK) && - S_ISREG(dchild->d_inode->i_mode) && - (dchild->d_inode->i_mode & S_IXUGO) && - (rec->ur_flags & (FMODE_WRITE | MDS_FMODE_EXEC))) { - /* if this is an executable, and a non-nfsd client open write or - * execute it, revoke open lock in case other client holds a - * open lock which denies writing/executing in mds_finish_open() - * below. LU-146 - */ - rc = mds_get_open_lock(obd, dchild, child_mode, child_lockh); - if (rc != ELDLM_OK) - GOTO(cleanup, rc); - ldlm_lock_decref(child_lockh, child_mode); - memset(child_lockh, 0, sizeof(*child_lockh)); - } - if (!S_ISREG(dchild->d_inode->i_mode) && !S_ISDIR(dchild->d_inode->i_mode) && (req->rq_export->exp_connect_flags & OBD_CONNECT_NODEVOH)) { diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 2568220..3558c2d 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -1595,7 +1595,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, char *name, int namelen, struct lustre_handle *child_lockh, struct dentry **dchildp, int child_mode, - __u64 child_lockpart) + __u64 child_lockpart, int it_op, __u32 flags) { struct ldlm_res_id child_res_id = { .name = {0} }; struct ldlm_res_id parent_res_id = { .name = {0} }; @@ -1624,6 +1624,7 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, if (name == NULL) GOTO(retry_locks, rc); + /* Step 2: Lookup child (without DLM lock, to get resource name) */ *dchildp = mds_lookup(obd, name, *dparentp, namelen - 1); if (IS_ERR(*dchildp)) { @@ -1659,6 +1660,20 @@ int mds_get_parent_child_locked(struct obd_device *obd, struct mds_obd *mds, if ((child_mode & (LCK_CR|LCK_PR|LCK_CW)) && INODE_CTIME_OLD(inode)) child_policy.l_inodebits.bits |= MDS_INODELOCK_UPDATE; + if (it_op == IT_OPEN && !(flags & MDS_OPEN_LOCK)) { + /* + * LU-146 + * if this is an executable, and a non-nfsd client open write or + * execute it, revoke open lock in case other client holds an + * open lock which denies write/execute in mds_finish_open(). + */ + LASSERT(child_lockh != NULL); + if (!(S_ISREG(inode->i_mode) && + (inode->i_mode & S_IXUGO) && + (flags & (FMODE_WRITE | MDS_FMODE_EXEC)))) + child_lockh = NULL; + } + iput(inode); retry_locks: @@ -1688,6 +1703,17 @@ retry_locks: GOTO(cleanup, rc); } + if (it_op == IT_OPEN && !(flags & MDS_OPEN_LOCK) && child_lockh && + (*dchildp)->d_inode != NULL) { + /* + * LU-146 + * See above, revoke open lock only, no need to reply child + * lock back to client. + */ + ldlm_lock_decref(child_lockh, child_mode); + memset(child_lockh, 0, sizeof(*child_lockh)); + } + cleanup: if (rc) { switch (cleanup_phase) { @@ -1926,7 +1952,8 @@ static int mds_reint_unlink(struct mds_update_record *rec, int offset, MDS_INODELOCK_UPDATE, rec->ur_name, rec->ur_namelen, &child_lockh, &dchild, LCK_EX, - MDS_INODELOCK_FULL); + MDS_INODELOCK_FULL, + IT_UNLINK, 0); if (rc) GOTO(cleanup, rc); -- 1.8.3.1