From 632ece8635e3f8dc7de374d44a420f1bea8f96c3 Mon Sep 17 00:00:00 2001 From: alex Date: Mon, 28 Jun 2004 11:59:54 +0000 Subject: [PATCH] - race fixed: if 1st client sees directory isn't large enough to be splitted then it doesn't take exclusive lock on it; in the meantime another client can enlarge directory and 1st client will run splitting w/o exclusive lock --- lustre/mds/handler.c | 12 ++++++++++-- lustre/mds/mds_internal.h | 2 +- lustre/mds/mds_lmv.c | 4 +++- lustre/mds/mds_open.c | 3 ++- lustre/mds/mds_reint.c | 12 +++++++++--- 5 files changed, 25 insertions(+), 8 deletions(-) diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index 9479266..9a04a47 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -189,7 +189,12 @@ int mds_lock_mode_for_dir(struct obd_device *obd, write lock here to retract client's cache for readdir */ ret_mode = LCK_CW; if (split == MDS_EXPECT_SPLIT) { - /* splitting possible. serialize any access */ + /* splitting possible. serialize any access + * the idea is that first one seen dir is + * splittable is given exclusive lock and + * split directory. caller passes lock mode + * to mds_try_to_split_dir() and splitting + * would be done with exclusive lock only -bzzz */ CDEBUG(D_OTHER, "%s: gonna split %u/%u\n", obd->obd_name, (unsigned) dentry->d_inode->i_ino, @@ -1457,7 +1462,10 @@ repeat: obd->obd_name, new->d_inode->i_ino, new->d_inode->i_generation, flags); } else if (body->oa.o_easize) { - mds_try_to_split_dir(obd, new, NULL, body->oa.o_easize); + /* we pass LCK_EX to split routine to signal that we have + * exclusive access to the directory. simple because nobody + * knows it already exists -bzzz */ + mds_try_to_split_dir(obd, new, NULL, body->oa.o_easize, LCK_EX); } cleanup: diff --git a/lustre/mds/mds_internal.h b/lustre/mds/mds_internal.h index ce3d1b8..385f471 100644 --- a/lustre/mds/mds_internal.h +++ b/lustre/mds/mds_internal.h @@ -132,7 +132,7 @@ int mds_lmv_postsetup(struct obd_device *obd); int mds_lmv_connect(struct obd_device *obd, char * lov_name); int mds_lmv_disconnect(struct obd_device *obd, int flags); int mds_try_to_split_dir(struct obd_device *, struct dentry *, struct mea **, - int); + int, int); int mds_get_lmv_attr(struct obd_device *, struct inode *, struct mea **, int *); int mds_choose_mdsnum(struct obd_device *, const char *, int, int); int mds_lmv_postsetup(struct obd_device *); diff --git a/lustre/mds/mds_lmv.c b/lustre/mds/mds_lmv.c index 0b7eeac..7496916 100644 --- a/lustre/mds/mds_lmv.c +++ b/lustre/mds/mds_lmv.c @@ -521,7 +521,7 @@ int mds_splitting_expected(struct obd_device *obd, struct dentry *dentry) * must not be called on already splitted directories. */ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry, - struct mea **mea, int nstripes) + struct mea **mea, int nstripes, int update_mode) { struct inode *dir = dentry->d_inode; struct mds_obd *mds = &obd->u.mds; @@ -531,6 +531,8 @@ int mds_try_to_split_dir(struct obd_device *obd, struct dentry *dentry, void *handle; ENTRY; + if (update_mode != LCK_EX) + return 0; /* TODO: optimization possible - we already may have mea here */ rc = mds_splitting_expected(obd, dentry); if (rc == MDS_NO_SPLITTABLE) diff --git a/lustre/mds/mds_open.c b/lustre/mds/mds_open.c index 32af2fb..7fc99a7 100644 --- a/lustre/mds/mds_open.c +++ b/lustre/mds/mds_open.c @@ -977,7 +977,8 @@ got_child: struct iattr iattr; struct inode *inode; - if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) { + if ((rc = mds_try_to_split_dir(obd, dparent, &mea, + 0, update_mode))) { if (rc > 0) { /* dir got splitted */ CERROR("%s: splitted %lu/%u - %d\n", obd->obd_name, diff --git a/lustre/mds/mds_reint.c b/lustre/mds/mds_reint.c index 20486d6..d10fc12 100644 --- a/lustre/mds/mds_reint.c +++ b/lustre/mds/mds_reint.c @@ -624,7 +624,8 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, OBD_FAIL_WRITE(OBD_FAIL_MDS_REINT_CREATE_WRITE, dir->i_sb); if (type == S_IFREG || type == S_IFDIR) { - if ((rc = mds_try_to_split_dir(obd, dparent, &mea, 0))) { + if ((rc = mds_try_to_split_dir(obd, dparent, &mea, + 0, parent_mode))) { if (rc > 0) { /* dir got splitted */ CERROR("%s: splitted %lu/%u - %d/%d\n", @@ -681,8 +682,13 @@ static int mds_reint_create(struct mds_update_record *rec, int offset, nstripes = *(u16 *)rec->ur_eadata; if (rc == 0 && nstripes) { + /* we pass LCK_EX to split routine to + * signalthat we have exclusive access + * to the directory. simple because + * nobody knows it already exists -bzzz */ if ((rc = mds_try_to_split_dir(obd, dchild, - NULL, nstripes))) { + NULL, nstripes, + LCK_EX))) { if (rc > 0) { /* dir got splitted */ CERROR("%s: splitted %lu/%u - %d\n", @@ -1407,7 +1413,7 @@ int mds_create_local_dentry(struct mds_update_record *rec, } if (IS_ERR(child)) { - CERROR("can't get victim\n"); + CERROR("can't get victim: %d\n", (int) PTR_ERR(child)); GOTO(cleanup, rc = PTR_ERR(child)); } cleanup_phase = 2; -- 1.8.3.1