Whamcloud - gitweb
LU-18875 quota: change lqe only under lock 12/58612/2
authorSergey Cheremencev <scherementsev@ddn.com>
Fri, 21 Mar 2025 06:32:31 +0000 (09:32 +0300)
committerOleg Drokin <green@whamcloud.com>
Fri, 25 Apr 2025 00:56:39 +0000 (00:56 +0000)
All fields of an lqe, especially flags, must be changed only under a
lock protection. The flags like lqe_gl, lqe_is_reset, lqe_edquot are
the part of an unsigned long(8 bytes) memory region in lquota_entry.
It is impossible to change only 1 bit in a memory, so the general way
is to "mov" 1 byte to some register(AL), change it(AND, OR) in this
register and "mov" back to the memory. It is a window for a race and
might cause a corruption of the neighbour flags. So this patch adds a
locking for remain places where the flags or any lqe members are
changed without locking. One of known issues this patch is aimed to
sort out is the following assertion:

 (qmt_lock.c:973:qmt_id_lock_glimpse()) ASSERTION( lqe->lqe_gl )

Restore one old assertion in qmt_id_lock_glimpse that was removed by
mistake in 16ee9ecdb0 - qmt_reba_thread() is the only thread that can
change lqe_gl flag. Also check that lqe_gl is  not set when we are
freeing an lqe.

Fixes: 16ee9ecdb0 ("LU-16913 quota: fix ASSERTION(lqe->lqe_gl)")
Fixes: 78be823f33 ("LU-15218 quota: delete unused quota ID")
Signed-off-by: Sergey Cheremencev <scherementsev@ddn.com>
Change-Id: I954ed8aa23be01846e479a64c376c4b37af05268
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/58612
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
Reviewed-by: Hongchao Zhang <hongchao@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
lustre/quota/lquota_internal.h
lustre/quota/lquota_lib.c
lustre/quota/qmt_handler.c
lustre/quota/qmt_lock.c
lustre/quota/qsd_entry.c
lustre/quota/qsd_writeback.c

index 7e2578b..cddd430 100644 (file)
@@ -453,6 +453,7 @@ struct lquota_entry *lqe_locate_find(const struct lu_env *,
 
 static inline void lqe_set_deleted(struct lquota_entry *lqe)
 {
+       lqe_write_lock(lqe);
        lqe->lqe_enforced = 0;
        lqe->lqe_edquot = 0;
        lqe->lqe_is_default = 0;
@@ -461,6 +462,7 @@ static inline void lqe_set_deleted(struct lquota_entry *lqe)
        lqe->lqe_gracetime = 0;
 
        lqe->lqe_is_deleted = 1;
+       lqe_write_unlock(lqe);
 }
 
 /* lquota_disk.c */
index b3a3879..7dfe5eb 100644 (file)
@@ -41,6 +41,7 @@ void lqe_ref_free(struct kref *kref)
        struct lquota_entry *lqe = container_of(kref, struct lquota_entry,
                                                lqe_ref);
 
+       LASSERT(!lqe->lqe_gl);
        OBD_SLAB_FREE_PTR(lqe, lqe_kmem);
 }
 
index 18f59b3..141bbb6 100644 (file)
@@ -326,13 +326,12 @@ static int qmt_set(const struct lu_env *env, struct qmt_device *qmt,
        if (IS_ERR(lqe))
                        RETURN(PTR_ERR(lqe));
 
+       lqe_write_lock(lqe);
        lqe->lqe_is_deleted = 0;
        lqe->lqe_is_reset = 0;
+       lqe_write_unlock(lqe);
        rc = qmt_set_with_lqe(env, qmt, lqe, hard, soft, time, valid,
                              is_default, is_updated);
-       if (rc == 0)
-               lqe->lqe_is_deleted = 0;
-
        lqe_putref(lqe);
        RETURN(rc);
 }
