Whamcloud - gitweb
b=17181
[fs/lustre-release.git] / lustre / lov / lov_pool.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see [sun.com URL with a
20  * copy of GPLv2].
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/lov/lov_pool.c
37  *
38  * OST pool methods
39  *
40  * Author: Jacques-Charles LAFOUCRIERE <jc.lafoucriere@cea.fr>
41  */
42
43 #define DEBUG_SUBSYSTEM S_LOV
44
45 #ifdef __KERNEL__
46 #include <libcfs/libcfs.h>
47 #else
48 #include <liblustre.h>
49 #endif
50
51 #include <obd.h>
52 #include "lov_internal.h"
53
54 static void lov_pool_getref(struct pool_desc *pool) {
55         atomic_inc(&pool->pool_refcount);
56 }
57
58 static void lov_pool_putref(struct pool_desc *pool) {
59         if (atomic_dec_and_test(&pool->pool_refcount)) {
60                 lov_ost_pool_free(&(pool->pool_rr.lqr_pool));
61                 lov_ost_pool_free(&(pool->pool_obds));
62                 OBD_FREE_PTR(pool);
63         }
64 }
65
66
67 /*
68  * hash function using a Rotating Hash algorithm
69  * Knuth, D. The Art of Computer Programming,
70  * Volume 3: Sorting and Searching,
71  * Chapter 6.4.
72  * Addison Wesley, 1973
73  */
74 static __u32 pool_hashfn(lustre_hash_t *hash_body, void *key, unsigned mask)
75 {
76         int i;
77         __u32 result;
78         char *poolname;
79
80         result = 0;
81         poolname = (char *)key;
82         for (i = 0; i < LOV_MAXPOOLNAME; i++) {
83                 if (poolname[i] == '\0')
84                         break;
85                 result = (result << 4)^(result >> 28) ^  poolname[i];
86         }
87         return (result % mask);
88 }
89
90 static void *pool_key(struct hlist_node *hnode)
91 {
92         struct pool_desc *pool;
93
94         pool = hlist_entry(hnode, struct pool_desc, pool_hash);
95         return (pool->pool_name);
96 }
97
98 static int pool_hashkey_compare(void *key, struct hlist_node *compared_hnode)
99 {
100         char *pool_name;
101         struct pool_desc *pool;
102         int rc;
103
104         pool_name = (char *)key;
105         pool = hlist_entry(compared_hnode, struct pool_desc, pool_hash);
106         rc = strncmp(pool_name, pool->pool_name, LOV_MAXPOOLNAME);
107         return (!rc);
108 }
109
110 static void *pool_hashrefcount_get(struct hlist_node *hnode)
111 {
112         struct pool_desc *pool;
113
114         pool = hlist_entry(hnode, struct pool_desc, pool_hash);
115         lov_pool_getref(pool);
116         return (pool);
117 }
118
119 static void *pool_hashrefcount_put(struct hlist_node *hnode)
120 {
121         struct pool_desc *pool;
122
123         pool = hlist_entry(hnode, struct pool_desc, pool_hash);
124         lov_pool_putref(pool);
125         return (pool);
126 }
127
128 lustre_hash_ops_t pool_hash_operations = {
129         .lh_hash        = pool_hashfn,
130         .lh_key         = pool_key,
131         .lh_compare     = pool_hashkey_compare,
132         .lh_get         = pool_hashrefcount_get,
133         .lh_put         = pool_hashrefcount_put,
134 };
135
136 #ifdef LPROCFS
137 /* ifdef needed for liblustre support */
138 /*
139  * pool /proc seq_file methods
140  */
141 /*
142  * iterator is used to go through the target pool entries
143  * index is the current entry index in the lp_array[] array
144  * index >= pos returned to the seq_file interface
145  * pos is from 0 to (pool->pool_obds.op_count - 1)
146  */
/* marker stored in pool_iterator::magic; pool_proc_stop() uses it to tell
 * an iterator apart from a pool descriptor in seq_file::private */
