Whamcloud - gitweb
LU-17504 build: fix gcc-13 [-Werror=stringop-overread] error
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
index c28b0b2..d847529 100644 (file)
@@ -31,6 +31,7 @@
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include <linux/kthread.h>
+#include <linux/workqueue.h>
 
 #include <lustre_dlm.h>
 #include <lustre_swab.h>
@@ -38,6 +39,8 @@
 
 #include "qmt_internal.h"
 
+struct workqueue_struct *qmt_lvbo_free_wq;
+
 /* intent policy function called from mdt_intent_opc() when the intent is of
  * quota type */
 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
@@ -89,7 +92,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        case IT_QUOTA_DQACQ: {
                struct lquota_entry     *lqe;
                struct ldlm_lock        *lock;
-               int idx;
+               int idx, stype;
 
                if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
                        /* acquire on global lock? something is wrong ... */
@@ -104,8 +107,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                        GOTO(out, rc = -ENOLCK);
                LDLM_LOCK_PUT(lock);
 
-               rc = qmt_uuid2idx(uuid, &idx);
-               if (rc < 0)
+               stype = qmt_uuid2idx(uuid, &idx);
+               if (stype < 0)
                        GOTO(out, rc = -EINVAL);
 
                /* TODO: it seems we don't need to get lqe from
@@ -115,7 +118,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                LASSERT(lqe != NULL);
                lqe_getref(lqe);
 
-               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
                                          lqe_qtype(lqe), &reqbody->qb_id,
                                          NULL, idx);
                if (rc) {
@@ -126,7 +129,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                /* acquire quota space */
                rc = qmt_dqacq0(env, qmt, uuid,
                                reqbody->qb_flags, reqbody->qb_count,
-                               reqbody->qb_usage, repbody);
+                               reqbody->qb_usage, repbody,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
                lqe_putref(lqe);
                qti_lqes_fini(env);
                if (rc)
@@ -270,47 +274,72 @@ out:
  */
 static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
 {
-       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
+       bool revoke = false;
 
        /* There is no array to store lge for the case of DOM.
-        * Ignore it until MDT pools will be ready. */
-       if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
-               lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
-               lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
-
-               /* We shouldn't call revoke for DOM case, it will be updated
-                * at qmt_id_lock_glimpse. */
-               return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+        * Ignore it until MDT pools will be ready.
+        */
+       if (!qmt_dom(lqe_rtype(lqe), stype)) {
+               struct lqe_glbl_data *lgd;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+                       lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
+                       lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
+                       /* We shouldn't call revoke for DOM case, it will be
+                        * updated at qmt_id_lock_glimpse.
+                        */
+                       revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
        }
 
-       return false;
+       return revoke;
 }
 
-static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
                          int stype, int idx)
 {
-       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
-
-       if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
-               int i;
-
-               qti_lqes_write_lock(env);
-               for (i = 0; i < qti_lqes_cnt(env); i++) {
-                       LQUOTA_DEBUG(qti_lqes(env)[i],
-                                    "idx %d lge_qunit %llu least_qunit %lu\n",
-                                    idx, lgd->lqeg_arr[idx].lge_qunit,
-                                    least_qunit);
-                       if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
-                               qti_lqes(env)[i]->lqe_revoke_time =
+       unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
+       bool notify = false;
+
+       if (qmt_dom(lqe_rtype(lqe_gl), stype))
+               return false;
+
+       qti_lqes_write_lock(env);
+       mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+       if (lqe_gl->lqe_glbl_data) {
+               struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
+               int lge_idx;
+
+               lge_idx = qmt_map_lge_idx(lgd, idx);
+               if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
+                       struct lquota_entry *lqe;
+                       int i;
+
+                       for (i = 0; i < qti_lqes_cnt(env); i++) {
+                               lqe = qti_lqes(env)[i];
+                               LQUOTA_DEBUG(lqe,
+                                            "lge_qunit %llu least_qunit %lu idx %d\n",
+                                            lgd->lqeg_arr[lge_idx].lge_qunit,
+                                            least_qunit, idx);
+                               if (lqe->lqe_qunit == least_qunit) {
+                                       lqe->lqe_revoke_time =
                                                        ktime_get_seconds();
-                               qmt_adjust_edquot(qti_lqes(env)[i],
+                                       notify |= qmt_adjust_edquot(lqe,
                                                  ktime_get_real_seconds());
+                               }
                        }
                }
-               qti_lqes_write_unlock(env);
        }
+       mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+       qti_lqes_write_unlock(env);
+
+       return notify;
 }
 
 /*
@@ -329,7 +358,7 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct ldlm_lock        *lock;
        struct obd_export       *exp;
        bool                     need_revoke;
-       int                      rc = 0, idx;
+       int                      rc = 0, idx, stype;
        ENTRY;
 
        LASSERT(res != NULL);
@@ -379,11 +408,11 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                GOTO(out, rc = -EFAULT);
        }
 
-       rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
-       if (rc < 0)
-               GOTO(out_exp, rc);
+       stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+       if (stype < 0)
+               GOTO(out_exp, rc = stype);
 
-       need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+       need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
        if (lvb->lvb_id_rel == 0) {
                /* nothing to release */
                if (lvb->lvb_id_may_rel != 0)
@@ -394,13 +423,23 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        if (!need_revoke && lvb->lvb_id_rel == 0)
                GOTO(out_exp, rc = 0);
 
-       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
-                                 &lqe->lqe_id, NULL, idx);
+       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+                                 lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
        if (rc)
                GOTO(out_exp, rc);
 
