After an unnecessary block in osts for waiting in-flight quota request is
deleted, a few places should be fixed in order to work with it.
b=14783
i=johann
i=andrew.perepechko
gid = dparent->d_inode->i_gid;
else
gid = current->fsgid;
gid = dparent->d_inode->i_gid;
else
gid = current->fsgid;
- rc = lquota_chkquota(mds_quota_interface_ref, obd,
- current->fsuid, gid, 1, &rec_pending);
- if (rc < 0)
- GOTO(cleanup, rc);
+ /* we try to get enough quota to write here, and let ldiskfs
+ * decide if it is out of quota or not b=14783 */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ current->fsuid, gid, 1, &rec_pending);
intent_set_disposition(rep, DISP_OPEN_CREATE);
handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
intent_set_disposition(rep, DISP_OPEN_CREATE);
handle = fsfilt_start(obd, dparent->d_inode, FSFILT_OP_CREATE,
else
gid = current->fsgid;
else
gid = current->fsgid;
- rc = lquota_chkquota(mds_quota_interface_ref, obd,
- current->fsuid, gid, 1, &rec_pending);
-
- if (rc < 0)
- GOTO(cleanup, rc);
+ /* we try to get enough quota to write here, and let ldiskfs
+ * decide if it is out of quota or not b=14783 */
+ lquota_chkquota(mds_quota_interface_ref, obd,
+ current->fsuid, gid, 1, &rec_pending);
switch (type) {
case S_IFREG:{
switch (type) {
case S_IFREG:{
int i, err, cleanup_phase = 0;
struct obd_device *obd = exp->exp_obd;
void *wait_handle;
int i, err, cleanup_phase = 0;
struct obd_device *obd = exp->exp_obd;
void *wait_handle;
- int total_size = 0, rc2 = 0;
int rec_pending = 0;
unsigned int qcids[MAXQUOTAS] = {0, 0};
ENTRY;
int rec_pending = 0;
unsigned int qcids[MAXQUOTAS] = {0, 0};
ENTRY;
if (rc != 0)
GOTO(cleanup, rc);
if (rc != 0)
GOTO(cleanup, rc);
- /* Unfortunately, if quota master is too busy to handle the
- * pre-dqacq in time and quota hash on ost is used up, we
- * have to wait for the completion of in flight dqacq/dqrel,
- * in order not to get enough quota for write b=12588 */
- rc2 = lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
- oa->o_gid, niocount, &rec_pending);
-
- if (rc2 < 0)
- GOTO(cleanup, rc = rc2);
+ /* we try to get enough quota to write here, and let ldiskfs
+ * decide if it is out of quota or not b=14783 */
+ lquota_chkquota(filter_quota_interface_ref, obd, oa->o_uid,
+ oa->o_gid, niocount, &rec_pending);
iobuf = filter_iobuf_get(&obd->u.filter, oti);
if (IS_ERR(iobuf))
iobuf = filter_iobuf_get(&obd->u.filter, oti);
if (IS_ERR(iobuf))
if (rc > 0) {
rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0);
if (rc > 0) {
rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 1, 0);
+ if (rc == -EDQUOT || rc == -EBUSY) {
+ CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+ rc = 0;
+ }
if (rc)
CERROR("slave adjust block quota failed!(rc:%d)\n", rc);
}
if (rc)
CERROR("slave adjust block quota failed!(rc:%d)\n", rc);
}
record, qunit_sz, tune_sz, ret);
LASSERT(ret == 0 || qdata->qd_count);
record, qunit_sz, tune_sz, ret);
LASSERT(ret == 0 || qdata->qd_count);
- if (ret > 0) {
- quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
- /* when this qdata returned from mds, it will call lqs_putref */
- lqs_getref(lqs);
- }
-
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
EXIT;
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
EXIT;
spin_unlock(&lqs->lqs_lock);
/* this is for quota_search_lqs */
lqs_putref(lqs);
spin_unlock(&lqs->lqs_lock);
/* this is for quota_search_lqs */
lqs_putref(lqs);
- /* this is for check_cur_qunit */
+ /* this is for schedule_dqacq */
qdata->qd_count > factor) {
tmp_qdata = *qdata;
tmp_qdata.qd_count = factor;
qdata->qd_count > factor) {
tmp_qdata = *qdata;
tmp_qdata.qd_count = factor;
- qdata->qd_count -= tmp_qdata.qd_count;
+ qdata->qd_count -= tmp_qdata.qd_count;
QDATA_DEBUG((&tmp_qdata), "be split.\n");
rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
} else{
QDATA_DEBUG((&tmp_qdata), "be split.\n");
rc = schedule_dqacq(obd, qctxt, &tmp_qdata, opc, wait);
} else{
int size[2] = { sizeof(struct ptlrpc_body), 0 };
struct obd_import *imp = NULL;
unsigned long factor;
int size[2] = { sizeof(struct ptlrpc_body), 0 };
struct obd_import *imp = NULL;
unsigned long factor;
+ struct lustre_qunit_size *lqs = NULL;
qunit = dqacq_in_flight(qctxt, qdata);
if (qunit) {
qunit = dqacq_in_flight(qctxt, qdata);
if (qunit) {
- struct lustre_qunit_size *lqs = NULL;
-
if (wait)
list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
spin_unlock(&qunit_hash_lock);
free_qunit(empty);
if (wait)
list_add_tail(&qw.qw_entry, &qunit->lq_waiters);
spin_unlock(&qunit_hash_lock);
free_qunit(empty);
- quota_search_lqs(qdata, NULL, qctxt, &lqs);
- if (lqs) {
- spin_lock(&lqs->lqs_lock);
- quota_compute_lqs(qdata, lqs, 0,
- (opc == QUOTA_DQACQ) ? 1 : 0);
- spin_unlock(&lqs->lqs_lock);
- /* this is for quota_search_lqs */
- lqs_putref(lqs);
- /* this is for check_cur_qunit */
- lqs_putref(lqs);
- } else {
- CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
- }
-
goto wait_completion;
}
qunit = empty;
goto wait_completion;
}
qunit = empty;
+ quota_search_lqs(qdata, NULL, qctxt, &lqs);
+ if (lqs) {
+ spin_lock(&lqs->lqs_lock);
+ quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
+ /* when this qdata returned from mds, it will call lqs_putref */
+ lqs_getref(lqs);
+ spin_unlock(&lqs->lqs_lock);
+ /* this is for quota_search_lqs */
+ lqs_putref(lqs);
+ } else {
+ CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
+ }
+
QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
/* master is going to dqacq/dqrel from itself */
QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
/* master is going to dqacq/dqrel from itself */
opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
rc = qctxt->lqc_handler(obd, qdata, opc);
rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
opc == QUOTA_DQACQ ? "DQACQ" : "DQREL");
rc = qctxt->lqc_handler(obd, qdata, opc);
rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
- RETURN((rc && rc != -EDQUOT) ? rc : rc2);
}
spin_lock(&qctxt->lqc_lock);
}
spin_lock(&qctxt->lqc_lock);
QDATA_DEBUG(p, "wait for dqacq.\n");
l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
QDATA_DEBUG(p, "wait for dqacq.\n");
l_wait_event(qw.qw_waitq, got_qunit(&qw), &lwi);
+ /* rc = -EAGAIN, it means a quota req is finished;
+ * rc = -EDQUOT, it means out of quota
+ * rc = -EBUSY, it means recovery is happening
+ * other rc < 0, it means real errors, functions who call
+ * schedule_dqacq should take care of this */
if (qw.qw_rc == 0)
rc = -EAGAIN;
if (qw.qw_rc == 0)
rc = -EAGAIN;
CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
}
CDEBUG(D_QUOTA, "wait dqacq done. (rc:%d)\n", qw.qw_rc);
}
opc, wait);
if (!rc)
rc = ret;
opc, wait);
if (!rc)
rc = ret;
+ } else if (wait == 1) {
+ /* when wait equates 1, that means mds_quota_acquire
+ * or filter_quota_acquire is calling it. */
+ qctxt_wait_pending_dqacq(qctxt, id[i], i, isblk);
rc = split_before_schedule_dqacq(obd, qctxt,
&qdata, opc,
0);
rc = split_before_schedule_dqacq(obd, qctxt,
&qdata, opc,
0);
+ if (rc == -EDQUOT)
+ rc = 0;
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
uid, gid, 1, 0);
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt,
uid, gid, 1, 0);
+ if (rc == -EDQUOT || rc == -EBUSY) {
+ CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+ rc = 0;
+ }
+
}
spin_unlock(&lqs->lqs_lock);
}
spin_unlock(&lqs->lqs_lock);
/* When cycle is zero, lqs_*_pending will be changed. We will
/* When cycle is zero, lqs_*_pending will be changed. We will
- * putref lqs in quota_pending_commit instead of here b=14784 */
- if (cycle)
- lqs_putref(lqs);
+ * get reference of the lqs here and put reference of lqs in
+ * quota_pending_commit b=14784 */
+ if (!cycle)
+ lqs_getref(lqs);
+
+ /* this is for quota_search_lqs */
+ lqs_putref(lqs);
}
if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
}
if (rc2[0] == QUOTA_RET_ACQUOTA || rc2[1] == QUOTA_RET_ACQUOTA)
unsigned int gid, int count, int *pending,
int isblk, quota_acquire acquire)
{
unsigned int gid, int count, int *pending,
int isblk, quota_acquire acquire)
{
+ int rc = 0, cycle = 0, count_err = 0;
+ /* Unfortunately, if quota master is too busy to handle the
+ * pre-dqacq in time and quota hash on ost is used up, we
+ * have to wait for the completion of in flight dqacq/dqrel,
+ * in order to get enough quota for write b=12588 */
while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
QUOTA_RET_ACQUOTA) {
while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
QUOTA_RET_ACQUOTA) {
cycle++;
if (isblk)
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
cycle++;
if (isblk)
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
+ /* after acquire(), we should run quota_check_common again
+ * so that we confirm there are enough quota to finish write */
rc = acquire(obd, uid, gid);
/* please reference to dqacq_completion for the below */
rc = acquire(obd, uid, gid);
/* please reference to dqacq_completion for the below */
}
/* -EBUSY and others, try 10 times */
}
/* -EBUSY and others, try 10 times */
- if (rc < 0 && cycle < 10) {
- CDEBUG(D_QUOTA, "rc: %d, cycle: %d\n", rc, cycle);
+ if (rc < 0 && count_err < 10) {
+ CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
continue;
}
cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
continue;
}
- CDEBUG(D_QUOTA, "exit with rc: %d\n", rc);
- break;
+ if (count_err >= 10 || cycle >= 1000) {
+ CDEBUG(D_ERROR, "we meet 10 errors or run too many"
+ " cycles when acquiring quota, quit checking with"
+ " rc: %d, cycle: %d.\n", rc, cycle);
+ break;
+ }
+
+ CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
+ cycle);
}
if (!cycle && rc & QUOTA_RET_INC_PENDING)
}
if (!cycle && rc & QUOTA_RET_INC_PENDING)
up(&dquot->dq_sem);
rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
up(&dquot->dq_sem);
rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, is_blk, 0);
+ if (rc == -EDQUOT || rc == -EBUSY) {
+ CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+ rc = 0;
+ }
if (rc) {
CDEBUG(D_ERROR, "mds fail to adjust file quota! \
(rc:%d)\n", rc);
if (rc) {
CDEBUG(D_ERROR, "mds fail to adjust file quota! \
(rc:%d)\n", rc);
gid = oqctl->qc_id;
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
gid = oqctl->qc_id;
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 0, 0);
+ if (rc == -EDQUOT || rc == -EBUSY) {
+ CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+ rc = 0;
+ }
if (rc) {
CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n",
rc);
if (rc) {
CDEBUG(D_QUOTA,"error mds adjust local file quota! (rc:%d)\n",
rc);
rc = obd_quotactl(mds->mds_osc_exp, ioqc);
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
rc = obd_quotactl(mds->mds_osc_exp, ioqc);
rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, uid, gid, 1, 0);
+ if (rc == -EDQUOT || rc == -EBUSY) {
+ CDEBUG(D_QUOTA, "rc: %d.\n", rc);
+ rc = 0;
+ }
if (rc) {
CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
GOTO(out, rc);
if (rc) {
CERROR("error mds adjust local block quota! (rc:%d)\n", rc);
GOTO(out, rc);