#define POOL_IT_MAGIC 0xB001CEA0
/* cursor over the members of one pool, used by the /proc seq_file methods */
struct pool_iterator {
        int magic;              /* always POOL_IT_MAGIC for a live iterator */
        struct pool_desc *pool; /* pool being listed; the iterator holds a
                                 * reference taken in pool_proc_start() */
        int idx;        /* from 0 to pool_tgt_size - 1 */
};
153
154 static void *pool_proc_next(struct seq_file *s, void *v, loff_t *pos)
155 {
156         struct pool_iterator *iter = (struct pool_iterator *)s->private;
157         int prev_idx;
158
159         LASSERTF(iter->magic == POOL_IT_MAGIC, "%08X", iter->magic);
160
161         /* test if end of file */
162         if (*pos >= pool_tgt_count(iter->pool))
163                 return NULL;
164
165         /* iterate to find a non empty entry */
166         prev_idx = iter->idx;
167         down_read(&pool_tgt_rw_sem(iter->pool));
168         iter->idx++;
169         if (iter->idx == pool_tgt_count(iter->pool)) {
170                 iter->idx = prev_idx; /* we stay on the last entry */
171                 up_read(&pool_tgt_rw_sem(iter->pool));
172                 return NULL;
173         }
174         up_read(&pool_tgt_rw_sem(iter->pool));
175         (*pos)++;
176         /* return != NULL to continue */
177         return iter;
178 }
179
180 static void *pool_proc_start(struct seq_file *s, loff_t *pos)
181 {
182         struct pool_desc *pool = (struct pool_desc *)s->private;
183         struct pool_iterator *iter;
184
185         lov_pool_getref(pool);
186         if ((pool_tgt_count(pool) == 0) ||
187             (*pos >= pool_tgt_count(pool))) {
188                 /* iter is not created, so stop() has no way to
189                  * find pool to dec ref */
190                 lov_pool_putref(pool);
191                 return NULL;
192         }
193
194         OBD_ALLOC_PTR(iter);
195         if (!iter)
196                 return ERR_PTR(-ENOMEM);
197         iter->magic = POOL_IT_MAGIC;
198         iter->pool = pool;
199         iter->idx = 0;
200
201         /* we use seq_file private field to memorized iterator so
202          * we can free it at stop() */
203         /* /!\ do not forget to restore it to pool before freeing it */
204         s->private = iter;
205         if (*pos > 0) {
206                 loff_t i;
207                 void *ptr;
208
209                 i = 0;
210                 do {
211                      ptr = pool_proc_next(s, &iter, &i);
212                 } while ((i < *pos) && (ptr != NULL));
213                 return ptr;
214         }
215         return iter;
216 }
217
218 static void pool_proc_stop(struct seq_file *s, void *v)
219 {
220         struct pool_iterator *iter = (struct pool_iterator *)s->private;
221
222         /* in some cases stop() method is called 2 times, without
223          * calling start() method (see seq_read() from fs/seq_file.c)
224          * we have to free only if s->private is an iterator */
225         if ((iter) && (iter->magic == POOL_IT_MAGIC)) {
226                 /* we restore s->private so next call to pool_proc_start()
227                  * will work */
228                 s->private = iter->pool;
229                 lov_pool_putref(iter->pool);
230                 OBD_FREE_PTR(iter);
231         }
232         return;
233 }
234
235 static int pool_proc_show(struct seq_file *s, void *v)
236 {
237         struct pool_iterator *iter = (struct pool_iterator *)v;
238         struct lov_tgt_desc *tgt;
239
240         LASSERTF(iter->magic == POOL_IT_MAGIC, "%08X", iter->magic);
241         LASSERT(iter->pool != NULL);
242         LASSERT(iter->idx <= pool_tgt_count(iter->pool));
243
244         down_read(&pool_tgt_rw_sem(iter->pool));
245         tgt = pool_tgt(iter->pool, iter->idx);
246         up_read(&pool_tgt_rw_sem(iter->pool));
247         if (tgt)
248                 seq_printf(s, "%s\n", obd_uuid2str(&(tgt->ltd_uuid)));
249
250         return 0;
251 }
252
253 static struct seq_operations pool_proc_ops = {
254         .start          = pool_proc_start,
255         .next           = pool_proc_next,
256         .stop           = pool_proc_stop,
257         .show           = pool_proc_show,
258 };
259
260 static int pool_proc_open(struct inode *inode, struct file *file)
261 {
262         int rc;
263
264         rc = seq_open(file, &pool_proc_ops);
265         if (!rc) {
266                 struct seq_file *s = file->private_data;
267                 s->private = PROC_I(inode)->pde->data;
268         }
269         return rc;
270 }
271
272 static struct file_operations pool_proc_operations = {
273         .open           = pool_proc_open,
274         .read           = seq_read,
275         .llseek         = seq_lseek,
276         .release        = seq_release,
277 };
278 #endif /* LPROCFS */
279
280 void lov_dump_pool(int level, struct pool_desc *pool)
281 {
282         int i;
283
284         lov_pool_getref(pool);
285
286         CDEBUG(level, "pool "LOV_POOLNAMEF" has %d members\n",
287                pool->pool_name, pool->pool_obds.op_count);
288         down_read(&pool_tgt_rw_sem(pool));
289
290         for (i = 0; i < pool_tgt_count(pool) ; i++) {
291                 if (!pool_tgt(pool, i) || !(pool_tgt(pool, i))->ltd_exp)
292                         continue;
293                 CDEBUG(level, "pool "LOV_POOLNAMEF"[%d] = %s\n",
294                        pool->pool_name, i,
295                        obd_uuid2str(&((pool_tgt(pool, i))->ltd_uuid)));
296         }
297
298         up_read(&pool_tgt_rw_sem(pool));
299         lov_pool_putref(pool);
300 }
301
302 #define LOV_POOL_INIT_COUNT 2
303 int lov_ost_pool_init(struct ost_pool *op, unsigned int count)
304 {
305         if (count == 0)
306                 count = LOV_POOL_INIT_COUNT;
307         op->op_array = NULL;
308         op->op_count = 0;
309         init_rwsem(&op->op_rw_sem);
310         op->op_size = count;
311         OBD_ALLOC(op->op_array, op->op_size * sizeof(op->op_array[0]));
312         if (op->op_array == NULL) {
313                 op->op_size = 0;
314                 return -ENOMEM;
315         }
316         return 0;
317 }
318
319 /* Caller must hold write op_rwlock */
320 int lov_ost_pool_extend(struct ost_pool *op, unsigned int max_count)
321 {
322         __u32 *new;
323         int new_size;
324
325         LASSERT(max_count != 0);
326
327         if (op->op_count < op->op_size)
328                 return 0;
329
330         new_size = min(max_count, 2 * op->op_size);
331         OBD_ALLOC(new, new_size * sizeof(op->op_array[0]));
332         if (new == NULL)
333                 return -ENOMEM;
334
335         /* copy old array to new one */
336         memcpy(new, op->op_array, op->op_size * sizeof(op->op_array[0]));
337         OBD_FREE(op->op_array, op->op_size * sizeof(op->op_array[0]));
338         op->op_array = new;
339         op->op_size = new_size;
340         return 0;
341 }
342
343 int lov_ost_pool_add(struct ost_pool *op, __u32 idx, unsigned int max_count)
344 {
345         int rc = 0, i;
346         ENTRY;
347
348         down_write(&op->op_rw_sem);
349
350         rc = lov_ost_pool_extend(op, max_count);
351         if (rc)
352                 GOTO(out, rc);
353
354         /* search ost in pool array */
355         for (i = 0; i < op->op_count; i++) {
356                 if (op->op_array[i] == idx)
357                         GOTO(out, rc = -EEXIST);
358         }
359         /* ost not found we add it */
360         op->op_array[op->op_count] = idx;
361         op->op_count++;
362 out:
363         up_write(&op->op_rw_sem);
364         return rc;
365 }
366
367 int lov_ost_pool_remove(struct ost_pool *op, __u32 idx)
368 {
369         int i;
370
371         down_write(&op->op_rw_sem);
372
373         for (i = 0; i < op->op_count; i++) {
374                 if (op->op_array[i] == idx) {
375                         memmove(&op->op_array[i], &op->op_array[i + 1],
376                                 (op->op_count - i - 1) * sizeof(op->op_array[0]));
377                         op->op_count--;
378                         up_write(&op->op_rw_sem);
379                         return 0;
380                 }
381         }
382
383         up_write(&op->op_rw_sem);
384         return -EINVAL;
385 }
386
387 int lov_ost_pool_free(struct ost_pool *op)
388 {
389         if (op->op_size == 0)
390                 return 0;
391
392         down_write(&op->op_rw_sem);
393
394         OBD_FREE(op->op_array, op->op_size * sizeof(op->op_array[0]));
395         op->op_array = NULL;
396         op->op_count = 0;
397         op->op_size = 0;
398
399         up_write(&op->op_rw_sem);
400         return 0;
401 }
402
403
404 int lov_pool_new(struct obd_device *obd, char *poolname)
405 {
406         struct lov_obd *lov;
407         struct pool_desc *new_pool;
408         int rc;
409         ENTRY;
410
411         lov = &(obd->u.lov);
412
413         if (strlen(poolname) > LOV_MAXPOOLNAME)
414                 RETURN(-ENAMETOOLONG);
415
416         OBD_ALLOC_PTR(new_pool);
417         if (new_pool == NULL)
418                 RETURN(-ENOMEM);
419
420         strncpy(new_pool->pool_name, poolname, LOV_MAXPOOLNAME);
421         new_pool->pool_name[LOV_MAXPOOLNAME] = '\0';
422         new_pool->pool_lov = lov;
423         /* ref count init to 1 because when created a pool is always used
424          * up to deletion
425          */
426         atomic_set(&new_pool->pool_refcount, 1);
427         rc = lov_ost_pool_init(&new_pool->pool_obds, 0);
428         if (rc)
429                GOTO(out_err, rc);
430
431         memset(&(new_pool->pool_rr), 0, sizeof(struct lov_qos_rr));
432         rc = lov_ost_pool_init(&new_pool->pool_rr.lqr_pool, 0);
433         if (rc) {
434                 lov_ost_pool_free(&new_pool->pool_obds);
435                 GOTO(out_err, rc);
436         }
437
438         INIT_HLIST_NODE(&new_pool->pool_hash);
439         rc = lustre_hash_add_unique(lov->lov_pools_hash_body, poolname,
440                                     &new_pool->pool_hash);
441         if (rc) {
442                 lov_ost_pool_free(&new_pool->pool_rr.lqr_pool);
443                 lov_ost_pool_free(&new_pool->pool_obds);
444                 GOTO(out_err, rc = -EEXIST);
445         }
446
447         spin_lock(&obd->obd_dev_lock);
448         list_add_tail(&new_pool->pool_list, &lov->lov_pool_list);
449         lov->lov_pool_count++;
450
451         spin_unlock(&obd->obd_dev_lock);
452
453         CDEBUG(D_CONFIG, LOV_POOLNAMEF" is pool #%d\n",
454                poolname, lov->lov_pool_count);
455
456 #ifdef LPROCFS
457         /* ifdef needed for liblustre */
458         /* get ref for /proc file */
459         lov_pool_getref(new_pool);
460         new_pool->pool_proc_entry = lprocfs_add_simple(lov->lov_pool_proc_entry,
461                                                        poolname, NULL, NULL,
462                                                        new_pool,
463                                                        &pool_proc_operations);
464 #endif
465
466         if (IS_ERR(new_pool->pool_proc_entry)) {
467                 CWARN("Cannot add proc pool entry "LOV_POOLNAMEF"\n", poolname);
468                 new_pool->pool_proc_entry = NULL;
469                 lov_pool_putref(new_pool);
470         }
471
472         RETURN(0);
473
474 out_err:
475         OBD_FREE_PTR(new_pool);
476         return rc;
477 }
478
479 int lov_pool_del(struct obd_device *obd, char *poolname)
480 {
481         struct lov_obd *lov;
482         struct pool_desc *pool;
483         ENTRY;
484
485         lov = &(obd->u.lov);
486
487         spin_lock(&obd->obd_dev_lock);
488
489         pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
490         if (pool == NULL) {
491                 spin_unlock(&obd->obd_dev_lock);
492                 RETURN(-ENOENT);
493         }
494
495 #ifdef LPROCFS
496         if (pool->pool_proc_entry != NULL) {
497                 remove_proc_entry(pool->pool_proc_entry->name,
498                                   pool->pool_proc_entry->parent);
499                 /* remove ref for /proc file */
500                 lov_pool_putref(pool);
501         }
502 #endif
503
504         lustre_hash_del_key(lov->lov_pools_hash_body, poolname);
505         list_del_init(&pool->pool_list);
506
507         lov->lov_pool_count--;
508         lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
509         spin_unlock(&obd->obd_dev_lock);
510
511         /* remove ref got when pool was created in memory
512          * pool will be freed when refount will reach 0
513          */
514         lov_pool_putref(pool);
515
516         RETURN(0);
517 }
518
519
520 int lov_pool_add(struct obd_device *obd, char *poolname, char *ostname)
521 {
522         struct obd_uuid ost_uuid;
523         struct lov_obd *lov;
524         struct pool_desc *pool;
525         unsigned int i, lov_idx;
526         int rc;
527         ENTRY;
528
529         lov = &(obd->u.lov);
530
531         pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
532         if (pool == NULL)
533                 RETURN(-ENOENT);
534
535         obd_str2uuid(&ost_uuid, ostname);
536
537
538         /* search ost in lov array */
539         mutex_down(&lov->lov_lock);
540         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
541                 if (!lov->lov_tgts[i])
542                         continue;
543                 if (obd_uuid_equals(&ost_uuid, &(lov->lov_tgts[i]->ltd_uuid)))
544                         break;
545         }
546
547         /* test if ost found in lov */
548         if (i == lov->desc.ld_tgt_count) {
549                 mutex_up(&lov->lov_lock);
550                 GOTO(out, rc = -EINVAL);
551         }
552         mutex_up(&lov->lov_lock);
553
554         lov_idx = i;
555
556         rc = lov_ost_pool_add(&pool->pool_obds, lov_idx, lov->lov_tgt_size);
557         if (rc)
558                 GOTO(out, rc);
559
560         pool->pool_rr.lqr_dirty = 1;
561
562         CDEBUG(D_CONFIG, "Added %s to "LOV_POOLNAMEF" as member %d\n",
563                ostname, poolname,  pool_tgt_count(pool));
564
565         EXIT;
566 out:
567         lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
568         return rc;
569 }
570
571 int lov_pool_remove(struct obd_device *obd, char *poolname, char *ostname)
572 {
573         struct obd_uuid ost_uuid;
574         struct lov_obd *lov;
575         struct pool_desc *pool;
576         unsigned int i, lov_idx;
577         int rc = 0;
578         ENTRY;
579
580         lov = &(obd->u.lov);
581
582         spin_lock(&obd->obd_dev_lock);
583         pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
584         if (pool == NULL) {
585                 spin_unlock(&obd->obd_dev_lock);
586                 RETURN(-ENOENT);
587         }
588
589         obd_str2uuid(&ost_uuid, ostname);
590
591         /* search ost in lov array, to get index */
592         for (i = 0; i < lov->desc.ld_tgt_count; i++) {
593                 if (!lov->lov_tgts[i])
594                         continue;
595
596                 if (obd_uuid_equals(&ost_uuid, &(lov->lov_tgts[i]->ltd_uuid)))
597                         break;
598         }
599
600         /* test if ost found in lov */
601         if (i == lov->desc.ld_tgt_count) {
602                 spin_unlock(&obd->obd_dev_lock);
603                 GOTO(out, rc = -EINVAL);
604         }
605
606         spin_unlock(&obd->obd_dev_lock);
607
608         lov_idx = i;
609
610         lov_ost_pool_remove(&pool->pool_obds, lov_idx);
611
612         pool->pool_rr.lqr_dirty = 1;
613
614         CDEBUG(D_CONFIG, "%s removed from "LOV_POOLNAMEF"\n", ostname,
615                poolname);
616
617         EXIT;
618 out:
619         lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
620         return rc;
621 }
622
623 int lov_check_index_in_pool(__u32 idx, struct pool_desc *pool)
624 {
625         int i, rc;
626         ENTRY;
627
628         /* caller may no have a ref on pool if it got the pool
629          * without calling lov_find_pool() (e.g. go through the lov pool
630          * list)
631          */
632         lov_pool_getref(pool);
633
634         down_read(&pool_tgt_rw_sem(pool));
635
636         for (i = 0; i < pool_tgt_count(pool); i++) {
637                 if (pool_tgt_array(pool)[i] == idx)
638                         GOTO(out, rc = 0);
639         }
640         rc = -ENOENT;
641         EXIT;
642 out:
643         up_read(&pool_tgt_rw_sem(pool));
644
645         lov_pool_putref(pool);
646         return rc;
647 }
648
649 struct pool_desc *lov_find_pool(struct lov_obd *lov, char *poolname)
650 {
651         struct pool_desc *pool;
652
653         pool = NULL;
654         if (poolname[0] != '\0') {
655                 pool = lustre_hash_lookup(lov->lov_pools_hash_body, poolname);
656                 if (pool == NULL)
657                         CWARN("Request for an unknown pool ("LOV_POOLNAMEF")\n",
658                               poolname);
659                 if ((pool != NULL) && (pool_tgt_count(pool) == 0)) {
660                         CWARN("Request for an empty pool ("LOV_POOLNAMEF")\n",
661                                poolname);
662                         /* pool is ignored, so we remove ref on it */
663                         lh_put(lov->lov_pools_hash_body, &pool->pool_hash);
664                         pool = NULL;
665                 }
666         }
667         return pool;
668 }
669