Whamcloud - gitweb
New tag 2.15.63
[fs/lustre-release.git] / lustre / lod / lod_pool.c
index 27b945e..b7c3380 100644 (file)
  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
- * Lustre is a trademark of Sun Microsystems, Inc.
  */
 /*
  * lustre/lod/lod_pool.c
@@ -58,6 +57,8 @@
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-hash.h>
+#include <libcfs/linux/linux-fs.h>
 #include <obd.h>
 #include "lod_internal.h"
 
  *
  * \param[in] pool     pool descriptor on which to gain reference
  */
-static void pool_getref(struct pool_desc *pool)
+static void pool_getref(struct lod_pool_desc *pool)
 {
        CDEBUG(D_INFO, "pool %p\n", pool);
-       atomic_inc(&pool->pool_refcount);
+       kref_get(&pool->pool_refcount);
+}
+
+static void lod_pool_putref_free(struct kref *kref)
+{
+       struct lod_pool_desc *pool = container_of(kref, struct lod_pool_desc,
+                                                 pool_refcount);
+
+       LASSERT(list_empty(&pool->pool_list));
+       LASSERT(pool->pool_proc_entry == NULL);
+       lu_tgt_pool_free(&(pool->pool_rr.lqr_pool));
+       lu_tgt_pool_free(&(pool->pool_obds));
+       kfree_rcu(pool, pool_rcu);
+       EXIT;
 }
 
 /**
@@ -90,136 +104,38 @@ static void pool_getref(struct pool_desc *pool)
  * it is explicitly destroyed by the sysadmin.  The pool structure is freed
  * after the last reference on the structure is released.
  *
- * \param[in] pool     pool descriptor to drop reference on and possibly free
- */
-void lod_pool_putref(struct pool_desc *pool)
-{
-       CDEBUG(D_INFO, "pool %p\n", pool);
-       if (atomic_dec_and_test(&pool->pool_refcount)) {
-               LASSERT(hlist_unhashed(&pool->pool_hash));
-               LASSERT(list_empty(&pool->pool_list));
-               LASSERT(pool->pool_proc_entry == NULL);
-               lod_ost_pool_free(&(pool->pool_rr.lqr_pool));
-               lod_ost_pool_free(&(pool->pool_obds));
-               OBD_FREE_PTR(pool);
-               EXIT;
-       }
-}
-
-/**
- * Drop the refcount in cases where the caller holds a spinlock.
- *
- * This is needed if the caller cannot be blocked while freeing memory.
- * It assumes that there is some other known refcount held on the \a pool
- * and the memory cannot actually be freed, but the refcounting needs to
- * be kept accurate.
- *
- * \param[in] pool     pool descriptor on which to drop reference
+ * \param[in] pool     lod pool descriptor to drop reference on and possibly
+ *                     free
  */
-static void pool_putref_locked(struct pool_desc *pool)
+void lod_pool_putref(struct lod_pool_desc *pool)
 {
        CDEBUG(D_INFO, "pool %p\n", pool);
-       LASSERT(atomic_read(&pool->pool_refcount) > 1);
-
-       atomic_dec(&pool->pool_refcount);
-}
-
-/*
- * Group of functions needed for cfs_hash implementation.  This
- * includes pool lookup, refcounting, and cleanup.
- */
-
-/**
- * Hash the pool name for use by the cfs_hash handlers.
- *
- * Use the standard DJB2 hash function for ASCII strings in Lustre.
- *
- * \param[in] hash_body        hash structure where this key is embedded (unused)
- * \param[in] key      key to be hashed (in this case the pool name)
- * \param[in] mask     bitmask to limit the hash value to the desired size
- *
- * \retval             computed hash value from \a key and limited by \a mask
- */
-static __u32 pool_hashfn(struct cfs_hash *hash_body, const void *key,
-                        unsigned mask)
-{
-       return cfs_hash_djb2_hash(key, strnlen(key, LOV_MAXPOOLNAME), mask);
-}
-
-/**
- * Return the actual key (pool name) from the hashed \a hnode.
- *
- * Allows extracting the key name when iterating over all hash entries.
- *
- * \param[in] hnode    hash node found by lookup or iteration
- *
- * \retval             char array referencing the pool name (no refcount)
- */
-static void *pool_key(struct hlist_node *hnode)
-{
-       struct pool_desc *pool;
-
-       pool = hlist_entry(hnode, struct pool_desc, pool_hash);
-       return pool->pool_name;
-}
-
-/**
- * Check if the specified hash key matches the hash node.
- *
- * This is needed in case there is a hash key collision, allowing the hash
- * table lookup/iteration to distinguish between the two entries.
- *
- * \param[in] key      key (pool name) being searched for
- * \param[in] compared current entry being compared
- *
- * \retval             0 if \a key is the same as the key of \a compared
- * \retval             1 if \a key is different from the key of \a compared
- */
-static int pool_hashkey_keycmp(const void *key, struct hlist_node *compared)
-{
-       return !strncmp(key, pool_key(compared), LOV_MAXPOOLNAME);
-}
-
-/**
- * Return the actual pool data structure from the hash table entry.
- *
- * Once the hash table entry is found, extract the pool data from it.
- * The return type of this function is void * because it needs to be
- * assigned to the generic hash operations table.
- *
- * \param[in] hnode    hash table entry
- *
- * \retval             struct pool_desc for the specified \a hnode
- */
-static void *pool_hashobject(struct hlist_node *hnode)
-{
-       return hlist_entry(hnode, struct pool_desc, pool_hash);
+       kref_put(&pool->pool_refcount, lod_pool_putref_free);
 }
 
