* future. For the time being, we can just use an array. */
struct qsd_qtype_info *qsd_type_array[MAXQUOTAS];
+ /* r/w spinlock protecting:
+ * - the state flags
+ * - the qsd update list
+ * - the deferred list
+ * - flags of the qsd_qtype_info
+ *
+ * probably way too much :(
+ */
+ cfs_rwlock_t qsd_lock;
+
unsigned long qsd_is_md:1, /* managing quota for mdt */
qsd_stopping:1; /* qsd_instance is stopping */
};
* immutable after creation. */
struct qsd_instance *qqi_qsd;
+ /* handle of global quota lock */
+ struct lustre_handle qqi_lockh;
+
/* Local index files storing quota settings for this quota type */
struct dt_object *qqi_acct_obj; /* accounting object */
struct dt_object *qqi_slv_obj; /* slave index copy */
/* Current object versions */
__u64 qqi_slv_ver; /* slave index version */
__u64 qqi_glb_ver; /* global index version */
+
+ /* Various flags representing the current state of the slave for this
+ * quota type. */
+ unsigned long qqi_glb_uptodate:1, /* global index uptodate
+ with master */
+ qqi_slv_uptodate:1, /* slave index uptodate
+ with master */
+ qqi_reint:1; /* in reintegration or not */
+
+ /* A list of references to this instance, for debugging */
+ struct lu_ref qqi_reference;
};
/*
cfs_atomic_dec(&qqi->qqi_ref);
}
+/* all kinds of operations supported by qsd_dqacq() */
+enum qsd_ops {
+	QSD_ADJ, /* adjust quota space based on current qunit */
+	QSD_ACQ, /* acquire space for requests */
+	QSD_REL, /* release all quota space unconditionally */
+	QSD_REP, /* report space usage during reintegration */
+};
+
#define QSD_RES_TYPE(qsd) ((qsd)->qsd_is_md ? LQUOTA_RES_MD : LQUOTA_RES_DT)
/* Common data shared by qsd-level handlers. This is allocated per-thread to
int qsd_write_version(const struct lu_env *, struct qsd_qtype_info *,
__u64, bool);
+/* qsd_lock.c */
+extern struct ldlm_enqueue_info qsd_glb_einfo;
+extern struct ldlm_enqueue_info qsd_id_einfo;
+int qsd_id_lock_match(struct lustre_handle *, struct lustre_handle *);
+int qsd_id_lock_cancel(const struct lu_env *, struct lquota_entry *);
+
+/* qsd_reint.c */
+int qsd_start_reint_thread(struct qsd_qtype_info *);
+void qsd_stop_reint_thread(struct qsd_qtype_info *);
+
/* qsd_request.c */
typedef void (*qsd_req_completion_t) (const struct lu_env *,
struct qsd_qtype_info *,
struct idx_info *, unsigned int, cfs_page_t **, bool *);
/* qsd_writeback.c */
-/* XXX to be replaced with real function when reintegration landed. */
-static inline void qsd_bump_version(struct qsd_qtype_info *qqi, __u64 ver,
- bool global)
+void qsd_bump_version(struct qsd_qtype_info *, __u64, bool);
+void qsd_upd_schedule(struct qsd_qtype_info *, struct lquota_entry *,
+ union lquota_id *, union lquota_rec *, __u64, bool);
+
+/* qsd_handler.c */
+/* XXX stub to be replaced with the real function once qsd_handler lands;
+ * always reports success so that callers (e.g. the per-ID blocking ast)
+ * can be wired up before the acquire/release logic exists. \a op is one
+ * of enum qsd_ops. */
+static inline int qsd_dqacq(const struct lu_env *env, struct lquota_entry *lqe,
+			    enum qsd_ops op)
{
+	return 0;
}
#endif /* _QSD_INTERNAL_H */
--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 02111-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu Yawei <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <lustre_dlm.h>
+#include <obd_class.h>
+
+#include "qsd_internal.h"
+
+/*
+ * Return the qsd_qtype_info structure associated with a global lock
+ *
+ * \param lock  - is the global lock from which we should extract the qqi
+ * \param reset - whether lock->l_ast_data should be cleared
+ *
+ * \retval qqi with an extra reference held on behalf of the caller, or
+ *         NULL if the lock carries no ast data (valid race with cancel)
+ */
+static struct qsd_qtype_info *qsd_glb_ast_data_get(struct ldlm_lock *lock,
+						   bool reset)
+{
+	struct qsd_qtype_info *qqi;
+	ENTRY;
+
+	lock_res_and_lock(lock);
+	qqi = lock->l_ast_data;
+	if (qqi != NULL) {
+		/* grab a reference for the caller */
+		qqi_getref(qqi);
+		lu_ref_add(&qqi->qqi_reference, "ast_data_get", lock);
+		if (reset)
+			lock->l_ast_data = NULL;
+	}
+	unlock_res_and_lock(lock);
+
+	if (reset && qqi != NULL) {
+		/* release the qqi reference held for the lock; drop the
+		 * lu_ref record before the refcount it tracks, matching the
+		 * cleanup order used in qsd_glb_blocking_ast(). The caller's
+		 * reference taken above keeps qqi alive here. */
+		lu_ref_del(&qqi->qqi_reference, "glb_lock", lock);
+		qqi_putref(qqi);
+	}
+	RETURN(qqi);
+}
+
+/*
+ * Return the lquota entry structure associated with a per-ID lock
+ *
+ * \param lock  - is the per-ID lock from which we should extract the lquota
+ *                entry
+ * \param reset - whether lock->l_ast_data should be cleared
+ *
+ * \retval lqe with an extra reference held on behalf of the caller, or
+ *         NULL if the lock carries no ast data (valid race with cancel)
+ */
+static struct lquota_entry *qsd_id_ast_data_get(struct ldlm_lock *lock,
+						bool reset)
+{
+	struct lquota_entry *lqe;
+	ENTRY;
+
+	lock_res_and_lock(lock);
+	lqe = lock->l_ast_data;
+	if (lqe != NULL) {
+		/* grab a reference for the caller */
+		lqe_getref(lqe);
+		if (reset)
+			lock->l_ast_data = NULL;
+	}
+	unlock_res_and_lock(lock);
+
+	if (reset && lqe != NULL)
+		/* release lqe reference held for the lock; the caller's
+		 * reference taken above keeps lqe alive */
+		lqe_putref(lqe);
+	RETURN(lqe);
+}
+
+/*
+ * Common helper for glimpse callbacks on quota locks: extract the glimpse
+ * descriptor packed by the master, prepare the reply buffer and locate the
+ * lvb to be filled by the caller.
+ *
+ * \param req  - is the glimpse ptlrpc request
+ * \param desc - is set to the glimpse descriptor carried by the request
+ * \param lvb  - is set to the lvb buffer in the reply
+ *
+ * \retval 0 on success and \desc & \lvb point to valid structures,
+ *         appropriate error on failure
+ */
+static int qsd_common_glimpse_ast(struct ptlrpc_request *req,
+				  struct ldlm_gl_lquota_desc **desc, void **lvb)
+{
+	int rc;
+	ENTRY;
+
+	LASSERT(lustre_msg_get_opc(req->rq_reqmsg) == LDLM_GL_CALLBACK);
+
+	/* glimpse on quota locks always packs a glimpse descriptor */
+	req_capsule_extend(&req->rq_pill, &RQF_LDLM_GL_DESC_CALLBACK);
+
+	/* extract glimpse descriptor */
+	*desc = req_capsule_client_get(&req->rq_pill, &RMF_DLM_GL_DESC);
+	if (*desc == NULL)
+		RETURN(-EFAULT);
+
+	/* prepare reply */
+	rc = req_capsule_server_pack(&req->rq_pill);
+	if (rc != 0) {
+		CERROR("Can't pack response, rc %d\n", rc);
+		RETURN(rc);
+	}
+
+	/* extract lvb */
+	*lvb = req_capsule_server_get(&req->rq_pill, &RMF_DLM_LVB);
+
+	RETURN(0);
+}
+
+/*
+ * Blocking callback handler for global index lock
+ *
+ * \param lock - is the lock for which ast occurred.
+ * \param desc - is the description of a conflicting lock in case of blocking
+ *               ast.
+ * \param data - is the value of lock->l_ast_data
+ * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
+ *               cancellation and blocking ast's.
+ */
+static int qsd_glb_blocking_ast(struct ldlm_lock *lock,
+				struct ldlm_lock_desc *desc, void *data,
+				int flag)
+{
+	int rc = 0;
+	ENTRY;
+
+	switch(flag) {
+	case LDLM_CB_BLOCKING: {
+		struct lustre_handle lockh;
+
+		/* a conflicting lock was requested: cancel ours */
+		LDLM_DEBUG(lock, "blocking AST on global quota lock");
+		ldlm_lock2handle(lock, &lockh);
+		rc = ldlm_cli_cancel(&lockh);
+		break;
+	}
+	case LDLM_CB_CANCELING: {
+		struct qsd_qtype_info *qqi;
+
+		LDLM_DEBUG(lock, "canceling global quota lock");
+
+		/* clears l_ast_data and returns qqi with an extra ref */
+		qqi = qsd_glb_ast_data_get(lock, true);
+		if (qqi == NULL)
+			break;
+
+		/* we are losing the global index lock, so let's mark the
+		 * global & slave indexes as not up-to-date any more */
+		cfs_write_lock(&qqi->qqi_qsd->qsd_lock);
+		qqi->qqi_glb_uptodate = false;
+		qqi->qqi_slv_uptodate = false;
+		/* forget the cached global lock handle if it is this lock */
+		if (lock->l_handle.h_cookie == qqi->qqi_lockh.cookie)
+			memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh));
+		cfs_write_unlock(&qqi->qqi_qsd->qsd_lock);
+
+		CDEBUG(D_QUOTA, "%s: losing global index lock for %s type\n",
+		       qqi->qqi_qsd->qsd_svname, QTYPE_NAME((qqi->qqi_qtype)));
+
+		/* kick off reintegration thread if not running already, if
+		 * it's just local cancel (for stack clean up or eviction),
+		 * don't re-trigger the reintegration. */
+		if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) == 0)
+			qsd_start_reint_thread(qqi);
+
+		/* drop the reference taken by qsd_glb_ast_data_get() */
+		lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
+		qqi_putref(qqi);
+		break;
+	}
+	default:
+		LASSERTF(0, "invalid flags for blocking ast %d", flag);
+	}
+
+	RETURN(rc);
+}
+
+/*
+ * Glimpse callback handler for global quota lock. The glimpse carries new
+ * quota settings (hard & soft limit) from the master; the on-disk update
+ * of the local copy is deferred to the writeback thread.
+ *
+ * \param lock - is the lock targeted by the glimpse
+ * \param data - is a pointer to the glimpse ptlrpc request
+ */
+static int qsd_glb_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+	struct ptlrpc_request *req = data;
+	struct qsd_qtype_info *qqi;
+	struct ldlm_gl_lquota_desc *desc;
+	struct lquota_lvb *lvb;
+	struct lquota_glb_rec rec;
+	int rc;
+	ENTRY;
+
+	rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
+	if (rc)
+		GOTO(out, rc);
+
+	qqi = qsd_glb_ast_data_get(lock, false);
+	if (qqi == NULL)
+		/* valid race */
+		GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
+
+	/* glimpses happen frequently; log at debug level, not on console */
+	CDEBUG(D_QUOTA, "%s: glimpse on glb quota locks, id:"LPU64" ver:"LPU64
+	       " hard:" LPU64" soft:"LPU64"\n", qqi->qqi_qsd->qsd_svname,
+	       desc->gl_id.qid_uid, desc->gl_ver, desc->gl_hardlimit,
+	       desc->gl_softlimit);
+
+	if (desc->gl_ver == 0) {
+		CERROR("%s: invalid global index version "LPU64"\n",
+		       qqi->qqi_qsd->qsd_svname, desc->gl_ver);
+		GOTO(out_qqi, rc = -EINVAL);
+	}
+
+	/* extract new hard & soft limits from the glimpse descriptor */
+	rec.qbr_hardlimit = desc->gl_hardlimit;
+	rec.qbr_softlimit = desc->gl_softlimit;
+	rec.qbr_time      = 0;
+	rec.qbr_granted   = 0;
+
+	/* We can't afford disk io in the context of glimpse callback handling
+	 * thread, so the on-disk global limits update has to be deferred. */
+	qsd_upd_schedule(qqi, NULL, &desc->gl_id, (union lquota_rec *)&rec,
+			 desc->gl_ver, true);
+	EXIT;
+out_qqi:
+	lu_ref_del(&qqi->qqi_reference, "ast_data_get", lock);
+	qqi_putref(qqi);
+out:
+	req->rq_status = rc;
+	return rc;
+}
+
+/* enqueue parameters & callbacks for the global index lock (positional
+ * initializers: lock type, mode, then blocking/completion/glimpse asts) */
+struct ldlm_enqueue_info qsd_glb_einfo = { LDLM_PLAIN,
+					   LCK_CR,
+					   qsd_glb_blocking_ast,
+					   ldlm_completion_ast,
+					   qsd_glb_glimpse_ast,
+					   NULL, NULL };
+/*
+ * Blocking callback handler for per-ID lock
+ *
+ * \param lock - is the lock for which ast occurred.
+ * \param desc - is the description of a conflicting lock in case of blocking
+ *               ast.
+ * \param data - is the value of lock->l_ast_data
+ * \param flag - LDLM_CB_BLOCKING or LDLM_CB_CANCELING. Used to distinguish
+ *               cancellation and blocking ast's.
+ */
+static int qsd_id_blocking_ast(struct ldlm_lock *lock,
+			       struct ldlm_lock_desc *desc, void *data,
+			       int flag)
+{
+	struct lustre_handle lockh;
+	int rc = 0;
+	ENTRY;
+
+	switch(flag) {
+	case LDLM_CB_BLOCKING:
+		/* a conflicting lock was requested: cancel ours */
+		LDLM_DEBUG(lock, "blocking AST on ID quota lock");
+		ldlm_lock2handle(lock, &lockh);
+		rc = ldlm_cli_cancel(&lockh);
+		break;
+	case LDLM_CB_CANCELING: {
+		struct lu_env *env;
+		struct lquota_entry *lqe;
+		bool rel = false;
+
+		LDLM_DEBUG(lock, "canceling ID quota lock");
+		/* clears l_ast_data and returns lqe with an extra ref */
+		lqe = qsd_id_ast_data_get(lock, true);
+		if (lqe == NULL)
+			break;
+
+		LQUOTA_DEBUG(lqe, "losing ID lock");
+
+		/* just local cancel (for stack clean up or eviction), don't
+		 * release quota space in this case */
+		if ((lock->l_flags & LDLM_FL_LOCAL_ONLY) != 0) {
+			lqe_putref(lqe);
+			break;
+		}
+
+		/* allocate environment */
+		OBD_ALLOC_PTR(env);
+		if (env == NULL) {
+			lqe_putref(lqe);
+			rc = -ENOMEM;
+			break;
+		}
+
+		/* initialize environment */
+		rc = lu_env_init(env, LCT_DT_THREAD);
+		if (rc) {
+			OBD_FREE_PTR(env);
+			lqe_putref(lqe);
+			break;
+		}
+
+		ldlm_lock2handle(lock, &lockh);
+		lqe_write_lock(lqe);
+		if (lustre_handle_equal(&lockh, &lqe->lqe_lockh)) {
+			/* Clear lqe_lockh & reset qunit to 0 */
+			qsd_set_qunit(lqe, 0);
+			memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
+			lqe->lqe_edquot = false;
+			rel = true;
+		}
+		lqe_write_unlock(lqe);
+
+		/* If there is dqacq inflight, the release will be skipped
+		 * at this time, and triggered on dqacq completion later,
+		 * which means there could be a short window that slave is
+		 * holding spare grant without per-ID lock. */
+		if (rel)
+			rc = qsd_dqacq(env, lqe, QSD_REL);
+
+		/* release lqe reference grabbed by qsd_id_ast_data_get() */
+		lqe_putref(lqe);
+		lu_env_fini(env);
+		OBD_FREE_PTR(env);
+		break;
+	}
+	default:
+		LASSERTF(0, "invalid flags for blocking ast %d", flag);
+	}
+
+	RETURN(rc);
+}
+
+/*
+ * Glimpse callback handler for per-ID quota locks. The glimpse carries a
+ * new qunit value and the edquot flag from the master; spare grant space
+ * may be released directly in the glimpse reply.
+ *
+ * \param lock - is the lock targeted by the glimpse
+ * \param data - is a pointer to the glimpse ptlrpc request
+ */
+static int qsd_id_glimpse_ast(struct ldlm_lock *lock, void *data)
+{
+	struct ptlrpc_request *req = data;
+	struct lquota_entry *lqe;
+	struct ldlm_gl_lquota_desc *desc;
+	struct lquota_lvb *lvb;
+	int rc;
+	bool wakeup = false;
+	ENTRY;
+
+	rc = qsd_common_glimpse_ast(req, &desc, (void **)&lvb);
+	if (rc)
+		GOTO(out, rc);
+
+	lqe = qsd_id_ast_data_get(lock, false);
+	if (lqe == NULL)
+		/* valid race */
+		GOTO(out, rc = -ELDLM_NO_LOCK_DATA);
+
+	/* routine event, log at debug level, not on console */
+	LQUOTA_DEBUG(lqe, "glimpse on quota locks, new qunit:"LPU64,
+		     desc->gl_qunit);
+
+	lqe_write_lock(lqe);
+	lvb->lvb_id_rel = 0;
+	if (desc->gl_qunit != 0 && desc->gl_qunit != lqe->lqe_qunit) {
+		long long space;
+
+		/* extract new qunit from glimpse request */
+		qsd_set_qunit(lqe, desc->gl_qunit);
+
+		/* spare grant = what we own beyond usage, in-flight writes
+		 * and one qunit of headroom */
+		space  = lqe->lqe_granted - lqe->lqe_pending_rel;
+		space -= lqe->lqe_usage;
+		space -= lqe->lqe_pending_write + lqe->lqe_waiting_write;
+		space -= lqe->lqe_qunit;
+
+		if (space > 0) {
+			if (lqe->lqe_pending_req > 0) {
+				LQUOTA_DEBUG(lqe, "request in flight, postpone "
+					     "release of "LPD64, space);
+				lvb->lvb_id_may_rel = space;
+			} else {
+				lqe->lqe_pending_req++;
+
+				/* release quota space in glimpse reply */
+				LQUOTA_DEBUG(lqe, "releasing "LPD64, space);
+				lqe->lqe_granted -= space;
+				lvb->lvb_id_rel = space;
+
+				lqe_write_unlock(lqe);
+				/* change the lqe_granted */
+				qsd_upd_schedule(lqe2qqi(lqe), lqe, &lqe->lqe_id,
+						 (union lquota_rec *)&lqe->lqe_granted,
+						 0, false);
+				lqe_write_lock(lqe);
+
+				lqe->lqe_pending_req--;
+				wakeup = true;
+			}
+		}
+	}
+
+	lqe->lqe_edquot = !!(desc->gl_flags & LQUOTA_FL_EDQUOT);
+	lqe_write_unlock(lqe);
+
+	if (wakeup)
+		cfs_waitq_broadcast(&lqe->lqe_waiters);
+	lqe_putref(lqe);
+out:
+	req->rq_status = rc;
+	RETURN(rc);
+}
+
+/* enqueue parameters & callbacks for per-ID quota locks (positional
+ * initializers: lock type, mode, then blocking/completion/glimpse asts) */
+struct ldlm_enqueue_info qsd_id_einfo = { LDLM_PLAIN,
+					  LCK_CR,
+					  qsd_id_blocking_ast,
+					  ldlm_completion_ast,
+					  qsd_id_glimpse_ast,
+					  NULL, NULL };
+
+/*
+ * Check whether the slave already owns a ldlm lock for a given quota
+ * identifier.
+ *
+ * \param lockh  - is the local lock handle from the lquota entry.
+ * \param rlockh - is set to the remote handle of the matched lock, to be
+ *                 packed in the quota request; may be NULL if the caller
+ *                 does not need it.
+ *
+ * \retval 0      : lock found; a reference in qsd_id_einfo.ei_mode was
+ *                  taken on it and must be dropped by the caller (e.g.
+ *                  via ldlm_lock_decref()).
+ * \retval -ENOENT: no lock found
+ */
+int qsd_id_lock_match(struct lustre_handle *lockh, struct lustre_handle *rlockh)
+{
+	struct ldlm_lock *lock;
+	int rc;
+	ENTRY;
+
+	LASSERT(lockh);
+
+	if (!lustre_handle_is_used(lockh))
+		RETURN(-ENOENT);
+
+	/* on success, this leaves a CR reference held on the lock */
+	rc = ldlm_lock_addref_try(lockh, qsd_id_einfo.ei_mode);
+	if (rc)
+		RETURN(-ENOENT);
+
+	LASSERT(lustre_handle_is_used(lockh));
+	ldlm_lock_dump_handle(D_QUOTA, lockh);
+
+	if (rlockh == NULL)
+		RETURN(0);
+
+	/* look up lock associated with local handle and extract remote handle
+	 * to be packed in quota request */
+	lock = ldlm_handle2lock(lockh);
+	LASSERT(lock != NULL);
+	lustre_handle_copy(rlockh, &lock->l_remote_handle);
+	LDLM_LOCK_PUT(lock);
+
+	RETURN(0);
+}
+
+/*
+ * Cancel the per-ID lock associated with an lquota entry, but only when
+ * the entry no longer uses any quota space (no usage, no granted space,
+ * no pending or waiting writes).
+ *
+ * \param env - is the environment passed by the caller
+ * \param lqe - is the lquota entry whose per-ID lock should be cancelled
+ *
+ * \retval 0 on success, including when there is nothing to cancel;
+ *         error from qsd_id_lock_match() otherwise
+ */
+int qsd_id_lock_cancel(const struct lu_env *env, struct lquota_entry *lqe)
+{
+	struct qsd_thread_info *qti = qsd_info(env);
+	int rc;
+	ENTRY;
+
+	lqe_write_lock(lqe);
+	/* entry still owns or waits for quota space, keep the lock */
+	if (lqe->lqe_pending_write || lqe->lqe_waiting_write ||
+	    lqe->lqe_usage || lqe->lqe_granted) {
+		lqe_write_unlock(lqe);
+		RETURN(0);
+	}
+
+	/* steal the lock handle and reset the entry under the write lock */
+	lustre_handle_copy(&qti->qti_lockh, &lqe->lqe_lockh);
+	if (lustre_handle_is_used(&qti->qti_lockh)) {
+		memset(&lqe->lqe_lockh, 0, sizeof(lqe->lqe_lockh));
+		qsd_set_qunit(lqe, 0);
+		lqe->lqe_edquot = false;
+	}
+	lqe_write_unlock(lqe);
+
+	/* on success, a reference is held on the lock; it is released by
+	 * ldlm_lock_decref_and_cancel() below */
+	rc = qsd_id_lock_match(&qti->qti_lockh, NULL);
+	if (rc)
+		RETURN(rc);
+
+	ldlm_lock_decref_and_cancel(&qti->qti_lockh, qsd_id_einfo.ei_mode);
+	RETURN(0);
+}