if (QDATA_IS_BLK(qdata)) {
qunit_sz = lqs->lqs_bunit_sz;
tune_sz = lqs->lqs_btune_sz;
- pending_write = lqs->lqs_bwrite_pending * CFS_PAGE_SIZE;
+ pending_write = lqs->lqs_bwrite_pending;
record = lqs->lqs_blk_rec;
LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
} else {
return ret;
}
-/* caller must hold qunit_hash_lock */
-static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
- struct qunit_data *qdata)
-{
- unsigned int hashent = qunit_hashfn(qctxt, qdata);
- struct lustre_qunit *qunit;
- ENTRY;
-
- LASSERT_SPIN_LOCKED(&qunit_hash_lock);
- qunit = find_qunit(hashent, qctxt, qdata);
- RETURN(qunit);
-}
-
static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata, int opc)
{
struct lustre_qunit *qunit = NULL;
ENTRY;
- OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
if (qunit == NULL)
RETURN(NULL);
free_qunit(qunit);
}
+/* caller must hold qunit_hash_lock and release ref of qunit after using it */
+static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
+ struct qunit_data *qdata)
+{
+ unsigned int hashent = qunit_hashfn(qctxt, qdata);
+ struct lustre_qunit *qunit;
+ ENTRY;
+
+ LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+ qunit = find_qunit(hashent, qctxt, qdata);
+ if (qunit)
+ qunit_get(qunit);
+ RETURN(qunit);
+}
+
static void
insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit)
{
QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
+ /* do it only when a releasing quota req more than 5MB b=18491 */
+ if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
/* update local operational quota file */
if (rc == 0) {
__u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
compute_lqs_after_removing_qunit(qunit);
- /* wake up all waiters */
+ if (rc == 0)
+ rc = QUOTA_REQ_RETURNED;
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+ /* wake up all waiters */
wake_up_all(&qunit->lq_waitq);
+ /* this is for dqacq_in_flight() */
+ qunit_put(qunit);
+ /* this is for alloc_qunit() */
qunit_put(qunit);
if (rc < 0 && rc != -EDQUOT)
RETURN(err);
CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
RETURN(rc1);
}
- if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
- RETURN(err);
+ if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+ RETURN(err);
+
+ if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+ OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+ RETURN(err);
/* reschedule another dqacq/dqrel if needed */
qdata->qd_count = 0;
RETURN(rc);
}
+/* wake up all waiting threads when lqc_import is NULL */
+void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
+{
+ struct lustre_qunit *qunit, *tmp;
+ int i;
+ ENTRY;
+
+ spin_lock(&qunit_hash_lock);
+ for (i = 0; i < NR_DQHASH; i++) {
+ list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+ if (qunit->lq_ctxt != qctxt)
+ continue;
+
+ /* Wake up all waiters. Do not change lq_state.
+ * The waiters will check lq_rc which is kept as 0
+ * if no others change it, then the waiters will return
+ * -EAGAIN to caller who can perform related quota
+ * acq/rel if necessary. */
+ wake_up_all(&qunit->lq_waitq);
+ }
+ }
+ spin_unlock(&qunit_hash_lock);
+ EXIT;
+}
+
static int got_qunit(struct lustre_qunit *qunit)
{
- int rc;
+ struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
+ int rc = 0;
ENTRY;
spin_lock(&qunit->lq_lock);
switch (qunit->lq_state) {
case QUNIT_IN_HASH:
case QUNIT_RM_FROM_HASH:
- rc = 0;
break;
case QUNIT_FINISHED:
rc = 1;
break;
default:
- rc = 0;
CERROR("invalid qunit state %d\n", qunit->lq_state);
}
spin_unlock(&qunit->lq_lock);
+
+ if (!rc) {
+ spin_lock(&qctxt->lqc_lock);
+ rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+ spin_unlock(&qctxt->lqc_lock);
+ }
+
RETURN(rc);
}
spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, qdata);
if (qunit) {
- if (wait)
- qunit_get(qunit);
spin_unlock(&qunit_hash_lock);
qunit_put(empty);
goto wait_completion;
}
qunit = empty;
+ qunit_get(qunit);
insert_qunit_nolock(qctxt, qunit);
spin_unlock(&qunit_hash_lock);
- LASSERT(qunit);
-
quota_search_lqs(qdata, NULL, qctxt, &lqs);
if (lqs) {
spin_lock(&lqs->lqs_lock);
QDATA_SET_CHANGE_QS(qdata);
rc = qctxt->lqc_handler(obd, qdata, opc);
rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
+ /* this is for qunit_get() */
+ qunit_put(qunit);
+
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
if (opc == QUOTA_DQACQ)
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
wake_up_all(&qunit->lq_waitq);
+ /* this is for qunit_get() */
+ qunit_put(qunit);
+ /* this for alloc_qunit() */
qunit_put(qunit);
spin_lock(&qctxt->lqc_lock);
if (wait && !qctxt->lqc_import) {
if (req == NULL) {
CDEBUG(D_ERROR, "Can't alloc request\n");
dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
+ /* this is for qunit_get() */
+ qunit_put(qunit);
RETURN(-ENOMEM);
}
CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
ptlrpc_req_finished(req);
dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
+ /* this is for qunit_get() */
+ qunit_put(qunit);
RETURN(rc);
}
- if (wait && qunit)
- qunit_get(qunit);
-
CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
aa = ptlrpc_req_async_args(req);
aa->aa_ctxt = qctxt;
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
- /* rc = -EAGAIN, it means a quota req is finished;
+ /* rc = -EAGAIN, it means the quota master isn't ready yet
+ * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
* rc = -EDQUOT, it means out of quota
* rc = -EBUSY, it means recovery is happening
* other rc < 0, it means real errors, functions who call
* schedule_dqacq should take care of this */
spin_lock(&qunit->lq_lock);
- if (qunit->lq_rc == 0)
- rc = -EAGAIN;
- else
- rc = qunit->lq_rc;
+ rc = qunit->lq_rc;
spin_unlock(&qunit->lq_lock);
CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
qunit, rc);
- qunit_put(qunit);
}
+ qunit_put(qunit);
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
if (opc == QUOTA_DQACQ)
spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, &qdata);
- if (qunit)
- /* grab reference on this qunit to handle races with
- * dqacq_completion(). Otherwise, this qunit could be freed just
- * after we release the qunit_hash_lock */
- qunit_get(qunit);
spin_unlock(&qunit_hash_lock);
if (qunit) {
l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
qunit, qunit->lq_rc);
+ /* keep same as schedule_dqacq() b=17030 */
+ spin_lock(&qunit->lq_lock);
+ rc = qunit->lq_rc;
+ spin_unlock(&qunit->lq_lock);
+ /* this is for dqacq_in_flight() */
qunit_put(qunit);
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
LQUOTA_WAIT_PENDING_INO_QUOTA,
timediff);
- /* keep same as schedule_dqacq() b=17030 */
- spin_lock(&qunit->lq_lock);
- if (qunit->lq_rc == 0)
- rc = -EAGAIN;
- else
- rc = qunit->lq_rc;
- spin_unlock(&qunit->lq_lock);
} else {
do_gettimeofday(&work_end);
timediff = cfs_timeval_sub(&work_end, &work_start, NULL);