Whamcloud - gitweb
LU-17034 quota: lqeg_arr memory corruption 94/52094/9
author Sergey Cheremencev <scherementsev@ddn.com>
Fri, 25 Aug 2023 06:22:26 +0000 (10:22 +0400)
committer Oleg Drokin <green@whamcloud.com>
Sat, 18 Nov 2023 21:40:36 +0000 (21:40 +0000)
Fix memory corruption caused by out-of-bounds access to the
lqeg_arr array. It could happen when at least one of the
OSTs has an index larger than the total number of OSTs,
for example if the system has 4 OSTs with indexes
0001, 0002, 00c9, 00ca. Most often this issue corrupted
bucket_table in obd_uuid_hash or obd_nid_hash, causing the
rhashtable code to crash. However, it could also be the
cause of other panics, depending on the type of the
corrupted neighbouring memory region.

This patch adds an lge_idx field to each lqe global entry
to store the index of the OST. It is needed to map an OST
index to an array index and thus avoid out-of-bounds array access.

This patch also adds locking to protect lqe_glbl_data in
qmt_set_revoke and qmt_clear_lgeg_arr_nu. This was
forgotten in 50ff4d1da6.

This patch begins to store all connected MDTs in the quota
global pool. Thus, starting from this patch, MDTs are handled
the same way as OSTs stored in the global pool. It is the
first step towards introducing MDT pools.

Add conf-sanity_33c, which reproduces the mentioned memory
corruption without the fix.

Fixes: 50ff4d1da6 ("LU-16772 quota: protect lqe_glbl_data in qmt_site_recalc_cb")
Signed-off-by: Sergey Cheremencev <scherementsev@ddn.com>
Change-Id: Id6e4bcde09d9f32726d69f711eedb82729a2266e
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/52094
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/include/lu_object.h
lustre/obdclass/lu_tgt_pool.c
lustre/quota/lquota_internal.h
lustre/quota/qmt_entry.c
lustre/quota/qmt_internal.h
lustre/quota/qmt_lock.c
lustre/quota/qmt_pool.c
lustre/tests/conf-sanity.sh
lustre/tests/test-framework.sh

index 7075042..fada7c8 100644 (file)
@@ -1587,7 +1587,12 @@ struct lu_tgt_pool {
 };
 
 int lu_tgt_pool_init(struct lu_tgt_pool *op, unsigned int count);
-int lu_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count);
+#define lu_tgt_pool_add(op, idx, min_count) \
+               lu_tgt_pool_add_lock(op, idx, min_count, true)
+#define lu_tgt_pool_add_locked(op, idx, min_count) \
+               lu_tgt_pool_add_lock(op, idx, min_count, false)
+int lu_tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx,
+                        unsigned int min_count, bool locked);
 int lu_tgt_pool_remove(struct lu_tgt_pool *op, __u32 idx);
 void lu_tgt_pool_free(struct lu_tgt_pool *op);
 int lu_tgt_check_index(int idx, struct lu_tgt_pool *osts);
index 4bf0d16..7ef83ce 100644 (file)
@@ -132,13 +132,15 @@ EXPORT_SYMBOL(lu_tgt_pool_extend);
  * \retval             0 if target could be added to the pool
  * \retval             negative error if target \a idx was not added
  */
