#include <obd_class.h>
#include <lustre_quota.h>
#include <lustre_fsfilt.h>
-#include <class_hash.h>
#include <lprocfs_status.h>
#include "quota_internal.h"
#ifdef HAVE_QUOTA_SUPPORT
-static lustre_hash_ops_t lqs_hash_ops;
+static cfs_hash_ops_t lqs_hash_ops;
unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */
unsigned long default_btune_ratio = 50; /* 50 percentage */
spinlock_t lq_lock; /** Protect the whole structure */
enum qunit_state lq_state; /** Present the status of qunit */
int lq_rc; /** The rc of lq_data */
+ pid_t lq_owner;
};
#define QUNIT_SET_STATE(qunit, state) \
do { \
spin_lock(&qunit->lq_lock); \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
spin_lock(&qunit->lq_lock); \
qunit->lq_rc = rc; \
QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \
- "lq_rc(%d)\n", \
+ "lq_rc(%d), lq_owner(%d)\n", \
qunit, qunit_state_names[qunit->lq_state], \
- qunit_state_names[state], qunit->lq_rc); \
+ qunit_state_names[state], qunit->lq_rc, \
+ qunit->lq_owner); \
qunit->lq_state = state; \
spin_unlock(&qunit->lq_lock); \
} while(0)
-
int should_translate_quota (struct obd_import *imp)
{
ENTRY;
int ret = 0;
ENTRY;
- if (!sb_any_quota_enabled(sb))
+ if (!ll_sb_any_quota_active(sb))
RETURN(0);
spin_lock(&qctxt->lqc_lock);
if (!limit)
GOTO(out, ret = 0);
- search_lqs:
- quota_search_lqs(qdata, NULL, qctxt, &lqs);
- if (!lqs) {
- CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n");
- ret = quota_create_lqs(qdata, NULL, qctxt, &lqs);
- if (ret == -EALREADY) {
- ret = 0;
- goto search_lqs;
- }
- if (ret < 0)
- GOTO (out, ret);
+ lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+ qctxt, 0);
+ if (IS_ERR(lqs) || lqs == NULL) {
+ CERROR("fail to find a lqs for %sid: %u)!\n",
+ QDATA_IS_GRP(qdata) ? "g" : "u", qdata->qd_id);
+ GOTO (out, ret = 0);
}
spin_lock(&lqs->lqs_lock);
ret = 2;
/* if there are other pending writes for this uid/gid, releasing
* quota is put off until the last pending write b=16645 */
- if (ret == 2 && pending_write) {
+ /* if there is an ongoing quota request, a releasing request is aborted.
+ * That ongoing quota request will call this function again when
+ * it returned b=18630 */
+ if (pending_write || record) {
CDEBUG(D_QUOTA, "delay quota release\n");
ret = 0;
}
}
+ if (ret > 0)
+ quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0);
+
CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64
- ", pending_write: "LPU64", record: "LPD64
+ ", pending_write: "LPU64", record: %lld"
", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n",
QDATA_IS_BLK(qdata) ? 'b' : 'i', limit, usage, pending_write,
record, qunit_sz, tune_sz, ret);
spin_unlock(&lqs->lqs_lock);
lqs_putref(lqs);
+
EXIT;
out:
OBD_FREE_PTR(qctl);
int ret = QUOTA_RET_OK;
ENTRY;
- if (!sb_any_quota_enabled(sb))
+ if (!ll_sb_any_quota_active(sb))
RETURN(QUOTA_RET_NOQUOTA);
/* ignore root user */
struct lustre_qunit *qunit = NULL;
ENTRY;
- OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
if (qunit == NULL)
RETURN(NULL);
qunit->lq_opc = opc;
qunit->lq_lock = SPIN_LOCK_UNLOCKED;
QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ qunit->lq_owner = cfs_curproc_pid();
RETURN(qunit);
}
static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
{
- struct lustre_qunit_size *lqs = NULL;
+ struct lustre_qunit_size *lqs;
- quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs);
- if (lqs) {
+ lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data),
+ qunit->lq_data.qd_id),
+ qunit->lq_ctxt, 0);
+ if (lqs && !IS_ERR(lqs)) {
spin_lock(&lqs->lqs_lock);
if (qunit->lq_opc == QUOTA_DQACQ)
quota_compute_lqs(&qunit->lq_data, lqs, 0, 1);
/* this is for schedule_dqacq */
lqs_putref(lqs);
}
-
}
static void remove_qunit_nolock(struct lustre_qunit *qunit)
qunit_put(qunit);
}
+/**
+ * Install a barrier qunit for (oqctl->qc_id, oqctl->qc_type, isblk) so
+ * that no new dqacq/dqrel request for that id/type can start; any
+ * request already in flight is waited on first.  Returns an opaque
+ * handle to be released with quota_unbarrier().  On allocation failure
+ * the pending requests are still drained and NULL is returned.
+ */
+void* quota_barrier(struct lustre_quota_ctxt *qctxt,
+ struct obd_quotactl *oqctl, int isblk)
+{
+ struct lustre_qunit *qunit, *find_qunit;
+ int cycle = 1;
+
+ /* allocate a placeholder qunit; it never carries a real request */
+ OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+ if (qunit == NULL) {
+ CERROR("locating %sunit failed for %sid %u\n",
+ isblk ? "b" : "i", oqctl->qc_type ? "g" : "u",
+ oqctl->qc_id);
+ /* cannot install the barrier, but at least drain in-flight reqs */
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ return NULL;
+ }
+
+ INIT_LIST_HEAD(&qunit->lq_hash);
+ qunit->lq_lock = SPIN_LOCK_UNLOCKED;
+ init_waitqueue_head(&qunit->lq_waitq);
+ atomic_set(&qunit->lq_refcnt, 1);
+ qunit->lq_ctxt = qctxt;
+ qunit->lq_data.qd_id = oqctl->qc_id;
+ qunit->lq_data.qd_flags = oqctl->qc_type;
+ if (isblk)
+ QDATA_SET_BLK(&qunit->lq_data);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+ /* it means it is only an invalid qunit for barrier */
+ qunit->lq_opc = QUOTA_LAST_OPC;
+
+ /* loop until no matching request is in flight, then claim the slot
+ * while still holding qunit_hash_lock */
+ while (1) {
+ spin_lock(&qunit_hash_lock);
+ find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data);
+ if (find_qunit) {
+ spin_unlock(&qunit_hash_lock);
+ /* presumably drops the ref dqacq_in_flight() took -- confirm */
+ qunit_put(find_qunit);
+ qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id,
+ oqctl->qc_type, isblk);
+ CDEBUG(D_QUOTA, "cycle=%d\n", cycle++);
+ continue;
+ }
+ break;
+ }
+ /* no request in flight: insert the barrier under the held lock */
+ insert_qunit_nolock(qctxt, qunit);
+ spin_unlock(&qunit_hash_lock);
+ return qunit;
+}
+
+/**
+ * Remove the barrier qunit installed by quota_barrier() and wake up any
+ * thread waiting on it.  @handle is the value quota_barrier() returned;
+ * NULL is tolerated and only logged.
+ */
+void quota_unbarrier(void *handle)
+{
+ struct lustre_qunit *qunit = (struct lustre_qunit *)handle;
+
+ if (qunit == NULL) {
+ CERROR("handle is NULL\n");
+ return;
+ }
+
+ /* only barrier qunits (lq_opc == QUOTA_LAST_OPC) may be passed in */
+ LASSERT(qunit->lq_opc == QUOTA_LAST_OPC);
+ spin_lock(&qunit_hash_lock);
+ remove_qunit_nolock(qunit);
+ spin_unlock(&qunit_hash_lock);
+ QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED);
+ wake_up(&qunit->lq_waitq);
+ /* drop the initial reference taken in quota_barrier() */
+ qunit_put(qunit);
+}
+
#define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \
(limit = count) : (limit += count)
struct qunit_data *qdata, int opc, int wait,
struct obd_trans_info *oti);
+/**
+ * Translate a qunit_data into a quota_adjust_qunit request: id and flags
+ * are copied verbatim, and qd_qunit is stored as the block or inode unit
+ * size depending on which adjust flag (ADJBLK/ADJINO) is set in @qdata.
+ */
+static inline void qdata_to_oqaq(struct qunit_data *qdata,
+ struct quota_adjust_qunit *oqaq)
+{
+ LASSERT(qdata);
+ LASSERT(oqaq);
+
+ oqaq->qaq_flags = qdata->qd_flags;
+ oqaq->qaq_id = qdata->qd_id;
+ if (QDATA_IS_ADJBLK(qdata))
+ oqaq->qaq_bunit_sz = qdata->qd_qunit;
+ if (QDATA_IS_ADJINO(qdata))
+ oqaq->qaq_iunit_sz = qdata->qd_qunit;
+}
+
static int
dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata, int rc, int opc)
QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
+ /* do it only when a releasing quota req more than 5MB b=18491 */
+ if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+ OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
/* update local operational quota file */
if (rc == 0) {
__u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
/* this is for dqacq_in_flight() */
qunit_put(qunit);
- /* this is for alloc_qunit() */
- qunit_put(qunit);
if (rc < 0 && rc != -EDQUOT)
- RETURN(err);
+ GOTO(out1, err);
/* don't reschedule in such cases:
* - acq/rel failure and qunit isn't changed,
*/
OBD_ALLOC_PTR(oqaq);
if (!oqaq)
- RETURN(-ENOMEM);
+ GOTO(out1, err = -ENOMEM);
qdata_to_oqaq(qdata, oqaq);
/* adjust the qunit size in slaves */
rc1 = quota_adjust_slave_lqs(oqaq, qctxt);
OBD_FREE_PTR(oqaq);
if (rc1 < 0) {
CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
- RETURN(rc1);
+ GOTO(out1, err = rc1);
}
if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
- RETURN(err);
+ GOTO(out1, err);
+
+ if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+ OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+ GOTO(out1, err);
/* reschedule another dqacq/dqrel if needed */
qdata->qd_count = 0;
rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL);
QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1);
}
+ out1:
+ /* this is for alloc_qunit() */
+ qunit_put(qunit);
RETURN(err);
}
struct lustre_qunit *qunit = aa->aa_qunit;
struct obd_device *obd = req->rq_import->imp_obd;
struct qunit_data *qdata = NULL;
- int rc1 = 0;
ENTRY;
LASSERT(req);
LASSERT(req->rq_import);
- /* there are several forms of qunit(historic causes), so we need to
- * adjust qunit from slaves to the same form here */
- OBD_ALLOC(qdata, sizeof(struct qunit_data));
- if (!qdata)
- RETURN(-ENOMEM);
-
down_read(&obt->obt_rwsem);
/* if a quota req timeouts or is dropped, we should update quota
* statistics which will be handled in dqacq_completion. And in
* this situation we should get qdata from request instead of
* reply */
- rc1 = quota_get_qdata(req, qdata,
- (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
- QUOTA_IMPORT);
- if (rc1 < 0) {
+ qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+ QUOTA_IMPORT);
+ if (IS_ERR(qdata)) {
+ rc = PTR_ERR(qdata);
DEBUG_REQ(D_ERROR, req,
- "error unpacking qunit_data(rc: %d)\n", rc1);
- GOTO(exit, rc = rc1);
+ "error unpacking qunit_data(rc: %ld)\n",
+ PTR_ERR(qdata));
+ qdata = &qunit->lq_data;
}
QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
if (qdata->qd_id != qunit->lq_data.qd_id ||
OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) {
- CDEBUG(D_ERROR, "the returned qd_id isn't expected!"
+ CERROR("the returned qd_id isn't expected!"
"(qdata: %u, lq_data: %u)\n", qdata->qd_id,
qunit->lq_data.qd_id);
qdata->qd_id = qunit->lq_data.qd_id;
rc = -EPROTO;
}
if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) {
- CDEBUG(D_ERROR, "the returned grp/usr isn't expected!"
+ CERROR("the returned grp/usr isn't expected!"
"(qdata: %u, lq_data: %u)\n", qdata->qd_flags,
qunit->lq_data.qd_flags);
if (QDATA_IS_GRP(&qunit->lq_data))
rc = -EPROTO;
}
if (qdata->qd_count > qunit->lq_data.qd_count) {
- CDEBUG(D_ERROR, "the returned qd_count isn't expected!"
+ CERROR("the returned qd_count isn't expected!"
"(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count,
qunit->lq_data.qd_count);
rc = -EPROTO;
}
+ if (unlikely(rc == -ESRCH))
+ CERROR("quota for %s has been enabled by master, but disabled "
+ "by slave.\n", QDATA_IS_GRP(qdata) ? "group" : "user");
+
rc = dqacq_completion(obd, qctxt, qdata, rc,
lustre_msg_get_opc(req->rq_reqmsg));
-exit:
up_read(&obt->obt_rwsem);
- OBD_FREE(qdata, sizeof(struct qunit_data));
-
RETURN(rc);
}
EXIT;
}
-static int got_qunit(struct lustre_qunit *qunit)
+static int got_qunit(struct lustre_qunit *qunit, int is_master)
{
struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
int rc = 0;
if (!rc) {
spin_lock(&qctxt->lqc_lock);
- rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+ rc = !qctxt->lqc_valid;
+ if (!is_master)
+ rc |= !qctxt->lqc_import;
spin_unlock(&qctxt->lqc_lock);
}
RETURN(rc);
}
+/**
+ * Undo the pending-record accounting that check_cur_qunit() added to
+ * @lqs for this request; used when a scheduled dqacq/dqrel is aborted
+ * before being sent (b=18630).
+ */
+static inline void
+revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc)
+{
+ /* revoke lqs_xxx_rec which is computed in check_cur_qunit
+ * b=18630 */
+ spin_lock(&lqs->lqs_lock);
+ quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0);
+ spin_unlock(&lqs->lqs_lock);
+}
+
static int
schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
struct qunit_data *qdata, int opc, int wait,
LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL);
do_gettimeofday(&work_start);
- if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL)
+
+ lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id),
+ qctxt, 0);
+ if (lqs == NULL || IS_ERR(lqs)) {
+ CERROR("Can't find the lustre qunit size!\n");
+ RETURN(-EPERM);
+ }
+
+ if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) {
+ revoke_lqs_rec(lqs, qdata, opc);
+ /* this is for quota_search_lqs */
+ lqs_putref(lqs);
RETURN(-ENOMEM);
+ }
+
+ OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5);
spin_lock(&qunit_hash_lock);
qunit = dqacq_in_flight(qctxt, qdata);
spin_unlock(&qunit_hash_lock);
qunit_put(empty);
+ revoke_lqs_rec(lqs, qdata, opc);
+ /* this is for quota_search_lqs */
+ lqs_putref(lqs);
goto wait_completion;
}
qunit = empty;
insert_qunit_nolock(qctxt, qunit);
spin_unlock(&qunit_hash_lock);
- quota_search_lqs(qdata, NULL, qctxt, &lqs);
- if (lqs) {
- spin_lock(&lqs->lqs_lock);
- quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0);
- /* when this qdata returned from mds, it will call lqs_putref */
- lqs_getref(lqs);
- spin_unlock(&lqs->lqs_lock);
- /* this is for quota_search_lqs */
- lqs_putref(lqs);
- } else {
- CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n");
- }
+ /* From here, the quota request will be sent anyway.
+ * When this qdata request returned or is cancelled,
+ * lqs_putref will be called at that time */
+ lqs_getref(lqs);
+ /* this is for quota_search_lqs */
+ lqs_putref(lqs);
QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n",
obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
l_wait_event(qctxt->lqc_wait_for_qmaster,
check_qm(qctxt), &lwi);
CDEBUG(D_QUOTA, "wake up when quota master is back\n");
- lc_watchdog_touch(oti->oti_thread->t_watchdog);
+ lc_watchdog_touch(oti->oti_thread->t_watchdog,
+ GET_TIMEOUT(oti->oti_thread->t_svc));
} else {
spin_unlock(&qctxt->lqc_lock);
}
LUSTRE_MDS_VERSION, opc);
class_import_put(imp);
if (req == NULL) {
- CDEBUG(D_ERROR, "Can't alloc request\n");
+ CERROR("Can't alloc request\n");
dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
/* this is for qunit_get() */
qunit_put(qunit);
req->rq_no_resend = req->rq_no_delay = 1;
rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT);
if (rc < 0) {
- CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
+ CERROR("Can't pack qunit_data(rc: %d)\n", rc);
ptlrpc_req_finished(req);
dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
/* this is for qunit_get() */
struct qunit_data *p = &qunit->lq_data;
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
- l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+ &lwi);
/* rc = -EAGAIN, it means the quota master isn't ready yet
* rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
* rc = -EDQUOT, it means out of quota
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
spin_unlock(&qunit->lq_lock);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, rc);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+ "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+ qunit->lq_data.qd_flags, rc, qunit->lq_owner);
}
qunit_put(qunit);
int
qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
- uid_t uid, gid_t gid, __u32 isblk, int wait,
+ const unsigned int id[], __u32 isblk, int wait,
struct obd_trans_info *oti)
{
int rc = 0, i = USRQUOTA;
- __u32 id[MAXQUOTAS] = { uid, gid };
struct qunit_data qdata[MAXQUOTAS];
ENTRY;
- CLASSERT(MAXQUOTAS < 4);
- if (!sb_any_quota_enabled(qctxt->lqc_sb))
+ if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0)
RETURN(0);
for (i = 0; i < MAXQUOTAS; i++) {
struct qunit_data *p = &qunit->lq_data;
QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
- l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
- CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
- qunit, qunit->lq_rc);
+ l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)),
+ &lwi);
+ CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+ "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
/* keep same as schedule_dqacq() b=17030 */
spin_lock(&qunit->lq_lock);
rc = qunit->lq_rc;
RETURN(rc);
cfs_waitq_init(&qctxt->lqc_wait_for_qmaster);
+ cfs_waitq_init(&qctxt->lqc_lqs_waitq);
+ atomic_set(&qctxt->lqc_lqs, 0);
spin_lock_init(&qctxt->lqc_lock);
spin_lock(&qctxt->lqc_lock);
qctxt->lqc_handler = handler;
qctxt->lqc_sync_blk = 0;
spin_unlock(&qctxt->lqc_lock);
- qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
- &lqs_hash_ops, 0);
+ qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH",
+ HASH_LQS_CUR_BITS,
+ HASH_LQS_MAX_BITS,
+ &lqs_hash_ops, CFS_HASH_REHASH);
if (!qctxt->lqc_lqs_hash) {
CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
RETURN(-ENOMEM);
RETURN(rc);
}
+/* Wait condition for qctxt_cleanup(): true once the context's lqs
+ * counter (lqc_lqs) has dropped to zero, i.e. no lqs is referenced. */
+static int check_lqs(struct lustre_quota_ctxt *qctxt)
+{
+ int rc;
+ ENTRY;
+
+ rc = !atomic_read(&qctxt->lqc_lqs);
+
+ RETURN(rc);
+}
+
+
+/* Hash-iteration callback (see cfs_hash_for_each_safe in qctxt_cleanup):
+ * drops the reference held on each lqs so the table can be torn down.
+ * @data is unused. */
+void hash_put_lqs(void *obj, void *data)
+{
+ lqs_putref((struct lustre_qunit_size *)obj);
+}
+
void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force)
{
struct lustre_qunit *qunit, *tmp;
struct list_head tmp_list;
+ struct l_wait_info lwi = { 0 };
struct obd_device_target *obt = qctxt->lqc_obt;
int i;
ENTRY;
qunit_put(qunit);
}
- down_write(&obt->obt_rwsem);
- lustre_hash_exit(qctxt->lqc_lqs_hash);
- qctxt->lqc_lqs_hash = NULL;
- up_write(&obt->obt_rwsem);
-
/* after qctxt_cleanup, qctxt might be freed, then check_qm() is
* unpredicted. So we must wait until lqc_wait_for_qmaster is empty */
while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) {
cfs_time_seconds(1));
}
+ cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL);
+ l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi);
+ down_write(&obt->obt_rwsem);
+ cfs_hash_destroy(qctxt->lqc_lqs_hash);
+ qctxt->lqc_lqs_hash = NULL;
+ up_write(&obt->obt_rwsem);
+
ptlrpcd_decref();
#ifdef LPROCFS
int rc = 0;
ENTRY;
- ptlrpc_daemonize("qslave_recovd");
+ cfs_daemonize_ctxt("qslave_recovd");
+
+ /* for obdfilter */
+ class_incref(obd, "qslave_recovd_filter", obd);
complete(&data->comp);
- if (qctxt->lqc_recovery)
+ spin_lock(&qctxt->lqc_lock);
+ if (qctxt->lqc_recovery) {
+ spin_unlock(&qctxt->lqc_lock);
+ class_decref(obd, "qslave_recovd_filter", obd);
RETURN(0);
- qctxt->lqc_recovery = 1;
+ } else {
+ qctxt->lqc_recovery = 1;
+ spin_unlock(&qctxt->lqc_lock);
+ }
for (type = USRQUOTA; type < MAXQUOTAS; type++) {
struct qunit_data qdata;
int ret;
LOCK_DQONOFF_MUTEX(dqopt);
- if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) {
+ if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) {
UNLOCK_DQONOFF_MUTEX(dqopt);
break;
}
rc = 0;
}
- if (rc)
- CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR,
- "qslave recovery failed! (id:%d type:%d "
+ if (rc && rc != -EBUSY)
+ CERROR("qslave recovery failed! (id:%d type:%d "
" rc:%d)\n", dqid->di_id, type, rc);
free:
- kfree(dqid);
+ OBD_FREE_PTR(dqid);
}
}
+ spin_lock(&qctxt->lqc_lock);
qctxt->lqc_recovery = 0;
+ spin_unlock(&qctxt->lqc_lock);
+ class_decref(obd, "qslave_recovd_filter", obd);
RETURN(rc);
}
int rc;
ENTRY;
- if (!sb_any_quota_enabled(qctxt->lqc_sb))
+ if (!ll_sb_any_quota_active(qctxt->lqc_sb))
goto exit;
data.obd = obd;
EXIT;
}
+/**
+ * Return 1 if quota is enabled in @qctxt for every type selected in
+ * @oqctl (via Q_TYPESET), 0 otherwise.
+ */
+int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+{
+ unsigned int type;
+
+ for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+ if (!Q_TYPESET(oqctl, type))
+ continue;
+ /* NOTE(review): the flag test uses oqctl->qc_type, not the
+ * loop variable 'type' -- confirm this is intentional */
+ if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)))
+ return 0;
+ }
+ return 1;
+}
+
+/**
+ * Return 1 if quota is disabled in @qctxt for every type selected in
+ * @oqctl (via Q_TYPESET), 0 otherwise.
+ */
+int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl)
+{
+ unsigned int type;
+
+ for (type = USRQUOTA; type < MAXQUOTAS; type++) {
+ if (!Q_TYPESET(oqctl, type))
+ continue;
+ /* NOTE(review): the flag test uses oqctl->qc_type, not the
+ * loop variable 'type' -- confirm this is intentional */
+ if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))
+ return 0;
+ }
+ return 1;
+}
+
+/**
+ * When quotaon, build a lqs for every uid/gid who has been set limitation
+ * for quota. After quota_search_lqs, it will hold one ref for the lqs.
+ * It will be released when qctxt_cleanup() is executed b=18574
+ *
+ * Should be called with obt->obt_quotachecking held. b=20152
+ */
+void build_lqs(struct obd_device *obd)
+{
+ struct obd_device_target *obt = &obd->u.obt;
+ struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt;
+ struct list_head id_list;
+ int i, rc;
+
+ LASSERT_SEM_LOCKED(&obt->obt_quotachecking);
+ INIT_LIST_HEAD(&id_list);
+ /* iterate over both quota types (user: i == 0, group: i == 1) */
+ for (i = 0; i < MAXQUOTAS; i++) {
+ struct dquot_id *dqid, *tmp;
+
+ /* quota file absent: this type is not active, skip it */
+ if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL)
+ continue;
+
+#ifndef KERNEL_SUPPORTS_QUOTA_READ
+ rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL,
+ i, &id_list);
+#else
+ rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i],
+ i, &id_list);
+#endif
+ if (rc) {
+ CERROR("%s: failed to get %s qids!\n", obd->obd_name,
+ i ? "group" : "user");
+ continue;
+ }
+
+ list_for_each_entry_safe(dqid, tmp, &id_list,
+ di_link) {
+ struct lustre_qunit_size *lqs;
+
+ list_del_init(&dqid->di_link);
+ /* last arg 1: create the lqs when it does not exist
+ * yet (see header comment / b=18574) */
+ lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id),
+ qctxt, 1);
+ if (lqs && !IS_ERR(lqs)) {
+ lqs->lqs_flags |= dqid->di_flag;
+ lqs_putref(lqs);
+ } else {
+ CERROR("%s: failed to create a lqs for %sid %u"
+ "\n", obd->obd_name, i ? "g" : "u",
+ dqid->di_id);
+ }
+
+ OBD_FREE_PTR(dqid);
+ }
+ }
+}
/**
* lqs<->qctxt hash operations
* string hashing using djb2 hash algorithm
*/
static unsigned
-lqs_hash(lustre_hash_t *lh, void *key, unsigned mask)
+lqs_hash(cfs_hash_t *hs, void *key, unsigned mask)
{
struct quota_adjust_qunit *lqs_key;
unsigned hash;
static int
lqs_compare(void *key, struct hlist_node *hnode)
{
- struct quota_adjust_qunit *lqs_key;
struct lustre_qunit_size *q;
int rc;
ENTRY;
LASSERT(key);
- lqs_key = (struct quota_adjust_qunit *)key;
q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
spin_lock(&q->lqs_lock);
- rc = ((lqs_key->qaq_id == q->lqs_id) &&
- (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q)));
+ rc = (q->lqs_key == *((unsigned long long *)key));
spin_unlock(&q->lqs_lock);
RETURN(rc);
static void *
lqs_get(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q =
- hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+ struct lustre_qunit_size *q =
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- atomic_inc(&q->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- q, atomic_read(&q->lqs_refcount));
+ __lqs_getref(q);
RETURN(q);
}
static void *
lqs_put(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q =
- hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
+ struct lustre_qunit_size *q =
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- LASSERT(atomic_read(&q->lqs_refcount) > 0);
- atomic_dec(&q->lqs_refcount);
- CDEBUG(D_QUOTA, "lqs=%p refcount %d\n",
- q, atomic_read(&q->lqs_refcount));
+ __lqs_putref(q);
RETURN(q);
}
static void
lqs_exit(struct hlist_node *hnode)
{
- struct lustre_qunit_size *q;
+ struct lustre_qunit_size *q =
+ hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
ENTRY;
- q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
- /*
+ /*
* Nothing should be left. User of lqs put it and
* lqs also was deleted from table by this time
* so we should have 0 refs.
*/
- LASSERTF(atomic_read(&q->lqs_refcount) == 0,
+ LASSERTF(atomic_read(&q->lqs_refcount) == 0,
"Busy lqs %p with %d refs\n", q,
atomic_read(&q->lqs_refcount));
OBD_FREE_PTR(q);
EXIT;
}
-static lustre_hash_ops_t lqs_hash_ops = {
- .lh_hash = lqs_hash,
- .lh_compare = lqs_compare,
- .lh_get = lqs_get,
- .lh_put = lqs_put,
- .lh_exit = lqs_exit
+static cfs_hash_ops_t lqs_hash_ops = {
+ .hs_hash = lqs_hash,
+ .hs_compare = lqs_compare,
+ .hs_get = lqs_get,
+ .hs_put = lqs_put,
+ .hs_exit = lqs_exit
};
#endif /* HAVE_QUOTA_SUPPORT */