diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c
index 3f03425..847851d 100644
--- a/lustre/quota/quota_context.c
+++ b/lustre/quota/quota_context.c
@@ -1,22 +1,50 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * lustre/quota/quota_context.c
- * Lustre Quota Context
+ * GPL HEADER START
  *
- * Copyright (c) 2001-2005 Cluster File Systems, Inc.
- * Author: Niu YaWei
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- * This file is part of Lustre, http://www.lustre.org.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- * No redistribution or use is permitted outside of Cluster File Systems, Inc.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
  */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
+ * + * lustre/quota/quota_context.c + * + * Lustre Quota Context + * + * Author: Niu YaWei + */ + #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #include #include @@ -29,26 +57,90 @@ #include #include #include +#include +#include #include "quota_internal.h" -unsigned long default_bunit_sz = 100 * 1024 * 1024; /* 100M bytes */ -unsigned long default_btune_ratio = 50; /* 50 percentage */ -unsigned long default_iunit_sz = 5000; /* 5000 inodes */ -unsigned long default_itune_ratio = 50; /* 50 percentage */ +#ifdef HAVE_QUOTA_SUPPORT + +static lustre_hash_ops_t lqs_hash_ops; + +unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */ +unsigned long default_btune_ratio = 50; /* 50 percentage */ +unsigned long default_iunit_sz = 5120; /* 5120 inodes */ +unsigned long default_itune_ratio = 50; /* 50 percentage */ cfs_mem_cache_t *qunit_cachep = NULL; struct list_head qunit_hash[NR_DQHASH]; spinlock_t qunit_hash_lock = SPIN_LOCK_UNLOCKED; +/* please sync qunit_state with qunit_state_names */ +enum qunit_state { + /** + * a qunit is created + */ + QUNIT_CREATED = 0, + /** + * a qunit is added into qunit hash, that means + * a quota req will be sent or is flying + */ + QUNIT_IN_HASH = 1, + /** + * a qunit is removed from qunit hash, that + * means a quota req is handled and comes back + */ + QUNIT_RM_FROM_HASH = 2, + /** + * qunit can wake up all threads waiting for it + */ + QUNIT_FINISHED = 3, +}; + +static const char *qunit_state_names[] = { + [QUNIT_CREATED] = "CREATED", + [QUNIT_IN_HASH] = "IN_HASH", + [QUNIT_RM_FROM_HASH] = "RM_FROM_HASH", + [QUNIT_FINISHED] = "FINISHED", +}; + struct lustre_qunit { - struct list_head lq_hash; /* Hash list in memory */ - atomic_t lq_refcnt; /* Use count */ - struct lustre_quota_ctxt *lq_ctxt; /* Quota context this applies to */ - struct qunit_data lq_data; /* See qunit_data */ - unsigned int lq_opc; /* QUOTA_DQACQ, QUOTA_DQREL */ - struct list_head lq_waiters; /* All write threads waiting for this qunit */ + struct list_head lq_hash; /** Hash list in memory */ + atomic_t lq_refcnt; /** Use count */ + struct lustre_quota_ctxt *lq_ctxt; /** Quota context this applies to */ + struct qunit_data lq_data; /** See qunit_data */ + unsigned int lq_opc; /** QUOTA_DQACQ, QUOTA_DQREL */ + cfs_waitq_t lq_waitq; /** Threads waiting for this qunit */ + spinlock_t lq_lock; /** Protect the whole structure */ + enum qunit_state lq_state; /** Present the status of qunit */ + int lq_rc; /** The rc of lq_data */ + pid_t lq_owner; }; +#define QUNIT_SET_STATE(qunit, state) \ +do { \ + spin_lock(&qunit->lq_lock); \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d), lq_owner(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ + qunit->lq_state = state; \ + spin_unlock(&qunit->lq_lock); \ +} while(0) + +#define QUNIT_SET_STATE_AND_RC(qunit, state, rc) \ +do { \ + spin_lock(&qunit->lq_lock); \ + qunit->lq_rc = rc; \ + QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ + "lq_rc(%d), lq_owner(%d)\n", \ + qunit, qunit_state_names[qunit->lq_state], \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ + qunit->lq_state = state; \ + spin_unlock(&qunit->lq_lock); \ +} while(0) + int should_translate_quota (struct obd_import *imp) { ENTRY; @@ -73,7 +165,7 @@ void qunit_cache_cleanup(void) if (qunit_cachep) { int rc; rc = cfs_mem_cache_destroy(qunit_cachep); - LASSERTF(rc == 0, "couldn't 
destory qunit_cache slab\n"); + LASSERTF(rc == 0, "couldn't destroy qunit_cache slab\n"); qunit_cachep = NULL; } EXIT; @@ -86,14 +178,14 @@ int qunit_cache_init(void) LASSERT(qunit_cachep == NULL); qunit_cachep = cfs_mem_cache_create("ll_qunit_cache", - sizeof(struct lustre_qunit), - 0, 0); + sizeof(struct lustre_qunit), + 0, 0); if (!qunit_cachep) RETURN(-ENOMEM); spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) - INIT_LIST_HEAD(qunit_hash + i); + CFS_INIT_LIST_HEAD(qunit_hash + i); spin_unlock(&qunit_hash_lock); RETURN(0); } @@ -106,66 +198,13 @@ static inline int qunit_hashfn(struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata) { unsigned int id = qdata->qd_id; - unsigned int type = qdata->qd_flags & QUOTA_IS_GRP; + unsigned int type = QDATA_IS_GRP(qdata); unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id; tmp = (tmp * (MAXQUOTAS - type)) % NR_DQHASH; return tmp; } -/* compute the remaining quota for certain gid or uid b=11693 */ -int compute_remquota(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata) -{ - struct super_block *sb = qctxt->lqc_sb; - __u64 usage, limit; - struct obd_quotactl *qctl; - int ret = QUOTA_RET_OK; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - ENTRY; - - if (!sb_any_quota_enabled(sb)) - RETURN(QUOTA_RET_NOQUOTA); - - /* ignore root user */ - if (qdata->qd_id == 0 && qdata_type == USRQUOTA) - RETURN(QUOTA_RET_NOLIMIT); - - OBD_ALLOC_PTR(qctl); - if (qctl == NULL) - RETURN(-ENOMEM); - - /* get fs quota usage & limit */ - qctl->qc_cmd = Q_GETQUOTA; - qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; - ret = fsfilt_quotactl(obd, sb, qctl); - if (ret) { - if (ret == -ESRCH) /* no limit */ - ret = QUOTA_RET_NOLIMIT; - else - CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)", - ret); - GOTO(out, ret); - } - - usage = qctl->qc_dqblk.dqb_curspace; - limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS; - if (!limit){ /* no limit */ - ret = QUOTA_RET_NOLIMIT; - GOTO(out, ret); - } - - if (limit >= usage) - qdata->qd_count = limit - usage; - else - qdata->qd_count = 0; - EXIT; -out: - OBD_FREE_PTR(qctl); - return ret; -} - /* caller must hold qunit_hash_lock */ static inline struct lustre_qunit *find_qunit(unsigned int hashent, struct lustre_quota_ctxt *qctxt, @@ -178,7 +217,9 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent, list_for_each_entry(qunit, qunit_hash + hashent, lq_hash) { tmp = &qunit->lq_data; if (qunit->lq_ctxt == qctxt && - qdata->qd_id == tmp->qd_id && qdata->qd_flags == tmp->qd_flags) + qdata->qd_id == tmp->qd_id && + (qdata->qd_flags & LQUOTA_QUNIT_FLAGS) == + (tmp->qd_flags & LQUOTA_QUNIT_FLAGS)) return qunit; } return NULL; @@ -189,9 +230,9 @@ static inline struct lustre_qunit *find_qunit(unsigned int hashent, * @qdata: the type of quota unit to be checked * * return: 1 - need acquire qunit; - * 2 - need release qunit; - * 0 - need do nothing. - * < 0 - error. + * 2 - need release qunit; + * 0 - need do nothing. + * < 0 - error. 
*/ static int check_cur_qunit(struct obd_device *obd, @@ -199,19 +240,22 @@ check_cur_qunit(struct obd_device *obd, { struct super_block *sb = qctxt->lqc_sb; unsigned long qunit_sz, tune_sz; - __u64 usage, limit; + __u64 usage, limit, limit_org, pending_write = 0; + long long record = 0; struct obd_quotactl *qctl; + struct lustre_qunit_size *lqs = NULL; int ret = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; ENTRY; if (!sb_any_quota_enabled(sb)) RETURN(0); - /* ignore root user */ - if (qdata->qd_id == 0 && qdata_type == USRQUOTA) + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid){ + spin_unlock(&qctxt->lqc_lock); RETURN(0); + } + spin_unlock(&qctxt->lqc_lock); OBD_ALLOC_PTR(qctl); if (qctl == NULL) @@ -220,7 +264,7 @@ check_cur_qunit(struct obd_device *obd, /* get fs quota usage & limit */ qctl->qc_cmd = Q_GETQUOTA; qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; + qctl->qc_type = QDATA_IS_GRP(qdata); ret = fsfilt_quotactl(obd, sb, qctl); if (ret) { if (ret == -ESRCH) /* no limit */ @@ -230,57 +274,144 @@ check_cur_qunit(struct obd_device *obd, GOTO(out, ret); } - if (is_blk) { + if (QDATA_IS_BLK(qdata)) { usage = qctl->qc_dqblk.dqb_curspace; limit = qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS; - qunit_sz = qctxt->lqc_bunit_sz; - tune_sz = qctxt->lqc_btune_sz; - - LASSERT(!(qunit_sz % QUOTABLOCK_SIZE)); } else { usage = qctl->qc_dqblk.dqb_curinodes; limit = qctl->qc_dqblk.dqb_ihardlimit; - qunit_sz = qctxt->lqc_iunit_sz; - tune_sz = qctxt->lqc_itune_sz; } - /* ignore the no quota limit case */ + /* ignore the no quota limit case; and it can avoid creating + * unnecessary lqs for uid/gid */ if (!limit) GOTO(out, ret = 0); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (IS_ERR(lqs) || lqs == NULL) { + CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n", + QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id); + GOTO (out, ret = 0); + } + spin_lock(&lqs->lqs_lock); + + if (QDATA_IS_BLK(qdata)) { + qunit_sz = lqs->lqs_bunit_sz; + tune_sz = lqs->lqs_btune_sz; + pending_write = lqs->lqs_bwrite_pending; + record = lqs->lqs_blk_rec; + LASSERT(!(qunit_sz % QUOTABLOCK_SIZE)); + } else { + /* we didn't need change inode qunit size now */ + qunit_sz = lqs->lqs_iunit_sz; + tune_sz = lqs->lqs_itune_sz; + pending_write = lqs->lqs_iwrite_pending; + record = lqs->lqs_ino_rec; + } + /* we don't count the MIN_QLIMIT */ - if ((limit == MIN_QLIMIT && !is_blk) || - (toqb(limit) == MIN_QLIMIT && is_blk)) + if ((limit == MIN_QLIMIT && !QDATA_IS_BLK(qdata)) || + (toqb(limit) == MIN_QLIMIT && QDATA_IS_BLK(qdata))) limit = 0; + usage += pending_write; + limit_org = limit; + /* when a releasing quota req is sent, before it returned + limit is assigned a small value. 
limit will overflow */ + if (limit + record < 0) + usage -= record; + else + limit += record; + LASSERT(qdata->qd_count == 0); if (limit <= usage + tune_sz) { - while (qdata->qd_count + limit <= usage + tune_sz) + while (qdata->qd_count + limit <= + usage + tune_sz) qdata->qd_count += qunit_sz; ret = 1; - } else if (limit > usage + qunit_sz + tune_sz) { - while (limit - qdata->qd_count > usage + qunit_sz + tune_sz) + } else if (limit > usage + qunit_sz + tune_sz && + limit_org > qdata->qd_count + qunit_sz) { + while (limit - qdata->qd_count > usage + qunit_sz + tune_sz && + limit_org > qdata->qd_count + qunit_sz) qdata->qd_count += qunit_sz; ret = 2; + /* if there are other pending writes for this uid/gid, releasing + * quota is put off until the last pending write b=16645 */ + if (ret == 2 && pending_write) { + CDEBUG(D_QUOTA, "delay quota release\n"); + ret = 0; + } } + CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64 + ", pending_write: "LPU64", record: "LPD64 + ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n", + QDATA_IS_BLK(qdata) ? 'b' : 'i', limit, usage, pending_write, + record, qunit_sz, tune_sz, ret); LASSERT(ret == 0 || qdata->qd_count); + + spin_unlock(&lqs->lqs_lock); + lqs_putref(lqs); EXIT; -out: + out: OBD_FREE_PTR(qctl); return ret; } -/* caller must hold qunit_hash_lock */ -static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata) +/** + * Compute the remaining quota for certain gid or uid b=11693 + */ +int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata, int isblk) { - unsigned int hashent = qunit_hashfn(qctxt, qdata); - struct lustre_qunit *qunit; + struct super_block *sb = qctxt->lqc_sb; + __u64 usage, limit; + struct obd_quotactl *qctl; + int ret = QUOTA_RET_OK; ENTRY; - LASSERT_SPIN_LOCKED(&qunit_hash_lock); - qunit = find_qunit(hashent, qctxt, qdata); - RETURN(qunit); + if (!sb_any_quota_enabled(sb)) + RETURN(QUOTA_RET_NOQUOTA); + + /* ignore root user */ + if (qdata->qd_id == 0 && QDATA_IS_GRP(qdata) == USRQUOTA) + RETURN(QUOTA_RET_NOLIMIT); + + OBD_ALLOC_PTR(qctl); + if (qctl == NULL) + RETURN(-ENOMEM); + + /* get fs quota usage & limit */ + qctl->qc_cmd = Q_GETQUOTA; + qctl->qc_id = qdata->qd_id; + qctl->qc_type = QDATA_IS_GRP(qdata); + ret = fsfilt_quotactl(obd, sb, qctl); + if (ret) { + if (ret == -ESRCH) /* no limit */ + ret = QUOTA_RET_NOLIMIT; + else + CDEBUG(D_QUOTA, "can't get fs quota usage! (rc:%d)", + ret); + GOTO(out, ret); + } + + usage = isblk ? qctl->qc_dqblk.dqb_curspace : + qctl->qc_dqblk.dqb_curinodes; + limit = isblk ? 
qctl->qc_dqblk.dqb_bhardlimit << QUOTABLOCK_BITS : + qctl->qc_dqblk.dqb_ihardlimit; + if (!limit){ /* no limit */ + ret = QUOTA_RET_NOLIMIT; + GOTO(out, ret); + } + + if (limit >= usage) + qdata->qd_count = limit - usage; + else + qdata->qd_count = 0; + EXIT; +out: + OBD_FREE_PTR(qctl); + return ret; } static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, @@ -289,17 +420,19 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit = NULL; ENTRY; - OBD_SLAB_ALLOC(qunit, qunit_cachep, GFP_NOFS, sizeof(*qunit)); + OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO); if (qunit == NULL) RETURN(NULL); - INIT_LIST_HEAD(&qunit->lq_hash); - INIT_LIST_HEAD(&qunit->lq_waiters); + CFS_INIT_LIST_HEAD(&qunit->lq_hash); + init_waitqueue_head(&qunit->lq_waitq); atomic_set(&qunit->lq_refcnt, 1); qunit->lq_ctxt = qctxt; memcpy(&qunit->lq_data, qdata, sizeof(*qdata)); qunit->lq_opc = opc; - + qunit->lq_lock = SPIN_LOCK_UNLOCKED; + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + qunit->lq_owner = cfs_curproc_pid(); RETURN(qunit); } @@ -320,97 +453,178 @@ static void qunit_put(struct lustre_qunit *qunit) free_qunit(qunit); } +/* caller must hold qunit_hash_lock and release ref of qunit after using it */ +static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata) +{ + unsigned int hashent = qunit_hashfn(qctxt, qdata); + struct lustre_qunit *qunit; + ENTRY; + + LASSERT_SPIN_LOCKED(&qunit_hash_lock); + qunit = find_qunit(hashent, qctxt, qdata); + if (qunit) + qunit_get(qunit); + RETURN(qunit); +} + static void insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) { struct list_head *head; LASSERT(list_empty(&qunit->lq_hash)); + qunit_get(qunit); head = qunit_hash + qunit_hashfn(qctxt, &qunit->lq_data); list_add(&qunit->lq_hash, head); + QUNIT_SET_STATE(qunit, QUNIT_IN_HASH); +} + +static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) +{ + struct lustre_qunit_size *lqs; + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data), + qunit->lq_data.qd_id), + qunit->lq_ctxt, 0); + if (lqs && !IS_ERR(lqs)) { + spin_lock(&lqs->lqs_lock); + if (qunit->lq_opc == QUOTA_DQACQ) + quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); + if (qunit->lq_opc == QUOTA_DQREL) + quota_compute_lqs(&qunit->lq_data, lqs, 0, 0); + spin_unlock(&lqs->lqs_lock); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + /* this is for schedule_dqacq */ + lqs_putref(lqs); + } } static void remove_qunit_nolock(struct lustre_qunit *qunit) { LASSERT(!list_empty(&qunit->lq_hash)); + LASSERT_SPIN_LOCKED(&qunit_hash_lock); + list_del_init(&qunit->lq_hash); + QUNIT_SET_STATE(qunit, QUNIT_RM_FROM_HASH); + qunit_put(qunit); } -struct qunit_waiter { - struct list_head qw_entry; - cfs_waitq_t qw_waitq; - int qw_rc; -}; +void* quota_barrier(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl, int isblk) +{ + struct lustre_qunit *qunit, *find_qunit; + int cycle = 1; + + OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); + if (qunit == NULL) { + CERROR("locating qunit failed.(id=%u isblk=%d %s)\n", + oqctl->qc_id, isblk, oqctl->qc_type ? 
"grp" : "usr"); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + return NULL; + } + + INIT_LIST_HEAD(&qunit->lq_hash); + qunit->lq_lock = SPIN_LOCK_UNLOCKED; + init_waitqueue_head(&qunit->lq_waitq); + atomic_set(&qunit->lq_refcnt, 1); + qunit->lq_ctxt = qctxt; + qunit->lq_data.qd_id = oqctl->qc_id; + qunit->lq_data.qd_flags = oqctl->qc_type; + if (isblk) + QDATA_SET_BLK(&qunit->lq_data); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + /* it means it is only an invalid qunit for barrier */ + qunit->lq_opc = QUOTA_LAST_OPC; + + while (1) { + spin_lock(&qunit_hash_lock); + find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data); + if (find_qunit) { + spin_unlock(&qunit_hash_lock); + qunit_put(find_qunit); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + CDEBUG(D_QUOTA, "cycle=%d\n", cycle++); + continue; + } + break; + } + insert_qunit_nolock(qctxt, qunit); + spin_unlock(&qunit_hash_lock); + return qunit; +} + +void quota_unbarrier(void *handle) +{ + struct lustre_qunit *qunit = (struct lustre_qunit *)handle; + + if (qunit == NULL) { + CERROR("handle is NULL\n"); + return; + } + + LASSERT(qunit->lq_opc == QUOTA_LAST_OPC); + spin_lock(&qunit_hash_lock); + remove_qunit_nolock(qunit); + spin_unlock(&qunit_hash_lock); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED); + wake_up(&qunit->lq_waitq); + qunit_put(qunit); +} #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \ (limit = count) : (limit += count) -/* FIXME check if this mds is the master of specified id */ -static int -is_master(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - unsigned int id, int type) +static inline int is_master(struct lustre_quota_ctxt *qctxt) { return qctxt->lqc_handler ? 1 : 0; } static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait); + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti); -static int split_before_schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait) +static inline void qdata_to_oqaq(struct qunit_data *qdata, + struct quota_adjust_qunit *oqaq) { - int rc = 0, ret; - struct qunit_data tmp_qdata; - ENTRY; - LASSERT(qdata); - if (qctxt->lqc_import) - while (should_translate_quota(qctxt->lqc_import) && - qdata->qd_count > MAX_QUOTA_COUNT32) { - - tmp_qdata = *qdata; - tmp_qdata.qd_count = MAX_QUOTA_COUNT32; - qdata->qd_count -= tmp_qdata.qd_count; - ret = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait); - if (!rc) - rc = ret; - } - - if (qdata->qd_count){ - ret = schedule_dqacq(obd, qctxt, qdata, opc, wait); - if (!rc) - rc = ret; - } - - RETURN(rc); + LASSERT(oqaq); + + oqaq->qaq_flags = qdata->qd_flags; + oqaq->qaq_id = qdata->qd_id; + if (QDATA_IS_ADJBLK(qdata)) + oqaq->qaq_bunit_sz = qdata->qd_qunit; + if (QDATA_IS_ADJINO(qdata)) + oqaq->qaq_iunit_sz = qdata->qd_qunit; } static int -dqacq_completion(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, +dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc) { struct lustre_qunit *qunit = NULL; struct super_block *sb = qctxt->lqc_sb; - unsigned long qunit_sz; - struct qunit_waiter *qw, *tmp; int err = 0; - __u32 qdata_type = qdata->qd_flags & QUOTA_IS_GRP; - __u32 is_blk = (qdata->qd_flags & QUOTA_IS_BLOCK) >> 1; - __u64 qd_tmp = qdata->qd_count; - unsigned long div_r; + struct quota_adjust_qunit *oqaq = NULL; + int rc1 = 0; ENTRY; 
LASSERT(qdata); - qunit_sz = is_blk ? qctxt->lqc_bunit_sz : qctxt->lqc_iunit_sz; - div_r = do_div(qd_tmp, qunit_sz); - LASSERT(!div_r); + QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n", + obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); + + /* do it only when a releasing quota req more than 5MB b=18491 */ + if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880) + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5); /* update local operational quota file */ if (rc == 0) { - __u32 count = QUSG(qdata->qd_count, is_blk); + __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata)); struct obd_quotactl *qctl; __u64 *hardlimit; @@ -423,14 +637,14 @@ dqacq_completion(struct obd_device *obd, * set fs quota limit */ qctl->qc_cmd = Q_GETQUOTA; qctl->qc_id = qdata->qd_id; - qctl->qc_type = qdata_type; + qctl->qc_type = QDATA_IS_GRP(qdata); err = fsfilt_quotactl(obd, sb, qctl); if (err) { CERROR("error get quota fs limit! (rc:%d)\n", err); GOTO(out_mem, err); } - if (is_blk) { + if (QDATA_IS_BLK(qdata)) { qctl->qc_dqblk.dqb_valid = QIF_BLIMITS; hardlimit = &qctl->qc_dqblk.dqb_bhardlimit; } else { @@ -438,12 +652,24 @@ dqacq_completion(struct obd_device *obd, hardlimit = &qctl->qc_dqblk.dqb_ihardlimit; } + CDEBUG(D_QUOTA, "hardlimt: "LPU64"\n", *hardlimit); + + if (*hardlimit == 0) + goto out_mem; + switch (opc) { case QUOTA_DQACQ: INC_QLIMIT(*hardlimit, count); break; case QUOTA_DQREL: - LASSERT(count < *hardlimit); + LASSERTF(count < *hardlimit, + "id(%u) flag(%u) type(%c) isblk(%c) " + "count("LPU64") qd_qunit("LPU64") " + "hardlimit("LPU64").\n", + qdata->qd_id, qdata->qd_flags, + QDATA_IS_GRP(qdata) ? 'g' : 'u', + QDATA_IS_BLK(qdata) ? 'b': 'i', + qdata->qd_count, qdata->qd_qunit, *hardlimit); *hardlimit -= count; break; default: @@ -478,40 +704,65 @@ out: /* this qunit has been removed by qctxt_cleanup() */ if (!qunit) { spin_unlock(&qunit_hash_lock); + QDATA_DEBUG(qdata, "%s is discarded because qunit isn't found\n", + opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); RETURN(err); } LASSERT(opc == qunit->lq_opc); + /* remove this qunit from lq_hash so that new processes cannot be added + * to qunit->lq_waiters */ remove_qunit_nolock(qunit); + spin_unlock(&qunit_hash_lock); - /* wake up all waiters */ - list_for_each_entry_safe(qw, tmp, &qunit->lq_waiters, qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = rc; - wake_up(&qw->qw_waitq); - } + compute_lqs_after_removing_qunit(qunit); - spin_unlock(&qunit_hash_lock); + if (rc == 0) + rc = QUOTA_REQ_RETURNED; + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc); + /* wake up all waiters */ + wake_up_all(&qunit->lq_waitq); + /* this is for dqacq_in_flight() */ qunit_put(qunit); + /* this is for alloc_qunit() */ + qunit_put(qunit); + if (rc < 0 && rc != -EDQUOT) + RETURN(err); /* don't reschedule in such cases: - * - acq/rel failure, but not for quota recovery. + * - acq/rel failure and qunit isn't changed, + * but not for quota recovery. * - local dqacq/dqrel. * - local disk io failure. 
*/ - if (err || (rc && rc != -EBUSY) || - is_master(obd, qctxt, qdata->qd_id, qdata_type)) - RETURN(err); + OBD_ALLOC_PTR(oqaq); + if (!oqaq) + RETURN(-ENOMEM); + qdata_to_oqaq(qdata, oqaq); + /* adjust the qunit size in slaves */ + rc1 = quota_adjust_slave_lqs(oqaq, qctxt); + OBD_FREE_PTR(oqaq); + if (rc1 < 0) { + CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1); + RETURN(rc1); + } + if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt)) + RETURN(err); + + if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 && + OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL)) + RETURN(err); /* reschedule another dqacq/dqrel if needed */ qdata->qd_count = 0; - rc = check_cur_qunit(obd, qctxt, qdata); - if (rc > 0) { + qdata->qd_flags &= LQUOTA_QUNIT_FLAGS; + rc1 = check_cur_qunit(obd, qctxt, qdata); + if (rc1 > 0) { int opc; - opc = rc == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - rc = split_before_schedule_dqacq(obd, qctxt, qdata, opc, 0); - QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc); + opc = rc1 == 1 ? QUOTA_DQACQ : QUOTA_DQREL; + rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL); + QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } RETURN(err); } @@ -521,193 +772,363 @@ struct dqacq_async_args { struct lustre_qunit *aa_qunit; }; -static int dqacq_interpret(struct ptlrpc_request *req, void *data, int rc) +static int dqacq_interpret(const struct lu_env *env, + struct ptlrpc_request *req, void *data, int rc) { struct dqacq_async_args *aa = (struct dqacq_async_args *)data; struct lustre_quota_ctxt *qctxt = aa->aa_ctxt; + struct obd_device_target *obt = qctxt->lqc_obt; struct lustre_qunit *qunit = aa->aa_qunit; struct obd_device *obd = req->rq_import->imp_obd; struct qunit_data *qdata = NULL; - struct qunit_data_old *qdata_old = NULL; ENTRY; LASSERT(req); LASSERT(req->rq_import); - if ((req->rq_import->imp_connect_data.ocd_connect_flags & OBD_CONNECT_QUOTA64) && - !OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) { - CDEBUG(D_QUOTA, "qd_count is 64bit!\n"); - qdata = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(*qdata), lustre_swab_qdata); - } else { - CDEBUG(D_QUOTA, "qd_count is 32bit!\n"); - qdata_old = lustre_swab_reqbuf(req, REPLY_REC_OFF, sizeof(struct qunit_data_old), - lustre_swab_qdata_old); - qdata = lustre_quota_old_to_new(qdata_old); - } - if (qdata == NULL) { - DEBUG_REQ(D_ERROR, req, "error unpacking qunit_data"); - RETURN(-EPROTO); + + down_read(&obt->obt_rwsem); + /* if a quota req timeouts or is dropped, we should update quota + * statistics which will be handled in dqacq_completion. And in + * this situation we should get qdata from request instead of + * reply */ + qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, + QUOTA_IMPORT); + if (IS_ERR(qdata)) { + rc = PTR_ERR(qdata); + DEBUG_REQ(D_ERROR, req, + "error unpacking qunit_data(rc: %ld)\n", + PTR_ERR(qdata)); + RETURN(PTR_ERR(qdata)); } - LASSERT(qdata->qd_id == qunit->lq_data.qd_id && - (qdata->qd_flags & QUOTA_IS_GRP) == (qunit->lq_data.qd_flags & QUOTA_IS_GRP) && - (qdata->qd_count == qunit->lq_data.qd_count || - qdata->qd_count == 0)); + QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); + QDATA_DEBUG((&qunit->lq_data), "lq_data: \n"); - QDATA_DEBUG(qdata, "%s interpret rc(%d).\n", - lustre_msg_get_opc(req->rq_reqmsg) == QUOTA_DQACQ ? - "DQACQ" : "DQREL", rc); + if (qdata->qd_id != qunit->lq_data.qd_id || + OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) { + CDEBUG(D_ERROR, "the returned qd_id isn't expected!" 
+ "(qdata: %u, lq_data: %u)\n", qdata->qd_id, + qunit->lq_data.qd_id); + qdata->qd_id = qunit->lq_data.qd_id; + rc = -EPROTO; + } + if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) { + CDEBUG(D_ERROR, "the returned grp/usr isn't expected!" + "(qdata: %u, lq_data: %u)\n", qdata->qd_flags, + qunit->lq_data.qd_flags); + if (QDATA_IS_GRP(&qunit->lq_data)) + QDATA_SET_GRP(qdata); + else + QDATA_CLR_GRP(qdata); + rc = -EPROTO; + } + if (qdata->qd_count > qunit->lq_data.qd_count) { + CDEBUG(D_ERROR, "the returned qd_count isn't expected!" + "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count, + qunit->lq_data.qd_count); + rc = -EPROTO; + } rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); + up_read(&obt->obt_rwsem); RETURN(rc); } -static int got_qunit(struct qunit_waiter *waiter) +/** + * check if quota master is online + */ +int check_qm(struct lustre_quota_ctxt *qctxt) { - int rc = 0; + int rc; + ENTRY; + + spin_lock(&qctxt->lqc_lock); + /* quit waiting when mds is back or qctxt is cleaned up */ + rc = qctxt->lqc_import || !qctxt->lqc_valid; + spin_unlock(&qctxt->lqc_lock); + + RETURN(rc); +} + +/* wake up all waiting threads when lqc_import is NULL */ +void dqacq_interrupt(struct lustre_quota_ctxt *qctxt) +{ + struct lustre_qunit *qunit, *tmp; + int i; ENTRY; + spin_lock(&qunit_hash_lock); - rc = list_empty(&waiter->qw_entry); + for (i = 0; i < NR_DQHASH; i++) { + list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { + if (qunit->lq_ctxt != qctxt) + continue; + + /* Wake up all waiters. Do not change lq_state. + * The waiters will check lq_rc which is kept as 0 + * if no others change it, then the waiters will return + * -EAGAIN to caller who can perform related quota + * acq/rel if necessary. */ + wake_up_all(&qunit->lq_waitq); + } + } spin_unlock(&qunit_hash_lock); + EXIT; +} + +static int got_qunit(struct lustre_qunit *qunit, int is_master) +{ + struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt; + int rc = 0; + ENTRY; + + spin_lock(&qunit->lq_lock); + switch (qunit->lq_state) { + case QUNIT_IN_HASH: + case QUNIT_RM_FROM_HASH: + break; + case QUNIT_FINISHED: + rc = 1; + break; + default: + CERROR("invalid qunit state %d\n", qunit->lq_state); + } + spin_unlock(&qunit->lq_lock); + + if (!rc) { + spin_lock(&qctxt->lqc_lock); + rc = !qctxt->lqc_valid; + if (!is_master) + rc |= !qctxt->lqc_import; + spin_unlock(&qctxt->lqc_lock); + } + RETURN(rc); } static int -schedule_dqacq(struct obd_device *obd, - struct lustre_quota_ctxt *qctxt, - struct qunit_data *qdata, int opc, int wait) +schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, + struct qunit_data *qdata, int opc, int wait, + struct obd_trans_info *oti) { struct lustre_qunit *qunit, *empty; - struct qunit_waiter qw; struct l_wait_info lwi = { 0 }; struct ptlrpc_request *req; - struct qunit_data *reqdata; struct dqacq_async_args *aa; - int size[2] = { sizeof(struct ptlrpc_body), sizeof(*reqdata) }; + struct obd_import *imp = NULL; + struct lustre_qunit_size *lqs = NULL; + struct timeval work_start; + struct timeval work_end; + long timediff; int rc = 0; ENTRY; - INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; - + LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL); + do_gettimeofday(&work_start); if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) RETURN(-ENOMEM); spin_lock(&qunit_hash_lock); - qunit = dqacq_in_flight(qctxt, qdata); if (qunit) { - if (wait) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); 
spin_unlock(&qunit_hash_lock); + qunit_put(empty); - free_qunit(empty); goto wait_completion; } qunit = empty; + qunit_get(qunit); insert_qunit_nolock(qctxt, qunit); - if (wait) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); spin_unlock(&qunit_hash_lock); - LASSERT(qunit); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs && !IS_ERR(lqs)) { + spin_lock(&lqs->lqs_lock); + quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0); + /* when this qdata returned from mds, it will call lqs_putref */ + lqs_getref(lqs); + spin_unlock(&lqs->lqs_lock); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + } else { + CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n"); + } + QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n", + obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); /* master is going to dqacq/dqrel from itself */ - if (is_master(obd, qctxt, qdata->qd_id, qdata->qd_flags & QUOTA_IS_GRP)) { + if (is_master(qctxt)) { int rc2; QDATA_DEBUG(qdata, "local %s.\n", opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); + QDATA_SET_CHANGE_QS(qdata); rc = qctxt->lqc_handler(obd, qdata, opc); rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc); - RETURN((rc && rc != -EDQUOT) ? rc : rc2); + /* this is for qunit_get() */ + qunit_put(qunit); + + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + if (opc == QUOTA_DQACQ) + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ, + timediff); + else + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL, + timediff); + RETURN(rc ? rc : rc2); + } + + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_import) { + spin_unlock(&qctxt->lqc_lock); + QDATA_DEBUG(qdata, "lqc_import is invalid.\n"); + + spin_lock(&qunit_hash_lock); + remove_qunit_nolock(qunit); + spin_unlock(&qunit_hash_lock); + + compute_lqs_after_removing_qunit(qunit); + + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN); + wake_up_all(&qunit->lq_waitq); + + /* this is for qunit_get() */ + qunit_put(qunit); + /* this for alloc_qunit() */ + qunit_put(qunit); + spin_lock(&qctxt->lqc_lock); + if (wait && !qctxt->lqc_import) { + spin_unlock(&qctxt->lqc_lock); + + LASSERT(oti && oti->oti_thread && + oti->oti_thread->t_watchdog); + + lc_watchdog_disable(oti->oti_thread->t_watchdog); + CDEBUG(D_QUOTA, "sleep for quota master\n"); + l_wait_event(qctxt->lqc_wait_for_qmaster, + check_qm(qctxt), &lwi); + CDEBUG(D_QUOTA, "wake up when quota master is back\n"); + lc_watchdog_touch(oti->oti_thread->t_watchdog, + GET_TIMEOUT(oti->oti_thread->t_svc)); + } else { + spin_unlock(&qctxt->lqc_lock); + } + + RETURN(-EAGAIN); } + imp = class_import_get(qctxt->lqc_import); + spin_unlock(&qctxt->lqc_lock); /* build dqacq/dqrel request */ - LASSERT(qctxt->lqc_import); - req = ptlrpc_prep_req(qctxt->lqc_import, LUSTRE_MDS_VERSION, opc, 2, - size, NULL); - if (!req) { + LASSERT(imp); + + req = ptlrpc_request_alloc_pack(imp, &RQF_MDS_QUOTA_DQACQ, + LUSTRE_MDS_VERSION, opc); + class_import_put(imp); + if (req == NULL) { + CDEBUG(D_ERROR, "Can't alloc request\n"); dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc); + /* this is for qunit_get() */ + qunit_put(qunit); RETURN(-ENOMEM); } - LASSERT(!should_translate_quota(qctxt->lqc_import) || - qdata->qd_count <= MAX_QUOTA_COUNT32); - if (should_translate_quota(qctxt->lqc_import) || - OBD_FAIL_CHECK(OBD_FAIL_QUOTA_QD_COUNT_32BIT)) - { - struct qunit_data_old *reqdata_old, *tmp; - - reqdata_old = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF, 
- sizeof(*reqdata_old)); - tmp = lustre_quota_new_to_old(qdata); - *reqdata_old = *tmp; - size[1] = sizeof(*reqdata_old); - CDEBUG(D_QUOTA, "qd_count is 32bit!\n"); - } else { - reqdata = lustre_msg_buf(req->rq_reqmsg, REPLY_REC_OFF, - sizeof(*reqdata)); - *reqdata = *qdata; - size[1] = sizeof(*reqdata); - CDEBUG(D_QUOTA, "qd_count is 64bit!\n"); + ptlrpc_request_set_replen(req); + req->rq_no_resend = req->rq_no_delay = 1; + rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); + if (rc < 0) { + CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc); + ptlrpc_req_finished(req); + dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); + /* this is for qunit_get() */ + qunit_put(qunit); + RETURN(rc); } - ptlrpc_req_set_repsize(req, 2, size); CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args)); - aa = (struct dqacq_async_args *)&req->rq_async_args; + aa = ptlrpc_req_async_args(req); aa->aa_ctxt = qctxt; aa->aa_qunit = qunit; req->rq_interpret_reply = dqacq_interpret; - ptlrpcd_add_req(req); + ptlrpcd_add_req(req, PSCOPE_OTHER); QDATA_DEBUG(qdata, "%s scheduled.\n", opc == QUOTA_DQACQ ? "DQACQ" : "DQREL"); wait_completion: if (wait && qunit) { struct qunit_data *p = &qunit->lq_data; - QDATA_DEBUG(p, "wait for dqacq.\n"); - - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); - if (qw.qw_rc == 0) - rc = -EAGAIN; - CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); + /* rc = -EAGAIN, it means the quota master isn't ready yet + * rc = QUOTA_REQ_RETURNED, it means a quota req is finished; + * rc = -EDQUOT, it means out of quota + * rc = -EBUSY, it means recovery is happening + * other rc < 0, it means real errors, functions who call + * schedule_dqacq should take care of this */ + spin_lock(&qunit->lq_lock); + rc = qunit->lq_rc; + spin_unlock(&qunit->lq_lock); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) " + "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id, + qunit->lq_data.qd_flags, rc, qunit->lq_owner); } + + qunit_put(qunit); + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + if (opc == QUOTA_DQACQ) + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_ACQ : LQUOTA_ASYNC_ACQ, + timediff); + else + lprocfs_counter_add(qctxt->lqc_stats, + wait ? LQUOTA_SYNC_REL : LQUOTA_ASYNC_REL, + timediff); + RETURN(rc); } int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - uid_t uid, gid_t gid, __u32 isblk, int wait) + const unsigned int id[], __u32 isblk, int wait, + struct obd_trans_info *oti) { - int ret, rc = 0, i = USRQUOTA; - __u32 id[MAXQUOTAS] = { uid, gid }; + int rc = 0, i = USRQUOTA; struct qunit_data qdata[MAXQUOTAS]; ENTRY; - CLASSERT(MAXQUOTAS < 4); - if (!sb_any_quota_enabled(qctxt->lqc_sb)) + if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0) RETURN(0); for (i = 0; i < MAXQUOTAS; i++) { qdata[i].qd_id = id[i]; - qdata[i].qd_flags = 0; - qdata[i].qd_flags |= i; - qdata[i].qd_flags |= isblk ? QUOTA_IS_BLOCK : 0; + qdata[i].qd_flags = i; + if (isblk) + QDATA_SET_BLK(&qdata[i]); qdata[i].qd_count = 0; - ret = check_cur_qunit(obd, qctxt, &qdata[i]); - if (ret > 0) { + rc = check_cur_qunit(obd, qctxt, &qdata[i]); + if (rc > 0) { int opc; /* need acquire or release */ - opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - ret = split_before_schedule_dqacq(obd, qctxt, &qdata[i], - opc, wait); - if (!rc) - rc = ret; + opc = rc == 1 ? 
QUOTA_DQACQ : QUOTA_DQREL; + rc = schedule_dqacq(obd, qctxt, &qdata[i], opc, + wait,oti); + if (rc < 0) + RETURN(rc); + } else if (wait == 1) { + /* when wait equates 1, that means mds_quota_acquire + * or filter_quota_acquire is calling it. */ + rc = qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk); + if (rc < 0) + RETURN(rc); } } @@ -719,93 +1140,177 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, unsigned short type, int isblk) { struct lustre_qunit *qunit = NULL; - struct qunit_waiter qw; struct qunit_data qdata; + struct timeval work_start; + struct timeval work_end; + long timediff; struct l_wait_info lwi = { 0 }; + int rc = 0; ENTRY; - INIT_LIST_HEAD(&qw.qw_entry); - init_waitqueue_head(&qw.qw_waitq); - qw.qw_rc = 0; - + do_gettimeofday(&work_start); qdata.qd_id = id; - qdata.qd_flags = 0; - qdata.qd_flags |= type; - qdata.qd_flags |= isblk ? QUOTA_IS_BLOCK : 0; + qdata.qd_flags = type; + if (isblk) + QDATA_SET_BLK(&qdata); qdata.qd_count = 0; spin_lock(&qunit_hash_lock); - qunit = dqacq_in_flight(qctxt, &qdata); - if (qunit) - list_add_tail(&qw.qw_entry, &qunit->lq_waiters); - spin_unlock(&qunit_hash_lock); if (qunit) { - struct qunit_data *p = &qdata; - QDATA_DEBUG(p, "wait for dqacq completion.\n"); - l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi); - QDATA_DEBUG(p, "wait dqacq done. (rc:%d)\n", qw.qw_rc); + struct qunit_data *p = &qunit->lq_data; + + QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) " + "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner); + /* keep same as schedule_dqacq() b=17030 */ + spin_lock(&qunit->lq_lock); + rc = qunit->lq_rc; + spin_unlock(&qunit->lq_lock); + /* this is for dqacq_in_flight() */ + qunit_put(qunit); + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA : + LQUOTA_WAIT_PENDING_INO_QUOTA, + timediff); + } else { + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? 
LQUOTA_NOWAIT_PENDING_BLK_QUOTA : + LQUOTA_NOWAIT_PENDING_INO_QUOTA, + timediff); } - RETURN(0); + + RETURN(rc); } int -qctxt_init(struct lustre_quota_ctxt *qctxt, struct super_block *sb, - dqacq_handler_t handler) +qctxt_init(struct obd_device *obd, dqacq_handler_t handler) { + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct obd_device_target *obt = &obd->u.obt; + struct super_block *sb = obt->obt_sb; int rc = 0; ENTRY; + LASSERT(qctxt); + rc = ptlrpcd_addref(); if (rc) RETURN(rc); + cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); + spin_lock_init(&qctxt->lqc_lock); + spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; qctxt->lqc_sb = sb; + qctxt->lqc_obt = obt; qctxt->lqc_import = NULL; qctxt->lqc_recovery = 0; - qctxt->lqc_atype = 0; - qctxt->lqc_status= 0; + qctxt->lqc_switch_qs = 1; /* Change qunit size in default setting */ + qctxt->lqc_valid = 1; + qctxt->lqc_cqs_boundary_factor = 4; + qctxt->lqc_cqs_least_bunit = PTLRPC_MAX_BRW_SIZE; + qctxt->lqc_cqs_least_iunit = 2; + qctxt->lqc_cqs_qs_factor = 2; + qctxt->lqc_flags = 0; + QUOTA_MASTER_UNREADY(qctxt); qctxt->lqc_bunit_sz = default_bunit_sz; qctxt->lqc_btune_sz = default_bunit_sz / 100 * default_btune_ratio; qctxt->lqc_iunit_sz = default_iunit_sz; qctxt->lqc_itune_sz = default_iunit_sz * default_itune_ratio / 100; + qctxt->lqc_switch_seconds = 300; /* enlarging will wait 5 minutes + * after the last shrinking */ + qctxt->lqc_sync_blk = 0; + spin_unlock(&qctxt->lqc_lock); + + qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", + HASH_LQS_CUR_BITS, + HASH_LQS_MAX_BITS, + &lqs_hash_ops, 0); + if (!qctxt->lqc_lqs_hash) { + CERROR("initialize hash lqs for %s error!\n", obd->obd_name); + RETURN(-ENOMEM); + } - RETURN(0); +#ifdef LPROCFS + rc = lquota_proc_setup(obd, is_master(qctxt)); + if (rc) + CERROR("initialize proc for %s error!\n", obd->obd_name); +#endif + + RETURN(rc); +} + + +void hash_put_lqs(void *obj, void *data) +{ + lqs_putref((struct lustre_qunit_size *)obj); } void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; - struct qunit_waiter *qw, *tmp2; + struct list_head tmp_list; + struct obd_device_target *obt = qctxt->lqc_obt; int i; ENTRY; - spin_lock(&qunit_hash_lock); + CFS_INIT_LIST_HEAD(&tmp_list); + + spin_lock(&qctxt->lqc_lock); + qctxt->lqc_valid = 0; + spin_unlock(&qctxt->lqc_lock); + spin_lock(&qunit_hash_lock); for (i = 0; i < NR_DQHASH; i++) { list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) { if (qunit->lq_ctxt != qctxt) continue; - remove_qunit_nolock(qunit); - /* wake up all waiters */ - list_for_each_entry_safe(qw, tmp2, &qunit->lq_waiters, - qw_entry) { - list_del_init(&qw->qw_entry); - qw->qw_rc = 0; - wake_up(&qw->qw_waitq); - } - qunit_put(qunit); + list_add(&qunit->lq_hash, &tmp_list); } } - spin_unlock(&qunit_hash_lock); + list_for_each_entry_safe(qunit, tmp, &tmp_list, lq_hash) { + list_del_init(&qunit->lq_hash); + compute_lqs_after_removing_qunit(qunit); + + /* wake up all waiters */ + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, 0); + wake_up_all(&qunit->lq_waitq); + qunit_put(qunit); + } + + lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); + down_write(&obt->obt_rwsem); + lustre_hash_exit(qctxt->lqc_lqs_hash); + qctxt->lqc_lqs_hash = NULL; + up_write(&obt->obt_rwsem); + + /* after qctxt_cleanup, qctxt might be freed, then check_qm() is + * unpredicted. 
So we must wait until lqc_wait_for_qmaster is empty */ + while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { + cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); + cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, + cfs_time_seconds(1)); + } + ptlrpcd_decref(); +#ifdef LPROCFS + if (lquota_proc_cleanup(qctxt)) + CERROR("cleanup proc error!\n"); +#endif + EXIT; } @@ -827,11 +1332,20 @@ static int qslave_recovery_main(void *arg) ptlrpc_daemonize("qslave_recovd"); + /* for obdfilter */ + class_incref(obd, "qslave_recovd_filter", obd); + complete(&data->comp); - if (qctxt->lqc_recovery) + spin_lock(&qctxt->lqc_lock); + if (qctxt->lqc_recovery) { + spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(0); - qctxt->lqc_recovery = 1; + } else { + qctxt->lqc_recovery = 1; + spin_unlock(&qctxt->lqc_lock); + } for (type = USRQUOTA; type < MAXQUOTAS; type++) { struct qunit_data qdata; @@ -847,8 +1361,8 @@ static int qslave_recovery_main(void *arg) } LASSERT(dqopt->files[type] != NULL); - INIT_LIST_HEAD(&id_list); -#ifndef KERNEL_SUPPORTS_QUOTA_READ + CFS_INIT_LIST_HEAD(&id_list); +#ifndef KERNEL_SUPPORTS_QUOTA_READ rc = fsfilt_qids(obd, dqopt->files[type], NULL, type, &id_list); #else rc = fsfilt_qids(obd, NULL, dqopt->files[type], type, &id_list); @@ -860,35 +1374,41 @@ static int qslave_recovery_main(void *arg) list_for_each_entry_safe(dqid, tmp, &id_list, di_link) { list_del_init(&dqid->di_link); /* skip slave recovery on itself */ - if (is_master(obd, qctxt, dqid->di_id, type)) + if (is_master(qctxt)) goto free; if (rc && rc != -EBUSY) goto free; qdata.qd_id = dqid->di_id; - qdata.qd_flags = 0; - qdata.qd_flags |= type; - qdata.qd_flags |= QUOTA_IS_BLOCK; + qdata.qd_flags = type; + QDATA_SET_BLK(&qdata); qdata.qd_count = 0; ret = check_cur_qunit(obd, qctxt, &qdata); if (ret > 0) { int opc; opc = ret == 1 ? QUOTA_DQACQ : QUOTA_DQREL; - rc = split_before_schedule_dqacq(obd, qctxt, &qdata, opc, 0); - } else + rc = schedule_dqacq(obd, qctxt, &qdata, opc, + 0, NULL); + if (rc == -EDQUOT) + rc = 0; + } else { rc = 0; + } if (rc) CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR, "qslave recovery failed! (id:%d type:%d " " rc:%d)\n", dqid->di_id, type, rc); free: - kfree(dqid); + OBD_FREE_PTR(dqid); } } + spin_lock(&qctxt->lqc_lock); qctxt->lqc_recovery = 0; + spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(rc); } @@ -916,3 +1436,98 @@ exit: EXIT; } + +/** + * lqs<->qctxt hash operations + */ + +/** + * string hashing using djb2 hash algorithm + */ +static unsigned +lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) +{ + struct quota_adjust_qunit *lqs_key; + unsigned hash; + ENTRY; + + LASSERT(key); + lqs_key = (struct quota_adjust_qunit *)key; + hash = (QAQ_IS_GRP(lqs_key) ? 
5381 : 5387) * lqs_key->qaq_id; + + RETURN(hash & mask); +} + +static int +lqs_compare(void *key, struct hlist_node *hnode) +{ + struct lustre_qunit_size *q; + int rc; + ENTRY; + + LASSERT(key); + q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + + spin_lock(&q->lqs_lock); + rc = (q->lqs_key == *((unsigned long long *)key)); + spin_unlock(&q->lqs_lock); + + RETURN(rc); +} + +static void * +lqs_get(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + ENTRY; + + atomic_inc(&q->lqs_refcount); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + q, atomic_read(&q->lqs_refcount)); + + RETURN(q); +} + +static void * +lqs_put(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + ENTRY; + + LASSERT(atomic_read(&q->lqs_refcount) > 0); + atomic_dec(&q->lqs_refcount); + CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", + q, atomic_read(&q->lqs_refcount)); + + RETURN(q); +} + +static void +lqs_exit(struct hlist_node *hnode) +{ + struct lustre_qunit_size *q; + ENTRY; + + q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + /* + * Nothing should be left. User of lqs put it and + * lqs also was deleted from table by this time + * so we should have 0 refs. + */ + LASSERTF(atomic_read(&q->lqs_refcount) == 0, + "Busy lqs %p with %d refs\n", q, + atomic_read(&q->lqs_refcount)); + OBD_FREE_PTR(q); + EXIT; +} + +static lustre_hash_ops_t lqs_hash_ops = { + .lh_hash = lqs_hash, + .lh_compare = lqs_compare, + .lh_get = lqs_get, + .lh_put = lqs_put, + .lh_exit = lqs_exit +}; +#endif /* HAVE_QUOTA_SUPPORT */
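
The central change in this patch is the replacement of the per-qunit lq_waiters list with a wait queue plus an explicit state machine: lq_state and lq_rc, both guarded by lq_lock, with QUNIT_SET_STATE()/QUNIT_SET_STATE_AND_RC() logging every transition. The following is a minimal standalone sketch of that pattern, not the kernel code: POSIX threads stand in for the Lustre primitives (spinlock_t, cfs_waitq_t, l_wait_event()), and the completer thread plays the role of dqacq_completion().

/* Sketch of the qunit state machine introduced above (assumes POSIX
 * threads; link with -lpthread). */
#include <pthread.h>
#include <stdio.h>

enum qunit_state {
        QUNIT_CREATED = 0,      /* qunit allocated */
        QUNIT_IN_HASH,          /* quota req sent or in flight */
        QUNIT_RM_FROM_HASH,     /* reply handled, removed from hash */
        QUNIT_FINISHED,         /* waiters may be woken */
};

static const char *qunit_state_names[] = {
        "CREATED", "IN_HASH", "RM_FROM_HASH", "FINISHED",
};

struct lustre_qunit {
        pthread_mutex_t  lq_lock;   /* protects lq_state and lq_rc */
        pthread_cond_t   lq_waitq;  /* threads waiting for this qunit */
        enum qunit_state lq_state;
        int              lq_rc;     /* result handed to the waiters */
};

/* Counterpart of QUNIT_SET_STATE_AND_RC(): every transition is made
 * under lq_lock and logged, so a stuck request can be diagnosed. */
static void qunit_set_state_and_rc(struct lustre_qunit *q,
                                   enum qunit_state state, int rc)
{
        pthread_mutex_lock(&q->lq_lock);
        q->lq_rc = rc;
        printf("qunit(%p) lq_state(%s->%s), lq_rc(%d)\n", (void *)q,
               qunit_state_names[q->lq_state], qunit_state_names[state],
               q->lq_rc);
        q->lq_state = state;
        if (state == QUNIT_FINISHED)
                pthread_cond_broadcast(&q->lq_waitq); /* wake_up_all() */
        pthread_mutex_unlock(&q->lq_lock);
}

/* Counterpart of the l_wait_event(..., got_qunit(...)) call sites:
 * sleep until the qunit is FINISHED, then read lq_rc under the lock. */
static int qunit_wait(struct lustre_qunit *q)
{
        int rc;

        pthread_mutex_lock(&q->lq_lock);
        while (q->lq_state != QUNIT_FINISHED)
                pthread_cond_wait(&q->lq_waitq, &q->lq_lock);
        rc = q->lq_rc;
        pthread_mutex_unlock(&q->lq_lock);
        return rc;
}

/* Plays the role of dqacq_completion() for one request. */
static void *completer(void *arg)
{
        struct lustre_qunit *q = arg;

        qunit_set_state_and_rc(q, QUNIT_IN_HASH, 0);
        qunit_set_state_and_rc(q, QUNIT_RM_FROM_HASH, 0);
        qunit_set_state_and_rc(q, QUNIT_FINISHED, 0);
        return NULL;
}

int main(void)
{
        static struct lustre_qunit q = {
                .lq_lock  = PTHREAD_MUTEX_INITIALIZER,
                .lq_waitq = PTHREAD_COND_INITIALIZER,
                .lq_state = QUNIT_CREATED,
        };
        pthread_t t;

        pthread_create(&t, NULL, completer, &q);
        printf("waiter got rc %d\n", qunit_wait(&q));
        pthread_join(&t, NULL);
        return 0;
}

The design point mirrored here is that lq_rc is only ever read or written under lq_lock, and the FINISHED transition and the wakeup happen together, so a waiter can never observe FINISHED with a stale return code. The same machinery is reused by quota_barrier(), which inserts a dummy qunit (lq_opc == QUOTA_LAST_OPC) into the hash to hold off concurrent acquire/release traffic until quota_unbarrier() marks it FINISHED.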
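
check_cur_qunit() above is the policy heart of the slave: it compares local usage against the local hard limit and decides whether to pre-acquire another qunit from the master (return 1), hand surplus back (return 2), or do nothing (return 0), with the tune size acting as a low-water mark. The sketch below reproduces only that arithmetic as a pure function; the lqs bookkeeping from the patch (pending writes, lqs_blk_rec, the limit_org cap on releases) is deliberately left out, and the constants in main() are just the new defaults quoted above.

/* Sketch of the acquire/release decision in check_cur_qunit(). All
 * values share one unit: bytes for block quota, inodes for inode quota. */
#include <stdio.h>

#define QUOTA_ACQUIRE 1         /* need more qunits from the master */
#define QUOTA_RELEASE 2         /* holding too much, give some back */
#define QUOTA_NOOP    0

/* *count receives the amount to acquire/release, in whole qunits */
static int check_qunit(unsigned long long limit, unsigned long long usage,
                       unsigned long long qunit_sz, unsigned long long tune_sz,
                       unsigned long long *count)
{
        *count = 0;

        if (limit == 0)         /* no limit configured: nothing to do */
                return QUOTA_NOOP;

        if (limit <= usage + tune_sz) {
                /* free margin fell below the tune watermark: acquire
                 * enough qunits to climb back above it */
                while (*count + limit <= usage + tune_sz)
                        *count += qunit_sz;
                return QUOTA_ACQUIRE;
        } else if (limit > usage + qunit_sz + tune_sz) {
                /* more than a spare qunit above the watermark: release
                 * the excess back to the master */
                while (limit - *count > usage + qunit_sz + tune_sz)
                        *count += qunit_sz;
                return QUOTA_RELEASE;
        }
        return QUOTA_NOOP;
}

int main(void)
{
        /* 128M bunit with a 50% (64M) btune, as in the defaults above */
        const unsigned long long unit = 128ULL << 20, tune = 64ULL << 20;
        unsigned long long count;
        int op;

        op = check_qunit(200ULL << 20, 180ULL << 20, unit, tune, &count);
        printf("20M of headroom left -> op %d, count %lluM\n", op, count >> 20);

        op = check_qunit(1024ULL << 20, 100ULL << 20, unit, tune, &count);
        printf("924M of headroom     -> op %d, count %lluM\n", op, count >> 20);
        return 0;
}

With these defaults the first call acquires one 128M slice (headroom dropped under the 64M watermark) and the second releases 768M, keeping 156M of headroom — between the watermark and watermark-plus-one-qunit. The gap between the acquire and release thresholds is what keeps a slave from ping-ponging requests when usage hovers near a boundary.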
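
Finally, the patch carries two small hash functions: qunit_hashfn() buckets in-flight qunits by XOR-mixing the quota-context pointer with the id, and the new lqs_hash() buckets lustre_qunit_size entries by multiplying the id with one of two primes (5381 for group, 5387 for user) — a prime-multiplication scheme that borrows djb2's magic constant rather than the full djb2 string hash. A standalone sketch follows; NR_DQHASH, the cache-line shift, and the mask are illustrative values, not necessarily the kernel's.

/* Sketch of the two hash functions, with example table sizes. */
#include <stdio.h>

#define NR_DQHASH      45       /* assumed qunit hash table size */
#define MAXQUOTAS      2        /* USRQUOTA, GRPQUOTA */
#define L1_CACHE_SHIFT 6        /* 64-byte cache lines assumed */

/* qunit_hashfn(): mix the per-context pointer into the bucket index so
 * that different quota contexts spread across different buckets. */
static unsigned int qunit_hashfn(const void *qctxt, unsigned int id,
                                 unsigned int is_grp)
{
        unsigned long tmp = ((unsigned long)qctxt >> L1_CACHE_SHIFT) ^ id;

        return (unsigned int)((tmp * (MAXQUOTAS - is_grp)) % NR_DQHASH);
}

/* lqs_hash(): one prime per quota type, masked down to the table size. */
static unsigned int lqs_hash(unsigned int id, int is_grp, unsigned int mask)
{
        return ((is_grp ? 5381u : 5387u) * id) & mask;
}

int main(void)
{
        static int ctxt;        /* stands in for a lustre_quota_ctxt */

        printf("qunit bucket, uid 500: %u\n", qunit_hashfn(&ctxt, 500, 0));
        printf("lqs bucket,   gid 500: %u\n", lqs_hash(500, 1, 0x7f));
        return 0;
}

Collisions are resolved by the comparison functions (find_qunit(), lqs_compare()), so the hashes only need to spread entries across chains, not uniquely identify them.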