-int lu_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count)
+int lu_tgt_pool_add_lock(struct lu_tgt_pool *op, __u32 idx,
+                        unsigned int min_count, bool lock)
 {
        unsigned int i;
        int rc = 0;
        ENTRY;
 
-       down_write(&op->op_rw_sem);
+       if (lock)
+               down_write(&op->op_rw_sem);
 
        /* search ost in pool array */
        for (i = 0; i < op->op_count; i++) {
@@ -155,10 +157,11 @@ int lu_tgt_pool_add(struct lu_tgt_pool *op, __u32 idx, unsigned int min_count)
        op->op_count++;
        EXIT;
 out:
-       up_write(&op->op_rw_sem);
+       if (lock)
+               up_write(&op->op_rw_sem);
        return rc;
 }
-EXPORT_SYMBOL(lu_tgt_pool_add);
+EXPORT_SYMBOL(lu_tgt_pool_add_lock);
 
 /**
  * Remove an existing pool from the system.
index 734cd1f..bd05ef0 100644 (file)
@@ -202,7 +202,9 @@ struct lquota_entry {
 
 struct lqe_glbl_entry {
        __u64                    lge_qunit;
-       unsigned long            lge_edquot:1,
+       unsigned long            lge_idx:16,
+                                /* index of target */
+                                lge_edquot:1,
                                 /* true when minimum qunit is set */
                                 lge_qunit_set:1,
                                 /* qunit or edquot is changed - need
index ba910c2..fb441a1 100644 (file)
@@ -830,8 +830,8 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env,
                                    int idx)
 {
        struct lquota_entry *lqe_gl, *lqe;
-       bool need_reseed = false;
-       bool need_notify = false;
+       bool reseed = false;
+       bool notify = false;
        int i;
 
        lqe_gl = qti_lqes_glbl(env);
@@ -839,30 +839,31 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env,
        for (i = 0; i < qti_lqes_cnt(env); i++) {
                lqe = qti_lqes(env)[i];
                if (qunit)
-                       need_reseed |= qmt_adjust_qunit(env, lqe);
+                       reseed |= qmt_adjust_qunit(env, lqe);
                if (edquot)
-                       need_reseed |= qmt_adjust_edquot(lqe, now);
+                       reseed |= qmt_adjust_edquot(lqe, now);
        }
 
        LASSERT(lqe_gl);
        if (!lqe_gl->lqe_glbl_data &&
            (req_has_rep(qb_flags) || req_is_rel(qb_flags))) {
-               if (need_reseed)
+               if (reseed)
                        CDEBUG(D_QUOTA,
                               "%s: can not notify - lge_glbl_data is not set\n",
                               qmt->qmt_svname);
-               return need_reseed;
+               return reseed;
        }
 
-       if (need_reseed || idx >= 0) {
+       if (reseed || idx >= 0) {
                mutex_lock(&lqe_gl->lqe_glbl_data_lock);
                if (lqe_gl->lqe_glbl_data) {
                        struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
 
-                       if (need_reseed) {
+                       if (reseed) {
                                qmt_seed_glbe_all(env, lgd, qunit, edquot);
                        } else if (idx >= 0) {
-                               LASSERT(idx <= lgd->lqeg_num_used);
+                               int lge_idx = qmt_map_lge_idx(lgd, idx);
+
                                /* If there are no locks yet when
                                 * lge_qunit/edquot_nu is set, slaves
                                 * are still not notified with new
@@ -872,17 +873,17 @@ bool qmt_adjust_edquot_qunit_notify(const struct lu_env *env,
                                 * to the least qunit, but lqe_revoke_time is
                                 * still not set.
                                 */
-                               need_notify = lgd->lqeg_arr[idx].lge_qunit_nu ||
-                                             lgd->lqeg_arr[idx].lge_edquot_nu;
+                               notify = lgd->lqeg_arr[lge_idx].lge_qunit_nu ||
+                                        lgd->lqeg_arr[lge_idx].lge_edquot_nu;
                        }
                }
                mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
        }
 
-       if (need_reseed || need_notify)
+       if (reseed || notify)
                qmt_id_lock_notify(qmt, lqe_gl);
 
-       return need_reseed;
+       return reseed;
 }
 
 
@@ -1087,7 +1088,7 @@ void qti_lqes_write_unlock(const struct lu_env *env)
                lqe_write_unlock(qti_lqes(env)[i]);
 }
 
-#define QMT_INIT_SLV_CNT       64
+#define QMT_INIT_SLV_CNT 64
 struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *pool, int qtype)
 {
        struct lqe_glbl_data    *lgd;
@@ -1126,12 +1127,29 @@ void qmt_free_lqe_gd(struct lqe_glbl_data *lgd)
        OBD_FREE(lgd, sizeof(struct lqe_glbl_data));
 }
 
