Whamcloud - gitweb
b=17181
authornathan <nathan>
Fri, 5 Dec 2008 23:54:38 +0000 (23:54 +0000)
committernathan <nathan>
Fri, 5 Dec 2008 23:54:38 +0000 (23:54 +0000)
i=johann
i=nathan
add pool refcount
replace rwlock with semaphore

lustre/include/obd.h
lustre/lov/lov_obd.c
lustre/lov/lov_pack.c
lustre/lov/lov_pool.c
lustre/lov/lov_qos.c

index 3aa9f0e..cf5979c 100644 (file)
@@ -622,7 +622,7 @@ struct ost_pool {
                                                 lov_obd->lov_tgts */
         unsigned int        op_count;        /* number of OSTs in the array */
         unsigned int        op_size;         /* allocated size of lp_array */
-        rwlock_t            op_rwlock;       /* to protect lov_pool use */
+        struct rw_semaphore op_rw_sem;       /* to protect ost_pool use */
 };
 
 /* Round-robin allocator data */
@@ -662,12 +662,13 @@ struct lov_tgt_desc {
 #define pool_tgt_size(_p)   _p->pool_obds.op_size
 #define pool_tgt_count(_p)  _p->pool_obds.op_count
 #define pool_tgt_array(_p)  _p->pool_obds.op_array
-#define pool_tgt_rwlock(_p) _p->pool_obds.op_rwlock
+#define pool_tgt_rw_sem(_p) _p->pool_obds.op_rw_sem
 #define pool_tgt(_p, _i)    _p->pool_lov->lov_tgts[_p->pool_obds.op_array[_i]]
 
 struct pool_desc {
         char                  pool_name[LOV_MAXPOOLNAME + 1]; /* name of pool */
         struct ost_pool       pool_obds;              /* pool members */
+        atomic_t              pool_refcount;          /* pool ref. counter */
         struct lov_qos_rr     pool_rr;                /* round robin qos */
         struct hlist_node     pool_hash;              /* access by poolname */
         struct list_head      pool_list;              /* serial access */
index eb32c62..cc2097e 100644 (file)
@@ -879,10 +879,8 @@ static int lov_cleanup(struct obd_device *obd)
 
         list_for_each_safe(pos, tmp, &lov->lov_pool_list) {
                 pool = list_entry(pos, struct pool_desc, pool_list);
-                list_del(&pool->pool_list);
-                lov_ost_pool_free(&(pool->pool_rr.lqr_pool));
-                lov_ost_pool_free(&(pool->pool_obds));
-                OBD_FREE_PTR(pool);
+                /* free pool structs */
+                lov_pool_del(obd, pool->pool_name);
         }
         lov_ost_pool_free(&(lov->lov_qos.lq_rr.lqr_pool));
         lov_ost_pool_free(&lov->lov_packed);
index d992e54..12c2d28 100644 (file)
@@ -483,12 +483,17 @@ static int __lov_setstripe(struct obd_export *exp, struct lov_stripe_md **lsmp,
                     (typeof(lumv3.lmm_stripe_offset))(-1)) {
                         rc = lov_check_index_in_pool(lumv3.lmm_stripe_offset,
                                                      pool);
-                        if (rc < 0)
+                        if (rc < 0) {
+                                lh_put(lov->lov_pools_hash_body,
+                                       &pool->pool_hash);
                                 RETURN(-EINVAL);
+                        }
                 }
 
                 if (stripe_count > pool_tgt_count(pool))
                         stripe_count = pool_tgt_count(pool);
+
+                lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
         }
 
         if ((__u64)lumv1->lmm_stripe_size * stripe_count > ~0UL) {
index f63fac7..1836691 100644 (file)
 #include <obd.h>
 #include "lov_internal.h"
 
+static void lov_pool_getref(struct pool_desc *pool) {
+        atomic_inc(&pool->pool_refcount);
+}
+
+static void lov_pool_putref(struct pool_desc *pool) {
+        if (atomic_dec_and_test(&pool->pool_refcount)) {
+                lov_ost_pool_free(&(pool->pool_rr.lqr_pool));
+                lov_ost_pool_free(&(pool->pool_obds));
+                OBD_FREE_PTR(pool);
+        }
+}
+
+
 /*
  * hash function using a Rotating Hash algorithm
  * Knuth, D. The Art of Computer Programming,
@@ -99,6 +112,7 @@ static void *pool_hashrefcount_get(struct hlist_node *hnode)
         struct pool_desc *pool;
 
         pool = hlist_entry(hnode, struct pool_desc, pool_hash);
+        lov_pool_getref(pool);
         return (pool);
 }
 
@@ -107,6 +121,7 @@ static void *pool_hashrefcount_put(struct hlist_node *hnode)
         struct pool_desc *pool;
 
         pool = hlist_entry(hnode, struct pool_desc, pool_hash);
+        lov_pool_putref(pool);
         return (pool);
 }
 
@@ -149,14 +164,14 @@ static void *pool_proc_next(struct seq_file *s, void *v, loff_t *pos)
 
         /* iterate to find a non empty entry */
         prev_idx = iter->idx;
-        read_lock(&pool_tgt_rwlock(iter->pool));
+        down_read(&pool_tgt_rw_sem(iter->pool));
         iter->idx++;
         if (iter->idx == pool_tgt_count(iter->pool)) {
                 iter->idx = prev_idx; /* we stay on the last entry */
-                read_unlock(&pool_tgt_rwlock(iter->pool));
+                up_read(&pool_tgt_rw_sem(iter->pool));
                 return NULL;
         }
-        read_unlock(&pool_tgt_rwlock(iter->pool));
+        up_read(&pool_tgt_rw_sem(iter->pool));
         (*pos)++;
         /* return != NULL to continue */
         return iter;
@@ -167,11 +182,16 @@ static void *pool_proc_start(struct seq_file *s, loff_t *pos)
         struct pool_desc *pool = (struct pool_desc *)s->private;
         struct pool_iterator *iter;
 
+        lov_pool_getref(pool);
         if ((pool_tgt_count(pool) == 0) ||
-            (*pos >= pool_tgt_count(pool)))
+            (*pos >= pool_tgt_count(pool))) {
+                /* iter is not created, so stop() has no way to
+                 * find pool to dec ref */
+                lov_pool_putref(pool);
                 return NULL;
+        }
 
-        OBD_ALLOC(iter, sizeof(struct pool_iterator));
+        OBD_ALLOC_PTR(iter);
         if (!iter)
                 return ERR_PTR(-ENOMEM);
         iter->magic = POOL_IT_MAGIC;
@@ -206,7 +226,8 @@ static void pool_proc_stop(struct seq_file *s, void *v)
                 /* we restore s->private so next call to pool_proc_start()
                  * will work */
                 s->private = iter->pool;
-                OBD_FREE(iter, sizeof(struct pool_iterator));
+                lov_pool_putref(iter->pool);
+                OBD_FREE_PTR(iter);
         }
         return;
 }
