Whamcloud - gitweb
LU-11425 quota: support quota for DoM
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index 6bef54e..eb79812 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
@@ -45,7 +45,7 @@ static inline int qsd_request_enter(struct lquota_entry *lqe)
        }
 
        if (lqe->lqe_pending_rel != 0) {
-               LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+               LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
                             lqe->lqe_pending_rel);
                LBUG();
        }
@@ -65,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
        }
        lqe->lqe_pending_req--;
        lqe->lqe_pending_rel = 0;
-       cfs_waitq_broadcast(&lqe->lqe_waiters);
+       wake_up_all(&lqe->lqe_waiters);
 }
 
 /**
@@ -350,12 +350,12 @@ static void qsd_req_completion(const struct lu_env *env,
         * the DQACQ since the limit for this ID has been removed, so we
         * should not update quota entry & slave index copy neither. */
        if (repbody != NULL && repbody->qb_count != 0) {
-               LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+               LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
 
                if (req_is_rel(reqbody->qb_flags)) {
                        if (lqe->lqe_granted < repbody->qb_count) {
                                LQUOTA_ERROR(lqe, "can't release more space "
-                                            "than owned "LPU64"<"LPU64,
+                                            "than owned %llu<%llu",
                                             lqe->lqe_granted,
                                             repbody->qb_count);
                                lqe->lqe_granted = 0;
@@ -385,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env,
        }
 
        /* extract information from lvb */
-       if (ret == 0 && lvb != 0) {
+       if (ret == 0 && lvb != NULL) {
                if (lvb->lvb_id_qunit != 0)
                        qsd_set_qunit(lqe, lvb->lvb_id_qunit);
-               if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
-                       lqe->lqe_edquot = true;
-               else
-                       lqe->lqe_edquot = false;
+               qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }
@@ -405,7 +402,7 @@ out:
        adjust = qsd_adjust_needed(lqe);
        if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
                lqe->lqe_acq_rc = ret;
-               lqe->lqe_acq_time = cfs_time_current_64();
+               lqe->lqe_acq_time = ktime_get_seconds();
        }
 out_noadjust:
        qsd_request_exit(lqe);
@@ -461,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
-       } else if (lqe->lqe_edquot) {
+       /* lqe_edquot flag is used to avoid flooding dqacq requests when
+        * the user is over quota, however, the lqe_edquot could be stale
+        * sometimes due to the race reply of dqacq vs. id lock glimpse
+        * (see LU-4505), so we revalidate it every 5 seconds. */
+       } else if (lqe->lqe_edquot &&
+                  (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
                rc = -EDQUOT;
-       } else {
+       }else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);
@@ -560,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 
        /* check whether an acquire request completed recently */
        if (lqe->lqe_acq_rc != 0 &&
-           cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+           lqe->lqe_acq_time > ktime_get_seconds() - 1) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
                RETURN(lqe->lqe_acq_rc);
@@ -631,7 +633,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
        ENTRY;
 
        for (count = 0; rc == 0; count++) {
-               LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+               LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
 
                if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
                        rc = -EINPROGRESS;
@@ -650,6 +652,13 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                         * rc < 0, something bad happened */
                         break;
 
+               /* if we have gotten some quota and stil wait more quota,
+                * it's better to give QMT some time to reclaim from clients */
+               if (count > 0) {
+                       set_current_state(TASK_INTERRUPTIBLE);
+                       schedule_timeout(cfs_time_seconds(1));
+               }
+
                /* need to acquire more quota space from master */
                rc = qsd_acquire_remote(env, lqe);
        }
@@ -684,9 +693,10 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
                         int *flags)
 {
-       struct lquota_entry     *lqe;
-       int                      rc, ret = -EINPROGRESS;
-       struct l_wait_info       lwi;
+       struct lquota_entry *lqe;
+       struct l_wait_info lwi;
+       int qtype_flag = 0;
+       int rc, ret = -EINPROGRESS;
        ENTRY;
 
        if (qid->lqi_qentry != NULL) {
@@ -719,7 +729,7 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                RETURN(0);
        }
 
-       LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+       LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
 
        lqe_write_lock(lqe);
        lqe->lqe_waiting_write += space;
@@ -769,21 +779,25 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
-                       *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                       *flags |= lquota_over_fl(qqi->qqi_qtype);
                } else {
                        __u64   usage;
 
                        lqe_read_lock(lqe);
-                       usage  = lqe->lqe_usage;
-                       usage += lqe->lqe_pending_write;
+                       usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+                       if (lqe->lqe_qunit != 0 && (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold))
+                               usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+                       usage += lqe->lqe_usage;
 
+                       qtype_flag = lquota_over_fl(qqi->qqi_qtype);
                        /* if we should notify client to start sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-                               *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *flags |= qtype_flag;
                        else
-                               *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *flags &= ~qtype_flag;
                        lqe_read_unlock(lqe);
                }
        }
@@ -796,7 +810,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -830,6 +844,9 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN(0);
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* We don't enforce quota until the qsd_instance is started */
        read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
@@ -848,17 +865,15 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
         * or - the user/group is root
         * or - quota accounting isn't enabled */
        if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
-           qsd->qsd_acct_failed)
+           (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
                RETURN(0);
 
-       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
                 trans->lqt_id_cnt);
        /* check whether we already allocated a slot for this id */
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
@@ -919,6 +934,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        lqe_write_lock(lqe);
 
        /* fill qb_count & qb_flags */
@@ -1072,6 +1090,9 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN_EXIT;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN_EXIT;
+
        /* We don't enforce quota until the qsd_instance is started */
        read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
@@ -1148,7 +1169,7 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
 
        lqe = lqe_locate(env, qqi->qqi_site, qid);
        if (IS_ERR(lqe)) {
-               CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+               CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
                       qsd->qsd_svname, qid->qid_uid, qtype);
                RETURN_EXIT;
        }