+int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx)
+{
+       int k;
+
+       /* check common case of sequential OST numbers first */
+       if (ostidx < lgd->lqeg_num_used &&
+           lgd->lqeg_arr[ostidx].lge_idx == ostidx)
+               return ostidx;
+
+       for (k = 0; k < lgd->lqeg_num_used; k++)
+               if (lgd->lqeg_arr[k].lge_idx == ostidx)
+                       break;
+
+       LASSERTF(k < lgd->lqeg_num_used, "Cannot map ostidx %d for %p\n",
+                ostidx, lgd);
+       return k;
+}
+
 void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd,
                       bool qunit, bool edquot)
 {
-       struct rw_semaphore     *sem = NULL;
-       struct qmt_pool_info    *qpi;
-       int                      i, j, idx;
+       struct qmt_pool_info *qpi;
+       int i, j;
        ENTRY;
 
        if (!qti_lqes_cnt(env))
@@ -1164,18 +1182,15 @@ void qmt_seed_glbe_all(const struct lu_env *env, struct lqe_glbl_data *lgd,
 
                CDEBUG(D_QUOTA, "lqes_cnt %d, i %d\n", qti_lqes_cnt(env), i);
                qpi = lqe2qpi(lqe);
-               if (qmt_pool_global(qpi)) {
-                       slaves_cnt = qpi_slv_nr_by_rtype(lqe2qpi(lqe),
-                                                        lqe_qtype(lqe));
-               } else {
-                       sem = qmt_sarr_rwsem(qpi);
-                       down_read(sem);
-                       slaves_cnt = qmt_sarr_count(qpi);
-               }
+               qmt_sarr_read_down(qpi);
+               slaves_cnt = qmt_sarr_count(qpi);
 
                for (j = 0; j < slaves_cnt; j++) {
-                       idx = qmt_sarr_get_idx(qpi, j);
-                       LASSERT(idx >= 0);
+                       int idx, tgt_idx;
+
+                       tgt_idx = qmt_sarr_get_idx(qpi, j);
+                       LASSERT(tgt_idx >= 0);
+                       idx = qmt_map_lge_idx(lgd, tgt_idx);
 
                        if (edquot) {
                                int lge_edquot, new_edquot, edquot_nu;
@@ -1205,8 +1220,9 @@ qunit_lbl:
                                __u64 lge_qunit, new_qunit;
 
                                CDEBUG(D_QUOTA,
-                                      "idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
-                                      idx, lgd->lqeg_arr[idx].lge_qunit_set,
+                                      "tgt_idx %d idx %d lge_qunit_set %d lge_qunit %llu new_qunit %llu\n",
+                                      tgt_idx, idx,
+                                      lgd->lqeg_arr[idx].lge_qunit_set,
                                       lgd->lqeg_arr[idx].lge_qunit,
                                       lqe->lqe_qunit);
                                /* lge for this idx is already set
@@ -1233,14 +1249,14 @@ qunit_lbl:
                        }
                }
 
-               if (!qmt_pool_global(qpi))
-                       up_read(sem);
+               qmt_sarr_read_up(qpi);
        }
        /* TODO: only for debug purposes - remove it later */
        for (i = 0; i < lgd->lqeg_num_used; i++)
                CDEBUG(D_QUOTA,
-                       "lgd ost %d, qunit %lu nu %d;  edquot %d nu %d\n",
-                       i, (long unsigned)lgd->lqeg_arr[i].lge_qunit,
+                       "lgd i %d tgt_idx %d qunit %lu nu %d;  edquot %d nu %d\n",
+                       i, lgd->lqeg_arr[i].lge_idx,
+                       (unsigned long)lgd->lqeg_arr[i].lge_qunit,
                        lgd->lqeg_arr[i].lge_qunit_nu,
                        lgd->lqeg_arr[i].lge_edquot,
                        lgd->lqeg_arr[i].lge_edquot_nu);
@@ -1252,15 +1268,16 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt,
                      struct lquota_entry *lqe, struct lqe_glbl_data *lgd,
                      int pool_type)
 {
-       __u64                    qunit;
-       bool                     edquot;
-       int                      i;
+       __u64 qunit;
+       bool edquot;
+       int i;
 
        qunit = lqe->lqe_qunit;
        edquot = lqe->lqe_edquot;
 
        /* Firstly set all elements in array with
         * qunit and edquot of global pool */
+       qmt_sarr_read_down(lqe2qpi(lqe));
        for (i = 0; i < lgd->lqeg_num_used; i++) {
                lgd->lqeg_arr[i].lge_qunit = qunit;
                lgd->lqeg_arr[i].lge_edquot = edquot;
@@ -1268,7 +1285,9 @@ void qmt_setup_lqe_gd(const struct lu_env *env, struct qmt_device *qmt,
                 * will be sent to slaves during qmt_lvbo_fill. */
                lgd->lqeg_arr[i].lge_qunit_nu = 0;
                lgd->lqeg_arr[i].lge_edquot_nu = 0;
+               lgd->lqeg_arr[i].lge_idx = qmt_sarr_get_idx(lqe2qpi(lqe), i);
        }
+       qmt_sarr_read_up(lqe2qpi(lqe));
 
        qmt_pool_lqes_lookup_spec(env, qmt, pool_type,
                                  lqe_qtype(lqe), &lqe->lqe_id);
index 1474a8a..935bb8a 100644 (file)
@@ -431,8 +431,15 @@ int qmt_pool_new_conn(const struct lu_env *, struct qmt_device *,
                qmt_pool_lookup(env, qmt, type, NULL, -1, false)
 #define qmt_pool_lookup_name(env, qmt, type, name) \
                qmt_pool_lookup(env, qmt, type, name, -1, false)
-#define qmt_pool_lookup_arr(env, qmt, type, idx) \
-               qmt_pool_lookup(env, qmt, type, NULL, idx, true)
+
+/*
+ * Until MDT pools are not emplemented, all MDTs belong to
+ * global pool, thus lookup lqes only in global pool for the
+ * DOM case.
+ */
+#define qmt_pool_lookup_arr(env, qmt, type, idx, stype) \
+               qmt_pool_lookup(env, qmt, type, NULL, \
+               qmt_dom(type, stype) ? -1 : idx, true)
 struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                                             struct qmt_device *qmt,
                                             int rtype,
@@ -452,7 +459,10 @@ int qmt_pool_add(struct obd_device *obd, char *poolname, char *ostname);
 int qmt_pool_rem(struct obd_device *obd, char *poolname, char *ostname);
 int qmt_pool_del(struct obd_device *obd, char *poolname);
 
-struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi);
+#define qmt_sarr_read_down(qpi) down_read(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_read_up(qpi) up_read(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_write_down(qpi) down_write(&qpi->qpi_sarr.osts.op_rw_sem)
+#define qmt_sarr_write_up(qpi) up_write(&qpi->qpi_sarr.osts.op_rw_sem)
 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx);
 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi);
 
@@ -500,6 +510,7 @@ void qti_lqes_restore_fini(const struct lu_env *env);
 void qti_lqes_write_lock(const struct lu_env *env);
 void qti_lqes_write_unlock(const struct lu_env *env);
 
+int qmt_map_lge_idx(struct lqe_glbl_data *lgd, int ostidx);
 struct lqe_glbl_data *qmt_alloc_lqe_gd(struct qmt_pool_info *, int);
 void qmt_free_lqe_gd(struct lqe_glbl_data *);
 void qmt_setup_lqe_gd(const struct lu_env *,  struct qmt_device *,
index 1b42537..7c78e7e 100644 (file)
@@ -271,48 +271,71 @@ out:
  */
 static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
 {
-       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
+       bool revoke = false;
 
        /* There is no array to store lge for the case of DOM.
-        * Ignore it until MDT pools will be ready. */
+        * Ignore it until MDT pools will be ready.
+        */
        if (!qmt_dom(lqe_rtype(lqe), stype)) {
-               lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
-               lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
+               struct lqe_glbl_data *lgd;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       int lge_idx = qmt_map_lge_idx(lgd, idx);
 
-               /* We shouldn't call revoke for DOM case, it will be updated
-                * at qmt_id_lock_glimpse. */
-               return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+                       lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
+                       lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
+                       /* We shouldn't call revoke for DOM case, it will be
+                        * updated at qmt_id_lock_glimpse.
+                        */
+                       revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
        }
 
-       return false;
+       return revoke;
 }
 
-static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
                          int stype, int idx)
 {
-       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
        bool notify = false;
 
-       if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
-               int i;
-
-               qti_lqes_write_lock(env);
-               for (i = 0; i < qti_lqes_cnt(env); i++) {
-                       LQUOTA_DEBUG(qti_lqes(env)[i],
-                                    "idx %d lge_qunit %llu least_qunit %lu\n",
-                                    idx, lgd->lqeg_arr[idx].lge_qunit,
-                                    least_qunit);
-                       if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
-                               qti_lqes(env)[i]->lqe_revoke_time =
+       if (qmt_dom(lqe_rtype(lqe_gl), stype))
+               return false;
+
+       qti_lqes_write_lock(env);
+       mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+       if (lqe_gl->lqe_glbl_data) {
+               struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
+               int lge_idx;
+
+               lge_idx = qmt_map_lge_idx(lgd, idx);
+               if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
+                       struct lquota_entry *lqe;
+                       int i;
+
+                       for (i = 0; i < qti_lqes_cnt(env); i++) {
+                               lqe = qti_lqes(env)[i];
+                               LQUOTA_DEBUG(lqe,
+                                            "lge_qunit %llu least_qunit %lu idx %d\n",
+                                            lgd->lqeg_arr[lge_idx].lge_qunit,
+                                            least_qunit, idx);
+                               if (lqe->lqe_qunit == least_qunit) {
+                                       lqe->lqe_revoke_time =
                                                        ktime_get_seconds();
-                               notify |= qmt_adjust_edquot(qti_lqes(env)[i],
+                                       notify |= qmt_adjust_edquot(lqe,
                                                  ktime_get_real_seconds());
+                               }
                        }
                }
-               qti_lqes_write_unlock(env);
        }
+       mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+       qti_lqes_write_unlock(env);
+
        return notify;
 }
 
@@ -403,12 +426,16 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                GOTO(out_exp, rc);
 
        if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+               int notify = false;
+
                mutex_lock(&lqe->lqe_glbl_data_lock);
                if (lqe->lqe_glbl_data) {
                        qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
-                       qmt_id_lock_notify(qmt, lqe);
+                       notify = true;
                }
                mutex_unlock(&lqe->lqe_glbl_data_lock);
+               if (notify)
+                       qmt_id_lock_notify(qmt, lqe);
        }
 
        if (lvb->lvb_id_rel) {
@@ -652,12 +679,14 @@ static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
                qunit = lqe->lqe_qunit;
        } else {
                struct lqe_glbl_data *lgd;
+               int lge_idx;
 
                mutex_lock(&lqe->lqe_glbl_data_lock);
                lgd = lqe->lqe_glbl_data;
                if (lgd) {
-                       edquot = lgd->lqeg_arr[idx].lge_edquot;
-                       qunit = lgd->lqeg_arr[idx].lge_qunit;
+                       lge_idx = qmt_map_lge_idx(lgd, idx);
+                       edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
+                       qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
                } else {
                        edquot = lqe->lqe_edquot;
                        qunit = lqe->lqe_qunit;
@@ -858,20 +887,32 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
  * broadcasting the new qunit value */
 static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 {
-       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
        struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
        int idx;
        int stype = qmt_uuid2idx(uuid, &idx);
 
        LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
 
+       CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
+              stype, lqe_rtype(lqe), idx, uuid->uuid);
        /* Quota pools support only OSTs, despite MDTs also could be registered
         * as LQUOTA_RES_DT devices(DOM). */
        if (qmt_dom(lqe_rtype(lqe), stype))
                return 1;
-       else
-               return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu ||
-                      lgd->lqeg_arr[idx].lge_qunit_nu : 0;
+
+       if (lgd) {
+               int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+               CDEBUG(D_QUOTA,
+                      "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
+                      idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu);
+               return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu;
+       }
+
+       return 0;
 }
 
 
index 92bcf60..9c89695 100644 (file)
 #include "qmt_internal.h"
 
 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
-static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
-                                   int idx, int min);
+#define qmt_sarr_pool_add(qpi, idx, stype) \
+               _qmt_sarr_pool_add(qpi, idx, stype, false)
+#define qmt_sarr_pool_add_locked(qpi, idx, stype) \
+               _qmt_sarr_pool_add(qpi, idx, stype, true)
+static inline int _qmt_sarr_pool_add(struct qmt_pool_info *qpi,
+                                   int idx, int min, bool locked);
 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
@@ -510,25 +514,34 @@ int qmt_pool_init(const struct lu_env *env, struct qmt_device *qmt)
        RETURN(rc);
 }
 
-static int qmt_slv_cnt(const struct lu_env *env, struct lu_fid *glb_fid,
+static int qmt_slv_add(const struct lu_env *env, struct lu_fid *glb_fid,
                       char *slv_name, struct lu_fid *slv_fid, void *arg)
 {
        struct obd_uuid uuid;
-       int (*nr)[QMT_STYPE_CNT][LL_MAXQUOTAS] = arg;
-       int stype, qtype;
+       struct qmt_pool_info *qpi = arg;
+       int stype, qtype, idx;
        int rc;
 
        rc = lquota_extract_fid(glb_fid, NULL, &qtype);
        LASSERT(!rc);
 
        obd_str2uuid(&uuid, slv_name);
-       stype = qmt_uuid2idx(&uuid, NULL);
+       stype = qmt_uuid2idx(&uuid, &idx);
        if (stype < 0)
                return stype;
+
+       CDEBUG(D_QUOTA, "add new idx:%d in %s\n", idx, qpi->qpi_name);
+       rc = qmt_sarr_pool_add(qpi, idx, stype);
+       if (rc && rc != -EEXIST) {
+               CERROR("%s: can't add idx %d into dt-0x0: rc = %d\n",
+                      qpi->qpi_qmt->qmt_svname, idx, rc);
+               return rc;
+       }
+
        /* one more slave */
-       (*nr)[stype][qtype]++;
+       qpi->qpi_slv_nr[stype][qtype]++;
        CDEBUG(D_QUOTA, "slv_name %s stype %d qtype %d nr %d\n",
-                       slv_name, stype, qtype, (*nr)[stype][qtype]);
+                       slv_name, stype, qtype, qpi->qpi_slv_nr[stype][qtype]);
 
        return 0;
 }
@@ -648,8 +661,8 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
 
                        rc = lquota_disk_for_each_slv(env, pool->qpi_root,
                                                      &qti->qti_fid,
-                                                     qmt_slv_cnt,
-                                                     &pool->qpi_slv_nr);
+                                                     qmt_slv_add,
+                                                     pool);
                        if (rc) {
                                CERROR("%s: failed to scan & count slave indexes for %s type: rc = %d\n",
                                       qmt->qmt_svname, qtype_name(qtype), rc);
@@ -687,6 +700,74 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
        RETURN(0);
 }
 
+static int qmt_lgd_extend_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+                             struct hlist_node *hnode, void *data)
+{
+       struct lqe_glbl_entry *lqeg_arr, *old_lqeg_arr;
+       struct lquota_entry *lqe;
+       int old_num, rc;
+
+       lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+       LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+       rc = 0;
+
+       CDEBUG(D_QUOTA, "lgd %px\n", lqe->lqe_glbl_data);
+       if (lqe->lqe_glbl_data) {
+               struct lqe_glbl_data *lgd;
+
+               old_lqeg_arr = NULL;
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               if (lqe->lqe_glbl_data) {
+                       struct qmt_pool_info *qpi = lqe2qpi(lqe);
+                       int sarr_cnt = qmt_sarr_count(qpi);
+
+                       lgd = lqe->lqe_glbl_data;
+                       if (lgd->lqeg_num_alloc < sarr_cnt) {
+                               LASSERT((lgd->lqeg_num_alloc + 1) == sarr_cnt);
+
+                               OBD_ALLOC(lqeg_arr,
+                                         sizeof(struct lqe_glbl_entry) *
+                                         (lgd->lqeg_num_alloc + 16));
+                               if (lqeg_arr) {
+                                       memcpy(lqeg_arr, lgd->lqeg_arr,
+                                              sizeof(struct lqe_glbl_entry) *
+                                              (lgd->lqeg_num_alloc));
+                                       old_lqeg_arr = lgd->lqeg_arr;
+                                       old_num = lgd->lqeg_num_alloc;
+                                       lgd->lqeg_arr = lqeg_arr;
+                                       lgd->lqeg_num_alloc += 16;
+                                       CDEBUG(D_QUOTA,
+                                              "extend lqeg_arr:%px from %d to %d\n",
+                                              lgd, old_num,
+                                              lgd->lqeg_num_alloc);
+                               } else {
+                                       CERROR("%s: cannot allocate new lqeg_arr: rc = %d\n",
+                                              qpi->qpi_qmt->qmt_svname,
+                                              -ENOMEM);
+                                       GOTO(out, rc = -ENOMEM);
+                               }
+                       }
+                       lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx =
+                               qmt_sarr_get_idx(qpi, sarr_cnt - 1);
+                       lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot =
+                               lqe->lqe_edquot;
+                       lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit =
+                               lqe->lqe_qunit;
+                       lgd->lqeg_arr[lgd->lqeg_num_used].lge_edquot_nu = 0;
+                       lgd->lqeg_arr[lgd->lqeg_num_used].lge_qunit_nu = 0;
+                       LQUOTA_DEBUG(lqe, "add tgt idx:%d used %d alloc %d\n",
+                                    lgd->lqeg_arr[lgd->lqeg_num_used].lge_idx,
+                                    lgd->lqeg_num_used, lgd->lqeg_num_alloc);
+                       lgd->lqeg_num_used++;
+               }
+out:
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+               OBD_FREE(old_lqeg_arr, old_num * sizeof(struct lqe_glbl_entry));
+       }
+
+       return rc;
+}
+
 /*
  * Handle new slave connection. Called when a slave enqueues the global quota
  * lock at the beginning of the reintegration procedure.
@@ -714,15 +795,14 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
        stype = qmt_uuid2idx(uuid, &idx);
        if (stype < 0)
                RETURN(stype);
+       CDEBUG(D_QUOTA, "FID "DFID"\n", PFID(glb_fid));
 
        /* extract pool info from global index FID */
        rc = lquota_extract_fid(glb_fid, &pool_type, &qtype);
        if (rc)
                RETURN(rc);
 
