Whamcloud - gitweb
LU-13543 lustre: update *pos in seq_file .next functions
[fs/lustre-release.git] / lustre / lod / lod_pool.c
index c8959a0..b59b0df 100644 (file)
  * in the LICENSE file that accompanied this code).
  *
  * You should have received a copy of the GNU General Public License
- * version 2 along with this program; If not, see [sun.com URL with a
- * copy of GPLv2].
- *
- * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
- * CA 95054 USA or visit www.sun.com if you need additional information or
- * have any questions.
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
  *
  * GPL HEADER END
  */
  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2013, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
  * Lustre is a trademark of Sun Microsystems, Inc.
- *
+ */
+/*
  * lustre/lod/lod_pool.c
  *
  * OST pool methods
  *
+ * This file provides code related to the Logical Object Device (LOD)
+ * handling of OST Pools on the MDT.  Pools are named lists of targets
+ * that allow userspace to group targets that share a particlar property
+ * together so that users or kernel helpers can make decisions about file
+ * allocation based on these properties.  For example, pools could be
+ * defined based on fault domains (e.g. separate racks of server nodes) so
+ * that RAID-1 mirroring could select targets from independent fault
+ * domains, or pools could define target performance characteristics so
+ * that applicatins could select IOP-optimized storage or stream-optimized
+ * storage for a particular output file.
+ *
+ * This file handles creation, lookup, and removal of pools themselves, as
+ * well as adding and removing targets to pools.  It also handles lprocfs
+ * display of configured pool.  The pools are accessed by name in the pool
+ * hash, and are refcounted to ensure proper pool structure lifetimes.
+ *
  * Author: Jacques-Charles LAFOUCRIERE <jc.lafoucriere@cea.fr>
  * Author: Alex Lyashkov <Alexey.Lyashkov@Sun.COM>
  * Author: Nathaniel Rutman <Nathan.Rutman@Sun.COM>
 #define DEBUG_SUBSYSTEM S_LOV
 
 #include <libcfs/libcfs.h>
+#include <libcfs/linux/linux-hash.h>
 #include <obd.h>
 #include "lod_internal.h"
 
-#define pool_tgt(_p, _i) \
-       OST_TGT(lu2lod_dev((_p)->pool_lobd->obd_lu_dev),(_p)->pool_obds.op_array[_i])
+#define pool_tgt(_p, _i) OST_TGT(lu2lod_dev((_p)->pool_lobd->obd_lu_dev), \
+                                (_p)->pool_obds.op_array[_i])
 
-static void lod_pool_getref(struct pool_desc *pool)
+/**
+ * Get a reference on the specified pool.
+ *
+ * To ensure the pool descriptor is not freed before the caller is finished
+ * with it.  Any process that is accessing \a pool directly needs to hold
+ * reference on it, including /proc since a userspace thread may be holding
+ * the /proc file open and busy in the kernel.
+ *
+ * \param[in] pool     pool descriptor on which to gain reference
+ */
+static void pool_getref(struct pool_desc *pool)
 {
        CDEBUG(D_INFO, "pool %p\n", pool);
        atomic_inc(&pool->pool_refcount);
 }
 
+/**
+ * Drop a reference on the specified pool and free its memory if needed.
+ *
+ * One reference is held by the LOD OBD device while it is configured, from
+ * the time the configuration log defines the pool until the time when it is
+ * dropped when the LOD OBD is cleaned up or the pool is deleted.  This means
+ * that the pool will not be freed while the LOD device is configured, unless
+ * it is explicitly destroyed by the sysadmin.  The pool structure is freed
+ * after the last reference on the structure is released.
+ *
+ * \param[in] pool     pool descriptor to drop reference on and possibly free
+ */
 void lod_pool_putref(struct pool_desc *pool)
 {
        CDEBUG(D_INFO, "pool %p\n", pool);
        if (atomic_dec_and_test(&pool->pool_refcount)) {
-               LASSERT(cfs_hlist_unhashed(&pool->pool_hash));
-               LASSERT(cfs_list_empty(&pool->pool_list));
+               LASSERT(list_empty(&pool->pool_list));
                LASSERT(pool->pool_proc_entry == NULL);
-               lod_ost_pool_free(&(pool->pool_rr.lqr_pool));
-               lod_ost_pool_free(&(pool->pool_obds));
-               OBD_FREE_PTR(pool);
+               tgt_pool_free(&(pool->pool_rr.lqr_pool));
+               tgt_pool_free(&(pool->pool_obds));
+               kfree_rcu(pool, pool_rcu);
                EXIT;
        }
 }
 
