Whamcloud - gitweb
LU-11023 quota: quota pools for OSTs
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
index 5b744ec..69f5eb8 100644 (file)
@@ -89,6 +89,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        case IT_QUOTA_DQACQ: {
                struct lquota_entry     *lqe;
                struct ldlm_lock        *lock;
+               int idx;
 
                if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
                        /* acquire on global lock? something is wrong ... */
@@ -103,15 +104,31 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                        GOTO(out, rc = -ENOLCK);
                LDLM_LOCK_PUT(lock);
 
+               rc = qmt_uuid2idx(uuid, &idx);
+               if (rc < 0)
+                       GOTO(out, rc = -EINVAL);
+
+               /* TODO: it seems we don't need to get lqe from
+                * lq_lvb_data anymore ... And do extra get
+                * and put on it */
                lqe = res->lr_lvb_data;
                LASSERT(lqe != NULL);
                lqe_getref(lqe);
 
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+                                         lqe_qtype(lqe), &reqbody->qb_id,
+                                         NULL, idx);
+               if (rc) {
+                       lqe_putref(lqe);
+                       GOTO(out, rc);
+               }
+
                /* acquire quota space */
-               rc = qmt_dqacq0(env, lqe, qmt, uuid, reqbody->qb_flags,
-                               reqbody->qb_count, reqbody->qb_usage,
-                               repbody);
+               rc = qmt_dqacq0(env, qmt, uuid,
+                               reqbody->qb_flags, reqbody->qb_count,
+                               reqbody->qb_usage, repbody);
                lqe_putref(lqe);
+               qti_lqes_fini(env);
                if (rc)
                        GOTO(out, rc);
                break;
@@ -191,16 +208,36 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
+               struct qmt_pool_info    *pool;
                struct lquota_entry     *lqe;
+               struct lqe_glbl_data    *lgd;
+
+               pool = qmt_pool_lookup_glb(env, qmt, pool_type);
+               if (IS_ERR(pool))
+                       GOTO(out, rc = -ENOMEM);
 
                /* Find the quota entry associated with the quota id */
                lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
-                                         &qti->qti_id);
-               if (IS_ERR(lqe))
+                                         &qti->qti_id, NULL);
+               if (IS_ERR(lqe)) {
+                       qpi_putref(env, pool);
                        GOTO(out, rc = PTR_ERR(lqe));
+               }
+
+               /* TODO: need something like qmt_extend_lqe_gd that has
+                * to be calledeach time when qpi_slv_nr is incremented */
+               lgd = qmt_alloc_lqe_gd(pool, qtype);
+               if (!lgd) {
+                       lqe_putref(lqe);
+                       qpi_putref(env, pool);
+                       GOTO(out, rc = -ENOMEM);
+               }
+
+               qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);
 
                /* store reference to lqe in lr_lvb_data */
                res->lr_lvb_data = lqe;
+               qpi_putref(env, pool);
                LQUOTA_DEBUG(lqe, "initialized res lvb");
        } else {
                struct dt_object        *obj;
@@ -225,6 +262,57 @@ out:
        return rc;
 }
 
