Whamcloud - gitweb
LU-14927 quota: move qsd_transfer to lquota module
[fs/lustre-release.git] / lustre / quota / qsd_handler.c
index eb0bddb..1c8e43a 100644 (file)
@@ -65,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
        }
        lqe->lqe_pending_req--;
        lqe->lqe_pending_rel = 0;
-       wake_up_all(&lqe->lqe_waiters);
+       wake_up(&lqe->lqe_waiters);
 }
 
 /**
@@ -623,18 +623,22 @@ static int qsd_acquire_remote(const struct lu_env *env,
  * \param space - is the amount of quota required for the operation
  * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
  *
- * \retval true  - exit from l_wait_event and real return value in \a ret
+ * \retval true  - stop waiting in wait_event_idle_timeout,
+ *                 and real return value in \a ret
  * \retval false - continue waiting
  */
 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                        long long space, int *ret)
 {
        int rc = 0, count;
+       int wait_pending = 0;
+       struct qsd_qtype_info *qqi = lqe2qqi(lqe);
+
        ENTRY;
 
        for (count = 0; rc == 0; count++) {
                LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
-
+again:
                if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
                        rc = -EINPROGRESS;
                        break;
@@ -651,13 +655,21 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                        /* rc == 0, Wouhou! enough local quota space
                         * rc < 0, something bad happened */
                         break;
+               /*
+                * There might be a window that commit transaction
+                * have updated usage but pending write doesn't change
+                * wait for it before acquiring remotely.
+                */
+               if (lqe->lqe_pending_write >= space && !wait_pending) {
+                       wait_pending = 1;
+                       dt_sync(env, qqi->qqi_qsd->qsd_dev);
+                       goto again;
+               }
 
                /* if we have gotten some quota and stil wait more quota,
                 * it's better to give QMT some time to reclaim from clients */
-               if (count > 0) {
-                       set_current_state(TASK_INTERRUPTIBLE);
-                       schedule_timeout(cfs_time_seconds(1));
-               }
+               if (count > 0)
+                       schedule_timeout_interruptible(cfs_time_seconds(1));
 
                /* need to acquire more quota space from master */
                rc = qsd_acquire_remote(env, lqe);
@@ -667,7 +679,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
                /* already a request in flight, continue waiting */
                RETURN(false);
        *ret = rc;
-       RETURN(true); /* exit from l_wait_event */
+       RETURN(true);
 }
 
 /**
@@ -691,11 +703,10 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                         struct lquota_id_info *qid, long long space,
-                        int *flags)
+                        enum osd_quota_local_flags *local_flags)
 {
        struct lquota_entry *lqe;
-       struct l_wait_info lwi;
-       int qtype_flag = 0;
+       enum osd_quota_local_flags qtype_flag = 0;
        int rc, ret = -EINPROGRESS;
        ENTRY;
 
@@ -722,7 +733,7 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                 * quota space. That said, we still want to perform space
                 * adjustments in qsd_op_end, so we return here, but with
                 * a reference on the lqe */
-               if (flags != NULL) {
+               if (local_flags != NULL) {
                        rc = qsd_refresh_usage(env, lqe);
                        GOTO(out_flags, rc);
                }
@@ -737,26 +748,28 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 
        /* acquire quota space for the operation, cap overall wait time to
         * prevent a service thread from being stuck for too long */
-       lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
-                         NULL, NULL);
-       rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
-                         &lwi);
+       rc = wait_event_idle_timeout(
+               lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+               cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-       if (rc == 0 && ret == 0) {
+       if (rc > 0 && ret == 0) {
                qid->lqi_space += space;
+               rc = 0;
        } else {
-               if (rc == 0)
+               if (rc > 0)
                        rc = ret;
+               else if (rc == 0)
+                       rc = -ETIMEDOUT;
 
                LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
                lqe_write_lock(lqe);
                lqe->lqe_waiting_write -= space;
 
-               if (flags && lqe->lqe_pending_write != 0)
+               if (local_flags && lqe->lqe_pending_write != 0)
                        /* Inform OSD layer that there are pending writes.
                         * It might want to retry after a sync if appropriate */
-                        *flags |= QUOTA_FL_SYNC;
+                        *local_flags |= QUOTA_FL_SYNC;
                lqe_write_unlock(lqe);
 
                /* convert recoverable error into -EINPROGRESS, client will
@@ -775,26 +788,32 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
                }
        }
 
-       if (flags != NULL) {
+       if (local_flags != NULL) {
 out_flags:
                LASSERT(qid->lqi_is_blk);
                if (rc != 0) {
-                       *flags |= lquota_over_fl(qqi->qqi_qtype);
+                       *local_flags |= lquota_over_fl(qqi->qqi_qtype);
                } else {
                        __u64   usage;
 
                        lqe_read_lock(lqe);
-                       usage  = lqe->lqe_usage;
-                       usage += lqe->lqe_pending_write;
+                       usage = lqe->lqe_pending_write;
                        usage += lqe->lqe_waiting_write;
-                       usage += qqi->qqi_qsd->qsd_sync_threshold;
+                       /* There is a chance to successfully grant more quota
+                        * but get edquot flag through glimpse. */
+                       if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+                          (usage % lqe->lqe_qunit >
+                           qqi->qqi_qsd->qsd_sync_threshold)))
+                               usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+                       usage += lqe->lqe_usage;
 
                        qtype_flag = lquota_over_fl(qqi->qqi_qtype);
                        /* if we should notify client to start sync write */
                        if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-                               *flags |= qtype_flag;
+                               *local_flags |= qtype_flag;
                        else
-                               *flags &= ~qtype_flag;
+                               *local_flags &= ~qtype_flag;
                        lqe_read_unlock(lqe);
                }
        }
