#include "quota_internal.h"
#ifdef __KERNEL__
+
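+ /* throttle the "still haven't managed to acquire quota space" warning
+ * below to at most one message every 30 seconds */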
+static cfs_time_t last_print = 0;
+static spinlock_t last_print_lock = SPIN_LOCK_UNLOCKED;
+
static int filter_quota_setup(struct obd_device *obd)
{
int rc = 0;
static int filter_quota_setinfo(struct obd_export *exp, struct obd_device *obd)
{
struct obd_import *imp;
+ struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ ENTRY;
/* setup the quota context import */
spin_lock(&obd->u.obt.obt_qctxt.lqc_lock);
obd->u.obt.obt_qctxt.lqc_import = exp->exp_imp_reverse;
spin_unlock(&obd->u.obt.obt_qctxt.lqc_lock);
+ CDEBUG(D_QUOTA, "%s: lqc_import(%p) of obd(%p) is reactivated now, \n",
+ obd->obd_name,exp->exp_imp_reverse, obd);
/* make imp's connect flags match the relative exp's connect flags;
* added here to avoid scanning the export list */
(exp->exp_connect_flags &
(OBD_CONNECT_QUOTA64 | OBD_CONNECT_CHANGE_QS));
+ cfs_waitq_signal(&qctxt->lqc_wait_for_qmaster);
/* start quota slave recovery thread. (release high limits) */
qslave_start_recovery(obd, &obd->u.obt.obt_qctxt);
- return 0;
+ RETURN(0);
}
static int filter_quota_clearinfo(struct obd_export *exp, struct obd_device *obd)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
+ ENTRY;
/* lquota may be not set up before destroying export, b=14896 */
if (!obd->obd_set_up)
if (qctxt->lqc_import == exp->exp_imp_reverse) {
spin_lock(&qctxt->lqc_lock);
qctxt->lqc_import = NULL;
+ CDEBUG(D_QUOTA, "%s: lqc_import of obd(%p) is invalid now.\n",
+ obd->obd_name, obd);
spin_unlock(&qctxt->lqc_lock);
}
- return 0;
+ RETURN(0);
}
static int filter_quota_enforce(struct obd_device *obd, unsigned int ignore)
if (!sb_any_quota_enabled(obd->u.obt.obt_sb))
RETURN(0);
- if (ignore)
+ if (ignore) {
+ CDEBUG(D_QUOTA, "blocks will be written with ignoring quota.\n");
cap_raise(current->cap_effective, CAP_SYS_RESOURCE);
- else
+ } else {
cap_lower(current->cap_effective, CAP_SYS_RESOURCE);
+ }
RETURN(0);
}
}
static int filter_quota_acquire(struct obd_device *obd, unsigned int uid,
- unsigned int gid)
+ unsigned int gid, struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
int rc;
ENTRY;
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1);
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, LQUOTA_FLAGS_BLK, 1, oti);
RETURN(rc);
}
if (!sb_any_quota_enabled(qctxt->lqc_sb))
RETURN(rc);
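+ /* skip the check if the quota context is not (or no longer) valid */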
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_valid) {
+ spin_unlock(&qctxt->lqc_lock);
+ RETURN(rc);
+ }
+ spin_unlock(&qctxt->lqc_lock);
+
for (i = 0; i < MAXQUOTAS; i++) {
struct lustre_qunit_size *lqs = NULL;
lqs->lqs_iwrite_pending += count;
}
- CDEBUG(D_QUOTA, "write pending: %lu, qd_count: "LPU64".\n",
+ CDEBUG(D_QUOTA, "count: %d, write pending: %lu, qd_count: "LPU64
+ ".\n", count,
isblk ? lqs->lqs_bwrite_pending : lqs->lqs_iwrite_pending,
qdata[i].qd_count);
if (rc2[i] == QUOTA_RET_OK) {
static int quota_chk_acq_common(struct obd_device *obd, unsigned int uid,
unsigned int gid, int count, int *pending,
- int isblk, quota_acquire acquire)
+ int isblk, quota_acquire acquire,
+ struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
struct timeval work_start;
struct timeval work_end;
long timediff;
- int rc = 0, cycle = 0, count_err = 0;
+ struct l_wait_info lwi = { 0 };
+ int rc = 0, cycle = 0, count_err = 1;
ENTRY;
CDEBUG(D_QUOTA, "check quota for %s\n", obd->obd_name);
while ((rc = quota_check_common(obd, uid, gid, count, cycle, isblk)) &
QUOTA_RET_ACQUOTA) {
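+ /* if the quota master is not connected yet, block here (with the
+ * service thread watchdog disabled) until it comes back, instead of
+ * retrying in a tight loop */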
+ spin_lock(&qctxt->lqc_lock);
+ if (!qctxt->lqc_import && oti) {
+ spin_unlock(&qctxt->lqc_lock);
+
+ LASSERT(oti && oti->oti_thread &&
+ oti->oti_thread->t_watchdog);
+
+ lc_watchdog_disable(oti->oti_thread->t_watchdog);
+ CDEBUG(D_QUOTA, "sleep for quota master\n");
+ l_wait_event(qctxt->lqc_wait_for_qmaster, check_qm(qctxt),
+ &lwi);
+ CDEBUG(D_QUOTA, "wake up when quota master is back\n");
+ lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ } else {
+ spin_unlock(&qctxt->lqc_lock);
+ }
+
if (rc & QUOTA_RET_INC_PENDING)
*pending = 1;
OBD_FAIL_TIMEOUT(OBD_FAIL_OST_HOLD_WRITE_RPC, 90);
/* after acquire(), we should run quota_check_common again
* so that we confirm there are enough quota to finish write */
- rc = acquire(obd, uid, gid);
+ rc = acquire(obd, uid, gid, oti);
/* please reference to dqacq_completion for the below */
/* a new request is finished, try again */
break;
}
- /* -EBUSY and others, try 10 times */
- if (rc < 0 && count_err < 10) {
- CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc, count_err++);
- cfs_schedule_timeout(CFS_TASK_INTERRUPTIBLE, HZ);
- continue;
+ /* -EBUSY and others, back off for a while and try again */
+ if (rc < 0) {
+ cfs_waitq_t waitq;
+ struct l_wait_info lwi;
+
+ if (oti && oti->oti_thread && oti->oti_thread->t_watchdog)
+ lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ CDEBUG(D_QUOTA, "rc: %d, count_err: %d\n", rc,
+ count_err++);
+
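+ /* back off for min(cycle, 10) seconds before retrying */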
+ init_waitqueue_head(&waitq);
+ lwi = LWI_TIMEOUT(cfs_time_seconds(min(cycle, 10)), NULL,
+ NULL);
+ l_wait_event(waitq, 0, &lwi);
}
- if (count_err >= 10 || cycle >= 1000) {
- CDEBUG(D_ERROR, "we meet 10 errors or run too many"
- " cycles when acquiring quota, quit checking with"
- " rc: %d, cycle: %d.\n", rc, cycle);
- break;
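+ /* on an error, or roughly every 10 cycles, warn that we are still
+ * waiting for quota space; last_print rate-limits this to once per
+ * 30 seconds */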
+ if (rc < 0 || cycle % 10 == 2) {
+ spin_lock(&last_print_lock);
+ if (last_print == 0 ||
+ cfs_time_before((last_print + cfs_time_seconds(30)),
+ cfs_time_current())) {
+ CWARN("still haven't managed to acquire quota "
+ "space from the quota master after %d "
+ "retries (err=%d, rc=%d)\n",
+ cycle, count_err - 1, rc);
+ last_print = cfs_time_current();
+ }
+ spin_unlock(&last_print_lock);
}
CDEBUG(D_QUOTA, "recheck quota with rc: %d, cycle: %d\n", rc,
static int filter_quota_check(struct obd_device *obd, unsigned int uid,
unsigned int gid, int npage, int *flag,
- quota_acquire acquire)
+ quota_acquire acquire, struct obd_trans_info *oti)
{
return quota_chk_acq_common(obd, uid, gid, npage, flag, LQUOTA_FLAGS_BLK,
- acquire);
+ acquire, oti);
}
/* when a block_write or inode_create rpc is finished, adjust the record for
static int mds_quota_check(struct obd_device *obd, unsigned int uid,
unsigned int gid, int inodes, int *flag,
- quota_acquire acquire)
+ quota_acquire acquire, struct obd_trans_info *oti)
{
- return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0, acquire);
+ return quota_chk_acq_common(obd, uid, gid, inodes, flag, 0,
+ acquire, oti);
}
static int mds_quota_acquire(struct obd_device *obd, unsigned int uid,
- unsigned int gid)
+ unsigned int gid, struct obd_trans_info *oti)
{
struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt;
int rc;
ENTRY;
- rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1);
+ rc = qctxt_adjust_qunit(obd, qctxt, uid, gid, 0, 1, oti);
RETURN(rc);
}