Whamcloud - gitweb
LU-16605 lfs: Add -n option to fid2path
[fs/lustre-release.git] / lustre / quota / qmt_lock.c
index 5ee05c2..9935897 100644 (file)
@@ -89,7 +89,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
        case IT_QUOTA_DQACQ: {
                struct lquota_entry     *lqe;
                struct ldlm_lock        *lock;
-               int idx;
+               int idx, stype;
 
                if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] == 0)
                        /* acquire on global lock? something is wrong ... */
@@ -104,8 +104,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                        GOTO(out, rc = -ENOLCK);
                LDLM_LOCK_PUT(lock);
 
-               rc = qmt_uuid2idx(uuid, &idx);
-               if (rc < 0)
+               stype = qmt_uuid2idx(uuid, &idx);
+               if (stype < 0)
                        GOTO(out, rc = -EINVAL);
 
                /* TODO: it seems we don't need to get lqe from
@@ -115,7 +115,7 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                LASSERT(lqe != NULL);
                lqe_getref(lqe);
 
-               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc,
+               rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
                                          lqe_qtype(lqe), &reqbody->qb_id,
                                          NULL, idx);
                if (rc) {
@@ -126,7 +126,8 @@ int qmt_intent_policy(const struct lu_env *env, struct lu_device *ld,
                /* acquire quota space */
                rc = qmt_dqacq0(env, qmt, uuid,
                                reqbody->qb_flags, reqbody->qb_count,
-                               reqbody->qb_usage, repbody);
+                               reqbody->qb_usage, repbody,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
                lqe_putref(lqe);
                qti_lqes_fini(env);
                if (rc)
@@ -275,7 +276,7 @@ static bool qmt_clear_lgeg_arr_nu(struct lquota_entry *lqe, int stype, int idx)
 
        /* There is no array to store lge for the case of DOM.
         * Ignore it until MDT pools will be ready. */
-       if (!(lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)) {
+       if (!qmt_dom(lqe_rtype(lqe), stype)) {
                lqe->lqe_glbl_data->lqeg_arr[idx].lge_qunit_nu = 0;
                lqe->lqe_glbl_data->lqeg_arr[idx].lge_edquot_nu = 0;
 
@@ -331,7 +332,7 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        struct ldlm_lock        *lock;
        struct obd_export       *exp;
        bool                     need_revoke;
-       int                      rc = 0, idx;
+       int                      rc = 0, idx, stype;
        ENTRY;
 
        LASSERT(res != NULL);
@@ -381,11 +382,11 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                GOTO(out, rc = -EFAULT);
        }
 
-       rc = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
-       if (rc < 0)
-               GOTO(out_exp, rc);
+       stype = qmt_uuid2idx(&exp->exp_client_uuid, &idx);
+       if (stype < 0)
+               GOTO(out_exp, rc = stype);
 
-       need_revoke = qmt_clear_lgeg_arr_nu(lqe, rc, idx);
+       need_revoke = qmt_clear_lgeg_arr_nu(lqe, stype, idx);
        if (lvb->lvb_id_rel == 0) {
                /* nothing to release */
                if (lvb->lvb_id_may_rel != 0)
@@ -396,15 +397,18 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
        if (!need_revoke && lvb->lvb_id_rel == 0)
                GOTO(out_exp, rc = 0);
 
-       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), rc, lqe_qtype(lqe),
-                                 &lqe->lqe_id, NULL, idx);
+       rc = qmt_pool_lqes_lookup(env, qmt, lqe_rtype(lqe), stype,
+                                 lqe_qtype(lqe), &lqe->lqe_id, NULL, idx);
        if (rc)
                GOTO(out_exp, rc);
 