-       /* look-up pool in charge of this global index FID */
-       qti_pools_init(env);
-       pool = qmt_pool_lookup_arr(env, qmt, pool_type, idx);
+       pool = qmt_pool_lookup_glb(env, qmt, pool_type);
        if (IS_ERR(pool))
                RETURN(PTR_ERR(pool));
 
@@ -747,11 +827,51 @@ int qmt_pool_new_conn(const struct lu_env *env, struct qmt_device *qmt,
        memcpy(slv_fid, lu_object_fid(&slv_obj->do_lu), sizeof(*slv_fid));
        *slv_ver = dt_version_get(env, slv_obj);
        dt_object_put(env, slv_obj);
-       if (created)
+       if (created) {
+               struct qmt_pool_info *ptr;
+
+               CDEBUG(D_QUOTA, "add tgt idx:%d pool_type:%d qtype:%d stype:%d\n",
+                      idx, pool_type, qtype, stype);
+
+               if (!qmt_dom(qtype, stype)) {
+                       qmt_sarr_write_down(pool);
+                       rc = qmt_sarr_pool_add_locked(pool, idx, stype);
+                       if (!rc) {
+                               for (i = 0; i < LL_MAXQUOTAS; i++)
+                                       cfs_hash_for_each(pool->qpi_site[i]->
+                                                         lqs_hash,
+                                                         qmt_lgd_extend_cb,
+                                                         &env);
+                       } else if (rc == -EEXIST) {
+                               /* This target has been already added
+                                * by another qtype
+                                */
+                               rc = 0;
+                       }
+                       qmt_sarr_write_up(pool);
+
+                       if (rc) {
+                               CERROR("%s: cannot add idx:%d to pool %s: rc = %d\n",
+                                      qmt->qmt_svname, idx,
+                                      pool->qpi_name, rc);
+                               GOTO(out, rc);
+                       }
+               }
+
+               /* look-up pool in charge of this global index FID */
+               qti_pools_init(env);
+               ptr = qmt_pool_lookup_arr(env, qmt, pool_type, idx, stype);
+               if (IS_ERR(ptr))
+                       GOTO(out, rc = PTR_ERR(ptr));
+
                for (i = 0; i < qti_pools_cnt(env); i++)
                        qti_pools_env(env)[i]->qpi_slv_nr[stype][qtype]++;
+
+               qti_pools_fini(env);
+       }
+
 out:
-       qti_pools_fini(env);
+       qpi_putref(env, pool);
        RETURN(rc);
 }
 
