* GPL HEADER END
*/
/*
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
* Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
#include "qsd_internal.h"
-extern cfs_mem_cache_t *upd_kmem;
+extern struct kmem_cache *upd_kmem;
/*
 * Allocate and fill a qsd_upd_rec structure to be processed by the writeback
{
struct qsd_upd_rec *upd;
- OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, CFS_ALLOC_IO);
+ OBD_SLAB_ALLOC_PTR_GFP(upd, upd_kmem, __GFP_IO);
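+ /* __GFP_IO (rather than GFP_KERNEL) presumably keeps memory reclaim
+ * from recursing back into filesystem code while this writeback path
+ * is holding quota state */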
if (upd == NULL) {
CERROR("Failed to allocate upd");
return NULL;
/* wake up the upd thread */
cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
} else {
- CWARN("%s: discard deferred update.\n", qsd->qsd_svname);
+ CWARN("%s: discard update.\n", qsd->qsd_svname);
if (upd->qur_lqe)
- LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
+ LQUOTA_WARN(upd->qur_lqe, "discard update.");
qsd_upd_free(upd);
}
}
/* must hold the qsd_lock */
-static void qsd_add_deferred(cfs_list_t *list, struct qsd_upd_rec *upd)
+static void qsd_add_deferred(struct qsd_instance *qsd, cfs_list_t *list,
+ struct qsd_upd_rec *upd)
{
- struct qsd_upd_rec *tmp;
+ struct qsd_upd_rec *tmp, *n;
- /* Sort the updates in ascending order */
- cfs_list_for_each_entry_reverse(tmp, list, qur_link) {
+ if (qsd->qsd_stopping) {
+ CWARN("%s: discard deferred update.\n", qsd->qsd_svname);
+ if (upd->qur_lqe)
+ LQUOTA_WARN(upd->qur_lqe, "discard deferred update.");
+ qsd_upd_free(upd);
+ return;
+ }
- LASSERTF(upd->qur_ver != tmp->qur_ver, "ver:"LPU64"\n",
- upd->qur_ver);
+ /* Sort the updates in ascending order */
+ cfs_list_for_each_entry_safe_reverse(tmp, n, list, qur_link) {
+
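+ /* note: we walk the list in reverse because new updates usually
+ * carry the highest version, so the insertion point is typically
+ * found right at the tail without scanning the whole list */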
+ /* There could be some legacy records which have duplicated
+ * versions. Imagine the following scenario: the slave received a
+ * global glimpse and queued a record in the deferred list, then
+ * the master crashed and rolled back to an earlier version, so
+ * the version of the queued record conflicts with later updates.
+ * We should just delete the legacy record in such a case. */
+ if (upd->qur_ver == tmp->qur_ver) {
+ LASSERT(tmp->qur_lqe);
+ LQUOTA_ERROR(tmp->qur_lqe, "Found a conflicting record "
+ "with ver:"LPU64"\n", tmp->qur_ver);
+ cfs_list_del_init(&tmp->qur_link);
+ qsd_upd_free(tmp);
+ }
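+ /* upd is older than tmp, keep scanning towards the head
+ * for its slot */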
if (upd->qur_ver < tmp->qur_ver) {
continue;
idx_ver = global ? &qqi->qqi_glb_ver : &qqi->qqi_slv_ver;
list = global ? &qqi->qqi_deferred_glb : &qqi->qqi_deferred_slv;
- cfs_write_lock(&qqi->qqi_qsd->qsd_lock);
+ write_lock(&qqi->qqi_qsd->qsd_lock);
*idx_ver = ver;
if (global)
qqi->qqi_glb_uptodate = 1;
else
qqi->qqi_slv_uptodate = 1;
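+ /* the index version has moved forward; release any deferred
+ * updates that are now covered by the new version */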
qsd_kickoff_deferred(qqi, list, ver);
- cfs_write_unlock(&qqi->qqi_qsd->qsd_lock);
+ write_unlock(&qqi->qqi_qsd->qsd_lock);
}
/*
* \param qid - quota id
* \param rec - global or slave record to be updated to disk
* \param ver - new index file version
- * \param global- ture : master record; false : slave record
+ * \param global- true: master record; false: slave record
*/
void qsd_upd_schedule(struct qsd_qtype_info *qqi, struct lquota_entry *lqe,
union lquota_id *qid, union lquota_rec *rec, __u64 ver,
/* If we don't need to update the index version, there is no need to
* sort the records by version; just schedule the updates immediately. */
if (ver == 0) {
- cfs_write_lock(&qsd->qsd_lock);
+ write_lock(&qsd->qsd_lock);
qsd_upd_add(qsd, upd);
- cfs_write_unlock(&qsd->qsd_lock);
+ write_unlock(&qsd->qsd_lock);
RETURN_EXIT;
}
- cfs_write_lock(&qsd->qsd_lock);
+ write_lock(&qsd->qsd_lock);
cur_ver = global ? qqi->qqi_glb_ver : qqi->qqi_slv_ver;
* the reintegration is in progress. Defer the update. */
cfs_list_t *list = global ? &qqi->qqi_deferred_glb :
&qqi->qqi_deferred_slv;
- qsd_add_deferred(list, upd);
+ qsd_add_deferred(qsd, list, upd);
}
- cfs_write_unlock(&qsd->qsd_lock);
+ write_unlock(&qsd->qsd_lock);
EXIT;
}
rc = qsd_update_lqe(env, lqe, upd->qur_global, &upd->qur_rec);
if (rc)
GOTO(out, rc);
+ /* refresh the cached usage from the space accounting info */
+ qsd_refresh_usage(env, lqe);
/* Report usage asynchronously */
- if (lqe->lqe_enforced &&
- !qsd_refresh_usage(env, lqe)) {
- rc = qsd_dqacq(env, lqe, QSD_REP);
- LQUOTA_DEBUG(lqe, "Report usage. rc:%d", rc);
- }
+ rc = qsd_adjust(env, lqe);
+ if (rc)
+ LQUOTA_ERROR(lqe, "failed to report usage, rc:%d", rc);
}
rc = qsd_update_index(env, qqi, &upd->qur_qid, upd->qur_global,
struct qsd_instance *qsd = lqe2qqi(lqe)->qqi_qsd;
bool added = false;
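+ /* bail out early if the qsd instance is shutting down; the
+ * writeback thread may no longer be around to process the request */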
+ read_lock(&qsd->qsd_lock);
+ if (qsd->qsd_stopping) {
+ read_unlock(&qsd->qsd_lock);
+ return;
+ }
+ read_unlock(&qsd->qsd_lock);
+
lqe_getref(lqe);
- cfs_spin_lock(&qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
/* if the lqe is queued for per-ID lock cancellation, cancel that
* request and re-add the entry for quota adjustment instead */
cfs_list_add(&lqe->lqe_link, &qsd->qsd_adjust_list);
added = true;
}
- cfs_spin_unlock(&qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
if (added)
cfs_waitq_signal(&qsd->qsd_upd_thread.t_ctl_waitq);
LASSERT(cfs_list_empty(upd));
*uptodate = true;
- cfs_spin_lock(&qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
if (!cfs_list_empty(&qsd->qsd_adjust_list)) {
struct lquota_entry *lqe;
lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
cfs_time_current_64()))
job_pending = true;
}
- cfs_spin_unlock(&qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
- cfs_write_lock(&qsd->qsd_lock);
+ write_lock(&qsd->qsd_lock);
if (!cfs_list_empty(&qsd->qsd_upd_list)) {
cfs_list_splice_init(&qsd->qsd_upd_list, upd);
job_pending = true;
}
+ if (qsd->qsd_acct_failed) {
+ /* don't bother kicking off reintegration if space accounting
+ * could not be enabled */
+ write_unlock(&qsd->qsd_lock);
+ return job_pending;
+ }
+
for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) {
struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype];
*uptodate = false;
}
- cfs_write_unlock(&qsd->qsd_lock);
+ write_unlock(&qsd->qsd_lock);
return job_pending;
}
struct l_wait_info lwi;
cfs_list_t queue;
struct qsd_upd_rec *upd, *n;
- char pname[MTI_NAME_MAXLEN];
struct lu_env *env;
int qtype, rc = 0;
bool uptodate;
RETURN(rc);
}
- snprintf(pname, MTI_NAME_MAXLEN, "lquota_wb_%s", qsd->qsd_svname);
- cfs_daemonize(pname);
-
thread_set_flags(thread, SVC_RUNNING);
cfs_waitq_signal(&thread->t_ctl_waitq);
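+ /* let the starter thread waiting on t_ctl_waitq know that we
+ * are up and running */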
qsd_upd_free(upd);
}
- cfs_spin_lock(&qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
cur_time = cfs_time_current_64();
cfs_list_for_each_entry_safe(lqe, tmp, &qsd->qsd_adjust_list,
lqe_link) {
break;
cfs_list_del_init(&lqe->lqe_link);
- cfs_spin_unlock(&qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
if (thread_is_running(thread) && uptodate) {
qsd_refresh_usage(env, lqe);
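+ /* a zero adjust time means the entry was queued only to get
+ * its per-ID lock cancelled; otherwise run a real adjustment */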
if (lqe->lqe_adjust_time == 0)
qsd_id_lock_cancel(env, lqe);
else
- qsd_dqacq(env, lqe, QSD_ADJ);
+ qsd_adjust(env, lqe);
}
lqe_putref(lqe);
- cfs_spin_lock(&qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
}
- cfs_spin_unlock(&qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
if (!thread_is_running(thread))
break;
{
struct ptlrpc_thread *thread = &qsd->qsd_upd_thread;
struct l_wait_info lwi = { 0 };
- int rc;
+ cfs_task_t *task;
ENTRY;
- rc = cfs_create_thread(qsd_upd_thread, (void *)qsd, 0);
- if (rc < 0) {
- CERROR("Fail to start quota update thread. rc: %d\n", rc);
+ task = kthread_run(qsd_upd_thread, (void *)qsd,
+ "lquota_wb_%s", qsd->qsd_svname);
+ if (IS_ERR(task)) {
+ CERROR("Fail to start quota update thread. rc: %ld\n",
+ PTR_ERR(task));
thread_set_flags(thread, SVC_STOPPED);
- RETURN(rc);
+ RETURN(PTR_ERR(task));
}
l_wait_event(thread->t_ctl_waitq,
if (qqi == NULL)
continue;
- cfs_write_lock(&qsd->qsd_lock);
+ write_lock(&qsd->qsd_lock);
cfs_list_for_each_entry_safe(upd, tmp, &qqi->qqi_deferred_glb,
qur_link) {
CWARN("%s: Free global deferred upd: ID:"LPU64", "
list_del_init(&upd->qur_link);
qsd_upd_free(upd);
}
- cfs_write_unlock(&qsd->qsd_lock);
+ write_unlock(&qsd->qsd_lock);
}
}
{
struct lquota_entry *lqe;
- cfs_spin_lock(&qsd->qsd_adjust_lock);
+ spin_lock(&qsd->qsd_adjust_lock);
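+ /* drain the adjust list, dropping the reference taken when each
+ * entry was queued for adjustment */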
while (!cfs_list_empty(&qsd->qsd_adjust_list)) {
lqe = cfs_list_entry(qsd->qsd_adjust_list.next,
struct lquota_entry, lqe_link);
cfs_list_del_init(&lqe->lqe_link);
lqe_putref(lqe);
}
- cfs_spin_unlock(&qsd->qsd_adjust_lock);
+ spin_unlock(&qsd->qsd_adjust_lock);
}
void qsd_stop_upd_thread(struct qsd_instance *qsd)