X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fquota_context.c;h=0a191136b2a70628b489a4881d72063027289d4a;hp=258b809cdf91216acffbd32f88c60bbbceeb5217;hb=2650455e68fea4442b43f39acc7e56fdc934867e;hpb=af3cf3ae556b2e3abe636e244358fa139d1f2830 diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c index 258b809..0a19113 100644 --- a/lustre/quota/quota_context.c +++ b/lustre/quota/quota_context.c @@ -57,13 +57,12 @@ #include #include #include -#include #include #include "quota_internal.h" #ifdef HAVE_QUOTA_SUPPORT -static lustre_hash_ops_t lqs_hash_ops; +static cfs_hash_ops_t lqs_hash_ops; unsigned long default_bunit_sz = 128 * 1024 * 1024; /* 128M bytes */ unsigned long default_btune_ratio = 50; /* 50 percentage */ @@ -113,15 +112,17 @@ struct lustre_qunit { spinlock_t lq_lock; /** Protect the whole structure */ enum qunit_state lq_state; /** Present the status of qunit */ int lq_rc; /** The rc of lq_data */ + pid_t lq_owner; }; #define QUNIT_SET_STATE(qunit, state) \ do { \ spin_lock(&qunit->lq_lock); \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ - "lq_rc(%d)\n", \ + "lq_rc(%d), lq_owner(%d)\n", \ qunit, qunit_state_names[qunit->lq_state], \ - qunit_state_names[state], qunit->lq_rc); \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ qunit->lq_state = state; \ spin_unlock(&qunit->lq_lock); \ } while(0) @@ -131,14 +132,14 @@ do { \ spin_lock(&qunit->lq_lock); \ qunit->lq_rc = rc; \ QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), " \ - "lq_rc(%d)\n", \ + "lq_rc(%d), lq_owner(%d)\n", \ qunit, qunit_state_names[qunit->lq_state], \ - qunit_state_names[state], qunit->lq_rc); \ + qunit_state_names[state], qunit->lq_rc, \ + qunit->lq_owner); \ qunit->lq_state = state; \ spin_unlock(&qunit->lq_lock); \ } while(0) - int should_translate_quota (struct obd_import *imp) { ENTRY; @@ -245,7 +246,7 @@ check_cur_qunit(struct obd_device *obd, int ret = 0; ENTRY; - if (!sb_any_quota_enabled(sb)) + if (!ll_sb_any_quota_active(sb)) RETURN(0); spin_lock(&qctxt->lqc_lock); @@ -285,17 +286,12 @@ check_cur_qunit(struct obd_device *obd, if (!limit) GOTO(out, ret = 0); - search_lqs: - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (!lqs) { - CDEBUG(D_QUOTA, "Can't find the lustre qunit size!\n"); - ret = quota_create_lqs(qdata, NULL, qctxt, &lqs); - if (ret == -EALREADY) { - ret = 0; - goto search_lqs; - } - if (ret < 0) - GOTO (out, ret); + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (IS_ERR(lqs) || lqs == NULL) { + CERROR("fail to find a lqs for %sid: %u)!\n", + QDATA_IS_GRP(qdata) ? "g" : "u", qdata->qd_id); + GOTO (out, ret = 0); } spin_lock(&lqs->lqs_lock); @@ -341,11 +337,17 @@ check_cur_qunit(struct obd_device *obd, ret = 2; /* if there are other pending writes for this uid/gid, releasing * quota is put off until the last pending write b=16645 */ - if (ret == 2 && pending_write) { + /* if there is an ongoing quota request, a releasing request is aborted. + * That ongoing quota request will call this function again when + * it returned b=18630 */ + if (pending_write || record) { CDEBUG(D_QUOTA, "delay quota release\n"); ret = 0; } } + if (ret > 0) + quota_compute_lqs(qdata, lqs, 1, (ret == 1) ? 1 : 0); + CDEBUG(D_QUOTA, "type: %c, limit: "LPU64", usage: "LPU64 ", pending_write: "LPU64", record: "LPD64 ", qunit_sz: %lu, tune_sz: %lu, ret: %d.\n", @@ -355,6 +357,7 @@ check_cur_qunit(struct obd_device *obd, spin_unlock(&lqs->lqs_lock); lqs_putref(lqs); + EXIT; out: OBD_FREE_PTR(qctl); @@ -373,7 +376,7 @@ int compute_remquota(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, int ret = QUOTA_RET_OK; ENTRY; - if (!sb_any_quota_enabled(sb)) + if (!ll_sb_any_quota_active(sb)) RETURN(QUOTA_RET_NOQUOTA); /* ignore root user */ @@ -435,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt, qunit->lq_opc = opc; qunit->lq_lock = SPIN_LOCK_UNLOCKED; QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + qunit->lq_owner = cfs_curproc_pid(); RETURN(qunit); } @@ -484,10 +488,12 @@ insert_qunit_nolock(struct lustre_quota_ctxt *qctxt, struct lustre_qunit *qunit) static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit) { - struct lustre_qunit_size *lqs = NULL; + struct lustre_qunit_size *lqs; - quota_search_lqs(&qunit->lq_data, NULL, qunit->lq_ctxt, &lqs); - if (lqs) { + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(&qunit->lq_data), + qunit->lq_data.qd_id), + qunit->lq_ctxt, 0); + if (lqs && !IS_ERR(lqs)) { spin_lock(&lqs->lqs_lock); if (qunit->lq_opc == QUOTA_DQACQ) quota_compute_lqs(&qunit->lq_data, lqs, 0, 1); @@ -511,6 +517,71 @@ static void remove_qunit_nolock(struct lustre_qunit *qunit) qunit_put(qunit); } +void* quota_barrier(struct lustre_quota_ctxt *qctxt, + struct obd_quotactl *oqctl, int isblk) +{ + struct lustre_qunit *qunit, *find_qunit; + int cycle = 1; + + OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit)); + if (qunit == NULL) { + CERROR("locating %sunit failed for %sid %u\n", + isblk ? "b" : "i", oqctl->qc_type ? "g" : "u", + oqctl->qc_id); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + return NULL; + } + + INIT_LIST_HEAD(&qunit->lq_hash); + qunit->lq_lock = SPIN_LOCK_UNLOCKED; + init_waitqueue_head(&qunit->lq_waitq); + atomic_set(&qunit->lq_refcnt, 1); + qunit->lq_ctxt = qctxt; + qunit->lq_data.qd_id = oqctl->qc_id; + qunit->lq_data.qd_flags = oqctl->qc_type; + if (isblk) + QDATA_SET_BLK(&qunit->lq_data); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0); + /* it means it is only an invalid qunit for barrier */ + qunit->lq_opc = QUOTA_LAST_OPC; + + while (1) { + spin_lock(&qunit_hash_lock); + find_qunit = dqacq_in_flight(qctxt, &qunit->lq_data); + if (find_qunit) { + spin_unlock(&qunit_hash_lock); + qunit_put(find_qunit); + qctxt_wait_pending_dqacq(qctxt, oqctl->qc_id, + oqctl->qc_type, isblk); + CDEBUG(D_QUOTA, "cycle=%d\n", cycle++); + continue; + } + break; + } + insert_qunit_nolock(qctxt, qunit); + spin_unlock(&qunit_hash_lock); + return qunit; +} + +void quota_unbarrier(void *handle) +{ + struct lustre_qunit *qunit = (struct lustre_qunit *)handle; + + if (qunit == NULL) { + CERROR("handle is NULL\n"); + return; + } + + LASSERT(qunit->lq_opc == QUOTA_LAST_OPC); + spin_lock(&qunit_hash_lock); + remove_qunit_nolock(qunit); + spin_unlock(&qunit_hash_lock); + QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, QUOTA_REQ_RETURNED); + wake_up(&qunit->lq_waitq); + qunit_put(qunit); +} + #define INC_QLIMIT(limit, count) (limit == MIN_QLIMIT) ? \ (limit = count) : (limit += count) @@ -525,6 +596,20 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait, struct obd_trans_info *oti); +static inline void qdata_to_oqaq(struct qunit_data *qdata, + struct quota_adjust_qunit *oqaq) +{ + LASSERT(qdata); + LASSERT(oqaq); + + oqaq->qaq_flags = qdata->qd_flags; + oqaq->qaq_id = qdata->qd_id; + if (QDATA_IS_ADJBLK(qdata)) + oqaq->qaq_bunit_sz = qdata->qd_qunit; + if (QDATA_IS_ADJINO(qdata)) + oqaq->qaq_iunit_sz = qdata->qd_qunit; +} + static int dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int rc, int opc) @@ -647,10 +732,8 @@ out: /* this is for dqacq_in_flight() */ qunit_put(qunit); - /* this is for alloc_qunit() */ - qunit_put(qunit); if (rc < 0 && rc != -EDQUOT) - RETURN(err); + GOTO(out1, err); /* don't reschedule in such cases: * - acq/rel failure and qunit isn't changed, @@ -660,21 +743,21 @@ out: */ OBD_ALLOC_PTR(oqaq); if (!oqaq) - RETURN(-ENOMEM); + GOTO(out1, err = -ENOMEM); qdata_to_oqaq(qdata, oqaq); /* adjust the qunit size in slaves */ rc1 = quota_adjust_slave_lqs(oqaq, qctxt); OBD_FREE_PTR(oqaq); if (rc1 < 0) { CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1); - RETURN(rc1); + GOTO(out1, err = rc1); } if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt)) - RETURN(err); + GOTO(out1, err); if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 && OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL)) - RETURN(err); + GOTO(out1, err); /* reschedule another dqacq/dqrel if needed */ qdata->qd_count = 0; @@ -686,6 +769,9 @@ out: rc1 = schedule_dqacq(obd, qctxt, qdata, opc, 0, NULL); QDATA_DEBUG(qdata, "reschedudle opc(%d) rc(%d)\n", opc, rc1); } + out1: + /* this is for alloc_qunit() */ + qunit_put(qunit); RETURN(err); } @@ -703,30 +789,24 @@ static int dqacq_interpret(const struct lu_env *env, struct lustre_qunit *qunit = aa->aa_qunit; struct obd_device *obd = req->rq_import->imp_obd; struct qunit_data *qdata = NULL; - int rc1 = 0; ENTRY; LASSERT(req); LASSERT(req->rq_import); - /* there are several forms of qunit(historic causes), so we need to - * adjust qunit from slaves to the same form here */ - OBD_ALLOC(qdata, sizeof(struct qunit_data)); - if (!qdata) - RETURN(-ENOMEM); - down_read(&obt->obt_rwsem); /* if a quota req timeouts or is dropped, we should update quota * statistics which will be handled in dqacq_completion. And in * this situation we should get qdata from request instead of * reply */ - rc1 = quota_get_qdata(req, qdata, - (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, - QUOTA_IMPORT); - if (rc1 < 0) { + qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY, + QUOTA_IMPORT); + if (IS_ERR(qdata)) { + rc = PTR_ERR(qdata); DEBUG_REQ(D_ERROR, req, - "error unpacking qunit_data(rc: %d)\n", rc1); - GOTO(exit, rc = rc1); + "error unpacking qunit_data(rc: %ld)\n", + PTR_ERR(qdata)); + qdata = &qunit->lq_data; } QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc); @@ -734,14 +814,14 @@ static int dqacq_interpret(const struct lu_env *env, if (qdata->qd_id != qunit->lq_data.qd_id || OBD_FAIL_CHECK(OBD_FAIL_QUOTA_RET_QDATA)) { - CDEBUG(D_ERROR, "the returned qd_id isn't expected!" + CERROR("the returned qd_id isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_id, qunit->lq_data.qd_id); qdata->qd_id = qunit->lq_data.qd_id; rc = -EPROTO; } if (QDATA_IS_GRP(qdata) != QDATA_IS_GRP(&qunit->lq_data)) { - CDEBUG(D_ERROR, "the returned grp/usr isn't expected!" + CERROR("the returned grp/usr isn't expected!" "(qdata: %u, lq_data: %u)\n", qdata->qd_flags, qunit->lq_data.qd_flags); if (QDATA_IS_GRP(&qunit->lq_data)) @@ -751,19 +831,20 @@ static int dqacq_interpret(const struct lu_env *env, rc = -EPROTO; } if (qdata->qd_count > qunit->lq_data.qd_count) { - CDEBUG(D_ERROR, "the returned qd_count isn't expected!" + CERROR("the returned qd_count isn't expected!" "(qdata: "LPU64", lq_data: "LPU64")\n", qdata->qd_count, qunit->lq_data.qd_count); rc = -EPROTO; } + if (unlikely(rc == -ESRCH)) + CERROR("quota for %s has been enabled by master, but disabled " + "by slave.\n", QDATA_IS_GRP(qdata) ? "group" : "user"); + rc = dqacq_completion(obd, qctxt, qdata, rc, lustre_msg_get_opc(req->rq_reqmsg)); -exit: up_read(&obt->obt_rwsem); - OBD_FREE(qdata, sizeof(struct qunit_data)); - RETURN(rc); } @@ -808,7 +889,7 @@ void dqacq_interrupt(struct lustre_quota_ctxt *qctxt) EXIT; } -static int got_qunit(struct lustre_qunit *qunit) +static int got_qunit(struct lustre_qunit *qunit, int is_master) { struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt; int rc = 0; @@ -829,13 +910,25 @@ static int got_qunit(struct lustre_qunit *qunit) if (!rc) { spin_lock(&qctxt->lqc_lock); - rc = !qctxt->lqc_import || !qctxt->lqc_valid; + rc = !qctxt->lqc_valid; + if (!is_master) + rc |= !qctxt->lqc_import; spin_unlock(&qctxt->lqc_lock); } RETURN(rc); } +static inline void +revoke_lqs_rec(struct lustre_qunit_size *lqs, struct qunit_data *qdata, int opc) +{ + /* revoke lqs_xxx_rec which is computed in check_cur_qunit + * b=18630 */ + spin_lock(&lqs->lqs_lock); + quota_compute_lqs(qdata, lqs, 0, (opc == QUOTA_DQACQ) ? 1 : 0); + spin_unlock(&lqs->lqs_lock); +} + static int schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, struct qunit_data *qdata, int opc, int wait, @@ -855,8 +948,22 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, LASSERT(opc == QUOTA_DQACQ || opc == QUOTA_DQREL); do_gettimeofday(&work_start); - if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) + + lqs = quota_search_lqs(LQS_KEY(QDATA_IS_GRP(qdata), qdata->qd_id), + qctxt, 0); + if (lqs == NULL || IS_ERR(lqs)) { + CERROR("Can't find the lustre qunit size!\n"); + RETURN(-EPERM); + } + + if ((empty = alloc_qunit(qctxt, qdata, opc)) == NULL) { + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); RETURN(-ENOMEM); + } + + OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_SD, 5); spin_lock(&qunit_hash_lock); qunit = dqacq_in_flight(qctxt, qdata); @@ -864,6 +971,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, spin_unlock(&qunit_hash_lock); qunit_put(empty); + revoke_lqs_rec(lqs, qdata, opc); + /* this is for quota_search_lqs */ + lqs_putref(lqs); goto wait_completion; } qunit = empty; @@ -871,18 +981,12 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, insert_qunit_nolock(qctxt, qunit); spin_unlock(&qunit_hash_lock); - quota_search_lqs(qdata, NULL, qctxt, &lqs); - if (lqs) { - spin_lock(&lqs->lqs_lock); - quota_compute_lqs(qdata, lqs, 1, (opc == QUOTA_DQACQ) ? 1 : 0); - /* when this qdata returned from mds, it will call lqs_putref */ - lqs_getref(lqs); - spin_unlock(&lqs->lqs_lock); - /* this is for quota_search_lqs */ - lqs_putref(lqs); - } else { - CDEBUG(D_ERROR, "Can't find the lustre qunit size!\n"); - } + /* From here, the quota request will be sent anyway. + * When this qdata request returned or is cancelled, + * lqs_putref will be called at that time */ + lqs_getref(lqs); + /* this is for quota_search_lqs */ + lqs_putref(lqs); QDATA_DEBUG(qdata, "obd(%s): send %s quota req\n", obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel"); @@ -958,7 +1062,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, LUSTRE_MDS_VERSION, opc); class_import_put(imp); if (req == NULL) { - CDEBUG(D_ERROR, "Can't alloc request\n"); + CERROR("Can't alloc request\n"); dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc); /* this is for qunit_get() */ qunit_put(qunit); @@ -969,7 +1073,7 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, req->rq_no_resend = req->rq_no_delay = 1; rc = quota_copy_qdata(req, qdata, QUOTA_REQUEST, QUOTA_IMPORT); if (rc < 0) { - CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc); + CERROR("Can't pack qunit_data(rc: %d)\n", rc); ptlrpc_req_finished(req); dqacq_completion(obd, qctxt, qdata, -EPROTO, opc); /* this is for qunit_get() */ @@ -992,7 +1096,8 @@ wait_completion: struct qunit_data *p = &qunit->lq_data; QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); - l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); /* rc = -EAGAIN, it means the quota master isn't ready yet * rc = QUOTA_REQ_RETURNED, it means a quota req is finished; * rc = -EDQUOT, it means out of quota @@ -1002,8 +1107,9 @@ wait_completion: spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; spin_unlock(&qunit->lq_lock); - CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", - qunit, rc); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) " + "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id, + qunit->lq_data.qd_flags, rc, qunit->lq_owner); } qunit_put(qunit); @@ -1023,16 +1129,14 @@ wait_completion: int qctxt_adjust_qunit(struct obd_device *obd, struct lustre_quota_ctxt *qctxt, - uid_t uid, gid_t gid, __u32 isblk, int wait, + const unsigned int id[], __u32 isblk, int wait, struct obd_trans_info *oti) { int rc = 0, i = USRQUOTA; - __u32 id[MAXQUOTAS] = { uid, gid }; struct qunit_data qdata[MAXQUOTAS]; ENTRY; - CLASSERT(MAXQUOTAS < 4); - if (!sb_any_quota_enabled(qctxt->lqc_sb)) + if (quota_is_set(obd, id, isblk ? QB_SET : QI_SET) == 0) RETURN(0); for (i = 0; i < MAXQUOTAS; i++) { @@ -1091,9 +1195,10 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id, struct qunit_data *p = &qunit->lq_data; QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit); - l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi); - CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n", - qunit, qunit->lq_rc); + l_wait_event(qunit->lq_waitq, got_qunit(qunit, is_master(qctxt)), + &lwi); + CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) " + "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner); /* keep same as schedule_dqacq() b=17030 */ spin_lock(&qunit->lq_lock); rc = qunit->lq_rc; @@ -1134,6 +1239,8 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); cfs_waitq_init(&qctxt->lqc_wait_for_qmaster); + cfs_waitq_init(&qctxt->lqc_lqs_waitq); + atomic_set(&qctxt->lqc_lqs, 0); spin_lock_init(&qctxt->lqc_lock); spin_lock(&qctxt->lqc_lock); qctxt->lqc_handler = handler; @@ -1158,10 +1265,10 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) qctxt->lqc_sync_blk = 0; spin_unlock(&qctxt->lqc_lock); - qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", - HASH_LQS_CUR_BITS, - HASH_LQS_MAX_BITS, - &lqs_hash_ops, 0); + qctxt->lqc_lqs_hash = cfs_hash_create("LQS_HASH", + HASH_LQS_CUR_BITS, + HASH_LQS_MAX_BITS, + &lqs_hash_ops, CFS_HASH_REHASH); if (!qctxt->lqc_lqs_hash) { CERROR("initialize hash lqs for %s error!\n", obd->obd_name); RETURN(-ENOMEM); @@ -1176,10 +1283,27 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler) RETURN(rc); } +static int check_lqs(struct lustre_quota_ctxt *qctxt) +{ + int rc; + ENTRY; + + rc = !atomic_read(&qctxt->lqc_lqs); + + RETURN(rc); +} + + +void hash_put_lqs(void *obj, void *data) +{ + lqs_putref((struct lustre_qunit_size *)obj); +} + void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) { struct lustre_qunit *qunit, *tmp; struct list_head tmp_list; + struct l_wait_info lwi = { 0 }; struct obd_device_target *obt = qctxt->lqc_obt; int i; ENTRY; @@ -1211,11 +1335,6 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) qunit_put(qunit); } - down_write(&obt->obt_rwsem); - lustre_hash_exit(qctxt->lqc_lqs_hash); - qctxt->lqc_lqs_hash = NULL; - up_write(&obt->obt_rwsem); - /* after qctxt_cleanup, qctxt might be freed, then check_qm() is * unpredicted. So we must wait until lqc_wait_for_qmaster is empty */ while (cfs_waitq_active(&qctxt->lqc_wait_for_qmaster)) { @@ -1224,6 +1343,13 @@ void qctxt_cleanup(struct lustre_quota_ctxt *qctxt, int force) cfs_time_seconds(1)); } + cfs_hash_for_each_safe(qctxt->lqc_lqs_hash, hash_put_lqs, NULL); + l_wait_event(qctxt->lqc_lqs_waitq, check_lqs(qctxt), &lwi); + down_write(&obt->obt_rwsem); + cfs_hash_destroy(qctxt->lqc_lqs_hash); + qctxt->lqc_lqs_hash = NULL; + up_write(&obt->obt_rwsem); + ptlrpcd_decref(); #ifdef LPROCFS @@ -1250,13 +1376,22 @@ static int qslave_recovery_main(void *arg) int rc = 0; ENTRY; - ptlrpc_daemonize("qslave_recovd"); + cfs_daemonize_ctxt("qslave_recovd"); + + /* for obdfilter */ + class_incref(obd, "qslave_recovd_filter", obd); complete(&data->comp); - if (qctxt->lqc_recovery) + spin_lock(&qctxt->lqc_lock); + if (qctxt->lqc_recovery) { + spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(0); - qctxt->lqc_recovery = 1; + } else { + qctxt->lqc_recovery = 1; + spin_unlock(&qctxt->lqc_lock); + } for (type = USRQUOTA; type < MAXQUOTAS; type++) { struct qunit_data qdata; @@ -1266,7 +1401,7 @@ static int qslave_recovery_main(void *arg) int ret; LOCK_DQONOFF_MUTEX(dqopt); - if (!sb_has_quota_enabled(qctxt->lqc_sb, type)) { + if (!ll_sb_has_quota_active(qctxt->lqc_sb, type)) { UNLOCK_DQONOFF_MUTEX(dqopt); break; } @@ -1307,16 +1442,18 @@ static int qslave_recovery_main(void *arg) rc = 0; } - if (rc) - CDEBUG(rc == -EBUSY ? D_QUOTA : D_ERROR, - "qslave recovery failed! (id:%d type:%d " + if (rc && rc != -EBUSY) + CERROR("qslave recovery failed! (id:%d type:%d " " rc:%d)\n", dqid->di_id, type, rc); free: - kfree(dqid); + OBD_FREE_PTR(dqid); } } + spin_lock(&qctxt->lqc_lock); qctxt->lqc_recovery = 0; + spin_unlock(&qctxt->lqc_lock); + class_decref(obd, "qslave_recovd_filter", obd); RETURN(rc); } @@ -1327,7 +1464,7 @@ qslave_start_recovery(struct obd_device *obd, struct lustre_quota_ctxt *qctxt) int rc; ENTRY; - if (!sb_any_quota_enabled(qctxt->lqc_sb)) + if (!ll_sb_any_quota_active(qctxt->lqc_sb)) goto exit; data.obd = obd; @@ -1344,6 +1481,87 @@ exit: EXIT; } +int quota_is_on(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl) +{ + unsigned int type; + + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + if (!Q_TYPESET(oqctl, type)) + continue; + if (!(qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type))) + return 0; + } + return 1; +} + +int quota_is_off(struct lustre_quota_ctxt *qctxt, struct obd_quotactl *oqctl) +{ + unsigned int type; + + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + if (!Q_TYPESET(oqctl, type)) + continue; + if (qctxt->lqc_flags & UGQUOTA2LQC(oqctl->qc_type)) + return 0; + } + return 1; +} + +/** + * When quotaon, build a lqs for every uid/gid who has been set limitation + * for quota. After quota_search_lqs, it will hold one ref for the lqs. + * It will be released when qctxt_cleanup() is executed b=18574 + * + * Should be called with obt->obt_quotachecking held. b=20152 + */ +void build_lqs(struct obd_device *obd) +{ + struct obd_device_target *obt = &obd->u.obt; + struct lustre_quota_ctxt *qctxt = &obt->obt_qctxt; + struct list_head id_list; + int i, rc; + + LASSERT_SEM_LOCKED(&obt->obt_quotachecking); + INIT_LIST_HEAD(&id_list); + for (i = 0; i < MAXQUOTAS; i++) { + struct dquot_id *dqid, *tmp; + + if (sb_dqopt(qctxt->lqc_sb)->files[i] == NULL) + continue; + +#ifndef KERNEL_SUPPORTS_QUOTA_READ + rc = fsfilt_qids(obd, sb_dqopt(qctxt->lqc_sb)->files[i], NULL, + i, &id_list); +#else + rc = fsfilt_qids(obd, NULL, sb_dqopt(qctxt->lqc_sb)->files[i], + i, &id_list); +#endif + if (rc) { + CERROR("%s: failed to get %s qids!\n", obd->obd_name, + i ? "group" : "user"); + continue; + } + + list_for_each_entry_safe(dqid, tmp, &id_list, + di_link) { + struct lustre_qunit_size *lqs; + + list_del_init(&dqid->di_link); + lqs = quota_search_lqs(LQS_KEY(i, dqid->di_id), + qctxt, 1); + if (lqs && !IS_ERR(lqs)) { + lqs->lqs_flags |= dqid->di_flag; + lqs_putref(lqs); + } else { + CERROR("%s: failed to create a lqs for %sid %u" + "\n", obd->obd_name, i ? "g" : "u", + dqid->di_id); + } + + OBD_FREE_PTR(dqid); + } + } +} /** * lqs<->qctxt hash operations @@ -1353,7 +1571,7 @@ exit: * string hashing using djb2 hash algorithm */ static unsigned -lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) +lqs_hash(cfs_hash_t *hs, void *key, unsigned mask) { struct quota_adjust_qunit *lqs_key; unsigned hash; @@ -1369,18 +1587,15 @@ lqs_hash(lustre_hash_t *lh, void *key, unsigned mask) static int lqs_compare(void *key, struct hlist_node *hnode) { - struct quota_adjust_qunit *lqs_key; struct lustre_qunit_size *q; int rc; ENTRY; LASSERT(key); - lqs_key = (struct quota_adjust_qunit *)key; q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); spin_lock(&q->lqs_lock); - rc = ((lqs_key->qaq_id == q->lqs_id) && - (QAQ_IS_GRP(lqs_key) == LQS_IS_GRP(q))); + rc = (q->lqs_key == *((unsigned long long *)key)); spin_unlock(&q->lqs_lock); RETURN(rc); @@ -1390,12 +1605,10 @@ static void * lqs_get(struct hlist_node *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - atomic_inc(&q->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - q, atomic_read(&q->lqs_refcount)); + __lqs_getref(q); RETURN(q); } @@ -1404,13 +1617,10 @@ static void * lqs_put(struct hlist_node *hnode) { struct lustre_qunit_size *q = - hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - LASSERT(atomic_read(&q->lqs_refcount) > 0); - atomic_dec(&q->lqs_refcount); - CDEBUG(D_QUOTA, "lqs=%p refcount %d\n", - q, atomic_read(&q->lqs_refcount)); + __lqs_putref(q); RETURN(q); } @@ -1418,10 +1628,10 @@ lqs_put(struct hlist_node *hnode) static void lqs_exit(struct hlist_node *hnode) { - struct lustre_qunit_size *q; + struct lustre_qunit_size *q = + hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); ENTRY; - q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash); /* * Nothing should be left. User of lqs put it and * lqs also was deleted from table by this time @@ -1434,11 +1644,11 @@ lqs_exit(struct hlist_node *hnode) EXIT; } -static lustre_hash_ops_t lqs_hash_ops = { - .lh_hash = lqs_hash, - .lh_compare = lqs_compare, - .lh_get = lqs_get, - .lh_put = lqs_put, - .lh_exit = lqs_exit +static cfs_hash_ops_t lqs_hash_ops = { + .hs_hash = lqs_hash, + .hs_compare = lqs_compare, + .hs_get = lqs_get, + .hs_put = lqs_put, + .hs_exit = lqs_exit }; #endif /* HAVE_QUOTA_SUPPORT */