X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fquota%2Fquota_context.c;h=82a5762b2523bbe02c4c93a175a56c6e2a3b9f44;hb=a328a628aaab22311020dff7bd99a369aa867124;hp=4aac3ae5f7c6dbf54e2129acf8dbc65335ce41fe;hpb=cefa8cda2ba2d288ccaa4ec077a6c627592503ea;p=fs%2Flustre-release.git

diff --git a/lustre/quota/quota_context.c b/lustre/quota/quota_context.c
index 4aac3ae..82a5762 100644
--- a/lustre/quota/quota_context.c
+++ b/lustre/quota/quota_context.c
@@ -113,15 +113,17 @@ struct lustre_qunit {
         spinlock_t lq_lock;          /** Protect the whole structure */
         enum qunit_state lq_state;   /** Present the status of qunit */
         int lq_rc;                   /** The rc of lq_data */
+        pid_t lq_owner;
 };
 
 #define QUNIT_SET_STATE(qunit, state)                                  \
 do {                                                                   \
         spin_lock(&qunit->lq_lock);                                    \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "  \
-                    "lq_rc(%d)\n",                                     \
+                    "lq_rc(%d), lq_owner(%d)\n",                       \
                     qunit, qunit_state_names[qunit->lq_state],         \
-                    qunit_state_names[state], qunit->lq_rc);           \
+                    qunit_state_names[state], qunit->lq_rc,            \
+                    qunit->lq_owner);                                  \
         qunit->lq_state = state;                                       \
         spin_unlock(&qunit->lq_lock);                                  \
 } while(0)
@@ -131,9 +133,10 @@ do { \
         spin_lock(&qunit->lq_lock);                                    \
         qunit->lq_rc = rc;                                             \
         QDATA_DEBUG((&qunit->lq_data), "qunit(%p) lq_state(%s->%s), "  \
-                    "lq_rc(%d)\n",                                     \
+                    "lq_rc(%d), lq_owner(%d)\n",                       \
                     qunit, qunit_state_names[qunit->lq_state],         \
-                    qunit_state_names[state], qunit->lq_rc);           \
+                    qunit_state_names[state], qunit->lq_rc,            \
+                    qunit->lq_owner);                                  \
         qunit->lq_state = state;                                       \
         spin_unlock(&qunit->lq_lock);                                  \
 } while(0)
@@ -302,7 +305,7 @@ check_cur_qunit(struct obd_device *obd,
         if (QDATA_IS_BLK(qdata)) {
                 qunit_sz = lqs->lqs_bunit_sz;
                 tune_sz  = lqs->lqs_btune_sz;
-                pending_write = lqs->lqs_bwrite_pending * CFS_PAGE_SIZE;
+                pending_write = lqs->lqs_bwrite_pending;
                 record   = lqs->lqs_blk_rec;
                 LASSERT(!(qunit_sz % QUOTABLOCK_SIZE));
         } else {
@@ -417,26 +420,13 @@ out:
         return ret;
 }
 
-/* caller must hold qunit_hash_lock */
-static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
-                                            struct qunit_data *qdata)
-{
-        unsigned int hashent = qunit_hashfn(qctxt, qdata);
-        struct lustre_qunit *qunit;
-        ENTRY;
-
-        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
-        qunit = find_qunit(hashent, qctxt, qdata);
-        RETURN(qunit);
-}
-
 static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
                                         struct qunit_data *qdata, int opc)
 {
         struct lustre_qunit *qunit = NULL;
         ENTRY;
 
-        OBD_SLAB_ALLOC(qunit, qunit_cachep, CFS_ALLOC_IO, sizeof(*qunit));
+        OBD_SLAB_ALLOC_PTR_GFP(qunit, qunit_cachep, CFS_ALLOC_IO);
         if (qunit == NULL)
                 RETURN(NULL);
@@ -448,6 +438,7 @@ static struct lustre_qunit *alloc_qunit(struct lustre_quota_ctxt *qctxt,
         qunit->lq_opc = opc;
         qunit->lq_lock = SPIN_LOCK_UNLOCKED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_CREATED, 0);
+        qunit->lq_owner = cfs_curproc_pid();
         RETURN(qunit);
 }
 
@@ -468,6 +459,21 @@ static void qunit_put(struct lustre_qunit *qunit)
                 free_qunit(qunit);
 }
 
+/* caller must hold qunit_hash_lock and release ref of qunit after using it */
+static struct lustre_qunit *dqacq_in_flight(struct lustre_quota_ctxt *qctxt,
+                                            struct qunit_data *qdata)
+{
+        unsigned int hashent = qunit_hashfn(qctxt, qdata);
+        struct lustre_qunit *qunit;
+        ENTRY;
+
+        LASSERT_SPIN_LOCKED(&qunit_hash_lock);
+        qunit = find_qunit(hashent, qctxt, qdata);
+        if (qunit)
+                qunit_get(qunit);
+        RETURN(qunit);
+}
+
 static void insert_qunit_nolock(struct lustre_quota_ctxt *qctxt,
                                 struct lustre_qunit *qunit)
 {
@@ -497,7 +503,6 @@ static void compute_lqs_after_removing_qunit(struct lustre_qunit *qunit)
                 /* this is for schedule_dqacq */
                 lqs_putref(lqs);
         }
-
 }
 
 static void remove_qunit_nolock(struct lustre_qunit *qunit)
@@ -539,6 +544,10 @@ dqacq_completion(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         QDATA_DEBUG(qdata, "obd(%s): complete %s quota req\n",
                     obd->obd_name, (opc == QUOTA_DQACQ) ? "acq" : "rel");
 
+        /* do it only when a releasing quota req more than 5MB b=18491 */
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880)
+                OBD_FAIL_TIMEOUT(OBD_FAIL_QUOTA_DELAY_REL, 5);
+
         /* update local operational quota file */
         if (rc == 0) {
                 __u64 count = QUSG(qdata->qd_count, QDATA_IS_BLK(qdata));
@@ -634,10 +643,15 @@ out:
         compute_lqs_after_removing_qunit(qunit);
 
-        /* wake up all waiters */
+        if (rc == 0)
+                rc = QUOTA_REQ_RETURNED;
         QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, rc);
+        /* wake up all waiters */
         wake_up_all(&qunit->lq_waitq);
 
+        /* this is for dqacq_in_flight() */
+        qunit_put(qunit);
+        /* this is for alloc_qunit() */
         qunit_put(qunit);
 
         if (rc < 0 && rc != -EDQUOT)
                 RETURN(err);
@@ -659,8 +673,12 @@ out:
                 CERROR("adjust slave's qunit size failed!(rc:%d)\n", rc1);
                 RETURN(rc1);
         }
-        if (err || (rc && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
-                RETURN(err);
+        if (err || (rc < 0 && rc != -EBUSY && rc1 == 0) || is_master(qctxt))
+                RETURN(err);
+
+        if (opc == QUOTA_DQREL && qdata->qd_count >= 5242880 &&
+            OBD_FAIL_CHECK(OBD_FAIL_QUOTA_DELAY_REL))
+                RETURN(err);
 
         /* reschedule another dqacq/dqrel if needed */
         qdata->qd_count = 0;
@@ -689,30 +707,24 @@ static int dqacq_interpret(const struct lu_env *env,
         struct lustre_qunit *qunit = aa->aa_qunit;
         struct obd_device *obd = req->rq_import->imp_obd;
         struct qunit_data *qdata = NULL;
-        int rc1 = 0;
         ENTRY;
 
         LASSERT(req);
         LASSERT(req->rq_import);
 
-        /* there are several forms of qunit(historic causes), so we need to
-         * adjust qunit from slaves to the same form here */
-        OBD_ALLOC(qdata, sizeof(struct qunit_data));
-        if (!qdata)
-                RETURN(-ENOMEM);
-
         down_read(&obt->obt_rwsem);
         /* if a quota req timeouts or is dropped, we should update quota
          * statistics which will be handled in dqacq_completion. And in
         * this situation we should get qdata from request instead of
          * reply */
-        rc1 = quota_get_qdata(req, qdata,
-                              (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
-                              QUOTA_IMPORT);
-        if (rc1 < 0) {
+        qdata = quota_get_qdata(req, (rc != 0) ? QUOTA_REQUEST : QUOTA_REPLY,
+                                QUOTA_IMPORT);
+        if (IS_ERR(qdata)) {
+                rc = PTR_ERR(qdata);
                 DEBUG_REQ(D_ERROR, req,
-                          "error unpacking qunit_data(rc: %d)\n", rc1);
-                GOTO(exit, rc = rc1);
+                          "error unpacking qunit_data(rc: %ld)\n",
+                          PTR_ERR(qdata));
+                RETURN(PTR_ERR(qdata));
         }
 
         QDATA_DEBUG(qdata, "qdata: interpret rc(%d).\n", rc);
@@ -746,10 +758,7 @@ static int dqacq_interpret(const struct lu_env *env,
         rc = dqacq_completion(obd, qctxt, qdata, rc,
                               lustre_msg_get_opc(req->rq_reqmsg));
 
-exit:
         up_read(&obt->obt_rwsem);
-        OBD_FREE(qdata, sizeof(struct qunit_data));
-
         RETURN(rc);
 }
 
@@ -769,25 +778,56 @@ int check_qm(struct lustre_quota_ctxt *qctxt)
         RETURN(rc);
 }
 
+/* wake up all waiting threads when lqc_import is NULL */
+void dqacq_interrupt(struct lustre_quota_ctxt *qctxt)
+{
+        struct lustre_qunit *qunit, *tmp;
+        int i;
+        ENTRY;
+
+        spin_lock(&qunit_hash_lock);
+        for (i = 0; i < NR_DQHASH; i++) {
+                list_for_each_entry_safe(qunit, tmp, &qunit_hash[i], lq_hash) {
+                        if (qunit->lq_ctxt != qctxt)
+                                continue;
+
+                        /* Wake up all waiters. Do not change lq_state.
+                         * The waiters will check lq_rc which is kept as 0
+                         * if no others change it, then the waiters will return
+                         * -EAGAIN to caller who can perform related quota
+                         * acq/rel if necessary. */
+                        wake_up_all(&qunit->lq_waitq);
+                }
+        }
+        spin_unlock(&qunit_hash_lock);
+        EXIT;
+}
+
 static int got_qunit(struct lustre_qunit *qunit)
 {
-        int rc;
+        struct lustre_quota_ctxt *qctxt = qunit->lq_ctxt;
+        int rc = 0;
         ENTRY;
 
         spin_lock(&qunit->lq_lock);
         switch (qunit->lq_state) {
         case QUNIT_IN_HASH:
         case QUNIT_RM_FROM_HASH:
-                rc = 0;
                 break;
         case QUNIT_FINISHED:
                 rc = 1;
                 break;
         default:
-                rc = 0;
                 CERROR("invalid qunit state %d\n", qunit->lq_state);
         }
         spin_unlock(&qunit->lq_lock);
+
+        if (!rc) {
+                spin_lock(&qctxt->lqc_lock);
+                rc = !qctxt->lqc_import || !qctxt->lqc_valid;
+                spin_unlock(&qctxt->lqc_lock);
+        }
+
         RETURN(rc);
 }
 
@@ -816,19 +856,16 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, qdata);
         if (qunit) {
-                if (wait)
-                        qunit_get(qunit);
                 spin_unlock(&qunit_hash_lock);
                 qunit_put(empty);
                 goto wait_completion;
         }
         qunit = empty;
+        qunit_get(qunit);
         insert_qunit_nolock(qctxt, qunit);
         spin_unlock(&qunit_hash_lock);
 
-        LASSERT(qunit);
-
         quota_search_lqs(qdata, NULL, qctxt, &lqs);
         if (lqs) {
                 spin_lock(&lqs->lqs_lock);
@@ -852,6 +889,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QDATA_SET_CHANGE_QS(qdata);
                 rc = qctxt->lqc_handler(obd, qdata, opc);
                 rc2 = dqacq_completion(obd, qctxt, qdata, rc, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
                 if (opc == QUOTA_DQACQ)
@@ -879,6 +919,9 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 QUNIT_SET_STATE_AND_RC(qunit, QUNIT_FINISHED, -EAGAIN);
                 wake_up_all(&qunit->lq_waitq);
 
+                /* this is for qunit_get() */
+                qunit_put(qunit);
+                /* this for alloc_qunit() */
                 qunit_put(qunit);
                 spin_lock(&qctxt->lqc_lock);
                 if (wait && !qctxt->lqc_import) {
@@ -892,7 +935,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                         l_wait_event(qctxt->lqc_wait_for_qmaster,
                                      check_qm(qctxt), &lwi);
                         CDEBUG(D_QUOTA, "wake up when quota master is back\n");
-                        lc_watchdog_touch(oti->oti_thread->t_watchdog);
+                        lc_watchdog_touch(oti->oti_thread->t_watchdog,
+                                 GET_TIMEOUT(oti->oti_thread->t_svc));
                 } else {
                         spin_unlock(&qctxt->lqc_lock);
                 }
@@ -911,6 +955,8 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
         if (req == NULL) {
                 CDEBUG(D_ERROR, "Can't alloc request\n");
                 dqacq_completion(obd, qctxt, qdata, -ENOMEM, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(-ENOMEM);
         }
 
@@ -921,12 +967,11 @@ schedule_dqacq(struct obd_device *obd, struct lustre_quota_ctxt *qctxt,
                 CDEBUG(D_ERROR, "Can't pack qunit_data(rc: %d)\n", rc);
                 ptlrpc_req_finished(req);
                 dqacq_completion(obd, qctxt, qdata, -EPROTO, opc);
+                /* this is for qunit_get() */
+                qunit_put(qunit);
                 RETURN(rc);
         }
 
-        if (wait && qunit)
-                qunit_get(qunit);
-
         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
         aa = ptlrpc_req_async_args(req);
         aa->aa_ctxt = qctxt;
@@ -943,22 +988,21 @@ wait_completion:
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
 
-                /* rc = -EAGAIN, it means a quota req is finished;
+                /* rc = -EAGAIN, it means the quota master isn't ready yet
+                 * rc = QUOTA_REQ_RETURNED, it means a quota req is finished;
                  * rc = -EDQUOT, it means out of quota
                  * rc = -EBUSY, it means recovery is happening
                  * other rc < 0, it means real errors, functions who call
                  * schedule_dqacq should take care of this */
                 spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
+                rc = qunit->lq_rc;
                 spin_unlock(&qunit->lq_lock);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, rc);
-                qunit_put(qunit);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: id(%u) flag(%u) "
+                       "rc(%d) owner(%d)\n", qunit, qunit->lq_data.qd_id,
+                       qunit->lq_data.qd_flags, rc, qunit->lq_owner);
         }
+        qunit_put(qunit);
         do_gettimeofday(&work_end);
         timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
         if (opc == QUOTA_DQACQ)
@@ -1037,11 +1081,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
         spin_lock(&qunit_hash_lock);
         qunit = dqacq_in_flight(qctxt, &qdata);
-        if (qunit)
-                /* grab reference on this qunit to handle races with
-                 * dqacq_completion(). Otherwise, this qunit could be freed just
-                 * after we release the qunit_hash_lock */
-                qunit_get(qunit);
         spin_unlock(&qunit_hash_lock);
 
         if (qunit) {
@@ -1049,8 +1088,13 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                 QDATA_DEBUG(p, "qunit(%p) is waiting for dqacq.\n", qunit);
                 l_wait_event(qunit->lq_waitq, got_qunit(qunit), &lwi);
-                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting. (rc:%d)\n",
-                       qunit, qunit->lq_rc);
+                CDEBUG(D_QUOTA, "qunit(%p) finishes waiting: rc(%d) "
+                       "owner(%d)\n", qunit, qunit->lq_rc, qunit->lq_owner);
+                /* keep same as schedule_dqacq() b=17030 */
+                spin_lock(&qunit->lq_lock);
+                rc = qunit->lq_rc;
+                spin_unlock(&qunit->lq_lock);
+                /* this is for dqacq_in_flight() */
                 qunit_put(qunit);
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
@@ -1058,13 +1102,6 @@ qctxt_wait_pending_dqacq(struct lustre_quota_ctxt *qctxt, unsigned int id,
                            isblk ? LQUOTA_WAIT_PENDING_BLK_QUOTA :
                                    LQUOTA_WAIT_PENDING_INO_QUOTA,
                            timediff);
-                /* keep same as schedule_dqacq() b=17030 */
-                spin_lock(&qunit->lq_lock);
-                if (qunit->lq_rc == 0)
-                        rc = -EAGAIN;
-                else
-                        rc = qunit->lq_rc;
-                spin_unlock(&qunit->lq_lock);
         } else {
                 do_gettimeofday(&work_end);
                 timediff = cfs_timeval_sub(&work_end, &work_start, NULL);
@@ -1117,7 +1154,9 @@ qctxt_init(struct obd_device *obd, dqacq_handler_t handler)
         qctxt->lqc_sync_blk = 0;
         spin_unlock(&qctxt->lqc_lock);
 
-        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH", 7, 7,
+        qctxt->lqc_lqs_hash = lustre_hash_init("LQS_HASH",
+                                               HASH_LQS_CUR_BITS,
+                                               HASH_LQS_MAX_BITS,
                                                &lqs_hash_ops, 0);
         if (!qctxt->lqc_lqs_hash) {
                 CERROR("initialize hash lqs for %s error!\n", obd->obd_name);
@@ -1209,11 +1248,20 @@ static int qslave_recovery_main(void *arg)
         ptlrpc_daemonize("qslave_recovd");
 
+        /* for obdfilter */
+        class_incref(obd, "qslave_recovd_filter", obd);
+
         complete(&data->comp);
 
-        if (qctxt->lqc_recovery)
+        spin_lock(&qctxt->lqc_lock);
+        if (qctxt->lqc_recovery) {
+                spin_unlock(&qctxt->lqc_lock);
+                class_decref(obd, "qslave_recovd_filter", obd);
                 RETURN(0);
-        qctxt->lqc_recovery = 1;
+        } else {
+                qctxt->lqc_recovery = 1;
+                spin_unlock(&qctxt->lqc_lock);
+        }
 
         for (type = USRQUOTA; type < MAXQUOTAS; type++) {
                 struct qunit_data qdata;
@@ -1273,7 +1321,10 @@ free:
                 }
         }
 
+        spin_lock(&qctxt->lqc_lock);
         qctxt->lqc_recovery = 0;
+        spin_unlock(&qctxt->lqc_lock);
+        class_decref(obd, "qslave_recovd_filter", obd);
         RETURN(rc);
 }
 
@@ -1346,7 +1397,7 @@ lqs_compare(void *key, struct hlist_node *hnode)
 static void *
 lqs_get(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
                 hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1360,7 +1411,7 @@ lqs_get(struct hlist_node *hnode)
 static void *
 lqs_put(struct hlist_node *hnode)
 {
-        struct lustre_qunit_size *q = 
+        struct lustre_qunit_size *q =
                 hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
         ENTRY;
 
@@ -1379,12 +1430,12 @@ lqs_exit(struct hlist_node *hnode)
         ENTRY;
 
         q = hlist_entry(hnode, struct lustre_qunit_size, lqs_hash);
-        /* 
+        /*
          * Nothing should be left. User of lqs put it and
          * lqs also was deleted from table by this time
          * so we should have 0 refs.
          */
-        LASSERTF(atomic_read(&q->lqs_refcount) == 0, 
+        LASSERTF(atomic_read(&q->lqs_refcount) == 0,
                  "Busy lqs %p with %d refs\n", q,
                  atomic_read(&q->lqs_refcount));
         OBD_FREE_PTR(q);
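
Note on the reference-counting change above: dqacq_in_flight() now takes a reference on the qunit it returns (see the added comment "caller must hold qunit_hash_lock and release ref of qunit after using it"), so every caller is responsible for dropping that reference, which is why dqacq_completion(), schedule_dqacq() and qctxt_wait_pending_dqacq() each gain a matching qunit_put(). The fragment below is an illustrative userspace sketch of that lookup/release discipline only; it is not part of the patch, and the simplified struct and the lookup_locked() helper are hypothetical stand-ins for the real hash lookup in quota_context.c.

    /* Illustrative sketch (not Lustre code): a lookup helper returns its
     * result with an extra reference taken under the lock, so the object
     * cannot be freed between dropping the hash lock and using it.
     * The caller must drop that reference with a matching put. */
    #include <stdio.h>
    #include <stdlib.h>

    struct qunit {
            int refcount;           /* an atomic_t in the kernel */
    };

    static struct qunit *qunit_get(struct qunit *q)
    {
            q->refcount++;
            return q;
    }

    static void qunit_put(struct qunit *q)
    {
            if (--q->refcount == 0)
                    free(q);
    }

    /* hypothetical stand-in for dqacq_in_flight(): look up while the hash
     * lock is held and hand the caller its own reference */
    static struct qunit *lookup_locked(struct qunit *hashed)
    {
            return hashed ? qunit_get(hashed) : NULL;
    }

    int main(void)
    {
            struct qunit *q = malloc(sizeof(*q));
            struct qunit *found;

            q->refcount = 1;                /* reference held by the "hash" */
            found = lookup_locked(q);
            if (found) {
                    printf("found qunit, refcount=%d\n", found->refcount);
                    qunit_put(found);       /* caller drops its lookup reference */
            }
            qunit_put(q);                   /* drop the hash's own reference */
            return 0;
    }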