@@ -377,11 +376,8 @@ static int qmt_delete_qid(const struct lu_env *env, struct qmt_device *qmt,
        if (CFS_FAIL_CHECK(OBD_FAIL_QUOTA_NOSYNC))
                th->th_sync = 0;
 
-       lqe_write_lock(lqe);
        rc = lquota_disk_delete(env, th,
                                qpi->qpi_glb_obj[qtype], qid, &ver);
-
-       lqe_write_unlock(lqe);
        dt_trans_stop(env, qmt->qmt_child, th);
 
        if (rc == 0) {
index f449b8e..297831a 100644 (file)
@@ -401,9 +401,12 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
        if (lvb->lvb_id_rel == 0) {
                /* nothing to release */
-               if (lvb->lvb_id_may_rel != 0)
+               if (lvb->lvb_id_may_rel != 0) {
                        /* but might still release later ... */
+                       lqe_write_lock(lqe);
                        lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+                       lqe_write_unlock(lqe);
+               }
        }
 
        if (!need_revoke && lvb->lvb_id_rel == 0)
@@ -942,12 +945,6 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
 
        lqe_write_lock(lqe);
-       /*
-        * It is possible to add an lqe in a 2nd time while the same lqe
-        * from the 1st time is still sending glimpse
-        */
-       if (lqe->lqe_gl)
-               GOTO(out, 0);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
@@ -966,6 +963,8 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
                 * replies if needed */
                lqe->lqe_may_rel = 0;
 
+       /* The rebalance thread is the only thread which can issue glimpses */
+       LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
        lqe_write_unlock(lqe);
 
@@ -982,7 +981,6 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
-out:
        lqe_write_unlock(lqe);
        ldlm_resource_putref(res);
        EXIT;
index d5c0363..4f24c13 100644 (file)
@@ -223,10 +223,12 @@ int qsd_refresh_usage(const struct lu_env *env, struct lquota_entry *lqe)
                rc = 0;
                break;
        case 0:
+               lqe_write_lock(lqe);
                if (qqi->qqi_qsd->qsd_is_md)
                        lqe->lqe_usage = rec->ispace;
                else
                        lqe->lqe_usage = toqb(rec->bspace);
+               lqe_write_unlock(lqe);
                break;
        default:
                LQUOTA_ERROR(lqe, "failed to read disk usage, rc:%d", rc);
index 71b09ee..50676f8 100644 (file)
@@ -323,8 +323,10 @@ out_del:
                        GOTO(out, rc = PTR_ERR(lqe));
        }
 
+       lqe_write_lock(lqe);
        lqe->lqe_is_reset = false;
        lqe->lqe_is_deleted = 0;
+       lqe_write_unlock(lqe);
 
        /* The in-memory lqe update for slave index copy isn't deferred,
         * we shouldn't touch it here. */
@@ -340,12 +342,17 @@ out_del:
                spin_unlock(&qsd->qsd_adjust_lock);
 
                if (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
-                                                       LQUOTA_FLAG_REVOKE)
+                                                       LQUOTA_FLAG_REVOKE) {
+                       lqe_write_lock(lqe);
                        lqe->lqe_revoke = 1;
+                       lqe_write_unlock(lqe);
+               }
 
                /* Report usage asynchronously */
                rc = qsd_adjust(env, lqe);
+               lqe_write_lock(lqe);
                lqe->lqe_revoke = 0;
+               lqe_write_unlock(lqe);
                if (rc)
                        LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
        }
@@ -358,12 +365,14 @@ out:
            upd->qur_rec.lqr_glb_rec.qbr_hardlimit == 0 &&
            (LQUOTA_FLAG(upd->qur_rec.lqr_glb_rec.qbr_time) &
                                                        LQUOTA_FLAG_DEFAULT)) {
+               lqe_write_lock(lqe);
                lqe->lqe_is_default = true;
                if (qqi->qqi_default_softlimit == 0 &&
                    qqi->qqi_default_hardlimit == 0)
                        lqe->lqe_enforced = false;
                else
                        lqe->lqe_enforced = true;
+               lqe_write_unlock(lqe);
 
                LQUOTA_DEBUG(lqe, "update to use default quota");
        }
@@ -372,11 +381,13 @@ out:
                                                        LQUOTA_FLAG_RESET)) {
                struct lquota_slv_rec srec;
 
+               lqe_write_lock(lqe);
                lqe->lqe_granted = 0;
                lqe->lqe_softlimit = 0;
                lqe->lqe_hardlimit = 0;
                lqe->lqe_is_default = false;
                lqe->lqe_is_reset = true;
+               lqe_write_unlock(lqe);
 
                memset(&srec, 0, sizeof(srec));
                rc = qsd_update_index(env, qqi, &upd->qur_qid, false, 0, &srec);