@@ -220,9 +241,9 @@ static int pool_proc_show(struct seq_file *s, void *v)
         LASSERT(iter->pool != NULL);
         LASSERT(iter->idx <= pool_tgt_count(iter->pool));
 
-        read_lock(&pool_tgt_rwlock(iter->pool));
+        down_read(&pool_tgt_rw_sem(iter->pool));
         tgt = pool_tgt(iter->pool, iter->idx);
-        read_unlock(&pool_tgt_rwlock(iter->pool));
+        up_read(&pool_tgt_rw_sem(iter->pool));
         if (tgt)
                 seq_printf(s, "%s\n", obd_uuid2str(&(tgt->ltd_uuid)));
 
@@ -260,9 +281,12 @@ void lov_dump_pool(int level, struct pool_desc *pool)
 {
         int i;
 
+        lov_pool_getref(pool);
+
         CDEBUG(level, "pool "LOV_POOLNAMEF" has %d members\n",
                pool->pool_name, pool->pool_obds.op_count);
-        read_lock(&pool_tgt_rwlock(pool));
+        down_read(&pool_tgt_rw_sem(pool));
+
         for (i = 0; i < pool_tgt_count(pool) ; i++) {
                 if (!pool_tgt(pool, i) || !(pool_tgt(pool, i))->ltd_exp)
                         continue;
@@ -270,7 +294,9 @@ void lov_dump_pool(int level, struct pool_desc *pool)
                        pool->pool_name, i,
                        obd_uuid2str(&((pool_tgt(pool, i))->ltd_uuid)));
         }
-        read_unlock(&pool_tgt_rwlock(pool));
+
+        up_read(&pool_tgt_rw_sem(pool));
+        lov_pool_putref(pool);
 }
 
 #define LOV_POOL_INIT_COUNT 2