-static void pool_hashrefcount_get(struct cfs_hash *hs, struct hlist_node *hnode)
+static u32 pool_hashfh(const void *data, u32 len, u32 seed)
 {
-       struct pool_desc *pool;
+       const char *pool_name = data;
 
-       pool = hlist_entry(hnode, struct pool_desc, pool_hash);
-       pool_getref(pool);
+       return hashlen_hash(cfs_hashlen_string((void *)(unsigned long)seed,
+                                              pool_name));
 }
 
-static void pool_hashrefcount_put_locked(struct cfs_hash *hs,
-                                        struct hlist_node *hnode)
+static int pool_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
 {
-       struct pool_desc *pool;
+       const struct lod_pool_desc *pool = obj;
+       const char *pool_name = arg->key;
 
-       pool = hlist_entry(hnode, struct pool_desc, pool_hash);
-       pool_putref_locked(pool);
+       return strcmp(pool_name, pool->pool_name);
 }
 
-struct cfs_hash_ops pool_hash_operations = {
-       .hs_hash        = pool_hashfn,
-       .hs_key         = pool_key,
-       .hs_keycmp      = pool_hashkey_keycmp,
-       .hs_object      = pool_hashobject,
-       .hs_get         = pool_hashrefcount_get,
-       .hs_put_locked  = pool_hashrefcount_put_locked,
+static const struct rhashtable_params pools_hash_params = {
+       .key_len        = 1, /* actually variable */
+       .key_offset     = offsetof(struct lod_pool_desc, pool_name),
+       .head_offset    = offsetof(struct lod_pool_desc, pool_hash),
+       .hashfn         = pool_hashfh,
+       .obj_cmpfn      = pool_cmpfn,
+       .automatic_shrinking = true,
 };
 
 /*
@@ -230,7 +146,7 @@ struct cfs_hash_ops pool_hash_operations = {
 struct lod_pool_iterator {
        unsigned int      lpi_magic;    /* POOL_IT_MAGIC */
        unsigned int      lpi_idx;      /* from 0 to pool_tgt_size - 1 */
-       struct pool_desc *lpi_pool;
+       struct lod_pool_desc *lpi_pool;
 };
 
 /**
@@ -256,21 +172,21 @@ static void *pool_proc_next(struct seq_file *seq, void *v, loff_t *pos)
 
        LASSERTF(iter->lpi_magic == POOL_IT_MAGIC, "%08X\n", iter->lpi_magic);
 
+       (*pos)++;
        /* test if end of file */
-       if (*pos >= pool_tgt_count(iter->lpi_pool))
+       if (*pos > pool_tgt_count(iter->lpi_pool))
                return NULL;
 
+       CFS_FAIL_TIMEOUT(OBD_FAIL_OST_LIST_ASSERT, cfs_fail_val);
+
        /* iterate to find a non empty entry */
        prev_idx = iter->lpi_idx;
-       down_read(&pool_tgt_rw_sem(iter->lpi_pool));
        iter->lpi_idx++;
