Whamcloud - gitweb
LU-12702 quota: wait pending write before acquiring remotely
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index 01b3e54..ae770f5 100644 (file)
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi <johann.lombardi@intel.com>
  * Author: Niu    Yawei    <yawei.niu@intel.com>
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include "qsd_internal.h"
@@ -49,7 +45,7 @@ static inline int qsd_request_enter(struct lquota_entry *lqe)
        }
 
        if (lqe->lqe_pending_rel != 0) {
-               LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+               LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
                             lqe->lqe_pending_rel);
                LBUG();
        }
@@ -69,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
        }
        lqe->lqe_pending_req--;
        lqe->lqe_pending_rel = 0;
-       cfs_waitq_broadcast(&lqe->lqe_waiters);
+       wake_up_all(&lqe->lqe_waiters);
 }
 
 /**
@@ -94,10 +90,10 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        struct ldlm_lock        *lock;
        ENTRY;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        /* is the qsd about to shut down? */
        if (qsd->qsd_stopping) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
                /* Target is about to shut down, client will retry */
                RETURN(-EINPROGRESS);
@@ -107,7 +103,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        if (qsd->qsd_exp_valid)
                imp = class_exp2cliimp(qsd->qsd_exp);
        if (imp == NULL || imp->imp_invalid) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "connection to master not ready");
                RETURN(-ENOTCONN);
        }
@@ -120,7 +116,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
         * If the previous reintegration failed for some reason, we'll
         * re-trigger it here as well. */
        if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
                             "kicking off reintegration");
                qsd_start_reint_thread(qqi);
@@ -130,7 +126,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
        /* Fill the remote global lock handle, master will check this handle
         * to see if the slave is sending request with stale lock */
        lustre_handle_copy(lockh, &qqi->qqi_lockh);
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        if (!lustre_handle_is_used(lockh))
                RETURN(-ENOLCK);