+/* clear lge_qunit/edquot_nu flags -
+ * slave recieved new qunit and edquot.
+ *
+ * \retval     true if revoke is needed - qunit
+ *             for this slave reaches least_qunit
+ */
+static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
+{
+       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+       /* There is no array to store lge for the case of DOM.
+        * Ignore it until MDT pools will be ready. */
+       if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
+               lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
+               lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
+
+               /* We shouldn't call revoke for DOM case, it will be updated
+                * at qmt_id_lock_glimpse. */
+               return (lgd->lqeg_arr[idx].lge_qunit == least_qunit);
+       }
+
+       return false;
+}
+
+static void qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe,
+                         int stype, int idx)
+{
+       unsigned long least_qunit = lqe2qpi(lqe)->qpi_least_qunit;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+
+       if (lgd->lqeg_arr[idx].lge_qunit == least_qunit) {
+               int i;
+
+               qti_lqes_write_lock(env);
+               for (i = 0; i < qti_lqes_cnt(env); i++) {
+                       LQUOTA_DEBUG(qti_lqes(env)[i],
+                                    "idx %d lge_qunit %llu least_qunit %lu\n",
+                                    idx, lgd->lqeg_arr[idx].lge_qunit,
+                                    least_qunit);
+                       if (qti_lqes(env)[i]->lqe_qunit == least_qunit) {
+                               qti_lqes(env)[i]->lqe_revoke_time =
+                                                       ktime_get_seconds();
+                               qmt_adjust_edquot(qti_lqes(env)[i],
+                                                 ktime_get_real_seconds());
+                       }
+               }
+               qti_lqes_write_unlock(env);
+       }
+}
+
 /*
  * Update LVB associated with the global quota index.
  * This function is called from the DLM itself after a glimpse callback, in this
@@ -240,7 +328,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct lquota_lvb       *lvb;
        struct ldlm_lock        *lock;
        struct obd_export       *exp;
-       int                      rc = 0;
+       bool                     need_revoke;
+       int                      rc = 0, idx;
        ENTRY;
 
        LASSERT(res != NULL);
@@ -264,17 +353,6 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        LASSERT(lqe != NULL);
        lqe_getref(lqe);
 
-       LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
-                    lvb->lvb_id_rel, lvb->lvb_id_may_rel);
-
-       if (lvb->lvb_id_rel == 0) {
-               /* nothing to release */
-               if (lvb->lvb_id_may_rel != 0)
-                       /* but might still release later ... */
-                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
-               GOTO(out, rc = 0);
-       }
-
        /* allocate environement */
        env = lu_env_find();
        LASSERT(env);
@@ -301,17 +379,49 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                GOTO(out, rc = -EFAULT);
        }
 
-       /* release quota space */
-       rc = qmt_dqacq0(env, lqe, qmt, &exp->exp_client_uuid,
-                       QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel, 0, &qti->qti_body);
-       if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
-               LQUOTA_ERROR(lqe, "failed to release quota space on glimpse "
-                            "%llu!=%llu : rc = %d\n", qti->qti_body.qb_count,
-                            lvb->lvb_id_rel, rc);
-       class_export_put(exp);
+       rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+       if (rc < 0)
+               GOTO(out_exp, rc);
+
+       need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+       if (lvb->lvb_id_rel == 0) {
+               /* nothing to release */
+               if (lvb->lvb_id_may_rel != 0)
+                       /* but might still release later ... */
+                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+       }
+
+       if (!need_revoke && lvb->lvb_id_rel == 0)
+               GOTO(out_exp, rc = 0);
+
+       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
+                                 &lqe->lqe_id, NULL, idx);
        if (rc)
-               GOTO(out, rc);
+               GOTO(out_exp, rc);
+
+       if (need_revoke)
+               qmt_set_revoke(env, lqe, rc, idx);
+
+       if (lvb->lvb_id_rel) {
+               LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
+                            lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+               /* release quota space */
+               rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
+                               QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
+                               0, &qti->qti_body);
+               if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+                       LQUOTA_ERROR(lqe,
+                                    "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
+                                    qti->qti_body.qb_count,
+                                    lvb->lvb_id_rel, rc);
+       }
+       qti_lqes_fini(env);
+       if (rc)
+               GOTO(out_exp, rc);
        EXIT;
+out_exp:
+       class_export_put(exp);
 out:
        lqe_putref(lqe);
        return rc;
@@ -333,37 +443,60 @@ int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
                  int lvblen)
 {
-       struct ldlm_resource    *res = lock->l_resource;
-       struct lquota_lvb       *qlvb = lvb;
+       struct ldlm_resource *res = lock->l_resource;
+       struct lquota_lvb *qlvb = lvb;
+       struct lu_env *env;
+       int rc;
        ENTRY;
 
        LASSERT(res != NULL);
+       rc = 0;
 
        if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
            res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
                RETURN(-EINVAL);
 
+       env = lu_env_find();
+       LASSERT(env);
+
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
-               struct lquota_entry     *lqe = res->lr_lvb_data;
-
+               struct lquota_entry *lqe = res->lr_lvb_data;
+               struct qmt_device *qmt;
+               struct obd_uuid *uuid;
+               int idx;
+
+               uuid = &(lock)->l_export->exp_client_uuid;
+               rc = qmt_uuid2idx(uuid, &idx);
+               if (rc < 0)
+                       RETURN(rc);
+               qmt = lu2qmt_dev(ld);
                /* return current qunit value & edquot flags in lvb */
                lqe_getref(lqe);
-               qlvb->lvb_id_qunit = lqe->lqe_qunit;
-               qlvb->lvb_flags = 0;
-               if (lqe->lqe_edquot)
-                       qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+                                         lqe_qtype(lqe), &lqe->lqe_id,
+                                         NULL, idx);
+               if (!rc) {
+                       qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
+                       qlvb->lvb_flags = 0;
+                       if (qti_lqes_edquot(env))
+                               qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+                       qti_lqes_fini(env);
+               }
+               CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
+                      (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
+                      qlvb->lvb_flags, qlvb->lvb_id_qunit);
                lqe_putref(lqe);
        } else {
                /* global quota lock */
-               struct dt_object *obj = res->lr_lvb_data;
+               struct dt_object        *obj = res->lr_lvb_data;
 
                /* return current version of global index */
-               qlvb->lvb_glb_ver = dt_version_get(lu_env_find(), obj);
+               qlvb->lvb_glb_ver = dt_version_get(env, obj);
        }
 
