From: shadow Date: Thu, 4 Oct 2007 12:59:44 +0000 (+0000) Subject: refine locking for avoid write wrong info into lov_objid file and some races. X-Git-Tag: v1_7_0_51~662 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=54ce66650db543b32d05cd597ff06ae6e4af2171 refine locking for avoid write wrong info into lov_objid file and some races. b=12702 i=nathan i=tappro --- diff --git a/lustre/ChangeLog b/lustre/ChangeLog index 2bce9ed..7faba05 100644 --- a/lustre/ChangeLog +++ b/lustre/ChangeLog @@ -703,6 +703,19 @@ Details : If a OST has no remain object, system will block on the creating empty osc while others are not empty. If we must block, we block for the shortest possible period of time. +Severity : major +Bugzilla : 11710 +Description: improve handling recoverable errors +Details : if request processig with error which can be recoverable on server + request should be resend, otherwise page released from cache and + marked as error. + +Severity : enhancement +Bugzilla : 12702 +Description: refine locking for avoid write wrong info into lov_objid file +Details : fix possible races with add new target and write/update data in + lov_objid file. + -------------------------------------------------------------------------------- 2007-05-03 Cluster File Systems, Inc. diff --git a/lustre/include/obd.h b/lustre/include/obd.h index e86e153..5d1fe3b 100644 --- a/lustre/include/obd.h +++ b/lustre/include/obd.h @@ -523,12 +523,19 @@ struct mds_obd { struct obd_export *mds_osc_exp; /* XXX lov_exp */ struct lov_desc mds_lov_desc; __u32 mds_id; - obd_id *mds_lov_objids; - int mds_lov_objids_size; - __u32 mds_lov_objids_in_file; + unsigned int mds_lov_objids_dirty:1; - int mds_lov_nextid_set; struct file *mds_lov_objid_filp; + /* protect update vs free in lov_add_target */ + struct rw_semaphore mds_lov_objids_sem; + /* protect update vs update or memmove vs update */ + spinlock_t mds_lov_objids_lock; + /* wait for safe free */ + cfs_waitq_t mds_lov_objids_wait; + obd_id *mds_lov_objids; + __u32 mds_lov_objids_count; + int mds_lov_nextid_set; + struct file *mds_health_check_filp; unsigned long *mds_client_bitmap; // struct upcall_cache *mds_group_hash; @@ -552,6 +559,8 @@ struct mds_obd { struct lustre_capa_key *mds_capa_keys; }; +#define mds_lov_objids_size(mds) ((mds)->mds_lov_objids_count*sizeof(obd_id)) + struct echo_obd { struct obdo eo_oa; spinlock_t eo_lock; diff --git a/lustre/mds/handler.c b/lustre/mds/handler.c index a15c52b..8cbed31 100644 --- a/lustre/mds/handler.c +++ b/lustre/mds/handler.c @@ -2247,7 +2247,7 @@ static int mds_cleanup(struct obd_device *obd) mds_update_server_data(obd, 1); if (mds->mds_lov_objids != NULL) - OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size); + OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds)); mds_fs_cleanup(obd); #if 0 @@ -2846,12 +2846,15 @@ static int mds_cmd_setup(struct obd_device *obd, struct lustre_cfg *lcfg) CERROR("cannot open/create %s file: rc = %d\n", LOV_OBJID, rc); GOTO(err_fid, rc = PTR_ERR(file)); } - mds->mds_lov_objid_filp = file; if (!S_ISREG(file->f_dentry->d_inode->i_mode)) { CERROR("%s is not a regular file!: mode = %o\n", LOV_OBJID, file->f_dentry->d_inode->i_mode); GOTO(err_lov_objid, rc = -ENOENT); } + init_rwsem(&mds->mds_lov_objids_sem); + spin_lock_init(&mds->mds_lov_objids_lock); + cfs_waitq_init(&mds->mds_lov_objids_wait); + mds->mds_lov_objid_filp = file; rc = mds_lov_presetup(mds, lcfg); if (rc < 0) @@ -2911,7 +2914,7 @@ static int mds_cmd_cleanup(struct obd_device *obd) } if (mds->mds_lov_objids != NULL) - OBD_FREE(mds->mds_lov_objids, mds->mds_lov_objids_size); + OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds)); shrink_dcache_parent(mds->mds_fid_de); dput(mds->mds_fid_de); diff --git a/lustre/mds/mds_lov.c b/lustre/mds/mds_lov.c index a0ef3ce..7688a2d 100644 --- a/lustre/mds/mds_lov.c +++ b/lustre/mds/mds_lov.c @@ -45,15 +45,24 @@ void mds_lov_update_objids(struct obd_device *obd, obd_id *ids) { struct mds_obd *mds = &obd->u.mds; int i; + int dirty = 0; ENTRY; - lock_kernel(); - for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) + down_read(&mds->mds_lov_objids_sem); + + spin_lock(&mds->mds_lov_objids_lock); + for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) { if (ids[i] > (mds->mds_lov_objids)[i]) { (mds->mds_lov_objids)[i] = ids[i]; - mds->mds_lov_objids_dirty = 1; + dirty = 1; } - unlock_kernel(); + } + mds->mds_lov_objids_dirty = dirty; + + spin_unlock(&mds->mds_lov_objids_lock); + + up_read(&mds->mds_lov_objids_sem); + EXIT; } EXPORT_SYMBOL(mds_lov_update_objids); @@ -66,7 +75,7 @@ static int mds_lov_read_objids(struct obd_device *obd) int i, rc, size; ENTRY; - LASSERT(!mds->mds_lov_objids_size); + LASSERT(!mds->mds_lov_objids_count); LASSERT(!mds->mds_lov_objids_dirty); /* Read everything in the file, even if our current lov desc @@ -74,41 +83,46 @@ static int mds_lov_read_objids(struct obd_device *obd) during mds setup may still have valid objids. */ size = i_size_read(mds->mds_lov_objid_filp->f_dentry->d_inode); if (size == 0) - RETURN(0); + GOTO(out, rc = 0); OBD_ALLOC(ids, size); if (ids == NULL) - RETURN(-ENOMEM); - mds->mds_lov_objids = ids; - mds->mds_lov_objids_size = size; + GOTO(out, rc = -ENOMEM); rc = fsfilt_read_record(obd, mds->mds_lov_objid_filp, ids, size, &off); if (rc < 0) { + OBD_FREE(ids, size); CERROR("Error reading objids %d\n", rc); - RETURN(rc); + GOTO(out, rc); } + mds->mds_lov_objids = ids; + mds->mds_lov_objids_count = size / sizeof(*ids); - mds->mds_lov_objids_in_file = size / sizeof(*ids); - - for (i = 0; i < mds->mds_lov_objids_in_file; i++) { + for (i = 0; i < mds->mds_lov_objids_count; i++) { CDEBUG(D_INFO, "read last object "LPU64" for idx %d\n", mds->mds_lov_objids[i], i); } - RETURN(0); -} +out: + RETURN(rc); +} +/* must held mds_lov_objids_sem */ int mds_lov_write_objids(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; loff_t off = 0; int i, rc, tgts; ENTRY; - - if (!mds->mds_lov_objids_dirty) + + spin_lock(&mds->mds_lov_objids_lock); + if (!mds->mds_lov_objids_dirty) { + spin_unlock(&mds->mds_lov_objids_lock); RETURN(0); + } + mds->mds_lov_objids_dirty = 0; + spin_unlock(&mds->mds_lov_objids_lock); - tgts = max(mds->mds_lov_desc.ld_tgt_count, mds->mds_lov_objids_in_file); - + tgts = mds->mds_lov_objids_count; if (!tgts) RETURN(0); @@ -120,8 +134,10 @@ int mds_lov_write_objids(struct obd_device *obd) mds->mds_lov_objids, tgts * sizeof(obd_id), &off, 0); if (rc >= 0) { - mds->mds_lov_objids_dirty = 0; rc = 0; + cfs_waitq_signal(&mds->mds_lov_objids_wait); + } else { + mds->mds_lov_objids_dirty = 1; } RETURN(rc); @@ -162,10 +178,15 @@ int mds_lov_set_nextid(struct obd_device *obd) ENTRY; LASSERT(!obd->obd_recovering); + if (mds->mds_lov_objids_count == 0) + RETURN(0); LASSERT(mds->mds_lov_objids != NULL); - rc = obd_set_info_async(mds->mds_osc_exp, strlen(KEY_NEXT_ID), + /* mds->mds_lov_objids_sem must be held so mds_lov_objids doesn't change + * we need use mds->mds_lov_desc.ld_tgt_count because in recovery not all + * target can be connected at start time */ + rc = obd_set_info_async(mds->mds_osc_exp, sizeof(KEY_NEXT_ID), KEY_NEXT_ID, mds->mds_lov_desc.ld_tgt_count * sizeof(*mds->mds_lov_objids), @@ -191,36 +212,39 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov) if (!ld) RETURN(-ENOMEM); - rc = obd_get_info(lov, strlen(KEY_LOVDESC) + 1, KEY_LOVDESC, + rc = obd_get_info(lov, sizeof(KEY_LOVDESC), KEY_LOVDESC, &valsize, ld); if (rc) GOTO(out, rc); /* The size of the LOV target table may have increased. */ - size = ld->ld_tgt_count * sizeof(obd_id); - if ((mds->mds_lov_objids_size == 0) || - (size > mds->mds_lov_objids_size)) { + if (ld->ld_tgt_count > mds->mds_lov_objids_count) { obd_id *ids; - /* add room by powers of 2 */ - size = 1; - while (size < ld->ld_tgt_count) - size = size << 1; - size = size * sizeof(obd_id); + size = ld->ld_tgt_count * sizeof(obd_id); OBD_ALLOC(ids, size); if (ids == NULL) GOTO(out, rc = -ENOMEM); memset(ids, 0, size); - if (mds->mds_lov_objids_size) { + + /* write lock is enough for protect from access to + * old pointer in mds_lov_update_objids and dirty == 0 + * is enough for protect from access in write_objids */ + if (mds->mds_lov_objids) { + struct l_wait_info lwi = { 0 }; obd_id *old_ids = mds->mds_lov_objids; + memcpy(ids, mds->mds_lov_objids, - mds->mds_lov_objids_size); - mds->mds_lov_objids = ids; - OBD_FREE(old_ids, mds->mds_lov_objids_size); + mds_lov_objids_size(mds)); + + l_wait_event(mds->mds_lov_objids_wait, + mds->mds_lov_objids_dirty == 0, &lwi); + + OBD_FREE(old_ids, mds_lov_objids_size(mds)); } mds->mds_lov_objids = ids; - mds->mds_lov_objids_size = size; + mds->mds_lov_objids_count = ld->ld_tgt_count; } /* Don't change the mds_lov_desc until the objids size matches the @@ -231,7 +255,7 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov) stripes = min_t(__u32, LOV_MAX_STRIPE_COUNT, max(mds->mds_lov_desc.ld_tgt_count, - mds->mds_lov_objids_in_file)); + mds->mds_lov_objids_count)); mds->mds_max_mdsize = lov_mds_md_size(stripes); mds->mds_max_cookiesize = stripes * sizeof(struct llog_cookie); CDEBUG(D_CONFIG, "updated max_mdsize/max_cookiesize for %d stripes: " @@ -242,7 +266,7 @@ static int mds_lov_update_desc(struct obd_device *obd, struct obd_export *lov) /* We only _need_ to do this at first add (idx), or the first time after recovery. However, it should now be safe to call anytime. */ mutex_down(&obd->obd_dev_sem); - llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL); + rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL); mutex_up(&obd->obd_dev_sem); /*XXX this notifies the MDD until lov handling use old mds code */ @@ -267,57 +291,103 @@ static int mds_lov_update_mds(struct obd_device *obd, struct mds_obd *mds = &obd->u.mds; int old_count; int rc = 0; + obd_id lastid; + __u32 size = sizeof(lastid); + ENTRY; old_count = mds->mds_lov_desc.ld_tgt_count; + down_write(&mds->mds_lov_objids_sem); rc = mds_lov_update_desc(obd, mds->mds_osc_exp); + downgrade_write(&mds->mds_lov_objids_sem); if (rc) - RETURN(rc); + GOTO(out, rc); CDEBUG(D_CONFIG, "idx=%d, recov=%d/%d, cnt=%d/%d\n", idx, obd->obd_recovering, obd->obd_async_recov, old_count, mds->mds_lov_desc.ld_tgt_count); /* idx is set as data from lov_notify. */ - if (idx != MDSLOV_NO_INDEX && !obd->obd_recovering) { - if (idx >= mds->mds_lov_desc.ld_tgt_count) { - CERROR("index %d > count %d!\n", idx, - mds->mds_lov_desc.ld_tgt_count); - RETURN(-EINVAL); - } + if (idx == MDSLOV_NO_INDEX || obd->obd_recovering) + GOTO(out, rc); - if (idx >= mds->mds_lov_objids_in_file) { - /* We never read this lastid; ask the osc */ - obd_id lastid; - __u32 size = sizeof(lastid); - rc = obd_get_info(watched->obd_self_export, - strlen("last_id"), - "last_id", &size, &lastid); - if (rc) - RETURN(rc); - mds->mds_lov_objids[idx] = lastid; - mds->mds_lov_objids_dirty = 1; - mds_lov_write_objids(obd); - } else { - /* We have read this lastid from disk; tell the osc. - Don't call this during recovery. */ - rc = mds_lov_set_nextid(obd); - } + if (idx >= mds->mds_lov_desc.ld_tgt_count) { + CERROR("index %d > count %d!\n", idx, + mds->mds_lov_desc.ld_tgt_count); + GOTO(out, rc = -EINVAL); + } + rc = obd_get_info(watched->obd_self_export, sizeof("last_id"), + "last_id", &size, &lastid); + if (rc) + GOTO(out, rc); - CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n", - mds->mds_lov_objids[idx], idx); + spin_lock(&mds->mds_lov_objids_lock); + if (mds->mds_lov_objids[idx] == 0 || mds->mds_lov_objids[idx] > lastid) { + /* last id not init or corrupted - use data from osc */ + mds->mds_lov_objids[idx] = lastid; + mds->mds_lov_objids_dirty = 1; + spin_unlock(&mds->mds_lov_objids_lock); + /* not need write immediately, mark for write for avoid + * lock inverse */ + } else { + spin_unlock(&mds->mds_lov_objids_lock); + /* We have read this lastid from disk; tell the osc. + Don't call this during recovery. */ + rc = mds_lov_set_nextid(obd); } + CDEBUG(D_CONFIG, "last object "LPU64" from OST %d\n", + mds->mds_lov_objids[idx], idx); + +out: + up_read(&mds->mds_lov_objids_sem); + RETURN(rc); +} + +int mds_update_objids_from_lastid(struct obd_device *obd) +{ + struct mds_obd *mds = &obd->u.mds; + int size; + int rc, i; + + if (mds->mds_lov_objids_count < mds->mds_lov_desc.ld_tgt_count) { + obd_id *ids; + + size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id); + OBD_ALLOC(ids, size); + if (ids == NULL) + GOTO(out, rc = -ENOMEM); + + OBD_FREE(mds->mds_lov_objids, mds_lov_objids_size(mds)); + mds->mds_lov_objids = ids; + mds->mds_lov_objids_count = size / sizeof(obd_id); + } + + size = mds->mds_lov_desc.ld_tgt_count * sizeof(obd_id); + rc = obd_get_info(mds->mds_osc_exp, sizeof("last_id"), + "last_id", &size, mds->mds_lov_objids); + if (!rc) { + for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) + CWARN("got last object "LPU64" from OST %d\n", + mds->mds_lov_objids[i], i); + mds->mds_lov_objids_dirty = 1; + rc = mds_lov_write_objids(obd); + if (rc) + CERROR("got last objids from OSTs, but error " + "writing objids file: %d\n", rc); + } +out: RETURN(rc); } + /* update the LOV-OSC knowledge of the last used object id's */ int mds_lov_connect(struct obd_device *obd, char * lov_name) { struct mds_obd *mds = &obd->u.mds; struct lustre_handle conn = {0,}; struct obd_connect_data *data; - int rc, i; + int rc; ENTRY; if (IS_ERR(mds->mds_osc_obd)) @@ -364,6 +434,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) /* Deny new client connections until we are sure we have some OSTs */ obd->obd_no_conn = 1; + down_write(&mds->mds_lov_objids_sem); rc = mds_lov_read_objids(obd); if (rc) { CERROR("cannot read %s: rc = %d\n", "lov_objids", rc); @@ -374,30 +445,12 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) if (rc) GOTO(err_reg, rc); - /* tgt_count may be 0! */ - rc = llog_cat_initialize(obd, NULL, mds->mds_lov_desc.ld_tgt_count, NULL); - if (rc) { - CERROR("failed to initialize catalog %d\n", rc); - GOTO(err_reg, rc); - } - /* If we're mounting this code for the first time on an existing FS, * we need to populate the objids array from the real OST values */ - if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_in_file) { - int size = sizeof(obd_id) * mds->mds_lov_desc.ld_tgt_count; - rc = obd_get_info(mds->mds_osc_exp, strlen("last_id"), - "last_id", &size, mds->mds_lov_objids); - if (!rc) { - for (i = 0; i < mds->mds_lov_desc.ld_tgt_count; i++) - CWARN("got last object "LPU64" from OST %d\n", - mds->mds_lov_objids[i], i); - mds->mds_lov_objids_dirty = 1; - rc = mds_lov_write_objids(obd); - if (rc) - CERROR("got last objids from OSTs, but error " - "writing objids file: %d\n", rc); - } + if (mds->mds_lov_desc.ld_tgt_count > mds->mds_lov_objids_count) { + mds_update_objids_from_lastid(obd); } + up_write(&mds->mds_lov_objids_sem); /* I want to see a callback happen when the OBD moves to a * "For General Use" state, and that's when we'll call @@ -411,6 +464,7 @@ int mds_lov_connect(struct obd_device *obd, char * lov_name) RETURN(rc); err_reg: + up_write(&mds->mds_lov_objids_sem); obd_register_observer(mds->mds_osc_obd, NULL); err_discon: obd_disconnect(mds->mds_osc_exp); @@ -822,6 +876,7 @@ int mds_lov_start_synchronize(struct obd_device *obd, int mds_notify(struct obd_device *obd, struct obd_device *watched, enum obd_notify_event ev, void *data) { + struct mds_obd *mds = &obd->u.mds; int rc = 0; ENTRY; @@ -852,7 +907,9 @@ int mds_notify(struct obd_device *obd, struct obd_device *watched, /* We still have to fix the lov descriptor for ost's added after the mdt in the config log. They didn't make it into mds_lov_connect. */ - rc = mds_lov_update_desc(obd, obd->u.mds.mds_osc_exp); + down_write(&mds->mds_lov_objids_sem); + rc = mds_lov_update_desc(obd, mds->mds_osc_exp); + up_write(&mds->mds_lov_objids_sem); if (rc) RETURN(rc); /* We should update init llog here too for replay unlink and