X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fquota_interface.c;h=f257cd57a4a7f8645d2236a2089409e2517dd451;hp=d7084a983bc678580b644dec4be43244d28e6e65;hb=c159c408293fbebf71a948e630aa9f637f3c8ffe;hpb=d2d56f38da01001c92a09afc6b52b5acbd9bc13c diff --git a/lustre/quota/quota_interface.c b/lustre/quota/quota_interface.c index d7084a9..f257cd5 100644 --- a/lustre/quota/quota_interface.c +++ b/lustre/quota/quota_interface.c @@ -1,19 +1,43 @@ /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*- * vim:expandtab:shiftwidth=8:tabstop=8: * - * lustre/quota/quota_interface.c + * GPL HEADER START * - * Copyright (c) 2001-2005 Cluster File Systems, Inc. + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * - * This file is part of Lustre, http://www.lustre.org. + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. * - * No redistribution or use is permitted outside of Cluster File Systems, Inc. + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf + * + * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, + * CA 95054 USA or visit www.sun.com if you need additional information or + * have any questions. + * + * GPL HEADER END + */ +/* + * Copyright 2008 Sun Microsystems, Inc. All rights reserved + * Use is subject to license terms. + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + * Lustre is a trademark of Sun Microsystems, Inc. */ + #ifndef EXPORT_SYMTAB # define EXPORT_SYMTAB #endif -#define DEBUG_SUBSYSTEM S_MDS +#define DEBUG_SUBSYSTEM S_LQUOTA #ifdef __KERNEL__ # include @@ -21,7 +45,6 @@ # include # include # include -# include # if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) # include # include @@ -44,245 +67,12 @@ #include #include "quota_internal.h" - #ifdef __KERNEL__ -/* quota proc file handling functions */ -#ifdef LPROCFS -int lprocfs_rd_bunit(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - LASSERT(obd != NULL); - - return snprintf(page, count, "%lu\n", - obd->u.obt.obt_qctxt.lqc_bunit_sz); -} -EXPORT_SYMBOL(lprocfs_rd_bunit); - -int lprocfs_rd_iunit(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - LASSERT(obd != NULL); - - return snprintf(page, count, "%lu\n", - obd->u.obt.obt_qctxt.lqc_iunit_sz); -} -EXPORT_SYMBOL(lprocfs_rd_iunit); - -int lprocfs_wr_bunit(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - int val, rc; - LASSERT(obd != NULL); - - rc = lprocfs_write_helper(buffer, count, &val); - - if (rc) - return rc; - - if (val % QUOTABLOCK_SIZE || - val <= obd->u.obt.obt_qctxt.lqc_btune_sz) - return -EINVAL; - - obd->u.obt.obt_qctxt.lqc_bunit_sz = val; - return count; -} -EXPORT_SYMBOL(lprocfs_wr_bunit); - -int lprocfs_wr_iunit(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - int val, rc; - LASSERT(obd != NULL); - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val <= obd->u.obt.obt_qctxt.lqc_itune_sz) - return -EINVAL; - - obd->u.obt.obt_qctxt.lqc_iunit_sz = val; - return count; -} -EXPORT_SYMBOL(lprocfs_wr_iunit); - -int lprocfs_rd_btune(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - LASSERT(obd != NULL); - - return snprintf(page, count, "%lu\n", - obd->u.obt.obt_qctxt.lqc_btune_sz); -} -EXPORT_SYMBOL(lprocfs_rd_btune); - -int lprocfs_rd_itune(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - LASSERT(obd != NULL); - - return snprintf(page, count, "%lu\n", - obd->u.obt.obt_qctxt.lqc_itune_sz); -} -EXPORT_SYMBOL(lprocfs_rd_itune); - -int lprocfs_wr_btune(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - int val, rc; - LASSERT(obd != NULL); - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val <= QUOTABLOCK_SIZE * MIN_QLIMIT || val % QUOTABLOCK_SIZE || - val >= obd->u.obt.obt_qctxt.lqc_bunit_sz) - return -EINVAL; - - obd->u.obt.obt_qctxt.lqc_btune_sz = val; - return count; -} -EXPORT_SYMBOL(lprocfs_wr_btune); - -int lprocfs_wr_itune(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - int val, rc; - LASSERT(obd != NULL); - - rc = lprocfs_write_helper(buffer, count, &val); - if (rc) - return rc; - - if (val <= MIN_QLIMIT || - val >= obd->u.obt.obt_qctxt.lqc_iunit_sz) - return -EINVAL; - - obd->u.obt.obt_qctxt.lqc_itune_sz = val; - return count; -} -EXPORT_SYMBOL(lprocfs_wr_itune); - -#define USER_QUOTA 1 -#define GROUP_QUOTA 2 - -#define MAX_STYPE_SIZE 4 -int lprocfs_rd_type(char *page, char **start, off_t off, int count, - int *eof, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - char stype[MAX_STYPE_SIZE + 1] = ""; - int type = obd->u.obt.obt_qctxt.lqc_atype; - LASSERT(obd != NULL); - - if (type == 0) { - strcpy(stype, "off"); - } else { - if (type & USER_QUOTA) - strcat(stype, "u"); - if (type & GROUP_QUOTA) - strcat(stype, "g"); - } - - return snprintf(page, count, "%s\n", stype); -} -EXPORT_SYMBOL(lprocfs_rd_type); - -static int auto_quota_on(struct obd_device *obd, int type, - struct super_block *sb, int is_master) -{ - struct obd_quotactl *oqctl; - struct lvfs_run_ctxt saved; - int rc; - ENTRY; - - LASSERT(type == USRQUOTA || type == GRPQUOTA || type == UGQUOTA); - - /* quota already turned on */ - if (obd->u.obt.obt_qctxt.lqc_status) - RETURN(0); - - OBD_ALLOC_PTR(oqctl); - if (!oqctl) - RETURN(-ENOMEM); - oqctl->qc_type = type; - oqctl->qc_cmd = Q_QUOTAON; - oqctl->qc_id = QFMT_LDISKFS; +#ifdef HAVE_QUOTA_SUPPORT - push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - if (!is_master) - goto local_quota; - - /* turn on cluster wide quota */ - rc = mds_admin_quota_on(obd, oqctl); - if (rc) { - CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR, - "auto-enable admin quota failed. rc=%d\n", rc); - GOTO(out_pop, rc); - } -local_quota: - /* turn on local quota */ - rc = fsfilt_quotactl(obd, sb, oqctl); - if (rc) { - CDEBUG(rc == -ENOENT ? D_QUOTA : D_ERROR, - "auto-enable local quota failed. rc=%d\n", rc); - if (is_master) - mds_quota_off(obd, oqctl); - } else { - obd->u.obt.obt_qctxt.lqc_status = 1; - } -out_pop: - pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); - - OBD_FREE_PTR(oqctl); - RETURN(rc); -} - - -int lprocfs_wr_type(struct file *file, const char *buffer, - unsigned long count, void *data) -{ - struct obd_device *obd = (struct obd_device *)data; - struct obd_device_target *obt = &obd->u.obt; - int type = 0; - char stype[MAX_STYPE_SIZE + 1] = ""; - LASSERT(obd != NULL); - - if (copy_from_user(stype, buffer, MAX_STYPE_SIZE)) - return -EFAULT; - - if (strchr(stype, 'u')) - type |= USER_QUOTA; - if (strchr(stype, 'g')) - type |= GROUP_QUOTA; - - obt->obt_qctxt.lqc_atype = type; - - if (type == 0) - return count; - - if (!strcmp(obd->obd_type->typ_name, LUSTRE_MDS_NAME)) - auto_quota_on(obd, type - 1, obt->obt_sb, 1); - else if (!strcmp(obd->obd_type->typ_name, LUSTRE_OST_NAME)) - auto_quota_on(obd, type - 1, obt->obt_sb, 0); - else - return -EFAULT; - - return count; -} -EXPORT_SYMBOL(lprocfs_wr_type); -#endif /* LPROCFS */ +static cfs_time_t last_print = 0; +static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED; static int filter_quota_setup(struct obd_device *obd) { @@ -290,41 +80,73 @@ static int filter_quota_setup(struct obd_device *obd) struct obd_device_target *obt = &obd->u.obt; ENTRY; + init_rwsem(&obt->obt_rwsem); + obt->obt_qfmt = LUSTRE_QUOTA_V2; atomic_set(&obt->obt_quotachecking, 1); - rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, NULL); - if (rc) { + rc = qctxt_init(obd, NULL); + if (rc) CERROR("initialize quota context failed! (rc:%d)\n", rc); - RETURN(rc); - } RETURN(rc); } static int filter_quota_cleanup(struct obd_device *obd) { + ENTRY; qctxt_cleanup(&obd->u.obt.obt_qctxt, 0); - return 0; + RETURN(0); } -static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd) +static int filter_quota_setinfo(struct obd_device *obd, void *data) { + struct obd_export *exp = data; + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; struct obd_import *imp; + ENTRY; /* setup the quota context import */ - obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse; + spin_lock(&qctxt->lqc_lock); + qctxt->lqc_import = exp->exp_imp_reverse; + spin_unlock(&qctxt->lqc_lock); + CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n", + obd->obd_name,exp->exp_imp_reverse, obd); - /* make imp's connect flags equal relative exp's connect flags + /* make imp's connect flags equal relative exp's connect flags * adding it to avoid the scan export list */ - imp = exp->exp_imp_reverse; - if (imp) - imp->imp_connect_data.ocd_connect_flags |= - (exp->exp_connect_flags & OBD_CONNECT_QUOTA64); + imp = qctxt->lqc_import; + if (likely(imp)) + imp->imp_connect_data.ocd_connect_flags |= + (exp->exp_connect_flags & + (OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS)); + cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster); /* start quota slave recovery thread. (release high limits) */ - qslave_start_recovery(obd, &obd->u.obt.obt_qctxt); - return 0; + qslave_start_recovery(obd, qctxt); + RETURN(0); +} + +static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd) +{ + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + ENTRY; + + /* lquota may be not set up before destroying export, b=14896 */ + if (!obd->obd_set_up) + RETURN(0); + + /* when exp->exp_imp_reverse is destroyed, the corresponding lqc_import + * should be invalid b=12374 */ + if (qctxt->lqc_import && qctxt->lqc_import == exp->exp_imp_reverse) { + spin_lock(&qctxt->lqc_lock); + qctxt->lqc_import = NULL; + spin_unlock(&qctxt->lqc_lock); + CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n", + obd->obd_name, obd); + } + RETURN(0); } + static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore) { ENTRY; @@ -332,10 +154,12 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore) if (!sb_any_quota_enabled(obd->u.obt.obt_sb)) RETURN(0); - if (ignore) - cap_raise(current->cap_effective, CAP_SYS_RESOURCE); - else - cap_lower(current->cap_effective, CAP_SYS_RESOURCE); + if (ignore) { + CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n"); + cfs_cap_raise(CFS_CAP_SYS_RESOURCE); + } else { + cfs_cap_lower(CFS_CAP_SYS_RESOURCE); + } RETURN(0); } @@ -343,6 +167,7 @@ static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore) static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) { struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; int err, cnt, rc = 0; struct obd_quotactl *oqctl; ENTRY; @@ -350,15 +175,42 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) if (!sb_any_quota_enabled(obt->obt_sb)) RETURN(0); - oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA); - OBD_ALLOC_PTR(oqctl); if (!oqctl) { CERROR("Not enough memory!"); RETURN(-ENOMEM); } + /* set over quota flags for a uid/gid */ + oa->o_valid |= OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA; + oa->o_flags &= ~(OBD_FL_NO_USRQUOTA | OBD_FL_NO_GRPQUOTA); + for (cnt = 0; cnt < MAXQUOTAS; cnt++) { + struct quota_adjust_qunit oqaq_tmp; + struct lustre_qunit_size *lqs = NULL; + + oqaq_tmp.qaq_flags = cnt; + oqaq_tmp.qaq_id = (cnt == USRQUOTA) ? oa->o_uid : oa->o_gid; + + quota_search_lqs(NULL, &oqaq_tmp, qctxt, &lqs); + if (lqs) { + spin_lock(&lqs->lqs_lock); + if (lqs->lqs_bunit_sz <= qctxt->lqc_sync_blk) { + oa->o_flags |= (cnt == USRQUOTA) ? + OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA; + spin_unlock(&lqs->lqs_lock); + CDEBUG(D_QUOTA, "set sync flag: bunit(%lu), " + "sync_blk(%d)\n", lqs->lqs_bunit_sz, + qctxt->lqc_sync_blk); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + continue; + } + spin_unlock(&lqs->lqs_lock); + /* this is for quota_search_lqs */ + lqs_putref(lqs); + } + memset(oqctl, 0, sizeof(*oqctl)); oqctl->qc_cmd = Q_GETQUOTA; @@ -368,14 +220,13 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) if (err) { if (!rc) rc = err; + oa->o_valid &= ~((cnt == USRQUOTA) ? OBD_MD_FLUSRQUOTA : + OBD_MD_FLGRPQUOTA); continue; } - /* set over quota flags for a uid/gid */ - oa->o_valid |= (cnt == USRQUOTA) ? - OBD_MD_FLUSRQUOTA : OBD_MD_FLGRPQUOTA; if (oqctl->qc_dqblk.dqb_bhardlimit && - (toqb(oqctl->qc_dqblk.dqb_curspace) > + (toqb(oqctl->qc_dqblk.dqb_curspace) >= oqctl->qc_dqblk.dqb_bhardlimit)) oa->o_flags |= (cnt == USRQUOTA) ? OBD_FL_NO_USRQUOTA : OBD_FL_NO_GRPQUOTA; @@ -384,58 +235,301 @@ static int filter_quota_getflag(struct obd_device *obd, struct obdo *oa) RETURN(rc); } -static int filter_quota_acquire(struct obd_device *obd, unsigned int uid, - unsigned int gid) +/** + * check whether the left quota of certain uid and gid can satisfy a block_write + * or inode_create rpc. When need to acquire quota, return QUOTA_RET_ACQUOTA + */ +static int quota_check_common(struct obd_device *obd, unsigned int uid, + unsigned int gid, int count, int cycle, int isblk, + struct inode *inode, int frags, int *pending) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; - int rc; + int i; + __u32 id[MAXQUOTAS] = { uid, gid }; + struct qunit_data qdata[MAXQUOTAS]; + int mb = 0; + int rc = 0, rc2[2] = { 0, 0 }; ENTRY; - rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 1); - RETURN(rc == -EAGAIN); -} + CLASSERT(MAXQUOTAS < 4); + if (!sb_any_quota_enabled(qctxt->lqc_sb)) + RETURN(rc); -static int mds_quota_init(void) -{ - return lustre_dquot_init(); + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_valid){ + spin_unlock(&qctxt->lqc_lock); + RETURN(rc); + } + spin_unlock(&qctxt->lqc_lock); + + for (i = 0; i < MAXQUOTAS; i++) { + struct lustre_qunit_size *lqs = NULL; + + qdata[i].qd_id = id[i]; + qdata[i].qd_flags = i; + if (isblk) + QDATA_SET_BLK(&qdata[i]); + qdata[i].qd_count = 0; + + /* ignore root user */ + if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) + continue; + + quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); + if (!lqs) + continue; + + rc2[i] = compute_remquota(obd, qctxt, &qdata[i], isblk); + spin_lock(&lqs->lqs_lock); + if (!cycle) { + if (isblk) { + *pending = count * CFS_PAGE_SIZE; + /* in order to complete this write, we need extra + * meta blocks. This function can get it through + * data needed to be written b=16542 */ + if (inode) { + mb = *pending; + rc = fsfilt_get_mblk(obd, qctxt->lqc_sb, + &mb, inode,frags); + if (rc) + CDEBUG(D_ERROR, + "can't get extra " + "meta blocks.\n"); + else + *pending += mb; + } + lqs->lqs_bwrite_pending += *pending; + } else { + *pending = count; + lqs->lqs_iwrite_pending += *pending; + } + } + if (rc2[i] == QUOTA_RET_OK) { + if (isblk && qdata[i].qd_count < lqs->lqs_bwrite_pending) + rc2[i] = QUOTA_RET_ACQUOTA; + if (!isblk && qdata[i].qd_count < + lqs->lqs_iwrite_pending) + rc2[i] = QUOTA_RET_ACQUOTA; + } + spin_unlock(&lqs->lqs_lock); + CDEBUG(D_QUOTA, "count: %d, lqs pending: %lu, qd_count: "LPU64 + ", metablocks: %d, isblk: %d, pending: %d.\n", count, + isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending, + qdata[i].qd_count, mb, isblk, *pending); + + /* When cycle is zero, lqs_*_pending will be changed. We will + * get reference of the lqs here and put reference of lqs in + * quota_pending_commit b=14784 */ + if (!cycle) + lqs_getref(lqs); + + /* this is for quota_search_lqs */ + lqs_putref(lqs); + } + + if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA) + RETURN(QUOTA_RET_ACQUOTA); + else + RETURN(rc); } -static int mds_quota_exit(void) +static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid, + unsigned int gid, int count, int *pending, + quota_acquire acquire, + struct obd_trans_info *oti, int isblk, + struct inode *inode, int frags) { - lustre_dquot_exit(); - return 0; + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct timeval work_start; + struct timeval work_end; + long timediff; + struct l_wait_info lwi = { 0 }; + int rc = 0, cycle = 0, count_err = 1; + ENTRY; + + CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name); + *pending = 0; + /* Unfortunately, if quota master is too busy to handle the + * pre-dqacq in time and quota hash on ost is used up, we + * have to wait for the completion of in flight dqacq/dqrel, + * in order to get enough quota for write b=12588 */ + do_gettimeofday(&work_start); + while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk, + inode, frags, pending)) & + QUOTA_RET_ACQUOTA) { + + spin_lock(&qctxt->lqc_lock); + if (!qctxt->lqc_import && oti) { + spin_unlock(&qctxt->lqc_lock); + + LASSERT(oti && oti->oti_thread && + oti->oti_thread->t_watchdog); + + lc_watchdog_disable(oti->oti_thread->t_watchdog); + CDEBUG(D_QUOTA, "sleep for quota master\n"); + l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt), + &lwi); + CDEBUG(D_QUOTA, "wake up when quota master is back\n"); + lc_watchdog_touch(oti->oti_thread->t_watchdog); + } else { + spin_unlock(&qctxt->lqc_lock); + } + + cycle++; + if (isblk) + OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90); + /* after acquire(), we should run quota_check_common again + * so that we confirm there are enough quota to finish write */ + rc = acquire(obd, uid, gid, oti, isblk); + + /* please reference to dqacq_completion for the below */ + /* a new request is finished, try again */ + if (rc == -EAGAIN) { + CDEBUG(D_QUOTA, "finish a quota req, try again\n"); + continue; + } + + /* it is out of quota already */ + if (rc == -EDQUOT) { + CDEBUG(D_QUOTA, "out of quota, return -EDQUOT\n"); + break; + } + + /* -EBUSY and others, wait a second and try again */ + if (rc < 0) { + cfs_waitq_t waitq; + struct l_wait_info lwi; + + if (oti && oti->oti_thread && oti->oti_thread->t_watchdog) + lc_watchdog_touch(oti->oti_thread->t_watchdog); + CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, + count_err++); + + init_waitqueue_head(&waitq); + lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL, + NULL); + l_wait_event(waitq, 0, &lwi); + } + + if (rc < 0 || cycle % 10 == 2) { + spin_lock(&last_print_lock); + if (last_print == 0 || + cfs_time_before((last_print + cfs_time_seconds(30)), + cfs_time_current())) { + last_print = cfs_time_current(); + spin_unlock(&last_print_lock); + CWARN("still haven't managed to acquire quota " + "space from the quota master after %d " + "retries (err=%d, rc=%d)\n", + cycle, count_err - 1, rc); + } else { + spin_unlock(&last_print_lock); + } + } + + CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc, + cycle); + } + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? LQUOTA_WAIT_FOR_CHK_BLK : + LQUOTA_WAIT_FOR_CHK_INO, + timediff); + + RETURN(rc); } -/* check whether the left quota of certain uid and uid can satisfy a write rpc - * when need to acquire quota, return QUOTA_RET_ACQUOTA */ -static int filter_quota_check(struct obd_device *obd, unsigned int uid, - unsigned int gid, int npage) +/** + * when a block_write or inode_create rpc is finished, adjust the record for + * pending blocks and inodes + */ +static int quota_pending_commit(struct obd_device *obd, unsigned int uid, + unsigned int gid, int pending, int isblk) { struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct timeval work_start; + struct timeval work_end; + long timediff; int i; __u32 id[MAXQUOTAS] = { uid, gid }; struct qunit_data qdata[MAXQUOTAS]; - int rc; ENTRY; + CDEBUG(D_QUOTA, "commit pending quota for %s\n", obd->obd_name); CLASSERT(MAXQUOTAS < 4); if (!sb_any_quota_enabled(qctxt->lqc_sb)) RETURN(0); + do_gettimeofday(&work_start); for (i = 0; i < MAXQUOTAS; i++) { + struct lustre_qunit_size *lqs = NULL; + qdata[i].qd_id = id[i]; qdata[i].qd_flags = i; - qdata[i].qd_flags |= QUOTA_IS_BLOCK; + if (isblk) + QDATA_SET_BLK(&qdata[i]); qdata[i].qd_count = 0; - qctxt_wait_pending_dqacq(qctxt, id[i], i, 1); - rc = compute_remquota(obd, qctxt, &qdata[i]); - if (rc == QUOTA_RET_OK && - qdata[i].qd_count < npage * CFS_PAGE_SIZE) - RETURN(QUOTA_RET_ACQUOTA); + if (qdata[i].qd_id == 0 && !QDATA_IS_GRP(&qdata[i])) + continue; + + quota_search_lqs(&qdata[i], NULL, qctxt, &lqs); + if (lqs) { + int flag = 0; + spin_lock(&lqs->lqs_lock); + if (isblk) { + if (lqs->lqs_bwrite_pending >= pending) { + lqs->lqs_bwrite_pending -= pending; + spin_unlock(&lqs->lqs_lock); + flag = 1; + } else { + spin_unlock(&lqs->lqs_lock); + CDEBUG(D_ERROR, + "there are too many blocks!\n"); + } + } else { + if (lqs->lqs_iwrite_pending >= pending) { + lqs->lqs_iwrite_pending -= pending; + spin_unlock(&lqs->lqs_lock); + flag = 1; + } else { + spin_unlock(&lqs->lqs_lock); + CDEBUG(D_ERROR, + "there are too many files!\n"); + } + } + CDEBUG(D_QUOTA, "lqs pending: %lu, pending: %d, " + "isblk: %d.\n", + isblk ? lqs->lqs_bwrite_pending : + lqs->lqs_iwrite_pending, pending, isblk); + + lqs_putref(lqs); + /* When lqs_*_pening is changed back, we'll putref lqs + * here b=14784 */ + if (flag) + lqs_putref(lqs); + } } + do_gettimeofday(&work_end); + timediff = cfs_timeval_sub(&work_end, &work_start, NULL); + lprocfs_counter_add(qctxt->lqc_stats, + isblk ? LQUOTA_WAIT_FOR_COMMIT_BLK : + LQUOTA_WAIT_FOR_COMMIT_INO, + timediff); - RETURN(rc); + RETURN(0); +} + +static int mds_quota_init(void) +{ + return lustre_dquot_init(); +} + +static int mds_quota_exit(void) +{ + lustre_dquot_exit(); + return 0; } static int mds_quota_setup(struct obd_device *obd) @@ -445,41 +539,83 @@ static int mds_quota_setup(struct obd_device *obd) int rc; ENTRY; + if (unlikely(mds->mds_quota)) { + CWARN("try to reinitialize quota context!\n"); + RETURN(0); + } + + init_rwsem(&obt->obt_rwsem); + obt->obt_qfmt = LUSTRE_QUOTA_V2; + mds->mds_quota_info.qi_version = LUSTRE_QUOTA_V2; atomic_set(&obt->obt_quotachecking, 1); /* initialize quota master and quota context */ sema_init(&mds->mds_qonoff_sem, 1); - rc = qctxt_init(&obt->obt_qctxt, obt->obt_sb, dqacq_handler); + rc = qctxt_init(obd, dqacq_handler); if (rc) { CERROR("initialize quota context failed! (rc:%d)\n", rc); RETURN(rc); } - + mds->mds_quota = 1; RETURN(rc); } static int mds_quota_cleanup(struct obd_device *obd) { + ENTRY; + if (unlikely(!obd->u.mds.mds_quota)) + RETURN(0); + qctxt_cleanup(&obd->u.obt.obt_qctxt, 0); RETURN(0); } +static int mds_quota_setinfo(struct obd_device *obd, void *data) +{ + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + ENTRY; + + if (unlikely(!obd->u.mds.mds_quota)) + RETURN(0); + + if (data != NULL) + QUOTA_MASTER_READY(qctxt); + else + QUOTA_MASTER_UNREADY(qctxt); + RETURN(0); +} + static int mds_quota_fs_cleanup(struct obd_device *obd) { struct mds_obd *mds = &obd->u.mds; - int i; + struct obd_quotactl oqctl; ENTRY; - /* close admin quota files */ + if (unlikely(!mds->mds_quota)) + RETURN(0); + + mds->mds_quota = 0; + memset(&oqctl, 0, sizeof(oqctl)); + oqctl.qc_type = UGQUOTA; + down(&mds->mds_qonoff_sem); - for (i = 0; i < MAXQUOTAS; i++) { - if (mds->mds_quota_info.qi_files[i]) { - filp_close(mds->mds_quota_info.qi_files[i], 0); - mds->mds_quota_info.qi_files[i] = NULL; - } - } + mds_admin_quota_off(obd, &oqctl); up(&mds->mds_qonoff_sem); RETURN(0); } + +static int quota_acquire_common(struct obd_device *obd, unsigned int uid, + unsigned int gid, struct obd_trans_info *oti, + int isblk) +{ + struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + int rc; + ENTRY; + + rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, isblk, 1, oti); + RETURN(rc); +} + +#endif /* HAVE_QUOTA_SUPPORT */ #endif /* __KERNEL__ */ struct osc_quota_info { @@ -528,6 +664,7 @@ static inline struct osc_quota_info *find_qinfo(struct client_obd *cli, { unsigned int hashent = hashfn(cli, id, type); struct osc_quota_info *oqi; + ENTRY; LASSERT_SPIN_LOCKED(&qinfo_list_lock); list_for_each_entry(oqi, &qinfo_hash[hashent], oqi_hash) { @@ -535,7 +672,7 @@ static inline struct osc_quota_info *find_qinfo(struct client_obd *cli, oqi->oqi_id == id && oqi->oqi_type == type) return oqi; } - return NULL; + RETURN(NULL); } static struct osc_quota_info *alloc_qinfo(struct client_obd *cli, @@ -548,7 +685,7 @@ static struct osc_quota_info *alloc_qinfo(struct client_obd *cli, if(!oqi) RETURN(NULL); - INIT_LIST_HEAD(&oqi->oqi_hash); + CFS_INIT_LIST_HEAD(&oqi->oqi_hash); oqi->oqi_cli = cli; oqi->oqi_id = id; oqi->oqi_type = type; @@ -561,8 +698,7 @@ static void free_qinfo(struct osc_quota_info *oqi) OBD_SLAB_FREE(oqi, qinfo_cachep, sizeof(*oqi)); } -int osc_quota_chkdq(struct client_obd *cli, - unsigned int uid, unsigned int gid) +int osc_quota_chkdq(struct client_obd *cli, unsigned int uid, unsigned int gid) { unsigned int id; int cnt, rc = QUOTA_OK; @@ -584,8 +720,7 @@ int osc_quota_chkdq(struct client_obd *cli, RETURN(rc); } -int osc_quota_setdq(struct client_obd *cli, - unsigned int uid, unsigned int gid, +int osc_quota_setdq(struct client_obd *cli, unsigned int uid, unsigned int gid, obd_flag valid, obd_flag flags) { unsigned int id; @@ -659,13 +794,13 @@ int osc_quota_init(void) LASSERT(qinfo_cachep == NULL); qinfo_cachep = cfs_mem_cache_create("osc_quota_info", - sizeof(struct osc_quota_info), - 0, 0); + sizeof(struct osc_quota_info), + 0, 0); if (!qinfo_cachep) RETURN(-ENOMEM); for (i = 0; i < NR_DQHASH; i++) - INIT_LIST_HEAD(qinfo_hash + i); + CFS_INIT_LIST_HEAD(qinfo_hash + i); RETURN(0); } @@ -693,6 +828,7 @@ int osc_quota_exit(void) } #ifdef __KERNEL__ +#ifdef HAVE_QUOTA_SUPPORT quota_interface_t mds_quota_interface = { .quota_init = mds_quota_init, .quota_exit = mds_quota_exit, @@ -700,9 +836,13 @@ quota_interface_t mds_quota_interface = { .quota_cleanup = mds_quota_cleanup, .quota_check = target_quota_check, .quota_ctl = mds_quota_ctl, - .quota_fs_cleanup =mds_quota_fs_cleanup, + .quota_setinfo = mds_quota_setinfo, + .quota_fs_cleanup = mds_quota_fs_cleanup, .quota_recovery = mds_quota_recovery, .quota_adjust = mds_quota_adjust, + .quota_chkquota = quota_chk_acq_common, + .quota_acquire = quota_acquire_common, + .quota_pending_commit = quota_pending_commit, }; quota_interface_t filter_quota_interface = { @@ -711,12 +851,16 @@ quota_interface_t filter_quota_interface = { .quota_check = target_quota_check, .quota_ctl = filter_quota_ctl, .quota_setinfo = filter_quota_setinfo, + .quota_clearinfo = filter_quota_clearinfo, .quota_enforce = filter_quota_enforce, .quota_getflag = filter_quota_getflag, - .quota_acquire = filter_quota_acquire, + .quota_acquire = quota_acquire_common, .quota_adjust = filter_quota_adjust, - .quota_chkquota = filter_quota_check, + .quota_chkquota = quota_chk_acq_common, + .quota_adjust_qunit = filter_quota_adjust_qunit, + .quota_pending_commit = quota_pending_commit, }; +#endif #endif /* __KERNEL__ */ quota_interface_t mdc_quota_interface = { @@ -725,6 +869,11 @@ quota_interface_t mdc_quota_interface = { .quota_poll_check = client_quota_poll_check, }; +quota_interface_t lmv_quota_interface = { + .quota_ctl = lmv_quota_ctl, + .quota_check = lmv_quota_check, +}; + quota_interface_t osc_quota_interface = { .quota_ctl = client_quota_ctl, .quota_check = client_quota_check, @@ -734,22 +883,42 @@ quota_interface_t osc_quota_interface = { .quota_chkdq = osc_quota_chkdq, .quota_setdq = osc_quota_setdq, .quota_cleanup = osc_quota_cleanup, + .quota_adjust_qunit = client_quota_adjust_qunit, }; quota_interface_t lov_quota_interface = { - .quota_check = lov_quota_check, .quota_ctl = lov_quota_ctl, + .quota_check = lov_quota_check, + .quota_adjust_qunit = lov_quota_adjust_qunit, }; #ifdef __KERNEL__ + +cfs_proc_dir_entry_t *lquota_type_proc_dir = NULL; + static int __init init_lustre_quota(void) { - int rc = qunit_cache_init(); +#ifdef HAVE_QUOTA_SUPPORT + int rc = 0; + + lquota_type_proc_dir = lprocfs_register(OBD_LQUOTA_DEVICENAME, + proc_lustre_root, + NULL, NULL); + if (IS_ERR(lquota_type_proc_dir)) { + CERROR("LProcFS failed in lquota-init\n"); + rc = PTR_ERR(lquota_type_proc_dir); + return rc; + } + + rc = qunit_cache_init(); if (rc) return rc; + PORTAL_SYMBOL_REGISTER(filter_quota_interface); PORTAL_SYMBOL_REGISTER(mds_quota_interface); +#endif PORTAL_SYMBOL_REGISTER(mdc_quota_interface); + PORTAL_SYMBOL_REGISTER(lmv_quota_interface); PORTAL_SYMBOL_REGISTER(osc_quota_interface); PORTAL_SYMBOL_REGISTER(lov_quota_interface); return 0; @@ -757,24 +926,33 @@ static int __init init_lustre_quota(void) static void /*__exit*/ exit_lustre_quota(void) { - PORTAL_SYMBOL_UNREGISTER(filter_quota_interface); - PORTAL_SYMBOL_UNREGISTER(mds_quota_interface); PORTAL_SYMBOL_UNREGISTER(mdc_quota_interface); + PORTAL_SYMBOL_UNREGISTER(lmv_quota_interface); PORTAL_SYMBOL_UNREGISTER(osc_quota_interface); PORTAL_SYMBOL_UNREGISTER(lov_quota_interface); +#ifdef HAVE_QUOTA_SUPPORT + PORTAL_SYMBOL_UNREGISTER(filter_quota_interface); + PORTAL_SYMBOL_UNREGISTER(mds_quota_interface); qunit_cache_cleanup(); + + if (lquota_type_proc_dir) + lprocfs_remove(&lquota_type_proc_dir); +#endif } -MODULE_AUTHOR("Cluster File Systems, Inc. "); +MODULE_AUTHOR("Sun Microsystems, Inc. "); MODULE_DESCRIPTION("Lustre Quota"); MODULE_LICENSE("GPL"); cfs_module(lquota, "1.0.0", init_lustre_quota, exit_lustre_quota); +#ifdef HAVE_QUOTA_SUPPORT EXPORT_SYMBOL(mds_quota_interface); EXPORT_SYMBOL(filter_quota_interface); +#endif EXPORT_SYMBOL(mdc_quota_interface); +EXPORT_SYMBOL(lmv_quota_interface); EXPORT_SYMBOL(osc_quota_interface); EXPORT_SYMBOL(lov_quota_interface); #endif /* __KERNEL */