@@ -280,7 +306,7 @@ int lov_ost_pool_init(struct ost_pool *op, unsigned int count)
                 count = LOV_POOL_INIT_COUNT;
         op->op_array = NULL;
         op->op_count = 0;
-        rwlock_init(&op->op_rwlock);
+        init_rwsem(&op->op_rw_sem);
         op->op_size = count;
         OBD_ALLOC(op->op_array, op->op_size * sizeof(op->op_array[0]));
         if (op->op_array == NULL) {
@@ -319,7 +345,7 @@ int lov_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int max_count)
         int rc = 0, i;
         ENTRY;
 
-        write_lock(&op->op_rwlock);
+        down_write(&op->op_rw_sem);
 
         rc = lov_ost_pool_extend(op, max_count);
         if (rc)
@@ -334,7 +360,7 @@ int lov_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int max_count)
         op->op_array[op->op_count] = idx;
         op->op_count++;
 out:
-        write_unlock(&op->op_rwlock);
+        up_write(&op->op_rw_sem);
         return rc;
 }
 
@@ -342,17 +368,19 @@ int lov_ost_pool_remove(struct ost_pool *op, __u32 idx)
 {
         int i;
 
-        write_lock(&op->op_rwlock);
+        down_write(&op->op_rw_sem);
+
         for (i = 0; i < op->op_count; i++) {
                 if (op->op_array[i] == idx) {
                         memmove(&op->op_array[i], &op->op_array[i + 1],
                                 (op->op_count - i - 1) * sizeof(op->op_array[0]));
                         op->op_count--;
-                        write_unlock(&op->op_rwlock);
+                        up_write(&op->op_rw_sem);
                         return 0;
                 }
         }
-        write_unlock(&op->op_rwlock);
+
+        up_write(&op->op_rw_sem);
         return -EINVAL;
 }
 
@@ -361,12 +389,14 @@ int lov_ost_pool_free(struct ost_pool *op)
         if (op->op_size == 0)
                 return 0;
 
-        write_lock(&op->op_rwlock);
+        down_write(&op->op_rw_sem);
+
         OBD_FREE(op->op_array, op->op_size * sizeof(op->op_array[0]));
         op->op_array = NULL;
         op->op_count = 0;
         op->op_size = 0;
-        write_unlock(&op->op_rwlock);
+
+        up_write(&op->op_rw_sem);
         return 0;
 }
 
@@ -376,19 +406,24 @@ int lov_pool_new(struct obd_device *obd, char *poolname)
         struct lov_obd *lov;
         struct pool_desc *new_pool;
         int rc;
+        ENTRY;
 
         lov = &(obd->u.lov);
 
         if (strlen(poolname) > LOV_MAXPOOLNAME)
-                return -ENAMETOOLONG;
+                RETURN(-ENAMETOOLONG);
 
         OBD_ALLOC_PTR(new_pool);
         if (new_pool == NULL)
-                return -ENOMEM;
+                RETURN(-ENOMEM);
 
         strncpy(new_pool->pool_name, poolname, LOV_MAXPOOLNAME);
         new_pool->pool_name[LOV_MAXPOOLNAME] = '\0';
         new_pool->pool_lov = lov;
+        /* ref count init to 1 because when created a pool is always used
+         * up to deletion
+         */
+        atomic_set(&new_pool->pool_refcount, 1);
         rc = lov_ost_pool_init(&new_pool->pool_obds, 0);
         if (rc)
                GOTO(out_err, rc);
@@ -412,6 +447,7 @@ int lov_pool_new(struct obd_device *obd, char *poolname)
         spin_lock(&obd->obd_dev_lock);
         list_add_tail(&new_pool->pool_list, &lov->lov_pool_list);
         lov->lov_pool_count++;
