Whamcloud - gitweb
LU-15055 lod: run qmt_pool_* only from the MDT0000 config
[fs/lustre-release.git] / lustre / quota / qmt_pool.c
index 1a4ff12..dcfb7c7 100644 (file)
@@ -58,7 +58,7 @@ static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi);
 static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi,
                                    int idx, int min);
 static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx);
-static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi);
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi);
 static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx);
 static inline void qmt_stop_pool_recalc(struct qmt_pool_info *qpi);
 
@@ -79,12 +79,14 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
        int                      type;
 
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        seq_printf(m, "pool:\n"
                   "    id: %u\n"
                   "    type: %s\n"
-                  "    ref: %d\n"
-                  "    least qunit: %lu\n",
+                  "    refcount: %d\n"
+                  "    least_qunit: %lu\n",
                   0,
                   RES_NAME(pool->qpi_rtype),
                   atomic_read(&pool->qpi_ref),
@@ -92,8 +94,8 @@ static int qpi_state_seq_show(struct seq_file *m, void *data)
 
        for (type = 0; type < LL_MAXQUOTAS; type++)
                seq_printf(m, "    %s:\n"
-                          "        #slv: %d\n"
-                          "        #lqe: %d\n",
+                          "        quota_servers: %d\n"
+                          "        quota_entries: %d\n",
                           qtype_name(type),
                           qpi_slv_nr(pool, type),
                    atomic_read(&pool->qpi_site[type]->lqs_hash->hs_count));
@@ -106,6 +108,8 @@ static int qpi_soft_least_qunit_seq_show(struct seq_file *m, void *data)
 {
        struct qmt_pool_info    *pool = m->private;
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        seq_printf(m, "%lu\n", pool->qpi_soft_least_qunit);
        return 0;
@@ -117,10 +121,12 @@ qpi_soft_least_qunit_seq_write(struct file *file, const char __user *buffer,
 {
        struct seq_file *m = file->private_data;
        struct qmt_pool_info *pool = m->private;
-       long long least_qunit;
-       int qunit, rc;
+       long long least_qunit, qunit;
+       int rc;
 
        LASSERT(pool != NULL);
+       if (unlikely(!test_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags)))
+               return -ENOENT;
 
        /* Not tuneable for inode limit */
        if (pool->qpi_rtype != LQUOTA_RES_DT)
@@ -177,6 +183,7 @@ static int qmt_pool_alloc(const struct lu_env *env, struct qmt_device *qmt,
        init_rwsem(&pool->qpi_recalc_sem);
 
        pool->qpi_rtype = pool_type;
+       pool->qpi_flags = 0;
 
        /* initialize refcount to 1, hash table will then grab an additional
         * reference */
@@ -312,7 +319,7 @@ static inline int qti_pools_add(const struct lu_env *env,
        LASSERTF(qti->qti_pools_num >= QMT_MAX_POOL_NUM,
                 "Forgot init? %p\n", qti);
 
-       if (qti->qti_pools_cnt > qti->qti_pools_num) {
+       if (qti->qti_pools_cnt >= qti->qti_pools_num) {
                OBD_ALLOC(pools, sizeof(qpi) * qti->qti_pools_num * 2);
                if (!pools)
                        return -ENOMEM;
@@ -383,7 +390,7 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                                             int idx, bool add)
 {
        struct qmt_pool_info    *pos, *pool;
-       int rc;
+       int rc = 0;
        ENTRY;
 
        down_read(&qmt->qmt_pool_lock);
@@ -392,8 +399,8 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                RETURN(ERR_PTR(-ENOENT));
        }
 
-       CDEBUG(D_QUOTA, "type %d name %p index %d\n",
-              rtype, pool_name, idx);
+       CDEBUG(D_QUOTA, "type %d name %s index %d\n",
+              rtype, pool_name ?: "<none>", idx);
        /* Now just find a pool with correct type in a list. Further we need
         * to go through the list and find a pool that includes requested OST
         * or MDT. Possibly this would return a list of pools that includes
@@ -409,7 +416,7 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                if (idx >= 0 && !qmt_sarr_check_idx(pos, idx)) {
                        rc = qti_pools_add(env, pos);
                        if (rc)
-                               GOTO(out_err, rc);
+                               break;
                        continue;
                }
 
@@ -419,7 +426,7 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
                        if (add) {
                                rc = qti_pools_add(env, pos);
                                if (rc)
-                                       GOTO(out_err, rc);
+                                       break;
                        } else {
                                qpi_getref(pool);
                        }
@@ -428,6 +435,9 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
        }
        up_read(&qmt->qmt_pool_lock);
 
+       if (rc)
+               GOTO(out_err, rc);
+
        if (idx >= 0 && qti_pools_cnt(env))
                pool = qti_pools_env(env)[0];
 
@@ -435,7 +445,7 @@ struct qmt_pool_info *qmt_pool_lookup(const struct lu_env *env,
 out_err:
        CERROR("%s: cannot add pool %s: err = %d\n",
                qmt->qmt_svname, pos->qpi_name, rc);
-       RETURN(ERR_PTR(rc));
+       return ERR_PTR(rc);
 }
 
 /*
@@ -456,6 +466,8 @@ void qmt_pool_fini(const struct lu_env *env, struct qmt_device *qmt)
 
        /* parse list of pool and destroy each element */
        list_for_each_entry_safe(pool, tmp, &qmt->qmt_pool_list, qpi_linkage) {
+               /* stop all recalc threads - it may hold qpi reference */
+               qmt_stop_pool_recalc(pool);
                /* release extra reference taken in qmt_pool_alloc */
                qpi_putref(env, pool);
        }
@@ -667,6 +679,7 @@ int qmt_pool_prepare(const struct lu_env *env, struct qmt_device *qmt,
                                      qmt->qmt_svname, PFID(&qti->qti_fid), rc);
 #endif
                }
+               set_bit(QPI_FLAG_STATE_INITED, &pool->qpi_flags);
                if (name)
                        break;
        }
@@ -821,18 +834,6 @@ int qmt_pool_lqes_lookup(const struct lu_env *env,
                        qti_lqes_fini(env);
                        GOTO(out, rc = PTR_ERR(lqe));
                }
-               /* Only release could be done for not enforced lqe
-                * (see qmt_dqacq0). However slave could request to
-                * release more than not global lqe had granted before
-                * lqe_enforced was cleared. It is legal case,
-                * because even if current lqe is not enforced,
-                * lqes from other pools are still active and avilable
-                * for acquiring. Furthermore, skip not enforced lqe
-                * to don't make extra allocations. */
-               /*if (!lqe_is_glbl(lqe) && !lqe->lqe_enforced) {
-                       lqe_putref(lqe);
-                       continue;
-               }*/
                qti_lqes_add(env, lqe);
        }
        LASSERT(qti_lqes_glbl(env)->lqe_is_global);
@@ -845,9 +846,14 @@ out:
 static int lqes_cmp(const void *arg1, const void *arg2)
 {
        const struct lquota_entry *lqe1, *lqe2;
-       lqe1 = arg1;
-       lqe2 = arg2;
-       return lqe1->lqe_qunit > lqe2->lqe_qunit;
+
+       lqe1 = *(const struct lquota_entry **)arg1;
+       lqe2 = *(const struct lquota_entry **)arg2;
+       if (lqe1->lqe_qunit > lqe2->lqe_qunit)
+               return 1;
+       if (lqe1->lqe_qunit < lqe2->lqe_qunit)
+               return -1;
+       return 0;
 }
 
 void qmt_lqes_sort(const struct lu_env *env)
@@ -1159,6 +1165,13 @@ static int qmt_pool_recalc(void *args)
 
        pool = args;
 
+       rc = lu_env_init(&env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: cannot init env: rc = %d\n",
+                      pool->qpi_qmt->qmt_svname, rc);
+               GOTO(out, rc);
+       }
+
        obd = qmt_get_mgc(pool->qpi_qmt);
        if (IS_ERR(obd))
                GOTO(out, rc = PTR_ERR(obd));
@@ -1168,6 +1181,7 @@ static int qmt_pool_recalc(void *args)
                while (obd->obd_process_conf)
                        schedule_timeout_uninterruptible(cfs_time_seconds(1));
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_RECALC, cfs_fail_val);
        sem = qmt_sarr_rwsem(pool);
        LASSERT(sem);
        down_read(sem);
@@ -1187,15 +1201,9 @@ static int qmt_pool_recalc(void *args)
         * solution looks more complex, so leave it as it is. */
        down_write(&pool->qpi_recalc_sem);
 
-       rc = lu_env_init(&env, LCT_MD_THREAD);
-       if (rc) {
-               CERROR("%s: cannot init env: rc = %d\n", obd->obd_name, rc);
-               GOTO(out, rc);
-       }
-
        glbl_pool = qmt_pool_lookup_glb(&env, pool->qpi_qmt, pool->qpi_rtype);
        if (IS_ERR(glbl_pool))
-               GOTO(out_env, rc = PTR_ERR(glbl_pool));
+               GOTO(out, rc = PTR_ERR(glbl_pool));
 
        slaves_cnt = qmt_sarr_count(pool);
        CDEBUG(D_QUOTA, "Starting pool recalculation for %d slaves in %s\n",
@@ -1241,8 +1249,6 @@ static int qmt_pool_recalc(void *args)
        GOTO(out_stop, rc);
 out_stop:
        qpi_putref(&env, glbl_pool);
-out_env:
-       lu_env_fini(&env);
 out:
        if (xchg(&pool->qpi_recalc_task, NULL) == NULL)
                /*
@@ -1255,12 +1261,20 @@ out:
 
        clear_bit(QPI_FLAG_RECALC_OFFSET, &pool->qpi_flags);
        /* Pool can't be changed, since sem has been down.
-        * Thus until up_read, no one can restart recalc thread. */
+        * Thus until up_read, no one can restart recalc thread.
+        */
        if (sem) {
                up_read(sem);
                up_write(&pool->qpi_recalc_sem);
        }
-       qpi_putref(&env, pool);
+
+       /* qpi_getref has been called in qmt_start_pool_recalc,
+        * however we can't call qpi_putref if lu_env_init failed.
+        */
+       if (env.le_ctx.lc_state == LCS_ENTERED) {
+               qpi_putref(&env, pool);
+               lu_env_fini(&env);
+       }
 
        return rc;
 }
@@ -1353,6 +1367,9 @@ static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
        int                      rc, idx;
        ENTRY;
 
+       if (qmt->qmt_stopping)
+               RETURN(0);
+
        if (strnlen(poolname, LOV_MAXPOOLNAME + 1) > LOV_MAXPOOLNAME)
                RETURN(-ENAMETOOLONG);
 
@@ -1380,9 +1397,10 @@ static int qmt_pool_add_rem(struct obd_device *obd, char *poolname,
        rc = add ? qmt_sarr_pool_add(qpi, idx, 32) :
                   qmt_sarr_pool_rem(qpi, idx);
        if (rc) {
-               CERROR("%s: can't %s %s pool %s: rc = %d\n",
-                      add ? "add to" : "remove", obd->obd_name,
-                      slavename, poolname, rc);
+               /* message is checked in sanity-quota test_1b */
+               CERROR("%s: can't %s %s pool '%s': rc = %d\n",
+                      obd->obd_name, add ? "add to" : "remove", slavename,
+                      poolname, rc);
                GOTO(out_putref, rc);
        }
        qmt_pool_slv_nr_change(&env, qpi, idx, add);
@@ -1505,7 +1523,7 @@ static inline int qmt_sarr_pool_init(struct qmt_pool_info *qpi)
 
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
-               return tgt_pool_init(&qpi->qpi_sarr.osts, 0);
+               return lu_tgt_pool_init(&qpi->qpi_sarr.osts, 0);
        case LQUOTA_RES_MD:
        default:
                return 0;
@@ -1516,7 +1534,7 @@ static inline int qmt_sarr_pool_add(struct qmt_pool_info *qpi, int idx, int min)
 {
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
-               return tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
+               return lu_tgt_pool_add(&qpi->qpi_sarr.osts, idx, min);
        case LQUOTA_RES_MD:
        default:
                return 0;
@@ -1527,26 +1545,26 @@ static inline int qmt_sarr_pool_rem(struct qmt_pool_info *qpi, int idx)
 {
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
-               return tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
+               return lu_tgt_pool_remove(&qpi->qpi_sarr.osts, idx);
        case LQUOTA_RES_MD:
        default:
                return 0;
        }
 }
 
-static inline int qmt_sarr_pool_free(struct qmt_pool_info *qpi)
+static inline void qmt_sarr_pool_free(struct qmt_pool_info *qpi)
 {
        if (qmt_pool_global(qpi))
-               return 0;
+               return;
 
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
-               if (!qpi->qpi_sarr.osts.op_array)
-                       return 0;
-               return tgt_pool_free(&qpi->qpi_sarr.osts);
+               if (qpi->qpi_sarr.osts.op_array)
+                       lu_tgt_pool_free(&qpi->qpi_sarr.osts);
+               return;
        case LQUOTA_RES_MD:
        default:
-               return 0;
+               return;
        }
 }
 
@@ -1557,14 +1575,14 @@ static inline int qmt_sarr_check_idx(struct qmt_pool_info *qpi, int idx)
 
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
-               return tgt_check_index(idx, &qpi->qpi_sarr.osts);
+               return lu_tgt_check_index(idx, &qpi->qpi_sarr.osts);
        case LQUOTA_RES_MD:
        default:
                return 0;
        }
 }
 
-inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
+struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
 {
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT:
@@ -1576,7 +1594,7 @@ inline struct rw_semaphore *qmt_sarr_rwsem(struct qmt_pool_info *qpi)
        }
 }
 
-inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
+int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
 {
 
        if (qmt_pool_global(qpi))
@@ -1595,7 +1613,7 @@ inline int qmt_sarr_get_idx(struct qmt_pool_info *qpi, int arr_idx)
 }
 
 /* Number of slaves in a pool */
-inline unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
+unsigned int qmt_sarr_count(struct qmt_pool_info *qpi)
 {
        switch (qpi->qpi_rtype) {
        case LQUOTA_RES_DT: