X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_handler.c;h=ed67286d22c14335961970dc1d66e5fe6d0bcf86;hb=a7d68d9f32abb85d3e01d184292ba6f10c774704;hp=6bef54e86c7f3960274c7e771b965459b285576f;hpb=11db1a551172f596d1d284e8496530f9ce24ac81;p=fs%2Flustre-release.git

diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c
index 6bef54e..ed67286 100644
--- a/lustre/quota/qsd_handler.c
+++ b/lustre/quota/qsd_handler.c
@@ -21,7 +21,7 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi
@@ -45,7 +45,7 @@ static inline int qsd_request_enter(struct lquota_entry *lqe)
 	}
 
 	if (lqe->lqe_pending_rel != 0) {
-		LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+		LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
 			     lqe->lqe_pending_rel);
 		LBUG();
 	}
@@ -65,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
 	}
 	lqe->lqe_pending_req--;
 	lqe->lqe_pending_rel = 0;
-	cfs_waitq_broadcast(&lqe->lqe_waiters);
+	wake_up_all(&lqe->lqe_waiters);
 }
 
 /**
@@ -350,12 +350,12 @@ static void qsd_req_completion(const struct lu_env *env,
 	 * the DQACQ since the limit for this ID has been removed, so we
 	 * should not update quota entry & slave index copy neither. */
 	if (repbody != NULL && repbody->qb_count != 0) {
-		LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+		LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
 
 		if (req_is_rel(reqbody->qb_flags)) {
 			if (lqe->lqe_granted < repbody->qb_count) {
 				LQUOTA_ERROR(lqe, "can't release more space "
-					     "than owned "LPU64"<"LPU64,
+					     "than owned %llu<%llu",
 					     lqe->lqe_granted,
 					     repbody->qb_count);
 				lqe->lqe_granted = 0;
@@ -385,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env,
 	}
 
 	/* extract information from lvb */
-	if (ret == 0 && lvb != 0) {
+	if (ret == 0 && lvb != NULL) {
 		if (lvb->lvb_id_qunit != 0)
 			qsd_set_qunit(lqe, lvb->lvb_id_qunit);
-		if (lvb->lvb_flags & LQUOTA_FL_EDQUOT)
-			lqe->lqe_edquot = true;
-		else
-			lqe->lqe_edquot = false;
+		qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
 	} else if (repbody != NULL && repbody->qb_qunit != 0) {
 		qsd_set_qunit(lqe, repbody->qb_qunit);
 	}
@@ -405,7 +402,7 @@ out:
 		adjust = qsd_adjust_needed(lqe);
 		if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
 			lqe->lqe_acq_rc = ret;
-			lqe->lqe_acq_time = cfs_time_current_64();
+			lqe->lqe_acq_time = ktime_get_seconds();
 		}
 out_noadjust:
 	qsd_request_exit(lqe);
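Note: the hunk above switches the acquire-result cache from 64-bit jiffies
(cfs_time_current_64()) to plain seconds from ktime_get_seconds(), so the
freshness test in qsd_acquire_remote() below becomes a simple integer
comparison that does not depend on HZ. A minimal userspace sketch of that
caching idiom, with hypothetical names standing in for the lqe_acq_rc and
lqe_acq_time fields and a POSIX monotonic clock standing in for
ktime_get_seconds():

#include <stdbool.h>
#include <time.h>

/* stand-in for ktime_get_seconds(): monotonic time in seconds */
static time_t now_seconds(void)
{
        struct timespec ts;

        clock_gettime(CLOCK_MONOTONIC, &ts);
        return ts.tv_sec;
}

struct acq_cache {
        int    rc;      /* last acquire return code, 0 if none */
        time_t stamp;   /* when that result was recorded, in seconds */
};

static void acq_cache_store(struct acq_cache *c, int rc)
{
        c->rc = rc;
        c->stamp = now_seconds();
}

/* mirror of "lqe->lqe_acq_time > ktime_get_seconds() - 1": reuse the
 * cached code only if it is less than one second old */
static bool acq_cache_valid(const struct acq_cache *c)
{
        return c->rc != 0 && c->stamp > now_seconds() - 1;
}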
@@ -461,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 		lqe->lqe_pending_write += space;
 		lqe->lqe_waiting_write -= space;
 		rc = 0;
-	} else if (lqe->lqe_edquot) {
+	/* The lqe_edquot flag is used to avoid flooding the master with
+	 * dqacq requests when the user is over quota. However, lqe_edquot
+	 * can be stale because the dqacq reply races with the per-ID lock
+	 * glimpse (see LU-4505), so we revalidate it every 5 seconds. */
+	} else if (lqe->lqe_edquot &&
+		   (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
 		rc = -EDQUOT;
-	} else {
+	} else {
 		rc = -EAGAIN;
 	}
 	lqe_write_unlock(lqe);
@@ -560,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 
 	/* check whether an acquire request completed recently */
 	if (lqe->lqe_acq_rc != 0 &&
-	    cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+	    lqe->lqe_acq_time > ktime_get_seconds() - 1) {
 		lqe_write_unlock(lqe);
 		LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
 		RETURN(lqe->lqe_acq_rc);
@@ -621,7 +623,8 @@ static int qsd_acquire_remote(const struct lu_env *env,
  * \param space - is the amount of quota required for the operation
  * \param ret   - is the return code (-EDQUOT, -EINPROGRESS, ...)
  *
- * \retval true  - exit from l_wait_event and real return value in \a ret
+ * \retval true  - stop waiting in wait_event_idle_timeout; the real
+ *		   return value is in \a ret
  * \retval false - continue waiting
  */
 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
@@ -631,7 +634,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 	ENTRY;
 
 	for (count = 0; rc == 0; count++) {
-		LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+		LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
 
 		if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
 			rc = -EINPROGRESS;
@@ -650,6 +653,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 			 * rc < 0, something bad happened */
 			break;
 
+		/* if we have been granted some quota and still wait for
+		 * more, give the QMT time to reclaim space from clients */
+		if (count > 0)
+			schedule_timeout_interruptible(cfs_time_seconds(1));
+
 		/* need to acquire more quota space from master */
 		rc = qsd_acquire_remote(env, lqe);
 	}
@@ -658,7 +666,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 		/* already a request in flight, continue waiting */
 		RETURN(false);
 	*ret = rc;
-	RETURN(true); /* exit from l_wait_event */
+	RETURN(true);
 }
 
 /**
@@ -682,11 +690,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 			 struct lquota_id_info *qid, long long space,
-			 int *flags)
+			 enum osd_quota_local_flags *local_flags)
 {
-	struct lquota_entry	*lqe;
-	int			 rc, ret = -EINPROGRESS;
-	struct l_wait_info	 lwi;
+	struct lquota_entry *lqe;
+	enum osd_quota_local_flags qtype_flag = 0;
+	int rc, ret = -EINPROGRESS;
 	ENTRY;
 
 	if (qid->lqi_qentry != NULL) {
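Note: the qsd_acquire_local() hunk above caches the over-quota state but
only trusts it for five seconds, since the dqacq reply can race with the
glimpse that clears the flag (LU-4505). A simplified, self-contained sketch
of that decision, using a hypothetical structure and field names in place
of the real lquota_entry:

#include <errno.h>
#include <stdbool.h>
#include <time.h>

#define EDQUOT_REVALIDATE_SECS 5  /* trust window for the cached flag */

struct quota_state {
        bool      edquot;       /* cached over-quota flag */
        time_t    edquot_time;  /* when the flag was last set */
        long long granted;      /* space granted by the master */
        long long reserved;     /* pending + waiting writes */
};

/* decide locally, roughly as qsd_acquire_local() does: 0 = granted,
 * -EDQUOT = trust the still-fresh over-quota flag, -EAGAIN = must ask
 * the master for more space */
static int acquire_local(struct quota_state *q, long long space, time_t now)
{
        if (q->granted - q->reserved >= space) {
                q->reserved += space;
                return 0;
        }
        if (q->edquot && q->edquot_time > now - EDQUOT_REVALIDATE_SECS)
                return -EDQUOT;
        return -EAGAIN;  /* flag missing or stale: go back to the master */
}

Returning -EAGAIN once the flag has aged out is what forces a fresh dqacq,
which either re-grants space or re-confirms the over-quota state.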
@@ -712,14 +720,14 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 		 * quota space. That said, we still want to perform space
 		 * adjustments in qsd_op_end, so we return here, but with
 		 * a reference on the lqe */
-		if (flags != NULL) {
+		if (local_flags != NULL) {
 			rc = qsd_refresh_usage(env, lqe);
 			GOTO(out_flags, rc);
 		}
 		RETURN(0);
 	}
 
-	LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+	LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
 
 	lqe_write_lock(lqe);
 	lqe->lqe_waiting_write += space;
@@ -727,26 +735,28 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 
 	/* acquire quota space for the operation, cap overall wait time to
 	 * prevent a service thread from being stuck for too long */
-	lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
-			  NULL, NULL);
-	rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
-			  &lwi);
+	rc = wait_event_idle_timeout(
+		lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+		cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-	if (rc == 0 && ret == 0) {
+	if (rc > 0 && ret == 0) {
 		qid->lqi_space += space;
+		rc = 0;
 	} else {
-		if (rc == 0)
+		if (rc > 0)
 			rc = ret;
+		else if (rc == 0)
+			rc = -ETIMEDOUT;
 
 		LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
 		lqe_write_lock(lqe);
 		lqe->lqe_waiting_write -= space;
 
-		if (flags && lqe->lqe_pending_write != 0)
+		if (local_flags && lqe->lqe_pending_write != 0)
 			/* Inform OSD layer that there are pending writes.
 			 * It might want to retry after a sync if appropriate */
-			*flags |= QUOTA_FL_SYNC;
+			*local_flags |= QUOTA_FL_SYNC;
 		lqe_write_unlock(lqe);
 
 		/* convert recoverable error into -EINPROGRESS, client will
@@ -765,25 +775,32 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 		}
 	}
 
-	if (flags != NULL) {
+	if (local_flags != NULL) {
 out_flags:
 		LASSERT(qid->lqi_is_blk);
 		if (rc != 0) {
-			*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+			*local_flags |= lquota_over_fl(qqi->qqi_qtype);
 		} else {
 			__u64	usage;
 
 			lqe_read_lock(lqe);
-			usage  = lqe->lqe_usage;
-			usage += lqe->lqe_pending_write;
+			usage = lqe->lqe_pending_write;
 			usage += lqe->lqe_waiting_write;
-			usage += qqi->qqi_qsd->qsd_sync_threshold;
+			/* more quota may have just been granted while the
+			 * edquot flag still arrives through a glimpse */
+			if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 &&
+			    (usage % lqe->lqe_qunit >
+			     qqi->qqi_qsd->qsd_sync_threshold)))
+				usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+			usage += lqe->lqe_usage;
+			qtype_flag = lquota_over_fl(qqi->qqi_qtype);
 
 			/* if we should notify client to start sync write */
 			if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-				*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+				*local_flags |= qtype_flag;
 			else
-				*flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+				*local_flags &= ~qtype_flag;
 			lqe_read_unlock(lqe);
 		}
 	}
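Note: the out_flags hunk above pads the usage estimate with
qsd_sync_threshold only when the ID is flagged over quota or the in-flight
writes cross a qunit boundary, rather than unconditionally as before, so
clients are not pushed into sync writes while plenty of granted space
remains. A userspace sketch of the resulting check, with a hypothetical
struct mirroring the lqe fields it reads:

#include <stdbool.h>

struct usage_view {
        unsigned long long usage;          /* lqe_usage */
        unsigned long long pending_write;  /* lqe_pending_write */
        unsigned long long waiting_write;  /* lqe_waiting_write */
        unsigned long long granted;        /* lqe_granted */
        unsigned long long pending_rel;    /* lqe_pending_rel */
        unsigned long long qunit;          /* lqe_qunit, 0 if unknown */
        bool               edquot;         /* lqe_edquot */
};

/* should the client be told to start writing synchronously? */
static bool need_sync_write(const struct usage_view *v,
                            unsigned long long sync_threshold)
{
        unsigned long long usage = v->pending_write + v->waiting_write;

        /* pad with the threshold only when we look close to the edge:
         * flagged over quota, or in-flight writes cross a qunit */
        if (v->edquot ||
            (v->qunit != 0 && usage % v->qunit > sync_threshold))
                usage += sync_threshold;

        usage += v->usage;
        return usage >= v->granted - v->pending_rel;
}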
@@ -796,7 +813,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
 			     struct lquota_id_info *q2)
 {
-	if (q1->lqi_type != q2->lqi_type)
+	if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
 		return false;
 	return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -821,7 +838,7 @@ static inline bool qid_equal(struct lquota_id_info *q1,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 		 struct lquota_trans *trans, struct lquota_id_info *qi,
-		 int *flags)
+		 enum osd_quota_local_flags *local_flags)
 {
 	int	i, rc;
 	bool	found = false;
@@ -830,6 +847,9 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 	if (unlikely(qsd == NULL))
 		RETURN(0);
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN(0);
+
 	/* We don't enforce quota until the qsd_instance is started */
 	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
@@ -848,17 +868,15 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 	 * or - the user/group is root
 	 * or - quota accounting isn't enabled */
 	if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
-	    qsd->qsd_acct_failed)
+	    (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
 		RETURN(0);
 
-	LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+	LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
 		 trans->lqt_id_cnt);
 	/* check whether we already allocated a slot for this id */
 	for (i = 0; i < trans->lqt_id_cnt; i++) {
 		if (qid_equal(qi, &trans->lqt_ids[i])) {
 			found = true;
-			/* make sure we are not mixing inodes & blocks */
-			LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
 			break;
 		}
 	}
@@ -879,7 +897,7 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
 	/* manage quota enforcement for this ID */
 	rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-			   &trans->lqt_ids[i], qi->lqi_space, flags);
+			   &trans->lqt_ids[i], qi->lqi_space, local_flags);
 	RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);
@@ -919,6 +937,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
 	qqi = lqe2qqi(lqe);
 	qsd = qqi->qqi_qsd;
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN(0);
+
 	lqe_write_lock(lqe);
 
 	/* fill qb_count & qb_flags */
@@ -1072,6 +1093,9 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
 	if (unlikely(qsd == NULL))
 		RETURN_EXIT;
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN_EXIT;
+
 	/* We don't enforce quota until the qsd_instance is started */
 	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
@@ -1148,7 +1172,7 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
 
 	lqe = lqe_locate(env, qqi->qqi_site, qid);
 	if (IS_ERR(lqe)) {
-		CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+		CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
 		       qsd->qsd_svname, qid->qid_uid, qtype);
 		RETURN_EXIT;
 	}
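Note: with the qid_equal() change above, block and inode reservations for
the same ID now occupy distinct transaction slots, which is why the in-loop
LASSERT on lqi_is_blk could be dropped. A sketch of the stricter comparison
on a simplified key type (hypothetical names):

#include <stdbool.h>

struct qid_key {
        int                type;    /* USRQUOTA / GRPQUOTA / PRJQUOTA */
        unsigned long long id;      /* uid, gid or project id */
        bool               is_blk;  /* block (true) vs inode (false) */
};

/* two reservations may share a slot only if quota type, ID and
 * block-vs-inode kind all match */
static bool qid_key_equal(const struct qid_key *a, const struct qid_key *b)
{
        return a->is_blk == b->is_blk &&
               a->type == b->type &&
               a->id == b->id;
}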