-       if (iter->lpi_idx == pool_tgt_count(iter->lpi_pool)) {
+       if (iter->lpi_idx >= pool_tgt_count(iter->lpi_pool)) {
                iter->lpi_idx = prev_idx; /* we stay on the last entry */
-               up_read(&pool_tgt_rw_sem(iter->lpi_pool));
                return NULL;
        }
-       up_read(&pool_tgt_rw_sem(iter->lpi_pool));
-       (*pos)++;
+
        /* return != NULL to continue */
        return iter;
 }
@@ -292,7 +208,7 @@ static void *pool_proc_next(struct seq_file *seq, void *v, loff_t *pos)
  */
 static void *pool_proc_start(struct seq_file *seq, loff_t *pos)
 {
-       struct pool_desc *pool = seq->private;
+       struct lod_pool_desc *pool = seq->private;
        struct lod_pool_iterator *iter;
 
        pool_getref(pool);
@@ -312,6 +228,7 @@ static void *pool_proc_start(struct seq_file *seq, loff_t *pos)
        iter->lpi_idx = 0;
 
        seq->private = iter;
+       down_read(&pool_tgt_rw_sem(pool));
        if (*pos > 0) {
                loff_t i;
                void *ptr;
@@ -346,6 +263,7 @@ static void pool_proc_stop(struct seq_file *seq, void *v)
        struct lod_pool_iterator *iter = seq->private;
 
        if (iter != NULL && iter->lpi_magic == POOL_IT_MAGIC) {
+               up_read(&pool_tgt_rw_sem(iter->lpi_pool));
                seq->private = iter->lpi_pool;
                lod_pool_putref(iter->lpi_pool);
                OBD_FREE_PTR(iter);
@@ -369,9 +287,7 @@ static int pool_proc_show(struct seq_file *seq, void *v)
        LASSERT(iter->lpi_pool != NULL);
        LASSERT(iter->lpi_idx <= pool_tgt_count(iter->lpi_pool));
 
-       down_read(&pool_tgt_rw_sem(iter->lpi_pool));
        tgt = pool_tgt(iter->lpi_pool, iter->lpi_idx);
-       up_read(&pool_tgt_rw_sem(iter->lpi_pool));
        if (tgt != NULL)
                seq_printf(seq, "%s\n", obd_uuid2str(&(tgt->ltd_uuid)));
 
@@ -403,229 +319,88 @@ static int pool_proc_open(struct inode *inode, struct file *file)
        rc = seq_open(file, &pool_proc_ops);
        if (!rc) {
                struct seq_file *seq = file->private_data;
-               seq->private = PDE_DATA(inode);
+               seq->private = pde_data(inode);
        }
        return rc;
 }
 
-static struct file_operations pool_proc_operations = {
-       .open           = pool_proc_open,
-       .read           = seq_read,
-       .llseek         = seq_lseek,
-       .release        = seq_release,
+const static struct proc_ops pool_proc_operations = {
+       .proc_open      = pool_proc_open,
+       .proc_read      = seq_read,
+       .proc_lseek     = seq_lseek,
+       .proc_release   = seq_release,
 };
 
-/**
- * Dump the pool target list into the Lustre debug log.
- *
- * This is a debugging function to allow dumping the list of targets
- * in \a pool to the Lustre kernel debug log at the given \a level.
- *
- * This is not currently called by any existing code, but can be called
- * from within gdb/crash to display the contents of the pool, or from
- * code under development.
- *
- * \param[in] level    Lustre debug level (D_INFO, D_WARN, D_ERROR, etc)
- * \param[in] pool     pool descriptor to be dumped
- */
-void lod_dump_pool(int level, struct pool_desc *pool)
+static void pools_hash_exit(void *vpool, void *data)
 {
-       unsigned int i;
-
-       pool_getref(pool);
+       struct lod_pool_desc *pool = vpool;
 
-       CDEBUG(level, "pool "LOV_POOLNAMEF" has %d members\n",
-              pool->pool_name, pool->pool_obds.op_count);
-       down_read(&pool_tgt_rw_sem(pool));
-
-       for (i = 0; i < pool_tgt_count(pool) ; i++) {
-               if (!pool_tgt(pool, i) || !(pool_tgt(pool, i))->ltd_exp)
-                       continue;
-               CDEBUG(level, "pool "LOV_POOLNAMEF"[%d] = %s\n",
-                      pool->pool_name, i,
-                      obd_uuid2str(&((pool_tgt(pool, i))->ltd_uuid)));
-       }
-
-       up_read(&pool_tgt_rw_sem(pool));
        lod_pool_putref(pool);
 }
 
-/**
- * Initialize the pool data structures at startup.
- *
- * Allocate and initialize the pool data structures with the specified
- * array size.  If pool count is not specified (\a count == 0), then
- * POOL_INIT_COUNT will be used.  Allocating a non-zero initial array
- * size avoids the need to reallocate as new pools are added.
- *
- * \param[in] op       pool structure
- * \param[in] count    initial size of the target op_array[] array
- *
- * \retval             0 indicates successful pool initialization
- * \retval             negative error number on failure
- */
-#define POOL_INIT_COUNT 2
-int lod_ost_pool_init(struct ost_pool *op, unsigned int count)
+int lod_pool_hash_init(struct rhashtable *tbl)
 {
-       ENTRY;
-
-       if (count == 0)
-               count = POOL_INIT_COUNT;
-       op->op_array = NULL;
-       op->op_count = 0;
-       init_rwsem(&op->op_rw_sem);
-       op->op_size = count * sizeof(op->op_array[0]);
-       OBD_ALLOC(op->op_array, op->op_size);
-       if (op->op_array == NULL) {
-               op->op_size = 0;
-               RETURN(-ENOMEM);
-       }
-       EXIT;
-       return 0;
+       return rhashtable_init(tbl, &pools_hash_params);
 }
 
-/**
- * Increase the op_array size to hold more targets in this pool.
- *
- * The size is increased to at least \a min_count, but may be larger
- * for an existing pool since ->op_array[] is growing exponentially.
- * Caller must hold write op_rwlock.
- *
- * \param[in] op       pool structure
- * \param[in] min_count        minimum number of entries to handle
- *
- * \retval             0 on success
- * \retval             negative error number on failure.
- */
-int lod_ost_pool_extend(struct ost_pool *op, unsigned int min_count)
+void lod_pool_hash_destroy(struct rhashtable *tbl)
 {
-       __u32 *new;
-       __u32 new_size;
-
-       LASSERT(min_count != 0);
-
-       if (op->op_count * sizeof(op->op_array[0]) < op->op_size)
-               return 0;
-
-       new_size = max_t(__u32, min_count * sizeof(op->op_array[0]),
-                        2 * op->op_size);
-       OBD_ALLOC(new, new_size);
-       if (new == NULL)
-               return -ENOMEM;
-
-       /* copy old array to new one */
-       memcpy(new, op->op_array, op->op_size);
-       OBD_FREE(op->op_array, op->op_size);
-       op->op_array = new;
-       op->op_size = new_size;
-
-       return 0;
+       rhashtable_free_and_destroy(tbl, pools_hash_exit, NULL);
 }
 
-/**
- * Add a new target to an existing pool.
- *
- * Add a new target device to the pool previously created and returned by
- * lod_pool_new().  Each target can only be in each pool at most one time.
- *
- * \param[in] op       target pool to add new entry
- * \param[in] idx      pool index number to add to the \a op array
- * \param[in] min_count        minimum number of entries to expect in the pool
- *
- * \retval             0 if target could be added to the pool
- * \retval             negative error if target \a idx was not added
- */
-int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count)
+bool lod_pool_exists(struct lod_device *lod, char *poolname)
 {
-       unsigned int i;
-       int rc = 0;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       rc = lod_ost_pool_extend(op, min_count);
-       if (rc)
-               GOTO(out, rc);
-
-       /* search ost in pool array */
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx)
-                       GOTO(out, rc = -EEXIST);
-       }
-       /* ost not found we add it */
-       op->op_array[op->op_count] = idx;
-       op->op_count++;
-       EXIT;
-out:
-       up_write(&op->op_rw_sem);
-       return rc;
+       struct lod_pool_desc *pool;
+
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body,
+                               poolname,
+                               pools_hash_params);
+       rcu_read_unlock();
+       return pool != NULL;
 }
 