-       RETURN(sizeof(struct lquota_lvb));
+       RETURN(rc = rc ?: sizeof(struct lquota_lvb));
 }
 
 /*
@@ -379,10 +512,13 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
                RETURN(0);
 
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
-               struct lquota_entry     *lqe = res->lr_lvb_data;
+               struct lquota_entry *lqe = res->lr_lvb_data;
+               struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
 
                /* release lqe reference */
+               lqe->lqe_glbl_data = NULL;
                lqe_putref(lqe);
+               qmt_free_lqe_gd(lgd);
        } else {
                struct dt_object *obj = res->lr_lvb_data;
                /* release object reference */
@@ -395,7 +531,7 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
        RETURN(0);
 }
 
-typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, void *);
+typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);
 
 struct qmt_gl_lock_array {
        unsigned long             q_max;
@@ -481,6 +617,35 @@ again:
        RETURN(0);
 }
 
+void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+                      struct lquota_entry *lqe)
+{
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       int idx, stype;
+       __u64 qunit;
+       bool edquot;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       LASSERT(stype >= 0);
+
+       /* DOM case - set global lqe settings */
+       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+               edquot = lqe->lqe_edquot;
+               qunit = lqe->lqe_qunit;
+       } else {
+               edquot = lgd->lqeg_arr[idx].lge_edquot;
+               qunit = lgd->lqeg_arr[idx].lge_qunit;
+       }
+
+       /* fill glimpse descriptor with lqe settings */
+       desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
+       desc->lquota_desc.gl_qunit = qunit;
+       CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
+                        stype, idx, desc->lquota_desc.gl_flags,
+                        desc->lquota_desc.gl_qunit);
+}
+
 /*
  * Send glimpse callback to slaves holding a lock on resource \res.
  * This is used to notify slaves of new quota settings or to claim quota space
@@ -496,22 +661,50 @@ again:
  */
 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            struct ldlm_resource *res, union ldlm_gl_desc *desc,
-                           qmt_glimpse_cb_t cb, void *arg)
+                           qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
 {
+       union ldlm_gl_desc *descs = NULL;
+       struct lqe_glbl_data *gld;
        struct list_head *tmp, *pos;
        LIST_HEAD(gl_list);
        struct qmt_gl_lock_array locks;
-       unsigned long i;
+       unsigned long i, locks_count;
        int rc = 0;
        ENTRY;
 
+       gld = lqe ? lqe->lqe_glbl_data : NULL;
        memset(&locks, 0, sizeof(locks));
-       rc = qmt_alloc_lock_array(res, &locks, cb, arg);
+       rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
        if (rc) {
                CERROR("%s: failed to allocate glimpse lock array (%d)\n",
                       qmt->qmt_svname, rc);
                RETURN(rc);
        }
+       if (!locks.q_cnt) {
+               CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
+                      qmt->qmt_svname);
+               RETURN(0);
+       }
+       CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
+       locks_count = locks.q_cnt;
+
+       /* Use one desc for all works, when called from qmt_glb_lock_notify */
+       if (gld && locks.q_cnt > 1) {
+               /* TODO: think about to store this preallocated descs
+                * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
+                * The benefit is that we don't need to allocate/free
+                * and setup this descs each time. But the drawback is
+                * memory use (sizeof ldlm_gl_desc * OST_COUNT * user_number).
+                * for examfple it could be 88 * 256 * 10 000 about 225 MB. */
+               OBD_ALLOC(descs,
+                         sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
+               if (!descs) {
+                       CERROR("%s: alloc glimpse lock array failed: rc = %d\n",
+                              qmt->qmt_svname, rc);
+                       qmt_free_lock_array(&locks);
+                       RETURN(-ENOMEM);
+               }
+       }
 
        for (i = locks.q_cnt; i > 0; i--) {
                struct ldlm_glimpse_work *work;
@@ -523,6 +716,13 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                        continue;
                }
 
+               if (gld) {
+                       if (descs)
+                               desc = &descs[i - 1];
+                       qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
+                       work->gl_interpret_data = lqe;
+               }
+
                list_add_tail(&work->gl_list, &gl_list);
                work->gl_lock  = locks.q_locks[i - 1];
                work->gl_flags = 0;
@@ -536,7 +736,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
 
        if (list_empty(&gl_list)) {
                CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
        /* issue glimpse callbacks to all connected slaves */
@@ -554,6 +754,10 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                LDLM_LOCK_RELEASE(work->gl_lock);
                OBD_FREE_PTR(work);
        }
+out:
+       if (descs)
+               OBD_FREE(descs,
+                        sizeof(struct ldlm_gl_lquota_desc) * locks_count);
 
        RETURN(rc);
 }