-       if (need_revoke && qmt_set_revoke(env, lqe, rc, idx) &&
-           lqe->lqe_glbl_data) {
-               qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
-               qmt_id_lock_notify(qmt, lqe);
+       if (need_revoke && qmt_set_revoke(env, lqe, stype, idx)) {
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               if (lqe->lqe_glbl_data) {
+                       qmt_seed_glbe_edquot(env, lqe->lqe_glbl_data);
+                       qmt_id_lock_notify(qmt, lqe);
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
        }
 
        if (lvb->lvb_id_rel) {
@@ -414,7 +418,8 @@ int qmt_lvbo_update(struct lu_device *ld, struct ldlm_resource *res,
                /* release quota space */
                rc = qmt_dqacq0(env, qmt, &exp->exp_client_uuid,
                                QUOTA_DQACQ_FL_REL, lvb->lvb_id_rel,
-                               0, &qti->qti_body);
+                               0, &qti->qti_body,
+                               qmt_dom(lqe_rtype(lqe), stype) ? -1 : idx);
                if (rc || qti->qti_body.qb_count != lvb->lvb_id_rel)
                        LQUOTA_ERROR(lqe,
                                     "failed to release quota space on glimpse %llu!=%llu : rc = %d\n",
@@ -518,12 +523,13 @@ int qmt_lvbo_free(struct lu_device *ld, struct ldlm_resource *res)
 
        if (res->lr_name.name[LUSTRE_RES_ID_QUOTA_SEQ_OFF] != 0) {
                struct lquota_entry *lqe = res->lr_lvb_data;
-               struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
+               struct lqe_glbl_data *lgd;
 
                mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
                lqe->lqe_glbl_data = NULL;
-               qmt_free_lqe_gd(lgd);
                mutex_unlock(&lqe->lqe_glbl_data_lock);
+               qmt_free_lqe_gd(lgd);
 
                /* release lqe reference */
                lqe_putref(lqe);
@@ -571,6 +577,7 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res,
                                struct qmt_gl_lock_array *array,
                                qmt_glimpse_cb_t cb, void *arg)
 {
+       struct lquota_entry *lqe = arg;
        struct list_head *pos;
        unsigned long count = 0;
        int fail_cnt = 0;
@@ -578,6 +585,8 @@ static int qmt_alloc_lock_array(struct ldlm_resource *res,
 
        LASSERT(!array->q_max && !array->q_cnt && !array->q_locks);
 again:
+       if (cb)
+               mutex_lock(&lqe->lqe_glbl_data_lock);
        lock_res(res);
        /* scan list of granted locks */
        list_for_each(pos, &res->lr_granted) {
@@ -601,6 +610,8 @@ again:
                }
        }
        unlock_res(res);
+       if (cb)
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
 
        if (count > array->q_max) {
                qmt_free_lock_array(array);
@@ -624,11 +635,10 @@ again:
        RETURN(0);
 }
 
-void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
-                      struct lquota_entry *lqe)
+static void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
+                             struct lquota_entry *lqe)
 {
        struct obd_uuid *uuid = &(lock)->l_export->exp_client_uuid;
-       struct lqe_glbl_data *lgd = lqe->lqe_glbl_data;
        int idx, stype;
        __u64 qunit;
        bool edquot;
@@ -637,12 +647,22 @@ void qmt_setup_id_desc(struct ldlm_lock *lock, union ldlm_gl_desc *desc,
        LASSERT(stype >= 0);
 
        /* DOM case - set global lqe settings */
-       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT) {
+       if (qmt_dom(lqe_rtype(lqe), stype)) {
                edquot = lqe->lqe_edquot;
                qunit = lqe->lqe_qunit;
        } else {
-               edquot = lgd->lqeg_arr[idx].lge_edquot;
-               qunit = lgd->lqeg_arr[idx].lge_qunit;
+               struct lqe_glbl_data *lgd;
+
+               mutex_lock(&lqe->lqe_glbl_data_lock);
+               lgd = lqe->lqe_glbl_data;
+               if (lgd) {
+                       edquot = lgd->lqeg_arr[idx].lge_edquot;
+                       qunit = lgd->lqeg_arr[idx].lge_qunit;
+               } else {
+                       edquot = lqe->lqe_edquot;
+                       qunit = lqe->lqe_qunit;
+               }
+               mutex_unlock(&lqe->lqe_glbl_data_lock);
        }
 
        /* fill glimpse descriptor with lqe settings */
@@ -671,7 +691,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                            qmt_glimpse_cb_t cb, struct lquota_entry *lqe)
 {
        union ldlm_gl_desc *descs = NULL;
-       struct lqe_glbl_data *gld;
        struct list_head *tmp, *pos;
        LIST_HEAD(gl_list);
        struct qmt_gl_lock_array locks;
@@ -679,7 +698,6 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        int rc = 0;
        ENTRY;
 
-       gld = lqe ? lqe->lqe_glbl_data : NULL;
        memset(&locks, 0, sizeof(locks));
        rc = qmt_alloc_lock_array(res, &locks, cb, lqe);
        if (rc) {
@@ -696,7 +714,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
        locks_count = locks.q_cnt;
 
        /* Use one desc for all works, when called from qmt_glb_lock_notify */
-       if (gld && locks.q_cnt > 1) {
+       if (cb && locks.q_cnt > 1) {
                /* TODO: think about to store this preallocated descs
                 * in lqe_global in lqeg_arr as a part of lqe_glbl_entry.
                 * The benefit is that we don't need to allocate/free
@@ -723,7 +741,7 @@ static int qmt_glimpse_lock(const struct lu_env *env, struct qmt_device *qmt,
                        continue;
                }
 
-               if (gld) {
+               if (cb) {
                        if (descs)
                                desc = &descs[i - 1];
                        qmt_setup_id_desc(locks.q_locks[i - 1], desc, lqe);
@@ -801,6 +819,11 @@ void qmt_glb_lock_notify(const struct lu_env *env, struct lquota_entry *lqe,
                qti->qti_gl_desc.lquota_desc.gl_softlimit = 0;
                qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
                                                        LQUOTA_FLAG_DELETED);
+       } else if (lqe->lqe_is_reset) {
+               qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
+               qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
+               qti->qti_gl_desc.lquota_desc.gl_time = LQUOTA_GRACE_FLAG(0,
+                                                       LQUOTA_FLAG_RESET);
        } else {
                qti->qti_gl_desc.lquota_desc.gl_hardlimit = lqe->lqe_hardlimit;
                qti->qti_gl_desc.lquota_desc.gl_softlimit = lqe->lqe_softlimit;
@@ -839,11 +862,11 @@ static int qmt_id_lock_cb(struct ldlm_lock *lock, struct lquota_entry *lqe)
 
        /* Quota pools support only OSTs, despite MDTs also could be registered
         * as LQUOTA_RES_DT devices(DOM). */
-       if (lqe_rtype(lqe) == LQUOTA_RES_DT && stype == QMT_STYPE_MDT)
+       if (qmt_dom(lqe_rtype(lqe), stype))
                return 1;
        else
-               return lgd->lqeg_arr[idx].lge_edquot_nu ||
-                      lgd->lqeg_arr[idx].lge_qunit_nu;
+               return lgd ? lgd->lqeg_arr[idx].lge_edquot_nu ||
+                      lgd->lqeg_arr[idx].lge_qunit_nu : 0;
 }
 
 
@@ -884,6 +907,12 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
 
        lqe_write_lock(lqe);
+       /*
+        * It is possible to add an lqe in a 2nd time while the same lqe
+        * from the 1st time is still sending glimpse
+        */
+       if (lqe->lqe_gl)
+               GOTO(out, 0);
        /* The purpose of glimpse callback on per-ID lock is twofold:
         * - notify slaves of new qunit value and hope they will release some
         *   spare quota space in return
@@ -902,14 +931,13 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
                 * replies if needed */
                lqe->lqe_may_rel = 0;
 
-       /* The rebalance thread is the only thread which can issue glimpses */
-       LASSERT(!lqe->lqe_gl);
        lqe->lqe_gl = true;
        lqe_write_unlock(lqe);
 
        /* issue glimpse callback to slaves */
-       qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
-                        qmt_id_lock_cb, lqe);
+       if (lqe->lqe_glbl_data)
+               qmt_glimpse_lock(env, qmt, res, &qti->qti_gl_desc,
+                                qmt_id_lock_cb, lqe);
 
        lqe_write_lock(lqe);
        if (lqe->lqe_revoke_time == 0 &&
@@ -919,8 +947,8 @@ static void qmt_id_lock_glimpse(const struct lu_env *env,
        }
        LASSERT(lqe->lqe_gl);
        lqe->lqe_gl = false;
+out:
        lqe_write_unlock(lqe);
-
        ldlm_resource_putref(res);
        EXIT;
 }
@@ -992,7 +1020,11 @@ static int qmt_reba_thread(void *_args)
                        list_del_init(&lqe->lqe_link);
                        spin_unlock(&qmt->qmt_reba_lock);
 
-                       if (!kthread_should_stop())
+                       /* lqe_ref == 1 means we hold the last ref,
+                        * so no need to send glimpse callbacks.
+                        */
+                       if (!kthread_should_stop() &&
+                           atomic_read(&lqe->lqe_ref) > 1)
                                qmt_id_lock_glimpse(env, qmt, lqe, NULL);
 
                        lqe_putref(lqe);