-/**
- * Remove an existing pool from the system.
- *
- * The specified pool must have previously been allocated by
- * lod_pool_new() and not have any target members in the pool.
- * If the removed target is not the last, compact the array
- * to remove empty spaces.
- *
- * \param[in] op       pointer to the original data structure
- * \param[in] idx      target index to be removed
- *
- * \retval             0 on success
- * \retval             negative error number on failure
- */
-int lod_ost_pool_remove(struct ost_pool *op, __u32 idx)
+struct lod_pool_desc *lod_pool_find(struct lod_device *lod, const char *poolname)
 {
-       unsigned int i;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx) {
-                       memmove(&op->op_array[i], &op->op_array[i + 1],
-                               (op->op_count - i - 1) *
-                               sizeof(op->op_array[0]));
-                       op->op_count--;
-                       up_write(&op->op_rw_sem);
-                       EXIT;
-                       return 0;
-               }
-       }
-
-       up_write(&op->op_rw_sem);
-       RETURN(-EINVAL);
+       struct lod_pool_desc *pool;
+
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body,
+                               poolname,
+                               pools_hash_params);
+       if (pool && !kref_get_unless_zero(&pool->pool_refcount))
+               pool = NULL;
+       rcu_read_unlock();
+       return pool;
 }
 
-/**
- * Free the pool after it was emptied and removed from /proc.
- *
- * Note that all of the child/target entries referenced by this pool
- * must have been removed by lod_ost_pool_remove() before it can be
- * deleted from memory.
- *
- * \param[in] op       pool to be freed.
- *
- * \retval             0 on success or if pool was already freed
- */
-int lod_ost_pool_free(struct ost_pool *op)
+static int lod_ost_pool_weights_seq_show(struct seq_file *m, void *data)
 {
-       ENTRY;
-
-       if (op->op_size == 0)
-               RETURN(0);
+       struct lod_pool_desc *pool = m->private;
+       struct lod_device *lod = lu2lod_dev(pool->pool_lobd->obd_lu_dev);
 
-       down_write(&op->op_rw_sem);
+       return lod_tgt_weights_seq_show(m, lod, &pool->pool_obds, false);
+}
 