+
         spin_unlock(&obd->obd_dev_lock);
 
         CDEBUG(D_CONFIG, LOV_POOLNAMEF" is pool #%d\n",
@@ -419,6 +455,8 @@ int lov_pool_new(struct obd_device *obd, char *poolname)
 
 #ifdef LPROCFS
         /* ifdef needed for liblustre */
+        /* get ref for /proc file */
+        lov_pool_getref(new_pool);
         new_pool->pool_proc_entry = lprocfs_add_simple(lov->lov_pool_proc_entry,
                                                        poolname, NULL, NULL,
                                                        new_pool,
@@ -428,9 +466,10 @@ int lov_pool_new(struct obd_device *obd, char *poolname)
         if (IS_ERR(new_pool->pool_proc_entry)) {
                 CWARN("Cannot add proc pool entry "LOV_POOLNAMEF"\n", poolname);
                 new_pool->pool_proc_entry = NULL;
+                lov_pool_putref(new_pool);
         }
 
-        return 0;
+        RETURN(0);
 
 out_err:
         OBD_FREE_PTR(new_pool);
@@ -441,33 +480,40 @@ int lov_pool_del(struct obd_device *obd, char *poolname)
 {
         struct lov_obd *lov;
         struct pool_desc *pool;
+        ENTRY;
 
         lov = &(obd->u.lov);
 
         spin_lock(&obd->obd_dev_lock);
-        pool = lustre_hash_lookup(lov->lov_pools_hash_body,
-                                             poolname);
+
+        pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
         if (pool == NULL) {
                 spin_unlock(&obd->obd_dev_lock);
-                return -ENOENT;
+                RETURN(-ENOENT);
         }
 
 #ifdef LPROCFS
-        if (pool->pool_proc_entry != NULL)
+        if (pool->pool_proc_entry != NULL) {
                 remove_proc_entry(pool->pool_proc_entry->name,
                                   pool->pool_proc_entry->parent);
+                /* remove ref for /proc file */
+                lov_pool_putref(pool);
+        }
 #endif
 
         lustre_hash_del_key(lov->lov_pools_hash_body, poolname);
+        list_del_init(&pool->pool_list);
 
         lov->lov_pool_count--;
-
+        lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
         spin_unlock(&obd->obd_dev_lock);
 
-        /* pool struct is not freed because it may be used by
-         * some open in /proc. The struct is freed at lov_cleanup()
-        */
-        return 0;
+        /* remove ref got when pool was created in memory
+         * pool will be freed when refount will reach 0
+         */
+        lov_pool_putref(pool);
+
+        RETURN(0);
 }
 
 
@@ -478,12 +524,13 @@ int lov_pool_add(struct obd_device *obd, char *poolname, char *ostname)
         struct pool_desc *pool;
         unsigned int i, lov_idx;
         int rc;
+        ENTRY;
 
         lov = &(obd->u.lov);
 
         pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
         if (pool == NULL)
-                return -ENOENT;
+                RETURN(-ENOENT);
 
         obd_str2uuid(&ost_uuid, ostname);
 
@@ -500,7 +547,7 @@ int lov_pool_add(struct obd_device *obd, char *poolname, char *ostname)
         /* test if ost found in lov */
         if (i == lov->desc.ld_tgt_count) {
                 mutex_up(&lov->lov_lock);
-                return -EINVAL;
+                GOTO(out, rc = -EINVAL);
         }
         mutex_up(&lov->lov_lock);
 
@@ -508,13 +555,17 @@ int lov_pool_add(struct obd_device *obd, char *poolname, char *ostname)
 
         rc = lov_ost_pool_add(&pool->pool_obds, lov_idx, lov->lov_tgt_size);
         if (rc)
-                return rc;
+                GOTO(out, rc);
 
         pool->pool_rr.lqr_dirty = 1;
 
         CDEBUG(D_CONFIG, "Added %s to "LOV_POOLNAMEF" as member %d\n",
                ostname, poolname,  pool_tgt_count(pool));
-        return 0;
+
+        EXIT;
+out:
+        lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
+        return rc;
 }
 
 int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
@@ -523,6 +574,8 @@ int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
         struct lov_obd *lov;
         struct pool_desc *pool;
         unsigned int i, lov_idx;
+        int rc = 0;
+        ENTRY;
 
         lov = &(obd->u.lov);
 
@@ -530,7 +583,7 @@ int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
         pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
         if (pool == NULL) {
                 spin_unlock(&obd->obd_dev_lock);
-                return -ENOENT;
+                RETURN(-ENOENT);
         }
 
         obd_str2uuid(&ost_uuid, ostname);
@@ -547,7 +600,7 @@ int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
         /* test if ost found in lov */
         if (i == lov->desc.ld_tgt_count) {
                 spin_unlock(&obd->obd_dev_lock);
-                return -EINVAL;
+                GOTO(out, rc = -EINVAL);
         }
 
         spin_unlock(&obd->obd_dev_lock);
@@ -558,24 +611,39 @@ int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
 
         pool->pool_rr.lqr_dirty = 1;
 
-        CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname, poolname);
+        CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname,
+               poolname);
 