@@ -810,15 +930,10 @@ int qmt_pool_lqes_lookup(const struct lu_env *env,
        int rc, i;
        ENTRY;
 
-       /* Until MDT pools are not emplemented, all MDTs belong to
-        * global pool, thus lookup lqes only from global pool. */
-       if (qmt_dom(rtype, stype))
-               idx = -1;
-
        qti_pools_init(env);
        rc = 0;
        /* look-up pool responsible for this global index FID */
-       pool = qmt_pool_lookup_arr(env, qmt, rtype, idx);
+       pool = qmt_pool_lookup_arr(env, qmt, rtype, idx, stype);
        if (IS_ERR(pool)) {
                qti_pools_fini(env);
                RETURN(PTR_ERR(pool));
@@ -1157,10 +1272,10 @@ static struct obd_device *qmt_get_mgc(struct qmt_device *qmt)
 static int qmt_pool_recalc(void *args)
 {
        struct qmt_pool_info *pool, *glbl_pool;
-       struct rw_semaphore *sem = NULL;
        struct obd_device *obd;
        struct lu_env env;
        int i, rc, qtype, slaves_cnt;
+       bool sem = false;
        ENTRY;
 
        pool = args;
@@ -1175,16 +1290,16 @@ static int qmt_pool_recalc(void *args)
        obd = qmt_get_mgc(pool->qpi_qmt);
        if (IS_ERR(obd))
                GOTO(out, rc = PTR_ERR(obd));
-       else
-               /* Waiting for the end of processing mgs config.
-                * It is needed to be sure all pools are configured. */
-               while (obd->obd_process_conf)
-                       schedule_timeout_uninterruptible(cfs_time_seconds(1));
+
+       /* Waiting for the end of processing mgs config.
+        * It is needed to be sure all pools are configured.
+        */
+       while (obd->obd_process_conf)
+               schedule_timeout_uninterruptible(cfs_time_seconds(1));
 
        CFS_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
-       sem = qmt_sarr_rwsem(pool);
-       LASSERT(sem);
-       down_read(sem);
+       qmt_sarr_read_down(pool);
+       sem = true;
        /* Hold this to be sure that OSTs from this pool
         * can't do acquire/release.
         *
@@ -1264,7 +1379,7 @@ out:
         * Thus until up_read, no one can restart recalc thread.
         */
        if (sem) {
-               up_read(sem);
+               qmt_sarr_read_up(pool);
                up_write(&pool->qpi_recalc_sem);
        }
 
@@ -1394,7 +1509,7 @@ static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
                GOTO(out, rc = PTR_ERR(qpi));
        }
 
-       rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
+       rc = add ? qmt_sarr_pool_add(qpi, idx, QMT_STYPE_OST) :
                   qmt_sarr_pool_rem(qpi, idx);
        if (rc) {
                /* message is checked in sanity-quota test_1b */
@@ -1515,57 +1630,31 @@ int qmt_pool_del(struct obd_device *obd, char *poolname)
 
 static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
 {
-
-       /* No need to initialize sarray for global pool
-        * as it always includes all slaves */
-       if (qmt_pool_global(qpi))
-               return 0;
-
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
-       case LQUOTA_RES_MD:
-       default:
-               return 0;
-       }
+       return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
 }
 
-static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
+static inline int
+_qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int stype, bool locked)
 {
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
-       case LQUOTA_RES_MD:
-       default:
+       /* DOM slaves have no slave array: nothing to add, report success */
+       if (qmt_dom(qpi->qpi_rtype, stype))
                return 0;
-       }
+
+       /* "locked" selects the _locked flavour of the pool add helper */
+       return locked ?
+               lu_tgt_pool_add_locked(&qpi->qpi_sarr.osts, idx, 32) :
+               lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, 32);
 }
 
 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
 {
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
-       case LQUOTA_RES_MD:
-       default:
-               return 0;
-       }
+       return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
 }
 
 static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
 {
-       if (qmt_pool_global(qpi))
-               return;
-
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               if (qpi->qpi_sarr.osts.op_array)
-                       lu_tgt_pool_free(&qpi->qpi_sarr.osts);
-               return;
-       case LQUOTA_RES_MD:
-       default:
-               return;
-       }
+       if (qpi->qpi_sarr.osts.op_array)
+               lu_tgt_pool_free(&qpi->qpi_sarr.osts);
 }
 
 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
