--- /dev/null
+/* GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; if not, write to the
+ * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+ * Boston, MA 021110-1307, USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2011, 2012, Intel, Inc.
+ * Use is subject to license terms.
+ *
+ * Author: Johann Lombardi <johann@whamcloud.com>
+ * Author: Niu Yawei <niu@whamcloud.com>
+ */
+
+#ifndef EXPORT_SYMTAB
+# define EXPORT_SYMTAB
+#endif
+
+#define DEBUG_SUBSYSTEM S_LQUOTA
+
+#include <linux/module.h>
+#include <linux/slab.h>
+#include <obd_class.h>
+#include "lquota_internal.h"
+
+/* Number of hash bits used when creating a new lqe hash table; tunable via
+ * the read-only "hash_lqs_cur_bits" module parameter. */
+static int hash_lqs_cur_bits = HASH_LQE_CUR_BITS;
+CFS_MODULE_PARM(hash_lqs_cur_bits, "i", int, 0444,
+ "the current bits of lqe hash");
+
+/* slab cache used by lqe_locate() to allocate lquota_entry structures
+ * (cache creation is not visible in this file) */
+cfs_mem_cache_t *lqe_kmem;
+
+/* cfs_hash hash method: hash the 64-bit quota identifier pointed to by \a key
+ * into a bucket index bounded by \a mask */
+static unsigned lqe64_hash_hash(cfs_hash_t *hs, const void *key, unsigned mask)
+{
+ return cfs_hash_u64_hash(*((__u64 *)key), mask);
+}
+
+/* cfs_hash key method: the key of a lquota_entry is its 64-bit quota ID
+ * (uid or gid) */
+static void *lqe64_hash_key(cfs_hlist_node_t *hnode)
+{
+ struct lquota_entry *lqe;
+ lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ return &lqe->lqe_id.qid_uid;
+}
+
+/* cfs_hash key comparison method: return non-zero when the entry's 64-bit
+ * quota ID equals the ID pointed to by \a key */
+static int lqe64_hash_keycmp(const void *key, cfs_hlist_node_t *hnode)
+{
+ struct lquota_entry *lqe;
+ lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ return (lqe->lqe_id.qid_uid == *((__u64*)key));
+}
+
+/* cfs_hash object method: convert a hash node back into the lquota_entry
+ * which embeds it */
+static void *lqe_hash_object(cfs_hlist_node_t *hnode)
+{
+ return cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+}
+
+/* cfs_hash get method: take a reference on the entry on behalf of the hash
+ * table */
+static void lqe_hash_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ struct lquota_entry *lqe;
+ lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ lqe_getref(lqe);
+}
+
+/* cfs_hash put method (called with the bucket lock held): release the
+ * reference held by the hash table */
+static void lqe_hash_put_locked(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ struct lquota_entry *lqe;
+ lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ lqe_putref(lqe);
+}
+
+/* cfs_hash exit method, invoked on hash table teardown for any leftover
+ * entry. All entries should already have been purged by lqe_cleanup() when
+ * the site is freed, so reaching this is a bug worth complaining about. */
+static void lqe_hash_exit(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
+{
+ CERROR("Should not have any item left!\n");
+}
+
+/* lqe hash methods for 64-bit uid/gid, new hash functions would have to be
+ * defined for per-directory quota relying on a 128-bit FID.
+ * NOTE(review): only hs_put_locked is provided (no hs_put) — releases are
+ * presumably always done under the bucket lock; confirm against cfs_hash
+ * usage. */
+static cfs_hash_ops_t lqe64_hash_ops = {
+ .hs_hash = lqe64_hash_hash,
+ .hs_key = lqe64_hash_key,
+ .hs_keycmp = lqe64_hash_keycmp,
+ .hs_object = lqe_hash_object,
+ .hs_get = lqe_hash_get,
+ .hs_put_locked = lqe_hash_put_locked,
+ .hs_exit = lqe_hash_exit
+};
+
+/*
+ * Logging helper function: build the va_list and dispatch the message to the
+ * site-specific lqe_debug method (QMT or QSD implementation).
+ *
+ * \param lqe     - quota entry the message relates to
+ * \param msgdata - libcfs debug message descriptor (file/line/mask)
+ * \param fmt     - printf-like format string, followed by its arguments
+ */
+void lquota_lqe_debug0(struct lquota_entry *lqe,
+ struct libcfs_debug_msg_data *msgdata,
+ const char *fmt, ...)
+{
+ struct lquota_site *site = lqe->lqe_site;
+ va_list args;
+
+ LASSERT(site->lqs_ops->lqe_debug != NULL);
+
+ va_start(args, fmt);
+ site->lqs_ops->lqe_debug(lqe, site->lqs_parent, msgdata, fmt, args);
+ va_end(args);
+}
+
+/* state shared with lqe_iter_cb() while walking a site's hash table */
+struct lqe_iter_data {
+ unsigned long lid_inuse; /* # of entries skipped because still referenced */
+ unsigned long lid_freed; /* # of entries removed from the hash */
+ bool lid_free_all; /* free every unreferenced entry; when false, only
+ * entries with lqe_enforced set are freed */
+};
+
+/*
+ * Hash iterator callback: remove and release entries which are no longer
+ * referenced (i.e. only the hash table holds a reference) when lid_free_all
+ * is set or the entry has quota enforcement turned on. Entries still in use
+ * are counted in \a data so that the caller can wait and retry.
+ */
+static int lqe_iter_cb(cfs_hash_t *hs, cfs_hash_bd_t *bd,
+ cfs_hlist_node_t *hnode, void *data)
+{
+ struct lqe_iter_data *d = (struct lqe_iter_data *)data;
+ struct lquota_entry *lqe;
+
+ lqe = cfs_hlist_entry(hnode, struct lquota_entry, lqe_hash);
+ LASSERT(atomic_read(&lqe->lqe_ref) > 0);
+
+ /* Only one reference held by hash table, and nobody else can
+ * grab the entry at this moment, it's safe to remove it from
+ * the hash and free it. */
+ if (atomic_read(&lqe->lqe_ref) == 1) {
+ if (!lqe_is_master(lqe)) {
+ /* a slave entry must have no in-flight operations left */
+ LASSERT(lqe->lqe_pending_write == 0);
+ LASSERT(lqe->lqe_pending_req == 0);
+ }
+ if (d->lid_free_all || lqe->lqe_enforced) {
+ d->lid_freed++;
+ cfs_hash_bd_del_locked(hs, bd, hnode);
+ return 0;
+ }
+ }
+ d->lid_inuse++;
+
+ /* when everything should be freed, a busy entry is unexpected */
+ if (d->lid_free_all)
+ LQUOTA_ERROR(lqe, "Inuse quota entry");
+ return 0;
+}
+
+/**
+ * Cleanup the entries in the hashtable
+ *
+ * \param hash - hash table which stores quota entries
+ * \param free_all - when true, free all unreferenced entries; when false,
+ *                   only free the entries which have quota enforcement
+ *                   turned on (see lqe_iter_cb())
+ */
+static void lqe_cleanup(cfs_hash_t *hash, bool free_all)
+{
+ struct lqe_iter_data d;
+ int repeat = 0;
+ ENTRY;
+retry:
+ memset(&d, 0, sizeof(d));
+ d.lid_free_all = free_all;
+
+ cfs_hash_for_each_safe(hash, lqe_iter_cb, &d);
+
+ /* In most cases, when this function is called on master or
+ * slave finalization, there should be no inuse quota entry.
+ *
+ * If the per-fs quota updating thread is still holding
+ * some entries, we just wait for it to finish. */
+ if (free_all && d.lid_inuse) {
+ CDEBUG(D_QUOTA, "Hash:%p has entries inuse: inuse:%lu, "
+ "freed:%lu, repeat:%u\n", hash,
+ d.lid_inuse, d.lid_freed, repeat);
+ repeat++;
+ cfs_schedule_timeout_and_set_state(CFS_TASK_INTERRUPTIBLE,
+ cfs_time_seconds(1));
+ goto retry;
+ }
+ EXIT;
+}
+
+/*
+ * Allocate a new lquota site.
+ *
+ * \param env - the environment passed by the caller
+ * \param parent - is a pointer to the parent structure, either a qmt_pool_info
+ * structure on the master or a qsd_qtype_info structure on the
+ * slave.
+ * \param is_master - is set when the site belongs to a QMT.
+ * \param qtype - is the quota type managed by this site
+ * \param ops - is the quota entry operation vector to be used for quota
+ * entry belonging to this site.
+ *
+ * \retval pointer to the new site - success
+ * \retval ERR_PTR(-ENOMEM)        - site or hash table allocation failure
+ */
+struct lquota_site *lquota_site_alloc(const struct lu_env *env, void *parent,
+ bool is_master, short qtype,
+ struct lquota_entry_operations *ops)
+{
+ struct lquota_site *site;
+ char hashname[15];
+ ENTRY;
+
+ LASSERT(qtype < MAXQUOTAS);
+
+ OBD_ALLOC_PTR(site);
+ if (site == NULL)
+ RETURN(ERR_PTR(-ENOMEM));
+
+ /* assign parameters */
+ site->lqs_qtype = qtype;
+ site->lqs_parent = parent;
+ site->lqs_is_mst = is_master;
+ site->lqs_ops = ops;
+
+ /* allocate hash table */
+ memset(hashname, 0, sizeof(hashname));
+ sprintf(hashname, "LQUOTA_HASH%u", qtype);
+ site->lqs_hash= cfs_hash_create(hashname, hash_lqs_cur_bits,
+ HASH_LQE_MAX_BITS,
+ min(hash_lqs_cur_bits,
+ HASH_LQE_BKT_BITS),
+ 0, CFS_HASH_MIN_THETA,
+ CFS_HASH_MAX_THETA, &lqe64_hash_ops,
+ CFS_HASH_DEFAULT|CFS_HASH_BIGNAME);
+ if (site->lqs_hash == NULL) {
+ OBD_FREE_PTR(site);
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ RETURN(site);
+}
+
+/*
+ * Destroy a lquota site: purge all cached quota entries, release the hash
+ * table and free the site structure itself.
+ *
+ * \param env - the environment passed by the caller
+ * \param site - lquota site to be destroyed
+ */
+void lquota_site_free(const struct lu_env *env, struct lquota_site *site)
+{
+ /* cleanup hash table */
+ lqe_cleanup(site->lqs_hash, true);
+ cfs_hash_putref(site->lqs_hash);
+
+ site->lqs_parent = NULL;
+ OBD_FREE_PTR(site);
+}
+
+/*
+ * Initialize qsd/qmt-specific fields of quota entry by dispatching to the
+ * site's lqe_init operation.
+ *
+ * \param lqe - is the quota entry to initialize
+ */
+static void lqe_init(struct lquota_entry *lqe)
+{
+ struct lquota_site *site;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ site = lqe->lqe_site;
+ LASSERT(site != NULL);
+ LASSERT(site->lqs_ops->lqe_init != NULL);
+
+ LQUOTA_DEBUG(lqe, "init");
+
+ site->lqs_ops->lqe_init(lqe, site->lqs_parent);
+}
+
+/*
+ * Update a lquota entry. This is done by reading quota settings from the
+ * on-disk index. The lquota entry must be write locked.
+ *
+ * \param env - the environment passed by the caller
+ * \param lqe - is the quota entry to refresh
+ *
+ * \retval 0   - success, lqe_uptodate is set
+ * \retval -ve - failure reported by the site's lqe_read operation
+ */
+static int lqe_read(const struct lu_env *env, struct lquota_entry *lqe)
+{
+ struct lquota_site *site;
+ int rc;
+ ENTRY;
+
+ LASSERT(lqe != NULL);
+ site = lqe->lqe_site;
+ LASSERT(site != NULL);
+ LASSERT(site->lqs_ops->lqe_read != NULL);
+
+ LQUOTA_DEBUG(lqe, "read");
+
+ rc = site->lqs_ops->lqe_read(env, lqe, site->lqs_parent);
+ if (rc == 0)
+ /* mark the entry as up-to-date */
+ lqe->lqe_uptodate = true;
+
+ RETURN(rc);
+}
+
+/*
+ * Find or create a quota entry.
+ *
+ * \param env - the environment passed by the caller
+ * \param site - lquota site which stores quota entries in a hash table
+ * \param qid - is the quota ID to be found/created
+ *
+ * \retval pointer to the quota entry, with a reference held for the caller
+ * \retval ERR_PTR(-ve) - allocation failure or error while reading the quota
+ *                        settings from disk
+ */
+struct lquota_entry *lqe_locate(const struct lu_env *env,
+ struct lquota_site *site, union lquota_id *qid)
+{
+ struct lquota_entry *lqe, *new = NULL;
+ int rc = 0;
+ ENTRY;
+
+ /* fast path: entry already cached (lookup takes a reference) */
+ lqe = cfs_hash_lookup(site->lqs_hash, (void *)&qid->qid_uid);
+ if (lqe != NULL) {
+ LASSERT(lqe->lqe_uptodate);
+ RETURN(lqe);
+ }
+
+ OBD_SLAB_ALLOC_PTR_GFP(new, lqe_kmem, CFS_ALLOC_IO);
+ if (new == NULL) {
+ CERROR("Fail to allocate lqe for id:"LPU64", "
+ "hash:%s\n", qid->qid_uid, site->lqs_hash->hs_name);
+ RETURN(ERR_PTR(-ENOMEM));
+ }
+
+ atomic_set(&new->lqe_ref, 1); /* hold 1 for caller */
+ new->lqe_id = *qid;
+ new->lqe_site = site;
+ CFS_INIT_LIST_HEAD(&new->lqe_link);
+
+ /* quota settings need to be updated from disk, that's why
+ * lqe->lqe_uptodate isn't set yet */
+ new->lqe_uptodate = false;
+
+ /* perform qmt/qsd specific initialization */
+ lqe_init(new);
+
+ /* read quota settings from disk and mark lqe as up-to-date */
+ rc = lqe_read(env, new);
+ if (rc)
+ GOTO(out, lqe = ERR_PTR(rc));
+
+ /* add new entry to hash; if a racing thread inserted the same ID
+ * first, findadd returns the existing entry instead of ours */
+ lqe = cfs_hash_findadd_unique(site->lqs_hash, &new->lqe_id.qid_uid,
+ &new->lqe_hash);
+ if (lqe == new)
+ new = NULL;
+out:
+ if (new)
+ lqe_putref(new); /* drop the only reference, freeing the unused entry */
+ RETURN(lqe);
+}
LQUOTA_GENERATED_OID = 4096UL,
};
+/*
+ * lquota_entry support
+ */
+
+/* Common operations supported by a lquota_entry; each site supplies its own
+ * implementation (QMT or QSD), see lquota_site::lqs_ops */
+struct lquota_entry_operations {
+ /* Initialize specific fields of a lquota entry */
+ void (*lqe_init)(struct lquota_entry *, void *arg);
+
+ /* Read quota settings from disk and update lquota entry */
+ int (*lqe_read)(const struct lu_env *, struct lquota_entry *,
+ void *arg);
+
+ /* Print debug information about a given lquota entry; receives the
+ * va_list built by lquota_lqe_debug0() */
+ void (*lqe_debug)(struct lquota_entry *, void *,
+ struct libcfs_debug_msg_data *, const char *,
+ va_list);
+};
+
+/* Per-ID information specific to the quota master target; stored in
+ * lquota_entry::u.me when the owning site has lqs_is_mst set */
+struct lquota_mst_entry {
+ /* global hard limit, in inodes or kbytes */
+ __u64 lme_hardlimit;
+
+ /* global quota soft limit, in inodes or kbytes */
+ __u64 lme_softlimit;
+
+ /* grace time, in seconds */
+ __u64 lme_gracetime;
+
+ /* last time we glimpsed */
+ __u64 lme_revoke_time;
+
+ /* r/w semaphore used to protect concurrent access to the quota
+ * parameters which are stored on disk */
+ cfs_rw_semaphore_t lme_sem;
+
+ /* quota space that may be released after glimpse */
+ __u64 lme_may_rel;
+};
+
+/* Per-ID information specific to the quota slave; stored in
+ * lquota_entry::u.se when the owning site has lqs_is_mst cleared */
+struct lquota_slv_entry {
+ /* [ib]tune size, inodes or kbytes */
+ __u64 lse_qtune;
+
+ /* per-ID lock handle */
+ struct lustre_handle lse_lockh;
+
+ /* pending write which were granted quota space but haven't completed
+ * yet, in inodes or kbytes. */
+ __u64 lse_pending_write;
+
+ /* writes waiting for quota space, in inodes or kbytes. */
+ __u64 lse_waiting_write;
+
+ /* pending release, in inodes or kbytes */
+ __u64 lse_pending_rel;
+
+ /* pending dqacq/dqrel requests. */
+ unsigned int lse_pending_req;
+
+ /* rw spinlock protecting in-memory counters (i.e. lse_pending*) */
+ cfs_rwlock_t lse_lock;
+
+ /* waiter for pending request done */
+ cfs_waitq_t lse_waiters;
+
+ /* hint on current on-disk usage, in inodes or kbytes */
+ __u64 lse_usage;
+
+ /* time to trigger quota adjust */
+ __u64 lse_adjust_time;
+};
+
+/* In-memory entry for each enforced quota id
+ * A lquota_entry structure belong to a single lquota_site */
+struct lquota_entry {
+ /* link to site hash table */
+ cfs_hlist_node_t lqe_hash;
+
+ /* quota identifier associated with this entry */
+ union lquota_id lqe_id;
+
+ /* site this quota entry belongs to */
+ struct lquota_site *lqe_site;
+
+ /* reference counter, manipulated via lqe_getref()/lqe_putref() */
+ cfs_atomic_t lqe_ref;
+
+ /* linked to list of lqes which:
+ * - need quota space adjustment on slave
+ * - need glimpse to be sent on master */
+ cfs_list_t lqe_link;
+
+ /* current quota settings/usage of this ID */
+ __u64 lqe_granted; /* granted limit, inodes or kbytes */
+ __u64 lqe_qunit; /* [ib]unit size, inodes or kbytes */
+ /* master/slave specific data; which arm is valid depends on
+ * lqe_site->lqs_is_mst (see lqe_is_master()) */
+ union {
+ struct lquota_mst_entry me; /* params specific to QMT */
+ struct lquota_slv_entry se; /* params specific to QSD */
+ } u;
+
+ /* flags describing the state of the lquota_entry */
+ unsigned long lqe_enforced:1,/* quota enforced or not */
+ lqe_uptodate:1,/* successfully read from disk */
+ lqe_edquot:1, /* id out of quota space on QMT */
+ lqe_gl:1, /* glimpse is in progress */
+ lqe_nopreacq:1;/* pre-acquire disabled */
+};
+
+/* Compartment within which lquota_entry are unique.
+ * lquota_entry structures are kept in a hash table and read from disk if not
+ * present. Allocated by lquota_site_alloc(), released by
+ * lquota_site_free(). */
+struct lquota_site {
+ /* Hash table storing lquota_entry structures */
+ cfs_hash_t *lqs_hash;
+
+ /* Quota type, either user or group. */
+ int lqs_qtype;
+
+ /* Record whether this site is for a QMT or a slave */
+ int lqs_is_mst;
+
+ /* Vector of operations which can be done on lquota entry belonging to
+ * this quota site */
+ struct lquota_entry_operations *lqs_ops;
+
+ /* Backpointer to parent structure, either QMT pool info for master or
+ * QSD for slave */
+ void *lqs_parent;
+};
+
+/* accessors for the master-only fields stored in lquota_entry::u.me; only
+ * meaningful when lqe_is_master() is true */
+#define lqe_hardlimit u.me.lme_hardlimit
+#define lqe_softlimit u.me.lme_softlimit
+#define lqe_gracetime u.me.lme_gracetime
+#define lqe_revoke_time u.me.lme_revoke_time
+#define lqe_sem u.me.lme_sem
+#define lqe_may_rel u.me.lme_may_rel
+
+/* accessors for the slave-only fields stored in lquota_entry::u.se; only
+ * meaningful when lqe_is_master() is false */
+#define lqe_qtune u.se.lse_qtune
+#define lqe_pending_write u.se.lse_pending_write
+#define lqe_waiting_write u.se.lse_waiting_write
+#define lqe_pending_rel u.se.lse_pending_rel
+#define lqe_pending_req u.se.lse_pending_req
+#define lqe_waiters u.se.lse_waiters
+#define lqe_lock u.se.lse_lock
+#define lqe_usage u.se.lse_usage
+#define lqe_adjust_time u.se.lse_adjust_time
+#define lqe_lockh u.se.lse_lockh
+
+/* NOTE(review): version-update flags — presumably control whether the global
+ * index version is bumped or set explicitly; confirm against their users,
+ * which are not visible in this file */
+#define LQUOTA_BUMP_VER 0x1
+#define LQUOTA_SET_VER 0x2
+
+/* helper routine to get/put reference on lquota_entry */
+
+/* take an extra reference on \a lqe */
+static inline void lqe_getref(struct lquota_entry *lqe)
+{
+ cfs_atomic_inc(&lqe->lqe_ref);
+}
+
+extern cfs_mem_cache_t *lqe_kmem;
+
+/* Release a reference on \a lqe and free the entry once the last reference
+ * is dropped. The entry must be returned to the lqe_kmem slab cache it was
+ * allocated from in lqe_locate() (OBD_SLAB_ALLOC_PTR_GFP); the previous code
+ * freed it with OBD_FREE_PTR, mismatching the allocator. Also use the
+ * cfs_atomic_* wrappers for consistency with lqe_getref(). */
+static inline void lqe_putref(struct lquota_entry *lqe)
+{
+ LASSERT(cfs_atomic_read(&lqe->lqe_ref) > 0);
+ if (cfs_atomic_dec_and_test(&lqe->lqe_ref))
+ OBD_SLAB_FREE_PTR(lqe, lqe_kmem);
+}
+
+/* return non-zero when \a lqe belongs to a master (QMT) site; selects which
+ * arm of lquota_entry::u and which lock type (lqe_sem vs lqe_lock) is valid */
+static inline int lqe_is_master(struct lquota_entry *lqe)
+{
+ return lqe->lqe_site->lqs_is_mst;
+}
+
+/* lqe locking helpers: a master entry is protected by the lme_sem r/w
+ * semaphore while a slave entry uses the lse_lock rw spinlock */
+
+/* take the entry lock for writing */
+static inline void lqe_write_lock(struct lquota_entry *lqe)
+{
+ if (lqe_is_master(lqe))
+ cfs_down_write(&lqe->lqe_sem);
+ else
+ cfs_write_lock(&lqe->lqe_lock);
+}
+
+/* release the entry lock taken by lqe_write_lock() */
+static inline void lqe_write_unlock(struct lquota_entry *lqe)
+{
+ if (lqe_is_master(lqe))
+ cfs_up_write(&lqe->lqe_sem);
+ else
+ cfs_write_unlock(&lqe->lqe_lock);
+}
+
+/* take the entry lock for reading */
+static inline void lqe_read_lock(struct lquota_entry *lqe)
+{
+ if (lqe_is_master(lqe))
+ cfs_down_read(&lqe->lqe_sem);
+ else
+ cfs_read_lock(&lqe->lqe_lock);
+}
+
+/* release the entry lock taken by lqe_read_lock() */
+static inline void lqe_read_unlock(struct lquota_entry *lqe)
+{
+ if (lqe_is_master(lqe))
+ cfs_up_read(&lqe->lqe_sem);
+ else
+ cfs_read_unlock(&lqe->lqe_lock);
+}
+
/* Common data shared by quota-level handlers. This is allocated per-thread to
* reduce stack consumption */
struct lquota_thread_info {
return info;
}
+/* debugging macros
+ *
+ * Fixes over the previous version:
+ * - LQUOTA_DEBUG declared its parameter as "lock" while the expansion
+ *   referenced "lqe": it only compiled when the caller's variable happened
+ *   to be named lqe. The parameter is now named lqe and actually used.
+ * - the !LIBCFS_DEBUG stub of lquota_lqe_debug had a different parameter
+ *   list (cdls, level, lqe, file, func, line, fmt) than the real macro
+ *   (msgdata, mask, cdls, lqe, fmt), so non-debug builds could not expand
+ *   existing call sites; the stub now mirrors the real signature. */
+#ifdef LIBCFS_DEBUG
+/* core dispatcher: check the stack and debug masks, then hand the message to
+ * lquota_lqe_debug0() which invokes the site-specific lqe_debug method */
+#define lquota_lqe_debug(msgdata, mask, cdls, lqe, fmt, a...) do { \
+ CFS_CHECK_STACK(msgdata, mask, cdls); \
+ \
+ if (((mask) & D_CANTMASK) != 0 || \
+ ((libcfs_debug & (mask)) != 0 && \
+ (libcfs_subsystem_debug & DEBUG_SUBSYSTEM) != 0)) \
+ lquota_lqe_debug0(lqe, msgdata, fmt, ##a); \
+} while (0)
+
+void lquota_lqe_debug0(struct lquota_entry *lqe,
+ struct libcfs_debug_msg_data *data, const char *fmt, ...)
+ __attribute__ ((format (printf, 3, 4)));
+
+/* rate-limited variant: one cfs_debug_limit_state_t per call site */
+#define LQUOTA_DEBUG_LIMIT(mask, lqe, fmt, a...) do { \
+ static cfs_debug_limit_state_t _lquota_cdls; \
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, mask, &_lquota_cdls); \
+ lquota_lqe_debug(&msgdata, mask, &_lquota_cdls, lqe, "$$$ "fmt" ", \
+ ##a); \
+} while (0)
+
+#define LQUOTA_ERROR(lqe, fmt, a...) LQUOTA_DEBUG_LIMIT(D_ERROR, lqe, fmt, ## a)
+#define LQUOTA_WARN(lqe, fmt, a...) \
+ LQUOTA_DEBUG_LIMIT(D_WARNING, lqe, fmt, ## a)
+#define LQUOTA_CONSOLE(lqe, fmt, a...) \
+ LQUOTA_DEBUG_LIMIT(D_CONSOLE, lqe, fmt, ## a)
+
+/* plain (non-rate-limited) D_QUOTA debug message */
+#define LQUOTA_DEBUG(lqe, fmt, a...) do { \
+ LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_QUOTA, NULL); \
+ lquota_lqe_debug(&msgdata, D_QUOTA, NULL, lqe, "$$$ "fmt" ", ##a); \
+} while (0)
+#else /* !LIBCFS_DEBUG */
+# define LQUOTA_DEBUG(lqe, fmt, a...) ((void)0)
+# define LQUOTA_ERROR(lqe, fmt, a...) ((void)0)
+# define LQUOTA_WARN(lqe, fmt, a...) ((void)0)
+# define LQUOTA_CONSOLE(lqe, fmt, a...) ((void)0)
+# define lquota_lqe_debug(msgdata, mask, cdls, lqe, fmt, a...) ((void)0)
+#endif
+
/* lquota_lib.c */
struct dt_object *acct_obj_lookup(const struct lu_env *, struct dt_device *,
int);
int lquota_extract_fid(struct lu_fid *, int *, int *, int *);
const struct dt_index_features *glb_idx_feature(struct lu_fid *);
+/* lquota_entry.c */
+/* site create/destroy */
+struct lquota_site *lquota_site_alloc(const struct lu_env *, void *, bool,
+ short, struct lquota_entry_operations *);
+void lquota_site_free(const struct lu_env *, struct lquota_site *);
+/* quota entry operations */
+struct lquota_entry *lqe_locate(const struct lu_env *, struct lquota_site *,
+ union lquota_id *);
+
/* lquota_disk.c */
struct dt_object *lquota_disk_dir_find_create(const struct lu_env *,
struct dt_device *,