-       OBD_FREE(op->op_array, op->op_size);
-       op->op_array = NULL;
-       op->op_count = 0;
-       op->op_size = 0;
+static ssize_t
+lod_ost_pool_weights_seq_write(struct file *file, const char __user *buf,
+                              size_t count, loff_t *off)
+{
+       struct seq_file *m = file->private_data;
+       struct lod_pool_desc *pool = m->private;
+       struct lod_device *lod = lu2lod_dev(pool->pool_lobd->obd_lu_dev);
 
-       up_write(&op->op_rw_sem);
-       RETURN(0);
+       return lod_tgt_weights_seq_write(m, buf, count, lod, &pool->pool_obds,
+                                        false);
 }
+LDEBUGFS_SEQ_FOPS(lod_ost_pool_weights);
+
+static struct ldebugfs_vars ldebugfs_lod_pool_vars[] = {
+       { .name =       "qos_ost_weights",
+         .fops =       &lod_ost_pool_weights_fops,
+         .proc_mode =  0444 },
+       { 0 }
+};
 
 /**
  * Allocate a new pool for the specified device.
@@ -644,31 +419,33 @@ int lod_ost_pool_free(struct ost_pool *op)
 int lod_pool_new(struct obd_device *obd, char *poolname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct pool_desc  *new_pool;
+       struct lod_pool_desc  *new_pool;
        int rc;
        ENTRY;
 
        if (strlen(poolname) > LOV_MAXPOOLNAME)
                RETURN(-ENAMETOOLONG);
 
-       OBD_ALLOC_PTR(new_pool);
+       /* OBD_ALLOC_* doesn't work with direct kfree_rcu use */
+       new_pool = kmalloc(sizeof(*new_pool), __GFP_ZERO);
        if (new_pool == NULL)
                RETURN(-ENOMEM);
 
-       strlcpy(new_pool->pool_name, poolname, sizeof(new_pool->pool_name));
+       strscpy(new_pool->pool_name, poolname, sizeof(new_pool->pool_name));
+       new_pool->pool_spill_target[0] = '\0';
+       atomic_set(&new_pool->pool_spill_hit, 0);
        new_pool->pool_lobd = obd;
-       atomic_set(&new_pool->pool_refcount, 1);
-       rc = lod_ost_pool_init(&new_pool->pool_obds, 0);
+       kref_init(&new_pool->pool_refcount);
+       rc = lu_tgt_pool_init(&new_pool->pool_obds, 0);
        if (rc)
-               GOTO(out_err, rc);
+               GOTO(out_free_pool, rc);
 
-       lod_qos_rr_init(&new_pool->pool_rr);
-       rc = lod_ost_pool_init(&new_pool->pool_rr.lqr_pool, 0);
+       lu_qos_rr_init(&new_pool->pool_rr);
+
+       rc = lu_tgt_pool_init(&new_pool->pool_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_free_pool_obds, rc);
 