@@ -1573,53 +1662,19 @@ static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
        if (qmt_pool_global(qpi))
                return 0;
 
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
-       case LQUOTA_RES_MD:
-       default:
-               return 0;
-       }
-}
-
-struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
-{
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               /* to protect ost_pool use */
-               return &qpi->qpi_sarr.osts.op_rw_sem;
-       case LQUOTA_RES_MD:
-       default:
-               return NULL;
-       }
+       return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
 }
 
 int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
 {
-
-       if (qmt_pool_global(qpi))
-               return arr_idx;
-
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
-                        "idx invalid %d op_count %d\n", arr_idx,
-                        qpi->qpi_sarr.osts.op_count);
-               return qpi->qpi_sarr.osts.op_array[arr_idx];
-       case LQUOTA_RES_MD:
-       default:
-               return -EINVAL;
-       }
+       LASSERTF(arr_idx < qpi->qpi_sarr.osts.op_count && arr_idx >= 0,
+                "idx invalid %d op_count %d\n", arr_idx,
+                qpi->qpi_sarr.osts.op_count);
+       return qpi->qpi_sarr.osts.op_array[arr_idx];
 }
 
 /* Number of slaves in a pool */
 unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
 {
-       switch (qpi->qpi_rtype) {
-       case LQUOTA_RES_DT:
-               return qpi->qpi_sarr.osts.op_count;
-       case LQUOTA_RES_MD:
-       default:
-               return -EINVAL;
-       }
+       return qpi->qpi_sarr.osts.op_count;
 }