@@ -574,8 +778,7 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
        struct ldlm_resource    *res = NULL;
        ENTRY;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
-                           lqe->lqe_site->lqs_qtype);
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
 
        /* send glimpse callback to notify slaves of new quota settings */
        qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
@@ -613,16 +816,25 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
 
 /* Callback function used to select locks that should be glimpsed when
  * broadcasting the new qunit value */
-static int qmt_id_lock_cb(struct ldlm_lock *lock, void *arg)
+static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 {
-       struct obd_uuid *slv_uuid = arg;
-       struct obd_uuid *uuid = &lock->l_export->exp_client_uuid;
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       int idx;
+       int stype = qmt_uuid2idx(uuid, &idx);
 
-       if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
-               RETURN(0);
-       RETURN(+1);
+       LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+
+       /* Quota pools support only OSTs, despite MDTs also could be registered
+        * as LQUOTA_RES_DT devices(DOM). */
+       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+               return 1;
+       else
+               return lgd->lqeg_arr[idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[idx].lge_qunit_nu;
 }
 
+
 /*
  * Send glimpse request on per-ID lock to push new qunit value to slave.
  *
@@ -643,8 +855,7 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        if (!lqe->lqe_enforced)
                RETURN_EXIT;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype,
-                           lqe->lqe_site->lqs_qtype);
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
        fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
        res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
                                0);
@@ -668,15 +879,14 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
         * - notify slaves that master ran out of quota space and there is no
         *   need to send acquire request any more until further notice */
 
-       /* fill glimpse descriptor with lqe settings */
-       if (lqe->lqe_edquot)
-               qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
-       else
-               qti->qti_gl_desc.lquota_desc.gl_flags = 0;
-       qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-
+       /* TODO: it is not clear how to implement below case for all lqes
+        * from where slaves will be notified in qmt_glimpse_lock. Because
+        * here we have just global lqe with an array of OSTs that should
+        * be notified. Theoretically we can find all lqes that includes
+        * these OSTs, but it is not trivial. So I would propose to move
+        * this case to another place ... */
        if (lqe->lqe_revoke_time == 0 &&
-           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit)
+           lqe->lqe_qunit == pool->qpi_least_qunit)
                /* reset lqe_may_rel, it will be updated on glimpse callback
                 * replies if needed */
                lqe->lqe_may_rel = 0;
@@ -688,11 +898,10 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
 
        /* issue glimpse callback to slaves */
        qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
-                        uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
+                        qmt_id_lock_cb, lqe);
 
        lqe_write_lock(lqe);
        if (lqe->lqe_revoke_time == 0 &&
-           qti->qti_gl_desc.lquota_desc.gl_qunit == pool->qpi_least_qunit &&
            lqe->lqe_qunit == pool->qpi_least_qunit) {
                lqe->lqe_revoke_time = ktime_get_seconds();
                qmt_adjust_edquot(lqe, ktime_get_real_seconds());
@@ -717,6 +926,7 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
        bool    added = false;
        ENTRY;
 
+       LASSERT(lqe->lqe_is_global);
        lqe_getref(lqe);
        spin_lock(&qmt->qmt_reba_lock);
        if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {