X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_ctl.c;h=8891c9d2d30b05fdde71ed4a0e0b6251542183fe;hb=146676af22f226ddd8b5eb09d509e831123cc4ea;hp=ad4f9a40cba808ca46bb5696bb0ff9b57e8d9e6b;hpb=52133d804b2af4279fc8aa51eab09d0f231dce96;p=fs%2Flustre-release.git diff --git a/lustre/quota/quota_ctl.c b/lustre/quota/quota_ctl.c index ad4f9a4..8891c9d 100644 --- a/lustre/quota/quota_ctl.c +++ b/lustre/quota/quota_ctl.c @@ -64,6 +64,7 @@ #ifdef HAVE_QUOTA_SUPPORT #ifdef __KERNEL__ + int mds_quota_ctl(struct obd_device *obd, struct obd_export *unused, struct obd_quotactl *oqctl) { @@ -131,7 +132,9 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, struct obd_device *obd = exp->exp_obd; struct obd_device_target *obt = &obd->u.obt; struct lvfs_run_ctxt saved; - struct lustre_quota_ctxt *qctxt = &obd->u.obt.obt_qctxt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + struct lustre_qunit_size *lqs; + void *handle = NULL; struct timeval work_start; struct timeval work_end; long timediff; @@ -143,15 +146,11 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, case Q_FINVALIDATE: case Q_QUOTAON: case Q_QUOTAOFF: - if (!atomic_dec_and_test(&obt->obt_quotachecking)) { - CDEBUG(D_INFO, "other people are doing quotacheck\n"); - atomic_inc(&obt->obt_quotachecking); - rc = -EBUSY; - break; - } + down(&obt->obt_quotachecking); if (oqctl->qc_cmd == Q_FINVALIDATE && (obt->obt_qctxt.lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) { - atomic_inc(&obt->obt_quotachecking); + CWARN("quota[%u] is on yet\n", oqctl->qc_type); + up(&obt->obt_quotachecking); rc = -EBUSY; break; } @@ -161,30 +160,45 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, case Q_GETQUOTA: /* In recovery scenario, this pending dqacq/dqrel might have * been processed by master successfully before it's dquot - * on master enter recovery mode. We must wait for this + * on master enter recovery mode. We must wait for this * dqacq/dqrel done then return the correct limits to master */ if (oqctl->qc_stat == QUOTA_RECOVERING) - qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, - 1); + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_quotactl(obd, obt->obt_sb, oqctl); pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + if (oqctl->qc_stat == QUOTA_RECOVERING) + quota_unbarrier(handle); + if (oqctl->qc_cmd == Q_QUOTAON || oqctl->qc_cmd == Q_QUOTAOFF || oqctl->qc_cmd == Q_FINVALIDATE) { - if (!rc && oqctl->qc_cmd == Q_QUOTAON) - obt->obt_qctxt.lqc_flags |= UGQUOTA2LQC(oqctl->qc_type); - if (!rc && oqctl->qc_cmd == Q_QUOTAOFF) - obt->obt_qctxt.lqc_flags &= ~UGQUOTA2LQC(oqctl->qc_type); - atomic_inc(&obt->obt_quotachecking); + if (oqctl->qc_cmd == Q_QUOTAON) { + if (!rc) { + obt->obt_qctxt.lqc_flags |= + UGQUOTA2LQC(oqctl->qc_type); + /* when quotaon, create lqs for every + * quota uid/gid b=18574 */ + build_lqs(obd); + } else if (rc == -EBUSY && + quota_is_on(qctxt, oqctl)) { + rc = -EALREADY; + } + } else if (oqctl->qc_cmd == Q_QUOTAOFF) { + if (!rc) + obt->obt_qctxt.lqc_flags &= + ~UGQUOTA2LQC(oqctl->qc_type); + else if (quota_is_off(qctxt, oqctl)) + rc = -EALREADY; + } + up(&obt->obt_quotachecking); } + break; case Q_SETQUOTA: /* currently, it is only used for nullifying the quota */ - qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, 1); + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); @@ -195,24 +209,33 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, oqctl->qc_cmd = Q_SETQUOTA; } pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + quota_unbarrier(handle); + + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)){ + CDEBUG(D_ERROR, "fail to create lqs when setquota\n"); + } else { + lqs->lqs_flags &= ~QB_SET; + lqs_putref(lqs); + } + break; case Q_INITQUOTA: { - unsigned int uid = 0, gid = 0; + unsigned int id[MAXQUOTAS] = { 0, 0 }; /* Initialize quota limit to MIN_QLIMIT */ LASSERT(oqctl->qc_dqblk.dqb_valid == QIF_BLIMITS); LASSERT(oqctl->qc_dqblk.dqb_bsoftlimit == 0); - /* There might be a pending dqacq/dqrel (which is going to - * clear stale limits on slave). we should wait for it's - * completion then initialize limits */ - qctxt_wait_pending_dqacq(&obd->u.obt.obt_qctxt, - oqctl->qc_id, oqctl->qc_type, 1); - if (!oqctl->qc_dqblk.dqb_bhardlimit) goto adjust; + /* There might be a pending dqacq/dqrel (which is going to + * clear stale limits on slave). we should wait for it's + * completion then initialize limits */ + handle = quota_barrier(&obd->u.obt.obt_qctxt, oqctl, 1); LASSERT(oqctl->qc_dqblk.dqb_bhardlimit == MIN_QLIMIT); push_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); rc = fsfilt_quotactl(obd, obd->u.obt.obt_sb, oqctl); @@ -225,18 +248,29 @@ int filter_quota_ctl(struct obd_device *unused, struct obd_export *exp, oqctl->qc_cmd = Q_INITQUOTA; } pop_ctxt(&saved, &obd->obd_lvfs_ctxt, NULL); + quota_unbarrier(handle); if (rc) RETURN(rc); adjust: + lqs = quota_search_lqs(LQS_KEY(oqctl->qc_type, oqctl->qc_id), + qctxt, 1); + if (lqs == NULL || IS_ERR(lqs)){ + CDEBUG(D_ERROR, "fail to create lqs when setquota\n"); + break; + } else { + lqs->lqs_flags |= QB_SET; + lqs_putref(lqs); + } + /* Trigger qunit pre-acquire */ if (oqctl->qc_type == USRQUOTA) - uid = oqctl->qc_id; + id[USRQUOTA] = oqctl->qc_id; else - gid = oqctl->qc_id; + id[GRPQUOTA] = oqctl->qc_id; rc = qctxt_adjust_qunit(obd, &obd->u.obt.obt_qctxt, - uid, gid, 1, 0, NULL); + id, 1, 0, NULL); if (rc == -EDQUOT || rc == -EBUSY) { CDEBUG(D_QUOTA, "rc: %d.\n", rc); rc = 0; @@ -264,7 +298,7 @@ int client_quota_ctl(struct obd_device *unused, struct obd_export *exp, struct ptlrpc_request *req; struct obd_quotactl *oqc; const struct req_format *rf; - int ver, opc, rc; + int ver, opc, rc, resends = 0; ENTRY; if (!strcmp(exp->exp_obd->obd_type->typ_name, LUSTRE_MDC_NAME)) { @@ -279,6 +313,8 @@ int client_quota_ctl(struct obd_device *unused, struct obd_export *exp, RETURN(-EINVAL); } +restart_request: + req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp), rf, ver, opc); if (req == NULL) RETURN(-ENOMEM); @@ -287,6 +323,8 @@ int client_quota_ctl(struct obd_device *unused, struct obd_export *exp, *oqc = *oqctl; ptlrpc_request_set_replen(req); + ptlrpc_at_set_req_timeout(req); + req->rq_no_resend = 1; rc = ptlrpc_queue_wait(req); if (rc) { @@ -307,6 +345,17 @@ int client_quota_ctl(struct obd_device *unused, struct obd_export *exp, EXIT; out: ptlrpc_req_finished(req); + + if (client_quota_recoverable_error(rc)) { + resends++; + if (!client_quota_should_resend(resends, &exp->exp_obd->u.cli)) { + CERROR("too many resend retries, returning error\n"); + RETURN(-EIO); + } + + goto restart_request; + } + return rc; } @@ -361,8 +410,8 @@ int lov_quota_ctl(struct obd_device *unused, struct obd_export *exp, tgt = lov->lov_tgts[i]; if (!tgt || !tgt->ltd_active || tgt->ltd_reap) { if (oqctl->qc_cmd == Q_GETOQUOTA) { + rc = -EREMOTEIO; CERROR("ost %d is inactive\n", i); - rc = -EIO; } else { CDEBUG(D_HA, "ost %d is inactive\n", i); }