LU-17504 build: fix gcc-13 [-Werror=stringop-overread] error
[fs/lustre-release.git] lustre/quota/qmt_lock.c
index e37d041..d847529 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
+#include <linux/kthread.h>
+#include <linux/workqueue.h>
+
 #include <lustre_dlm.h>
+#include <lustre_swab.h>
 #include <obd_class.h>
 
 #include "qmt_internal.h"
 
+struct workqueue_struct *qmt_lvbo_free_wq;
+
 /* intent policy function called from mdt_intent_opc() when the intent is of
  * quota type */
 int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
@@ -51,10 +53,14 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        struct quota_body       *repbody;
        struct obd_uuid         *uuid;
        struct lquota_lvb       *lvb;
-       int                      rc;
+       struct ldlm_resource    *res = (*lockp)->l_resource;
+       struct ldlm_reply       *ldlm_rep;
+       int                      rc, lvb_len;
        ENTRY;
 
        req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_QUOTA);
+       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
+                            ldlm_lvbo_size(*lockp));
 
        /* extract quota body and intent opc */
        it = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
@@ -76,16 +82,69 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        if (repbody == NULL)
                RETURN(err_serious(-EFAULT));
 
+       ldlm_rep = req_capsule_server_get(&req->rq_pill, &RMF_DLM_REP);
+       if (ldlm_rep == NULL)
+               RETURN(err_serious(-EFAULT));
+
        uuid = &(*lockp)->l_export->exp_client_uuid;
        switch (it->opc) {
 
-       case IT_QUOTA_DQACQ:
-               /* XXX: to be added in a next patch */
-               GOTO(out, -EOPNOTSUPP);
+       case IT_QUOTA_DQACQ: {
+               struct lquota_entry     *lqe;
+               struct ldlm_lock        *lock;
+               int idx, stype;
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
+                       /* acquire on global lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
+               /* verify global lock isn't stale */
+               if (!lustre_handle_is_used(&reqbody->qb_glb_lockh))
+                       GOTO(out, rc = -ENOLCK);
+
+               lock = ldlm_handle2lock(&reqbody->qb_glb_lockh);
+               if (lock == NULL)
+                       GOTO(out, rc = -ENOLCK);
+               LDLM_LOCK_PUT(lock);
+
+               stype = qmt_uuid2idx(uuid, &idx);
+               if (stype < 0)
+                       GOTO(out, rc = -EINVAL);
+
+               /* TODO: it seems we no longer need to get the lqe from
+                * lr_lvb_data, nor to take the extra get and put on it */
+               lqe = res->lr_lvb_data;
+               LASSERT(lqe != NULL);
+               lqe_getref(lqe);
+
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+                                         lqe_qtype(lqe), &reqbody->qb_id,
+                                         NULL, idx);
+               if (rc) {
+                       lqe_putref(lqe);
+                       GOTO(out, rc);
+               }
+
+               /* acquire quota space */
+               rc = qmt_dqacq0(env, qmt, uuid,
+                               reqbody->qb_flags, reqbody->qb_count,
+                               reqbody->qb_usage, repbody,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
+               lqe_putref(lqe);
+               qti_lqes_fini(env);
+               if (rc)
+                       GOTO(out, rc);
                break;
+       }
 
        case IT_QUOTA_CONN:
                /* new connection from slave */
+
+               if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0)
+                       /* connection on per-ID lock? something is wrong ... */
+                       GOTO(out, rc = -EPROTO);
+
                rc = qmt_pool_new_conn(env, qmt, &reqbody->qb_fid,
                                       &repbody->qb_slv_fid,
                                       &repbody->qb_slv_ver, uuid);
@@ -94,19 +153,23 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                break;
 
        default:
-               CERROR("%s: invalid intent opcode: "LPU64"\n", qmt->qmt_svname,
+               CERROR("%s: invalid intent opcode: %llu\n", qmt->qmt_svname,
                       it->opc);
-               GOTO(out, rc = err_serious(-EINVAL));
+               GOTO(out, rc = -EINVAL);
        }
 
        /* on success, pack lvb in reply */
-       req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
-                            ldlm_lvbo_size(*lockp));
        lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-       ldlm_lvbo_fill(*lockp, lvb, ldlm_lvbo_size(*lockp));
-       EXIT;
+       lvb_len = ldlm_lvbo_size(*lockp);
+       lvb_len = ldlm_lvbo_fill(*lockp, lvb, &lvb_len);
+       if (lvb_len < 0)
+               GOTO(out, rc = lvb_len);
+
+       req_capsule_shrink(&req->rq_pill, &RMF_DLM_LVB, lvb_len, RCL_SERVER);
 out:
-       return rc;
+       ldlm_rep->lock_policy_res2 = clear_serious(rc);
+       EXIT;
+       return ELDLM_OK;
 }
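
A note on the error-return convention adopted above: qmt_intent_policy() no longer propagates per-request errors through its return value. The error is packed into the DLM reply instead, and the function always returns ELDLM_OK so the reply buffers (including the LVB) still reach the client. Below is a minimal, hedged sketch of that shape, reusing only identifiers that already appear in this patch and assuming the file's existing includes; it is not a complete handler.

static int example_intent_handler(struct ldlm_reply *ldlm_rep)
{
	int rc = 0;

	/* ... handle the intent, setting rc on failure ... */

	/* the error travels back in lock_policy_res2 (with the
	 * err_serious() flag cleared); the RPC itself succeeds */
	ldlm_rep->lock_policy_res2 = clear_serious(rc);
	return ELDLM_OK;
}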
 
 /*
@@ -118,7 +181,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
        struct lu_env           *env;
        struct qmt_thread_info  *qti;
        struct qmt_device       *qmt = lu2qmt_dev(ld);
-       int                      pool_id, pool_type, qtype;
+       int                      pool_type, qtype;
        int                      rc;
        ENTRY;
 
@@ -131,25 +194,17 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
            res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
                RETURN(0);
 
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               RETURN(-ENOMEM);
-
-       /* initialize environment */
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               OBD_FREE_PTR(env);
-               RETURN(rc);
-       }
+       env = lu_env_find();
+       LASSERT(env);
        qti = qmt_info(env);
 
        /* extract global index FID and quota identifier */
-       fid_extract_quota_resid(&res->lr_name, &qti->qti_fid, &qti->qti_id);
+       fid_extract_from_quota_res(&qti->qti_fid, &qti->qti_id, &res->lr_name);
 
        /* sanity check the global index FID */
-       rc = lquota_extract_fid(&qti->qti_fid, &pool_id, &pool_type, &qtype);
+       rc = lquota_extract_fid(&qti->qti_fid, &pool_type, &qtype);
        if (rc) {
-               CERROR("can't extract pool information from FID "DFID"\n",
+               CERROR("can't extract glb index information from FID "DFID"\n",
                       PFID(&qti->qti_fid));
                GOTO(out, rc);
        }
@@ -157,16 +212,36 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
+               struct qmt_pool_info    *pool;
                struct lquota_entry     *lqe;
+               struct lqe_glbl_data    *lgd;
+
+               pool = qmt_pool_lookup_glb(env, qmt, pool_type);
+               if (IS_ERR(pool))
+                       GOTO(out, rc = -ENOMEM);
 
                /* Find the quota entry associated with the quota id */
-               lqe = qmt_pool_lqe_lookup(env, qmt, pool_id, pool_type, qtype,
-                                         &qti->qti_id);
-               if (IS_ERR(lqe))
+               lqe = qmt_pool_lqe_lookup(env, qmt, pool_type, qtype,
+                                         &qti->qti_id, NULL);
+               if (IS_ERR(lqe)) {
+                       qpi_putref(env, pool);
                        GOTO(out, rc = PTR_ERR(lqe));
+               }
+
+               /* TODO: need something like qmt_extend_lqe_gd that has
+                * to be called each time qpi_slv_nr is incremented */
+               lgd = qmt_alloc_lqe_gd(pool, qtype);
+               if (!lgd) {
+                       lqe_putref(lqe);
+                       qpi_putref(env, pool);
+                       GOTO(out, rc = -ENOMEM);
+               }
+
+               qmt_setup_lqe_gd(env, qmt, lqe, lgd, pool_type);
 
                /* store reference to lqe in lr_lvb_data */
                res->lr_lvb_data = lqe;
+               qpi_putref(env, pool);
                LQUOTA_DEBUG(lqe, "initialized res lvb");
        } else {
                struct dt_object        *obj;
@@ -176,7 +251,7 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
                if (IS_ERR(obj))
                        GOTO(out, rc = PTR_ERR(obj));
                if (!dt_object_exists(obj)) {
-                       lu_object_put(env, &obj->do_lu);
+                       dt_object_put(env, obj);
                        GOTO(out, rc = -ENOENT);
                }
 
@@ -185,14 +260,88 @@ int qmt_lvbo_init(struct lu_device *ld, struct ldlm_resource *res)
                CDEBUG(D_QUOTA, DFID" initialized lvb\n", PFID(&qti->qti_fid));
        }
 
-       res->lr_lvb_len  = sizeof(struct lquota_lvb);
+       res->lr_lvb_len = sizeof(struct lquota_lvb);
        EXIT;
 out:
-       lu_env_fini(env);
-       OBD_FREE_PTR(env);
        return rc;
 }
 
+/* Clear the lge_qunit_nu/lge_edquot_nu flags -
+ * the slave has received the new qunit and edquot values.
+ *
+ * \retval     true if revoke is needed - qunit
+ *             for this slave has reached least_qunit
+ */
+static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
+{
+       unsigned long least = lqe2qpi(lqe)->qpi_least_qunit;
+       bool revoke = false;
+
+       /* There is no array to store lge for the DOM case.
+        * Ignore it until MDT pools are ready.
+        */
+       if (!qmt_dom(lqe_rtype(lqe), stype)) {
+               struct lqe_glbl_data *lgd;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+                       lgd->lqeg_arr[lge_idx].lge_qunit_nu = 0;
+                       lgd->lqeg_arr[lge_idx].lge_edquot_nu = 0;
+                       /* We shouldn't call revoke for the DOM case; it will
+                        * be updated in qmt_id_lock_glimpse.
+                        */
+                       revoke = lgd->lqeg_arr[lge_idx].lge_qunit == least;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+       }
+
+       return revoke;
+}
+
+static bool qmt_set_revoke(struct lu_env *env, struct lquota_entry *lqe_gl,
+                         int stype, int idx)
+{
+       unsigned long least_qunit = lqe2qpi(lqe_gl)->qpi_least_qunit;
+       bool notify = false;
+
+       if (qmt_dom(lqe_rtype(lqe_gl), stype))
+               return false;
+
+       qti_lqes_write_lock(env);
+       mutex_lock(&lqe_gl->lqe_glbl_data_lock);
+       if (lqe_gl->lqe_glbl_data) {
+               struct lqe_glbl_data *lgd = lqe_gl->lqe_glbl_data;
+               int lge_idx;
+
+               lge_idx = qmt_map_lge_idx(lgd, idx);
+               if (lgd->lqeg_arr[lge_idx].lge_qunit == least_qunit) {
+                       struct lquota_entry *lqe;
+                       int i;
+
+                       for (i = 0; i < qti_lqes_cnt(env); i++) {
+                               lqe = qti_lqes(env)[i];
+                               LQUOTA_DEBUG(lqe,
+                                            "lge_qunit %llu least_qunit %lu idx %d\n",
+                                            lgd->lqeg_arr[lge_idx].lge_qunit,
+                                            least_qunit, idx);
+                               if (lqe->lqe_qunit == least_qunit) {
+                                       lqe->lqe_revoke_time =
+                                                       ktime_get_seconds();
+                                       notify |= qmt_adjust_edquot(lqe,
+                                                 ktime_get_real_seconds());
+                               }
+                       }
+               }
+       }
+       mutex_unlock(&lqe_gl->lqe_glbl_data_lock);
+       qti_lqes_write_unlock(env);
+
+       return notify;
+}
+
 /*
  * Update LVB associated with the global quota index.
  * This function is called from the DLM itself after a glimpse callback, in this
@@ -206,7 +355,10 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct qmt_device       *qmt = lu2qmt_dev(ld);
        struct lquota_entry     *lqe;
        struct lquota_lvb       *lvb;
-       int                      rc = 0;
+       struct ldlm_lock        *lock;
+       struct obd_export       *exp;
+       bool                     need_revoke;
+       int                      rc = 0, idx, stype;
        ENTRY;
 
        LASSERT(res != NULL);
@@ -218,35 +370,100 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                /* no need to update lvb for global quota locks */
                RETURN(0);
 
+       lvb = req_capsule_server_swab_get(&req->rq_pill, &RMF_DLM_LVB,
+                                         lustre_swab_lquota_lvb);
+       if (lvb == NULL) {
+               CERROR("%s: failed to extract lvb from request\n",
+                      qmt->qmt_svname);
+               RETURN(-EFAULT);
+       }
+
        lqe = res->lr_lvb_data;
        LASSERT(lqe != NULL);
+       lqe_getref(lqe);
 
        /* allocate environement */
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               RETURN(-ENOMEM);
+       env = lu_env_find();
+       LASSERT(env);
+       qti = qmt_info(env);
 
-       /* initialize environment */
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               OBD_FREE_PTR(env);
-               RETURN(rc);
+       /* The request is a glimpse callback which was sent via the
+        * reverse import to the slave. What we care about here is the
+        * export associated with the slave and req->rq_export is
+        * definitely not what we are looking for (it is actually set to
+        * NULL here).
+        * Therefore we extract the lock from the request argument
+        * and use lock->l_export. */
+       lock = ldlm_request_lock(req);
+       if (IS_ERR(lock)) {
+               CERROR("%s: failed to get lock from request!\n",
+                      qmt->qmt_svname);
+               GOTO(out, rc = PTR_ERR(lock));
        }
-       qti = qmt_info(env);
 
-       lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
-       if (lvb == NULL) {
-               CERROR("%s: failed to extract lvb from request\n",
+       exp = class_export_get(lock->l_export);
+       if (exp == NULL) {
+               CERROR("%s: failed to get export from lock!\n",
                       qmt->qmt_svname);
-               GOTO(out, rc);
+               GOTO(out, rc = -EFAULT);
        }
 
-       /* XXX: Space release handling to be added in a next patch */
+       stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+       if (stype < 0)
+               GOTO(out_exp, rc = stype);
+
+       need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
+       if (lvb->lvb_id_rel == 0) {
+               /* nothing to release */
+               if (lvb->lvb_id_may_rel != 0)
+                       /* but might still release later ... */
+                       lqe->lqe_may_rel += lvb->lvb_id_may_rel;
+       }
+
+       if (!need_revoke && lvb->lvb_id_rel == 0)
+               GOTO(out_exp, rc = 0);
+
+       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+                                 lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
+       if (rc)
+               GOTO(out_exp, rc);
+
+       if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+               int notify = false;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               if (lqe->lqe_glbl_data) {
+                       qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
+                       notify = true;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+               if (notify)
+                       qmt_id_lock_notify(qmt, lqe);
+       }
 
+       if (lvb->lvb_id_rel) {
+               LQUOTA_DEBUG(lqe, "releasing:%llu may release:%llu",
+                            lvb->lvb_id_rel, lvb->lvb_id_may_rel);
+
+               /* release quota space */
+               rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
+                               QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
+                               0, &qti->qti_body,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
+               if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
+                       LQUOTA_ERROR(lqe,
+                                    "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
+                                    qti->qti_body.qb_count,
+                                    lvb->lvb_id_rel, rc);
+       }
+       qti_lqes_fini(env);
+       if (rc)
+               GOTO(out_exp, rc);
        EXIT;
+out_exp:
+       class_export_put(exp);
 out:
-       lu_env_fini(env);
-       OBD_FREE_PTR(env);
+       lqe_putref(lqe);
        return rc;
 }
 
@@ -266,53 +483,60 @@ int qmt_lvbo_size(struct lu_device *ld, struct ldlm_lock *lock)
 int qmt_lvbo_fill(struct lu_device *ld, struct ldlm_lock *lock, void *lvb,
                  int lvblen)
 {
-       struct ldlm_resource    *res = lock->l_resource;
-       struct lquota_lvb       *qlvb = lvb;
+       struct ldlm_resource *res = lock->l_resource;
+       struct lquota_lvb *qlvb = lvb;
+       struct lu_env *env;
+       int rc;
        ENTRY;
 
        LASSERT(res != NULL);
+       rc = 0;
 
        if (res->lr_type != LDLM_PLAIN || res->lr_lvb_data == NULL ||
            res->lr_name.name[LUSTRE_RES_ID_SEQ_OFF] != FID_SEQ_QUOTA_GLB)
                RETURN(-EINVAL);
 
+       env = lu_env_find();
+       LASSERT(env);
+
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                /* no ID quota lock associated with UID/GID 0 or with a seq 0,
                 * we are thus dealing with an ID lock. */
-               struct lquota_entry     *lqe = res->lr_lvb_data;
-
+               struct lquota_entry *lqe = res->lr_lvb_data;
+               struct qmt_device *qmt;
+               struct obd_uuid *uuid;
+               int idx;
+
+               uuid = &(lock)->l_export->exp_client_uuid;
+               rc = qmt_uuid2idx(uuid, &idx);
+               if (rc < 0)
+                       RETURN(rc);
+               qmt = lu2qmt_dev(ld);
                /* return current qunit value & edquot flags in lvb */
                lqe_getref(lqe);
-               qlvb->lvb_id_qunit = lqe->lqe_qunit;
-               qlvb->lvb_flags = 0;
-               if (lqe->lqe_edquot)
-                       qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+                                         lqe_qtype(lqe), &lqe->lqe_id,
+                                         NULL, idx);
+               if (!rc) {
+                       qlvb->lvb_id_qunit = qti_lqes_min_qunit(env);
+                       qlvb->lvb_flags = 0;
+                       if (qti_lqes_edquot(env))
+                               qlvb->lvb_flags = LQUOTA_FL_EDQUOT;
+                       qti_lqes_fini(env);
+               }
+               CDEBUG(D_QUOTA, "uuid %s lqe_id %lu, edquot %llu qunit %llu\n",
+                      (char *)uuid, (unsigned long)lqe->lqe_id.qid_uid,
+                      qlvb->lvb_flags, qlvb->lvb_id_qunit);
                lqe_putref(lqe);
        } else {
                /* global quota lock */
-               struct lu_env           *env;
-               int                      rc;
-               struct dt_object        *obj = res->lr_lvb_data;
-
-               OBD_ALLOC_PTR(env);
-               if (env == NULL)
-                       RETURN(-ENOMEM);
-
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_LOCAL);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       RETURN(rc);
-               }
+               struct dt_object *obj = res->lr_lvb_data;
 
                /* return current version of global index */
                qlvb->lvb_glb_ver = dt_version_get(env, obj);
-
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
        }
 
-       RETURN(sizeof(struct lquota_lvb));
+       RETURN(rc = rc ?: sizeof(struct lquota_lvb));
 }
 
 /*
@@ -328,30 +552,13 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
                RETURN(0);
 
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
-               struct lquota_entry     *lqe = res->lr_lvb_data;
+               struct lquota_entry *lqe = res->lr_lvb_data;
 
-               /* release lqe reference */
-               lqe_putref(lqe);
+               queue_work(qmt_lvbo_free_wq, &lqe->lqe_work);
        } else {
-               struct dt_object        *obj = res->lr_lvb_data;
-               struct lu_env           *env;
-               int                      rc;
-
-               OBD_ALLOC_PTR(env);
-               if (env == NULL)
-                       RETURN(-ENOMEM);
-
-               /* initialize environment */
-               rc = lu_env_init(env, LCT_LOCAL);
-               if (rc) {
-                       OBD_FREE_PTR(env);
-                       RETURN(rc);
-               }
-
+               struct dt_object *obj = res->lr_lvb_data;
                /* release object reference */
-               lu_object_put(env, &obj->do_lu);
-               lu_env_fini(env);
-               OBD_FREE_PTR(env);
+               dt_object_put(lu_env_find(), obj);
        }
 
        res->lr_lvb_data = NULL;
@@ -360,9 +567,136 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
        RETURN(0);
 }
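
qmt_lvbo_free() above no longer drops the lqe reference inline: it queues lqe->lqe_work on the new qmt_lvbo_free_wq workqueue (declared at the top of this file), deferring the final put out of the ->lvbo_free() caller's context. The creation and teardown of qmt_lvbo_free_wq and the initialization of lqe_work are not shown in this hunk. Below is a small, self-contained sketch of the generic deferral pattern using only standard kernel workqueue APIs; every name in it (struct obj, obj_free_wq, ...) is illustrative, not a Lustre API.

#include <linux/module.h>
#include <linux/slab.h>
#include <linux/workqueue.h>

struct obj {
	struct work_struct free_work;
	/* ... payload ... */
};

static struct workqueue_struct *obj_free_wq;

static void obj_free_fn(struct work_struct *work)
{
	struct obj *o = container_of(work, struct obj, free_work);

	kfree(o);			/* blocking teardown runs here */
}

static void obj_put_deferred(struct obj *o)
{
	/* the caller returns immediately; the work runs later */
	INIT_WORK(&o->free_work, obj_free_fn);
	queue_work(obj_free_wq, &o->free_work);
}

static int __init obj_example_init(void)
{
	obj_free_wq = alloc_workqueue("obj_free", 0, 0);
	return obj_free_wq ? 0 : -ENOMEM;
}

static void __exit obj_example_exit(void)
{
	destroy_workqueue(obj_free_wq);	/* flushes pending work first */
}

module_init(obj_example_init);
module_exit(obj_example_exit);
MODULE_LICENSE("GPL");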
 
-typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
-                               struct obd_uuid *, union ldlm_gl_desc *,
-                               void *);
+typedef int (*qmt_glimpse_cb_t)(struct ldlm_lock *, struct lquota_entry *);
+
+struct qmt_gl_lock_array {
+       unsigned long             q_max;
+       unsigned long             q_cnt;
+       struct ldlm_lock        **q_locks;
+};
+
+static void qmt_free_lock_array(struct qmt_gl_lock_array *array)
+{
+       int i;
+
+       if (array->q_max == 0) {
+               LASSERT(array->q_locks == NULL);
+               return;
+       }
+
+       for (i = 0; i < array->q_cnt; i++) {
+               LASSERT(array->q_locks[i]);
+               LDLM_LOCK_RELEASE(array->q_locks[i]);
+               array->q_locks[i] = NULL;
+       }
+       array->q_cnt = 0;
+       OBD_FREE_PTR_ARRAY(array->q_locks, array->q_max);
+       array->q_locks = NULL;
+       array->q_max = 0;
+}
+
+static int qmt_alloc_lock_array(struct ldlm_resource *res,
+                               struct qmt_gl_lock_array *array,
+                               qmt_glimpse_cb_t cb, void *arg)
+{
+       struct lquota_entry *lqe = arg;
+       struct list_head *pos;
+       unsigned long count = 0;
+       int fail_cnt = 0;
+       ENTRY;
+
+       LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
+again:
+       if (cb)
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+       lock_res(res);
+       /* scan list of granted locks */
+       list_for_each(pos, &res->lr_granted) {
+               struct ldlm_lock *lock;
+               int rc;
+
+               lock = list_entry(pos, struct ldlm_lock, l_res_link);
+               LASSERT(lock->l_export);
+
+               if (cb != NULL) {
+                       rc = cb(lock, arg);
+                       /* slave should not be notified */
+                       if (rc == 0)
+                               continue;
+               }
+
+               count++;
+               if (array->q_max != 0 && array->q_cnt < array->q_max) {
+                       array->q_locks[array->q_cnt] = LDLM_LOCK_GET(lock);
+                       array->q_cnt++;
+               }
+       }
+       unlock_res(res);
+       if (cb)
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+
+       if (count > array->q_max) {
+               qmt_free_lock_array(array);
+               if (++fail_cnt > 5)
+                       RETURN(-EAGAIN);
+               /*
+                * allocate more slots in case more qualified locks are
+                * found during the next loop
+                */
+               array->q_max = count + count / 2 + 10;
+               count = 0;
+               LASSERT(array->q_locks == NULL && array->q_cnt == 0);
+               OBD_ALLOC_PTR_ARRAY(array->q_locks, array->q_max);
+               if (array->q_locks == NULL) {
+                       array->q_max = 0;
+                       RETURN(-ENOMEM);
+               }
+
+               goto again;
+       }
+       RETURN(0);
+}
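
qmt_alloc_lock_array() above snapshots the granted list in two passes: it counts the qualifying locks under lock_res(), allocates the array with some headroom only after dropping the lock, and rescans if the list grew in the meantime, giving up after a few attempts. Below is a standalone sketch of that count-then-allocate-then-rescan pattern over a generic spinlock-protected list; all names here (struct item, snap_list, ...) are hypothetical and reference counting is elided.

#include <linux/errno.h>
#include <linux/list.h>
#include <linux/slab.h>
#include <linux/spinlock.h>

struct item {
	struct list_head link;
};

static int snap_list(spinlock_t *lock, struct list_head *head,
		     struct item ***arrp, unsigned long *cntp)
{
	struct item **arr = NULL, *it;
	unsigned long max = 0, cnt;
	int retries = 0;

again:
	cnt = 0;
	spin_lock(lock);			/* no allocation allowed here */
	list_for_each_entry(it, head, link) {
		if (arr && cnt < max)
			arr[cnt] = it;		/* real code takes a ref here */
		cnt++;
	}
	spin_unlock(lock);

	if (cnt > max) {			/* first pass, or the list grew */
		kfree(arr);
		if (++retries > 5)
			return -EAGAIN;
		max = cnt + cnt / 2 + 10;	/* headroom for new entries */
		arr = kcalloc(max, sizeof(*arr), GFP_KERNEL);
		if (!arr)
			return -ENOMEM;
		goto again;
	}

	*arrp = arr;
	*cntp = cnt;
	return 0;
}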
+
+static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+                             struct lquota_entry *lqe)
+{
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       int idx, stype;
+       __u64 qunit;
+       bool edquot;
+
+       stype = qmt_uuid2idx(uuid, &idx);
+       LASSERT(stype >= 0);
+
+       /* DOM case - set global lqe settings */
+       if (qmt_dom(lqe_rtype(lqe), stype)) {
+               edquot = lqe->lqe_edquot;
+               qunit = lqe->lqe_qunit;
+       } else {
+               struct lqe_glbl_data *lgd;
+               int lge_idx;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       lge_idx = qmt_map_lge_idx(lgd, idx);
+                       edquot = lgd->lqeg_arr[lge_idx].lge_edquot;
+                       qunit = lgd->lqeg_arr[lge_idx].lge_qunit;
+               } else {
+                       edquot = lqe->lqe_edquot;
+                       qunit = lqe->lqe_qunit;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
+       }
+
+       /* fill glimpse descriptor with lqe settings */
+       desc->lquota_desc.gl_flags = edquot ? LQUOTA_FL_EDQUOT : 0;
+       desc->lquota_desc.gl_qunit = qunit;
+       CDEBUG(D_QUOTA, "setup desc: stype %d idx %d, edquot %llu qunit %llu\n",
+                        stype, idx, desc->lquota_desc.gl_flags,
+                        desc->lquota_desc.gl_qunit);
+}
+
 /*
  * Send glimpse callback to slaves holding a lock on resource \res.
  * This is used to notify slaves of new quota settings or to claim quota space
@@ -378,72 +712,101 @@ typedef int (*qmt_glimpse_cb_t)(const struct lu_env *, struct qmt_device *,
  */
 static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            struct ldlm_resource *res, union ldlm_gl_desc *desc,
-                           qmt_glimpse_cb_t cb, void *arg)
+                           qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
 {
-       cfs_list_t      *tmp, *pos;
-       CFS_LIST_HEAD(gl_list);
-       int              rc = 0;
+       union ldlm_gl_desc *descs = NULL;
+       struct list_head *tmp, *pos;
+       LIST_HEAD(gl_list);
+       struct qmt_gl_lock_array locks;
+       unsigned long i, locks_count;
+       int rc = 0;
        ENTRY;
 
-       lock_res(res);
-       /* scan list of granted locks */
-       cfs_list_for_each(pos, &res->lr_granted) {
-               struct ldlm_glimpse_work        *work;
-               struct ldlm_lock                *lock;
-               struct obd_uuid                 *uuid;
-
-               lock = cfs_list_entry(pos, struct ldlm_lock, l_res_link);
-               LASSERT(lock->l_export);
-               uuid = &lock->l_export->exp_client_uuid;
-
-               if (cb != NULL) {
-                       rc = cb(env, qmt, uuid, desc, arg);
-                       if (rc == 0)
-                               /* slave should not be notified */
-                               continue;
-                       if (rc < 0)
-                               /* something wrong happened, we still notify */
-                               CERROR("%s: callback function failed to "
-                                      "determine whether slave %s should be "
-                                      "notified (%d)\n", qmt->qmt_svname,
-                                      obd_uuid2str(uuid), rc);
+       memset(&locks, 0, sizeof(locks));
+       rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
+       if (rc) {
+               CERROR("%s: failed to allocate glimpse lock array (%d)\n",
+                      qmt->qmt_svname, rc);
+               RETURN(rc);
+       }
+       if (!locks.q_cnt) {
+               CDEBUG(D_QUOTA, "%s: no granted locks to send glimpse\n",
+                      qmt->qmt_svname);
+               RETURN(0);
+       }
+       CDEBUG(D_QUOTA, "found granted locks %lu\n", locks.q_cnt);
+       locks_count = locks.q_cnt;
+
+       /* Use one desc for all works, when called from qmt_glb_lock_notify */
+       if (cb && locks.q_cnt > 1) {
+               /* TODO: think about storing these preallocated descs
+                * in lqe_global, in lqeg_arr, as part of lqe_glbl_entry.
+                * The benefit is that we don't need to allocate/free
+                * and set up these descs each time. But the drawback is
+                * memory use (sizeof(ldlm_gl_desc) * OST_COUNT * user_number);
+                * for example it could be 88 * 256 * 10000, about 225 MB. */
+               OBD_ALLOC(descs,
+                         sizeof(struct ldlm_gl_lquota_desc) * locks.q_cnt);
+               if (!descs) {
+                       CERROR("%s: failed to allocate glimpse descriptors: rc = %d\n",
+                              qmt->qmt_svname, -ENOMEM);
+                       qmt_free_lock_array(&locks);
+                       RETURN(-ENOMEM);
                }
+       }
+
+       for (i = locks.q_cnt; i > 0; i--) {
+               struct ldlm_glimpse_work *work;
 
                OBD_ALLOC_PTR(work);
                if (work == NULL) {
-                       CERROR("%s: failed to notify %s\n", qmt->qmt_svname,
-                              obd_uuid2str(uuid));
+                       CERROR("%s: failed to allocate glimpse work to notify a lock\n",
+                              qmt->qmt_svname);
                        continue;
                }
 
-               cfs_list_add_tail(&work->gl_list, &gl_list);
-               work->gl_lock  = LDLM_LOCK_GET(lock);
+               if (cb) {
+                       if (descs)
+                               desc = &descs[i - 1];
+                       qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
+                       work->gl_interpret_data = lqe;
+               }
+
+               list_add_tail(&work->gl_list, &gl_list);
+               work->gl_lock  = locks.q_locks[i - 1];
                work->gl_flags = 0;
                work->gl_desc  = desc;
 
+               locks.q_locks[i - 1] = NULL;
+               locks.q_cnt--;
        }
-       unlock_res(res);
 
-       if (cfs_list_empty(&gl_list)) {
+       qmt_free_lock_array(&locks);
+
+       if (list_empty(&gl_list)) {
                CDEBUG(D_QUOTA, "%s: nobody to notify\n", qmt->qmt_svname);
-               RETURN(0);
+               GOTO(out, rc = 0);
        }
 
        /* issue glimpse callbacks to all connected slaves */
        rc = ldlm_glimpse_locks(res, &gl_list);
 
-       cfs_list_for_each_safe(pos, tmp, &gl_list) {
+       list_for_each_safe(pos, tmp, &gl_list) {
                struct ldlm_glimpse_work *work;
 
-               work = cfs_list_entry(pos, struct ldlm_glimpse_work, gl_list);
+               work = list_entry(pos, struct ldlm_glimpse_work, gl_list);
 
-               cfs_list_del(&work->gl_list);
+               list_del(&work->gl_list);
                CERROR("%s: failed to notify %s of new quota settings\n",
                       qmt->qmt_svname,
                       obd_uuid2str(&work->gl_lock->l_export->exp_client_uuid));
                LDLM_LOCK_RELEASE(work->gl_lock);
                OBD_FREE_PTR(work);
        }
+out:
+       if (descs)
+               OBD_FREE(descs,
+                        sizeof(struct ldlm_gl_lquota_desc) * locks_count);
 
        RETURN(rc);
 }
@@ -462,24 +825,46 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
        struct qmt_thread_info  *qti = qmt_info(env);
        struct qmt_pool_info    *pool = lqe2qpi(lqe);
        struct ldlm_resource    *res = NULL;
-       int                      rc;
        ENTRY;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
-                           pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
 
        /* send glimpse callback to notify slaves of new quota settings */
        qti->qti_gl_desc.lquota_desc.gl_id        = lqe->lqe_id;
        qti->qti_gl_desc.lquota_desc.gl_flags     = 0;
-       qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
-       qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+       if (lqe->lqe_is_default) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_DEFAULT);
+
+       } else if (lqe->lqe_is_deleted) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_DELETED);
+       } else if (lqe->lqe_is_reset) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_RESET);
+       } else if (lqe->lqe_granted > lqe->lqe_hardlimit) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_REVOKE);
+       } else {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = lqe->lqe_gracetime;
+       }
        qti->qti_gl_desc.lquota_desc.gl_ver       = ver;
 
        /* look up ldlm resource associated with global index */
        fid_build_reg_res_name(&qti->qti_fid, &qti->qti_resid);
-       res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, NULL, &qti->qti_resid,
+       res = ldlm_resource_get(pool->qpi_qmt->qmt_ns, &qti->qti_resid,
                                LDLM_PLAIN, 0);
-       if (res == NULL) {
+       if (IS_ERR(res)) {
                /* this might happen if no slaves have enqueued global quota
                 * locks yet */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource associated "
@@ -487,25 +872,45 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
                RETURN_EXIT;
        }
 
-       rc = qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
-                             NULL, NULL);
+       qmt_glimpse_lock(env, pool->qpi_qmt, res, &qti->qti_gl_desc,
+                        NULL, NULL);
        ldlm_resource_putref(res);
        EXIT;
 }
 
 /* Callback function used to select locks that should be glimpsed when
  * broadcasting the new qunit value */
-static int qmt_id_lock_cb(const struct lu_env *env, struct qmt_device *qmt,
-                         struct obd_uuid *uuid, union ldlm_gl_desc *desc,
-                         void *arg)
+static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 {
-       struct obd_uuid *slv_uuid = arg;
+       struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
+       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+       int idx;
+       int stype = qmt_uuid2idx(uuid, &idx);
+
+       LASSERT(stype == QMT_STYPE_OST || stype == QMT_STYPE_MDT);
+
+       CDEBUG(D_QUOTA, "stype %d rtype %d idx %d uuid %s\n",
+              stype, lqe_rtype(lqe), idx, uuid->uuid);
+       /* Quota pools support only OSTs, even though MDTs could also be
+        * registered as LQUOTA_RES_DT devices (DOM). */
+       if (qmt_dom(lqe_rtype(lqe), stype))
+               return 1;
+
+       if (lgd) {
+               int lge_idx = qmt_map_lge_idx(lgd, idx);
+
+               CDEBUG(D_QUOTA,
+                      "tgt idx:%d lge_idx:%d edquot_nu:%d qunit_nu:%d\n",
+                      idx, lge_idx, lgd->lqeg_arr[lge_idx].lge_edquot_nu,
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu);
+               return lgd->lqeg_arr[lge_idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[lge_idx].lge_qunit_nu;
+       }
 
-       if (slv_uuid != NULL && obd_uuid_equals(uuid, slv_uuid))
-               RETURN(0);
-       RETURN(+1);
+       return 0;
 }
 
+
 /*
  * Send glimpse request on per-ID lock to push new qunit value to slave.
  *
@@ -521,51 +926,70 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        struct qmt_thread_info  *qti = qmt_info(env);
        struct qmt_pool_info    *pool = lqe2qpi(lqe);
        struct ldlm_resource    *res = NULL;
-       int                      rc;
        ENTRY;
 
        if (!lqe->lqe_enforced)
                RETURN_EXIT;
 
-       lquota_generate_fid(&qti->qti_fid, pool->qpi_key & 0x0000ffff,
-                           pool->qpi_key >> 16, lqe->lqe_site->lqs_qtype);
-       fid_build_quota_resid(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
-       res = ldlm_resource_get(qmt->qmt_ns, NULL, &qti->qti_resid, LDLM_PLAIN,
-                               0);
-       if (res == NULL) {
+       lquota_generate_fid(&qti->qti_fid, pool->qpi_rtype, lqe_qtype(lqe));
+       fid_build_quota_res_name(&qti->qti_fid, &lqe->lqe_id, &qti->qti_resid);
+       res = ldlm_resource_get(qmt->qmt_ns, &qti->qti_resid, LDLM_PLAIN, 0);
+       if (IS_ERR(res)) {
                /* this might legitimately happens if slaves haven't had the
                 * opportunity to enqueue quota lock yet. */
                LQUOTA_DEBUG(lqe, "failed to lookup ldlm resource for per-ID "
                             "lock "DFID, PFID(&qti->qti_fid));
+               lqe_write_lock(lqe);
+               if (lqe->lqe_revoke_time == 0 &&
+                   lqe->lqe_qunit == pool->qpi_least_qunit)
+                       lqe->lqe_revoke_time = ktime_get_seconds();
+               lqe_write_unlock(lqe);
                RETURN_EXIT;
        }
 
-       lqe_read_lock(lqe);
+       lqe_write_lock(lqe);
+       /*
+        * It is possible that the same lqe gets queued a 2nd time while
+        * the glimpse issued for it the 1st time is still in flight.
+        */
+       if (lqe->lqe_gl)
+               GOTO(out, 0);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
         * - notify slaves that master ran out of quota space and there is no
         *   need to send acquire request any more until further notice */
 
-       /* fill glimpse descriptor with lqe settings */
-       if (lqe->lqe_edquot)
-               qti->qti_gl_desc.lquota_desc.gl_flags = LQUOTA_FL_EDQUOT;
-       else
-               qti->qti_gl_desc.lquota_desc.gl_flags = 0;
-       qti->qti_gl_desc.lquota_desc.gl_qunit = lqe->lqe_qunit;
-       lqe_read_unlock(lqe);
-
-       /* The rebalance thread is the only thread which can issue glimpses */
-       LASSERT(!lqe->lqe_gl);
+       /* TODO: it is not clear how to implement the case below for all lqes
+        * whose slaves will be notified in qmt_glimpse_lock, because here we
+        * only have the global lqe with an array of OSTs that should be
+        * notified. Theoretically we could find all lqes that include these
+        * OSTs, but it is not trivial. So I would propose to move this case
+        * elsewhere ... */
+       if (lqe->lqe_revoke_time == 0 &&
+           lqe->lqe_qunit == pool->qpi_least_qunit)
+               /* reset lqe_may_rel, it will be updated on glimpse callback
+                * replies if needed */
+               lqe->lqe_may_rel = 0;
+
        lqe->lqe_gl = true;
+       lqe_write_unlock(lqe);
 
        /* issue glimpse callback to slaves */
-       rc = qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
-                             uuid ? qmt_id_lock_cb : NULL, (void *)uuid);
-
+       if (lqe->lqe_glbl_data)
+               qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+                                qmt_id_lock_cb, lqe);
+
+       lqe_write_lock(lqe);
+       if (lqe->lqe_revoke_time == 0 &&
+           lqe->lqe_qunit == pool->qpi_least_qunit) {
+               lqe->lqe_revoke_time = ktime_get_seconds();
+               qmt_adjust_edquot(lqe, ktime_get_real_seconds());
+       }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
-
+out:
+       lqe_write_unlock(lqe);
        ldlm_resource_putref(res);
        EXIT;
 }
@@ -582,21 +1006,32 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
        bool    added = false;
        ENTRY;
 
+       LASSERT(lqe->lqe_is_global);
        lqe_getref(lqe);
-       cfs_spin_lock(&qmt->qmt_reba_lock);
-       if (!qmt->qmt_stopping && cfs_list_empty(&lqe->lqe_link)) {
-               cfs_list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
+       spin_lock(&qmt->qmt_reba_lock);
+       if (!qmt->qmt_stopping && list_empty(&lqe->lqe_link)) {
+               list_add_tail(&lqe->lqe_link, &qmt->qmt_reba_list);
                added = true;
+               if (qmt->qmt_reba_task)
+                       wake_up_process(qmt->qmt_reba_task);
        }
-       cfs_spin_unlock(&qmt->qmt_reba_lock);
+       spin_unlock(&qmt->qmt_reba_lock);
 
-       if (added)
-               cfs_waitq_signal(&qmt->qmt_reba_thread.t_ctl_waitq);
-       else
+       if (!added)
                lqe_putref(lqe);
        EXIT;
 }
 
+struct qmt_reba_args {
+       struct qmt_device       *qra_dev;
+       struct lu_env            qra_env;
+       struct completion       *qra_started;
+};
+
+#ifndef TASK_IDLE
+#define TASK_IDLE TASK_INTERRUPTIBLE
+#endif
+
 /*
  * The rebalance thread is in charge of sending glimpse callbacks on per-ID
  * quota locks owned by slaves in order to notify them of:
@@ -607,61 +1042,44 @@ void qmt_id_lock_notify(struct qmt_device *qmt, struct lquota_entry *lqe)
  *   try to acquire quota from the master since this latter has already
  *   distributed all the space.
  */
-static int qmt_reba_thread(void *arg)
+static int qmt_reba_thread(void *_args)
 {
-       struct qmt_device       *qmt = (struct qmt_device *)arg;
-       struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
-       struct l_wait_info       lwi = { 0 };
-       struct lu_env           *env;
+       struct qmt_reba_args    *args = _args;
+       struct qmt_device       *qmt = args->qra_dev;
+       struct lu_env           *env = &args->qra_env;
        struct lquota_entry     *lqe, *tmp;
-       char                     pname[MTI_NAME_MAXLEN];
-       int                      rc;
        ENTRY;
 
-       OBD_ALLOC_PTR(env);
-       if (env == NULL)
-               RETURN(-ENOMEM);
-
-       rc = lu_env_init(env, LCT_MD_THREAD);
-       if (rc) {
-               CERROR("%s: failed to init env.", qmt->qmt_svname);
-               OBD_FREE_PTR(env);
-               RETURN(rc);
-       }
-
-       snprintf(pname, MTI_NAME_MAXLEN, "qmt_reba_%s", qmt->qmt_svname);
-       cfs_daemonize(pname);
-
-       thread_set_flags(thread, SVC_RUNNING);
-       cfs_waitq_signal(&thread->t_ctl_waitq);
-
-       while (1) {
-               l_wait_event(thread->t_ctl_waitq,
-                            !cfs_list_empty(&qmt->qmt_reba_list) ||
-                            !thread_is_running(thread), &lwi);
-
-               cfs_spin_lock(&qmt->qmt_reba_lock);
-               cfs_list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
-                                            lqe_link) {
-                       cfs_list_del_init(&lqe->lqe_link);
-                       cfs_spin_unlock(&qmt->qmt_reba_lock);
-
-                       if (thread_is_running(thread))
+       complete(args->qra_started);
+       while (({set_current_state(TASK_IDLE);
+                !kthread_should_stop(); })) {
+
+               spin_lock(&qmt->qmt_reba_lock);
+               list_for_each_entry_safe(lqe, tmp, &qmt->qmt_reba_list,
+                                        lqe_link) {
+                       __set_current_state(TASK_RUNNING);
+                       list_del_init(&lqe->lqe_link);
+                       spin_unlock(&qmt->qmt_reba_lock);
+
+                       /* lqe_ref == 1 means we hold the last ref,
+                        * so no need to send glimpse callbacks.
+                        */
+                       if (!kthread_should_stop() &&
+                           atomic_read(&lqe->lqe_ref) > 1)
                                qmt_id_lock_glimpse(env, qmt, lqe, NULL);
 
                        lqe_putref(lqe);
-                       cfs_spin_lock(&qmt->qmt_reba_lock);
+                       spin_lock(&qmt->qmt_reba_lock);
                }
-               cfs_spin_unlock(&qmt->qmt_reba_lock);
-
-               if (!thread_is_running(thread))
-                       break;
+               spin_unlock(&qmt->qmt_reba_lock);
+               schedule();
        }
+       __set_current_state(TASK_RUNNING);
+
+       lu_env_remove(env);
        lu_env_fini(env);
-       OBD_FREE_PTR(env);
-       thread_set_flags(thread, SVC_STOPPED);
-       cfs_waitq_signal(&thread->t_ctl_waitq);
-       RETURN(rc);
+       OBD_FREE_PTR(args);
+       RETURN(0);
 }
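
The rewritten rebalance thread above follows the modern kthread idiom: the task sets TASK_IDLE before testing the stop condition and the work list, so a wake_up_process() from qmt_id_lock_notify() cannot be lost between the check and schedule(). Below is a self-contained sketch of that producer/consumer shape with generic names (worker_fn, todo_list, ...); only standard kernel APIs are used and the Lustre-specific lu_env handling is left out.

#include <linux/err.h>
#include <linux/kthread.h>
#include <linux/list.h>
#include <linux/sched.h>
#include <linux/spinlock.h>

#ifndef TASK_IDLE
#define TASK_IDLE TASK_INTERRUPTIBLE
#endif

static LIST_HEAD(todo_list);
static DEFINE_SPINLOCK(todo_lock);
static struct task_struct *worker;

static int worker_fn(void *unused)
{
	/* mark the task idle *before* testing the condition so a concurrent
	 * wake_up_process() is never lost between the test and schedule() */
	while (({ set_current_state(TASK_IDLE); !kthread_should_stop(); })) {
		spin_lock(&todo_lock);
		while (!list_empty(&todo_list)) {
			struct list_head *item = todo_list.next;

			__set_current_state(TASK_RUNNING);
			list_del_init(item);
			spin_unlock(&todo_lock);
			/* ... process the item outside the lock ... */
			spin_lock(&todo_lock);
		}
		spin_unlock(&todo_lock);
		schedule();		/* sleep until the next wake-up */
	}
	__set_current_state(TASK_RUNNING);
	return 0;
}

static void queue_item(struct list_head *item)
{
	spin_lock(&todo_lock);
	list_add_tail(item, &todo_list);
	if (worker)
		wake_up_process(worker);
	spin_unlock(&todo_lock);
}

static int start_worker(void)
{
	worker = kthread_run(worker_fn, NULL, "example_worker");
	if (IS_ERR(worker)) {
		int rc = PTR_ERR(worker);

		worker = NULL;
		return rc;
	}
	return 0;
}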
 
 /*
@@ -669,24 +1087,47 @@ static int qmt_reba_thread(void *arg)
  */
 int qmt_start_reba_thread(struct qmt_device *qmt)
 {
-       struct ptlrpc_thread    *thread = &qmt->qmt_reba_thread;
-       struct l_wait_info       lwi    = { 0 };
-       int                      rc;
+       struct task_struct *task;
+       struct qmt_reba_args *args;
+       DECLARE_COMPLETION_ONSTACK(started);
+       int rc;
        ENTRY;
 
-       rc = cfs_create_thread(qmt_reba_thread, (void *)qmt, 0);
-       if (rc < 0) {
-               CERROR("%s: failed to start rebalance thread (%d)\n",
-                      qmt->qmt_svname, rc);
-               thread_set_flags(thread, SVC_STOPPED);
-               RETURN(rc);
+       OBD_ALLOC_PTR(args);
+       if (args == NULL)
+               RETURN(-ENOMEM);
+       args->qra_dev = qmt;
+       args->qra_started = &started;
+
+       rc = lu_env_init(&args->qra_env, LCT_MD_THREAD);
+       if (rc) {
+               CERROR("%s: failed to init env.\n", qmt->qmt_svname);
+               GOTO(out_env, rc);
        }
 
-       l_wait_event(thread->t_ctl_waitq,
-                    thread_is_running(thread) || thread_is_stopped(thread),
-                    &lwi);
+       task = kthread_create(qmt_reba_thread, args,
+                             "qmt_reba_%s", qmt->qmt_svname);
+       if (IS_ERR(task)) {
+               CERROR("%s: failed to start rebalance thread (%ld)\n",
+                      qmt->qmt_svname, PTR_ERR(task));
+               GOTO(out_env_fini, rc = PTR_ERR(task));
+       }
+
+       rc = lu_env_add_task(&args->qra_env, task);
+       if (rc) {
+               kthread_stop(task);
+               GOTO(out_env_fini, rc);
+       }
+       qmt->qmt_reba_task = task;
+       wake_up_process(task);
+       wait_for_completion(&started);
 
        RETURN(0);
+out_env_fini:
+       lu_env_fini(&args->qra_env);
+out_env:
+       OBD_FREE_PTR(args);
+       RETURN(rc);
 }
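
A note on the new lifecycle above: the thread is created stopped with kthread_create() (rather than kthread_run()) so that lu_env_add_task() can attach the environment before the thread ever executes, and the caller waits on an on-stack completion so the thread is known to be running when qmt_start_reba_thread() returns; the matching stop path below simply hands the saved task_struct to kthread_stop(). Below is a hedged, generic sketch of that handshake, with no Lustre APIs and all names (thr_fn, start_thr, ...) hypothetical.

#include <linux/completion.h>
#include <linux/err.h>
#include <linux/jiffies.h>
#include <linux/kthread.h>
#include <linux/sched.h>

struct thr_args {
	struct completion *started;
	/* ... per-thread state attached before wake-up ... */
};

static int thr_fn(void *data)
{
	struct thr_args *args = data;

	complete(args->started);	/* tell the starter we are live */
	while (!kthread_should_stop())
		schedule_timeout_interruptible(HZ);
	return 0;
}

static struct task_struct *start_thr(struct thr_args *args)
{
	DECLARE_COMPLETION_ONSTACK(started);
	struct task_struct *task;

	args->started = &started;
	task = kthread_create(thr_fn, args, "example_thr");
	if (IS_ERR(task))
		return task;
	/* attach per-task state here: the thread has not run yet */
	wake_up_process(task);
	wait_for_completion(&started);	/* completed before we return */
	return task;
}

static void stop_thr(struct task_struct *task)
{
	if (task)
		kthread_stop(task);	/* wakes the thread and waits for exit */
}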
 
 /*
@@ -694,16 +1135,15 @@ int qmt_start_reba_thread(struct qmt_device *qmt)
  */
 void qmt_stop_reba_thread(struct qmt_device *qmt)
 {
-       struct ptlrpc_thread *thread = &qmt->qmt_reba_thread;
+       struct task_struct *task;
 
-       if (!thread_is_stopped(thread)) {
-               struct l_wait_info lwi = { 0 };
+       spin_lock(&qmt->qmt_reba_lock);
+       task = qmt->qmt_reba_task;
+       qmt->qmt_reba_task = NULL;
+       spin_unlock(&qmt->qmt_reba_lock);
 
-               thread_set_flags(thread, SVC_STOPPING);
-               cfs_waitq_signal(&thread->t_ctl_waitq);
+       if (task)
+               kthread_stop(task);
 
-               l_wait_event(thread->t_ctl_waitq, thread_is_stopped(thread),
-                            &lwi);
-       }
-       LASSERT(cfs_list_empty(&qmt->qmt_reba_list));
+       LASSERT(list_empty(&qmt->qmt_reba_list));
 }