From 7c6d08994b23cc3ef112e3626f9402dbccf0bc2c Mon Sep 17 00:00:00 2001 From: Sergey Cheremencev Date: Fri, 25 Aug 2023 10:22:26 +0400 Subject: [PATCH] LU-17034 quota: lqeg_arr memory corruption Fix memory corruption caused by accessing memory outside of the lqeg_arr array. It could happen when at least one OST has an index larger than the total number of OSTs, for example, if the system has 4 OSTs with indexes 0001, 0002, 00c9, 00ca. Most often this issue corrupted the bucket_table in obd_uuid_hash or obd_nid_hash, causing the rhashtable code to crash. However, it could also be the reason for other panics, depending on the type of the corrupted neighbouring memory region. This patch adds an lge_idx field to each lqe global entry to store the index of the OST. It is needed to map the OST index to the array index to avoid out-of-bounds array access. This patch also adds locking to protect lqe_glbl_data in qmt_set_revoke and qmt_clear_lgeg_arr_nu. This was forgotten in 50ff4d1da6. This patch begins to store all connected MDTs in the quota global pool. Thus, beginning with this patch, MDTs are handled the same way as OSTs stored in the global pool. It is the 1st step to introduce MDT pools. Add conf-sanity_33c, which reproduces the mentioned memory corruption without the fix. 
Lustre-change: https://review.whamcloud.com/52094 Lustre-commit: 67f90e42889ff22d574e82cc647f6076e48c65a5 Fixes: 50ff4d1da6 ("LU-16772 quota: protect lqe_glbl_data in qmt_site_recalc_cb") Signed-off-by: Sergey Cheremencev Change-Id: Id6e4bcde09d9f32726d69f711eedb82729a2266e Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/53810 Reviewed-by: Andreas Dilger Tested-by: jenkins Tested-by: Maloo --- lustre/include/obd_target.h | 7 +- lustre/quota/lquota_internal.h | 4 +- lustre/quota/qmt_entry.c | 96 ++++++++------ lustre/quota/qmt_internal.h | 17 ++- lustre/quota/qmt_lock.c | 109 +++++++++++----- lustre/quota/qmt_pool.c | 283 +++++++++++++++++++++++++---------------- lustre/target/tgt_pool.c | 11 +- lustre/tests/conf-sanity.sh | 9 +- 8 files changed, 339 insertions(+), 197 deletions(-) diff --git a/lustre/include/obd_target.h b/lustre/include/obd_target.h index 406f408..b2a6fae6 100644 --- a/lustre/include/obd_target.h +++ b/lustre/include/obd_target.h @@ -81,7 +81,12 @@ struct lu_tgt_pool { }; int tgt_pool_init(struct lu_tgt_pool *op, unsigned int count); -int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count); +#define tgt_pool_add(op, idx, min_count) \ + tgt_pool_add_lock(op, idx, min_count, true) +#define tgt_pool_add_locked(op, idx, min_count) \ + tgt_pool_add_lock(op, idx, min_count, false) +int tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx, + unsigned int min_count, bool locked); int tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx); int tgt_pool_free(struct lu_tgt_pool *op); int tgt_check_index(int idx, struct lu_tgt_pool *osts); diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h index 93e687e..0d55ff7 100644 --- a/lustre/quota/lquota_internal.h +++ b/lustre/quota/lquota_internal.h @@ -201,7 +201,9 @@ struct lquota_entry { struct lqe_glbl_entry { __u64 lge_qunit; - unsigned long lge_edquot:1, + unsigned long lge_idx:16, + /* index of target */ + lge_edquot:1, /* true when minimum qunit is set 
*/ lge_qunit_set:1, /* qunit or edquot is changed - need diff --git a/lustre/quota/qmt_entry.c b/lustre/quota/qmt_entry.c index 2ebf260..54586f9 100644 --- a/lustre/quota/qmt_entry.c +++ b/lustre/quota/qmt_entry.c @@ -831,8 +831,8 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, int idx) { struct lquota_entry *lqe_gl, *lqe; - bool need_reseed = false; - bool need_notify = false; + bool reseed = false; + bool notify = false; int i; lqe_gl = qti_lqes_glbl(env); @@ -840,30 +840,31 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, for (i = 0; i < qti_lqes_cnt(env); i++) { lqe = qti_lqes(env)[i]; if (qunit) - need_reseed |= qmt_adjust_qunit(env, lqe); + reseed |= qmt_adjust_qunit(env, lqe); if (edquot) - need_reseed |= qmt_adjust_edquot(lqe, now); + reseed |= qmt_adjust_edquot(lqe, now); } LASSERT(lqe_gl); if (!lqe_gl->lqe_glbl_data && (req_has_rep(qb_flags) || req_is_rel(qb_flags))) { - if (need_reseed) + if (reseed) CDEBUG(D_QUOTA, "%s: can not notify - lge_glbl_data is not set\n", qmt->qmt_svname); - return need_reseed; + return reseed; } - if (need_reseed || idx >= 0) { + if (reseed || idx >= 0) { mutex_lock(&lqe_gl->lqe_glbl_data_lock); if (lqe_gl->lqe_glbl_data) { struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; - if (need_reseed) { + if (reseed) { qmt_seed_glbe_all(env, lgd, qunit, edquot, false); } else if (idx >= 0) { - LASSERT(idx <= lgd->lqeg_num_alloc); + int lge_idx = qmt_map_lge_idx(lgd, idx); + /* If there are no locks yet when * lge_qunit/edquot_nu is set, slaves * are still not notified with new @@ -873,17 +874,17 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, * to the least qunit, but lqe_revoke_time is * still not set. 
*/ - need_notify = lgd->lqeg_arr[idx].lge_qunit_nu || - lgd->lqeg_arr[idx].lge_edquot_nu; + notify = lgd->lqeg_arr[lge_idx].lge_qunit_nu || + lgd->lqeg_arr[lge_idx].lge_edquot_nu; } } mutex_unlock(&lqe_gl->lqe_glbl_data_lock); } - if (need_reseed || need_notify) + if (reseed || notify) qmt_id_lock_notify(qmt, lqe_gl); - return need_reseed; + return reseed; } @@ -1088,7 +1089,7 @@ void qti_lqes_write_unlock(const struct lu_env *env) lqe_write_unlock(qti_lqes(env)[i]); } -#define QMT_INIT_SLV_CNT 64 +#define QMT_INIT_SLV_CNT 64 struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype) { struct lqe_glbl_data *lgd; @@ -1127,12 +1128,29 @@ void qmt_free_lqe_gd(struct lqe_glbl_data *lgd) OBD_FREE(lgd, sizeof(struct lqe_glbl_data)); } +int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx) +{ + int k; + + /* check common case of sequential OST numbers first */ + if (ostidx < lgd->lqeg_num_used && + lgd->lqeg_arr[ostidx].lge_idx == ostidx) + return ostidx; + + for (k = 0; k < lgd->lqeg_num_used; k++) + if (lgd->lqeg_arr[k].lge_idx == ostidx) + break; + + LASSERTF(k < lgd->lqeg_num_used, "Cannot map ostidx %d for %p\n", + ostidx, lgd); + return k; +} + void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd, bool qunit, bool edquot, bool pool_locked) { - struct rw_semaphore *sem = NULL; - struct qmt_pool_info *qpi; - int i, j, idx; + struct qmt_pool_info *qpi; + int i, j; ENTRY; if (!qti_lqes_cnt(env)) @@ -1165,21 +1183,16 @@ void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd, CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i); qpi = lqe2qpi(lqe); - if (qmt_pool_global(qpi)) { - slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe), - lqe_qtype(lqe)); - } else { - if (!pool_locked) { - sem = qmt_sarr_rwsem(qpi); - down_read(sem); - } - - slaves_cnt = qmt_sarr_count(qpi); - } + if (!pool_locked) + qmt_sarr_read_down(qpi); + slaves_cnt = qmt_sarr_count(qpi); for (j = 0; j < slaves_cnt; j++) { - idx = 
qmt_sarr_get_idx(qpi, j); - LASSERT(idx >= 0); + int idx, tgt_idx; + + tgt_idx = qmt_sarr_get_idx(qpi, j); + LASSERT(tgt_idx >= 0); + idx = qmt_map_lge_idx(lgd, tgt_idx); if (edquot) { int lge_edquot, new_edquot, edquot_nu; @@ -1209,8 +1222,9 @@ qunit_lbl: __u64 lge_qunit, new_qunit; CDEBUG(D_QUOTA, - "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n", - idx, lgd->lqeg_arr[idx].lge_qunit_set, + "tgt_idx %d idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n", + tgt_idx, idx, + lgd->lqeg_arr[idx].lge_qunit_set, lgd->lqeg_arr[idx].lge_qunit, lqe->lqe_qunit); /* lge for this idx is already set @@ -1237,14 +1251,15 @@ qunit_lbl: } } - if (!pool_locked && !qmt_pool_global(qpi)) - up_read(sem); + if (!pool_locked) + qmt_sarr_read_up(qpi); } /* TODO: only for debug purposes - remove it later */ for (i = 0; i < lgd->lqeg_num_used; i++) CDEBUG(D_QUOTA, - "lgd ost %d, qunit %lu nu %d; edquot %d nu %d\n", - i, (long unsigned)lgd->lqeg_arr[i].lge_qunit, + "lgd i %d tgt_idx %d qunit %lu nu %d; edquot %d nu %d\n", + i, lgd->lqeg_arr[i].lge_idx, + (unsigned long)lgd->lqeg_arr[i].lge_qunit, lgd->lqeg_arr[i].lge_qunit_nu, lgd->lqeg_arr[i].lge_edquot, lgd->lqeg_arr[i].lge_edquot_nu); @@ -1256,15 +1271,16 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt, struct lquota_entry *lqe, struct lqe_glbl_data *lgd, int pool_type) { - __u64 qunit; - bool edquot; - int i; + __u64 qunit; + bool edquot; + int i; qunit = lqe->lqe_qunit; edquot = lqe->lqe_edquot; /* Firstly set all elements in array with * qunit and edquot of global pool */ + qmt_sarr_read_down(lqe2qpi(lqe)); for (i = 0; i < lgd->lqeg_num_used; i++) { lgd->lqeg_arr[i].lge_qunit = qunit; lgd->lqeg_arr[i].lge_edquot = edquot; @@ -1272,7 +1288,9 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt, * will be sent to slaves during qmt_lvbo_fill. 
*/ lgd->lqeg_arr[i].lge_qunit_nu = 0; lgd->lqeg_arr[i].lge_edquot_nu = 0; + lgd->lqeg_arr[i].lge_idx = qmt_sarr_get_idx(lqe2qpi(lqe), i); } + qmt_sarr_read_up(lqe2qpi(lqe)); qmt_pool_lqes_lookup_spec(env, qmt, pool_type, lqe_qtype(lqe), &lqe->lqe_id); diff --git a/lustre/quota/qmt_internal.h b/lustre/quota/qmt_internal.h index d80109a..f99975b 100644 --- a/lustre/quota/qmt_internal.h +++ b/lustre/quota/qmt_internal.h @@ -431,8 +431,15 @@ int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *, qmt_pool_lookup(env, qmt, type, NULL, -1, false) #define qmt_pool_lookup_name(env, qmt, type, name) \ qmt_pool_lookup(env, qmt, type, name, -1, false) -#define qmt_pool_lookup_arr(env, qmt, type, idx) \ - qmt_pool_lookup(env, qmt, type, NULL, idx, true) + +/* + * Until MDT pools are not emplemented, all MDTs belong to + * global pool, thus lookup lqes only in global pool for the + * DOM case. + */ +#define qmt_pool_lookup_arr(env, qmt, type, idx, stype) \ + qmt_pool_lookup(env, qmt, type, NULL, \ + qmt_dom(type, stype) ? 
-1 : idx, true) struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env, struct qmt_device *qmt, int rtype, @@ -452,7 +459,10 @@ int qmt_pool_add(struct obd_device *obd, char *poolname, char *ostname); int qmt_pool_rem(struct obd_device *obd, char *poolname, char *ostname); int qmt_pool_del(struct obd_device *obd, char *poolname); -struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi); +#define qmt_sarr_read_down(qpi) down_read(&qpi->qpi_sarr.osts.op_rw_sem) +#define qmt_sarr_read_up(qpi) up_read(&qpi->qpi_sarr.osts.op_rw_sem) +#define qmt_sarr_write_down(qpi) down_write(&qpi->qpi_sarr.osts.op_rw_sem) +#define qmt_sarr_write_up(qpi) up_write(&qpi->qpi_sarr.osts.op_rw_sem) int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx); unsigned int qmt_sarr_count(struct qmt_pool_info *qpi); @@ -500,6 +510,7 @@ void qti_lqes_restore_fini(const struct lu_env *env); void qti_lqes_write_lock(const struct lu_env *env); void qti_lqes_write_unlock(const struct lu_env *env); +int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx); struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *, int); void qmt_free_lqe_gd(struct lqe_glbl_data *); void qmt_setup_lqe_gd(const struct lu_env *, struct qmt_device *, diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index 976bdfc..b690b13 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -271,48 +271,71 @@ out: */ static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx) { - unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; + unsigned long least = lqe2qpi(lqe)->qpi_least_qunit; + bool revoke = false; /* There is no array to store lge for the case of DOM. - * Ignore it until MDT pools will be ready. */ + * Ignore it until MDT pools will be ready. 
+ */ if (!qmt_dom(lqe_rtype(lqe), stype)) { - lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0; - lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0; + struct lqe_glbl_data *lgd; + + mutex_lock(&lqe->lqe_glbl_data_lock); + lgd = lqe->lqe_glbl_data; + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); - /* We shouldn't call revoke for DOM case, it will be updated - * at qmt_id_lock_glimpse. */ - return (lgd->lqeg_arr[idx].lge_qunit == least_qunit); + lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0; + lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0; + /* We shouldn't call revoke for DOM case, it will be + * updated at qmt_id_lock_glimpse. + */ + revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least; + } + mutex_unlock(&lqe->lqe_glbl_data_lock); } - return false; + return revoke; } -static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe, +static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl, int stype, int idx) { - unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; - struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; + unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit; bool notify = false; - if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) { - int i; - - qti_lqes_write_lock(env); - for (i = 0; i < qti_lqes_cnt(env); i++) { - LQUOTA_DEBUG(qti_lqes(env)[i], - "idx %d lge_qunit %llu least_qunit %lu\n", - idx, lgd->lqeg_arr[idx].lge_qunit, - least_qunit); - if (qti_lqes(env)[i]->lqe_qunit == least_qunit) { - qti_lqes(env)[i]->lqe_revoke_time = + if (qmt_dom(lqe_rtype(lqe_gl), stype)) + return false; + + qti_lqes_write_lock(env); + mutex_lock(&lqe_gl->lqe_glbl_data_lock); + if (lqe_gl->lqe_glbl_data) { + struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; + int lge_idx; + + lge_idx = qmt_map_lge_idx(lgd, idx); + if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) { + struct lquota_entry *lqe; + int i; + + for (i = 0; i < qti_lqes_cnt(env); i++) { + lqe = qti_lqes(env)[i]; + LQUOTA_DEBUG(lqe, + "lge_qunit %llu least_qunit %lu idx 
%d\n", + lgd->lqeg_arr[lge_idx].lge_qunit, + least_qunit, idx); + if (lqe->lqe_qunit == least_qunit) { + lqe->lqe_revoke_time = ktime_get_seconds(); - notify |= qmt_adjust_edquot(qti_lqes(env)[i], + notify |= qmt_adjust_edquot(lqe, ktime_get_real_seconds()); + } } } - qti_lqes_write_unlock(env); } + mutex_unlock(&lqe_gl->lqe_glbl_data_lock); + qti_lqes_write_unlock(env); + return notify; } @@ -403,10 +426,16 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, GOTO(out_exp, rc); if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) { + int notify = false; + mutex_lock(&lqe->lqe_glbl_data_lock); - qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); - qmt_id_lock_notify(qmt, lqe); + if (lqe->lqe_glbl_data) { + qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); + notify = true; + } mutex_unlock(&lqe->lqe_glbl_data_lock); + if (notify) + qmt_id_lock_notify(qmt, lqe); } if (lvb->lvb_id_rel) { @@ -659,12 +688,14 @@ void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, qunit = lqe->lqe_qunit; } else { struct lqe_glbl_data *lgd; + int lge_idx; mutex_lock(&lqe->lqe_glbl_data_lock); lgd = lqe->lqe_glbl_data; if (lgd) { - edquot = lgd->lqeg_arr[idx].lge_edquot; - qunit = lgd->lqeg_arr[idx].lge_qunit; + lge_idx = qmt_map_lge_idx(lgd, idx); + edquot = lgd->lqeg_arr[lge_idx].lge_edquot; + qunit = lgd->lqeg_arr[lge_idx].lge_qunit; } else { edquot = lqe->lqe_edquot; qunit = lqe->lqe_qunit; @@ -865,20 +896,32 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, * broadcasting the new qunit value */ static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe) { - struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; + struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; int idx; int stype = qmt_uuid2idx(uuid, &idx); LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT); + CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n", + stype, 
lqe_rtype(lqe), idx, uuid->uuid); /* Quota pools support only OSTs, despite MDTs also could be registered * as LQUOTA_RES_DT devices(DOM). */ if (qmt_dom(lqe_rtype(lqe), stype)) return 1; - else - return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu || - lgd->lqeg_arr[idx].lge_qunit_nu : 0; + + if (lgd) { + int lge_idx = qmt_map_lge_idx(lgd, idx); + + CDEBUG(D_QUOTA, + "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n", + idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu, + lgd->lqeg_arr[lge_idx].lge_qunit_nu); + return lgd->lqeg_arr[lge_idx].lge_edquot_nu || + lgd->lqeg_arr[lge_idx].lge_qunit_nu; + } + + return 0; } diff --git a/lustre/quota/qmt_pool.c b/lustre/quota/qmt_pool.c index ba50583..07e3c9d 100644 --- a/lustre/quota/qmt_pool.c +++ b/lustre/quota/qmt_pool.c @@ -55,10 +55,14 @@ #include "qmt_internal.h" static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi); -static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, - int idx, int min); +#define qmt_sarr_pool_add(qpi, idx, stype) \ + _qmt_sarr_pool_add(qpi, idx, stype, false) +#define qmt_sarr_pool_add_locked(qpi, idx, stype) \ + _qmt_sarr_pool_add(qpi, idx, stype, true) +static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi, + int idx, int min, bool locked); static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx); -static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi); +static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi); static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx); static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi); @@ -510,25 +514,34 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt) RETURN(rc); } -static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid, +static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid, char *slv_name, struct lu_fid *slv_fid, void *arg) { struct obd_uuid uuid; - int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg; - int stype, qtype; + 
struct qmt_pool_info *qpi = arg; + int stype, qtype, idx; int rc; rc = lquota_extract_fid(glb_fid, NULL, &qtype); LASSERT(!rc); obd_str2uuid(&uuid, slv_name); - stype = qmt_uuid2idx(&uuid, NULL); + stype = qmt_uuid2idx(&uuid, &idx); if (stype < 0) return stype; + + CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name); + rc = qmt_sarr_pool_add(qpi, idx, stype); + if (rc && rc != -EEXIST) { + CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n", + qpi->qpi_qmt->qmt_svname, idx, rc); + return rc; + } + /* one more slave */ - (*nr)[stype][qtype]++; + qpi->qpi_slv_nr[stype][qtype]++; CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n", - slv_name, stype, qtype, (*nr)[stype][qtype]); + slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]); return 0; } @@ -648,8 +661,8 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt, rc = lquota_disk_for_each_slv(env, pool->qpi_root, &qti->qti_fid, - qmt_slv_cnt, - &pool->qpi_slv_nr); + qmt_slv_add, + pool); if (rc) { CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n", qmt->qmt_svname, qtype_name(qtype), rc); @@ -687,6 +700,76 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt, RETURN(0); } +static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, + struct hlist_node *hnode, void *data) +{ + struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr; + struct lquota_entry *lqe; + int old_num, rc; + + lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash); + LASSERT(atomic_read(&lqe->lqe_ref) > 0); + old_num = 0; + rc = 0; + + CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data); + if (lqe->lqe_glbl_data) { + struct lqe_glbl_data *lgd; + + old_lqeg_arr = NULL; + mutex_lock(&lqe->lqe_glbl_data_lock); + if (lqe->lqe_glbl_data) { + struct qmt_pool_info *qpi = lqe2qpi(lqe); + int sarr_cnt = qmt_sarr_count(qpi); + + lgd = lqe->lqe_glbl_data; + if (lgd->lqeg_num_alloc < sarr_cnt) { + LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt); + + OBD_ALLOC(lqeg_arr, + 
sizeof(struct lqe_glbl_entry) * + (lgd->lqeg_num_alloc + 16)); + if (lqeg_arr) { + memcpy(lqeg_arr, lgd->lqeg_arr, + sizeof(struct lqe_glbl_entry) * + (lgd->lqeg_num_alloc)); + old_lqeg_arr = lgd->lqeg_arr; + old_num = lgd->lqeg_num_alloc; + lgd->lqeg_arr = lqeg_arr; + lgd->lqeg_num_alloc += 16; + CDEBUG(D_QUOTA, + "extend lqeg_arr:%px from %d to %d\n", + lgd, old_num, + lgd->lqeg_num_alloc); + } else { + CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n", + qpi->qpi_qmt->qmt_svname, + -ENOMEM); + GOTO(out, rc = -ENOMEM); + } + } + lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx = + qmt_sarr_get_idx(qpi, sarr_cnt - 1); + lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot = + lqe->lqe_edquot; + lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit = + lqe->lqe_qunit; + lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0; + lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0; + LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n", + lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx, + lgd->lqeg_num_used, lgd->lqeg_num_alloc); + lgd->lqeg_num_used++; + } +out: + mutex_unlock(&lqe->lqe_glbl_data_lock); + if (old_lqeg_arr) + OBD_FREE(old_lqeg_arr, old_num * sizeof(struct lqe_glbl_entry)); + } + + return rc; +} + /* * Handle new slave connection. Called when a slave enqueues the global quota * lock at the beginning of the reintegration procedure. 
@@ -714,15 +797,14 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt, stype = qmt_uuid2idx(uuid, &idx); if (stype < 0) RETURN(stype); + CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid)); /* extract pool info from global index FID */ rc = lquota_extract_fid(glb_fid, &pool_type, &qtype); if (rc) RETURN(rc); - /* look-up pool in charge of this global index FID */ - qti_pools_init(env); - pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx); + pool = qmt_pool_lookup_glb(env, qmt, pool_type); if (IS_ERR(pool)) RETURN(PTR_ERR(pool)); @@ -747,11 +829,51 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt, memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid)); *slv_ver = dt_version_get(env, slv_obj); dt_object_put(env, slv_obj); - if (created) + if (created) { + struct qmt_pool_info *ptr; + + CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n", + idx, pool_type, qtype, stype); + + if (!qmt_dom(qtype, stype)) { + qmt_sarr_write_down(pool); + rc = qmt_sarr_pool_add_locked(pool, idx, stype); + if (!rc) { + for (i = 0; i < LL_MAXQUOTAS; i++) + cfs_hash_for_each(pool->qpi_site[i]-> + lqs_hash, + qmt_lgd_extend_cb, + &env); + } else if (rc == -EEXIST) { + /* This target has been already added + * by another qtype + */ + rc = 0; + } + qmt_sarr_write_up(pool); + + if (rc) { + CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n", + qmt->qmt_svname, idx, + pool->qpi_name, rc); + GOTO(out, rc); + } + } + + /* look-up pool in charge of this global index FID */ + qti_pools_init(env); + ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype); + if (IS_ERR(ptr)) + GOTO(out, rc = PTR_ERR(ptr)); + for (i = 0; i < qti_pools_cnt(env); i++) qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++; + + qti_pools_fini(env); + } + out: - qti_pools_fini(env); + qpi_putref(env, pool); RETURN(rc); } @@ -810,15 +932,10 @@ int qmt_pool_lqes_lookup(const struct lu_env *env, int rc, i; ENTRY; - /* Until MDT pools are not emplemented, all 
MDTs belong to - * global pool, thus lookup lqes only from global pool. */ - if (qmt_dom(rtype, stype)) - idx = -1; - qti_pools_init(env); rc = 0; /* look-up pool responsible for this global index FID */ - pool = qmt_pool_lookup_arr(env, qmt, rtype, idx); + pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype); if (IS_ERR(pool)) { qti_pools_fini(env); RETURN(PTR_ERR(pool)); @@ -1170,10 +1287,10 @@ static struct obd_device *qmt_get_mgc(struct qmt_device *qmt) static int qmt_pool_recalc(void *args) { struct qmt_pool_info *pool, *glbl_pool; - struct rw_semaphore *sem = NULL; struct obd_device *obd; struct lu_env env; int i, rc, qtype, slaves_cnt; + bool sem = false; ENTRY; pool = args; @@ -1188,16 +1305,16 @@ static int qmt_pool_recalc(void *args) obd = qmt_get_mgc(pool->qpi_qmt); if (IS_ERR(obd)) GOTO(out, rc = PTR_ERR(obd)); - else - /* Waiting for the end of processing mgs config. - * It is needed to be sure all pools are configured. */ - while (obd->obd_process_conf) - schedule_timeout_uninterruptible(cfs_time_seconds(1)); + + /* Waiting for the end of processing mgs config. + * It is needed to be sure all pools are configured. + */ + while (obd->obd_process_conf) + schedule_timeout_uninterruptible(cfs_time_seconds(1)); OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val); - sem = qmt_sarr_rwsem(pool); - LASSERT(sem); - down_read(sem); + qmt_sarr_read_down(pool); + sem = true; /* Hold this to be sure that OSTs from this pool * can't do acquire/release. * @@ -1277,7 +1394,7 @@ out: * Thus until up_read, no one can restart recalc thread. */ if (sem) { - up_read(sem); + qmt_sarr_read_up(pool); up_write(&pool->qpi_recalc_sem); } @@ -1407,7 +1524,7 @@ static int qmt_pool_add_rem(struct obd_device *obd, char *poolname, GOTO(out, rc = PTR_ERR(qpi)); } - rc = add ? qmt_sarr_pool_add(qpi, idx, 32) : + rc = add ? 
qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) : qmt_sarr_pool_rem(qpi, idx); if (rc) { CERROR("%s: can't %s %s pool %s: rc = %d\n", @@ -1527,57 +1644,31 @@ int qmt_pool_del(struct obd_device *obd, char *poolname) static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi) { - - /* No need to initialize sarray for global pool - * as it always includes all slaves */ - if (qmt_pool_global(qpi)) - return 0; - - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - return tgt_pool_init(&qpi->qpi_sarr.osts, 0); - case LQUOTA_RES_MD: - default: - return 0; - } + return tgt_pool_init(&qpi->qpi_sarr.osts, 0); } -static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min) +static inline int +_qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked) { - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min); - case LQUOTA_RES_MD: - default: + /* We don't have an array for DOM */ + if (qmt_dom(qpi->qpi_rtype, stype)) return 0; - } + + if (locked) + return tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32); + else + return tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32); } static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx) { - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - return tgt_pool_remove(&qpi->qpi_sarr.osts, idx); - case LQUOTA_RES_MD: - default: - return 0; - } + return tgt_pool_remove(&qpi->qpi_sarr.osts, idx); } -static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi) +static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi) { - if (qmt_pool_global(qpi)) - return 0; - - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - if (!qpi->qpi_sarr.osts.op_array) - return 0; - return tgt_pool_free(&qpi->qpi_sarr.osts); - case LQUOTA_RES_MD: - default: - return 0; - } + if (qpi->qpi_sarr.osts.op_array) + tgt_pool_free(&qpi->qpi_sarr.osts); } static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx) @@ -1585,53 +1676,19 @@ static inline int 
qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx) if (qmt_pool_global(qpi)) return 0; - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - return tgt_check_index(idx, &qpi->qpi_sarr.osts); - case LQUOTA_RES_MD: - default: - return 0; - } -} - -struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi) -{ - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - /* to protect ost_pool use */ - return &qpi->qpi_sarr.osts.op_rw_sem; - case LQUOTA_RES_MD: - default: - return NULL; - } + return tgt_check_index(idx, &qpi->qpi_sarr.osts); } int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx) { - - if (qmt_pool_global(qpi)) - return arr_idx; - - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0, - "idx invalid %d op_count %d\n", arr_idx, - qpi->qpi_sarr.osts.op_count); - return qpi->qpi_sarr.osts.op_array[arr_idx]; - case LQUOTA_RES_MD: - default: - return -EINVAL; - } + LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0, + "idx invalid %d op_count %d\n", arr_idx, + qpi->qpi_sarr.osts.op_count); + return qpi->qpi_sarr.osts.op_array[arr_idx]; } /* Number of slaves in a pool */ unsigned int qmt_sarr_count(struct qmt_pool_info *qpi) { - switch (qpi->qpi_rtype) { - case LQUOTA_RES_DT: - return qpi->qpi_sarr.osts.op_count; - case LQUOTA_RES_MD: - default: - return -EINVAL; - } + return qpi->qpi_sarr.osts.op_count; } diff --git a/lustre/target/tgt_pool.c b/lustre/target/tgt_pool.c index 4beef3a..f1f7219 100644 --- a/lustre/target/tgt_pool.c +++ b/lustre/target/tgt_pool.c @@ -132,13 +132,15 @@ EXPORT_SYMBOL(tgt_pool_extend); * \retval 0 if target could be added to the pool * \retval negative error if target \a idx was not added */ -int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count) +int tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx, + unsigned int min_count, bool lock) { unsigned int i; int rc = 0; ENTRY; - down_write(&op->op_rw_sem); + if (lock) + 
down_write(&op->op_rw_sem); rc = tgt_pool_extend(op, min_count); if (rc) @@ -154,10 +156,11 @@ int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count) op->op_count++; EXIT; out: - up_write(&op->op_rw_sem); + if (lock) + up_write(&op->op_rw_sem); return rc; } -EXPORT_SYMBOL(tgt_pool_add); +EXPORT_SYMBOL(tgt_pool_add_lock); /** * Remove an existing pool from the system. diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index 3cb295e..f7bbaf6 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -15,7 +15,7 @@ init_logging ALWAYS_EXCEPT="$CONF_SANITY_EXCEPT 32newtarball" # bug number for skipped test: LU-11915 EX-3746 -ALWAYS_EXCEPT="$ALWAYS_EXCEPT 33c 110 115 5a" +ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110 115 5a" # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! if $SHARED_KEY; then @@ -2653,6 +2653,9 @@ test_33c() { local mgs_flag="--mgs" fi + load_modules + stack_trap unload_modules_conf + add fs2mds $(mkfs_opts mds1 ${fs2mdsdev}) --fsname=${FSNAME} \ --reformat $mgs_flag $mkfsoptions $fs2mdsdev $fs2mdsvdev || exit 10 @@ -2668,8 +2671,8 @@ test_33c() { mount_client $MOUNT || error "client start failed" stack_trap "umount_client $MOUNT" - mkdir_on_mdt0 $DIR/$tdir || "cannot create $DIR/$tdir" - chmod 0777 $DIR/$tdir || "chown failed" + mkdir_on_mdt0 $DIR/$tdir || error "cannot create $DIR/$tdir" + chmod 0777 $DIR/$tdir || error "chown failed" if [[ $PERM_CMD == *"set_param -P"* ]]; then do_facet mgs $PERM_CMD \ osd-*.$FSNAME-OST*.quota_slave.enable=$QUOTA_TYPE -- 1.8.3.1