X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_handler.c;h=a8d842ddb901667321cebfdd900932f63992070c;hb=2e2c8d59cfdb6d7078f9627ca970920059fcf90f;hp=6bef54e86c7f3960274c7e771b965459b285576f;hpb=11db1a551172f596d1d284e8496530f9ce24ac81;p=fs%2Flustre-release.git diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c index 6bef54e..a8d842d 100644 --- a/lustre/quota/qsd_handler.c +++ b/lustre/quota/qsd_handler.c @@ -21,7 +21,7 @@ * GPL HEADER END */ /* - * Copyright (c) 2012, Intel Corporation. + * Copyright (c) 2012, 2014, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi @@ -45,7 +45,7 @@ static inline int qsd_request_enter(struct lquota_entry *lqe) } if (lqe->lqe_pending_rel != 0) { - LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64, + LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu", lqe->lqe_pending_rel); LBUG(); } @@ -65,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe) } lqe->lqe_pending_req--; lqe->lqe_pending_rel = 0; - cfs_waitq_broadcast(&lqe->lqe_waiters); + wake_up_all(&lqe->lqe_waiters); } /** @@ -350,12 +350,12 @@ static void qsd_req_completion(const struct lu_env *env, * the DQACQ since the limit for this ID has been removed, so we * should not update quota entry & slave index copy neither. */ if (repbody != NULL && repbody->qb_count != 0) { - LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count); + LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count); if (req_is_rel(reqbody->qb_flags)) { if (lqe->lqe_granted < repbody->qb_count) { LQUOTA_ERROR(lqe, "can't release more space " - "than owned "LPU64"<"LPU64, + "than owned %llu<%llu", lqe->lqe_granted, repbody->qb_count); lqe->lqe_granted = 0; @@ -385,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env, } /* extract information from lvb */ - if (ret == 0 && lvb != 0) { + if (ret == 0 && lvb != NULL) { if (lvb->lvb_id_qunit != 0) qsd_set_qunit(lqe, lvb->lvb_id_qunit); - if (lvb->lvb_flags & LQUOTA_FL_EDQUOT) - lqe->lqe_edquot = true; - else - lqe->lqe_edquot = false; + qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT)); } else if (repbody != NULL && repbody->qb_qunit != 0) { qsd_set_qunit(lqe, repbody->qb_qunit); } @@ -405,7 +402,7 @@ out: adjust = qsd_adjust_needed(lqe); if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) { lqe->lqe_acq_rc = ret; - lqe->lqe_acq_time = cfs_time_current_64(); + lqe->lqe_acq_time = ktime_get_seconds(); } out_noadjust: qsd_request_exit(lqe); @@ -461,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) lqe->lqe_pending_write += space; lqe->lqe_waiting_write -= space; rc = 0; - } else if (lqe->lqe_edquot) { + /* lqe_edquot flag is used to avoid flooding dqacq requests when + * the user is over quota, however, the lqe_edquot could be stale + * sometimes due to the race reply of dqacq vs. id lock glimpse + * (see LU-4505), so we revalidate it every 5 seconds. */ + } else if (lqe->lqe_edquot && + (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) { rc = -EDQUOT; - } else { + }else { rc = -EAGAIN; } lqe_write_unlock(lqe); @@ -560,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env, /* check whether an acquire request completed recently */ if (lqe->lqe_acq_rc != 0 && - cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) { + lqe->lqe_acq_time > ktime_get_seconds() - 1) { lqe_write_unlock(lqe); LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc); RETURN(lqe->lqe_acq_rc); @@ -631,7 +633,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe, ENTRY; for (count = 0; rc == 0; count++) { - LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count); + LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count); if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) { rc = -EINPROGRESS; @@ -684,9 +686,10 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi, struct lquota_id_info *qid, long long space, int *flags) { - struct lquota_entry *lqe; - int rc, ret = -EINPROGRESS; - struct l_wait_info lwi; + struct lquota_entry *lqe; + struct l_wait_info lwi; + int qtype_flag = 0; + int rc, ret = -EINPROGRESS; ENTRY; if (qid->lqi_qentry != NULL) { @@ -719,7 +722,7 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi, RETURN(0); } - LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space); + LQUOTA_DEBUG(lqe, "op_begin space:%lld", space); lqe_write_lock(lqe); lqe->lqe_waiting_write += space; @@ -769,7 +772,7 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi, out_flags: LASSERT(qid->lqi_is_blk); if (rc != 0) { - *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype); + *flags |= lquota_over_fl(qqi->qqi_qtype); } else { __u64 usage; @@ -779,11 +782,12 @@ out_flags: usage += lqe->lqe_waiting_write; usage += qqi->qqi_qsd->qsd_sync_threshold; + qtype_flag = lquota_over_fl(qqi->qqi_qtype); /* if we should notify client to start sync write */ if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel) - *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype); + *flags |= qtype_flag; else - *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype); + *flags &= ~qtype_flag; lqe_read_unlock(lqe); } } @@ -830,6 +834,9 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, if (unlikely(qsd == NULL)) RETURN(0); + if (qsd->qsd_dev->dd_rdonly) + RETURN(0); + /* We don't enforce quota until the qsd_instance is started */ read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { @@ -848,10 +855,10 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, * or - the user/group is root * or - quota accounting isn't enabled */ if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 || - qsd->qsd_acct_failed) + (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed) RETURN(0); - LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d", + LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n", trans->lqt_id_cnt); /* check whether we already allocated a slot for this id */ for (i = 0; i < trans->lqt_id_cnt; i++) { @@ -919,6 +926,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe) qqi = lqe2qqi(lqe); qsd = qqi->qqi_qsd; + if (qsd->qsd_dev->dd_rdonly) + RETURN(0); + lqe_write_lock(lqe); /* fill qb_count & qb_flags */ @@ -1072,6 +1082,9 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd, if (unlikely(qsd == NULL)) RETURN_EXIT; + if (qsd->qsd_dev->dd_rdonly) + RETURN_EXIT; + /* We don't enforce quota until the qsd_instance is started */ read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { @@ -1148,7 +1161,7 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd, lqe = lqe_locate(env, qqi->qqi_site, qid); if (IS_ERR(lqe)) { - CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n", + CERROR("%s: fail to locate lqe for id:%llu, type:%d\n", qsd->qsd_svname, qid->qid_uid, qtype); RETURN_EXIT; }