-void lod_pool_putref_locked(struct pool_desc *pool)
+static u32 pool_hashfh(const void *data, u32 len, u32 seed)
 {
-       CDEBUG(D_INFO, "pool %p\n", pool);
-       LASSERT(atomic_read(&pool->pool_refcount) > 1);
+       const char *pool_name = data;
 
-       atomic_dec(&pool->pool_refcount);
+       return hashlen_hash(cfs_hashlen_string((void *)(unsigned long)seed,
+                                              pool_name));
 }
 
-
-/*
- * hash function using a Rotating Hash algorithm
- * Knuth, D. The Art of Computer Programming,
- * Volume 3: Sorting and Searching,
- * Chapter 6.4.
- * Addison Wesley, 1973
- */
-static __u32 pool_hashfn(cfs_hash_t *hash_body, const void *key, unsigned mask)
+static int pool_cmpfn(struct rhashtable_compare_arg *arg, const void *obj)
 {
-       int i;
-       __u32 result;
-       char *poolname;
-
-       result = 0;
-       poolname = (char *)key;
-       for (i = 0; i < LOV_MAXPOOLNAME; i++) {
-               if (poolname[i] == '\0')
-                       break;
-               result = (result << 4)^(result >> 28) ^  poolname[i];
-       }
-       return (result % mask);
-}
+       const struct pool_desc *pool = obj;
+       const char *pool_name = arg->key;
 
-static void *pool_key(cfs_hlist_node_t *hnode)
-{
-       struct pool_desc *pool;
-
-       pool = cfs_hlist_entry(hnode, struct pool_desc, pool_hash);
-       return (pool->pool_name);
-}
-
-static int pool_hashkey_keycmp(const void *key, cfs_hlist_node_t *compared_hnode)
-{
-       char *pool_name;
-       struct pool_desc *pool;
-
-       pool_name = (char *)key;
-       pool = cfs_hlist_entry(compared_hnode, struct pool_desc, pool_hash);
-       return !strncmp(pool_name, pool->pool_name, LOV_MAXPOOLNAME);
-}
-
-static void *pool_hashobject(cfs_hlist_node_t *hnode)
-{
-       return cfs_hlist_entry(hnode, struct pool_desc, pool_hash);
-}
-
-static void pool_hashrefcount_get(cfs_hash_t *hs, cfs_hlist_node_t *hnode)
-{
-       struct pool_desc *pool;
-
-       pool = cfs_hlist_entry(hnode, struct pool_desc, pool_hash);
-       lod_pool_getref(pool);
-}
-
-static void pool_hashrefcount_put_locked(cfs_hash_t *hs,
-               cfs_hlist_node_t *hnode)
-{
-       struct pool_desc *pool;
-
-       pool = cfs_hlist_entry(hnode, struct pool_desc, pool_hash);
-       lod_pool_putref_locked(pool);
+       return strcmp(pool_name, pool->pool_name);
 }
 