-       INIT_HLIST_NODE(&new_pool->pool_hash);
-
 #ifdef CONFIG_PROC_FS
        pool_getref(new_pool);
        new_pool->pool_proc_entry = lprocfs_add_simple(lod->lod_pool_proc_entry,
@@ -680,6 +457,17 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
                new_pool->pool_proc_entry = NULL;
                lod_pool_putref(new_pool);
        }
+
+       pool_getref(new_pool);
+       new_pool->pool_spill_proc_entry =
+               lprocfs_register(poolname, lod->lod_spill_proc_entry,
+                       lprocfs_lod_spill_vars, new_pool);
+       if (IS_ERR(new_pool->pool_spill_proc_entry)) {
+               rc = PTR_ERR(new_pool->pool_spill_proc_entry);
+               new_pool->pool_proc_entry = NULL;
+               lod_pool_putref(new_pool);
+       }
+
        CDEBUG(D_INFO, "pool %p - proc %p\n", new_pool,
               new_pool->pool_proc_entry);
 #endif
@@ -689,11 +477,24 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
        lod->lod_pool_count++;
        spin_unlock(&obd->obd_dev_lock);
 
-       /* add to find only when it fully ready  */
-       rc = cfs_hash_add_unique(lod->lod_pools_hash_body, poolname,
-                                &new_pool->pool_hash);
-       if (rc)
-               GOTO(out_err, rc = -EEXIST);
+       /* Add to hash table only when it is fully ready. */
+       rc = rhashtable_lookup_insert_fast(&lod->lod_pools_hash_body,
+                                          &new_pool->pool_hash,
+                                          pools_hash_params);
+       if (rc) {
+               if (rc != -EEXIST)
+                       /*
+                        * Hide -E2BIG and -EBUSY which
+                        * are not helpful.
+                        */
+                       rc = -ENOMEM;
+               GOTO(out_err, rc);
+       }
+
+       new_pool->pool_debugfs = debugfs_create_dir(poolname,
+                                                   lod->lod_pool_debugfs);
+       ldebugfs_add_vars(new_pool->pool_debugfs, ldebugfs_lod_pool_vars,
+                         new_pool);
 
        CDEBUG(D_CONFIG, LOV_POOLNAMEF" is pool #%d\n",
                        poolname, lod->lod_pool_count);
@@ -706,11 +507,13 @@ out_err:
        lod->lod_pool_count--;
        spin_unlock(&obd->obd_dev_lock);
 
+       lprocfs_remove(&new_pool->pool_spill_proc_entry);
        lprocfs_remove(&new_pool->pool_proc_entry);
 
-       lod_ost_pool_free(&new_pool->pool_rr.lqr_pool);
+       lu_tgt_pool_free(&new_pool->pool_rr.lqr_pool);
 out_free_pool_obds:
-       lod_ost_pool_free(&new_pool->pool_obds);
+       lu_tgt_pool_free(&new_pool->pool_obds);
+out_free_pool:
        OBD_FREE_PTR(new_pool);
        return rc;
 }