@@ -308,7 +304,7 @@ static void qsd_req_completion(const struct lu_env *env,
                               struct quota_body *reqbody,
                               struct quota_body *repbody,
                               struct lustre_handle *lockh,
-                              union ldlm_wire_lvb *lvb,
+                              struct lquota_lvb *lvb,
                               void *arg, int ret)
 {
        struct lquota_entry     *lqe = (struct lquota_entry *)arg;
@@ -354,12 +350,12 @@ static void qsd_req_completion(const struct lu_env *env,
         * the DQACQ since the limit for this ID has been removed, so we
         * should not update quota entry & slave index copy neither. */
        if (repbody != NULL && repbody->qb_count != 0) {
-               LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+               LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
 
                if (req_is_rel(reqbody->qb_flags)) {
                        if (lqe->lqe_granted < repbody->qb_count) {
                                LQUOTA_ERROR(lqe, "can't release more space "
-                                            "than owned "LPU64"<"LPU64,
+                                            "than owned %llu<%llu",
                                             lqe->lqe_granted,
                                             repbody->qb_count);
                                lqe->lqe_granted = 0;
@@ -389,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env,
        }
 
        /* extract information from lvb */
-       if (ret == 0 && lvb != 0) {
-               if (lvb->l_lquota.lvb_id_qunit != 0)
-                       qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
-               if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
-                       lqe->lqe_edquot = true;
-               else
-                       lqe->lqe_edquot = false;
+       if (ret == 0 && lvb != NULL) {
+               if (lvb->lvb_id_qunit != 0)
+                       qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+               qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
        } else if (repbody != NULL && repbody->qb_qunit != 0) {
                qsd_set_qunit(lqe, repbody->qb_qunit);
        }
@@ -409,7 +402,7 @@ out:
        adjust = qsd_adjust_needed(lqe);
        if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
                lqe->lqe_acq_rc = ret;
-               lqe->lqe_acq_time = cfs_time_current_64();
+               lqe->lqe_acq_time = ktime_get_seconds();
        }
 out_noadjust:
        qsd_request_exit(lqe);
@@ -465,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
                lqe->lqe_pending_write += space;
                lqe->lqe_waiting_write -= space;
                rc = 0;
-       } else if (lqe->lqe_edquot) {
+       /* The lqe_edquot flag is used to avoid flooding dqacq requests when
+        * the user is over quota; however, lqe_edquot could sometimes be
+        * stale due to a race between the dqacq reply and the id lock
+        * glimpse (see LU-4505), so we revalidate it every 5 seconds. */
+       } else if (lqe->lqe_edquot &&
+                  (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
                rc = -EDQUOT;
-       } else {
+       }else {
                rc = -EAGAIN;
        }
        lqe_write_unlock(lqe);
@@ -564,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 
        /* check whether an acquire request completed recently */
        if (lqe->lqe_acq_rc != 0 &&
-           cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+           lqe->lqe_acq_time > ktime_get_seconds() - 1) {
                lqe_write_unlock(lqe);
                LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
                RETURN(lqe->lqe_acq_rc);
@@ -590,7 +588,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
        /* check whether we already own a valid lock for this ID */
        rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
        if (rc) {
-               union ldlm_wire_lvb *lvb;
+               struct lquota_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL) {
@@ -625,18 +623,22 @@ static int qsd_acquire_remote(const struct lu_env *env,
  * \param space - is the amount of quota required for the operation
  * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
  *
- * \retval true  - exit from l_wait_event and real return value in \a ret
+ * \retval true  - stop waiting in wait_event_idle_timeout,
+ *                 and real return value in \a ret
  * \retval false - continue waiting
  */
 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                        long long space, int *ret)
 {
        int rc = 0, count;
+       int wait_pending = 0;
+       struct qsd_qtype_info *qqi = lqe2qqi(lqe);
+
        ENTRY;
 
        for (count = 0; rc == 0; count++) {
-               LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
-
+               LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
+again:
                if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
                        rc = -EINPROGRESS;
                        break;
@@ -653,6 +655,21 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                        /* rc == 0, Wouhou! enough local quota space
                         * rc < 0, something bad happened */
                         break;
+               /*
+                * There might be a window where a committed transaction
+                * has updated the usage but the pending writes haven't
+                * changed yet; wait for them before acquiring remotely.
+                */
+               if (lqe->lqe_pending_write >= space && !wait_pending) {
+                       wait_pending = 1;
+                       dt_wait_quota_pending(qqi->qqi_qsd->qsd_dev);
+                       goto again;
+               }
+
+               /* if we have gotten some quota and still wait for more quota,
+                * it's better to give QMT some time to reclaim from clients */
+               if (count > 0)
+                       schedule_timeout_interruptible(cfs_time_seconds(1));
 
                /* need to acquire more quota space from master */
                rc = qsd_acquire_remote(env, lqe);
@@ -662,7 +679,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                /* already a request in flight, continue waiting */
                RETURN(false);
        *ret = rc;
-       RETURN(true); /* exit from l_wait_event */
+       RETURN(true);
 }
 
 /**
@@ -686,11 +703,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
-                        int *flags)
+                        enum osd_quota_local_flags *local_flags)
 {
-       struct lquota_entry     *lqe;
-       int                      rc, ret = -EINPROGRESS;
-       struct l_wait_info       lwi;
+       struct lquota_entry *lqe;
+       enum osd_quota_local_flags qtype_flag = 0;
+       int rc, ret = -EINPROGRESS;
        ENTRY;
 
        if (qid->lqi_qentry != NULL) {
@@ -716,14 +733,14 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                 * quota space. That said, we still want to perform space
                 * adjustments in qsd_op_end, so we return here, but with
                 * a reference on the lqe */
-               if (flags != NULL) {
+               if (local_flags != NULL) {
                        rc = qsd_refresh_usage(env, lqe);
                        GOTO(out_flags, rc);
                }
                RETURN(0);
        }
 
-       LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+       LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
 
        lqe_write_lock(lqe);
        lqe->lqe_waiting_write += space;
@@ -731,26 +748,28 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 
        /* acquire quota space for the operation, cap overall wait time to
         * prevent a service thread from being stuck for too long */
-       lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
-                         NULL, NULL);
-       rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
-                         &lwi);
+       rc = wait_event_idle_timeout(
+               lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+               cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-       if (rc == 0 && ret == 0) {
+       if (rc > 0 && ret == 0) {
                qid->lqi_space += space;
+               rc = 0;
        } else {
-               if (rc == 0)
+               if (rc > 0)
                        rc = ret;
+               else if (rc == 0)
+                       rc = -ETIMEDOUT;
 
                LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
 
-               if (flags && lqe->lqe_pending_write != 0)
+               if (local_flags && lqe->lqe_pending_write != 0)
                        /* Inform OSD layer that there are pending writes.
                         * It might want to retry after a sync if appropriate */
-                        *flags |= QUOTA_FL_SYNC;
+                        *local_flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
                /* convert recoverable error into -EINPROGRESS, client will
@@ -769,25 +788,32 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                }
        }
 
-       if (flags != NULL) {
+       if (local_flags != NULL) {
 out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
-                       *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                       *local_flags |= lquota_over_fl(qqi->qqi_qtype);
                } else {
                        __u64   usage;
 
                        lqe_read_lock(lqe);
-                       usage  = lqe->lqe_usage;
-                       usage += lqe->lqe_pending_write;
+                       usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+                       /* There is a chance that more quota can be granted
+                        * successfully while the edquot flag was still set
+                        * through a glimpse. */
+                       if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+                          (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold)))
+                               usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+                       usage += lqe->lqe_usage;
 
+                       qtype_flag = lquota_over_fl(qqi->qqi_qtype);
                        /* if we should notify client to start sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-                               *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *local_flags |= qtype_flag;
                        else
-                               *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+                               *local_flags &= ~qtype_flag;
                        lqe_read_unlock(lqe);
                }
        }
@@ -800,7 +826,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -825,7 +851,7 @@ static inline bool qid_equal(struct lquota_id_info *q1,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
-                int *flags)
+                enum osd_quota_local_flags *local_flags)
 {
        int     i, rc;
        bool    found = false;
@@ -834,13 +860,16 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN(0);
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN(0);
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        /* ignore block quota on MDTs, ignore inode quota on OSTs */
        if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
@@ -849,18 +878,18 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* ignore quota enforcement request when:
         *    - quota isn't enforced for this quota type
-        * or - the user/group is root */
-       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+        * or - the user/group is root
+        * or - quota accounting isn't enabled */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+           (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
                RETURN(0);
 
-       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+       LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
                 trans->lqt_id_cnt);
        /* check whether we already allocated a slot for this id */
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
@@ -881,7 +910,7 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* manage quota enforcement for this ID */
        rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-                          &trans->lqt_ids[i], qi->lqi_space, flags);
+                          &trans->lqt_ids[i], qi->lqi_space, local_flags);
        RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);
@@ -921,6 +950,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        lqe_write_lock(lqe);
 
        /* fill qb_count & qb_flags */
@@ -982,7 +1014,7 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
                                    qsd_req_completion, qqi, &qti->qti_lockh,
                                    lqe);
        } else {
-               union ldlm_wire_lvb *lvb;
+               struct lquota_lvb *lvb;
 
                OBD_ALLOC_PTR(lvb);
                if (lvb == NULL)
@@ -1074,13 +1106,16 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN_EXIT;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN_EXIT;
+
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        LASSERT(trans != NULL);
 
@@ -1127,12 +1162,12 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
                RETURN_EXIT;
 
        /* We don't enforce quota until the qsd_instance is started */
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        qqi = qsd->qsd_type_array[qtype];
        LASSERT(qqi);
@@ -1141,16 +1176,16 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
            qid->qid_uid == 0)
                RETURN_EXIT;
 
-       cfs_read_lock(&qsd->qsd_lock);
+       read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
-               cfs_read_unlock(&qsd->qsd_lock);
+               read_unlock(&qsd->qsd_lock);
                RETURN_EXIT;
        }
-       cfs_read_unlock(&qsd->qsd_lock);
+       read_unlock(&qsd->qsd_lock);
 
        lqe = lqe_locate(env, qqi->qqi_site, qid);
        if (IS_ERR(lqe)) {
-               CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+               CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
                       qsd->qsd_svname, qid->qid_uid, qtype);
                RETURN_EXIT;
        }