/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2017, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 *
 * lustre/ptlrpc/sec_bulk.c
 *
 * Author: Eric Mei <ericm@clusterfs.com>
 */

#define DEBUG_SUBSYSTEM S_SEC

#include <libcfs/linux/linux-mem.h>

#include <obd_cksum.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <lustre_import.h>
#include <lustre_dlm.h>
#include <lustre_sec.h>

#include "ptlrpc_internal.h"

/*
 * We have a pool for every power-of-2 number of pages, up to order
 * PTLRPC_MAX_BRW_BITS. Most pools will be unused, but that's OK -
 * unused pools are very cheap.
 */
#define POOLS_COUNT (PTLRPC_MAX_BRW_BITS + 1)
#define PAGES_TO_MiB(pages) ((pages) >> (20 - PAGE_SHIFT))
#define MiB_TO_PAGES(mb) ((mb) << (20 - PAGE_SHIFT))
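/*
 * e.g. with 4KiB pages (PAGE_SHIFT == 12): 20 - PAGE_SHIFT == 8, so
 * PAGES_TO_MiB(256) == 1 and MiB_TO_PAGES(1) == 256.
 */
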
/* deprecated - see pool_max_memory_mb below */
static int enc_pool_max_memory_mb;
module_param(enc_pool_max_memory_mb, int, 0644);
MODULE_PARM_DESC(enc_pool_max_memory_mb,
                 "Encoding pool max memory (MB), default unlimited (deprecated, please use pool_max_memory_mb)");

static int pool_max_memory_mb;
module_param(pool_max_memory_mb, int, 0644);
MODULE_PARM_DESC(pool_max_memory_mb,
                 "Encoding pool max memory (MB), default unlimited");

#define PTRS_PER_PAGE (PAGE_SIZE / sizeof(void *))

#define IDLE_IDX_MAX (100)
#define IDLE_IDX_WEIGHT (3)

#define CACHE_QUIESCENT_PERIOD (20)
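/*
 * A pool that has not been accessed for CACHE_QUIESCENT_PERIOD seconds
 * is treated as fully idle by the shrinker callbacks below.
 */
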
static struct ptlrpc_page_pool {
        unsigned long ppp_max_pages;    /* maximum pages can hold, const */
        unsigned int ppp_max_ptr_pages; /* number of ptr_pages, const */

        /*
         * wait queue in case of not enough free pages.
         */
        wait_queue_head_t ppp_waitq;    /* waiting threads */
        unsigned int ppp_waitqlen;      /* wait queue length */
        unsigned long ppp_pages_short;  /* # of pages wanted by in-q users */
        unsigned int ppp_growing:1;     /* during adding pages */
        unsigned int ppp_order;         /* page pool order and index in pools
                                         * array (element size is 2^order
                                         * pages)
                                         */

        /*
         * indicating how idle the pool is, from 0 to MAX_IDLE_IDX.
         * this is counted each time pages are taken from the pool, not
         * based on time, which means the idle index may still be low
         * even if the system has been idle for a while, as long as no
         * activity happened in the pool.
         */
        unsigned long ppp_idle_idx;

        /* last shrink time due to mem tight */
        time64_t ppp_last_shrink;
        time64_t ppp_last_access;

        /* in-pool pages bookkeeping */
        spinlock_t ppp_lock;            /* protect following fields */
        unsigned long ppp_total_pages;  /* total pages in pool */
        unsigned long ppp_free_pages;   /* current pages available */

        /* statistics */
        unsigned long ppp_st_max_pages; /* # of pages ever reached */
        unsigned int ppp_st_grows;      /* # of grows */
        unsigned int ppp_st_grow_fails; /* # of add pages failures */
        unsigned int ppp_st_shrinks;    /* # of shrinks */
        unsigned long ppp_st_access;    /* # of access */
        unsigned long ppp_st_missings;  /* # of cache missing */
        unsigned long ppp_st_lowfree;   /* lowest free pages reached */
        unsigned int ppp_st_max_wqlen;  /* highest waitqueue length */
        ktime_t ppp_st_max_wait;        /* in nanoseconds */
        unsigned long ppp_st_outofmem;  /* # of out of mem requests */

        /*
         * pointers to ptr_pages, may be vmalloc'd
         */
        void ***ppp_ptr_pages;

        struct ll_shrinker_ops ppp_shops;
        struct shrinker *pool_shrinker;
        struct mutex add_pages_mutex;
} **page_pools;

static int element_size(struct ptlrpc_page_pool *pool)
{
        return 1 << (pool->ppp_order + PAGE_SHIFT);
}

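/*
 * element_size() above is in bytes: assuming 4KiB pages, the order-0
 * pool holds single 4KiB pages, order 1 holds 8KiB (2-page) elements,
 * and so on; the pool_%dk names in the stats below come from this value.
 */
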
/*
 * Keep old name (encrypt_page_pool vs page_pool) for compatibility with user
 * tools pulling stats
 *
 * /sys/kernel/debug/lustre/sptlrpc/encrypt_page_pools
 */
int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
{
        struct ptlrpc_page_pool *pool = page_pools[0];

        spin_lock(&pool->ppp_lock);
        seq_printf(m, "physical pages: %lu\n"
                   "pages per pool: %lu\n"
                   "max pages: %lu\n"
                   "max pools: %u\n"
                   "total pages: %lu\n"
                   "total free: %lu\n"
                   "idle index: %lu/100\n"
                   "last shrink: %llds\n"
                   "last access: %llds\n"
                   "max pages reached: %lu\n"
                   "grows: %u\n"
                   "grows failure: %u\n"
                   "shrinks: %u\n"
                   "cache access: %lu\n"
                   "cache missing: %lu\n"
                   "low free mark: %lu\n"
                   "max waitqueue depth: %u\n"
                   "max wait time ms: %lld\n"
                   "out of mem: %lu\n",
                   cfs_totalram_pages(), PTRS_PER_PAGE,
                   pool->ppp_max_pages,
                   pool->ppp_max_ptr_pages,
                   pool->ppp_total_pages,
                   pool->ppp_free_pages,
                   pool->ppp_idle_idx,
                   ktime_get_seconds() - pool->ppp_last_shrink,
                   ktime_get_seconds() - pool->ppp_last_access,
                   pool->ppp_st_max_pages,
                   pool->ppp_st_grows,
                   pool->ppp_st_grow_fails,
                   pool->ppp_st_shrinks,
                   pool->ppp_st_access,
                   pool->ppp_st_missings,
                   pool->ppp_st_lowfree,
                   pool->ppp_st_max_wqlen,
                   ktime_to_ms(pool->ppp_st_max_wait),
                   pool->ppp_st_outofmem);
        spin_unlock(&pool->ppp_lock);

        return 0;
}

/*
 * /sys/kernel/debug/lustre/sptlrpc/page_pools
 */
int page_pools_seq_show(struct seq_file *m, void *v)
{
        int pool_order;
        struct ptlrpc_page_pool *pool;

        seq_printf(m, "physical_pages: %lu\n"
                   "pools:\n",
                   cfs_totalram_pages());

        for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
                pool = page_pools[pool_order];
                if (!pool->ppp_st_access)
                        continue;
                spin_lock(&pool->ppp_lock);
                seq_printf(m, "  pool_%dk:\n"
                           "    max_pages: %lu\n"
                           "    max_items: %lu\n"
                           "    total_pages: %lu\n"
                           "    total_free: %lu\n"
                           "    idle_index: %lu/100\n"
                           "    last_shrink: %llds\n"
                           "    last_access: %llds\n"
                           "    max_pages_reached: %lu\n"
                           "    grows: %u\n"
                           "    grows_failure: %u\n"
                           "    shrinks: %u\n"
                           "    cache_access: %lu\n"
                           "    cache_missing: %lu\n"
                           "    low_free_mark: %lu\n"
                           "    max_waitqueue_depth: %u\n"
                           "    max_wait_time_ms: %lld\n"
                           "    out_of_mem: %lu\n",
                           /* convert from bytes to KiB */
                           element_size(pool) >> 10,
                           pool->ppp_max_pages,
                           pool->ppp_max_ptr_pages * PTRS_PER_PAGE,
                           pool->ppp_total_pages,
                           pool->ppp_free_pages,
                           pool->ppp_idle_idx,
                           ktime_get_seconds() - pool->ppp_last_shrink,
                           ktime_get_seconds() - pool->ppp_last_access,
                           pool->ppp_st_max_pages,
                           pool->ppp_st_grows,
                           pool->ppp_st_grow_fails,
                           pool->ppp_st_shrinks,
                           pool->ppp_st_access,
                           pool->ppp_st_missings,
                           pool->ppp_st_lowfree,
                           pool->ppp_st_max_wqlen,
                           ktime_to_ms(pool->ppp_st_max_wait),
                           pool->ppp_st_outofmem);

                spin_unlock(&pool->ppp_lock);
        }
        return 0;
}

static void pool_release_free_pages(long npages, struct ptlrpc_page_pool *pool)
{
        int p_idx, g_idx;
        int p_idx_max1, p_idx_max2;

        LASSERT(npages > 0);
        LASSERT(npages <= pool->ppp_free_pages);
        LASSERT(pool->ppp_free_pages <= pool->ppp_total_pages);

        /* max pool index before the release */
        p_idx_max2 = (pool->ppp_total_pages - 1) / PTRS_PER_PAGE;

        pool->ppp_free_pages -= npages;
        pool->ppp_total_pages -= npages;

        /* max pool index after the release */
        p_idx_max1 = pool->ppp_total_pages == 0 ? -1 :
                ((pool->ppp_total_pages - 1) / PTRS_PER_PAGE);

        p_idx = pool->ppp_free_pages / PTRS_PER_PAGE;
        g_idx = pool->ppp_free_pages % PTRS_PER_PAGE;
        LASSERT(pool->ppp_ptr_pages[p_idx]);

        while (npages--) {
                LASSERT(pool->ppp_ptr_pages[p_idx]);
                LASSERT(pool->ppp_ptr_pages[p_idx][g_idx] != NULL);

                if (pool->ppp_order == 0)
                        __free_page(pool->ppp_ptr_pages[p_idx][g_idx]);
                else
                        OBD_FREE_LARGE(pool->ppp_ptr_pages[p_idx][g_idx],
                                       element_size(pool));
                pool->ppp_ptr_pages[p_idx][g_idx] = NULL;

                if (++g_idx == PTRS_PER_PAGE) {
                        p_idx++;
                        g_idx = 0;
                }
        }

        /* free unused ptr_pages */
        while (p_idx_max1 < p_idx_max2) {
                LASSERT(pool->ppp_ptr_pages[p_idx_max2]);
                OBD_FREE(pool->ppp_ptr_pages[p_idx_max2], PAGE_SIZE);
                pool->ppp_ptr_pages[p_idx_max2] = NULL;
                p_idx_max2--;
        }
}

#define SEEKS_TO_ORDER(s) (((s)->seeks >> 8) & 0xff)
#define ORDER_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
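/*
 * The shrinker 'seeks' value does double duty here: the low byte keeps
 * DEFAULT_SEEKS for the VM, while the pool order is packed into bits
 * 8-15 so each shrinker callback can recover which pool it serves.
 */
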
/*
 * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
 */
static unsigned long pool_shrink_count(struct shrinker *s,
                                       struct shrink_control *sc)
{
        unsigned int pool_order = SEEKS_TO_ORDER(s);
        struct ptlrpc_page_pool *pool = page_pools[pool_order];

        /*
         * if no pool access for a long time, we consider it's fully
         * idle. A little race here is fine.
         */
        if (unlikely(ktime_get_seconds() - pool->ppp_last_access >
                     CACHE_QUIESCENT_PERIOD)) {
                spin_lock(&pool->ppp_lock);
                pool->ppp_idle_idx = IDLE_IDX_MAX;
                spin_unlock(&pool->ppp_lock);
        }

        LASSERT(pool->ppp_idle_idx <= IDLE_IDX_MAX);

        return (pool->ppp_free_pages <= PTLRPC_MAX_BRW_PAGES) ? 0 :
                (pool->ppp_free_pages - PTLRPC_MAX_BRW_PAGES) *
                (IDLE_IDX_MAX - pool->ppp_idle_idx) / IDLE_IDX_MAX;
}
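/*
 * Example of the scaling above: with 1000 free pages beyond the
 * PTLRPC_MAX_BRW_PAGES floor and ppp_idle_idx == 75, the count reported
 * to the VM is 1000 * (100 - 75) / 100 == 250 pages.
 */
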
/*
 * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
 */
static unsigned long pool_shrink_scan(struct shrinker *s,
                                      struct shrink_control *sc)
{
        /* Get pool number passed as part of pool_shrinker_seeks value */
        unsigned int pool_order = SEEKS_TO_ORDER(s);
        struct ptlrpc_page_pool *pool = page_pools[pool_order];

        spin_lock(&pool->ppp_lock);
        if (pool->ppp_free_pages <= PTLRPC_MAX_BRW_PAGES)
                sc->nr_to_scan = 0;
        else
                sc->nr_to_scan = min_t(unsigned long, sc->nr_to_scan,
                                       pool->ppp_free_pages -
                                       PTLRPC_MAX_BRW_PAGES);
        if (sc->nr_to_scan > 0) {
                pool_release_free_pages(sc->nr_to_scan, pool);
                CDEBUG(D_SEC, "released %ld pages, %ld left\n",
                       (long)sc->nr_to_scan, pool->ppp_free_pages);

                pool->ppp_st_shrinks++;
                pool->ppp_last_shrink = ktime_get_seconds();
        }
        spin_unlock(&pool->ppp_lock);

        /*
         * if no pool access for a long time, we consider it's fully idle.
         * a little race here is fine.
         */
        if (unlikely(ktime_get_seconds() - pool->ppp_last_access >
                     CACHE_QUIESCENT_PERIOD)) {
                spin_lock(&pool->ppp_lock);
                pool->ppp_idle_idx = IDLE_IDX_MAX;
                spin_unlock(&pool->ppp_lock);
        }

        LASSERT(pool->ppp_idle_idx <= IDLE_IDX_MAX);

        return sc->nr_to_scan;
}

#ifndef HAVE_SHRINKER_COUNT
/*
 * could be called frequently for query (@nr_to_scan == 0).
 * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
 */
static int pool_shrink(struct shrinker *shrinker, struct shrink_control *sc)
{
        pool_shrink_scan(shrinker, sc);

        return pool_shrink_count(shrinker, sc);
}
#endif /* HAVE_SHRINKER_COUNT */

static inline int npages_to_nptr_pages(unsigned long npages)
{
        return (int) ((npages + PTRS_PER_PAGE - 1) / PTRS_PER_PAGE);
}
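/*
 * e.g. with 4KiB pages and 8-byte pointers, PTRS_PER_PAGE == 512, so
 * npages_to_nptr_pages(1000) == 2: two ptr_pages suffice to track 1000
 * pool elements.
 */
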
/*
 * return how many pages cleaned up.
 */
static unsigned long pool_cleanup(void ***ptr_pages, int nptr_pages,
                                  struct ptlrpc_page_pool *pool)
{
        unsigned long cleaned = 0;
        int i, j;

        for (i = 0; i < nptr_pages; i++) {
                if (!ptr_pages[i])
                        continue;
                for (j = 0; j < PTRS_PER_PAGE; j++) {
                        if (!ptr_pages[i][j])
                                continue;
                        if (pool->ppp_order == 0)
                                __free_page(ptr_pages[i][j]);
                        else
                                OBD_FREE_LARGE(ptr_pages[i][j],
                                               element_size(pool));
                        cleaned++;
                }
                OBD_FREE(ptr_pages[i], PAGE_SIZE);
                ptr_pages[i] = NULL;
        }

        return cleaned;
}

/*
 * merge @nptr_pages pointed by @ptr_pages which contains @npages new pages
 * into the current pool.
 *
 * we have options to avoid most memory copy with some tricks. but we choose
 * the simplest way to avoid complexity. It's not frequently called.
 */
static void pool_insert_ptrs(void ***ptr_pages, int nptr_pages, int npages,
                             struct ptlrpc_page_pool *page_pool)
{
        int freeslot;
        int op_idx, np_idx, og_idx, ng_idx;
        int cur_nptr_page, end_nptr_page;

        LASSERT(npages > 0);
        LASSERT(page_pool->ppp_total_pages+npages <= page_pool->ppp_max_pages);
        LASSERT(npages_to_nptr_pages(npages) == nptr_pages);
        LASSERT(page_pool->ppp_growing);

        spin_lock(&page_pool->ppp_lock);

        /*
         * (1) fill all the free slots in current pool ptr_pages.
         *
         * free slots are those left by rent pages, and the extra ones with
         * index >= total_pages, located at the tail of the last pool.
         */
        freeslot = page_pool->ppp_total_pages % PTRS_PER_PAGE;
        if (freeslot != 0)
                freeslot = PTRS_PER_PAGE - freeslot;
        freeslot += page_pool->ppp_total_pages - page_pool->ppp_free_pages;

        op_idx = page_pool->ppp_free_pages / PTRS_PER_PAGE;
        og_idx = page_pool->ppp_free_pages % PTRS_PER_PAGE;
        np_idx = nptr_pages - 1;
        ng_idx = (npages - 1) % PTRS_PER_PAGE;

        while (freeslot) {
                LASSERT(page_pool->ppp_ptr_pages[op_idx][og_idx] == NULL);
                LASSERT(ptr_pages[np_idx][ng_idx] != NULL);

                page_pool->ppp_ptr_pages[op_idx][og_idx] =
                        ptr_pages[np_idx][ng_idx];
                ptr_pages[np_idx][ng_idx] = NULL;

                freeslot--;

                if (++og_idx == PTRS_PER_PAGE) {
                        op_idx++;
                        og_idx = 0;
                }
                if (--ng_idx < 0) {
                        if (np_idx == 0)
                                break;
                        np_idx--;
                        ng_idx = PTRS_PER_PAGE - 1;
                }
        }

        /*
         * (2) add ptr pages if needed.
         */
        cur_nptr_page = (page_pool->ppp_total_pages + PTRS_PER_PAGE - 1) /
                        PTRS_PER_PAGE;
        end_nptr_page = (page_pool->ppp_total_pages + npages +
                         PTRS_PER_PAGE - 1) / PTRS_PER_PAGE;
        LASSERT(end_nptr_page <= page_pool->ppp_max_ptr_pages);

        np_idx = 0;
        while (cur_nptr_page < end_nptr_page) {
                LASSERT(page_pool->ppp_ptr_pages[cur_nptr_page] == NULL);
                LASSERT(np_idx < nptr_pages);
                LASSERT(ptr_pages[np_idx] != NULL);

                page_pool->ppp_ptr_pages[cur_nptr_page++] = ptr_pages[np_idx];
                ptr_pages[np_idx++] = NULL;
        }

        /*
         * (3) free useless source ptr pages
         */
        while (np_idx < nptr_pages) {
                LASSERT(ptr_pages[np_idx] != NULL);
                CDEBUG(D_SEC, "Free useless ptr pages: %i, %p\n", np_idx,
                       ptr_pages[np_idx]);
                OBD_FREE(ptr_pages[np_idx], PAGE_SIZE);
                ptr_pages[np_idx++] = NULL;
        }

        page_pool->ppp_total_pages += npages;
        page_pool->ppp_free_pages += npages;
        page_pool->ppp_st_lowfree = page_pool->ppp_free_pages;

        if (page_pool->ppp_total_pages > page_pool->ppp_st_max_pages)
                page_pool->ppp_st_max_pages = page_pool->ppp_total_pages;

        CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
               page_pool->ppp_total_pages);

        spin_unlock(&page_pool->ppp_lock);
}
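/*
 * Each grow request below is rounded up so that at least POOL_INIT_SIZE
 * bytes worth of elements (PTLRPC_MAX_BRW_SIZE / 4) are added at a time.
 */
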
#define POOL_INIT_SIZE (PTLRPC_MAX_BRW_SIZE / 4)
static int pool_add_pages(int npages, struct ptlrpc_page_pool *page_pool)
{
        void ***ptr_pages;
        int nptr_pages, alloced = 0;
        int i, j, rc = -ENOMEM;
        unsigned int pool_order = page_pool->ppp_order;

        if (npages < POOL_INIT_SIZE / element_size(page_pool))
                npages = POOL_INIT_SIZE / element_size(page_pool);

        mutex_lock(&page_pool->add_pages_mutex);

        if (npages + page_pool->ppp_total_pages > page_pool->ppp_max_pages)
                npages = page_pool->ppp_max_pages - page_pool->ppp_total_pages;
        LASSERT(npages > 0);

        page_pool->ppp_st_grows++;

        nptr_pages = npages_to_nptr_pages(npages);
        OBD_ALLOC_PTR_ARRAY(ptr_pages, nptr_pages);
        if (ptr_pages == NULL)
                goto out;

        for (i = 0; i < nptr_pages; i++) {
                OBD_ALLOC(ptr_pages[i], PAGE_SIZE);
                if (ptr_pages[i] == NULL)
                        goto out_ptr_pages;

                for (j = 0; j < PTRS_PER_PAGE && alloced < npages; j++) {
                        if (pool_order == 0)
                                ptr_pages[i][j] = alloc_page(GFP_NOFS |
                                                             __GFP_HIGHMEM);
                        else
                                OBD_ALLOC_LARGE(ptr_pages[i][j],
                                                element_size(page_pool));
                        if (ptr_pages[i][j] == NULL)
                                goto out_ptr_pages;
                        alloced++;
                }
        }
        LASSERT(alloced == npages);

        pool_insert_ptrs(ptr_pages, nptr_pages, npages, page_pool);
        CDEBUG(D_SEC, "added %d pages into pool\n", npages);
        OBD_FREE_PTR_ARRAY(ptr_pages, nptr_pages);
        rc = 0;
        goto out;

out_ptr_pages:
        pool_cleanup(ptr_pages, nptr_pages, page_pool);
        OBD_FREE_PTR_ARRAY(ptr_pages, nptr_pages);
out:
        if (rc) {
                page_pool->ppp_st_grow_fails++;
                CERROR("Failed to allocate %d pages\n", npages);
        }

        mutex_unlock(&page_pool->add_pages_mutex);
        return rc;
}

static inline void pool_wakeup(struct ptlrpc_page_pool *pool)
{
        assert_spin_locked(&pool->ppp_lock);

        /* only bother waking up when someone is actually waiting */
        if (unlikely(waitqueue_active(&pool->ppp_waitq)))
                wake_up_all(&pool->ppp_waitq);
}

static int pool_should_grow(int needed, struct ptlrpc_page_pool *pool)
{
        /*
         * don't grow if someone else is growing the pool right now,
         * or the pool has reached its full capacity
         */
        if (pool->ppp_growing || pool->ppp_total_pages == pool->ppp_max_pages)
                return 0;

        /* if total pages is not enough, we need to grow */
        if (pool->ppp_total_pages < needed)
                return 1;

        /*
         * we wanted to return 0 here if a shrink happened just a moment
         * ago, but that may cause a deadlock if both a client and an OST
         * live on a single node.
         *
         * we should perhaps also consider other factors here, like wait
         * queue length, idle index, etc. ?
         */

        /* grow the pool in any other cases */
        return 1;
}

/*
 * Export the number of free pages in the pool of 'order'
 */
int sptlrpc_pool_get_free_pages(unsigned int order)
{
        return page_pools[order]->ppp_free_pages;
}
EXPORT_SYMBOL(sptlrpc_pool_get_free_pages);

/*
 * Let the outside world know if pool full capacity is reached
 */
int pool_is_at_full_capacity(int order)
{
        return (page_pools[order]->ppp_total_pages ==
                page_pools[order]->ppp_max_pages);
}
EXPORT_SYMBOL(pool_is_at_full_capacity);

static inline void **page_from_bulkdesc(void *array, int index)
{
        struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array;

        return (void **)&desc->bd_enc_vec[index].bv_page;
}

static inline void **page_from_pagearray(void *array, int index)
{
        struct page **pa = (struct page **)array;

        return (void **)&pa[index];
}

static inline void **page_from_bufarray(void *array, int index)
{
        return (void **)array;
}
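/*
 * The page_from_*() helpers above let __sptlrpc_pool_get_pages() and
 * __sptlrpc_pool_put_pages() walk three different containers (a bulk
 * descriptor's encryption bvec, a plain struct page array, or a single
 * buffer) through one callback that returns the slot for element @index.
 */
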
static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool);

/*
 * we allocate the requested pages atomically.
 */
static inline int __sptlrpc_pool_get_pages(void *array, unsigned int count,
                                           unsigned int order,
                                           void **(*page_from)(void *, int))
{
        struct ptlrpc_page_pool *page_pool = page_pools[order];
        wait_queue_entry_t waitlink;
        unsigned long this_idle = -1;
        u64 tick_ns = 0;
        int p_idx, g_idx;
        int i, rc = 0;

        if (!array || count <= 0 || count > page_pool->ppp_max_pages)
                return -EINVAL;

        spin_lock(&page_pool->ppp_lock);

        page_pool->ppp_st_access++;
again:
        if (unlikely(page_pool->ppp_free_pages < count)) {
                if (tick_ns == 0)
                        tick_ns = ktime_get_ns();

                page_pool->ppp_st_missings++;
                page_pool->ppp_pages_short += count;

                /* if we aren't able to add pages, check if someone else is
                 * growing the pool and sleep if so, otherwise we return
                 * ENOMEM because we can't sleep here waiting for other ops to
                 * complete (main user is ptlrpcd, which must not sleep waiting
                 * for other ops... technically sleeping for pool growth is
                 * also questionable but it's very unlikely in practice to get
                 * stuck from this)
                 *
                 * if ENOMEM is returned here, the RPC will go back in the
                 * queue
                 */
                if (!__grow_pool_try(count, page_pool)) {
                        if (page_pool->ppp_growing) {
                                if (++page_pool->ppp_waitqlen >
                                    page_pool->ppp_st_max_wqlen)
                                        page_pool->ppp_st_max_wqlen =
                                                page_pool->ppp_waitqlen;

                                set_current_state(TASK_UNINTERRUPTIBLE);
                                init_wait(&waitlink);
                                add_wait_queue(&page_pool->ppp_waitq,
                                               &waitlink);

                                spin_unlock(&page_pool->ppp_lock);
                                schedule();
                                remove_wait_queue(&page_pool->ppp_waitq,
                                                  &waitlink);
                                spin_lock(&page_pool->ppp_lock);
                                page_pool->ppp_waitqlen--;
                        } else {
                                /*
                                 * ptlrpcd thread should not sleep in that
                                 * case or deadlock may occur!
                                 * Instead, return -ENOMEM so that upper layers
                                 * will put request back in queue.
                                 */
                                page_pool->ppp_st_outofmem++;
                                GOTO(out_unlock, rc = -ENOMEM);
                        }
                }

                if (page_pool->ppp_pages_short < count)
                        GOTO(out_unlock, rc = -EPROTO);
                page_pool->ppp_pages_short -= count;

                this_idle = 0;
                goto again;
        }

        /* record max wait time */
        if (unlikely(tick_ns)) {
                ktime_t tick = ktime_sub_ns(ktime_get(), tick_ns);

                if (ktime_after(tick, page_pool->ppp_st_max_wait))
                        page_pool->ppp_st_max_wait = tick;
        }

        /* proceed with rest of allocation */
        page_pool->ppp_free_pages -= count;

        p_idx = page_pool->ppp_free_pages / PTRS_PER_PAGE;
        g_idx = page_pool->ppp_free_pages % PTRS_PER_PAGE;

        for (i = 0; i < count; i++) {
                void **pagep = page_from(array, i);

                if (page_pool->ppp_ptr_pages[p_idx][g_idx] == NULL)
                        GOTO(out_unlock, rc = -EPROTO);
                *pagep = page_pool->ppp_ptr_pages[p_idx][g_idx];
                page_pool->ppp_ptr_pages[p_idx][g_idx] = NULL;

                if (++g_idx == PTRS_PER_PAGE) {
                        p_idx++;
                        g_idx = 0;
                }
        }

        if (page_pool->ppp_free_pages < page_pool->ppp_st_lowfree)
                page_pool->ppp_st_lowfree =
                        page_pool->ppp_free_pages;

        /*
         * new idle index = (old * weight + new) / (weight + 1)
         */
        if (this_idle == -1)
                this_idle = page_pool->ppp_free_pages * IDLE_IDX_MAX /
                        page_pool->ppp_total_pages;

        page_pool->ppp_idle_idx = (page_pool->ppp_idle_idx *
                                   IDLE_IDX_WEIGHT + this_idle) /
                                  (IDLE_IDX_WEIGHT + 1);

        page_pool->ppp_last_access = ktime_get_seconds();

out_unlock:
        spin_unlock(&page_pool->ppp_lock);
        return rc;
}
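/*
 * Allocate the encryption bvec for @desc (unless a resend already
 * allocated it) and populate it with bd_iov_count single pages drawn
 * from the order-0 pool; the bvec is freed again on failure.
 */
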
int sptlrpc_pool_get_desc_pages(struct ptlrpc_bulk_desc *desc)
{
        int rc;

        LASSERT(desc->bd_iov_count > 0);
        LASSERT(desc->bd_iov_count <= page_pools[0]->ppp_max_pages);

        /* resent bulk, enc iov might have been allocated previously */
        if (desc->bd_enc_vec != NULL)
                return 0;

        OBD_ALLOC_LARGE(desc->bd_enc_vec,
                        desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
        if (desc->bd_enc_vec == NULL)
                return -ENOMEM;

        rc = __sptlrpc_pool_get_pages((void *)desc, desc->bd_iov_count, 0,
                                      page_from_bulkdesc);
        if (rc) {
                OBD_FREE_LARGE(desc->bd_enc_vec,
                               desc->bd_iov_count *
                               sizeof(*desc->bd_enc_vec));
                desc->bd_enc_vec = NULL;
        }
        return rc;
}
EXPORT_SYMBOL(sptlrpc_pool_get_desc_pages);

int sptlrpc_pool_get_pages_array(struct page **pa, unsigned int count)
{
        return __sptlrpc_pool_get_pages((void *)pa, count, 0,
                                        page_from_pagearray);
}
EXPORT_SYMBOL(sptlrpc_pool_get_pages_array);

int sptlrpc_pool_get_pages(void **pages, unsigned int order)
{
        return __sptlrpc_pool_get_pages((void *)pages, 1, order,
                                        page_from_bufarray);
}
EXPORT_SYMBOL(sptlrpc_pool_get_pages);

static int __sptlrpc_pool_put_pages(void *array, unsigned int count,
                                    unsigned int order,
                                    void **(*page_from)(void *, int))
{
        struct ptlrpc_page_pool *page_pool;
        unsigned long p_idx, g_idx;
        int rc = 0;
        int i;

        LASSERTF(order < POOLS_COUNT, "count %u, pool %u\n",
                 count, order);
        if (!array) {
                CERROR("Failed to put %u pages, from pool %u\n",
                       count, order);
                return -EINVAL;
        }

        page_pool = page_pools[order];
        LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, order);

        spin_lock(&page_pool->ppp_lock);

        p_idx = page_pool->ppp_free_pages / PTRS_PER_PAGE;
        g_idx = page_pool->ppp_free_pages % PTRS_PER_PAGE;

        if (page_pool->ppp_free_pages + count > page_pool->ppp_total_pages)
                GOTO(out_unlock, rc = -EPROTO);
        if (!page_pool->ppp_ptr_pages[p_idx])
                GOTO(out_unlock, rc = -EPROTO);

        for (i = 0; i < count; i++) {
                void **pagep = page_from(array, i);

                if (!*pagep ||
                    page_pool->ppp_ptr_pages[p_idx][g_idx] != NULL)
                        GOTO(out_unlock, rc = -EPROTO);

                page_pool->ppp_ptr_pages[p_idx][g_idx] = *pagep;
                if (++g_idx == PTRS_PER_PAGE) {
                        p_idx++;
                        g_idx = 0;
                }
        }

        page_pool->ppp_free_pages += count;
        pool_wakeup(page_pool);

out_unlock:
        spin_unlock(&page_pool->ppp_lock);
        return rc;
}

void sptlrpc_pool_put_desc_pages(struct ptlrpc_bulk_desc *desc)
{
        int rc;

        if (desc->bd_enc_vec == NULL)
                return;

        rc = __sptlrpc_pool_put_pages((void *)desc, desc->bd_iov_count, 0,
                                      page_from_bulkdesc);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);

        OBD_FREE_LARGE(desc->bd_enc_vec,
                       desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
        desc->bd_enc_vec = NULL;
}

void sptlrpc_pool_put_pages_array(struct page **pa, unsigned int count)
{
        int rc;

        rc = __sptlrpc_pool_put_pages((void *)pa, count, 0,
                                      page_from_pagearray);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
}
EXPORT_SYMBOL(sptlrpc_pool_put_pages_array);

void sptlrpc_pool_put_pages(void *buf, unsigned int order)
{
        int rc;

        rc = __sptlrpc_pool_put_pages(buf, 1, order, page_from_bufarray);
        if (rc)
                CDEBUG(D_SEC, "error putting pages in pool: %d\n", rc);
}
EXPORT_SYMBOL(sptlrpc_pool_put_pages);

/* called with pool->ppp_lock held */
static bool __grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
{
        bool pool_grown = false;

        assert_spin_locked(&pool->ppp_lock);

        if (pool_should_grow(needed, pool)) {
                unsigned int to_add;
                int rc;

                pool->ppp_growing = 1;
                /* the pool of single pages is grown a large amount on
                 * first use
                 */
                if (pool->ppp_order == 0 &&
                    pool->ppp_total_pages == 0)
                        to_add = PTLRPC_MAX_BRW_PAGES * 2;
                else /* otherwise, we add requested or at least 8 items */
                        to_add = max(needed, 8);
                spin_unlock(&pool->ppp_lock);

                CDEBUG(D_SEC,
                       "pool %d is %lu elements (size %d bytes), growing by %d items\n",
                       pool->ppp_order, pool->ppp_pages_short,
                       element_size(pool), to_add);
                /* we can't hold a spinlock over page allocation */
                rc = pool_add_pages(to_add, pool);
                if (rc == 0)
                        pool_grown = true;

                spin_lock(&pool->ppp_lock);
                pool->ppp_growing = 0;
                pool_wakeup(pool);
        }

        return pool_grown;
}
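/* same as __grow_pool_try(), for callers not already holding ppp_lock */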
static bool grow_pool_try(int needed, struct ptlrpc_page_pool *pool)
{
        bool rc;

        spin_lock(&pool->ppp_lock);
        rc = __grow_pool_try(needed, pool);
        spin_unlock(&pool->ppp_lock);

        return rc;
}

/*
 * we don't do much stuff for add_user/del_user anymore, except adding some
 * initial pages in add_user() if the current pool is empty; the rest is
 * handled by the pool self-adaptation.
 */
void sptlrpc_pool_add_user(void)
{
        struct ptlrpc_page_pool *pool = page_pools[0];

        /* since this is startup, no one is waiting for these pages, so we
         * don't worry about success or failure here
         */
        grow_pool_try(1, pool);
}
EXPORT_SYMBOL(sptlrpc_pool_add_user);

static inline void pool_ptrs_alloc(struct ptlrpc_page_pool *pool)
{
        LASSERT(pool->ppp_max_ptr_pages);
        OBD_ALLOC_LARGE(pool->ppp_ptr_pages,
                        pool->ppp_max_ptr_pages *
                        sizeof(*pool->ppp_ptr_pages));
}

static inline void pool_ptrs_free(struct ptlrpc_page_pool *pool)
{
        LASSERT(pool->ppp_max_ptr_pages);
        LASSERT(pool->ppp_ptr_pages);

        OBD_FREE_LARGE(pool->ppp_ptr_pages,
                       pool->ppp_max_ptr_pages * sizeof(*pool->ppp_ptr_pages));
}

int sptlrpc_pool_init(void)
{
        struct ptlrpc_page_pool *pool;
        int pool_max_pages = cfs_totalram_pages() / POOLS_COUNT;
        int pool_order = 0;
        int to_revert;
        int rc = 0;

        ENTRY;

        if (pool_max_memory_mb == 0 && enc_pool_max_memory_mb > 0)
                pool_max_memory_mb = enc_pool_max_memory_mb;
        if (pool_max_memory_mb > 0 &&
            pool_max_memory_mb <= PAGES_TO_MiB(cfs_totalram_pages()))
                pool_max_pages = MiB_TO_PAGES(pool_max_memory_mb);

        OBD_ALLOC(page_pools, POOLS_COUNT * sizeof(*page_pools));
        if (page_pools == NULL)
                RETURN(-ENOMEM);

        for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
                OBD_ALLOC(page_pools[pool_order], sizeof(**page_pools));
                if (page_pools[pool_order] == NULL)
                        GOTO(fail, rc = -ENOMEM);

                pool = page_pools[pool_order];
                pool->ppp_max_pages = pool_max_pages;

                pool->ppp_max_ptr_pages =
                        npages_to_nptr_pages(pool->ppp_max_pages);

                init_waitqueue_head(&pool->ppp_waitq);
                pool->ppp_last_shrink = ktime_get_seconds();
                pool->ppp_last_access = ktime_get_seconds();

                spin_lock_init(&pool->ppp_lock);
                pool->ppp_st_max_wait = ktime_set(0, 0);

                pool_ptrs_alloc(pool);
                pool->ppp_order = pool_order;
                CDEBUG(D_SEC, "Allocated pool %i\n", pool_order);
                if (pool->ppp_ptr_pages == NULL)
                        GOTO(fail, rc = -ENOMEM);

                /* Pass pool number as part of pool_shrinker_seeks value */
#ifdef HAVE_SHRINKER_COUNT
                pool->ppp_shops.count_objects = pool_shrink_count;
                pool->ppp_shops.scan_objects = pool_shrink_scan;
#else
                pool->ppp_shops.shrink = pool_shrink;
#endif
                pool->ppp_shops.seeks = ORDER_TO_SEEKS(pool_order);

                pool->pool_shrinker = ll_shrinker_create(&pool->ppp_shops, 0,
                                                         "sptlrpc_enc_pool");
                if (IS_ERR(pool->pool_shrinker))
                        GOTO(fail, rc = PTR_ERR(pool->pool_shrinker));

                mutex_init(&pool->add_pages_mutex);
        }

        RETURN(0);

fail:
        to_revert = pool_order;
        for (pool_order = 0; pool_order <= to_revert; pool_order++) {
                pool = page_pools[pool_order];
                if (pool) {
                        if (pool->ppp_ptr_pages)
                                pool_ptrs_free(pool);
                        OBD_FREE(pool, sizeof(**page_pools));
                }
        }

        OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));

        RETURN(rc);
}

void sptlrpc_pool_fini(void)
{
        unsigned long cleaned, nptr_pages;
        int pool_order;
        struct ptlrpc_page_pool *pool;

        for (pool_order = 0; pool_order < POOLS_COUNT; pool_order++) {
                pool = page_pools[pool_order];
                shrinker_free(pool->pool_shrinker);
                LASSERT(pool->ppp_ptr_pages);
                LASSERT(pool->ppp_total_pages == pool->ppp_free_pages);

                nptr_pages = npages_to_nptr_pages(pool->ppp_total_pages);
                cleaned = pool_cleanup(pool->ppp_ptr_pages, nptr_pages, pool);
                LASSERT(cleaned == pool->ppp_total_pages);

                pool_ptrs_free(pool);

                if (pool->ppp_st_access > 0) {
                        CDEBUG(D_SEC,
                               "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait ms %lld, out of mem %lu\n",
                               pool->ppp_st_max_pages,
                               pool->ppp_st_grows,
                               pool->ppp_st_grow_fails,
                               pool->ppp_st_shrinks,
                               pool->ppp_st_access,
                               pool->ppp_st_missings,
                               pool->ppp_st_max_wqlen,
                               ktime_to_ms(pool->ppp_st_max_wait),
                               pool->ppp_st_outofmem);
                }

                OBD_FREE(pool, sizeof(**page_pools));
        }

        OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));
}

static int cfs_hash_alg_id[] = {
        [BULK_HASH_ALG_NULL]    = CFS_HASH_ALG_NULL,
        [BULK_HASH_ALG_ADLER32] = CFS_HASH_ALG_ADLER32,
        [BULK_HASH_ALG_CRC32]   = CFS_HASH_ALG_CRC32,
        [BULK_HASH_ALG_MD5]     = CFS_HASH_ALG_MD5,
        [BULK_HASH_ALG_SHA1]    = CFS_HASH_ALG_SHA1,
        [BULK_HASH_ALG_SHA256]  = CFS_HASH_ALG_SHA256,
        [BULK_HASH_ALG_SHA384]  = CFS_HASH_ALG_SHA384,
        [BULK_HASH_ALG_SHA512]  = CFS_HASH_ALG_SHA512,
};

const char *sptlrpc_get_hash_name(__u8 hash_alg)
{
        return cfs_crypto_hash_name(cfs_hash_alg_id[hash_alg]);
}

__u8 sptlrpc_get_hash_alg(const char *algname)
{
        return cfs_crypto_hash_alg(algname);
}

int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset, int swabbed)
{
        struct ptlrpc_bulk_sec_desc *bsd;
        int size = msg->lm_buflens[offset];

        bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
        if (bsd == NULL) {
                CERROR("Invalid bulk sec desc: size %d\n", size);
                return -EINVAL;
        }

        if (swabbed)
                __swab32s(&bsd->bsd_nob);

        if (unlikely(bsd->bsd_version != 0)) {
                CERROR("Unexpected version %u\n", bsd->bsd_version);
                return -EPROTO;
        }

        if (unlikely(bsd->bsd_type >= SPTLRPC_BULK_MAX)) {
                CERROR("Invalid type %u\n", bsd->bsd_type);
                return -EPROTO;
        }

        /* FIXME more sanity check here */

        if (unlikely(bsd->bsd_svc != SPTLRPC_BULK_SVC_NULL &&
                     bsd->bsd_svc != SPTLRPC_BULK_SVC_INTG &&
                     bsd->bsd_svc != SPTLRPC_BULK_SVC_PRIV)) {
                CERROR("Invalid svc %u\n", bsd->bsd_svc);
                return -EPROTO;
        }

        return 0;
}
EXPORT_SYMBOL(bulk_sec_desc_unpack);

/*
 * Compute the checksum of an RPC buffer payload. If the return \a buflen
 * is not large enough, truncate the result to fit so that it is possible
 * to use a hash function with a large hash space, but only use a part of
 * the resulting hash.
 */
int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
                              void *buf, int buflen)
{
        struct ahash_request *req;
        int hashsize;
        unsigned int bufsize;
        int i, err;

        LASSERT(alg > BULK_HASH_ALG_NULL && alg < BULK_HASH_ALG_MAX);
        LASSERT(buflen >= 4);

        req = cfs_crypto_hash_init(cfs_hash_alg_id[alg], NULL, 0);
        if (IS_ERR(req)) {
                CERROR("Unable to initialize checksum hash %s\n",
                       cfs_crypto_hash_name(cfs_hash_alg_id[alg]));
                return PTR_ERR(req);
        }

        hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]);

        for (i = 0; i < desc->bd_iov_count; i++) {
                cfs_crypto_hash_update_page(req,
                                desc->bd_vec[i].bv_page,
                                desc->bd_vec[i].bv_offset &
                                        ~PAGE_MASK,
                                desc->bd_vec[i].bv_len);
        }

        if (hashsize > buflen) {
                unsigned char hashbuf[CFS_CRYPTO_HASH_DIGESTSIZE_MAX];

                bufsize = sizeof(hashbuf);
                LASSERTF(bufsize >= hashsize, "bufsize = %u < hashsize %u\n",
                         bufsize, hashsize);
                err = cfs_crypto_hash_final(req, hashbuf, &bufsize);
                memcpy(buf, hashbuf, buflen);
        } else {
                bufsize = buflen;
                err = cfs_crypto_hash_final(req, buf, &bufsize);
        }

        return err;
}