@@ -807,7 +826,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
                             struct lquota_id_info *q2)
 {
-       if (q1->lqi_type != q2->lqi_type)
+       if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
                return false;
        return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -832,7 +851,7 @@ static inline bool qid_equal(struct lquota_id_info *q1,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
                 struct lquota_trans *trans, struct lquota_id_info *qi,
-                int *flags)
+                enum osd_quota_local_flags *local_flags)
 {
        int     i, rc;
        bool    found = false;
@@ -871,8 +890,6 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        for (i = 0; i < trans->lqt_id_cnt; i++) {
                if (qid_equal(qi, &trans->lqt_ids[i])) {
                        found = true;
-                       /* make sure we are not mixing inodes & blocks */
-                       LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
                        break;
                }
        }
@@ -893,7 +910,7 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
        /* manage quota enforcement for this ID */
        rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-                          &trans->lqt_ids[i], qi->lqi_space, flags);
+                          &trans->lqt_ids[i], qi->lqi_space, local_flags);
        RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);
@@ -1119,6 +1136,72 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
 }
 EXPORT_SYMBOL(qsd_op_end);
 
+/* Simple wrapper on top of qsd API which implement quota transfer for osd
+ * setattr needs. As a reminder, only the root user can change ownership of
+ * a file, that's why EDQUOT & EINPROGRESS errors are discarded
+ */
+int qsd_transfer(const struct lu_env *env, struct qsd_instance *qsd,
+                struct lquota_trans *trans, unsigned int qtype,
+                u64 orig_id, u64 new_id, u64 bspace,
+                struct lquota_id_info *qi)
+{
+       int rc;
+
+       if (unlikely(!qsd))
+               return 0;
+
+       LASSERT(qtype < LL_MAXQUOTAS);
+       if (qtype == PRJQUOTA)
+               if (!projid_valid(make_kprojid(&init_user_ns, new_id)))
+                       return -EINVAL;
+
+       qi->lqi_type = qtype;
+
+       /* inode accounting */
+       qi->lqi_is_blk = false;
+
+       /* one more inode for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space = 1;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and one less inode for the current id */
+       qi->lqi_id.qid_uid = orig_id;
+       qi->lqi_space = -1;
+       /* can't get EDQUOT when reducing usage */
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* block accounting */
+       qi->lqi_is_blk = true;
+
+       /* more blocks for the new owner ... */
+       qi->lqi_id.qid_uid = new_id;
+       qi->lqi_space = bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       if (rc == -EDQUOT || rc == -EINPROGRESS)
+               rc = 0;
+       if (rc)
+               return rc;
+
+       /* and finally less blocks for the current owner */
+       qi->lqi_id.qid_uid = orig_id;
+       qi->lqi_space = -bspace;
+       rc = qsd_op_begin(env, qsd, trans, qi, NULL);
+       /* can't get EDQUOT when reducing usage */
+       if (rc == -EINPROGRESS)
+               rc = 0;
+       return rc;
+}
+EXPORT_SYMBOL(qsd_transfer);
+
 /**
  * Trigger pre-acquire/release if necessary.
  * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the
@@ -1186,3 +1269,98 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
        EXIT;
 }
 EXPORT_SYMBOL(qsd_op_adjust);
+
+/**
+ * Reserve or free quota.
+ *
+ * Currently, It's used to reserve quota space before changing the file's group
+ * for normal user and free the reserved quota after the group change.
+ *
+ * \param env     - the environment passed by the caller
+ * \param qsd     - is the qsd instance associated with the device in charge of
+ *                  the operation.
+ * \param qi      - qid & space required by current operation
+ *
+ * \retval 0            - success
+ * \retval -EDQUOT      - out of quota
+ * \retval -EINPROGRESS - inform client to retry write
+ * \retval -ve          - other appropriate errors
+ */
+int qsd_reserve_or_free_quota(const struct lu_env *env,
+                             struct qsd_instance *qsd,
+                             struct lquota_id_info *qi)
+{
+       struct lquota_entry *lqe;
+       struct qsd_qtype_info  *qqi;
+       int rc = 0;
+       bool is_free = qi->lqi_space < 0;
+
+       ENTRY;
+
+       if (unlikely(qsd == NULL))
+               RETURN(0);
+
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
+       if (is_free)
+               qi->lqi_space *= -1;
+
+       /* We don't enforce quota until the qsd_instance is started */
+       read_lock(&qsd->qsd_lock);
+       if (!qsd->qsd_started) {
+               read_unlock(&qsd->qsd_lock);
+               RETURN(0);
+       }
+       read_unlock(&qsd->qsd_lock);
+
+       qqi = qsd->qsd_type_array[qi->lqi_type];
+       LASSERT(qqi);
+
+       CDEBUG(D_QUOTA, "type %s, acct %s, free %d, count %llu\n",
+              qsd_type_enabled(qsd, qi->lqi_type) ? "enabled" : "disabled",
+              (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed ? "failed" :
+              "succeed", is_free, qi->lqi_space);
+
+       /* ignore quota enforcement request when:
+        *    - quota isn't enforced for this quota type
+        * or - the user/group is root
+        * or - quota accounting isn't enabled
+        */
+       if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+           (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
+               RETURN(0);
+
+       if (is_free) {
+               /* look up lquota entry associated with qid */
+               lqe = lqe_locate(env, qqi->qqi_site, &qi->lqi_id);
+               if (IS_ERR(lqe))
+                       RETURN(PTR_ERR(lqe));
+               if (!lqe->lqe_enforced) {
+                       lqe_putref(lqe);
+                       RETURN(0);
+               }
+
+               qi->lqi_qentry = lqe;
+
+               /* lqe will be put in qsd_op_end0 */
+               qsd_op_end0(env, qsd->qsd_type_array[qi->lqi_type], qi);
+               qi->lqi_qentry = NULL;
+       } else {
+               /* manage quota enforcement for this ID */
+               rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], qi,
+                                  qi->lqi_space, NULL);
+
+               if (qi->lqi_qentry != NULL) {
+                       lqe_putref(qi->lqi_qentry);
+                       qi->lqi_qentry = NULL;
+               }
+       }
+
+       CDEBUG(D_QUOTA, "%s quota: type %i, uid %llu, gid %llu, space %llu\n",
+              is_free ? "Free" : "Reserve", qi->lqi_type, qi->lqi_id.qid_uid,
+              qi->lqi_id.qid_gid, qi->lqi_space);
+
+       RETURN(rc);
+}
+EXPORT_SYMBOL(qsd_reserve_or_free_quota);