-       if (need_revoke)
-               qmt_set_revoke(env, lqe, rc, idx);
+       if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+               int notify = false;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               if (lqe->lqe_glbl_data) {
+                       qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
+                       notify = true;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+               if (notify)
+                       qmt_id_lock_notify(qmt, lqe);
+       }
 
        if (lvb->lvb_id_rel) {
                LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
@@ -409,7 +448,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                /* release quota space */
                rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
                                QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
-                               0, &qti->qti_body);
+                               0, &qti->qti_body,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
                if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
                        LQUOTA_ERROR(lqe,
                                     "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
@@ -513,12 +553,8 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
 
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                struct lquota_entry *lqe = res->lr_lvb_data;
-               struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
 
-               /* release lqe reference */
-               lqe->lqe_glbl_data = NULL;
-               lqe_putref(lqe);
-               qmt_free_lqe_gd(lgd);
+               queue_work(qmt_lvbo_free_wq, &lqe->lqe_work);
        } else {
                struct dt_object *obj = res->lr_lvb_data;
                /* release object reference */
@@ -563,6 +599,7 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res,
                                struct qmt_gl_lock_array *array,
                                qmt_glimpse_cb_t cb, void *arg)
 {
+       struct lquota_entry *lqe = arg;
        struct list_head *pos;
        unsigned long count = 0;
        int fail_cnt = 0;
@@ -570,6 +607,8 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res,
 
        LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
 again:
+       if (cb)
+               mutex_lock(&lqe->lqe_glbl_data_lock);
        lock_res(res);
        /* scan list of granted locks */
        list_for_each(pos, &res->lr_granted) {
@@ -593,6 +632,8 @@ again:
                }
        }
        unlock_res(res);
+       if (cb)
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
 
        if (count > array->q_max) {
                qmt_free_lock_array(array);
@@ -616,11 +657,10 @@ again:
        RETURN(0);
 }
 