@@ -727,19 +530,33 @@ out_free_pool_obds:
 int lod_pool_del(struct obd_device *obd, char *poolname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct pool_desc  *pool;
+       struct lod_pool_desc  *pool;
        ENTRY;
 
        /* lookup and kill hash reference */
-       pool = cfs_hash_del_key(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body, poolname,
+                                pools_hash_params);
+       if (pool && rhashtable_remove_fast(&lod->lod_pools_hash_body,
+                                          &pool->pool_hash,
+                                          pools_hash_params) != 0)
+               pool = NULL;
+       rcu_read_unlock();
+       if (!pool)
                RETURN(-ENOENT);
 
+       debugfs_remove_recursive(pool->pool_debugfs);
+
        if (pool->pool_proc_entry != NULL) {
                CDEBUG(D_INFO, "proc entry %p\n", pool->pool_proc_entry);
                lprocfs_remove(&pool->pool_proc_entry);
                lod_pool_putref(pool);
        }
+       if (pool->pool_spill_proc_entry != NULL) {
+               CDEBUG(D_INFO, "proc entry %p\n", pool->pool_spill_proc_entry);
+               lprocfs_remove(&pool->pool_spill_proc_entry);
+               lod_pool_putref(pool);
+       }
 
        spin_lock(&obd->obd_dev_lock);
        list_del_init(&pool->pool_list);
@@ -766,23 +583,23 @@ int lod_pool_del(struct obd_device *obd, char *poolname)
  */
 int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname)
 {
-       struct lod_device       *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct obd_uuid          ost_uuid;
-       struct pool_desc        *pool;
-       unsigned int             idx;
-       int                      rc = -EINVAL;
+       struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
+       struct obd_uuid ost_uuid;
+       struct lod_pool_desc *pool;
+       struct lu_tgt_desc *tgt;
+       int rc = -EINVAL;
        ENTRY;
 
-       pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       pool = lod_pool_find(lod, poolname);
+       if (!pool)
                RETURN(-ENOENT);
 
        obd_str2uuid(&ost_uuid, ostname);
 
        /* search ost in lod array */
        lod_getref(&lod->lod_ost_descs);
-       lod_foreach_ost(lod, idx) {
-               if (obd_uuid_equals(&ost_uuid, &OST_TGT(lod, idx)->ltd_uuid)) {
+       lod_foreach_ost(lod, tgt) {
+               if (obd_uuid_equals(&ost_uuid, &tgt->ltd_uuid)) {
                        rc = 0;
                        break;
                }
@@ -791,11 +608,12 @@ int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       rc = lod_ost_pool_add(&pool->pool_obds, idx, lod->lod_osts_size);
+       rc = lu_tgt_pool_add(&pool->pool_obds, tgt->ltd_index,
+                            lod->lod_ost_count);
        if (rc)
                GOTO(out, rc);
 
-       pool->pool_rr.lqr_dirty = 1;
+       set_bit(LQ_DIRTY, &pool->pool_rr.lqr_flags);
 
        CDEBUG(D_CONFIG, "Added %s to "LOV_POOLNAMEF" as member %d\n",
                        ostname, poolname,  pool_tgt_count(pool));
@@ -823,22 +641,23 @@ out:
  */
 int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
 {
-       struct lod_device       *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct obd_uuid          ost_uuid;
-       struct pool_desc        *pool;
-       unsigned int             idx;
-       int                      rc = -EINVAL;
+       struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
+       struct lu_tgt_desc *ost;
+       struct obd_uuid ost_uuid;
+       struct lod_pool_desc *pool;
+       int rc = -EINVAL;
        ENTRY;
 
-       pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       /* lookup and kill hash reference */
+       pool = lod_pool_find(lod, poolname);
+       if (!pool)
                RETURN(-ENOENT);
 
        obd_str2uuid(&ost_uuid, ostname);
 
        lod_getref(&lod->lod_ost_descs);
-       cfs_foreach_bit(lod->lod_ost_bitmap, idx) {
-               if (obd_uuid_equals(&ost_uuid, &OST_TGT(lod, idx)->ltd_uuid)) {
+       lod_foreach_ost(lod, ost) {
+               if (obd_uuid_equals(&ost_uuid, &ost->ltd_uuid)) {
                        rc = 0;
                        break;
                }
@@ -848,9 +667,8 @@ int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       lod_ost_pool_remove(&pool->pool_obds, idx);
-
-       pool->pool_rr.lqr_dirty = 1;
+       lu_tgt_pool_remove(&pool->pool_obds, ost->ltd_index);
+       set_bit(LQ_DIRTY, &pool->pool_rr.lqr_flags);
 
        CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname,
               poolname);
@@ -874,25 +692,12 @@ out:
  * \retval             0 successfully found index in \a pool
  * \retval             negative error if device not found in \a pool
  */
-int lod_check_index_in_pool(__u32 idx, struct pool_desc *pool)
+int lod_check_index_in_pool(__u32 idx, struct lod_pool_desc *pool)
 {
-       unsigned int i;
        int rc;
-       ENTRY;
 
        pool_getref(pool);
-
-       down_read(&pool_tgt_rw_sem(pool));
-
-       for (i = 0; i < pool_tgt_count(pool); i++) {
-               if (pool_tgt_array(pool)[i] == idx)
-                       GOTO(out, rc = 0);
-       }
-       rc = -ENOENT;
-       EXIT;
-out:
-       up_read(&pool_tgt_rw_sem(pool));
-
+       rc = lu_tgt_check_index(idx, &pool->pool_obds);
        lod_pool_putref(pool);
        return rc;
 }
@@ -907,26 +712,99 @@ out:
  * \retval     pointer to pool descriptor on success
  * \retval     NULL if \a poolname could not be found or poolname is empty
  */
-struct pool_desc *lod_find_pool(struct lod_device *lod, char *poolname)
+struct lod_pool_desc *lod_find_pool(struct lod_device *lod, const char *poolname)
 {
-       struct pool_desc *pool;
-
-       pool = NULL;
-       if (poolname[0] != '\0') {
-               pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-               if (pool == NULL)
-                       CDEBUG(D_CONFIG, "%s: request for an unknown pool ("
-                              LOV_POOLNAMEF")\n",
-                              lod->lod_child_exp->exp_obd->obd_name, poolname);
-               if (pool != NULL && pool_tgt_count(pool) == 0) {
-                       CDEBUG(D_CONFIG, "%s: request for an empty pool ("
-                              LOV_POOLNAMEF")\n",
-                              lod->lod_child_exp->exp_obd->obd_name, poolname);
-                       /* pool is ignored, so we remove ref on it */
-                       lod_pool_putref(pool);
-                       pool = NULL;
-               }
+       struct lod_pool_desc *pool;
+
+       if (poolname[0] == '\0' || lov_pool_is_reserved(poolname))
+               return NULL;
+
+       pool = lod_pool_find(lod, poolname);
+       if (!pool)
+               CDEBUG(D_CONFIG,
+                      "%s: request for an unknown pool (" LOV_POOLNAMEF ")\n",
+                      lod->lod_child_exp->exp_obd->obd_name, poolname);
+       if (pool != NULL && pool_tgt_count(pool) == 0) {
+               CDEBUG(D_CONFIG, "%s: request for an empty pool ("
+                      LOV_POOLNAMEF")\n",
+                      lod->lod_child_exp->exp_obd->obd_name, poolname);
+               /* pool is ignored, so we remove ref on it */
+               lod_pool_putref(pool);
+               pool = NULL;
        }
+
        return pool;
 }
 
+void lod_spill_target_refresh(const struct lu_env *env, struct lod_device *lod,
+                             struct lod_pool_desc *pool)
+{
+       __u64 avail_bytes = 0, total_bytes = 0;
+       struct lu_tgt_pool *osts;
+       int i;
+
+       if (ktime_get_seconds() < pool->pool_spill_expire)
+               return;
+
+       if (pool->pool_spill_threshold_pct == 0)
+               return;
+
+       lod_qos_statfs_update(env, lod, &lod->lod_ost_descs);
+
+       down_write(&pool_tgt_rw_sem(pool));
+       if (ktime_get_seconds() < pool->pool_spill_expire)
+               goto out_sem;
+       pool->pool_spill_expire = ktime_get_seconds() +
+               lod->lod_ost_descs.ltd_lov_desc.ld_qos_maxage;
+
+       osts = &(pool->pool_obds);
+       for (i = 0; i < osts->op_count; i++) {
+               int idx = osts->op_array[i];
+               struct lod_tgt_desc *tgt;
+               struct obd_statfs *sfs;
+
+               if (!test_bit(idx, lod->lod_ost_bitmap))
+                       continue;
+               tgt = OST_TGT(lod, idx);
+               if (!tgt->ltd_active)
+                       continue;
+               sfs = &tgt->ltd_statfs;
+
+               avail_bytes += sfs->os_bavail * sfs->os_bsize;
+               total_bytes += sfs->os_blocks * sfs->os_bsize;
+       }
+       if (total_bytes - avail_bytes >=
+           total_bytes * pool->pool_spill_threshold_pct / 100)
+               pool->pool_spill_is_active = true;
+       else
+               pool->pool_spill_is_active = false;
+
+out_sem:
+       up_write(&pool_tgt_rw_sem(pool));
+}
+
+/*
+ * XXX: consider a better schema to detect loops
+ */
+void lod_check_and_spill_pool(const struct lu_env *env, struct lod_device *lod,
+                             char **poolname)
+{
+       struct lod_pool_desc *pool;
+
+       if (!poolname || !*poolname || (*poolname)[0] == '\0')
+               return;
+repeat:
+       pool = lod_pool_find(lod, *poolname);
+       if (!pool)
+               return;
+
+       lod_spill_target_refresh(env, lod, pool);
+       if (pool->pool_spill_is_active) {
+               lod_set_pool(poolname, pool->pool_spill_target);
+               atomic_inc(&pool->pool_spill_hit);
+               lod_pool_putref(pool);
+               goto repeat;
+       }
+
+       lod_pool_putref(pool);
+}