Whamcloud - gitweb
LU-12930 various: use schedule_timeout_*interruptible
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index e2fdd35..2119271 100644 (file)
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
@@ -402,7 +402,7 @@ out:
        adjust = qsd_adjust_needed(lqe);
        if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
                lqe->lqe_acq_rc = ret;
-               lqe->lqe_acq_time = cfs_time_current_64();
+               lqe->lqe_acq_time = ktime_get_seconds();
        }
 out_noadjust:
        qsd_request_exit(lqe);
@@ -463,8 +463,7 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
         * sometimes due to the race reply of dqacq vs. id lock glimpse
         * (see LU-4505), so we revalidate it every 5 seconds. */
        } else if (lqe->lqe_edquot &&
-                  cfs_time_before_64(cfs_time_shift_64(-5),
-                                     lqe->lqe_edquot_time)) {
+                  (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
                rc = -EDQUOT;
        }else {
                rc = -EAGAIN;
@@ -563,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 
        /* check whether an acquire request completed recently */
        if (lqe->lqe_acq_rc != 0 &&
-           cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+           lqe->lqe_acq_time > ktime_get_seconds() - 1) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
                RETURN(lqe->lqe_acq_rc);
@@ -653,6 +652,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                         * rc < 0, something bad happened */
                         break;
 
+               /* if we have gotten some quota and stil wait more quota,
+                * it's better to give QMT some time to reclaim from clients */
+               if (count > 0)
+                       schedule_timeout_interruptible(cfs_time_seconds(1));
+
                /* need to acquire more quota space from master */
                rc = qsd_acquire_remote(env, lqe);
        }
@@ -685,11 +689,10 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
-                        int *flags)
+                        enum osd_quota_local_flags *local_flags)
 {
        struct lquota_entry *lqe;
-       struct l_wait_info lwi;
-       int qtype_flag = 0;
+       enum osd_quota_local_flags qtype_flag = 0;
        int rc, ret = -EINPROGRESS;
        ENTRY;
 
@@ -716,7 +719,7 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                 * quota space. That said, we still want to perform space
                 * adjustments in qsd_op_end, so we return here, but with
                 * a reference on the lqe */
-               if (flags != NULL) {
+               if (local_flags != NULL) {
                        rc = qsd_refresh_usage(env, lqe);
                        GOTO(out_flags, rc);
                }
@@ -731,26 +734,28 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 
        /* acquire quota space for the operation, cap overall wait time to
         * prevent a service thread from being stuck for too long */
-       lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
-                         NULL, NULL);
-       rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
-                         &lwi);
+       rc = wait_event_idle_timeout(
+               lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+               cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-       if (rc == 0 && ret == 0) {
+       if (rc > 0 && ret == 0) {
                qid->lqi_space += space;
+               rc = 0;
        } else {
-               if (rc == 0)
+               if (rc > 0)
                        rc = ret;
+               else if (rc == 0)
+                       rc = -ETIMEDOUT;
 
                LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
 
-               if (flags && lqe->lqe_pending_write != 0)
+               if (local_flags && lqe->lqe_pending_write != 0)
                        /* Inform OSD layer that there are pending writes.
                         * It might want to retry after a sync if appropriate */
-                        *flags |= QUOTA_FL_SYNC;
+                        *local_flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
                /* convert recoverable error into -EINPROGRESS, client will
@@ -769,26 +774,29 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                }
        }
 
-       if (flags != NULL) {
+       if (local_flags != NULL) {
 out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
-                       *flags |= lquota_over_fl(qqi->qqi_qtype);
+                       *local_flags |= lquota_over_fl(qqi->qqi_qtype);
                } else {
                        __u64   usage;
 
                        lqe_read_lock(lqe);
-                       usage  = lqe->lqe_usage;
-                       usage += lqe->lqe_pending_write;
+                       usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+                       if (lqe->lqe_qunit != 0 && (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold))
+                               usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+                       usage += lqe->lqe_usage;
 
                        qtype_flag = lquota_over_fl(qqi->qqi_qtype);
                        /* if we should notify client to start sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-                               *flags |= qtype_flag;
+                               *local_flags |= qtype_flag;
                        else
-                               *flags &= ~qtype_flag;
+                               *local_flags &= ~qtype_flag;
                        lqe_read_unlock(lqe);
                }
        }
@@ -801,7 +809,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -826,7 +834,7 @@ static inline bool qid_equal(struct lquota_id_info *q1,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
-                int *flags)
+                enum osd_quota_local_flags *local_flags)
 {
        int     i, rc;
        bool    found = false;
@@ -865,8 +873,6 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
@@ -887,7 +893,7 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* manage quota enforcement for this ID */
        rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-                          &trans->lqt_ids[i], qi->lqi_space, flags);
+                          &trans->lqt_ids[i], qi->lqi_space, local_flags);
        RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);