-void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
-                      struct lquota_entry *lqe)
+static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+                             struct lquota_entry *lqe)
 {
        struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
        int idx, stype;
        __u64 qunit;
        bool edquot;
@@ -629,12 +669,24 @@ void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
        LASSERT(stype >= 0);
 
        /* DOM case - set global lqe settings */
-       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+       if (qmt_dom(lqe_rtype(lqe), stype)) {
                edquot = lqe->lqe_edquot;
                qunit = lqe->lqe_qunit;
        } else {
-               edquot = lgd->lqeg_arr[idx].lge_edquot;
-               qunit = lgd->lqeg_arr[idx].lge_qunit;
+               struct lqe_glbl_data *lgd;
+               int lge_idx;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       lge_idx = qmt_map_lge_idx(lgd, idx);
+                       edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
+                       qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
+               } else {
+                       edquot = lqe->lqe_edquot;
+                       qunit = lqe->lqe_qunit;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
        }
 
        /* fill glimpse descriptor with lqe settings */
@@ -663,7 +715,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
 {
        union ldlm_gl_desc *descs = NULL;
-       struct lqe_glbl_data *gld;
        struct list_head *tmp, *pos;
        LIST_HEAD(gl_list);
        struct qmt_gl_lock_array locks;
@@ -671,7 +722,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        int rc = 0;
        ENTRY;
 
-       gld = lqe ? lqe->lqe_glbl_data : NULL;
        memset(&locks, 0, sizeof(locks));
        rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
        if (rc) {
@@ -688,7 +738,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        locks_count = locks.q_cnt;
 
        /* Use one desc for all works, when called from qmt_glb_lock_notify */
-       if (gld && locks.q_cnt > 1) {
+       if (cb && locks.q_cnt > 1) {
                /* TODO: think about to store this preallocated descs
                 * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
                 * The benefit is that we don't need to allocate/free
@@ -715,7 +765,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                        continue;
                }
 
-               if (gld) {
+               if (cb) {
                        if (descs)
                                desc = &descs[i - 1];
                        qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
@@ -788,6 +838,21 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
                qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
                                                        LQUOTA_FLAG_DEFAULT);
 
+       } else if (lqe->lqe_is_deleted) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_DELETED);
+       } else if (lqe->lqe_is_reset) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_RESET);
+       } else if (lqe->lqe_granted > lqe->lqe_hardlimit) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_REVOKE);
        } else {
                qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
                qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
@@ -797,7 +862,7 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
 
        /* look up ldlm resource associated with global index */
        fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
-       res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
+       res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
                                LDLM_PLAIN, 0);
        if (IS_ERR(res)) {
                /* this might happen if no slaves have enqueued global quota
@@ -817,20 +882,32 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
  * broadcasting the new qunit value */
 static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 {
-       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
        struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
        int idx;
        int stype = qmt_uuid2idx(uuid, &idx);
 
        LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
 
+       CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
+              stype, lqe_rtype(lqe), idx, uuid->uuid);
        /* Quota pools support only OSTs, despite MDTs also could be registered
         * as LQUOTA_RES_DT devices(DOM). */
-       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+       if (qmt_dom(lqe_rtype(lqe), stype))
                return 1;
-       else
-               return lgd->lqeg_arr[idx].lge_edquot_nu ||
-                      lgd->lqeg_arr[idx].lge_qunit_nu;
+
+       if (lgd) {
+               int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+               CDEBUG(D_QUOTA,
+                      "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
+                      idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu);
+               return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu;
+       }
+
+       return 0;
 }
 
 
@@ -856,8 +933,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
 
        lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
        fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
-       res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
-                               0);
+       res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
        if (IS_ERR(res)) {
                /* this might legitimately happens if slaves haven't had the
                 * opportunity to enqueue quota lock yet. */
@@ -872,6 +948,12 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
 
        lqe_write_lock(lqe);
+       /*
+        * It is possible to add an lqe in a 2nd time while the same lqe
+        * from the 1st time is still sending glimpse
+        */
+       if (lqe->lqe_gl)
+               GOTO(out, 0);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
@@ -890,14 +972,13 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
                 * replies if needed */
                lqe->lqe_may_rel = 0;
 
-       /* The rebalance thread is the only thread which can issue glimpses */
-       LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
        lqe_write_unlock(lqe);
 
        /* issue glimpse callback to slaves */
-       qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
-                        qmt_id_lock_cb, lqe);
+       if (lqe->lqe_glbl_data)
+               qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+                                qmt_id_lock_cb, lqe);
 
        lqe_write_lock(lqe);
        if (lqe->lqe_revoke_time == 0 &&
@@ -907,8 +988,8 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
+out:
        lqe_write_unlock(lqe);
-
        ldlm_resource_putref(res);
        EXIT;
 }
@@ -980,7 +1061,11 @@ static int qmt_reba_thread(void *_args)
                        list_del_init(&lqe->lqe_link);
                        spin_unlock(&qmt->qmt_reba_lock);
 
-                       if (!kthread_should_stop())
+                       /* lqe_ref == 1 means we hold the last ref,
+                        * so no need to send glimpse callbacks.
+                        */
+                       if (!kthread_should_stop() &&
+                           atomic_read(&lqe->lqe_ref) > 1)
                                qmt_id_lock_glimpse(env, qmt, lqe, NULL);
 
                        lqe_putref(lqe);