-cfs_hash_ops_t pool_hash_operations = {
-       .hs_hash        = pool_hashfn,
-       .hs_key         = pool_key,
-       .hs_keycmp      = pool_hashkey_keycmp,
-       .hs_object      = pool_hashobject,
-       .hs_get         = pool_hashrefcount_get,
-       .hs_put_locked  = pool_hashrefcount_put_locked,
+static const struct rhashtable_params pools_hash_params = {
+       .key_len        = 1, /* actually variable */
+       .key_offset     = offsetof(struct pool_desc, pool_name),
+       .head_offset    = offsetof(struct pool_desc, pool_hash),
+       .hashfn         = pool_hashfh,
+       .obj_cmpfn      = pool_cmpfn,
+       .automatic_shrinking = true,
 };
 
-#ifdef LPROCFS
-/* ifdef needed for liblustre support */
-/*
- * pool /proc seq_file methods
- */
 /*
- * iterator is used to go through the target pool entries
- * index is the current entry index in the lp_array[] array
- * index >= pos returned to the seq_file interface
- * pos is from 0 to (pool->pool_obds.op_count - 1)
+ * Methods for /proc seq_file iteration of the defined pools.
  */
+
 #define POOL_IT_MAGIC 0xB001CEA0
 struct lod_pool_iterator {
-       int               lpi_magic;
-       int               lpi_idx;      /* from 0 to pool_tgt_size - 1 */
+       unsigned int      lpi_magic;    /* POOL_IT_MAGIC */
+       unsigned int      lpi_idx;      /* from 0 to pool_tgt_size - 1 */
        struct pool_desc *lpi_pool;
 };
 
-static void *pool_proc_next(struct seq_file *s, void *v, loff_t *pos)
+/**
+ * Return the next configured target within one pool for seq_file iteration.
+ *
+ * Iterator is used to go through the target entries of a single pool
+ * (i.e. the list of OSTs configured for a named pool).
+ * lpi_idx is the current target index in the pool's op_array[].
+ *
+ * The return type is a void * because this function is one of the
+ * struct seq_operations methods and must match the function template.
+ *
+ * \param[in] seq      /proc sequence file iteration tracking structure
+ * \param[in] v                unused
+ * \param[in] pos      position within iteration; 0 to number of targets - 1
+ *
+ * \retval     struct pool_iterator of the next pool descriptor
+ */
+static void *pool_proc_next(struct seq_file *seq, void *v, loff_t *pos)
 {
-       struct lod_pool_iterator *iter = s->private;
+       struct lod_pool_iterator *iter = seq->private;
        int prev_idx;
 
-       LASSERTF(iter->lpi_magic == POOL_IT_MAGIC, "%08X", iter->lpi_magic);
+       LASSERTF(iter->lpi_magic == POOL_IT_MAGIC, "%08X\n", iter->lpi_magic);
 
+       (*pos)++;
        /* test if end of file */
-       if (*pos >= pool_tgt_count(iter->lpi_pool))
+       if (*pos > pool_tgt_count(iter->lpi_pool))
                return NULL;
 
+       OBD_FAIL_TIMEOUT(OBD_FAIL_OST_LIST_ASSERT, cfs_fail_val);
+
        /* iterate to find a non empty entry */
        prev_idx = iter->lpi_idx;
-       down_read(&pool_tgt_rw_sem(iter->lpi_pool));
        iter->lpi_idx++;
-       if (iter->lpi_idx == pool_tgt_count(iter->lpi_pool)) {
+       if (iter->lpi_idx >= pool_tgt_count(iter->lpi_pool)) {
                iter->lpi_idx = prev_idx; /* we stay on the last entry */
-               up_read(&pool_tgt_rw_sem(iter->lpi_pool));
                return NULL;
        }
-       up_read(&pool_tgt_rw_sem(iter->lpi_pool));
-       (*pos)++;
+
        /* return != NULL to continue */
        return iter;
 }
 
-static void *pool_proc_start(struct seq_file *s, loff_t *pos)
+/**
+ * Start seq_file iteration via /proc for a single pool.
+ *
+ * The \a pos parameter may be non-zero, indicating that the iteration
+ * is starting at some offset in the target list.  Use the seq_file
+ * private field to memorize the iterator so we can free it at stop().
+ * Need to restore the private pointer to the pool before freeing it.
+ *
+ * \param[in] seq      new sequence file structure to initialize
+ * \param[in] pos      initial target number at which to start iteration
+ *
+ * \retval             initialized pool iterator private structure
+ * \retval             NULL if \a pos exceeds the number of targets in \a pool
+ * \retval             negative error number on failure
+ */
+static void *pool_proc_start(struct seq_file *seq, loff_t *pos)
 {
-       struct pool_desc *pool = s->private;
+       struct pool_desc *pool = seq->private;
        struct lod_pool_iterator *iter;
 
-       lod_pool_getref(pool);
+       pool_getref(pool);
        if ((pool_tgt_count(pool) == 0) ||
            (*pos >= pool_tgt_count(pool))) {
                /* iter is not created, so stop() has no way to
@@ -211,79 +214,105 @@ static void *pool_proc_start(struct seq_file *s, loff_t *pos)
        }
 
        OBD_ALLOC_PTR(iter);
-       if (!iter)
+       if (iter == NULL)
                return ERR_PTR(-ENOMEM);
        iter->lpi_magic = POOL_IT_MAGIC;
        iter->lpi_pool = pool;
        iter->lpi_idx = 0;
 
-       /* we use seq_file private field to memorized iterator so
-        * we can free it at stop() */
-       /* /!\ do not forget to restore it to pool before freeing it */
-       s->private = iter;
+       seq->private = iter;
+       down_read(&pool_tgt_rw_sem(pool));
        if (*pos > 0) {
                loff_t i;
                void *ptr;
 
                i = 0;
                do {
-                       ptr = pool_proc_next(s, &iter, &i);
+                       ptr = pool_proc_next(seq, &iter, &i);
                } while ((i < *pos) && (ptr != NULL));
+
                return ptr;
        }
+
        return iter;
 }
 
-static void pool_proc_stop(struct seq_file *s, void *v)
+/**
+ * Finish seq_file iteration for a single pool.
+ *
+ * Once iteration has been completed, the pool_iterator struct must be
+ * freed, and the seq_file private pointer restored to the pool, as it
+ * was initially when pool_proc_start() was called.
+ *
+ * In some cases the stop() method may be called 2 times, without calling
+ * the start() method (see seq_read() from fs/seq_file.c). We have to free
+ * the private iterator struct only if seq->private points to the iterator.
+ *
+ * \param[in] seq      sequence file structure to clean up
+ * \param[in] v                (unused)
+ */
+static void pool_proc_stop(struct seq_file *seq, void *v)
 {
-       struct lod_pool_iterator *iter = s->private;
-
-       /* in some cases stop() method is called 2 times, without
-        * calling start() method (see seq_read() from fs/seq_file.c)
-        * we have to free only if s->private is an iterator */
-       if (iter != NULL && (iter->lpi_magic == POOL_IT_MAGIC)) {
-               /* we restore s->private so next call to pool_proc_start()
-                * will work */
-               s->private = iter->lpi_pool;
+       struct lod_pool_iterator *iter = seq->private;
+
+       if (iter != NULL && iter->lpi_magic == POOL_IT_MAGIC) {
+               up_read(&pool_tgt_rw_sem(iter->lpi_pool));
+               seq->private = iter->lpi_pool;
                lod_pool_putref(iter->lpi_pool);
                OBD_FREE_PTR(iter);
        }
-       return;
 }
 
-static int pool_proc_show(struct seq_file *s, void *v)
+/**
+ * Print out one target entry from the pool for seq_file iteration.
+ *
+ * The currently referenced pool target is given by op_array[lpi_idx].
+ *
+ * \param[in] seq      new sequence file structure to initialize
+ * \param[in] v                (unused)
+ */
+static int pool_proc_show(struct seq_file *seq, void *v)
 {
        struct lod_pool_iterator *iter = v;
-       struct lod_tgt_desc  *osc_desc;
+       struct lod_tgt_desc  *tgt;
 
-       LASSERTF(iter->lpi_magic == POOL_IT_MAGIC, "%08X", iter->lpi_magic);
+       LASSERTF(iter->lpi_magic == POOL_IT_MAGIC, "%08X\n", iter->lpi_magic);
        LASSERT(iter->lpi_pool != NULL);
        LASSERT(iter->lpi_idx <= pool_tgt_count(iter->lpi_pool));
 
-       down_read(&pool_tgt_rw_sem(iter->lpi_pool));
-       osc_desc = pool_tgt(iter->lpi_pool, iter->lpi_idx);
-       up_read(&pool_tgt_rw_sem(iter->lpi_pool));
-       if (osc_desc)
-               seq_printf(s, "%s\n", obd_uuid2str(&(osc_desc->ltd_uuid)));
+       tgt = pool_tgt(iter->lpi_pool, iter->lpi_idx);
+       if (tgt != NULL)
+               seq_printf(seq, "%s\n", obd_uuid2str(&(tgt->ltd_uuid)));
 
        return 0;
 }
 
-static struct seq_operations pool_proc_ops = {
+static const struct seq_operations pool_proc_ops = {
        .start  = pool_proc_start,
        .next   = pool_proc_next,
        .stop   = pool_proc_stop,
        .show   = pool_proc_show,
 };
 
+/**
+ * Open a new /proc file for seq_file iteration of targets in one pool.
+ *
+ * Initialize the seq_file private pointer to reference the pool.
+ *
+ * \param inode        inode to store iteration state for /proc
+ * \param file file descriptor to store iteration methods
+ *
+ * \retval     0 for success
+ * \retval     negative error number on failure
+ */
 static int pool_proc_open(struct inode *inode, struct file *file)
 {
        int rc;
 
        rc = seq_open(file, &pool_proc_ops);
        if (!rc) {
-               struct seq_file *s = file->private_data;
-               s->private = PDE_DATA(inode);
+               struct seq_file *seq = file->private_data;
+               seq->private = PDE_DATA(inode);
        }
        return rc;
 }
@@ -294,13 +323,25 @@ static struct file_operations pool_proc_operations = {
        .llseek         = seq_lseek,
        .release        = seq_release,
 };
-#endif /* LPROCFS */
 
+/**
+ * Dump the pool target list into the Lustre debug log.
+ *
+ * This is a debugging function to allow dumping the list of targets
+ * in \a pool to the Lustre kernel debug log at the given \a level.
+ *
+ * This is not currently called by any existing code, but can be called
+ * from within gdb/crash to display the contents of the pool, or from
+ * code under development.
+ *
+ * \param[in] level    Lustre debug level (D_INFO, D_WARN, D_ERROR, etc)
+ * \param[in] pool     pool descriptor to be dumped
+ */
 void lod_dump_pool(int level, struct pool_desc *pool)
 {
-       int i;
+       unsigned int i;
 
-       lod_pool_getref(pool);
+       pool_getref(pool);
 
        CDEBUG(level, "pool "LOV_POOLNAMEF" has %d members\n",
               pool->pool_name, pool->pool_obds.op_count);
@@ -318,115 +359,37 @@ void lod_dump_pool(int level, struct pool_desc *pool)
        lod_pool_putref(pool);
 }
 
-#define LOD_POOL_INIT_COUNT 2
-int lod_ost_pool_init(struct ost_pool *op, unsigned int count)
+static void pools_hash_exit(void *vpool, void *data)
 {
-       ENTRY;
+       struct pool_desc *pool = vpool;
 
-       if (count == 0)
-               count = LOD_POOL_INIT_COUNT;
-       op->op_array = NULL;
-       op->op_count = 0;
-       init_rwsem(&op->op_rw_sem);
-       op->op_size = count;
-       OBD_ALLOC(op->op_array, op->op_size * sizeof(op->op_array[0]));
-       if (op->op_array == NULL) {
-               op->op_size = 0;
-               RETURN(-ENOMEM);
-       }
-       EXIT;
-       return 0;
-}
-
-/* Caller must hold write op_rwlock */
-int lod_ost_pool_extend(struct ost_pool *op, unsigned int min_count)
-{
-       __u32 *new;
-       int new_size;
-
-       LASSERT(min_count != 0);
-
-       if (op->op_count < op->op_size)
-               return 0;
-
-       new_size = max(min_count, 2 * op->op_size);
-       OBD_ALLOC(new, new_size * sizeof(op->op_array[0]));
-       if (new == NULL)
-               return -ENOMEM;
-
-       /* copy old array to new one */
-       memcpy(new, op->op_array, op->op_size * sizeof(op->op_array[0]));
-       OBD_FREE(op->op_array, op->op_size * sizeof(op->op_array[0]));
-       op->op_array = new;
-       op->op_size = new_size;
-       return 0;
-}
-
-int lod_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int min_count)
-{
-       int rc = 0, i;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       rc = lod_ost_pool_extend(op, min_count);
-       if (rc)
-               GOTO(out, rc);
-
-       /* search ost in pool array */
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx)
-                       GOTO(out, rc = -EEXIST);
-       }
-       /* ost not found we add it */
-       op->op_array[op->op_count] = idx;
-       op->op_count++;
-       EXIT;
-out:
-       up_write(&op->op_rw_sem);
-       return rc;
+       lod_pool_putref(pool);
 }
 
-int lod_ost_pool_remove(struct ost_pool *op, __u32 idx)
+int lod_pool_hash_init(struct rhashtable *tbl)
 {
-       int i;
-       ENTRY;
-
-       down_write(&op->op_rw_sem);
-
-       for (i = 0; i < op->op_count; i++) {
-               if (op->op_array[i] == idx) {
-                       memmove(&op->op_array[i], &op->op_array[i + 1],
-                               (op->op_count - i - 1) * sizeof(op->op_array[0]));
-                       op->op_count--;
-                       up_write(&op->op_rw_sem);
-                       EXIT;
-                       return 0;
-               }
-       }
-
-       up_write(&op->op_rw_sem);
-       RETURN(-EINVAL);
+       return rhashtable_init(tbl, &pools_hash_params);
 }
 
-int lod_ost_pool_free(struct ost_pool *op)
+void lod_pool_hash_destroy(struct rhashtable *tbl)
 {
-       ENTRY;
-
-       if (op->op_size == 0)
-               RETURN(0);
-
-       down_write(&op->op_rw_sem);
-
-       OBD_FREE(op->op_array, op->op_size * sizeof(op->op_array[0]));
-       op->op_array = NULL;
-       op->op_count = 0;
-       op->op_size = 0;
-
-       up_write(&op->op_rw_sem);
-       RETURN(0);
+       rhashtable_free_and_destroy(tbl, pools_hash_exit, NULL);
 }
 
+/**
+ * Allocate a new pool for the specified device.
+ *
+ * Allocate a new pool_desc structure for the specified \a new_pool
+ * device to create a pool with the given \a poolname.  The new pool
+ * structure is created with a single reference, and is freed when the
+ * reference count drops to zero.
+ *
+ * \param[in] obd      Lustre OBD device on which to add a pool iterator
+ * \param[in] poolname the name of the pool to be created
+ *
+ * \retval             0 in case of success
+ * \retval             negative error code in case of error
+ */
 int lod_pool_new(struct obd_device *obd, char *poolname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
@@ -437,36 +400,28 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
        if (strlen(poolname) > LOV_MAXPOOLNAME)
                RETURN(-ENAMETOOLONG);
 
-       OBD_ALLOC_PTR(new_pool);
+       /* OBD_ALLOC_* doesn't work with direct kfree_rcu use */
+       new_pool = kmalloc(sizeof(*new_pool), GFP_KERNEL);
        if (new_pool == NULL)
                RETURN(-ENOMEM);
 
-       strncpy(new_pool->pool_name, poolname, LOV_MAXPOOLNAME);
-       new_pool->pool_name[LOV_MAXPOOLNAME] = '\0';
+       strlcpy(new_pool->pool_name, poolname, sizeof(new_pool->pool_name));
        new_pool->pool_lobd = obd;
-       /* ref count init to 1 because when created a pool is always used
-        * up to deletion
-        */
        atomic_set(&new_pool->pool_refcount, 1);
-       rc = lod_ost_pool_init(&new_pool->pool_obds, 0);
+       rc = tgt_pool_init(&new_pool->pool_obds, 0);
        if (rc)
                GOTO(out_err, rc);
 
-       memset(&new_pool->pool_rr, 0, sizeof(new_pool->pool_rr));
-       rc = lod_ost_pool_init(&new_pool->pool_rr.lqr_pool, 0);
+       lu_qos_rr_init(&new_pool->pool_rr);
+
+       rc = tgt_pool_init(&new_pool->pool_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_free_pool_obds, rc);
 
-       INIT_HLIST_NODE(&new_pool->pool_hash);
-
-#ifdef LPROCFS
-       lod_pool_getref(new_pool);
+#ifdef CONFIG_PROC_FS
+       pool_getref(new_pool);
        new_pool->pool_proc_entry = lprocfs_add_simple(lod->lod_pool_proc_entry,
-                                                      poolname,
-#ifndef HAVE_ONLY_PROCFS_SEQ
-                                                      NULL, NULL,
-#endif
-                                                      new_pool,
+                                                      poolname, new_pool,
                                                       &pool_proc_operations);
        if (IS_ERR(new_pool->pool_proc_entry)) {
                CDEBUG(D_CONFIG, "%s: cannot add proc entry "LOV_POOLNAMEF"\n",
@@ -479,15 +434,23 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
 #endif
 
        spin_lock(&obd->obd_dev_lock);
-       cfs_list_add_tail(&new_pool->pool_list, &lod->lod_pool_list);
+       list_add_tail(&new_pool->pool_list, &lod->lod_pool_list);
        lod->lod_pool_count++;
        spin_unlock(&obd->obd_dev_lock);
 
-       /* add to find only when it fully ready  */
-       rc = cfs_hash_add_unique(lod->lod_pools_hash_body, poolname,
-                                &new_pool->pool_hash);
-       if (rc)
-               GOTO(out_err, rc = -EEXIST);
+       /* Add to hash table only when it is fully ready. */
+       rc = rhashtable_lookup_insert_fast(&lod->lod_pools_hash_body,
+                                          &new_pool->pool_hash,
+                                          pools_hash_params);
+       if (rc) {
+               if (rc != -EEXIST)
+                       /*
+                        * Hide -E2BIG and -EBUSY which
+                        * are not helpful.
+                        */
+                       rc = -ENOMEM;
+               GOTO(out_err, rc);
+       }
 
        CDEBUG(D_CONFIG, LOV_POOLNAMEF" is pool #%d\n",
                        poolname, lod->lod_pool_count);
@@ -496,19 +459,28 @@ int lod_pool_new(struct obd_device *obd, char *poolname)
 
 out_err:
        spin_lock(&obd->obd_dev_lock);
-       cfs_list_del_init(&new_pool->pool_list);
+       list_del_init(&new_pool->pool_list);
        lod->lod_pool_count--;
        spin_unlock(&obd->obd_dev_lock);
 
        lprocfs_remove(&new_pool->pool_proc_entry);
 
-       lod_ost_pool_free(&new_pool->pool_rr.lqr_pool);
+       tgt_pool_free(&new_pool->pool_rr.lqr_pool);
 out_free_pool_obds:
-       lod_ost_pool_free(&new_pool->pool_obds);
+       tgt_pool_free(&new_pool->pool_obds);
        OBD_FREE_PTR(new_pool);
        return rc;
 }
 
+/**
+ * Remove the named pool from the OBD device.
+ *
+ * \param[in] obd      OBD device on which pool was previously created
+ * \param[in] poolname name of pool to remove from \a obd
+ *
+ * \retval             0 on successfully removing the pool
+ * \retval             negative error numbers for failures
+ */
 int lod_pool_del(struct obd_device *obd, char *poolname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
@@ -516,8 +488,15 @@ int lod_pool_del(struct obd_device *obd, char *poolname)
        ENTRY;
 
        /* lookup and kill hash reference */
-       pool = cfs_hash_del_key(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body, poolname,
+                                pools_hash_params);
+       if (pool && rhashtable_remove_fast(&lod->lod_pools_hash_body,
+                                          &pool->pool_hash,
+                                          pools_hash_params) != 0)
+               pool = NULL;
+       rcu_read_unlock();
+       if (!pool)
                RETURN(-ENOENT);
 
        if (pool->pool_proc_entry != NULL) {
@@ -527,7 +506,7 @@ int lod_pool_del(struct obd_device *obd, char *poolname)
        }
 
        spin_lock(&obd->obd_dev_lock);
-       cfs_list_del_init(&pool->pool_list);
+       list_del_init(&pool->pool_list);
        lod->lod_pool_count--;
        spin_unlock(&obd->obd_dev_lock);
 
@@ -537,26 +516,42 @@ int lod_pool_del(struct obd_device *obd, char *poolname)
        RETURN(0);
 }
 
-
+/**
+ * Add a single target device to the named pool.
+ *
+ * Add the target specified by \a ostname to the specified \a poolname.
+ *
+ * \param[in] obd      OBD device on which to add the pool
+ * \param[in] poolname name of the pool to which to add the target \a ostname
+ * \param[in] ostname  name of the target device to be added
+ *
+ * \retval             0 if \a ostname was (previously) added to the named pool
+ * \retval             negative error number on failure
+ */
 int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct obd_uuid    ost_uuid;
-       struct pool_desc  *pool;
-       unsigned int       idx;
-       int                rc = -EINVAL;
+       struct obd_uuid ost_uuid;
+       struct pool_desc *pool;
+       struct lu_tgt_desc *tgt;
+       int rc = -EINVAL;
        ENTRY;
 
-       pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body, poolname,
+                                pools_hash_params);
+       if (pool && !atomic_inc_not_zero(&pool->pool_refcount))
+               pool = NULL;
+       rcu_read_unlock();
+       if (!pool)
                RETURN(-ENOENT);
 
        obd_str2uuid(&ost_uuid, ostname);
 
        /* search ost in lod array */
        lod_getref(&lod->lod_ost_descs);
-       lod_foreach_ost(lod, idx) {
-               if (obd_uuid_equals(&ost_uuid, &OST_TGT(lod, idx)->ltd_uuid)) {
+       lod_foreach_ost(lod, tgt) {
+               if (obd_uuid_equals(&ost_uuid, &tgt->ltd_uuid)) {
                        rc = 0;
                        break;
                }
@@ -565,7 +560,8 @@ int lod_pool_add(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       rc = lod_ost_pool_add(&pool->pool_obds, idx, lod->lod_osts_size);
+       rc = tgt_pool_add(&pool->pool_obds, tgt->ltd_index,
+                             lod->lod_ost_count);
        if (rc)
                GOTO(out, rc);
 
@@ -581,25 +577,44 @@ out:
        return rc;
 }
 
+/**
+ * Remove the named target from the specified pool.
+ *
+ * Remove one target named \a ostname from \a poolname.  The \a ostname
+ * is searched for in the lod_device lod_ost_bitmap array, to ensure the
+ * specified name actually exists in the pool.
+ *
+ * \param[in] obd      OBD device from which to remove \a poolname
+ * \param[in] poolname name of the pool to be changed
+ * \param[in] ostname  name of the target to remove from \a poolname
+ *
+ * \retval             0 on successfully removing \a ostname from the pool
+ * \retval             negative number on error (e.g. \a ostname not in pool)
+ */
 int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
 {
        struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
-       struct obd_uuid    ost_uuid;
-       struct pool_desc  *pool;
-       unsigned int       idx;
-       int                rc = -EINVAL;
+       struct lu_tgt_desc *ost;
+       struct obd_uuid ost_uuid;
+       struct pool_desc *pool;
+       int rc = -EINVAL;
        ENTRY;
 
-       pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-       if (pool == NULL)
+       /* lookup and kill hash reference */
+       rcu_read_lock();
+       pool = rhashtable_lookup(&lod->lod_pools_hash_body, poolname,
+                                pools_hash_params);
+       if (pool && !atomic_inc_not_zero(&pool->pool_refcount))
+               pool = NULL;
+       rcu_read_unlock();
+       if (!pool)
                RETURN(-ENOENT);
 
        obd_str2uuid(&ost_uuid, ostname);
 
        lod_getref(&lod->lod_ost_descs);
-       /* search ost in lod array, to get index */
-       cfs_foreach_bit(lod->lod_ost_bitmap, idx) {
-               if (obd_uuid_equals(&ost_uuid, &OST_TGT(lod, idx)->ltd_uuid)) {
+       lod_foreach_ost(lod, ost) {
+               if (obd_uuid_equals(&ost_uuid, &ost->ltd_uuid)) {
                        rc = 0;
                        break;
                }
@@ -609,8 +624,7 @@ int lod_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
        if (rc)
                GOTO(out, rc);
 
-       lod_ost_pool_remove(&pool->pool_obds, idx);
-
+       tgt_pool_remove(&pool->pool_obds, ost->ltd_index);
        pool->pool_rr.lqr_dirty = 1;
 
        CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname,
@@ -623,42 +637,53 @@ out:
        return rc;
 }
 
+/**
+ * Check if the specified target exists in the pool.
+ *
+ * The caller may not have a reference on \a pool if it got the pool without
+ * calling lod_find_pool() (e.g. directly from the lod pool list)
+ *
+ * \param[in] idx      Target index to check
+ * \param[in] pool     Pool in which to check if target is added.
+ *
+ * \retval             0 successfully found index in \a pool
+ * \retval             negative error if device not found in \a pool
+ */
 int lod_check_index_in_pool(__u32 idx, struct pool_desc *pool)
 {
-       int i, rc;
-       ENTRY;
-
-       /* caller may no have a ref on pool if it got the pool
-        * without calling lod_find_pool() (e.g. go through the lod pool
-        * list)
-        */
-       lod_pool_getref(pool);
-
-       down_read(&pool_tgt_rw_sem(pool));
-
-       for (i = 0; i < pool_tgt_count(pool); i++) {
-               if (pool_tgt_array(pool)[i] == idx)
-                       GOTO(out, rc = 0);
-       }
-       rc = -ENOENT;
-       EXIT;
-out:
-       up_read(&pool_tgt_rw_sem(pool));
+       int rc;
 
+       pool_getref(pool);
+       rc = tgt_check_index(idx, &pool->pool_obds);
        lod_pool_putref(pool);
        return rc;
 }
 
+/**
+ * Find the pool descriptor for the specified pool and return it with a
+ * reference to the caller if found.
+ *
+ * \param[in] lod      LOD on which the pools are configured
+ * \param[in] poolname NUL-terminated name of the pool
+ *
+ * \retval     pointer to pool descriptor on success
+ * \retval     NULL if \a poolname could not be found or poolname is empty
+ */
 struct pool_desc *lod_find_pool(struct lod_device *lod, char *poolname)
 {
        struct pool_desc *pool;
 
        pool = NULL;
        if (poolname[0] != '\0') {
-               pool = cfs_hash_lookup(lod->lod_pools_hash_body, poolname);
-               if (pool == NULL)
-                       CDEBUG(D_CONFIG, "%s: request for an unknown pool ("
-                              LOV_POOLNAMEF")\n",
+               rcu_read_lock();
+               pool = rhashtable_lookup(&lod->lod_pools_hash_body, poolname,
+                                        pools_hash_params);
+               if (pool && !atomic_inc_not_zero(&pool->pool_refcount))
+                       pool = NULL;
+               rcu_read_unlock();
+               if (!pool)
+                       CDEBUG(D_CONFIG,
+                              "%s: request for an unknown pool (" LOV_POOLNAMEF ")\n",
                               lod->lod_child_exp->exp_obd->obd_name, poolname);
                if (pool != NULL && pool_tgt_count(pool) == 0) {
                        CDEBUG(D_CONFIG, "%s: request for an empty pool ("