* GPL HEADER END
*/
/*
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
* Use is subject to license terms.
*
* Author: Johann Lombardi <johann.lombardi@intel.com>
* Author: Niu Yawei <yawei.niu@intel.com>
*/
-#ifndef EXPORT_SYMTAB
-# define EXPORT_SYMTAB
-#endif
-
#define DEBUG_SUBSYSTEM S_LQUOTA
#include <lustre_dlm.h>
#include <obd_class.h>
+#include <lustre_swab.h>
#include "qsd_internal.h"
+
+typedef int (enqi_bl_cb_t)(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc, void *data,
+ int flag);
+static enqi_bl_cb_t qsd_glb_blocking_ast, qsd_id_blocking_ast;
+
+typedef int (enqi_gl_cb_t)(struct ldlm_lock *lock, void *data);
+static enqi_gl_cb_t qsd_glb_glimpse_ast, qsd_id_glimpse_ast;
+
+struct ldlm_enqueue_info qsd_glb_einfo = {
+ .ei_type = LDLM_PLAIN,
+ .ei_mode = LCK_CR,
+ .ei_cb_bl = qsd_glb_blocking_ast,
+ .ei_cb_cp = ldlm_completion_ast,
+ .ei_cb_gl = qsd_glb_glimpse_ast,
+};
+
+struct ldlm_enqueue_info qsd_id_einfo = {
+ .ei_type = LDLM_PLAIN,
+ .ei_mode = LCK_CR,
+ .ei_cb_bl = qsd_id_blocking_ast,
+ .ei_cb_cp = ldlm_completion_ast,
+ .ei_cb_gl = qsd_id_glimpse_ast,
+};
+
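/*
 * [Editor's sketch, not part of the patch] The typedef-based forward
 * declarations above let both einfo initializers reference the static AST
 * handlers before their definitions, and the designated initializers stay
 * correct even if struct ldlm_enqueue_info gains or reorders members,
 * unlike the positional initializers they replace. A minimal standalone
 * illustration of the same pattern, with hypothetical demo_* names:
 */
typedef int (demo_bl_cb_t)(void *lock, void *desc, void *data, int flag);

static demo_bl_cb_t demo_blocking_ast;	/* forward declaration via typedef */

struct demo_enqueue_info {
	int		 ei_mode;
	demo_bl_cb_t	*ei_cb_bl;
};

static struct demo_enqueue_info demo_einfo = {
	.ei_mode  = 1,
	.ei_cb_bl = demo_blocking_ast,	/* definition may appear later */
};

static int demo_blocking_ast(void *lock, void *desc, void *data, int flag)
{
	return 0;
}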
/*
* Return qsd_qtype_info structure associated with a global lock
*
* \param reset - whether lock->l_ast_data should be cleared
*/
static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
- bool reset) {
+ bool reset)
+{
struct qsd_qtype_info *qqi;
+
ENTRY;
lock_res_and_lock(lock);
qqi = lock->l_ast_data;
- if (qqi != NULL) {
+ if (qqi) {
qqi_getref(qqi);
if (reset)
lock->l_ast_data = NULL;
}
unlock_res_and_lock(lock);
- if (qqi != NULL)
+ if (qqi)
/* it is not safe to call lu_ref_add() under spinlock */
lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
- if (reset && qqi != NULL) {
+ if (reset && qqi) {
/* release qqi reference held for the lock */
lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
qqi_putref(qqi);
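/*
 * [Editor's sketch, not part of the patch] The helper above illustrates a
 * common pattern: take the cheap reference while the resource spinlock is
 * held, but defer anything that may sleep or allocate, like lu_ref_add(),
 * until after the unlock. A standalone userspace rendition with
 * hypothetical demo_* names:
 */
#include <pthread.h>
#include <stddef.h>

struct demo_data {
	int dd_ref;			/* reference count */
};

struct demo_lock {
	pthread_spinlock_t  dl_lock;
	struct demo_data   *dl_ast_data;
};

static struct demo_data *demo_ast_data_get(struct demo_lock *lock, int reset)
{
	struct demo_data *data;

	pthread_spin_lock(&lock->dl_lock);
	data = lock->dl_ast_data;
	if (data) {
		data->dd_ref++;		/* cheap, safe under a spinlock */
		if (reset)
			lock->dl_ast_data = NULL;
	}
	pthread_spin_unlock(&lock->dl_lock);

	/* the lu_ref_add() analogue would run here, outside the spinlock */
	return data;
}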
* \param reset - whether lock->l_ast_data should be cleared
*/
static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
- bool reset) {
+ bool reset)
+{
struct lquota_entry *lqe;
+
ENTRY;
lock_res_and_lock(lock);
lqe = lock->l_ast_data;
- if (lqe != NULL) {
+ if (lqe) {
lqe_getref(lqe);
if (reset)
lock->l_ast_data = NULL;
}
unlock_res_and_lock(lock);
- if (reset && lqe != NULL)
+ if (reset && lqe)
/* release lqe reference held for the lock */
lqe_putref(lqe);
RETURN(lqe);
struct ldlm_gl_lquota_desc **desc, void **lvb)
{
int rc;
+
ENTRY;
LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
/* glimpse on quota locks always packs a glimpse descriptor */
- req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
+ req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_CALLBACK_DESC);
/* extract glimpse descriptor */
*desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
- if (*desc == NULL)
+ if (!*desc)
RETURN(-EFAULT);
+ if (ptlrpc_req_need_swab(req))
+ lustre_swab_gl_lquota_desc(*desc);
+
/* prepare reply */
req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
sizeof(struct lquota_lvb));
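/*
 * [Editor's sketch, not part of the patch] The ptlrpc_req_need_swab()
 * check added above matters because the glimpse descriptor arrives in the
 * sender's byte order; lustre_swab_gl_lquota_desc() swabs each multi-byte
 * field in place before use. A standalone illustration of that kind of
 * swab helper, using a hypothetical struct and GCC/Clang builtins:
 */
#include <stdint.h>

struct demo_gl_desc {
	uint64_t gd_ver;
	uint64_t gd_hardlimit;
	uint64_t gd_softlimit;
};

static void demo_swab_gl_desc(struct demo_gl_desc *d)
{
	/* every multi-byte field must be converted, none skipped */
	d->gd_ver	= __builtin_bswap64(d->gd_ver);
	d->gd_hardlimit	= __builtin_bswap64(d->gd_hardlimit);
	d->gd_softlimit	= __builtin_bswap64(d->gd_softlimit);
}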
int flag)
{
int rc = 0;
+
ENTRY;
- switch(flag) {
+ switch (flag) {
case LDLM_CB_BLOCKING: {
struct lustre_handle lockh;
LDLM_DEBUG(lock, "canceling global quota lock");
qqi = qsd_glb_ast_data_get(lock, true);
- if (qqi == NULL)
+ if (!qqi)
break;
- /* we are losing the global index lock, so let's mark the
- * global & slave indexes as not up-to-date any more */
+ /*
+ * we are losing the global index lock, so let's mark the
+ * global & slave indexes as not up-to-date any more
+ */
write_lock(&qqi->qqi_qsd->qsd_lock);
qqi->qqi_glb_uptodate = false;
qqi->qqi_slv_uptodate = false;
write_unlock(&qqi->qqi_qsd->qsd_lock);
CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
- qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
+ qqi->qqi_qsd->qsd_svname, qtype_name(qqi->qqi_qtype));
- /* kick off reintegration thread if not running already, if
+ /*
+ * kick off reintegration thread if not running already; if
* it's just local cancel (for stack clean up or eviction),
- * don't re-trigger the reintegration. */
- if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+ * don't re-trigger the reintegration.
+ */
+ if (!ldlm_is_local_only(lock))
qsd_start_reint_thread(qqi);
lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
break;
}
default:
- LASSERTF(0, "invalid flags for blocking ast %d", flag);
+ LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
}
RETURN(rc);
}
+
+static int qsd_entry_def_iter_cb(struct cfs_hash *hs, struct cfs_hash_bd *bd,
+ struct hlist_node *hnode, void *data)
+{
+ struct qsd_qtype_info *qqi = (struct qsd_qtype_info *)data;
+ struct lquota_entry *lqe;
+
+ lqe = hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+ if (lqe->lqe_id.qid_uid == 0 || !lqe->lqe_is_default)
+ return 0;
+
+ lqe_write_lock(lqe);
+ if (qqi->qqi_default_hardlimit == 0 && qqi->qqi_default_softlimit == 0)
+ lqe->lqe_enforced = false;
+ else
+ lqe->lqe_enforced = true;
+ lqe_write_unlock(lqe);
+
+ return 0;
+}
+
+/*
+ * Update the quota entries after receiving a default quota update
+ *
+ * \param qqi - is the qsd_qtype_info associated with the quota entries
+ * \param hardlimit - new hardlimit of default quota
+ * \param softlimit - new softlimit of default quota
+ * \param gracetime - new gracetime of default quota
+ */
+void qsd_update_default_quota(struct qsd_qtype_info *qqi, __u64 hardlimit,
+ __u64 softlimit, __u64 gracetime)
+{
+ CDEBUG(D_QUOTA, "%s: update default quota setting from QMT.\n",
+ qqi->qqi_qsd->qsd_svname);
+
+ qqi->qqi_default_hardlimit = hardlimit;
+ qqi->qqi_default_softlimit = softlimit;
+ qqi->qqi_default_gracetime = gracetime;
+
+ cfs_hash_for_each_safe(qqi->qqi_site->lqs_hash,
+ qsd_entry_def_iter_cb, qqi);
+}
+
/*
* Glimpse callback handler for global quota lock.
*
*/
static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ptlrpc_request *req = data;
- struct qsd_qtype_info *qqi;
- struct ldlm_gl_lquota_desc *desc;
- struct lquota_lvb *lvb;
- struct lquota_glb_rec rec;
- int rc;
+ struct ptlrpc_request *req = data;
+ struct qsd_qtype_info *qqi;
+ struct ldlm_gl_lquota_desc *desc;
+ struct lquota_lvb *lvb;
+ struct lquota_glb_rec rec;
+ int rc;
+
ENTRY;
rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
if (rc)
	GOTO(out, rc);
qqi = qsd_glb_ast_data_get(lock, false);
- if (qqi == NULL)
+ if (!qqi)
/* valid race */
GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
- CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
- " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+ CDEBUG(D_QUOTA,
+ "%s: glimpse on glb quota locks, id:%llu ver:%llu hard:%llu soft:%llu\n",
+ qqi->qqi_qsd->qsd_svname,
desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
desc->gl_softlimit);
if (desc->gl_ver == 0) {
- CERROR("%s: invalid global index version "LPU64"\n",
+ CERROR("%s: invalid global index version %llu\n",
qqi->qqi_qsd->qsd_svname, desc->gl_ver);
GOTO(out_qqi, rc = -EINVAL);
}
rec.qbr_time = desc->gl_time;
rec.qbr_granted = 0;
- /* We can't afford disk io in the context of glimpse callback handling
- * thread, so the on-disk global limits update has to be deferred. */
+ if (desc->gl_id.qid_uid == 0)
+ qsd_update_default_quota(qqi, desc->gl_hardlimit,
+ desc->gl_softlimit, desc->gl_time);
+
+ /*
+ * We can't afford disk I/O in the context of the glimpse callback
+ * handling thread, so the on-disk global limits update has to be deferred.
+ */
qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
desc->gl_ver, true);
EXIT;
return rc;
}
-struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
- LCK_CR,
- qsd_glb_blocking_ast,
- ldlm_completion_ast,
- qsd_glb_glimpse_ast,
- NULL, NULL };
-/*
+/**
* Blocking callback handler for per-ID lock
*
* \param lock - is the lock for which ast occurred.
* \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
* cancellation and blocking ast's.
*/
-static int qsd_id_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
+static int qsd_id_blocking_ast(struct ldlm_lock *lock,
+ struct ldlm_lock_desc *desc,
void *data, int flag)
{
- struct lustre_handle lockh;
- int rc = 0;
+ struct lustre_handle lockh;
+ int rc = 0;
+
ENTRY;
- switch(flag) {
+ switch (flag) {
case LDLM_CB_BLOCKING: {
LDLM_DEBUG(lock, "blocking AST on ID quota lock");
break;
}
case LDLM_CB_CANCELING: {
- struct lu_env *env;
- struct lquota_entry *lqe;
- bool rel = false;
+ struct lu_env *env;
+ struct lquota_entry *lqe;
- LDLM_DEBUG(lock, "canceling global quota lock");
+ LDLM_DEBUG(lock, "canceling ID quota lock");
lqe = qsd_id_ast_data_get(lock, true);
- if (lqe == NULL)
+ if (!lqe)
break;
LQUOTA_DEBUG(lqe, "losing ID lock");
- /* just local cancel (for stack clean up or eviction), don't
- * release quota space in this case */
- if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
- lqe_putref(lqe);
- break;
- }
-
- /* allocate environment */
- OBD_ALLOC_PTR(env);
- if (env == NULL) {
- lqe_putref(lqe);
- rc = -ENOMEM;
- break;
- }
-
- /* initialize environment */
- rc = lu_env_init(env, LCT_DT_THREAD);
- if (rc) {
- OBD_FREE_PTR(env);
- lqe_putref(lqe);
- break;
- }
-
ldlm_lock2handle(lock, &lockh);
lqe_write_lock(lqe);
if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
/* Clear lqe_lockh & reset qunit to 0 */
qsd_set_qunit(lqe, 0);
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
- lqe->lqe_edquot = false;
- rel = true;
+ qsd_set_edquot(lqe, false);
}
lqe_write_unlock(lqe);
- /* If there is qqacq inflight, the release will be skipped
+ /*
+ * If there is a dqacq in flight, the release will be skipped
* at this time, and triggered on dqacq completion later,
* which means there could be a short window that slave is
- * holding spare grant wihtout per-ID lock. */
- if (rel)
+ * holding spare grant without the per-ID lock.
+ */
+
+ /*
+ * don't release quota space for a local cancel (stack cleanup
+ * or eviction)
+ */
+ if (!ldlm_is_local_only(lock)) {
+ /* allocate environment */
+ OBD_ALLOC_PTR(env);
+ if (!env) {
+ lqe_putref(lqe);
+ rc = -ENOMEM;
+ break;
+ }
+
+ /* initialize environment */
+ rc = lu_env_init(env, LCT_DT_THREAD);
+ if (rc) {
+ OBD_FREE_PTR(env);
+ lqe_putref(lqe);
+ break;
+ }
+
rc = qsd_adjust(env, lqe);
+ lu_env_fini(env);
+ OBD_FREE_PTR(env);
+ }
+
/* release lqe reference grabbed by qsd_id_ast_data_get() */
lqe_putref(lqe);
- lu_env_fini(env);
- OBD_FREE_PTR(env);
break;
}
default:
- LASSERTF(0, "invalid flags for blocking ast %d", flag);
+ LASSERTF(0, "invalid flags for blocking ast %d\n", flag);
}
RETURN(rc);
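/*
 * [Editor's sketch, not part of the patch] The canceling branch above
 * follows the usual allocate/init/use/fini ladder for a temporary lu_env,
 * where every early exit undoes exactly the steps that already completed.
 * A standalone userspace rendition with hypothetical demo_* names:
 */
#include <errno.h>
#include <stdlib.h>

struct demo_env {
	int de_initialized;
};

static int demo_env_init(struct demo_env *env)
{
	env->de_initialized = 1;
	return 0;
}

static void demo_env_fini(struct demo_env *env)
{
	env->de_initialized = 0;
}

static int demo_adjust(struct demo_env *env)
{
	return env->de_initialized ? 0 : -EINVAL;
}

static int demo_cancel_work(void)
{
	struct demo_env *env;
	int rc;

	env = calloc(1, sizeof(*env));	/* OBD_ALLOC_PTR() analogue */
	if (!env)
		return -ENOMEM;

	rc = demo_env_init(env);	/* lu_env_init() analogue */
	if (rc) {
		free(env);		/* undo only the allocation */
		return rc;
	}

	rc = demo_adjust(env);		/* qsd_adjust() analogue */

	demo_env_fini(env);		/* undo init first ... */
	free(env);			/* ... then the allocation */
	return rc;
}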
*/
static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
{
- struct ptlrpc_request *req = data;
- struct lquota_entry *lqe;
- struct qsd_instance *qsd;
- struct ldlm_gl_lquota_desc *desc;
- struct lquota_lvb *lvb;
- int rc;
- bool wakeup = false;
+ struct ptlrpc_request *req = data;
+ struct lquota_entry *lqe;
+ struct ldlm_gl_lquota_desc *desc;
+ struct lquota_lvb *lvb;
+ int rc;
+ bool wakeup = false;
+
ENTRY;
rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
if (rc)
	GOTO(out, rc);
lqe = qsd_id_ast_data_get(lock, false);
- if (lqe == NULL)
+ if (!lqe)
/* valid race */
GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
- LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
+ LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:%llu",
desc->gl_qunit);
- qsd = lqe2qqi(lqe)->qqi_qsd;
-
lqe_write_lock(lqe);
lvb->lvb_id_rel = 0;
if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
if (space > 0) {
if (lqe->lqe_pending_req > 0) {
- LQUOTA_DEBUG(lqe, "request in flight, postpone "
- "release of "LPD64, space);
+ LQUOTA_DEBUG(lqe,
+ "request in flight, postpone release of %lld",
+ space);
lvb->lvb_id_may_rel = space;
} else {
lqe->lqe_pending_req++;
/* release quota space in glimpse reply */
- LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
+ LQUOTA_DEBUG(lqe, "releasing %lld", space);
lqe->lqe_granted -= space;
lvb->lvb_id_rel = space;
lqe_write_unlock(lqe);
/* change the lqe_granted */
- qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
- (union lquota_rec *)&lqe->lqe_granted,
- 0, false);
+ qsd_upd_schedule(lqe2qqi(lqe), lqe,
+ &lqe->lqe_id,
+ (union lquota_rec *)
+ &lqe->lqe_granted, 0, false);
lqe_write_lock(lqe);
lqe->lqe_pending_req--;
}
}
- lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+ qsd_set_edquot(lqe, !!(desc->gl_flags & LQUOTA_FL_EDQUOT));
lqe_write_unlock(lqe);
if (wakeup)
- cfs_waitq_broadcast(&lqe->lqe_waiters);
+ wake_up_all(&lqe->lqe_waiters);
lqe_putref(lqe);
out:
req->rq_status = rc;
RETURN(rc);
}
-struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
- LCK_CR,
- qsd_id_blocking_ast,
- ldlm_completion_ast,
- qsd_id_glimpse_ast,
- NULL, NULL };
-
-/*
+/**
* Check whether a slave already owns an ldlm lock for the quota identifier \qid.
*
* \param lockh - is the local lock handle from lquota entry.
*/
int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
{
- struct ldlm_lock *lock;
- int rc;
+ struct ldlm_lock *lock;
+ int rc;
+
ENTRY;
LASSERT(lockh);
LASSERT(lustre_handle_is_used(lockh));
ldlm_lock_dump_handle(D_QUOTA, lockh);
- if (rlockh == NULL)
+ if (!rlockh)
/* caller not interested in remote handle */
RETURN(0);
- /* look up lock associated with local handle and extract remote handle
- * to be packed in quota request */
+ /*
+ * look up lock associated with local handle and extract remote handle
+ * to be packed in quota request
+ */
lock = ldlm_handle2lock(lockh);
LASSERT(lock != NULL);
lustre_handle_copy(rlockh, &lock->l_remote_handle);
int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
{
- struct qsd_thread_info *qti = qsd_info(env);
- int rc;
+ struct qsd_thread_info *qti = qsd_info(env);
+ int rc;
+
ENTRY;
lqe_write_lock(lqe);
if (lustre_handle_is_used(&qti->qti_lockh)) {
memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
qsd_set_qunit(lqe, 0);
- lqe->lqe_edquot = false;
+ qsd_set_edquot(lqe, false);
}
lqe_write_unlock(lqe);