X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_handler.c;h=f8011eb6d6d711d09c382e0940d91a892d8df524;hb=6c6f247bc08bb6c3a3c07d2da2774a54fd244c9a;hp=01b3e54bb4690addb3b961180cdb0b5e9187e01d;hpb=6929549acde2357339aec7bcaac6ec301150e96e;p=fs%2Flustre-release.git

diff --git a/lustre/quota/qsd_handler.c b/lustre/quota/qsd_handler.c
index 01b3e54..f8011eb 100644
--- a/lustre/quota/qsd_handler.c
+++ b/lustre/quota/qsd_handler.c
@@ -21,17 +21,13 @@
  * GPL HEADER END
  */
 /*
- * Copyright (c) 2012 Intel, Inc.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  * Use is subject to license terms.
  *
  * Author: Johann Lombardi
  * Author: Niu Yawei
  */
 
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
 #define DEBUG_SUBSYSTEM S_LQUOTA
 
 #include "qsd_internal.h"
@@ -49,7 +45,7 @@ static inline int qsd_request_enter(struct lquota_entry *lqe)
 	}
 
 	if (lqe->lqe_pending_rel != 0) {
-		LQUOTA_ERROR(lqe, "no request in flight with pending_rel="LPU64,
+		LQUOTA_ERROR(lqe, "no request in flight with pending_rel=%llu",
			     lqe->lqe_pending_rel);
 		LBUG();
 	}
@@ -69,7 +65,7 @@ static inline void qsd_request_exit(struct lquota_entry *lqe)
 	}
 	lqe->lqe_pending_req--;
 	lqe->lqe_pending_rel = 0;
-	cfs_waitq_broadcast(&lqe->lqe_waiters);
+	wake_up_all(&lqe->lqe_waiters);
 }
 
 /**
@@ -94,10 +90,10 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
 	struct ldlm_lock	*lock;
 	ENTRY;
 
-	cfs_read_lock(&qsd->qsd_lock);
+	read_lock(&qsd->qsd_lock);
 	/* is the qsd about to shut down? */
 	if (qsd->qsd_stopping) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		LQUOTA_DEBUG(lqe, "dropping quota req since qsd is stopping");
 		/* Target is about to shut down, client will retry */
 		RETURN(-EINPROGRESS);
@@ -107,7 +103,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
 	if (qsd->qsd_exp_valid)
 		imp = class_exp2cliimp(qsd->qsd_exp);
 	if (imp == NULL || imp->imp_invalid) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		LQUOTA_DEBUG(lqe, "connection to master not ready");
 		RETURN(-ENOTCONN);
 	}
@@ -120,7 +116,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
 	 * If the previous reintegration failed for some reason, we'll
 	 * re-trigger it here as well. */
 	if (!qqi->qqi_glb_uptodate || !qqi->qqi_slv_uptodate) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		LQUOTA_DEBUG(lqe, "not up-to-date, dropping request and "
			     "kicking off reintegration");
 		qsd_start_reint_thread(qqi);
@@ -130,7 +126,7 @@ static int qsd_ready(struct lquota_entry *lqe, struct lustre_handle *lockh)
 	/* Fill the remote global lock handle, master will check this handle
 	 * to see if the slave is sending request with stale lock */
 	lustre_handle_copy(lockh, &qqi->qqi_lockh);
-	cfs_read_unlock(&qsd->qsd_lock);
+	read_unlock(&qsd->qsd_lock);
 
 	if (!lustre_handle_is_used(lockh))
 		RETURN(-ENOLCK);
@@ -308,7 +304,7 @@ static void qsd_req_completion(const struct lu_env *env,
			       struct quota_body *reqbody,
			       struct quota_body *repbody,
			       struct lustre_handle *lockh,
-			       union ldlm_wire_lvb *lvb,
+			       struct lquota_lvb *lvb,
			       void *arg, int ret)
 {
 	struct lquota_entry	*lqe = (struct lquota_entry *)arg;
@@ -354,12 +350,12 @@ static void qsd_req_completion(const struct lu_env *env,
 	 * the DQACQ since the limit for this ID has been removed, so we
 	 * should not update quota entry & slave index copy either. */
 	if (repbody != NULL && repbody->qb_count != 0) {
-		LQUOTA_DEBUG(lqe, "DQACQ qb_count:"LPU64, repbody->qb_count);
+		LQUOTA_DEBUG(lqe, "DQACQ qb_count:%llu", repbody->qb_count);
 
 		if (req_is_rel(reqbody->qb_flags)) {
 			if (lqe->lqe_granted < repbody->qb_count) {
 				LQUOTA_ERROR(lqe, "can't release more space "
-					     "than owned "LPU64"<"LPU64,
+					     "than owned %llu<%llu",
					     lqe->lqe_granted,
					     repbody->qb_count);
 				lqe->lqe_granted = 0;
@@ -389,13 +385,10 @@ static void qsd_req_completion(const struct lu_env *env,
 	}
 
 	/* extract information from lvb */
-	if (ret == 0 && lvb != 0) {
-		if (lvb->l_lquota.lvb_id_qunit != 0)
-			qsd_set_qunit(lqe, lvb->l_lquota.lvb_id_qunit);
-		if (lvb->l_lquota.lvb_flags & LQUOTA_FL_EDQUOT)
-			lqe->lqe_edquot = true;
-		else
-			lqe->lqe_edquot = false;
+	if (ret == 0 && lvb != NULL) {
+		if (lvb->lvb_id_qunit != 0)
+			qsd_set_qunit(lqe, lvb->lvb_id_qunit);
+		qsd_set_edquot(lqe, !!(lvb->lvb_flags & LQUOTA_FL_EDQUOT));
 	} else if (repbody != NULL && repbody->qb_qunit != 0) {
 		qsd_set_qunit(lqe, repbody->qb_qunit);
 	}
@@ -409,7 +402,7 @@ out:
 	adjust = qsd_adjust_needed(lqe);
 	if (reqbody && req_is_acq(reqbody->qb_flags) && ret != -EDQUOT) {
 		lqe->lqe_acq_rc = ret;
-		lqe->lqe_acq_time = cfs_time_current_64();
+		lqe->lqe_acq_time = ktime_get_seconds();
 	}
 out_noadjust:
 	qsd_request_exit(lqe);
@@ -465,9 +458,14 @@ static int qsd_acquire_local(struct lquota_entry *lqe, __u64 space)
 		lqe->lqe_pending_write += space;
 		lqe->lqe_waiting_write -= space;
 		rc = 0;
-	} else if (lqe->lqe_edquot) {
+	/* lqe_edquot flag is used to avoid flooding dqacq requests when
+	 * the user is over quota, however, the lqe_edquot could be stale
+	 * sometimes due to the race reply of dqacq vs. id lock glimpse
+	 * (see LU-4505), so we revalidate it every 5 seconds. */
+	} else if (lqe->lqe_edquot &&
+		   (lqe->lqe_edquot_time > ktime_get_seconds() - 5)) {
 		rc = -EDQUOT;
 	} else {
 		rc = -EAGAIN;
 	}
 	lqe_write_unlock(lqe);
@@ -564,7 +562,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 
 	/* check whether an acquire request completed recently */
 	if (lqe->lqe_acq_rc != 0 &&
-	    cfs_time_before_64(cfs_time_shift_64(-1), lqe->lqe_acq_time)) {
+	    lqe->lqe_acq_time > ktime_get_seconds() - 1) {
 		lqe_write_unlock(lqe);
 		LQUOTA_DEBUG(lqe, "using cached return code %d", lqe->lqe_acq_rc);
 		RETURN(lqe->lqe_acq_rc);
@@ -590,7 +588,7 @@ static int qsd_acquire_remote(const struct lu_env *env,
 	/* check whether we already own a valid lock for this ID */
 	rc = qsd_id_lock_match(&qti->qti_lockh, &qbody->qb_lockh);
 	if (rc) {
-		union ldlm_wire_lvb *lvb;
+		struct lquota_lvb *lvb;
 
 		OBD_ALLOC_PTR(lvb);
 		if (lvb == NULL) {
@@ -625,7 +623,8 @@ static int qsd_acquire_remote(const struct lu_env *env,
  * \param space - is the amount of quota required for the operation
  * \param ret - is the return code (-EDQUOT, -EINPROGRESS, ...)
  *
- * \retval true - exit from l_wait_event and real return value in \a ret
+ * \retval true - stop waiting in wait_event_idle_timeout,
+ *		  and real return value in \a ret
  * \retval false - continue waiting
  */
 static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
@@ -635,7 +634,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 	ENTRY;
 
 	for (count = 0; rc == 0; count++) {
-		LQUOTA_DEBUG(lqe, "acquiring:"LPD64 " count=%d", space, count);
+		LQUOTA_DEBUG(lqe, "acquiring:%lld count=%d", space, count);
 
 		if (lqe2qqi(lqe)->qqi_qsd->qsd_stopping) {
 			rc = -EINPROGRESS;
@@ -654,6 +653,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 		 * rc < 0, something bad happened */
			break;
 
+		/* if we have gotten some quota and still wait more quota,
+		 * it's better to give QMT some time to reclaim from clients */
+		if (count > 0)
+			schedule_timeout_interruptible(cfs_time_seconds(1));
+
 		/* need to acquire more quota space from master */
 		rc = qsd_acquire_remote(env, lqe);
 	}
@@ -662,7 +666,7 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
 		/* already a request in flight, continue waiting */
 		RETURN(false);
 	*ret = rc;
-	RETURN(true); /* exit from l_wait_event */
+	RETURN(true);
 }
 
 /**
@@ -686,11 +690,11 @@ static bool qsd_acquire(const struct lu_env *env, struct lquota_entry *lqe,
  */
 static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
			 struct lquota_id_info *qid, long long space,
-			 int *flags)
+			 enum osd_quota_local_flags *local_flags)
 {
-	struct lquota_entry	*lqe;
-	int			 rc, ret = -EINPROGRESS;
-	struct l_wait_info	 lwi;
+	struct lquota_entry *lqe;
+	enum osd_quota_local_flags qtype_flag = 0;
+	int rc, ret = -EINPROGRESS;
 	ENTRY;
 
 	if (qid->lqi_qentry != NULL) {
@@ -716,14 +720,14 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 		 * quota space. That said, we still want to perform space
 		 * adjustments in qsd_op_end, so we return here, but with
 		 * a reference on the lqe */
-		if (flags != NULL) {
+		if (local_flags != NULL) {
 			rc = qsd_refresh_usage(env, lqe);
 			GOTO(out_flags, rc);
 		}
 		RETURN(0);
 	}
 
-	LQUOTA_DEBUG(lqe, "op_begin space:"LPD64, space);
+	LQUOTA_DEBUG(lqe, "op_begin space:%lld", space);
 
 	lqe_write_lock(lqe);
 	lqe->lqe_waiting_write += space;
@@ -731,26 +735,28 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 
 	/* acquire quota space for the operation, cap overall wait time to
 	 * prevent a service thread from being stuck for too long */
-	lwi = LWI_TIMEOUT(cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)),
-			  NULL, NULL);
-	rc = l_wait_event(lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
-			  &lwi);
+	rc = wait_event_idle_timeout(
+		lqe->lqe_waiters, qsd_acquire(env, lqe, space, &ret),
+		cfs_time_seconds(qsd_wait_timeout(qqi->qqi_qsd)));
 
-	if (rc == 0 && ret == 0) {
+	if (rc > 0 && ret == 0) {
 		qid->lqi_space += space;
+		rc = 0;
 	} else {
-		if (rc == 0)
+		if (rc > 0)
 			rc = ret;
+		else if (rc == 0)
+			rc = -ETIMEDOUT;
 
 		LQUOTA_DEBUG(lqe, "acquire quota failed:%d", rc);
 
 		lqe_write_lock(lqe);
 		lqe->lqe_waiting_write -= space;
 
-		if (flags && lqe->lqe_pending_write != 0)
+		if (local_flags && lqe->lqe_pending_write != 0)
			/* Inform OSD layer that there are pending writes.
			 * It might want to retry after a sync if appropriate */
-			*flags |= QUOTA_FL_SYNC;
+			*local_flags |= QUOTA_FL_SYNC;
 		lqe_write_unlock(lqe);
 
 		/* convert recoverable error into -EINPROGRESS, client will
@@ -769,25 +775,29 @@ static int qsd_op_begin0(const struct lu_env *env, struct qsd_qtype_info *qqi,
 		}
 	}
 
-	if (flags != NULL) {
+	if (local_flags != NULL) {
 out_flags:
 		LASSERT(qid->lqi_is_blk);
 		if (rc != 0) {
-			*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+			*local_flags |= lquota_over_fl(qqi->qqi_qtype);
 		} else {
 			__u64	usage;
 
 			lqe_read_lock(lqe);
-			usage  = lqe->lqe_usage;
-			usage += lqe->lqe_pending_write;
+			usage = lqe->lqe_pending_write;
 			usage += lqe->lqe_waiting_write;
-			usage += qqi->qqi_qsd->qsd_sync_threshold;
+			if (lqe->lqe_qunit != 0 && (usage % lqe->lqe_qunit >
+			    qqi->qqi_qsd->qsd_sync_threshold))
+				usage += qqi->qqi_qsd->qsd_sync_threshold;
+
+			usage += lqe->lqe_usage;
 
+			qtype_flag = lquota_over_fl(qqi->qqi_qtype);
 			/* if we should notify client to start sync write */
 			if (usage >= lqe->lqe_granted - lqe->lqe_pending_rel)
-				*flags |= LQUOTA_OVER_FL(qqi->qqi_qtype);
+				*local_flags |= qtype_flag;
 			else
-				*flags &= ~LQUOTA_OVER_FL(qqi->qqi_qtype);
+				*local_flags &= ~qtype_flag;
 			lqe_read_unlock(lqe);
 		}
 	}
@@ -800,7 +810,7 @@ out_flags:
 static inline bool qid_equal(struct lquota_id_info *q1,
			     struct lquota_id_info *q2)
 {
-	if (q1->lqi_type != q2->lqi_type)
+	if (q1->lqi_is_blk != q2->lqi_is_blk || q1->lqi_type != q2->lqi_type)
 		return false;
 	return (q1->lqi_id.qid_uid == q2->lqi_id.qid_uid) ? true : false;
 }
@@ -825,7 +835,7 @@ static inline bool qid_equal(struct lquota_id_info *q1,
  */
 int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
		 struct lquota_trans *trans, struct lquota_id_info *qi,
-		 int *flags)
+		 enum osd_quota_local_flags *local_flags)
 {
 	int	i, rc;
 	bool	found = false;
@@ -834,13 +844,16 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 	if (unlikely(qsd == NULL))
 		RETURN(0);
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN(0);
+
 	/* We don't enforce quota until the qsd_instance is started */
-	cfs_read_lock(&qsd->qsd_lock);
+	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		RETURN(0);
 	}
-	cfs_read_unlock(&qsd->qsd_lock);
+	read_unlock(&qsd->qsd_lock);
 
 	/* ignore block quota on MDTs, ignore inode quota on OSTs */
 	if ((!qsd->qsd_is_md && !qi->lqi_is_blk) ||
@@ -849,18 +862,18 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
 	/* ignore quota enforcement request when:
 	 *    - quota isn't enforced for this quota type
-	 * or - the user/group is root */
-	if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0)
+	 * or - the user/group is root
+	 * or - quota accounting isn't enabled */
+	if (!qsd_type_enabled(qsd, qi->lqi_type) || qi->lqi_id.qid_uid == 0 ||
+	    (qsd->qsd_type_array[qi->lqi_type])->qqi_acct_failed)
 		RETURN(0);
 
-	LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d",
+	LASSERTF(trans->lqt_id_cnt <= QUOTA_MAX_TRANSIDS, "id_cnt=%d\n",
		 trans->lqt_id_cnt);
 	/* check whether we already allocated a slot for this id */
 	for (i = 0; i < trans->lqt_id_cnt; i++) {
 		if (qid_equal(qi, &trans->lqt_ids[i])) {
 			found = true;
-			/* make sure we are not mixing inodes & blocks */
-			LASSERT(trans->lqt_ids[i].lqi_is_blk == qi->lqi_is_blk);
			break;
 		}
 	}
@@ -881,7 +894,7 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
 
 	/* manage quota enforcement for this ID */
 	rc = qsd_op_begin0(env, qsd->qsd_type_array[qi->lqi_type],
-			   &trans->lqt_ids[i], qi->lqi_space, flags);
+			   &trans->lqt_ids[i], qi->lqi_space, local_flags);
 	RETURN(rc);
 }
 EXPORT_SYMBOL(qsd_op_begin);
@@ -921,6 +934,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
 	qqi = lqe2qqi(lqe);
 	qsd = qqi->qqi_qsd;
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN(0);
+
 	lqe_write_lock(lqe);
 
 	/* fill qb_count & qb_flags */
@@ -982,7 +998,7 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
				    qsd_req_completion, qqi, &qti->qti_lockh,
				    lqe);
 	} else {
-		union ldlm_wire_lvb *lvb;
+		struct lquota_lvb *lvb;
 
 		OBD_ALLOC_PTR(lvb);
 		if (lvb == NULL)
@@ -1074,13 +1090,16 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
 	if (unlikely(qsd == NULL))
 		RETURN_EXIT;
 
+	if (qsd->qsd_dev->dd_rdonly)
+		RETURN_EXIT;
+
 	/* We don't enforce quota until the qsd_instance is started */
-	cfs_read_lock(&qsd->qsd_lock);
+	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		RETURN_EXIT;
 	}
-	cfs_read_unlock(&qsd->qsd_lock);
+	read_unlock(&qsd->qsd_lock);
 
 	LASSERT(trans != NULL);
 
@@ -1127,12 +1146,12 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
 		RETURN_EXIT;
 
 	/* We don't enforce quota until the qsd_instance is started */
-	cfs_read_lock(&qsd->qsd_lock);
+	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		RETURN_EXIT;
 	}
-	cfs_read_unlock(&qsd->qsd_lock);
+	read_unlock(&qsd->qsd_lock);
 
 	qqi = qsd->qsd_type_array[qtype];
 	LASSERT(qqi);
@@ -1141,16 +1160,16 @@ void qsd_op_adjust(const struct lu_env *env, struct qsd_instance *qsd,
	    qid->qid_uid == 0)
 		RETURN_EXIT;
 
-	cfs_read_lock(&qsd->qsd_lock);
+	read_lock(&qsd->qsd_lock);
 	if (!qsd->qsd_started) {
-		cfs_read_unlock(&qsd->qsd_lock);
+		read_unlock(&qsd->qsd_lock);
 		RETURN_EXIT;
 	}
-	cfs_read_unlock(&qsd->qsd_lock);
+	read_unlock(&qsd->qsd_lock);
 
 	lqe = lqe_locate(env, qqi->qqi_site, qid);
 	if (IS_ERR(lqe)) {
-		CERROR("%s: fail to locate lqe for id:"LPU64", type:%d\n",
+		CERROR("%s: fail to locate lqe for id:%llu, type:%d\n",
		       qsd->qsd_svname, qid->qid_uid, qtype);
 		RETURN_EXIT;
 	}
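
Editor's note: the most subtle change in this patch is the 5-second revalidation window added to qsd_acquire_local() for the cached lqe_edquot verdict (LU-4505): a cached "over quota" answer is only trusted while it is fresh; once stale, the slave falls back to -EAGAIN so the master is asked again. Below is a minimal user-space sketch of just that pattern. The names edquot_cache, cache_set_edquot() and try_acquire() are hypothetical, and time(NULL) stands in for the kernel's ktime_get_seconds(); this is an illustration, not the actual Lustre implementation.

    /* Sketch of the cached-EDQUOT revalidation pattern (assumed names). */
    #include <errno.h>
    #include <stdbool.h>
    #include <stdio.h>
    #include <time.h>

    #define EDQUOT_REVALIDATE_AGE 5 /* seconds a cached verdict stays trusted */

    struct edquot_cache {
            bool   edquot;      /* last known "over quota" verdict */
            time_t edquot_time; /* when that verdict was recorded */
    };

    /* Record a fresh verdict, as the kernel code does in qsd_set_edquot()
     * when a glimpse callback or DQACQ reply arrives. */
    static void cache_set_edquot(struct edquot_cache *c, bool edquot)
    {
            c->edquot = edquot;
            c->edquot_time = time(NULL);
    }

    /* Local acquire: return -EDQUOT only while the cached verdict is
     * younger than EDQUOT_REVALIDATE_AGE; once stale, return -EAGAIN so
     * the caller re-asks the master instead of trusting outdated state. */
    static int try_acquire(struct edquot_cache *c)
    {
            if (c->edquot &&
                c->edquot_time > time(NULL) - EDQUOT_REVALIDATE_AGE)
                    return -EDQUOT;
            return -EAGAIN; /* caller should contact the master */
    }

    int main(void)
    {
            struct edquot_cache c = { .edquot = false, .edquot_time = 0 };

            cache_set_edquot(&c, true);
            printf("fresh verdict: %d (expect %d)\n", try_acquire(&c), -EDQUOT);

            c.edquot_time -= EDQUOT_REVALIDATE_AGE + 1; /* simulate staleness */
            printf("stale verdict: %d (expect %d)\n", try_acquire(&c), -EAGAIN);
            return 0;
    }

The design point mirrored here is that the cached flag is a throttle, not the source of truth: it suppresses a flood of DQACQ requests from over-quota users, while the expiry bounds how long a stale verdict (from the dqacq-reply vs. glimpse race) can wrongly deny writes.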