index b90be4b..1bb9346 100644 (file)
@@ -3091,6 +3091,84 @@ test_33b() {     # was test_34
 }
 run_test 33b "Drop cancel during umount"
 
+test_33c() {
+       (( MDS1_VERSION >= $(version_code 2.15.57) )) ||
+               skip "Need MDS version at least 2.15.57"
+       local MDSDEV=$(mdsdevname ${SINGLEMDS//mds/})
+       local tstid=${TSTID:-"$(id -u $TSTUSR)"}
+       local mkfsoptions
+       local qpool="qpool1"
+       local i
+
+       [ -n "$ost1_HOST" ] && fs2ost_HOST=$ost1_HOST
+
+       local fs2mdsdev=$(mdsdevname 1_2)
+       local fs2ostdev=$(ostdevname 1_2)
+       local fs2mdsvdev=$(mdsvdevname 1_2)
+       local fs2ostvdev=$(ostvdevname 1_2)
+
+       if [ "$mds1_FSTYPE" == ldiskfs ]; then
+               mkfsoptions="--mkfsoptions=\\\"-J size=8\\\"" # See bug 17931.
+       fi
+
+       if combined_mgs_mds; then
+               local mgs_flag="--mgs"
+       fi
+
+       load_modules
+       stack_trap unload_modules_conf
+
+       add fs2mds $(mkfs_opts mds1 ${fs2mdsdev}) --fsname=${FSNAME} \
+               --reformat $mgs_flag $mkfsoptions $fs2mdsdev $fs2mdsvdev ||
+               exit 10
+       add fs2ost $(mkfs_opts ost1 ${fs2ostdev}) --mgsnode=$MGSNID \
+               --fsname=${FSNAME} --index=0x7c6 --reformat $fs2ostdev \
+               $fs2ostvdev || exit 10
+
+       start fs2mds $fs2mdsdev $MDS_MOUNT_OPTS
+       stack_trap "stop fs2mds -f"
+       start fs2ost $fs2ostdev $OST_MOUNT_OPTS
+       stack_trap "stop fs2ost -f"
+
+       mount_client $MOUNT || error "client start failed"
+       stack_trap "umount_client $MOUNT"
+       mkdir_on_mdt0 $DIR/$tdir || error "cannot create $DIR/$tdir"
+       chmod 0777 $DIR/$tdir || error "chmod failed"
+       if [[ $PERM_CMD == *"set_param -P"* ]]; then
+               do_facet mgs $PERM_CMD \
+                       osd-*.$FSNAME-OST*.quota_slave.enable=$QUOTA_TYPE
+       else
+               do_facet mgs $PERM_CMD $FSNAME.quota.ost=$QUOTA_TYPE ||
+                       error "set ost quota type failed"
+       fi
+
+       local old_MDSCOUNT=$MDSCOUNT
+       MDSCOUNT=1
+       stack_trap "MDSCOUNT=$old_MDSCOUNT"
+
+       pool_add $qpool || error "pool_add failed"
+       pool_add_targets $qpool 0x7c6 || error "pool_add_targets failed"
+
+       $LFS setquota -u $tstid -B20M -b 0 $MOUNT
+       $LFS setquota -g $tstid -B20M -b 0 $MOUNT
+       $LFS setquota -u $tstid -B20M -b 0 --pool $qpool $MOUNT
+       $LFS setquota -g $tstid -B20M -b 0 --pool $qpool $MOUNT
+
+       for i in {1..10}; do
+               runas -u $tstid -g $tstid dd if=/dev/zero of=$DIR/$tdir/f1 \
+                       bs=1M count=30 oflag=direct
+               sleep 3
+               rm -f $DIR/$tdir/f1
+       done
+
+       destroy_pools
+
+       # Nothing else to tear down here: unmounting the client, stopping
+       # fs2ost and fs2mds, unloading the modules and restoring MDSCOUNT
+       # are all left to the stack_trap handlers registered above.
+}
+run_test 33c "Mount ost with a large index number"
+
 test_34a() {
        setup
        do_facet client "bash runmultiop_bg_pause $DIR/file O_c"
index 1e96184..6a3daeb 100755 (executable)
@@ -9637,6 +9637,7 @@ pool_add_targets() {
 
        if [ -z $last ]; then
                local list=$first
+               last=$first
        else
                local list=$(seq $first $step $last)
        fi