From 0fe1faf20cdf119591f151242aecd7d11e29a6ce Mon Sep 17 00:00:00 2001 From: Sergey Cheremencev Date: Mon, 8 Apr 2024 14:25:41 +0300 Subject: [PATCH] DDN-4905 revert: "quota: lqeg_arr memmory corruption" This reverts commit 7c6d08994b23cc3ef112e3626f9402dbccf0bc2c ("LU-17034 quota: lqeg_arr memmory corruption") as it causes following panic: qmt_map_lge_idx()) ASSERTION( k < lgd->lqeg_num_used ) failed: Cannot map ostidx 32 for 0000000072ee3f23 qmt_map_lge_idx()) LBUG Call Trace TBD: libcfs_call_trace+0x6f/0xa0 [libcfs] lbug_with_loc+0x3f/0x70 [libcfs] qmt_map_lge_idx+0x7f/0x90 [lquota] qmt_seed_glbe_all+0x17f/0x770 [lquota] qmt_revalidate_lqes+0x213/0x360 [lquota] qmt_dqacq0+0x7d5/0x2320 [lquota] qmt_intent_policy+0x8d2/0xf10 [lquota] mdt_intent_opc+0x9a9/0xa80 [mdt] mdt_intent_policy+0x1fd/0x390 [mdt] ldlm_lock_enqueue+0x469/0xa90 [ptlrpc] ldlm_handle_enqueue0+0x61a/0x16c0 [ptlrpc] tgt_enqueue+0xa4/0x200 [ptlrpc] tgt_request_handle+0xc9c/0x1950 [ptlrpc] ptlrpc_server_handle_request+0x323/0xbd0 [ptlrpc] ptlrpc_main+0xbf1/0x1510 [ptlrpc] kthread+0x134/0x150 ret_from_fork+0x1f/0x40 Kernel panic - not syncing: LBUG Fixes: 7c6d08994b ("LU-17034 quota: lqeg_arr memmory corruption") Signed-off-by: Sergey Cheremencev Change-Id: Iff377529d2862c869b751b4c942b476262951570 Reviewed-on: https://review.whamcloud.com/c/ex/lustre-release/+/54695 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Andreas Dilger --- lustre/include/obd_target.h | 7 +- lustre/quota/lquota_internal.h | 4 +- lustre/quota/qmt_entry.c | 96 ++++++-------- lustre/quota/qmt_internal.h | 17 +-- lustre/quota/qmt_lock.c | 109 +++++----------- lustre/quota/qmt_pool.c | 283 ++++++++++++++++------------------------- lustre/target/tgt_pool.c | 11 +- lustre/tests/conf-sanity.sh | 9 +- 8 files changed, 197 insertions(+), 339 deletions(-) diff --git a/lustre/include/obd_target.h b/lustre/include/obd_target.h index b2a6fae6..406f408 100644 --- a/lustre/include/obd_target.h +++ b/lustre/include/obd_target.h @@ -81,12 +81,7 @@ struct lu_tgt_pool { }; int tgt_pool_init(struct lu_tgt_pool *op, unsigned int count); -#define tgt_pool_add(op, idx, min_count) \ - tgt_pool_add_lock(op, idx, min_count, true) -#define tgt_pool_add_locked(op, idx, min_count) \ - tgt_pool_add_lock(op, idx, min_count, false) -int tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx, - unsigned int min_count, bool locked); +int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count); int tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx); int tgt_pool_free(struct lu_tgt_pool *op); int tgt_check_index(int idx, struct lu_tgt_pool *osts); diff --git a/lustre/quota/lquota_internal.h b/lustre/quota/lquota_internal.h index 0d55ff7..93e687e 100644 --- a/lustre/quota/lquota_internal.h +++ b/lustre/quota/lquota_internal.h @@ -201,9 +201,7 @@ struct lquota_entry { struct lqe_glbl_entry { __u64 lge_qunit; - unsigned long lge_idx:16, - /* index of target */ - lge_edquot:1, + unsigned long lge_edquot:1, /* true when minimum qunit is set */ lge_qunit_set:1, /* qunit or edquot is changed - need diff --git a/lustre/quota/qmt_entry.c b/lustre/quota/qmt_entry.c index 54586f9..2ebf260 100644 --- a/lustre/quota/qmt_entry.c +++ b/lustre/quota/qmt_entry.c @@ -831,8 +831,8 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, int idx) { struct lquota_entry *lqe_gl, *lqe; - bool reseed = false; - bool notify = false; + bool need_reseed = false; + bool need_notify = false; int i; lqe_gl = qti_lqes_glbl(env); @@ -840,31 +840,30 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, for (i = 0; i < qti_lqes_cnt(env); i++) { lqe = qti_lqes(env)[i]; if (qunit) - reseed |= qmt_adjust_qunit(env, lqe); + need_reseed |= qmt_adjust_qunit(env, lqe); if (edquot) - reseed |= qmt_adjust_edquot(lqe, now); + need_reseed |= qmt_adjust_edquot(lqe, now); } LASSERT(lqe_gl); if (!lqe_gl->lqe_glbl_data && (req_has_rep(qb_flags) || req_is_rel(qb_flags))) { - if (reseed) + if (need_reseed) CDEBUG(D_QUOTA, "%s: can not notify - lge_glbl_data is not set\n", qmt->qmt_svname); - return reseed; + return need_reseed; } - if (reseed || idx >= 0) { + if (need_reseed || idx >= 0) { mutex_lock(&lqe_gl->lqe_glbl_data_lock); if (lqe_gl->lqe_glbl_data) { struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; - if (reseed) { + if (need_reseed) { qmt_seed_glbe_all(env, lgd, qunit, edquot, false); } else if (idx >= 0) { - int lge_idx = qmt_map_lge_idx(lgd, idx); - + LASSERT(idx <= lgd->lqeg_num_alloc); /* If there are no locks yet when * lge_qunit/edquot_nu is set, slaves * are still not notified with new @@ -874,17 +873,17 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env, * to the least qunit, but lqe_revoke_time is * still not set. */ - notify = lgd->lqeg_arr[lge_idx].lge_qunit_nu || - lgd->lqeg_arr[lge_idx].lge_edquot_nu; + need_notify = lgd->lqeg_arr[idx].lge_qunit_nu || + lgd->lqeg_arr[idx].lge_edquot_nu; } } mutex_unlock(&lqe_gl->lqe_glbl_data_lock); } - if (reseed || notify) + if (need_reseed || need_notify) qmt_id_lock_notify(qmt, lqe_gl); - return reseed; + return need_reseed; } @@ -1089,7 +1088,7 @@ void qti_lqes_write_unlock(const struct lu_env *env) lqe_write_unlock(qti_lqes(env)[i]); } -#define QMT_INIT_SLV_CNT 64 +#define QMT_INIT_SLV_CNT 64 struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype) { struct lqe_glbl_data *lgd; @@ -1128,29 +1127,12 @@ void qmt_free_lqe_gd(struct lqe_glbl_data *lgd) OBD_FREE(lgd, sizeof(struct lqe_glbl_data)); } -int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx) -{ - int k; - - /* check common case of sequential OST numbers first */ - if (ostidx < lgd->lqeg_num_used && - lgd->lqeg_arr[ostidx].lge_idx == ostidx) - return ostidx; - - for (k = 0; k < lgd->lqeg_num_used; k++) - if (lgd->lqeg_arr[k].lge_idx == ostidx) - break; - - LASSERTF(k < lgd->lqeg_num_used, "Cannot map ostidx %d for %p\n", - ostidx, lgd); - return k; -} - void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd, bool qunit, bool edquot, bool pool_locked) { - struct qmt_pool_info *qpi; - int i, j; + struct rw_semaphore *sem = NULL; + struct qmt_pool_info *qpi; + int i, j, idx; ENTRY; if (!qti_lqes_cnt(env)) @@ -1183,16 +1165,21 @@ void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd, CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i); qpi = lqe2qpi(lqe); - if (!pool_locked) - qmt_sarr_read_down(qpi); - slaves_cnt = qmt_sarr_count(qpi); + if (qmt_pool_global(qpi)) { + slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe), + lqe_qtype(lqe)); + } else { + if (!pool_locked) { + sem = qmt_sarr_rwsem(qpi); + down_read(sem); + } - for (j = 0; j < slaves_cnt; j++) { - int idx, tgt_idx; + slaves_cnt = qmt_sarr_count(qpi); + } - tgt_idx = qmt_sarr_get_idx(qpi, j); - LASSERT(tgt_idx >= 0); - idx = qmt_map_lge_idx(lgd, tgt_idx); + for (j = 0; j < slaves_cnt; j++) { + idx = qmt_sarr_get_idx(qpi, j); + LASSERT(idx >= 0); if (edquot) { int lge_edquot, new_edquot, edquot_nu; @@ -1222,9 +1209,8 @@ qunit_lbl: __u64 lge_qunit, new_qunit; CDEBUG(D_QUOTA, - "tgt_idx %d idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n", - tgt_idx, idx, - lgd->lqeg_arr[idx].lge_qunit_set, + "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n", + idx, lgd->lqeg_arr[idx].lge_qunit_set, lgd->lqeg_arr[idx].lge_qunit, lqe->lqe_qunit); /* lge for this idx is already set @@ -1251,15 +1237,14 @@ qunit_lbl: } } - if (!pool_locked) - qmt_sarr_read_up(qpi); + if (!pool_locked && !qmt_pool_global(qpi)) + up_read(sem); } /* TODO: only for debug purposes - remove it later */ for (i = 0; i < lgd->lqeg_num_used; i++) CDEBUG(D_QUOTA, - "lgd i %d tgt_idx %d qunit %lu nu %d; edquot %d nu %d\n", - i, lgd->lqeg_arr[i].lge_idx, - (unsigned long)lgd->lqeg_arr[i].lge_qunit, + "lgd ost %d, qunit %lu nu %d; edquot %d nu %d\n", + i, (long unsigned)lgd->lqeg_arr[i].lge_qunit, lgd->lqeg_arr[i].lge_qunit_nu, lgd->lqeg_arr[i].lge_edquot, lgd->lqeg_arr[i].lge_edquot_nu); @@ -1271,16 +1256,15 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt, struct lquota_entry *lqe, struct lqe_glbl_data *lgd, int pool_type) { - __u64 qunit; - bool edquot; - int i; + __u64 qunit; + bool edquot; + int i; qunit = lqe->lqe_qunit; edquot = lqe->lqe_edquot; /* Firstly set all elements in array with * qunit and edquot of global pool */ - qmt_sarr_read_down(lqe2qpi(lqe)); for (i = 0; i < lgd->lqeg_num_used; i++) { lgd->lqeg_arr[i].lge_qunit = qunit; lgd->lqeg_arr[i].lge_edquot = edquot; @@ -1288,9 +1272,7 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt, * will be sent to slaves during qmt_lvbo_fill. */ lgd->lqeg_arr[i].lge_qunit_nu = 0; lgd->lqeg_arr[i].lge_edquot_nu = 0; - lgd->lqeg_arr[i].lge_idx = qmt_sarr_get_idx(lqe2qpi(lqe), i); } - qmt_sarr_read_up(lqe2qpi(lqe)); qmt_pool_lqes_lookup_spec(env, qmt, pool_type, lqe_qtype(lqe), &lqe->lqe_id); diff --git a/lustre/quota/qmt_internal.h b/lustre/quota/qmt_internal.h index f99975b..d80109a 100644 --- a/lustre/quota/qmt_internal.h +++ b/lustre/quota/qmt_internal.h @@ -431,15 +431,8 @@ int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *, qmt_pool_lookup(env, qmt, type, NULL, -1, false) #define qmt_pool_lookup_name(env, qmt, type, name) \ qmt_pool_lookup(env, qmt, type, name, -1, false) - -/* - * Until MDT pools are not emplemented, all MDTs belong to - * global pool, thus lookup lqes only in global pool for the - * DOM case. - */ -#define qmt_pool_lookup_arr(env, qmt, type, idx, stype) \ - qmt_pool_lookup(env, qmt, type, NULL, \ - qmt_dom(type, stype) ? -1 : idx, true) +#define qmt_pool_lookup_arr(env, qmt, type, idx) \ + qmt_pool_lookup(env, qmt, type, NULL, idx, true) struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env, struct qmt_device *qmt, int rtype, @@ -459,10 +452,7 @@ int qmt_pool_add(struct obd_device *obd, char *poolname, char *ostname); int qmt_pool_rem(struct obd_device *obd, char *poolname, char *ostname); int qmt_pool_del(struct obd_device *obd, char *poolname); -#define qmt_sarr_read_down(qpi) down_read(&qpi->qpi_sarr.osts.op_rw_sem) -#define qmt_sarr_read_up(qpi) up_read(&qpi->qpi_sarr.osts.op_rw_sem) -#define qmt_sarr_write_down(qpi) down_write(&qpi->qpi_sarr.osts.op_rw_sem) -#define qmt_sarr_write_up(qpi) up_write(&qpi->qpi_sarr.osts.op_rw_sem) +struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi); int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx); unsigned int qmt_sarr_count(struct qmt_pool_info *qpi); @@ -510,7 +500,6 @@ void qti_lqes_restore_fini(const struct lu_env *env); void qti_lqes_write_lock(const struct lu_env *env); void qti_lqes_write_unlock(const struct lu_env *env); -int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx); struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *, int); void qmt_free_lqe_gd(struct lqe_glbl_data *); void qmt_setup_lqe_gd(const struct lu_env *, struct qmt_device *, diff --git a/lustre/quota/qmt_lock.c b/lustre/quota/qmt_lock.c index 7301b25..465c42e 100644 --- a/lustre/quota/qmt_lock.c +++ b/lustre/quota/qmt_lock.c @@ -271,71 +271,48 @@ out: */ static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx) { - unsigned long least = lqe2qpi(lqe)->qpi_least_qunit; - bool revoke = false; + unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; + struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; /* There is no array to store lge for the case of DOM. - * Ignore it until MDT pools will be ready. - */ + * Ignore it until MDT pools will be ready. */ if (!qmt_dom(lqe_rtype(lqe), stype)) { - struct lqe_glbl_data *lgd; - - mutex_lock(&lqe->lqe_glbl_data_lock); - lgd = lqe->lqe_glbl_data; - if (lgd) { - int lge_idx = qmt_map_lge_idx(lgd, idx); + lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0; + lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0; - lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0; - lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0; - /* We shouldn't call revoke for DOM case, it will be - * updated at qmt_id_lock_glimpse. - */ - revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least; - } - mutex_unlock(&lqe->lqe_glbl_data_lock); + /* We shouldn't call revoke for DOM case, it will be updated + * at qmt_id_lock_glimpse. */ + return (lgd->lqeg_arr[idx].lge_qunit == least_qunit); } - return revoke; + return false; } -static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl, +static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe, int stype, int idx) { - unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit; + unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit; + struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; bool notify = false; - if (qmt_dom(lqe_rtype(lqe_gl), stype)) - return false; - - qti_lqes_write_lock(env); - mutex_lock(&lqe_gl->lqe_glbl_data_lock); - if (lqe_gl->lqe_glbl_data) { - struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data; - int lge_idx; - - lge_idx = qmt_map_lge_idx(lgd, idx); - if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) { - struct lquota_entry *lqe; - int i; - - for (i = 0; i < qti_lqes_cnt(env); i++) { - lqe = qti_lqes(env)[i]; - LQUOTA_DEBUG(lqe, - "lge_qunit %llu least_qunit %lu idx %d\n", - lgd->lqeg_arr[lge_idx].lge_qunit, - least_qunit, idx); - if (lqe->lqe_qunit == least_qunit) { - lqe->lqe_revoke_time = + if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) { + int i; + + qti_lqes_write_lock(env); + for (i = 0; i < qti_lqes_cnt(env); i++) { + LQUOTA_DEBUG(qti_lqes(env)[i], + "idx %d lge_qunit %llu least_qunit %lu\n", + idx, lgd->lqeg_arr[idx].lge_qunit, + least_qunit); + if (qti_lqes(env)[i]->lqe_qunit == least_qunit) { + qti_lqes(env)[i]->lqe_revoke_time = ktime_get_seconds(); - notify |= qmt_adjust_edquot(lqe, + notify |= qmt_adjust_edquot(qti_lqes(env)[i], ktime_get_real_seconds()); - } } } + qti_lqes_write_unlock(env); } - mutex_unlock(&lqe_gl->lqe_glbl_data_lock); - qti_lqes_write_unlock(env); - return notify; } @@ -426,16 +403,10 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res, GOTO(out_exp, rc); if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) { - int notify = false; - mutex_lock(&lqe->lqe_glbl_data_lock); - if (lqe->lqe_glbl_data) { - qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); - notify = true; - } + qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data); + qmt_id_lock_notify(qmt, lqe); mutex_unlock(&lqe->lqe_glbl_data_lock); - if (notify) - qmt_id_lock_notify(qmt, lqe); } if (lvb->lvb_id_rel) { @@ -688,14 +659,12 @@ static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc, qunit = lqe->lqe_qunit; } else { struct lqe_glbl_data *lgd; - int lge_idx; mutex_lock(&lqe->lqe_glbl_data_lock); lgd = lqe->lqe_glbl_data; if (lgd) { - lge_idx = qmt_map_lge_idx(lgd, idx); - edquot = lgd->lqeg_arr[lge_idx].lge_edquot; - qunit = lgd->lqeg_arr[lge_idx].lge_qunit; + edquot = lgd->lqeg_arr[idx].lge_edquot; + qunit = lgd->lqeg_arr[idx].lge_qunit; } else { edquot = lqe->lqe_edquot; qunit = lqe->lqe_qunit; @@ -896,32 +865,20 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe, * broadcasting the new qunit value */ static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe) { - struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; + struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid; struct lqe_glbl_data *lgd = lqe->lqe_glbl_data; int idx; int stype = qmt_uuid2idx(uuid, &idx); LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT); - CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n", - stype, lqe_rtype(lqe), idx, uuid->uuid); /* Quota pools support only OSTs, despite MDTs also could be registered * as LQUOTA_RES_DT devices(DOM). */ if (qmt_dom(lqe_rtype(lqe), stype)) return 1; - - if (lgd) { - int lge_idx = qmt_map_lge_idx(lgd, idx); - - CDEBUG(D_QUOTA, - "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n", - idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu, - lgd->lqeg_arr[lge_idx].lge_qunit_nu); - return lgd->lqeg_arr[lge_idx].lge_edquot_nu || - lgd->lqeg_arr[lge_idx].lge_qunit_nu; - } - - return 0; + else + return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu || + lgd->lqeg_arr[idx].lge_qunit_nu : 0; } diff --git a/lustre/quota/qmt_pool.c b/lustre/quota/qmt_pool.c index 07e3c9d..ba50583 100644 --- a/lustre/quota/qmt_pool.c +++ b/lustre/quota/qmt_pool.c @@ -55,14 +55,10 @@ #include "qmt_internal.h" static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi); -#define qmt_sarr_pool_add(qpi, idx, stype) \ - _qmt_sarr_pool_add(qpi, idx, stype, false) -#define qmt_sarr_pool_add_locked(qpi, idx, stype) \ - _qmt_sarr_pool_add(qpi, idx, stype, true) -static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi, - int idx, int min, bool locked); +static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, + int idx, int min); static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx); -static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi); +static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi); static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx); static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi); @@ -514,34 +510,25 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt) RETURN(rc); } -static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid, +static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid, char *slv_name, struct lu_fid *slv_fid, void *arg) { struct obd_uuid uuid; - struct qmt_pool_info *qpi = arg; - int stype, qtype, idx; + int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg; + int stype, qtype; int rc; rc = lquota_extract_fid(glb_fid, NULL, &qtype); LASSERT(!rc); obd_str2uuid(&uuid, slv_name); - stype = qmt_uuid2idx(&uuid, &idx); + stype = qmt_uuid2idx(&uuid, NULL); if (stype < 0) return stype; - - CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name); - rc = qmt_sarr_pool_add(qpi, idx, stype); - if (rc && rc != -EEXIST) { - CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n", - qpi->qpi_qmt->qmt_svname, idx, rc); - return rc; - } - /* one more slave */ - qpi->qpi_slv_nr[stype][qtype]++; + (*nr)[stype][qtype]++; CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n", - slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]); + slv_name, stype, qtype, (*nr)[stype][qtype]); return 0; } @@ -661,8 +648,8 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt, rc = lquota_disk_for_each_slv(env, pool->qpi_root, &qti->qti_fid, - qmt_slv_add, - pool); + qmt_slv_cnt, + &pool->qpi_slv_nr); if (rc) { CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n", qmt->qmt_svname, qtype_name(qtype), rc); @@ -700,76 +687,6 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt, RETURN(0); } -static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd, - struct hlist_node *hnode, void *data) -{ - struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr; - struct lquota_entry *lqe; - int old_num, rc; - - lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash); - LASSERT(atomic_read(&lqe->lqe_ref) > 0); - old_num = 0; - rc = 0; - - CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data); - if (lqe->lqe_glbl_data) { - struct lqe_glbl_data *lgd; - - old_lqeg_arr = NULL; - mutex_lock(&lqe->lqe_glbl_data_lock); - if (lqe->lqe_glbl_data) { - struct qmt_pool_info *qpi = lqe2qpi(lqe); - int sarr_cnt = qmt_sarr_count(qpi); - - lgd = lqe->lqe_glbl_data; - if (lgd->lqeg_num_alloc < sarr_cnt) { - LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt); - - OBD_ALLOC(lqeg_arr, - sizeof(struct lqe_glbl_entry) * - (lgd->lqeg_num_alloc + 16)); - if (lqeg_arr) { - memcpy(lqeg_arr, lgd->lqeg_arr, - sizeof(struct lqe_glbl_entry) * - (lgd->lqeg_num_alloc)); - old_lqeg_arr = lgd->lqeg_arr; - old_num = lgd->lqeg_num_alloc; - lgd->lqeg_arr = lqeg_arr; - lgd->lqeg_num_alloc += 16; - CDEBUG(D_QUOTA, - "extend lqeg_arr:%px from %d to %d\n", - lgd, old_num, - lgd->lqeg_num_alloc); - } else { - CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n", - qpi->qpi_qmt->qmt_svname, - -ENOMEM); - GOTO(out, rc = -ENOMEM); - } - } - lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx = - qmt_sarr_get_idx(qpi, sarr_cnt - 1); - lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot = - lqe->lqe_edquot; - lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit = - lqe->lqe_qunit; - lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0; - lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0; - LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n", - lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx, - lgd->lqeg_num_used, lgd->lqeg_num_alloc); - lgd->lqeg_num_used++; - } -out: - mutex_unlock(&lqe->lqe_glbl_data_lock); - if (old_lqeg_arr) - OBD_FREE(old_lqeg_arr, old_num * sizeof(struct lqe_glbl_entry)); - } - - return rc; -} - /* * Handle new slave connection. Called when a slave enqueues the global quota * lock at the beginning of the reintegration procedure. @@ -797,14 +714,15 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt, stype = qmt_uuid2idx(uuid, &idx); if (stype < 0) RETURN(stype); - CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid)); /* extract pool info from global index FID */ rc = lquota_extract_fid(glb_fid, &pool_type, &qtype); if (rc) RETURN(rc); - pool = qmt_pool_lookup_glb(env, qmt, pool_type); + /* look-up pool in charge of this global index FID */ + qti_pools_init(env); + pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx); if (IS_ERR(pool)) RETURN(PTR_ERR(pool)); @@ -829,51 +747,11 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt, memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid)); *slv_ver = dt_version_get(env, slv_obj); dt_object_put(env, slv_obj); - if (created) { - struct qmt_pool_info *ptr; - - CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n", - idx, pool_type, qtype, stype); - - if (!qmt_dom(qtype, stype)) { - qmt_sarr_write_down(pool); - rc = qmt_sarr_pool_add_locked(pool, idx, stype); - if (!rc) { - for (i = 0; i < LL_MAXQUOTAS; i++) - cfs_hash_for_each(pool->qpi_site[i]-> - lqs_hash, - qmt_lgd_extend_cb, - &env); - } else if (rc == -EEXIST) { - /* This target has been already added - * by another qtype - */ - rc = 0; - } - qmt_sarr_write_up(pool); - - if (rc) { - CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n", - qmt->qmt_svname, idx, - pool->qpi_name, rc); - GOTO(out, rc); - } - } - - /* look-up pool in charge of this global index FID */ - qti_pools_init(env); - ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype); - if (IS_ERR(ptr)) - GOTO(out, rc = PTR_ERR(ptr)); - + if (created) for (i = 0; i < qti_pools_cnt(env); i++) qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++; - - qti_pools_fini(env); - } - out: - qpi_putref(env, pool); + qti_pools_fini(env); RETURN(rc); } @@ -932,10 +810,15 @@ int qmt_pool_lqes_lookup(const struct lu_env *env, int rc, i; ENTRY; + /* Until MDT pools are not emplemented, all MDTs belong to + * global pool, thus lookup lqes only from global pool. */ + if (qmt_dom(rtype, stype)) + idx = -1; + qti_pools_init(env); rc = 0; /* look-up pool responsible for this global index FID */ - pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype); + pool = qmt_pool_lookup_arr(env, qmt, rtype, idx); if (IS_ERR(pool)) { qti_pools_fini(env); RETURN(PTR_ERR(pool)); @@ -1287,10 +1170,10 @@ static struct obd_device *qmt_get_mgc(struct qmt_device *qmt) static int qmt_pool_recalc(void *args) { struct qmt_pool_info *pool, *glbl_pool; + struct rw_semaphore *sem = NULL; struct obd_device *obd; struct lu_env env; int i, rc, qtype, slaves_cnt; - bool sem = false; ENTRY; pool = args; @@ -1305,16 +1188,16 @@ static int qmt_pool_recalc(void *args) obd = qmt_get_mgc(pool->qpi_qmt); if (IS_ERR(obd)) GOTO(out, rc = PTR_ERR(obd)); - - /* Waiting for the end of processing mgs config. - * It is needed to be sure all pools are configured. - */ - while (obd->obd_process_conf) - schedule_timeout_uninterruptible(cfs_time_seconds(1)); + else + /* Waiting for the end of processing mgs config. + * It is needed to be sure all pools are configured. */ + while (obd->obd_process_conf) + schedule_timeout_uninterruptible(cfs_time_seconds(1)); OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val); - qmt_sarr_read_down(pool); - sem = true; + sem = qmt_sarr_rwsem(pool); + LASSERT(sem); + down_read(sem); /* Hold this to be sure that OSTs from this pool * can't do acquire/release. * @@ -1394,7 +1277,7 @@ out: * Thus until up_read, no one can restart recalc thread. */ if (sem) { - qmt_sarr_read_up(pool); + up_read(sem); up_write(&pool->qpi_recalc_sem); } @@ -1524,7 +1407,7 @@ static int qmt_pool_add_rem(struct obd_device *obd, char *poolname, GOTO(out, rc = PTR_ERR(qpi)); } - rc = add ? qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) : + rc = add ? qmt_sarr_pool_add(qpi, idx, 32) : qmt_sarr_pool_rem(qpi, idx); if (rc) { CERROR("%s: can't %s %s pool %s: rc = %d\n", @@ -1644,31 +1527,57 @@ int qmt_pool_del(struct obd_device *obd, char *poolname) static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi) { - return tgt_pool_init(&qpi->qpi_sarr.osts, 0); + + /* No need to initialize sarray for global pool + * as it always includes all slaves */ + if (qmt_pool_global(qpi)) + return 0; + + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + return tgt_pool_init(&qpi->qpi_sarr.osts, 0); + case LQUOTA_RES_MD: + default: + return 0; + } } -static inline int -_qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked) +static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min) { - /* We don't have an array for DOM */ - if (qmt_dom(qpi->qpi_rtype, stype)) + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min); + case LQUOTA_RES_MD: + default: return 0; - - if (locked) - return tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32); - else - return tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32); + } } static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx) { - return tgt_pool_remove(&qpi->qpi_sarr.osts, idx); + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + return tgt_pool_remove(&qpi->qpi_sarr.osts, idx); + case LQUOTA_RES_MD: + default: + return 0; + } } -static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi) +static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi) { - if (qpi->qpi_sarr.osts.op_array) - tgt_pool_free(&qpi->qpi_sarr.osts); + if (qmt_pool_global(qpi)) + return 0; + + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + if (!qpi->qpi_sarr.osts.op_array) + return 0; + return tgt_pool_free(&qpi->qpi_sarr.osts); + case LQUOTA_RES_MD: + default: + return 0; + } } static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx) @@ -1676,19 +1585,53 @@ static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx) if (qmt_pool_global(qpi)) return 0; - return tgt_check_index(idx, &qpi->qpi_sarr.osts); + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + return tgt_check_index(idx, &qpi->qpi_sarr.osts); + case LQUOTA_RES_MD: + default: + return 0; + } +} + +struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi) +{ + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + /* to protect ost_pool use */ + return &qpi->qpi_sarr.osts.op_rw_sem; + case LQUOTA_RES_MD: + default: + return NULL; + } } int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx) { - LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0, - "idx invalid %d op_count %d\n", arr_idx, - qpi->qpi_sarr.osts.op_count); - return qpi->qpi_sarr.osts.op_array[arr_idx]; + + if (qmt_pool_global(qpi)) + return arr_idx; + + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0, + "idx invalid %d op_count %d\n", arr_idx, + qpi->qpi_sarr.osts.op_count); + return qpi->qpi_sarr.osts.op_array[arr_idx]; + case LQUOTA_RES_MD: + default: + return -EINVAL; + } } /* Number of slaves in a pool */ unsigned int qmt_sarr_count(struct qmt_pool_info *qpi) { - return qpi->qpi_sarr.osts.op_count; + switch (qpi->qpi_rtype) { + case LQUOTA_RES_DT: + return qpi->qpi_sarr.osts.op_count; + case LQUOTA_RES_MD: + default: + return -EINVAL; + } } diff --git a/lustre/target/tgt_pool.c b/lustre/target/tgt_pool.c index f1f7219..4beef3a 100644 --- a/lustre/target/tgt_pool.c +++ b/lustre/target/tgt_pool.c @@ -132,15 +132,13 @@ EXPORT_SYMBOL(tgt_pool_extend); * \retval 0 if target could be added to the pool * \retval negative error if target \a idx was not added */ -int tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx, - unsigned int min_count, bool lock) +int tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count) { unsigned int i; int rc = 0; ENTRY; - if (lock) - down_write(&op->op_rw_sem); + down_write(&op->op_rw_sem); rc = tgt_pool_extend(op, min_count); if (rc) @@ -156,11 +154,10 @@ int tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx, op->op_count++; EXIT; out: - if (lock) - up_write(&op->op_rw_sem); + up_write(&op->op_rw_sem); return rc; } -EXPORT_SYMBOL(tgt_pool_add_lock); +EXPORT_SYMBOL(tgt_pool_add); /** * Remove an existing pool from the system. diff --git a/lustre/tests/conf-sanity.sh b/lustre/tests/conf-sanity.sh index f7bbaf6..3cb295e 100644 --- a/lustre/tests/conf-sanity.sh +++ b/lustre/tests/conf-sanity.sh @@ -15,7 +15,7 @@ init_logging ALWAYS_EXCEPT="$CONF_SANITY_EXCEPT 32newtarball" # bug number for skipped test: LU-11915 EX-3746 -ALWAYS_EXCEPT="$ALWAYS_EXCEPT 110 115 5a" +ALWAYS_EXCEPT="$ALWAYS_EXCEPT 33c 110 115 5a" # UPDATE THE COMMENT ABOVE WITH BUG NUMBERS WHEN CHANGING ALWAYS_EXCEPT! if $SHARED_KEY; then @@ -2653,9 +2653,6 @@ test_33c() { local mgs_flag="--mgs" fi - load_modules - stack_trap unload_modules_conf - add fs2mds $(mkfs_opts mds1 ${fs2mdsdev}) --fsname=${FSNAME} \ --reformat $mgs_flag $mkfsoptions $fs2mdsdev $fs2mdsvdev || exit 10 @@ -2671,8 +2668,8 @@ test_33c() { mount_client $MOUNT || error "client start failed" stack_trap "umount_client $MOUNT" - mkdir_on_mdt0 $DIR/$tdir || error "cannot create $DIR/$tdir" - chmod 0777 $DIR/$tdir || error "chown failed" + mkdir_on_mdt0 $DIR/$tdir || "cannot create $DIR/$tdir" + chmod 0777 $DIR/$tdir || "chown failed" if [[ $PERM_CMD == *"set_param -P"* ]]; then do_facet mgs $PERM_CMD \ osd-*.$FSNAME-OST*.quota_slave.enable=$QUOTA_TYPE -- 1.8.3.1