+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk)
+{
+ struct lustre_qunit *qunit, *find_qunit;
+ int cycle = 1;
+
+ OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ if (qunit == NULL) {
+ CERROR("locating qunit failed.(id=%u isblk=%d %s)\n",
+ oqctl->qc_id, isblk, oqctl->qc_type ? "grp" : "usr");
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&qunit->lq_hash);
+ qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+ init_waitqueue_head(&qunit->lq_waitq);
+ atomic_set(&qunit->lq_refcnt, 1);
+ qunit->lq_ctxt = qctxt;
+ qunit->lq_data.qd_id = oqctl->qc_id;
+ qunit->lq_data.qd_flags = oqctl->qc_type;
+ if (isblk)
+ QDATA_SET_BLK(&qunit->lq_data);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ /* it means it is only an invalid qunit for barrier */
+ qunit->lq_opc = QUOTA_LAST_OPC;
+
+ while (1) {
+ spin_lock(&qunit_hash_lock);
+ find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+ if (find_qunit) {
+ spin_unlock(&qunit_hash_lock);
+ qunit_put(find_qunit);
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+ continue;
+ }
+ break;
+ }
+ insert_qunit_nolock(qctxt, qunit);
+ spin_unlock(&qunit_hash_lock);
+ return qunit;
+}
+
+void quota_unbarrier(void *handle)
+{
+ struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+ if (qunit == NULL) {
+ CERROR("handle is NULL\n");
+ return;
+ }
+
+ LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+ spin_lock(&qunit_hash_lock);
+ remove_qunit_nolock(qunit);
+ spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+ wake_up(&qunit->lq_waitq);
+ qunit_put(qunit);
+}
+