-        return 0;
+        EXIT;
+out:
+        lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
+        return rc;
 }
 
 int lov_check_index_in_pool(__u32 idx, struct pool_desc *pool)
 {
-        int i;
+        int i, rc;
+        ENTRY;
+
+        /* caller may no have a ref on pool if it got the pool
+         * without calling lov_find_pool() (e.g. go through the lov pool
+         * list)
+         */
+        lov_pool_getref(pool);
+
+        down_read(&pool_tgt_rw_sem(pool));
 
-        read_lock(&pool_tgt_rwlock(pool));
         for (i = 0; i < pool_tgt_count(pool); i++) {
-                if (pool_tgt_array(pool)[i] == idx) {
-                        read_unlock(&pool_tgt_rwlock(pool));
-                        return 0;
-                }
+                if (pool_tgt_array(pool)[i] == idx)
+                        GOTO(out, rc = 0);
         }
-        read_unlock(&pool_tgt_rwlock(pool));
-        return -ENOENT;
+        rc = -ENOENT;
+        EXIT;
+out:
+        up_read(&pool_tgt_rw_sem(pool));
+
+        lov_pool_putref(pool);
+        return rc;
 }
 
 struct pool_desc *lov_find_pool(struct lov_obd *lov, char *poolname)
@@ -591,6 +659,8 @@ struct pool_desc *lov_find_pool(struct lov_obd *lov, char *poolname)
                 if ((pool != NULL) && (pool_tgt_count(pool) == 0)) {
                         CWARN("Request for an empty pool ("LOV_POOLNAMEF")\n",
                                poolname);
+                        /* pool is ignored, so we remove ref on it */
+                        lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
                         pool = NULL;
                 }
         }
index 5ad5848..45245ed 100644 (file)
@@ -560,7 +560,7 @@ static int alloc_rr(struct lov_obd *lov, int *idx_arr, int *stripe_cnt,
                 osts = &(lov->lov_packed);
                 lqr = &(lov->lov_qos.lq_rr);
         } else {
-                read_lock(&pool_tgt_rwlock(pool));
+                down_read(&pool_tgt_rw_sem(pool));
                 osts = &(pool->pool_obds);
                 lqr = &(pool->pool_rr);
         }
@@ -637,8 +637,12 @@ repeat_find:
 
         *stripe_cnt = idx_pos - idx_arr;
 out:
-        if (pool != NULL)
-                read_unlock(&pool_tgt_rwlock(pool));
+        if (pool != NULL) {
+                up_read(&pool_tgt_rw_sem(pool));
+                /* put back ref got by lov_find_pool() */
+                lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
+        }
+
         RETURN(rc);
 }
 
@@ -657,7 +661,7 @@ static int alloc_specific(struct lov_obd *lov, struct lov_stripe_md *lsm,
         if (pool == NULL) {
                 osts = &(lov->lov_packed);
         } else {
-                read_lock(&pool_tgt_rwlock(pool));
+                down_read(&pool_tgt_rw_sem(pool));
                 osts = &(pool->pool_obds);
         }
 
@@ -725,8 +729,12 @@ repeat_find:
                lsm->lsm_stripe_count);
         rc = -EFBIG;
 out:
-        if (pool != NULL)
-                read_unlock(&pool_tgt_rwlock(pool));
+        if (pool != NULL) {
+                up_read(&pool_tgt_rw_sem(pool));
+                /* put back ref got by lov_find_pool() */
+                lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
+        }
+
         RETURN(rc);
 }
 
@@ -756,7 +764,7 @@ static int alloc_qos(struct obd_export *exp, int *idx_arr, int *stripe_cnt,
                 osts = &(lov->lov_packed);
                 lqr = &(lov->lov_qos.lq_rr);
         } else {
-                read_lock(&pool_tgt_rwlock(pool));
+                down_read(&pool_tgt_rw_sem(pool));
                 osts = &(pool->pool_obds);
                 lqr = &(pool->pool_rr);
         }
@@ -916,8 +924,11 @@ out:
         up_write(&lov->lov_qos.lq_rw_sem);
 
 out_nolock:
-        if (pool != NULL)
-                read_unlock(&pool_tgt_rwlock(pool));
+        if (pool != NULL) {
+                up_read(&pool_tgt_rw_sem(pool));
+                /* put back ref got by lov_find_pool() */
+                lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
+        }
 
         if (rc == -EAGAIN)
                 rc = alloc_rr(lov, idx_arr, stripe_cnt, poolname, flags);