X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_handler.c;h=ae770f59d3791645bf2b43fca2f65b4e7fdd0b92;hp=5bdb0cf2477638c13677ba1e068bf719eb31b611;hb=18cd3e1e28afd311e4743dab1011f85fba0e1765;hpb=54a792d389e7d21151031a63731567cb877f918c diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c index 5bdb0cf..ae770f5 100644 --- a/lustre/quota/qsd_handler.c +++ b/lustre/quota/qsd_handler.c @@ -21,121 +21,291 @@ * GPL HEADER END */ /* - * Copyright (c) 2012 Intel, Inc. + * Copyright (c) 2012, 2017, Intel Corporation. * Use is subject to license terms. * * Author: Johann Lombardi * Author: Niu Yawei */ -#ifndef EXPORT_SYMTAB -# define EXPORT_SYMTAB -#endif - #define DEBUG_SUBSYSTEM S_LQUOTA -#include -#include -#include -#include -#include - -#include -#include -#include - #include "qsd_internal.h" -/* - * helper function returning how much space is currently reserved for requests - * in flight. +/** + * helper function bumping lqe_pending_req if there is no quota request in + * flight for the lquota entry \a lqe. Otherwise, EBUSY is returned. */ -static inline int lqe_pending_dqacq(struct lquota_entry *lqe) +static inline int qsd_request_enter(struct lquota_entry *lqe) { - int pending; + /* is there already a quota request in flight? */ + if (lqe->lqe_pending_req != 0) { + LQUOTA_DEBUG(lqe, "already a request in flight"); + return -EBUSY; + } - lqe_read_lock(lqe); - pending = lqe->lqe_pending_req; - lqe_read_unlock(lqe); + if (lqe->lqe_pending_rel != 0) { + LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu", + lqe->lqe_pending_rel); + LBUG(); + } + + lqe->lqe_pending_req++; + return 0; +} - return pending; +/** + * Companion of qsd_request_enter() dropping lqe_pending_req to 0. + */ +static inline void qsd_request_exit(struct lquota_entry *lqe) +{ + if (lqe->lqe_pending_req != 1) { + LQUOTA_ERROR(lqe, "lqe_pending_req != 1!!!"); + LBUG(); + } + lqe->lqe_pending_req--; + lqe->lqe_pending_rel = 0; + wake_up_all(&lqe->lqe_waiters); } -/* - * helper function returning true when the connection to master is ready to be - * used. +/** + * Check whether a qsd instance is all set to send quota request to master. + * This includes checking whether: + * - the connection to master is set up and usable, + * - the qsd isn't stopping + * - reintegration has been successfully completed and all indexes are + * up-to-date + * + * \param lqe - is the lquota entry for which we would like to send an quota + * request + * \param lockh - is the remote handle of the global lock returned on success + * + * \retval 0 on success, appropriate error on failure */ -static inline int qsd_ready(struct qsd_instance *qsd) +static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh) { + struct qsd_qtype_info *qqi = lqe2qqi(lqe); + struct qsd_instance *qsd = qqi->qqi_qsd; struct obd_import *imp = NULL; + struct ldlm_lock *lock; + ENTRY; + + read_lock(&qsd->qsd_lock); + /* is the qsd about to shut down? */ + if (qsd->qsd_stopping) { + read_unlock(&qsd->qsd_lock); + LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping"); + /* Target is about to shut down, client will retry */ + RETURN(-EINPROGRESS); + } - cfs_read_lock(&qsd->qsd_lock); + /* is the connection to the quota master ready? */ if (qsd->qsd_exp_valid) imp = class_exp2cliimp(qsd->qsd_exp); - cfs_read_unlock(&qsd->qsd_lock); + if (imp == NULL || imp->imp_invalid) { + read_unlock(&qsd->qsd_lock); + LQUOTA_DEBUG(lqe, "connection to master not ready"); + RETURN(-ENOTCONN); + } + + /* In most case, reintegration must have been triggered (when enable + * quota or on OST start), however, in rare race condition (enabling + * quota when starting OSTs), we might miss triggering reintegration + * for some qqi. + * + * If the previous reintegration failed for some reason, we'll + * re-trigger it here as well. */ + if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) { + read_unlock(&qsd->qsd_lock); + LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and " + "kicking off reintegration"); + qsd_start_reint_thread(qqi); + RETURN(-EINPROGRESS); + } + + /* Fill the remote global lock handle, master will check this handle + * to see if the slave is sending request with stale lock */ + lustre_handle_copy(lockh, &qqi->qqi_lockh); + read_unlock(&qsd->qsd_lock); + + if (!lustre_handle_is_used(lockh)) + RETURN(-ENOLCK); + + lock = ldlm_handle2lock(lockh); + if (lock == NULL) + RETURN(-ENOLCK); - return (imp == NULL || imp->imp_invalid) ? false : true; + /* return remote lock handle to be packed in quota request */ + lustre_handle_copy(lockh, &lock->l_remote_handle); + LDLM_LOCK_PUT(lock); + + RETURN(0); } -/* - * Helper function returning true when quota space need to be adjusted (some - * unused space should be free or pre-acquire) and false otherwise. +/** + * Check whether any quota space adjustment (pre-acquire/release/report) is + * needed for a given quota ID. If a non-null \a qbody is passed, then the + * \a qbody structure (qb_count/flags/usage) is filled with appropriate data + * to be packed in the quota request. + * + * \param lqe - is the lquota entry for which we would like to adjust quota + * space. + * \param qbody - is the quota body to fill, if not NULL. + * + * \retval true - space adjustment is required and \a qbody is filled, if not + * NULL + * \retval false - no space adjustment required */ -static bool qsd_adjust_needed(struct lquota_entry *lqe) +static bool qsd_calc_adjust(struct lquota_entry *lqe, struct quota_body *qbody) { - struct qsd_qtype_info *qqi; - __u64 usage, granted; + __u64 usage, granted; + ENTRY; - qqi = lqe2qqi(lqe); + usage = lqe->lqe_usage; + usage += lqe->lqe_pending_write + lqe->lqe_waiting_write; + granted = lqe->lqe_granted; - if (!lqe->lqe_enforced || qqi->qqi_qsd->qsd_stopping) - /* if quota isn't enforced for this id, no need to adjust - * Similarly, no need to perform adjustment if the target is in - * the process of shutting down. */ - return false; + if (qbody != NULL) + qbody->qb_flags = 0; + + if (!lqe->lqe_enforced) { + /* quota not enforced any more for this ID */ + if (granted != 0) { + /* release all quota space unconditionally */ + LQUOTA_DEBUG(lqe, "not enforced, releasing all space"); + if (qbody != NULL) { + qbody->qb_count = granted; + qbody->qb_flags = QUOTA_DQACQ_FL_REL; + } + RETURN(true); + } + RETURN(false); + } - usage = lqe->lqe_usage; - usage += lqe->lqe_pending_write + lqe->lqe_waiting_write; - granted = lqe->lqe_granted - lqe->lqe_pending_rel; + if (!lustre_handle_is_used(&lqe->lqe_lockh)) { + /* No valid per-ID lock + * When reporting quota (during reintegration or on setquota + * glimpse), we should release granted space if usage is 0. + * Otherwise, if the usage is less than granted, we need to + * acquire the per-ID lock to make sure the unused grant can be + * reclaimed by per-ID lock glimpse. */ + if (usage == 0) { + /* no on-disk usage and no outstanding activity, release + * space */ + if (granted != 0) { + LQUOTA_DEBUG(lqe, "no usage, releasing all " + "space"); + if (qbody != NULL) { + qbody->qb_count = granted; + qbody->qb_flags = QUOTA_DQACQ_FL_REL; + } + RETURN(true); + } + LQUOTA_DEBUG(lqe, "no usage + no granted, nothing to " + "do"); + RETURN(false); + } - /* need to re-acquire per-ID lock or release all grant */ - if (!lustre_handle_is_used(&lqe->lqe_lockh) && - lqe->lqe_granted > lqe->lqe_usage) - return true; + if (lqe->lqe_usage < lqe->lqe_granted) { + /* holding quota space w/o any lock, enqueue per-ID lock + * again */ + LQUOTA_DEBUG(lqe, "(re)acquiring per-ID lock"); + if (qbody != NULL) { + qbody->qb_count = 0; + qbody->qb_flags = QUOTA_DQACQ_FL_ACQ; + } + RETURN(true); + } + + if (lqe->lqe_usage > lqe->lqe_granted) { + /* quota overrun, report usage */ + LQUOTA_DEBUG(lqe, "overrun, reporting usage"); + if (qbody != NULL) { + qbody->qb_usage = lqe->lqe_usage; + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + } + RETURN(true); + } + LQUOTA_DEBUG(lqe, "granted matches usage, nothing to do"); + RETURN(false); + } - /* good old quota qunit adjustment logic which has been around since - * lustre 1.4: - * 1. Need to release some space? */ - if (granted > usage + lqe->lqe_qunit) - return true; + /* valid per-ID lock + * Apply good old quota qunit adjustment logic which has been around + * since lustre 1.4: + * 1. release spare quota space? */ + if (granted > usage + lqe->lqe_qunit) { + /* pre-release quota space */ + if (qbody == NULL) + RETURN(true); + qbody->qb_count = granted - usage; + /* if usage == 0, release all granted space */ + if (usage) { + /* try to keep one qunit of quota space */ + qbody->qb_count -= lqe->lqe_qunit; + /* but don't release less than qtune to avoid releasing + * space too often */ + if (qbody->qb_count < lqe->lqe_qtune) + qbody->qb_count = lqe->lqe_qtune; + } + qbody->qb_flags = QUOTA_DQACQ_FL_REL; + RETURN(true); + } /* 2. Any quota overrun? */ - if (lqe->lqe_usage > lqe->lqe_granted) - /* we ended up consuming more than we own, we need to have this - * fixed ASAP */ - return true; + if (lqe->lqe_usage > lqe->lqe_granted) { + /* we overconsumed quota space, we report usage in request so + * that master can adjust it unconditionally */ + if (qbody == NULL) + RETURN(true); + qbody->qb_usage = lqe->lqe_usage; + granted = lqe->lqe_usage; + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + } /* 3. Time to pre-acquire? */ - if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && lqe->lqe_qunit != 0 && - granted < usage + lqe->lqe_qtune) - /* need to pre-acquire some space if we don't want to block - * client's requests */ - return true; + if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && usage > 0 && + lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) { + /* To pre-acquire quota space, we report how much spare quota + * space the slave currently owns, then the master will grant us + * back how much we can pretend given the current state of + * affairs */ + if (qbody == NULL) + RETURN(true); + if (granted <= usage) + qbody->qb_count = 0; + else + qbody->qb_count = granted - usage; + qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ; + RETURN(true); + } - return false; + if (qbody != NULL) + RETURN(qbody->qb_flags != 0); + else + RETURN(false); } -/* +/** + * Helper function returning true when quota space need to be adjusted (some + * unused space should be free or pre-acquire) and false otherwise. + */ +static inline bool qsd_adjust_needed(struct lquota_entry *lqe) +{ + return qsd_calc_adjust(lqe, NULL); +} + +/** * Callback function called when an acquire/release request sent to the master * is completed */ -static void qsd_dqacq_completion(const struct lu_env *env, - struct qsd_qtype_info *qqi, - struct quota_body *reqbody, - struct quota_body *repbody, - struct lustre_handle *lockh, - union ldlm_wire_lvb *lvb, - void *arg, int ret) +static void qsd_req_completion(const struct lu_env *env, + struct qsd_qtype_info *qqi, + struct quota_body *reqbody, + struct quota_body *repbody, + struct lustre_handle *lockh, + struct lquota_lvb *lvb, + void *arg, int ret) { struct lquota_entry *lqe = (struct lquota_entry *)arg; struct qsd_thread_info *qti; @@ -157,7 +327,6 @@ static void qsd_dqacq_completion(const struct lu_env *env, qti = qsd_info(env); lqe_write_lock(lqe); - LQUOTA_DEBUG(lqe, "DQACQ returned %d, flags:0x%x", ret, reqbody->qb_flags); @@ -181,12 +350,12 @@ static void qsd_dqacq_completion(const struct lu_env *env, * the DQACQ since the limit for this ID has been removed, so we * should not update quota entry & slave index copy neither. */ if (repbody != NULL && repbody->qb_count != 0) { - LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count); + LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count); if (req_is_rel(reqbody->qb_flags)) { if (lqe->lqe_granted < repbody->qb_count) { LQUOTA_ERROR(lqe, "can't release more space " - "than owned "LPU64"<"LPU64, + "than owned %llu<%llu", lqe->lqe_granted, repbody->qb_count); lqe->lqe_granted = 0; @@ -216,13 +385,10 @@ static void qsd_dqacq_completion(const struct lu_env *env, } /* extract information from lvb */ - if (ret == 0 && lvb != 0) { - if (lvb->l_lquota.lvb_id_qunit != 0) - qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit); - if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT) - lqe->lqe_edquot = true; - else - lqe->lqe_edquot = false; + if (ret == 0 && lvb != NULL) { + if (lvb->lvb_id_qunit != 0) + qsd_set_qunit(lqe, lvb->lvb_id_qunit); + qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT)); } else if (repbody != NULL && repbody->qb_qunit != 0) { qsd_set_qunit(lqe, repbody->qb_qunit); } @@ -234,13 +400,14 @@ static void qsd_dqacq_completion(const struct lu_env *env, lqe->lqe_nopreacq = true; out: adjust = qsd_adjust_needed(lqe); + if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) { + lqe->lqe_acq_rc = ret; + lqe->lqe_acq_time = ktime_get_seconds(); + } out_noadjust: - lqe->lqe_pending_req--; - lqe->lqe_pending_rel = 0; + qsd_request_exit(lqe); lqe_write_unlock(lqe); - cfs_waitq_broadcast(&lqe->lqe_waiters); - /* release reference on per-ID lock */ if (lustre_handle_is_used(lockh)) ldlm_lock_decref(lockh, qsd_id_einfo.ei_mode); @@ -253,15 +420,23 @@ out_noadjust: else qsd_adjust_schedule(lqe, true, false); } + lqe_putref(lqe); if (lvb) - /* free lvb allocated in qsd_dqacq */ OBD_FREE_PTR(lvb); - - lqe_putref(lqe); EXIT; } +/** + * Try to consume local quota space. + * + * \param lqe - is the qid entry to be processed + * \param space - is the amount of quota space needed to complete the operation + * + * \retval 0 - success + * \retval -EDQUOT - out of quota + * \retval -EAGAIN - need to acquire space from master + */ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) { __u64 usage; @@ -270,7 +445,7 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) if (!lqe->lqe_enforced) /* not enforced any more, we are good */ - RETURN(0); + RETURN(-ESRCH); lqe_write_lock(lqe); /* use latest usage */ @@ -283,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) lqe->lqe_pending_write += space; lqe->lqe_waiting_write -= space; rc = 0; - } else if (lqe->lqe_edquot) { + /* lqe_edquot flag is used to avoid flooding dqacq requests when + * the user is over quota, however, the lqe_edquot could be stale + * sometimes due to the race reply of dqacq vs. id lock glimpse + * (see LU-4505), so we revalidate it every 5 seconds. */ + } else if (lqe->lqe_edquot && + (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) { rc = -EDQUOT; - } else { + }else { rc = -EAGAIN; } lqe_write_unlock(lqe); @@ -293,294 +473,216 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space) RETURN(rc); } -static bool qsd_calc_space(struct lquota_entry *lqe, enum qsd_ops op, - struct quota_body *qbody) +/** + * Compute how much quota space should be acquire from the master based + * on how much is currently granted to this slave and pending/waiting + * operations. + * + * \param lqe - is the lquota entry for which we would like to adjust quota + * space. + * \param qbody - is the quota body of the acquire request to fill + * + * \retval true - space acquisition is needed and qbody is filled + * \retval false - no space acquisition required + */ +static inline bool qsd_calc_acquire(struct lquota_entry *lqe, + struct quota_body *qbody) { - struct qsd_qtype_info *qqi; - __u64 usage, granted; - - if (!lqe->lqe_enforced && op != QSD_REL) - return 0; + __u64 usage, granted; - qqi = lqe2qqi(lqe); - - LASSERT(lqe->lqe_pending_rel == 0); usage = lqe->lqe_usage; usage += lqe->lqe_pending_write + lqe->lqe_waiting_write; granted = lqe->lqe_granted; qbody->qb_flags = 0; -again: - switch (op) { - case QSD_ACQ: - /* if we overconsumed quota space, we report usage in request - * so that master can adjust it unconditionally */ - if (lqe->lqe_usage > lqe->lqe_granted) { - qbody->qb_usage = lqe->lqe_usage; - qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; - granted = lqe->lqe_usage; - } - /* acquire as much as needed, but not more */ - if (usage > granted) { - qbody->qb_count = usage - granted; - qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ; - } - break; - case QSD_REP: - /* When reporting quota (during reintegration or on setquota - * glimpse), we should release granted space if usage is 0. - * Otherwise, if the usage is less than granted, we need to - * acquire the per-ID lock to make sure the unused grant can be - * reclaimed by per-ID lock glimpse. */ - if (lqe->lqe_usage == 0 && lqe->lqe_granted != 0) { - LQUOTA_DEBUG(lqe, "Release on report!"); - GOTO(again, op = QSD_REL); - } else if (lqe->lqe_usage == lqe->lqe_granted) { - LQUOTA_DEBUG(lqe, "Usage matches granted, needn't do " - "anything on report!"); - } else if (lqe->lqe_usage < lqe->lqe_granted) { - LQUOTA_DEBUG(lqe, "Acquire per-ID lock on report!"); - qbody->qb_count = 0; - qbody->qb_flags = QUOTA_DQACQ_FL_ACQ; - } else { - LASSERT(lqe->lqe_usage > lqe->lqe_granted); - LQUOTA_DEBUG(lqe, "Reporting usage"); - qbody->qb_usage = lqe->lqe_usage; - qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; - } - break; - case QSD_REL: - /* release unused quota space unconditionally */ - if (lqe->lqe_granted > lqe->lqe_usage) { - qbody->qb_count = lqe->lqe_granted - lqe->lqe_usage; - qbody->qb_flags = QUOTA_DQACQ_FL_REL; - } - break; - case QSD_ADJ: { - /* need to re-acquire per-ID lock or release all grant */ - if (!lustre_handle_is_used(&lqe->lqe_lockh) && - lqe->lqe_granted > lqe->lqe_usage) - GOTO(again, op = QSD_REP); - - /* release spare grant */ - if (granted > usage + lqe->lqe_qunit) { - /* pre-release quota space */ - qbody->qb_count = granted - usage; - /* if usage == 0, release all granted space */ - if (usage) { - /* try to keep one qunit of quota space */ - qbody->qb_count -= lqe->lqe_qunit; - /* but don't release less than qtune to avoid - * releasing space too often */ - if (qbody->qb_count < lqe->lqe_qtune) - qbody->qb_count = lqe->lqe_qtune; - } - qbody->qb_flags = QUOTA_DQACQ_FL_REL; - break; - } - - /* if we overconsumed quota space, we report usage in request - * so that master can adjust it unconditionally */ - if (lqe->lqe_usage > lqe->lqe_granted) { - qbody->qb_usage = lqe->lqe_usage; - qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; - granted = lqe->lqe_usage; - } - if (!lqe->lqe_edquot && !lqe->lqe_nopreacq && - lustre_handle_is_used(&lqe->lqe_lockh) && usage > 0 && - lqe->lqe_qunit != 0 && granted < usage + lqe->lqe_qtune) { - /* To pre-acquire quota space, we report how much spare - * quota space the slave currently owns, then the master - * will grant us back how much we can pretend given the - * current state of affairs */ - if (granted <= usage) - qbody->qb_count = 0; - else - qbody->qb_count = granted - usage; - qbody->qb_flags |= QUOTA_DQACQ_FL_PREACQ; - } - break; + /* if we overconsumed quota space, we report usage in request so that + * master can adjust it unconditionally */ + if (lqe->lqe_usage > lqe->lqe_granted) { + qbody->qb_usage = lqe->lqe_usage; + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + granted = lqe->lqe_usage; } - default: - CERROR("Invalid qsd operation:%u\n", op); - LBUG(); - break; + + /* acquire as much as needed, but not more */ + if (usage > granted) { + qbody->qb_count = usage - granted; + qbody->qb_flags |= QUOTA_DQACQ_FL_ACQ; } + return qbody->qb_flags != 0; } -/* - * Acquire/release quota space from master. +/** + * Acquire quota space from master. * There are at most 1 in-flight dqacq/dqrel. * * \param env - the environment passed by the caller * \param lqe - is the qid entry to be processed - * \param op - operation that want to be performed by the caller * - * \retval 0 - success - * \retval -EDQUOT : out of quota - * -EINPROGRESS : inform client to retry write/create - * -ve : other appropriate errors + * \retval 0 - success + * \retval -EDQUOT - out of quota + * \retval -EINPROGRESS - inform client to retry write/create + * \retval -EBUSY - already a quota request in flight + * \retval -ve - other appropriate errors */ -int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe, - enum qsd_ops op) +static int qsd_acquire_remote(const struct lu_env *env, + struct lquota_entry *lqe) { struct qsd_thread_info *qti = qsd_info(env); struct quota_body *qbody = &qti->qti_body; struct qsd_instance *qsd; struct qsd_qtype_info *qqi; - struct ldlm_lock *lock; int rc; - bool intent = false, sync; ENTRY; + memset(qbody, 0, sizeof(*qbody)); + rc = qsd_ready(lqe, &qbody->qb_glb_lockh); + if (rc) + RETURN(rc); + qqi = lqe2qqi(lqe); qsd = qqi->qqi_qsd; - if (qsd->qsd_stopping) { - LQUOTA_DEBUG(lqe, "Dropping quota req since qsd is stopping"); - /* Target is about to shut down, client will retry */ - RETURN(-EINPROGRESS); - } - - if (!qsd_ready(qsd)) { - LQUOTA_DEBUG(lqe, "Connection to master not ready"); - RETURN(-ENOTCONN); - } - - /* In most case, reintegration must have been triggered (when enable - * quota or on OST start), however, in rare race condition (enabling - * quota when starting OSTs), we might miss triggering reintegration - * for some qqi. - * - * If the previous reintegration failed for some reason, we'll - * re-trigger it here as well. */ - if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) { - LQUOTA_DEBUG(lqe, "Not up-to-date, dropping request and kicking" - " off reintegration"); - qsd_start_reint_thread(qqi); - RETURN(-EINPROGRESS); - } - - LQUOTA_DEBUG(lqe, "DQACQ starts op=%u", op); - - /* Fill the remote global lock handle, master will check this handle - * to see if the slave is sending request with stale lock */ - cfs_read_lock(&qsd->qsd_lock); - lustre_handle_copy(&qbody->qb_glb_lockh, &qqi->qqi_lockh); - cfs_read_unlock(&qsd->qsd_lock); - - if (!lustre_handle_is_used(&qbody->qb_glb_lockh)) - RETURN(-ENOLCK); - - lock = ldlm_handle2lock(&qbody->qb_glb_lockh); - if (lock == NULL) - RETURN(-ENOLCK); - lustre_handle_copy(&qbody->qb_glb_lockh, &lock->l_remote_handle); - LDLM_LOCK_PUT(lock); - - /* We allow only one in-flight dqacq/dqrel for specified qid, if - * there is already in-flight dqacq/dqrel: - * - * - For QSD_ADJ: we should just abort it, since local limit is going - * to be changed soon; - * - For QSD_ACQ & QSD_REL: we just wait for the in-flight dqacq/dqrel - * finished, and return success to the caller. The caller is - * responsible for retrying; - * - For QSD_REP: we should just abort it, since slave has already - * acquired/released grant; */ - sync = (op == QSD_ACQ || op == QSD_REL) ? true : false; - LASSERTF(lqe->lqe_pending_req <= 1, "pending dqacq/dqrel:%d", - lqe->lqe_pending_req); - lqe_write_lock(lqe); - if (lqe->lqe_pending_req != 0) { - struct l_wait_info lwi = { 0 }; + /* is quota really enforced for this id? */ + if (!lqe->lqe_enforced) { lqe_write_unlock(lqe); - if (!sync) { - LQUOTA_DEBUG(lqe, "Abort DQACQ, op=%d", op); - RETURN(0); - } - - LQUOTA_DEBUG(lqe, "waiting for in-flight dqacq/dqrel"); - l_wait_event(lqe->lqe_waiters, - !lqe_pending_dqacq(lqe) || qsd->qsd_stopping, - &lwi); + LQUOTA_DEBUG(lqe, "quota not enforced any more"); RETURN(0); } /* fill qb_count & qb_flags */ - if (!qsd_calc_space(lqe, op, qbody)) { + if (!qsd_calc_acquire(lqe, qbody)) { lqe_write_unlock(lqe); - LQUOTA_DEBUG(lqe, "No DQACQ required, op=%u", op); + LQUOTA_DEBUG(lqe, "No acquire required"); RETURN(0); } - lqe->lqe_pending_req++; - lqe_write_unlock(lqe); - /* fill other quota body fields */ - qbody->qb_fid = qqi->qqi_fid; - qbody->qb_id = lqe->lqe_id; - memset(&qbody->qb_lockh, 0, sizeof(qbody->qb_lockh)); - memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); + /* check whether an acquire request completed recently */ + if (lqe->lqe_acq_rc != 0 && + lqe->lqe_acq_time > ktime_get_seconds() - 1) { + lqe_write_unlock(lqe); + LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc); + RETURN(lqe->lqe_acq_rc); + } + + /* only 1 quota request in flight for a given ID is allowed */ + rc = qsd_request_enter(lqe); + if (rc) { + lqe_write_unlock(lqe); + RETURN(rc); + } + + lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh); + lqe_write_unlock(lqe); /* hold a refcount until completion */ lqe_getref(lqe); - if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) { - /* check whether we already own a lock for this ID */ - lqe_read_lock(lqe); - lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh); - lqe_read_unlock(lqe); - - rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh); - if (rc) { - memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); - if (req_is_preacq(qbody->qb_flags)) { - if (req_has_rep(qbody->qb_flags)) - /* still want to report usage */ - qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; - else - /* no pre-acquire if no per-ID lock */ - GOTO(out, rc = -ENOLCK); - } else { - /* no lock found, should use intent */ - intent = true; - } - } else if (req_is_acq(qbody->qb_flags) && - qbody->qb_count == 0) { - /* found cached lock, no need to acquire */ - GOTO(out, rc = 0); - } - } + /* fill other quota body fields */ + qbody->qb_fid = qqi->qqi_fid; + qbody->qb_id = lqe->lqe_id; - if (!intent) { - rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, sync, - qsd_dqacq_completion, qqi, &qti->qti_lockh, - lqe); - } else { - union ldlm_wire_lvb *lvb; + /* check whether we already own a valid lock for this ID */ + rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh); + if (rc) { + struct lquota_lvb *lvb; OBD_ALLOC_PTR(lvb); - if (lvb == NULL) - GOTO(out, rc = -ENOMEM); - - rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, sync, - IT_QUOTA_DQACQ, qsd_dqacq_completion, + if (lvb == NULL) { + rc = -ENOMEM; + qsd_req_completion(env, qqi, qbody, NULL, + &qti->qti_lockh, NULL, lqe, rc); + RETURN(rc); + } + /* no lock found, should use intent */ + rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, true, + IT_QUOTA_DQACQ, qsd_req_completion, qqi, lvb, (void *)lqe); + } else { + /* lock found, should use regular dqacq */ + rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, true, + qsd_req_completion, qqi, &qti->qti_lockh, + lqe); } + /* the completion function will be called by qsd_send_dqacq or * qsd_intent_lock */ RETURN(rc); -out: - qsd_dqacq_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe, - rc); - return rc; } -/* +/** + * Acquire \a space of quota space in order to complete an operation. + * Try to consume local quota space first and send acquire request to quota + * master if required. + * + * \param env - the environment passed by the caller + * \param lqe - is the qid entry to be processed + * \param space - is the amount of quota required for the operation + * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...) + * + * \retval true - stop waiting in wait_event_idle_timeout, + * and real return value in \a ret + * \retval false - continue waiting + */ +static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe, + long long space, int *ret) +{ + int rc = 0, count; + int wait_pending = 0; + struct qsd_qtype_info *qqi = lqe2qqi(lqe); + + ENTRY; + + for (count = 0; rc == 0; count++) { + LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count); +again: + if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) { + rc = -EINPROGRESS; + break; + } + + /* refresh disk usage */ + rc = qsd_refresh_usage(env, lqe); + if (rc) + break; + + /* try to consume local quota space first */ + rc = qsd_acquire_local(lqe, space); + if (rc != -EAGAIN) + /* rc == 0, Wouhou! enough local quota space + * rc < 0, something bad happened */ + break; + /* + * There might be a window that commit transaction + * have updated usage but pending write doesn't change + * wait for it before acquiring remotely. + */ + if (lqe->lqe_pending_write >= space && !wait_pending) { + wait_pending = 1; + dt_wait_quota_pending(qqi->qqi_qsd->qsd_dev); + goto again; + } + + /* if we have gotten some quota and stil wait more quota, + * it's better to give QMT some time to reclaim from clients */ + if (count > 0) + schedule_timeout_interruptible(cfs_time_seconds(1)); + + /* need to acquire more quota space from master */ + rc = qsd_acquire_remote(env, lqe); + } + + if (rc == -EBUSY) + /* already a request in flight, continue waiting */ + RETURN(false); + *ret = rc; + RETURN(true); +} + +/** * Quota enforcement handler. If local quota can satisfy this operation, * return success, otherwise, acquire more quota from master. * (for write operation, if master isn't available at this moment, return @@ -594,17 +696,18 @@ out: * \param flags - if the operation is write, return caller no user/group * and sync commit flags * - * \retval 0 - success - * \retval -EDQUOT : out of quota - * -EINPROGRESS : inform client to retry write - * -ve : other appropriate errors + * \retval 0 - success + * \retval -EDQUOT - out of quota + * \retval -EINPROGRESS - inform client to retry write + * \retval -ve - other appropriate errors */ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi, struct lquota_id_info *qid, long long space, - int *flags) + enum osd_quota_local_flags *local_flags) { struct lquota_entry *lqe; - int rc = 0, retry_cnt; + enum osd_quota_local_flags qtype_flag = 0; + int rc, ret = -EINPROGRESS; ENTRY; if (qid->lqi_qentry != NULL) { @@ -630,114 +733,125 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi, * quota space. That said, we still want to perform space * adjustments in qsd_op_end, so we return here, but with * a reference on the lqe */ - if (flags != NULL) { + if (local_flags != NULL) { rc = qsd_refresh_usage(env, lqe); GOTO(out_flags, rc); } RETURN(0); } - LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space); + LQUOTA_DEBUG(lqe, "op_begin space:%lld", space); lqe_write_lock(lqe); lqe->lqe_waiting_write += space; lqe_write_unlock(lqe); - for (retry_cnt = 0; rc == 0; retry_cnt++) { - /* refresh disk usage if required */ - rc = qsd_refresh_usage(env, lqe); - if (rc) - break; - - /* try to consume local quota space */ - rc = qsd_acquire_local(lqe, space); - if (rc != -EAGAIN) - /* rc == 0, Wouhou! enough local quota space - * rc < 0, something bad happened */ - break; - - /* need to acquire more quota space from master, this is done - * synchronously */ - rc = qsd_dqacq(env, lqe, QSD_ACQ); - LQUOTA_DEBUG(lqe, "Acquired quota space, retry cnt:%d rc:%d", - retry_cnt, rc); - } + /* acquire quota space for the operation, cap overall wait time to + * prevent a service thread from being stuck for too long */ + rc = wait_event_idle_timeout( + lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret), + cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd))); - if (rc == 0) { + if (rc > 0 && ret == 0) { qid->lqi_space += space; + rc = 0; } else { - LQUOTA_DEBUG(lqe, "Acquire quota failed:%d", rc); + if (rc > 0) + rc = ret; + else if (rc == 0) + rc = -ETIMEDOUT; + + LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc); lqe_write_lock(lqe); lqe->lqe_waiting_write -= space; - if (flags && lqe->lqe_pending_write != 0) + if (local_flags && lqe->lqe_pending_write != 0) /* Inform OSD layer that there are pending writes. * It might want to retry after a sync if appropriate */ - *flags |= QUOTA_FL_SYNC; + *local_flags |= QUOTA_FL_SYNC; lqe_write_unlock(lqe); - /* convert recoverable error into -EINPROGRESS, and client will - * retry write on -EINPROGRESS. */ + /* convert recoverable error into -EINPROGRESS, client will + * retry */ if (rc == -ETIMEDOUT || rc == -ENOTCONN || rc == -ENOLCK || - rc == -EAGAIN || rc == -EINTR) + rc == -EAGAIN || rc == -EINTR) { rc = -EINPROGRESS; + } else if (rc == -ESRCH) { + rc = 0; + LQUOTA_ERROR(lqe, "ID isn't enforced on master, it " + "probably due to a legeal race, if this " + "message is showing up constantly, there " + "could be some inconsistence between " + "master & slave, and quota reintegration " + "needs be re-triggered."); + } } - if (flags != NULL) { + if (local_flags != NULL) { out_flags: LASSERT(qid->lqi_is_blk); if (rc != 0) { - *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype); + *local_flags |= lquota_over_fl(qqi->qqi_qtype); } else { __u64 usage; lqe_read_lock(lqe); - usage = lqe->lqe_usage; - usage += lqe->lqe_pending_write; + usage = lqe->lqe_pending_write; usage += lqe->lqe_waiting_write; - usage += qqi->qqi_qsd->qsd_sync_threshold; + /* There is a chance to successfully grant more quota + * but get edquot flag through glimpse. */ + if (lqe->lqe_edquot || (lqe->lqe_qunit != 0 && + (usage % lqe->lqe_qunit > + qqi->qqi_qsd->qsd_sync_threshold))) + usage += qqi->qqi_qsd->qsd_sync_threshold; + usage += lqe->lqe_usage; + + qtype_flag = lquota_over_fl(qqi->qqi_qtype); /* if we should notify client to start sync write */ if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel) - *flags |= LQUOTA_OVER_FL(qqi->qqi_qtype); + *local_flags |= qtype_flag; else - *flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype); + *local_flags &= ~qtype_flag; lqe_read_unlock(lqe); } } RETURN(rc); } +/** + * helper function comparing two lquota_id_info structures + */ static inline bool qid_equal(struct lquota_id_info *q1, struct lquota_id_info *q2) { - if (q1->lqi_type != q2->lqi_type) + if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type) return false; return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false; } -/* +/** * Enforce quota, it's called in the declaration of each operation. * qsd_op_end() will then be called later once all the operations have been * completed in order to release/adjust the quota space. * - * \param env - the environment passed by the caller - * \param qsd - is the qsd instance associated with the device in charge - * of the operation. - * \param trans - is the quota transaction information - * \param qi - qid & space required by current operation - * \param flags - if the operation is write, return caller no user/group - * and sync commit flags + * \param env - the environment passed by the caller + * \param qsd - is the qsd instance associated with the device in charge of + * the operation. + * \param trans - is the quota transaction information + * \param qi - qid & space required by current operation + * \param flags - if the operation is write, return caller no user/group and + * sync commit flags * - * \retval 0 - success - * \retval -EDQUOT : out of quota - * -EINPROGRESS : inform client to retry write - * -ve : other appropriate errors + * \retval 0 - success + * \retval -EDQUOT - out of quota + * \retval -EINPROGRESS - inform client to retry write + * \retval -ve - other appropriate errors */ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, struct lquota_trans *trans, struct lquota_id_info *qi, - int *flags) + enum osd_quota_local_flags *local_flags) { int i, rc; bool found = false; @@ -746,13 +860,16 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, if (unlikely(qsd == NULL)) RETURN(0); + if (qsd->qsd_dev->dd_rdonly) + RETURN(0); + /* We don't enforce quota until the qsd_instance is started */ - cfs_read_lock(&qsd->qsd_lock); + read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); RETURN(0); } - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); /* ignore block quota on MDTs, ignore inode quota on OSTs */ if ((!qsd->qsd_is_md && !qi->lqi_is_blk) || @@ -761,18 +878,18 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, /* ignore quota enforcement request when: * - quota isn't enforced for this quota type - * or - the user/group is root */ - if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0) + * or - the user/group is root + * or - quota accounting isn't enabled */ + if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 || + (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed) RETURN(0); - LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d", + LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n", trans->lqt_id_cnt); /* check whether we already allocated a slot for this id */ for (i = 0; i < trans->lqt_id_cnt; i++) { if (qid_equal(qi, &trans->lqt_ids[i])) { found = true; - /* make sure we are not mixing inodes & blocks */ - LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk); break; } } @@ -793,12 +910,130 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd, /* manage quota enforcement for this ID */ rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type], - &trans->lqt_ids[i], qi->lqi_space, flags); + &trans->lqt_ids[i], qi->lqi_space, local_flags); RETURN(rc); } EXPORT_SYMBOL(qsd_op_begin); /** + * Adjust quota space (by acquiring or releasing) hold by the quota slave. + * This function is called after each quota request completion and during + * reintegration in order to report usage or re-acquire quota locks. + * Space adjustment is aborted if there is already a quota request in flight + * for this ID. + * + * \param env - the environment passed by the caller + * \param lqe - is the qid entry to be processed + * + * \retval 0 on success, appropriate errors on failure + */ +int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe) +{ + struct qsd_thread_info *qti = qsd_info(env); + struct quota_body *qbody = &qti->qti_body; + struct qsd_instance *qsd; + struct qsd_qtype_info *qqi; + int rc; + bool intent = false; + ENTRY; + + memset(qbody, 0, sizeof(*qbody)); + rc = qsd_ready(lqe, &qbody->qb_glb_lockh); + if (rc) { + /* add to adjust list again to trigger adjustment later when + * slave is ready */ + LQUOTA_DEBUG(lqe, "delaying adjustment since qsd isn't ready"); + qsd_adjust_schedule(lqe, true, false); + RETURN(0); + } + + qqi = lqe2qqi(lqe); + qsd = qqi->qqi_qsd; + + if (qsd->qsd_dev->dd_rdonly) + RETURN(0); + + lqe_write_lock(lqe); + + /* fill qb_count & qb_flags */ + if (!qsd_calc_adjust(lqe, qbody)) { + lqe_write_unlock(lqe); + LQUOTA_DEBUG(lqe, "no adjustment required"); + RETURN(0); + } + + /* only 1 quota request in flight for a given ID is allowed */ + rc = qsd_request_enter(lqe); + if (rc) { + /* already a request in flight, space adjustment will be run + * again on request completion */ + lqe_write_unlock(lqe); + RETURN(0); + } + + if (req_is_rel(qbody->qb_flags)) + lqe->lqe_pending_rel = qbody->qb_count; + lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh); + lqe_write_unlock(lqe); + + /* hold a refcount until completion */ + lqe_getref(lqe); + + /* fill other quota body fields */ + qbody->qb_fid = qqi->qqi_fid; + qbody->qb_id = lqe->lqe_id; + + if (req_is_acq(qbody->qb_flags) || req_is_preacq(qbody->qb_flags)) { + /* check whether we own a valid lock for this ID */ + rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh); + if (rc) { + memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); + if (req_is_preacq(qbody->qb_flags)) { + if (req_has_rep(qbody->qb_flags)) + /* still want to report usage */ + qbody->qb_flags = QUOTA_DQACQ_FL_REPORT; + else + /* no pre-acquire if no per-ID lock */ + GOTO(out, rc = -ENOLCK); + } else { + /* no lock found, should use intent */ + intent = true; + } + } else if (req_is_acq(qbody->qb_flags) && + qbody->qb_count == 0) { + /* found cached lock, no need to acquire */ + GOTO(out, rc = 0); + } + } else { + /* release and report don't need a per-ID lock */ + memset(&qti->qti_lockh, 0, sizeof(qti->qti_lockh)); + } + + if (!intent) { + rc = qsd_send_dqacq(env, qsd->qsd_exp, qbody, false, + qsd_req_completion, qqi, &qti->qti_lockh, + lqe); + } else { + struct lquota_lvb *lvb; + + OBD_ALLOC_PTR(lvb); + if (lvb == NULL) + GOTO(out, rc = -ENOMEM); + + rc = qsd_intent_lock(env, qsd->qsd_exp, qbody, false, + IT_QUOTA_DQACQ, qsd_req_completion, + qqi, lvb, (void *)lqe); + } + /* the completion function will be called by qsd_send_dqacq or + * qsd_intent_lock */ + RETURN(rc); +out: + qsd_req_completion(env, qqi, qbody, NULL, &qti->qti_lockh, NULL, lqe, + rc); + return rc; +} + +/** * Post quota operation, pre-acquire/release quota from master. * * \param env - the environment passed by the caller @@ -840,7 +1075,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi, if (adjust) { /* pre-acquire/release quota space is needed */ if (env != NULL) - qsd_dqacq(env, lqe, QSD_ADJ); + qsd_adjust(env, lqe); else /* no suitable environment, handle adjustment in * separate thread context */ @@ -850,7 +1085,7 @@ static void qsd_op_end0(const struct lu_env *env, struct qsd_qtype_info *qqi, EXIT; } -/* +/** * Post quota operation. It's called after each operation transaction stopped. * * \param env - the environment passed by the caller @@ -871,13 +1106,16 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd, if (unlikely(qsd == NULL)) RETURN_EXIT; + if (qsd->qsd_dev->dd_rdonly) + RETURN_EXIT; + /* We don't enforce quota until the qsd_instance is started */ - cfs_read_lock(&qsd->qsd_lock); + read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); RETURN_EXIT; } - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); LASSERT(trans != NULL); @@ -898,8 +1136,22 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd, } EXPORT_SYMBOL(qsd_op_end); -void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd, - union lquota_id *qid, int qtype) +/** + * Trigger pre-acquire/release if necessary. + * It's only used by ldiskfs osd so far. When unlink a file in ldiskfs, the + * quota accounting isn't updated when the transaction stopped. Instead, it'll + * be updated on the final iput, so qsd_op_adjust() will be called then (in + * osd_object_delete()) to trigger quota release if necessary. + * + * \param env - the environment passed by the caller + * \param qsd - is the qsd instance associated with the device in charge + * of the operation. + * \param qid - is the lquota ID of the user/group for which to trigger + * quota space adjustment + * \param qtype - is the quota type (USRQUOTA or GRPQUOTA) + */ +void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd, + union lquota_id *qid, int qtype) { struct lquota_entry *lqe; struct qsd_qtype_info *qqi; @@ -910,12 +1162,12 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd, RETURN_EXIT; /* We don't enforce quota until the qsd_instance is started */ - cfs_read_lock(&qsd->qsd_lock); + read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); RETURN_EXIT; } - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); qqi = qsd->qsd_type_array[qtype]; LASSERT(qqi); @@ -924,16 +1176,16 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd, qid->qid_uid == 0) RETURN_EXIT; - cfs_read_lock(&qsd->qsd_lock); + read_lock(&qsd->qsd_lock); if (!qsd->qsd_started) { - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); RETURN_EXIT; } - cfs_read_unlock(&qsd->qsd_lock); + read_unlock(&qsd->qsd_lock); lqe = lqe_locate(env, qqi->qqi_site, qid); if (IS_ERR(lqe)) { - CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n", + CERROR("%s: fail to locate lqe for id:%llu, type:%d\n", qsd->qsd_svname, qid->qid_uid, qtype); RETURN_EXIT; } @@ -945,9 +1197,9 @@ void qsd_adjust_quota(const struct lu_env *env, struct qsd_instance *qsd, lqe_read_unlock(lqe); if (adjust) - qsd_dqacq(env, lqe, QSD_ADJ); + qsd_adjust(env, lqe); lqe_putref(lqe); EXIT; } -EXPORT_SYMBOL(qsd_adjust_quota); +EXPORT_SYMBOL(qsd_op_adjust);