I found a few issues after landing the patch for 18574.(some are caused by it, some are easier to
be reproduced by it)
1. in setquota, quota slaves will set their quota limitation to 1 and then get more quota. But in
this time, it's lqs size is the default value. If we have set a small quota limitation, we will
meet -EDQUOT. So we should adjust qunit firstly in this situation.
2. filter_quota_ctl(), we should promise there is no qunit is calculated or sent between
qctxt_wait_pending_dqacq() and fsfilt_quotactl(). Now I did this via a "fake" qunit.
b=18574
i=johann
i=fanyong
};
extern void lustre_swab_quota_adjust_qunit(struct quota_adjust_qunit *q);
-/* flags in qunit_data and quota_adjust_qunit will use macroes below */
+/* flags is shared among quota structures */
#define LQUOTA_FLAGS_GRP 1UL /* 0 is user, 1 is group */
#define LQUOTA_FLAGS_BLK 2UL /* 0 is inode, 1 is block */
#define LQUOTA_FLAGS_ADJBLK 4UL /* adjust the block qunit size */
#define LQUOTA_FLAGS_CHG_QS 16UL /* indicate whether it has capability of
* OBD_CONNECT_CHANGE_QS */
+/* flags is specific for quota_adjust_qunit */
+#define LQUOTA_QAQ_CEATE_LQS (1 << 31) /* when it is set, need create lqs */
+
/* the status of lqs_flags in struct lustre_qunit_size */
#define LQUOTA_QUNIT_FLAGS (LQUOTA_FLAGS_GRP | LQUOTA_FLAGS_BLK)
-#define QAQ_IS_GRP(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
-#define QAQ_IS_ADJBLK(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
-#define QAQ_IS_ADJINO(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_GRP(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_GRP)
+#define QAQ_IS_ADJBLK(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJBLK)
+#define QAQ_IS_ADJINO(qaq) ((qaq)->qaq_flags & LQUOTA_FLAGS_ADJINO)
+#define QAQ_IS_CREATE_LQS(qaq) ((qaq)->qaq_flags & LQUOTA_QAQ_CEATE_LQS)
-#define QAQ_SET_GRP(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
-#define QAQ_SET_ADJBLK(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
-#define QAQ_SET_ADJINO(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_GRP(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_GRP)
+#define QAQ_SET_ADJBLK(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJBLK)
+#define QAQ_SET_ADJINO(qaq) ((qaq)->qaq_flags |= LQUOTA_FLAGS_ADJINO)
+#define QAQ_SET_CREATE_LQS(qaq) ((qaq)->qaq_flags |= LQUOTA_QAQ_CEATE_LQS)
struct mds_rec_setattr {
__u32 sa_opcode;
struct lustre_quota_ctxt *qctxt)
{
struct lustre_qunit_size *lqs = NULL;
- unsigned long *lbunit, *liunit, *lbtune, *litune;
- signed long b_tmp = 0, i_tmp = 0;
- cfs_time_t time_limit = 0;
- int rc = 0;
+ unsigned long *unit, *tune;
+ signed long tmp = 0;
+ cfs_time_t time_limit = 0, *shrink;
+ int i, rc = 0;
ENTRY;
LASSERT(qctxt);
lqs = quota_search_lqs(LQS_KEY(QAQ_IS_GRP(oqaq), oqaq->qaq_id),
- qctxt, 0);
+ qctxt, QAQ_IS_CREATE_LQS(oqaq) ? 1 : 0);
if (lqs == NULL || IS_ERR(lqs)){
CDEBUG(D_ERROR, "fail to find a lqs(%s id: %u)!\n",
QAQ_IS_GRP(oqaq) ? "group" : "user", oqaq->qaq_id);
RETURN(0);
}
- lbunit = &lqs->lqs_bunit_sz;
- liunit = &lqs->lqs_iunit_sz;
- lbtune = &lqs->lqs_btune_sz;
- litune = &lqs->lqs_itune_sz;
-
+ CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n",
+ lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
spin_lock(&lqs->lqs_lock);
- CDEBUG(D_QUOTA, "before: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
- /* adjust the slave's block qunit size */
- if (QAQ_IS_ADJBLK(oqaq)) {
- cfs_duration_t sec = cfs_time_seconds(qctxt->lqc_switch_seconds);
-
- b_tmp = *lbunit - oqaq->qaq_bunit_sz;
-
- if (qctxt->lqc_handler && b_tmp > 0)
- lqs->lqs_last_bshrink = cfs_time_current();
-
- if (qctxt->lqc_handler && b_tmp < 0) {
- time_limit = cfs_time_add(lqs->lqs_last_bshrink, sec);
- if (!lqs->lqs_last_bshrink ||
- cfs_time_after(cfs_time_current(), time_limit)) {
- *lbunit = oqaq->qaq_bunit_sz;
- *lbtune = (*lbunit) / 2;
- } else {
- b_tmp = 0;
- }
- } else {
- *lbunit = oqaq->qaq_bunit_sz;
- *lbtune = (*lbunit) / 2;
+ for (i = 0; i < 2; i++) {
+ if (i == 0 && !QAQ_IS_ADJBLK(oqaq))
+ continue;
+
+ if (i == 1 && !QAQ_IS_ADJINO(oqaq))
+ continue;
+
+ tmp = i ? (lqs->lqs_iunit_sz - oqaq->qaq_iunit_sz) :
+ (lqs->lqs_bunit_sz - oqaq->qaq_bunit_sz);
+ shrink = i ? &lqs->lqs_last_ishrink :
+ &lqs->lqs_last_bshrink;
+ time_limit = cfs_time_add(i ? lqs->lqs_last_ishrink :
+ lqs->lqs_last_bshrink,
+ cfs_time_seconds(qctxt->lqc_switch_seconds));
+ unit = i ? &lqs->lqs_iunit_sz : &lqs->lqs_bunit_sz;
+ tune = i ? &lqs->lqs_itune_sz : &lqs->lqs_btune_sz;
+
+ /* quota master shrinks */
+ if (qctxt->lqc_handler && tmp > 0)
+ *shrink = cfs_time_current();
+
+ /* quota master enlarges */
+ if (qctxt->lqc_handler && tmp < 0) {
+ /* in case of ping-pong effect, don't enlarge lqs
+ * in a short time */
+ if (*shrink &&
+ cfs_time_before(cfs_time_current(), time_limit))
+ tmp = 0;
}
- }
- /* adjust the slave's file qunit size */
- if (QAQ_IS_ADJINO(oqaq)) {
- i_tmp = *liunit - oqaq->qaq_iunit_sz;
-
- if (qctxt->lqc_handler && i_tmp > 0)
- lqs->lqs_last_ishrink = cfs_time_current();
-
- if (qctxt->lqc_handler && i_tmp < 0) {
- time_limit = cfs_time_add(lqs->lqs_last_ishrink,
- cfs_time_seconds(qctxt->
- lqc_switch_seconds));
- if (!lqs->lqs_last_ishrink ||
- cfs_time_after(cfs_time_current(), time_limit)) {
- *liunit = oqaq->qaq_iunit_sz;
- *litune = (*liunit) / 2;
- } else {
- i_tmp = 0;
- }
- } else {
- *liunit = oqaq->qaq_iunit_sz;
- *litune = (*liunit) / 2;
+ /* when setquota, don't enlarge lqs b=18616 */
+ if (QAQ_IS_CREATE_LQS(oqaq) && tmp < 0)
+ tmp = 0;
+
+ if (tmp != 0) {
+ *unit = i ? oqaq->qaq_iunit_sz : oqaq->qaq_bunit_sz;
+ *tune = (*unit) / 2;
}
+
+
+ if (tmp > 0)
+ rc |= i ? LQS_INO_DECREASE : LQS_BLK_DECREASE;
+ if (tmp < 0)
+ rc |= i ? LQS_INO_INCREASE : LQS_BLK_INCREASE;
}
- CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n", *lbunit, *liunit);
spin_unlock(&lqs->lqs_lock);
+ CDEBUG(D_QUOTA, "after: bunit: %lu, iunit: %lu.\n",
+ lqs->lqs_bunit_sz, lqs->lqs_iunit_sz);
lqs_putref(lqs);
- if (b_tmp > 0)
- rc |= LQS_BLK_DECREASE;
- else if (b_tmp < 0)
- rc |= LQS_BLK_INCREASE;
-
- if (i_tmp > 0)
- rc |= LQS_INO_DECREASE;
- else if (i_tmp < 0)
- rc |= LQS_INO_INCREASE;
RETURN(rc);
}
spin_unlock(&qunit->lq_lock); \
} while(0)
-
int should_translate_quota (struct obd_import *imp)
{
ENTRY;
qunit_put(qunit);
}
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk)
+{
+ struct lustre_qunit *qunit, *find_qunit;
+ int cycle = 1;
+
+ OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ if (qunit == NULL) {
+ CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
+ oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&qunit->lq_hash);
+ qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+ init_waitqueue_head(&qunit->lq_waitq);
+ atomic_set(&qunit->lq_refcnt, 1);
+ qunit->lq_ctxt = qctxt;
+ qunit->lq_data.qd_id = oqctl->qc_id;
+ qunit->lq_data.qd_flags = oqctl->qc_type;
+ if (isblk)
+ QDATA_SET_BLK(&qunit->lq_data);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ /* it means it is only an invalid qunit for barrier */
+ qunit->lq_opc = QUOTA_LAST_OPC;
+
+ while (1) {
+ spin_lock(&qunit_hash_lock);
+ find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+ if (find_qunit) {
+ spin_unlock(&qunit_hash_lock);
+ qunit_put(find_qunit);
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+ continue;
+ }
+ break;
+ }
+ insert_qunit_nolock(qctxt, qunit);
+ spin_unlock(&qunit_hash_lock);
+ return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+ struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+ if (qunit == NULL) {
+ CERROR("handle is NULL\n");
+ return;
+ }
+
+ LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+ spin_lock(&qunit_hash_lock);
+ remove_qunit_nolock(qunit);
+ spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+ wake_up(&qunit->lq_waitq);
+ qunit_put(qunit);
+}
+
#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
(limit = count) : (limit += count)
struct lvfs_run_ctxt saved;
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
struct lustre_qunit_size *lqs;
+ void *handle = NULL;
struct timeval work_start;
struct timeval work_end;
long timediff;
* on master enter recovery mode. We must wait for this
* dqacq/dqrel done then return the correct limits to master */
if (oqctl->qc_stat == QUOTA_RECOVERING)
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type,
- 1);
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl);
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ if (oqctl->qc_stat == QUOTA_RECOVERING)
+ quota_unbarrier(handle);
+
if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF ||
oqctl->qc_cmd == Q_FINVALIDATE) {
if (!rc && oqctl->qc_cmd == Q_QUOTAON)
break;
case Q_SETQUOTA:
/* currently, it is only used for nullifying the quota */
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type, 1);
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
oqctl->qc_cmd = Q_SETQUOTA;
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ quota_unbarrier(handle);
lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id),
qctxt, 0);
LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS);
LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0);
- /* There might be a pending dqacq/dqrel (which is going to
- * clear stale limits on slave). we should wait for it's
- * completion then initialize limits */
- qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt,
- oqctl->qc_id, oqctl->qc_type, 1);
-
if (!oqctl->qc_dqblk.dqb_bhardlimit)
goto adjust;
+ /* There might be a pending dqacq/dqrel (which is going to
+ * clear stale limits on slave). we should wait for it's
+ * completion then initialize limits */
+ handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1);
LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT);
push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl);
oqctl->qc_cmd = Q_INITQUOTA;
}
pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL);
+ quota_unbarrier(handle);
if (rc)
RETURN(rc);
int isblk);
int check_qm(struct lustre_quota_ctxt *qctxt);
void dqacq_interrupt(struct lustre_quota_ctxt *qctxt);
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk);
+void quota_unbarrier(void *handle);
/* quota_master.c */
int lustre_dquot_init(void);
void lustre_dquot_exit(void);
}
static int mds_init_slave_ilimits(struct obd_device *obd,
- struct obd_quotactl *oqctl, int set,
- struct quota_adjust_qunit *oqaq)
+ struct obd_quotactl *oqctl, int set)
{
/* XXX: for file limits only adjust local now */
struct obd_device_target *obt = &obd->u.obt;
GOTO(out, rc = PTR_ERR(lqs));
}
- if (QAQ_IS_ADJINO(oqaq)) {
- /* adjust the mds slave's inode qunit size */
- rc = quota_adjust_slave_lqs(oqaq, qctxt);
- if (rc < 0)
- CDEBUG(D_ERROR, "adjust mds slave's inode qunit size \
- failed! (rc:%d)\n", rc);
- }
-
/* set local limit to MIN_QLIMIT */
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
if (rc)
}
static int mds_init_slave_blimits(struct obd_device *obd,
- struct obd_quotactl *oqctl, int set,
- struct quota_adjust_qunit *oqaq)
+ struct obd_quotactl *oqctl, int set)
{
struct obd_device_target *obt = &obd->u.obt;
struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
struct obd_quotactl *ioqc;
struct lustre_qunit_size *lqs;
unsigned int uid = 0, gid = 0;
- int rc, rc1 = 0;
+ int rc;
int flag;
ENTRY;
GOTO(out, rc = PTR_ERR(lqs));
}
- if (QAQ_IS_ADJBLK(oqaq)) {
- /* adjust the mds slave's block qunit size */
- rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
- if (rc1 < 0)
- CERROR("adjust mds slave's block qunit size failed!"
- "(rc:%d)\n", rc1);
- }
-
rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, ioqc);
if (rc)
GOTO(out, rc);
GOTO(out, rc);
}
- /* adjust all slave's qunit size when setting quota
- * this is will create a lqs for every ost, which will present
- * certain uid/gid is set quota or not */
- QAQ_SET_ADJBLK(oqaq);
- rc = obd_quota_adjust_qunit(mds->mds_osc_exp, oqaq, qctxt);
-
EXIT;
out:
OBD_FREE_PTR(ioqc);
return rc;
}
+static void adjust_lqs(struct obd_device *obd, struct quota_adjust_qunit *qaq)
+{
+ struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ int rc = 0;
+
+ QAQ_SET_CREATE_LQS(qaq);
+ /* adjust local lqs */
+ rc = quota_adjust_slave_lqs(qaq, qctxt);
+ if (rc < 0)
+ CERROR("adjust master's qunit size failed!(rc=%d)\n", rc);
+
+ /* adjust remote lqs */
+ if (QAQ_IS_ADJBLK(qaq)) {
+ rc = obd_quota_adjust_qunit(obd->u.mds.mds_osc_exp, qaq, qctxt);
+ if (rc < 0)
+ CERROR("adjust slaves' qunit size failed!(rc=%d)\n", rc);
+
+ }
+}
+
int mds_set_dqblk(struct obd_device *obd, struct obd_quotactl *oqctl)
{
struct mds_obd *mds = &obd->u.mds;
}
up(&mds->mds_qonoff_sem);
+
+ adjust_lqs(obd, oqaq);
+
orig_set = ihardlimit || isoftlimit;
now_set = dqblk->dqb_ihardlimit || dqblk->dqb_isoftlimit;
if (dqblk->dqb_valid & QIF_ILIMITS && orig_set != now_set) {
down(&dquot->dq_sem);
dquot->dq_dqb.dqb_curinodes = 0;
up(&dquot->dq_sem);
- rc = mds_init_slave_ilimits(obd, oqctl, orig_set, oqaq);
+ rc = mds_init_slave_ilimits(obd, oqctl, orig_set);
if (rc) {
CERROR("init slave ilimits failed! (rc:%d)\n", rc);
goto revoke_out;
down(&dquot->dq_sem);
dquot->dq_dqb.dqb_curspace = 0;
up(&dquot->dq_sem);
- rc = mds_init_slave_blimits(obd, oqctl, orig_set, oqaq);
+ rc = mds_init_slave_blimits(obd, oqctl, orig_set);
if (rc) {
CERROR("init slave blimits failed! (rc:%d)\n", rc);
goto revoke_out;
}
run_test_with_stat 27 "lfs quota/setquota should handle wrong arguments (19612) ================="
+test_28() {
+ BLK_LIMIT=$((100 * 1024 * 1024)) # 100G
+ echo "Step 1: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+ $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+ $SHOW_QUOTA_USER
+
+ echo "Step 2: reset system ..."
+ cleanup_and_setup_lustre
+ test_0
+
+ echo "Step 3: change qunit for user [$TSTUSR:512:1024]"
+ set_blk_tunesz 512
+ set_blk_unitsz 1024
+
+ wait_delete_completed
+
+ #define OBD_FAIL_QUOTA_RET_QDATA | OBD_FAIL_ONCE
+ lustre_fail ost 0x80000A02
+
+ TESTFILE="$DIR/$tdir/$tfile"
+ mkdir -p $DIR/$tdir
+
+ BLK_LIMIT=$((100 * 1024)) # 100M
+ echo "Step 4: set enough high limit for user [$TSTUSR:$BLK_LIMIT]"
+ $LFS setquota -u $TSTUSR -b 0 -B $BLK_LIMIT -i 0 -I 0 $DIR
+ $SHOW_QUOTA_USER
+
+ touch $TESTFILE
+ chown $TSTUSR.$TSTUSR $TESTFILE
+
+ echo "Step 5: write the test file1 [10M] ..."
+ $RUNAS dd if=/dev/zero of=$TESTFILE bs=$BLK_SZ count=$(( 10 * 1024 )) \
+ || quota_error a $TSTUSR "write 10M file failure"
+ $SHOW_QUOTA_USER
+
+ rm -f $TESTFILE
+ sync; sleep 3; sync;
+
+ # make qd_count 64 bit
+ lustre_fail ost 0
+
+ set_blk_unitsz $((128 * 1024))
+ set_blk_tunesz $((128 * 1024 / 2))
+
+ resetquota -u $TSTUSR
+}
+run_test_with_stat 28 "test for consistency for qunit when setquota (18574) ==========="
+
# turn off quota
test_99()
{