};
extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q);
-/* flags in qunit_data and quota_adjust_qunit will use macroes below */
+/* flags is shared among quota structures */
#define LQUOTA_FLAGS_GRP 1UL /* 0 is user, 1 is group */
#define LQUOTA_FLAGS_BLK 2UL /* 0 is inode, 1 is block */
#define LQUOTA_FLAGS_ADJBLK 4UL /* adjust the block qunit size */
#define LQUOTA_FLAGS_CHG_QS 16UL /* indicate whether it has capability of
* OBD_CONNECT_CHANGE_QS */
-/* the status of lqsk_flags in struct lustre_qunit_size_key */
+/* flags is specific for quota_adjust_qunit */
+#define LQUOTA_QAQ_CEATE_LQS (1 << 31) /* when it is set, need create lqs */
+
+/* the status of lqs_flags in struct lustre_qunit_size */
#define LQUOTA_QUNIT_FLAGS (LQUOTA_FLAGS_GRP | LQUOTA_FLAGS_BLK)
-#define QAQ_IS_GRP(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
-#define QAQ_IS_ADJBLK(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
-#define QAQ_IS_ADJINO(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_GRP(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
+#define QAQ_IS_ADJBLK(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
+#define QAQ_IS_ADJINO(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_CREATE_LQS(qaq) ((qaq)->qaq_flags & LQUOTA_QAQ_CEATE_LQS)
-#define QAQ_SET_GRP(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
-#define QAQ_SET_ADJBLK(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
-#define QAQ_SET_ADJINO(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_GRP(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
+#define QAQ_SET_ADJBLK(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
+#define QAQ_SET_ADJINO(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_CREATE_LQS(qaq) ((qaq)->qaq_flags |= LQUOTA_QAQ_CEATE_LQS)
/* inode access permission for remote user, the inode info are omitted,
* for client knows them. */
struct dquot_id {
struct list_head di_link;
__u32 di_id;
+ __u32 di_flag;
};
+/* set inode quota limitation on a quota uid/gid */
+#define QI_SET (1 << 30)
+/* set block quota limitation on a quota uid/gid */
+#define QB_SET (1 << 31)
#define QFILE_CHK 1
#define QFILE_RD_INFO 2
struct lustre_qunit_size {
struct hlist_node lqs_hash; /** the hash entry */
unsigned int lqs_id; /** id of user/group */
- unsigned long lqs_flags; /** is user/group; FULLBUF or LESSBUF */
+ unsigned long lqs_flags; /** 31st bit is QB_SET, 30th bit is QI_SET
+ * other bits are same as LQUOTA_FLAGS_*
+ */
unsigned long lqs_iunit_sz; /** Unit size of file quota currently */
/**
* Trigger dqacq when available file quota
cqget(sb, qctxt->qckt_hash, &qctxt->qckt_list,
dqid->di_id, i,
qctxt->qckt_first_check[i]);
- kfree(dqid);
+ OBD_FREE_PTR(dqid);
}
}
#endif
(char *)&ddquot[i], dqblk_sz))
continue;
- dqid = kmalloc(sizeof(*dqid), GFP_NOFS);
- if (!dqid)
+ OBD_ALLOC_GFP(dqid, sizeof(*dqid), GFP_NOFS);
+ if (!dqid)
GOTO(out_free, rc = -ENOMEM);
- dqid->di_id = le32_to_cpu(ddquot[i].dqb_id);
+ dqid->di_id = le32_to_cpu(ddquot[i].dqb_id);
+ dqid->di_flag = le64_to_cpu(ddquot[i].dqb_ihardlimit) ?
+ QI_SET : 0;
+ dqid->di_flag |= le64_to_cpu(ddquot[i].dqb_bhardlimit) ?
+ QB_SET : 0;
+
INIT_LIST_HEAD(&dqid->di_link);
list_add(&dqid->di_link, list);
}
}
}
- if (type != 0)
+ if (type != 0) {
auto_quota_on(obd, type - 1, obt->obt_sb, is_mds);
+ build_lqs(obd);
+ }
return count;
}
struct lustre_quota_ctxt *qctxt,
int create)
{
- int rc = 0;
struct lustre_qunit_size *lqs;
+ int rc = 0;
search_lqs:
lqs = lustre_hash_lookup(qctxt->lqc_lqs_hash, &lqs_key);
- if (lqs == NULL && create) {
+ if (IS_ERR(lqs))
+ GOTO(out, rc = PTR_ERR(lqs));
+
+ if (create && lqs == NULL) {
+ /* if quota_create_lqs is successful, it will get a
+ * ref to the lqs. The ref will be released when
+ * qctxt_cleanup() or quota is nullified */
lqs = quota_create_lqs(lqs_key, qctxt);
if (IS_ERR(lqs))
rc = PTR_ERR(lqs);
- if (rc == -EALREADY) {
- rc = 0;
- goto search_lqs;
- }
+ if (rc == -EALREADY)
+ GOTO(search_lqs, rc = 0);
+ /* get a reference for the caller when creating lqs
+ * successfully */
+ if (rc == 0)
+ lqs_getref(lqs);
}
- if (lqs)
+ if (lqs && rc == 0)
LQS_DEBUG(lqs, "%s\n",
(create == 1 ? "create lqs" : "search lqs"));
+ out:
if (rc == 0) {
return lqs;
} else {
struct lustre_quota_ctxt *qctxt)
{
struct lustre_qunit_size *lqs = NULL;
- unsigned long *lbunit, *liunit, *lbtune, *litune;
- signed long b_tmp = 0, i_tmp = 0;
- cfs_time_t time_limit = 0;
- int rc = 0;
+ unsigned long *unit, *tune;
+ signed long tmp = 0;
+ cfs_time_t time_limit = 0, *shrink;
+ int i, rc = 0;
ENTRY;
LASSERT(qctxt);
lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id),
- qctxt, 1);
- if (lqs == NULL || IS_ERR(lqs))
+ qctxt, QAQ_IS_CREATE_LQS(oqaq) ? 1 : 0);
+ if (lqs == NULL || IS_ERR(lqs)){
+ CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+ QAQ_IS_GRP(oqaq) ? "group" : "user", oqaq->qaq_id);
RETURN(PTR_ERR(lqs));
-
- /* deleting the lqs, because a user sets lfs quota 0 0 0 0 */
- if (!oqaq->qaq_bunit_sz && !oqaq->qaq_iunit_sz && QAQ_IS_ADJBLK(oqaq) &&
- QAQ_IS_ADJINO(oqaq)) {
- LQS_DEBUG(lqs, "release lqs\n");
- /* this is for quota_search_lqs */
- lqs_putref(lqs);
- /* kill lqs */
- lqs_putref(lqs);
- RETURN(rc);
}
- lbunit = &lqs->lqs_bunit_sz;
- liunit = &lqs->lqs_iunit_sz;
- lbtune = &lqs->lqs_btune_sz;
- litune = &lqs->lqs_itune_sz;
-
- CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
+ CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n",
+ lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
spin_lock(&lqs->lqs_lock);
- /* adjust the slave's block qunit size */
- if (QAQ_IS_ADJBLK(oqaq)) {
- cfs_duration_t sec = cfs_time_seconds(qctxt->lqc_switch_seconds);
-
- b_tmp = *lbunit - oqaq->qaq_bunit_sz;
-
- if (qctxt->lqc_handler && b_tmp > 0)
- lqs->lqs_last_bshrink = cfs_time_current();
-
- if (qctxt->lqc_handler && b_tmp < 0) {
- time_limit = cfs_time_add(lqs->lqs_last_bshrink, sec);
- if (!lqs->lqs_last_bshrink ||
- cfs_time_after(cfs_time_current(), time_limit)) {
- *lbunit = oqaq->qaq_bunit_sz;
- *lbtune = (*lbunit) / 2;
- } else {
- b_tmp = 0;
- }
- } else {
- *lbunit = oqaq->qaq_bunit_sz;
- *lbtune = (*lbunit) / 2;
+ for (i = 0; i < 2; i++) {
+ if (i == 0 && !QAQ_IS_ADJBLK(oqaq))
+ continue;
+
+ if (i == 1 && !QAQ_IS_ADJINO(oqaq))
+ continue;
+
+ tmp = i ? (lqs->lqs_iunit_sz - oqaq->qaq_iunit_sz) :
+ (lqs->lqs_bunit_sz - oqaq->qaq_bunit_sz);
+ shrink = i ? &lqs->lqs_last_ishrink :
+ &lqs->lqs_last_bshrink;
+ time_limit = cfs_time_add(i ? lqs->lqs_last_ishrink :
+ lqs->lqs_last_bshrink,
+ cfs_time_seconds(qctxt->lqc_switch_seconds));
+ unit = i ? &lqs->lqs_iunit_sz : &lqs->lqs_bunit_sz;
+ tune = i ? &lqs->lqs_itune_sz : &lqs->lqs_btune_sz;
+
+ /* quota master shrinks */
+ if (qctxt->lqc_handler && tmp > 0)
+ *shrink = cfs_time_current();
+
+ /* quota master enlarges */
+ if (qctxt->lqc_handler && tmp < 0) {
+ /* in case of ping-pong effect, don't enlarge lqs
+ * in a short time */
+ if (*shrink &&
+ cfs_time_before(cfs_time_current(), time_limit))
+ tmp = 0;
}
- }
- /* adjust the slave's file qunit size */
- if (QAQ_IS_ADJINO(oqaq)) {
- i_tmp = *liunit - oqaq->qaq_iunit_sz;
-
- if (qctxt->lqc_handler && i_tmp > 0)
- lqs->lqs_last_ishrink = cfs_time_current();
-
- if (qctxt->lqc_handler && i_tmp < 0) {
- time_limit = cfs_time_add(lqs->lqs_last_ishrink,
- cfs_time_seconds(qctxt->
- lqc_switch_seconds));
- if (!lqs->lqs_last_ishrink ||
- cfs_time_after(cfs_time_current(), time_limit)) {
- *liunit = oqaq->qaq_iunit_sz;
- *litune = (*liunit) / 2;
- } else {
- i_tmp = 0;
- }
- } else {
- *liunit = oqaq->qaq_iunit_sz;
- *litune = (*liunit) / 2;
+ /* when setquota, don't enlarge lqs b=18616 */
+ if (QAQ_IS_CREATE_LQS(oqaq) && tmp < 0)
+ tmp = 0;
+
+ if (tmp != 0) {
+ *unit = i ? oqaq->qaq_iunit_sz : oqaq->qaq_bunit_sz;
+ *tune = (*unit) / 2;
}
+
+
+ if (tmp > 0)
+ rc |= i ? LQS_INO_DECREASE : LQS_BLK_DECREASE;
+ if (tmp < 0)
+ rc |= i ? LQS_INO_INCREASE : LQS_BLK_INCREASE;
}
spin_unlock(&lqs->lqs_lock);
- CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
+ CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n",
+ lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
lqs_putref(lqs);
- if (b_tmp > 0)
- rc |= LQS_BLK_DECREASE;
- else if (b_tmp < 0)
- rc |= LQS_BLK_INCREASE;
-
- if (i_tmp > 0)
- rc |= LQS_INO_DECREASE;
- else if (i_tmp < 0)
- rc |= LQS_INO_INCREASE;
-
RETURN(rc);
}
spin_unlock(&qunit->lq_lock); \
} while(0)
-
int should_translate_quota (struct obd_import *imp)
{
ENTRY;
GOTO(out, ret = 0);
lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
- qctxt, 1);
- if (IS_ERR(lqs))
- GOTO (out, ret = PTR_ERR(lqs));
+ qctxt, 0);
+ if (IS_ERR(lqs) || lqs == NULL) {
+ CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
+ QDATA_IS_GRP(qdata) ? "group" : "user", qdata->qd_id);
+ GOTO (out, ret = 0);
+ }
spin_lock(&lqs->lqs_lock);
if (QDATA_IS_BLK(qdata)) {
qunit_put(qunit);
}
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk)
+{
+ struct lustre_qunit *qunit, *find_qunit;
+ int cycle = 1;
+
+ OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ if (qunit == NULL) {
+ CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
+ oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&qunit->lq_hash);
+ qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+ init_waitqueue_head(&qunit->lq_waitq);
+ atomic_set(&qunit->lq_refcnt, 1);
+ qunit->lq_ctxt = qctxt;
+ qunit->lq_data.qd_id = oqctl->qc_id;
+ qunit->lq_data.qd_flags = oqctl->qc_type;
+ if (isblk)
+ QDATA_SET_BLK(&qunit->lq_data);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ /* it means it is only an invalid qunit for barrier */
+ qunit->lq_opc = QUOTA_LAST_OPC;
+
+ while (1) {
+ spin_lock(&qunit_hash_lock);
+ find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+ if (find_qunit) {
+ spin_unlock(&qunit_hash_lock);
+ qunit_put(find_qunit);
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+ continue;
+ }
+ break;
+ }
+ insert_qunit_nolock(qctxt, qunit);
+ spin_unlock(&qunit_hash_lock);
+ return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+ struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+ if (qunit == NULL) {
+ CERROR("handle is NULL\n");
+ return;
+ }
+
+ LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+ spin_lock(&qunit_hash_lock);
+ remove_qunit_nolock(qunit);
+ spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+ wake_up(&qunit->lq_waitq);
+ qunit_put(qunit);
+}
+
#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
(limit = count) : (limit += count)
struct qunit_data qdata[MAXQUOTAS];
ENTRY;
- CLASSERT(MAXQUOTAS < 4);
- if (!sb_any_quota_enabled(qctxt->lqc_sb))
+ if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0)
RETURN(0);
for (i = 0; i < MAXQUOTAS; i++) {
RETURN(rc);
}
+
+void hash_put_lqs(void *obj, void *data)
+{
+ lqs_putref((struct lustre_qunit_size *)obj);
+}
+
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
struct lustre_qunit *qunit, *tmp;
qunit_put(qunit);
}
+ lustre_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
down_write(&obt->obt_rwsem);
lustre_hash_exit(qctxt->lqc_lqs_hash);
qctxt->lqc_lqs_hash = NULL;
"qslave recovery failed! (id:%d type:%d "
" rc:%d)\n", dqid->di_id, type, rc);
free:
- kfree(dqid);
+ OBD_FREE_PTR(dqid);
}
}
#ifdef HAVE_QUOTA_SUPPORT
#ifdef __KERNEL__
+
+/* When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574 */
+void build_lqs(struct obd_device *obd)
+{
+ struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ struct list_head id_list;
+ int i, rc;
+
+ INIT_LIST_HEAD(&id_list);
+ for (i = 0; i < MAXQUOTAS; i++) {
+ struct dquot_id *dqid, *tmp;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+ rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+ i, &id_list);
+#else
+ rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+ i, &id_list);
+#endif
+ if (rc) {
+ CDEBUG(D_ERROR, "fail to get %s qids!\n",
+ i ? "group" : "user");
+ continue;
+ }
+
+ list_for_each_entry_safe(dqid, tmp, &id_list,
+ di_link) {
+ struct lustre_qunit_size *lqs;
+
+ list_del_init(&dqid->di_link);
+ lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+ qctxt, 1);
+ if (lqs && !IS_ERR(lqs)) {
+ lqs->lqs_flags |= dqid->di_flag;
+ lqs_putref(lqs);
+ } else {
+ CDEBUG(D_ERROR, "fail to create a lqs"
+ "(%s id: %u)!\n", i ? "group" : "user",
+ dqid->di_id);
+ }
+
+ OBD_FREE_PTR(dqid);
+ }
+ }
+}
+
int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused,
struct obd_quotactl *oqctl)
{
case Q_QUOTAON:
oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
rc = mds_quota_on(obd, oqctl);
+ /* when quotaon, create lqs for every quota uid/gid b=18574 */
+ build_lqs(obd);
break;
case Q_QUOTAOFF:
oqctl->qc_id = obt->obt_qfmt; /* override qfmt version */
struct obd_device_target *obt = &obd->u.obt;
struct lvfs_run_ctxt saved;
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ struct lustre_qunit_size *lqs;
+ void *handle = NULL;
struct timeval work_start;
struct timeval work_end;
long timediff;
case Q_GETQUOTA:
/* In recovery scenario, this pending dqacq/dqrel might have
* been processed by master successfully before it's dquot
- * on master enter recovery mode. We must wait for this
+ * on master enter recovery mode. We must wait for this
* dqacq/dqrel done then return the correct limits to master */
if (oqctl->qc_stat == QUOTA_RECOVERING)
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type,
- 1);
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (oqctl->qc_stat == QUOTA_RECOVERING)
+ quota_unbarrier(handle);
+
if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
oqctl->qc_cmd == Q_FINVALIDATE) {
if (!rc && oqctl->qc_cmd == Q_QUOTAON)
obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type);
atomic_inc(&obt->obt_quotachecking);
}
+
+ /* when quotaon, create lqs for every quota uid/gid b=18574 */
+ if (oqctl->qc_cmd == Q_QUOTAON)
+ build_lqs(obd);
break;
case Q_SETQUOTA:
/* currently, it is only used for nullifying the quota */
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type, 1);
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
oqctl->qc_cmd = Q_SETQUOTA;
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ quota_unbarrier(handle);
+
+ lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+ qctxt, 0);
+ if (lqs == NULL || IS_ERR(lqs)){
+ CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+ } else {
+ lqs->lqs_flags &= ~QB_SET;
+ lqs_putref(lqs);
+ }
+
break;
case Q_INITQUOTA:
{
LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
- /* There might be a pending dqacq/dqrel (which is going to
- * clear stale limits on slave). we should wait for it's
- * completion then initialize limits */
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type, 1);
-
if (!oqctl->qc_dqblk.dqb_bhardlimit)
goto adjust;
+ /* There might be a pending dqacq/dqrel (which is going to
+ * clear stale limits on slave). we should wait for it's
+ * completion then initialize limits */
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
oqctl->qc_cmd = Q_INITQUOTA;
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ quota_unbarrier(handle);
if (rc)
RETURN(rc);
adjust:
+ lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+ qctxt, 1);
+ if (lqs == NULL || IS_ERR(lqs)){
+ CDEBUG(D_ERROR, "fail to create lqs when setquota\n");
+ break;
+ } else {
+ lqs->lqs_flags |= QB_SET;
+ lqs_putref(lqs);
+ }
+
/* Trigger qunit pre-acquire */
if (oqctl->qc_type == USRQUOTA)
id[USRQUOTA] = oqctl->qc_id;
int rc = 0, rc2[2] = { 0, 0 };
ENTRY;
- CLASSERT(MAXQUOTAS < 4);
- if (!sb_any_quota_enabled(qctxt->lqc_sb))
- RETURN(rc);
-
spin_lock(&qctxt->lqc_lock);
if (!qctxt->lqc_valid){
spin_unlock(&qctxt->lqc_lock);
RETURN(rc);
}
+int quota_is_set(struct obd_device *obd, const unsigned int id[], int flag)
+{
+ struct lustre_qunit_size *lqs;
+ int i, q_set = 0;
+
+ if (!sb_any_quota_enabled(obd->u.obt.obt_qctxt.lqc_sb))
+ RETURN(0);
+
+ for (i = 0; i < MAXQUOTAS; i++) {
+ lqs = quota_search_lqs(LQS_KEY(i, id[i]),
+ &obd->u.obt.obt_qctxt, 0);
+ if (lqs && !IS_ERR(lqs)) {
+ if (lqs->lqs_flags & flag)
+ q_set = 1;
+ lqs_putref(lqs);
+ }
+ }
+
+ return q_set;
+}
+
static int quota_chk_acq_common(struct obd_device *obd, const unsigned int id[],
int pending[], int count, quota_acquire acquire,
struct obd_trans_info *oti, int isblk,
int rc = 0, cycle = 0, count_err = 1;
ENTRY;
+ if (!quota_is_set(obd, id, isblk ? QB_SET : QI_SET))
+ RETURN(0);
+
CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
pending[USRQUOTA] = pending[GRPQUOTA] = 0;
/* Unfortunately, if quota master is too busy to handle the
int isblk);
int check_qm(struct lustre_quota_ctxt *qctxt);
void dqacq_interrupt(struct lustre_quota_ctxt *qctxt);
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk);
+void quota_unbarrier(void *handle);
/* quota_master.c */
int lustre_dquot_init(void);
void lustre_dquot_exit(void);
int quota_adjust_slave_lqs(struct quota_adjust_qunit *oqaq, struct
lustre_quota_ctxt *qctxt);
#ifdef __KERNEL__
+int quota_is_set(struct obd_device *obd, const unsigned int id[], int flag);
struct lustre_qunit_size *quota_search_lqs(unsigned long long lqs_key,
struct lustre_quota_ctxt *qctxt,
int create);
struct lustre_quota_ctxt *qctxt);
int lquota_proc_setup(struct obd_device *obd, int is_master);
int lquota_proc_cleanup(struct lustre_quota_ctxt *qctxt);
+void build_lqs(struct obd_device *obd);
extern cfs_proc_dir_entry_t *lquota_type_proc_dir;
#endif
}
- if (!dquot->dq_dqb.dqb_bhardlimit && !dquot->dq_dqb.dqb_bsoftlimit &&
- !dquot->dq_dqb.dqb_ihardlimit && !dquot->dq_dqb.dqb_isoftlimit) {
- oqaq->qaq_bunit_sz = 0;
- oqaq->qaq_iunit_sz = 0;
- QAQ_SET_ADJBLK(oqaq);
- QAQ_SET_ADJINO(oqaq);
- }
-
QAQ_DEBUG(oqaq, "the oqaq computed\n");
RETURN(rc);
}
static int mds_init_slave_ilimits(struct obd_device *obd,
- struct obd_quotactl *oqctl, int set,
- struct quota_adjust_qunit *oqaq)
+ struct obd_quotactl *oqctl, int set)
{
/* XXX: for file limits only adjust local now */
struct obd_device_target *obt = &obd->u.obt;
struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
unsigned int id[MAXQUOTAS] = { 0, 0 };
struct obd_quotactl *ioqc = NULL;
+ struct lustre_qunit_size *lqs;
int flag;
int rc;
ENTRY;
ioqc->qc_dqblk.dqb_valid = QIF_ILIMITS;
ioqc->qc_dqblk.dqb_ihardlimit = flag ? MIN_QLIMIT : 0;
- if (QAQ_IS_ADJINO(oqaq)) {
- /* adjust the mds slave's inode qunit size */
- rc = quota_adjust_slave_lqs(oqaq, qctxt);
- if (rc < 0)
- CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \
- failed! (rc:%d)\n", rc);
+ /* build lqs for mds */
+ lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+ qctxt, flag ? 1 : 0);
+ if (lqs && !IS_ERR(lqs)) {
+ if (flag)
+ lqs->lqs_flags |= QI_SET;
+ else
+ lqs->lqs_flags &= ~QI_SET;
+ lqs_putref(lqs);
+ } else {
+ CERROR("fail to %s lqs for inode(%s id: %u)!\n",
+ flag ? "create" : "search",
+ oqctl->qc_type ? "group" : "user",
+ oqctl->qc_id);
+ GOTO(out, rc = PTR_ERR(lqs));
}
/* set local limit to MIN_QLIMIT */
}
static int mds_init_slave_blimits(struct obd_device *obd,
- struct obd_quotactl *oqctl, int set,
- struct quota_adjust_qunit *oqaq)
+ struct obd_quotactl *oqctl, int set)
{
struct obd_device_target *obt = &obd->u.obt;
struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
struct mds_obd *mds = &obd->u.mds;
struct obd_quotactl *ioqc;
+ struct lustre_qunit_size *lqs;
unsigned int id[MAXQUOTAS] = { 0, 0 };
- int rc, rc1 = 0;
+ int rc;
int flag;
ENTRY;
ioqc->qc_type = oqctl->qc_type;
ioqc->qc_dqblk.dqb_valid = QIF_BLIMITS;
ioqc->qc_dqblk.dqb_bhardlimit = flag ? MIN_QLIMIT : 0;
- if (QAQ_IS_ADJBLK(oqaq)) {
- /* adjust the mds slave's block qunit size */
- rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
- if (rc1 < 0)
- CERROR("adjust mds slave's block qunit size failed!"
- "(rc:%d)\n", rc1);
+
+ /* build lqs for mds */
+ lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
+ qctxt, flag ? 1 : 0);
+ if (lqs && !IS_ERR(lqs)) {
+ if (flag)
+ lqs->lqs_flags |= QB_SET;
+ else
+ lqs->lqs_flags &= ~QB_SET;
+ lqs_putref(lqs);
+ } else {
+ CERROR("fail to %s lqs for block(%s id: %u)!\n",
+ flag ? "create" : "search",
+ oqctl->qc_type ? "group" : "user",
+ oqctl->qc_id);
+ GOTO(out, rc = PTR_ERR(lqs));
}
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
GOTO(out, rc);
}
- /* adjust all slave's qunit size when setting quota
- * this is will create a lqs for every ost, which will present
- * certain uid/gid is set quota or not */
- QAQ_SET_ADJBLK(oqaq);
- rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt);
-
EXIT;
out:
OBD_FREE_PTR(ioqc);
return rc;
}
+static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq)
+{
+ struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ int rc = 0;
+
+ QAQ_SET_CREATE_LQS(qaq);
+ /* adjust local lqs */
+ rc = quota_adjust_slave_lqs(qaq, qctxt);
+ if (rc < 0)
+ CERROR("adjust master's qunit size failed!(rc=%d)\n", rc);
+
+ /* adjust remote lqs */
+ if (QAQ_IS_ADJBLK(qaq)) {
+ rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt);
+ if (rc < 0)
+ CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
+
+ }
+}
+
int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
{
struct mds_obd *mds = &obd->u.mds;
}
up(&mds->mds_qonoff_sem);
+
+ adjust_lqs(obd, oqaq);
+
orig_set = ihardlimit || isoftlimit;
now_set = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit;
if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) {
down(&dquot->dq_sem);
dquot->dq_dqb.dqb_curinodes = 0;
up(&dquot->dq_sem);
- rc = mds_init_slave_ilimits(obd, oqctl, orig_set, oqaq);
+ rc = mds_init_slave_ilimits(obd, oqctl, orig_set);
if (rc) {
CERROR("init slave ilimits failed! (rc:%d)\n", rc);
goto revoke_out;
down(&dquot->dq_sem);
dquot->dq_dqb.dqb_curspace = 0;
up(&dquot->dq_sem);
- rc = mds_init_slave_blimits(obd, oqctl, orig_set, oqaq);
+ rc = mds_init_slave_blimits(obd, oqctl, orig_set);
if (rc) {
CERROR("init slave blimits failed! (rc:%d)\n", rc);
goto revoke_out;
CERROR("qmaster recovery failed! (id:%d type:%d"
" rc:%d)\n", dqid->di_id, type, rc);
free:
- kfree(dqid);
+ OBD_FREE_PTR(dqid);
}
}
class_decref(mds->mds_osc_obd, "qmaster_recovd_lov", mds->mds_osc_obd);
}
run_test_with_stat 27 "lfs quota/setquota should handle wrong arguments (19612) ================="
+test_28() {
+ BLK_LIMIT=$((100 * 1024 * 1024)) # 100G
+ echo "Step 1: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+ $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+ $SHOW_QUOTA_USER
+
+ echo "Step 2: reset system ..."
+ cleanup_and_setup_lustre
+ test_0
+
+ echo "Step 3: change qunit for user [$TSTUSR:512:1024]"
+ set_blk_tunesz 512
+ set_blk_unitsz 1024
+
+ wait_delete_completed
+
+ #define OBD_FAIL_QUOTA_RET_QDATA | OBD_FAIL_ONCE
+ lustre_fail ost 0x80000A02
+
+ TESTFILE="$DIR/$tdir/$tfile"
+ mkdir -p $DIR/$tdir
+
+ BLK_LIMIT=$((100 * 1024)) # 100M
+ echo "Step 4: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+ $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+ $SHOW_QUOTA_USER
+
+ touch $TESTFILE
+ chown $TSTUSR.$TSTUSR $TESTFILE
+
+ echo "Step 5: write the test file1 [10M] ..."
+ $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(( 10 * 1024 )) \
+ || quota_error a $TSTUSR "write 10M file failure"
+ $SHOW_QUOTA_USER
+
+ rm -f $TESTFILE
+ sync; sleep 3; sync;
+
+ # make qd_count 64 bit
+ lustre_fail ost 0
+
+ set_blk_unitsz $((128 * 1024))
+ set_blk_tunesz $((128 * 1024 / 2))
+
+ resetquota -u $TSTUSR
+}
+run_test_with_stat 28 "test for consistency for qunit when setquota (18574) ==========="
+
# turn off quota
quota_fini()
{