LU-16724 ptlrpc: extend sec bulk functionality
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  *
31  * lustre/ptlrpc/sec_bulk.c
32  *
33  * Author: Eric Mei <ericm@clusterfs.com>
34  */
35
36 #define DEBUG_SUBSYSTEM S_SEC
37
38 #include <libcfs/linux/linux-mem.h>
39
40 #include <obd.h>
41 #include <obd_cksum.h>
42 #include <obd_class.h>
43 #include <obd_support.h>
44 #include <lustre_net.h>
45 #include <lustre_import.h>
46 #include <lustre_dlm.h>
47 #include <lustre_sec.h>
48
49 #include "ptlrpc_internal.h"
50
51 #define PPOOL_MIN_CHUNK_BITS 16 /* 2^16 bytes = 64KiB */
52 #define PPOOL_MAX_CHUNK_BITS PTLRPC_MAX_BRW_BITS
53 #define POOLS_COUNT (PPOOL_MAX_CHUNK_BITS - PPOOL_MIN_CHUNK_BITS + 1)
54 #define PPOOL_SIZE_TO_INDEX(bits) ((bits) - PPOOL_MIN_CHUNK_BITS + 1)
55 #define POOL_BITS(pool) ((pool) + PPOOL_MIN_CHUNK_BITS - 1)
56 #define ELEMENT_SIZE(pool) (1 << (PPOOL_MIN_CHUNK_BITS + (pool) - 1))
57 #define mult (20 - PAGE_SHIFT)
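/*
 * annotation (not in the original source): a worked example of the sizing
 * macros above, assuming PTLRPC_MAX_BRW_BITS == 24 (16MiB, the common value):
 *
 *   POOLS_COUNT             = 24 - 16 + 1 = 9
 *   PPOOL_SIZE_TO_INDEX(16) = 1     (64KiB elements)
 *   PPOOL_SIZE_TO_INDEX(24) = 9     (16MiB elements)
 *   ELEMENT_SIZE(1)         = 1 << 16 = 64KiB
 *   POOL_BITS(1)            = 16    (the inverse of PPOOL_SIZE_TO_INDEX)
 *
 * index 0 is the plain PAGE_SIZE page pool; the sized element pools start
 * at index 1.  "mult" converts MiB to pages: with 4KiB pages, mult == 8,
 * so 1MiB == 1 << 8 == 256 pages.
 */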
58 static int enc_pool_max_memory_mb;
59 module_param(enc_pool_max_memory_mb, int, 0644);
60 MODULE_PARM_DESC(enc_pool_max_memory_mb,
61                  "Encoding pool max memory (MB), 1/8 of total physical memory by default");
62
63 /*
64  * bulk encryption page pools
65  */
66
67 #define PTRS_PER_PAGE   (PAGE_SIZE / sizeof(void *))
68 #define PAGES_PER_POOL  (PTRS_PER_PAGE)
69
70 #define IDLE_IDX_MAX            (100)
71 #define IDLE_IDX_WEIGHT         (3)
72
73 #define CACHE_QUIESCENT_PERIOD  (20)
74
75 static struct ptlrpc_enc_page_pool {
76         unsigned long epp_max_pages;   /* maximum pages the pool can hold, const */
77         unsigned int epp_max_pools;   /* number of pools, const */
78
79         /*
80          * wait queue for threads blocked when free pages run short.
81          */
82         wait_queue_head_t epp_waitq;   /* waiting threads */
83         unsigned int epp_waitqlen;    /* wait queue length */
84         unsigned long epp_pages_short; /* # of pages wanted by in-queue users */
85         unsigned int epp_growing:1;   /* during adding pages */
86
87         /*
88          * indicates how idle the pool is, from 0 to IDLE_IDX_MAX.
89          * It is updated each time pages are taken from the pool, not
90          * periodically, so after the system has been idle for a while
91          * the idle_idx may still be low if there has been no activity
92          * in the pool.
93          */
94         unsigned long epp_idle_idx;
95
96         /* last shrink time due to mem tight */
97         time64_t epp_last_shrink;
98         time64_t epp_last_access;
99
100         /* in-pool pages bookkeeping */
101         spinlock_t epp_lock; /* protect following fields */
102         unsigned long epp_total_pages; /* total pages in pools */
103         unsigned long epp_free_pages;  /* current pages available */
104
105         /* statistics */
106         unsigned long epp_st_max_pages;      /* # of pages ever reached */
107         unsigned int epp_st_grows;          /* # of grows */
108         unsigned int epp_st_grow_fails;     /* # of add pages failures */
109         unsigned int epp_st_shrinks;        /* # of shrinks */
110         unsigned long epp_st_access;         /* # of accesses */
111         unsigned long epp_st_missings;       /* # of cache misses */
112         unsigned long epp_st_lowfree;        /* lowest free pages reached */
113         unsigned int epp_st_max_wqlen;      /* highest waitqueue length */
114         ktime_t epp_st_max_wait; /* in nanoseconds */
115         unsigned long epp_st_outofmem; /* # of out of mem requests */
116         /*
117          * pointers to pools, may be vmalloc'd
118          */
119         void ***epp_pools;
120         /*
121          * memory shrinker
122          */
123         struct shrinker pool_shrinker;
124         struct mutex add_pages_mutex;
125 } **page_pools;
126
127 /*
128  * /sys/kernel/debug/lustre/sptlrpc/encrypt_page_pools
129  */
130 int encrypt_page_pools_seq_show(struct seq_file *m, void *v)
131 {
132         spin_lock(&page_pools[PAGES_POOL]->epp_lock);
133         seq_printf(m, "physical pages:          %lu\n"
134                 "pages per pool:          %lu\n"
135                 "max pages:               %lu\n"
136                 "max pools:               %u\n"
137                 "total pages:             %lu\n"
138                 "total free:              %lu\n"
139                 "idle index:              %lu/100\n"
140                 "last shrink:             %llds\n"
141                 "last access:             %llds\n"
142                 "max pages reached:       %lu\n"
143                 "grows:                   %u\n"
144                 "grows failure:           %u\n"
145                 "shrinks:                 %u\n"
146                 "cache access:            %lu\n"
147                 "cache missing:           %lu\n"
148                 "low free mark:           %lu\n"
149                 "max waitqueue depth:     %u\n"
150                 "max wait time ms:        %lld\n"
151                 "out of mem:              %lu\n",
152                 cfs_totalram_pages(), PAGES_PER_POOL,
153                 page_pools[PAGES_POOL]->epp_max_pages,
154                 page_pools[PAGES_POOL]->epp_max_pools,
155                 page_pools[PAGES_POOL]->epp_total_pages,
156                 page_pools[PAGES_POOL]->epp_free_pages,
157                 page_pools[PAGES_POOL]->epp_idle_idx,
158                 ktime_get_seconds() - page_pools[PAGES_POOL]->epp_last_shrink,
159                 ktime_get_seconds() - page_pools[PAGES_POOL]->epp_last_access,
160                 page_pools[PAGES_POOL]->epp_st_max_pages,
161                 page_pools[PAGES_POOL]->epp_st_grows,
162                 page_pools[PAGES_POOL]->epp_st_grow_fails,
163                 page_pools[PAGES_POOL]->epp_st_shrinks,
164                 page_pools[PAGES_POOL]->epp_st_access,
165                 page_pools[PAGES_POOL]->epp_st_missings,
166                 page_pools[PAGES_POOL]->epp_st_lowfree,
167                 page_pools[PAGES_POOL]->epp_st_max_wqlen,
168                 ktime_to_ms(page_pools[PAGES_POOL]->epp_st_max_wait),
169                 page_pools[PAGES_POOL]->epp_st_outofmem);
170         spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
171
172         return 0;
173 }
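/*
 * annotation (not in the original source): the statistics above can be
 * inspected at runtime, e.g.:
 *
 *   cat /sys/kernel/debug/lustre/sptlrpc/encrypt_page_pools
 */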
174
175 /*
176  * /sys/kernel/debug/lustre/sptlrpc/page_pools
177  */
178 int page_pools_seq_show(struct seq_file *m, void *v)
179 {
180         int pool_index;
181         struct ptlrpc_enc_page_pool *pool;
182
183         seq_printf(m, "physical_pages: %lu\n"
184                       "pages per pool: %lu\n\n"
185                       "pools:\n",
186                       cfs_totalram_pages(), PAGES_PER_POOL);
187
188         for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
189                 pool = page_pools[pool_index];
190                 if (!pool->epp_st_access)
191                         continue;
192                 spin_lock(&pool->epp_lock);
193                 seq_printf(m, "  pool_%luk:\n"
194                            "    max_pages: %lu\n"
195                            "    max_pools: %u\n"
196                            "    total_pages: %lu\n"
197                            "    total_free: %lu\n"
198                            "    idle_index: %lu/100\n"
199                            "    last_shrink: %llds\n"
200                            "    last_access: %llds\n"
201                            "    max_pages_reached: %lu\n"
202                            "    grows: %u\n"
203                            "    grows_failure: %u\n"
204                            "    shrinks: %u\n"
205                            "    cache_access: %lu\n"
206                            "    cache_missing: %lu\n"
207                            "    low_free_mark: %lu\n"
208                            "    max_waitqueue_depth: %u\n"
209                            "    max_wait_time_ms: %lld\n"
210                            "    out_of_mem: %lu\n",
211                            (pool_index ? ELEMENT_SIZE(pool_index - 10) :
212                            PAGE_SIZE >> 10),
213                            pool->epp_max_pages,
214                            pool->epp_max_pools,
215                            pool->epp_total_pages,
216                            pool->epp_free_pages,
217                            pool->epp_idle_idx,
218                            ktime_get_seconds() - pool->epp_last_shrink,
219                            ktime_get_seconds() - pool->epp_last_access,
220                            pool->epp_st_max_pages,
221                            pool->epp_st_grows,
222                            pool->epp_st_grow_fails,
223                            pool->epp_st_shrinks,
224                            pool->epp_st_access,
225                            pool->epp_st_missings,
226                            pool->epp_st_lowfree,
227                            pool->epp_st_max_wqlen,
228                            ktime_to_ms(pool->epp_st_max_wait),
229                            pool->epp_st_outofmem);
230
231                 spin_unlock(&pool->epp_lock);
232         }
233         return 0;
234 }
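/*
 * annotation (not in the original source): the "pool_%luk" label above is
 * the element size in KiB.  For pool_index >= 1, ELEMENT_SIZE(pool_index)
 * is 1 << (15 + pool_index), so ELEMENT_SIZE(pool_index - 10) equals
 * ELEMENT_SIZE(pool_index) / 1024, i.e. the same size expressed in KiB;
 * pool 0 prints PAGE_SIZE >> 10 instead.
 */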
235
236 static void enc_pools_release_free_pages(long npages, unsigned int pool_idx)
237 {
238         int p_idx, g_idx;
239         int p_idx_max1, p_idx_max2;
240         struct ptlrpc_enc_page_pool *pool = page_pools[pool_idx];
241
242         LASSERT(npages > 0);
243         LASSERT(npages <= pool->epp_free_pages);
244         LASSERT(pool->epp_free_pages <= pool->epp_total_pages);
245
246         /* max pool index before the release */
247         p_idx_max2 = (pool->epp_total_pages - 1) / PAGES_PER_POOL;
248
249         pool->epp_free_pages -= npages;
250         pool->epp_total_pages -= npages;
251
252         /* max pool index after the release */
253         p_idx_max1 = pool->epp_total_pages == 0 ? -1 :
254                 ((pool->epp_total_pages - 1) / PAGES_PER_POOL);
255
256         p_idx = pool->epp_free_pages / PAGES_PER_POOL;
257         g_idx = pool->epp_free_pages % PAGES_PER_POOL;
258         LASSERT(pool->epp_pools[p_idx]);
259
260         while (npages--) {
261                 LASSERT(pool->epp_pools[p_idx]);
262                 LASSERT(pool->epp_pools[p_idx][g_idx] != NULL);
263
264                 if (pool_idx == 0)
265                         __free_page(pool->epp_pools[p_idx][g_idx]);
266                 else
267                         OBD_FREE_LARGE(pool->epp_pools[p_idx][g_idx],
268                                        ELEMENT_SIZE(pool_idx));
269                 pool->epp_pools[p_idx][g_idx] = NULL;
270
271                 if (++g_idx == PAGES_PER_POOL) {
272                         p_idx++;
273                         g_idx = 0;
274                 }
275         }
276
277         /* free unused pools */
278         while (p_idx_max1 < p_idx_max2) {
279                 LASSERT(pool->epp_pools[p_idx_max2]);
280                 OBD_FREE(pool->epp_pools[p_idx_max2], PAGE_SIZE);
281                 pool->epp_pools[p_idx_max2] = NULL;
282                 p_idx_max2--;
283         }
284 }
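/*
 * annotation (not in the original source): pages are kept in a two-level
 * table, so logical page k lives at
 * epp_pools[k / PAGES_PER_POOL][k % PAGES_PER_POOL].  With 4KiB pages and
 * 8-byte pointers PAGES_PER_POOL == 512, so e.g. page 1000 is
 * epp_pools[1][488]; the loop above walks this mapping forward from the
 * new free-page mark, then drops second-level arrays that became empty.
 */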
285
286 #define SEEKS_TO_INDEX(s) (((s)->seeks >> 8) & 0xff)
287 #define INDEX_TO_SEEKS(i) (DEFAULT_SEEKS | (i << 8))
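/*
 * annotation (not in the original source): the pool index is packed into
 * the shrinker's ->seeks value so the count/scan callbacks, which only
 * receive a struct shrinker *, can tell which pool they serve:
 *
 *   INDEX_TO_SEEKS(3)  -> DEFAULT_SEEKS | (3 << 8)
 *   SEEKS_TO_INDEX(s)  -> 3 for that same shrinker
 *
 * this works because DEFAULT_SEEKS fits below bit 8 and POOLS_COUNT fits
 * in the 8 bits above it.
 */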
288 /*
289  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
290  */
291 static unsigned long enc_pools_shrink_count(struct shrinker *s,
292                                             struct shrink_control *sc)
293 {
294         unsigned int pool_index = SEEKS_TO_INDEX(s);
295         struct ptlrpc_enc_page_pool *pool = page_pools[pool_index];
296         /*
297          * if there has been no pool access for a long time, we consider
298          * the pool fully idle. A little race here is fine.
299          */
300         if (unlikely(ktime_get_seconds() - pool->epp_last_access >
301                      CACHE_QUIESCENT_PERIOD)) {
302                 spin_lock(&pool->epp_lock);
303                 pool->epp_idle_idx = IDLE_IDX_MAX;
304                 spin_unlock(&pool->epp_lock);
305         }
306
307         LASSERT(pool->epp_idle_idx <= IDLE_IDX_MAX);
308
309         return (pool->epp_free_pages <= PTLRPC_MAX_BRW_PAGES) ? 0 :
310                 (pool->epp_free_pages - PTLRPC_MAX_BRW_PAGES) *
311                 (IDLE_IDX_MAX - pool->epp_idle_idx) / IDLE_IDX_MAX;
312 }
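/*
 * annotation (not in the original source): the count above is the number
 * of free pages beyond the PTLRPC_MAX_BRW_PAGES reserve, scaled by the
 * idle index: e.g. 1000 excess free pages with epp_idle_idx == 75 report
 * 1000 * (100 - 75) / 100 == 250 objects to the kernel shrinker.
 */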
313
314 /*
315  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
316  */
317 static unsigned long enc_pools_shrink_scan(struct shrinker *s,
318                                            struct shrink_control *sc)
319 {
320         /* recover the pool index encoded in the shrinker's seeks value */
321         unsigned int pool_index = SEEKS_TO_INDEX(s);
322         struct ptlrpc_enc_page_pool *pool = page_pools[pool_index];
323
324         spin_lock(&pool->epp_lock);
325         if (pool->epp_free_pages <= PTLRPC_MAX_BRW_PAGES)
326                 sc->nr_to_scan = 0;
327         else
328                 sc->nr_to_scan = min_t(unsigned long, sc->nr_to_scan,
329                               pool->epp_free_pages - PTLRPC_MAX_BRW_PAGES);
330         if (sc->nr_to_scan > 0) {
331                 enc_pools_release_free_pages(sc->nr_to_scan, pool_index);
332                 CDEBUG(D_SEC, "released %ld pages, %ld left\n",
333                        (long)sc->nr_to_scan, pool->epp_free_pages);
334
335                 pool->epp_st_shrinks++;
336                 pool->epp_last_shrink = ktime_get_seconds();
337         }
338         spin_unlock(&pool->epp_lock);
339
340         /*
341          * if there has been no pool access for a long time, we consider
342          * the pool fully idle. A little race here is fine.
343          */
344         if (unlikely(ktime_get_seconds() - pool->epp_last_access >
345                      CACHE_QUIESCENT_PERIOD)) {
346                 spin_lock(&pool->epp_lock);
347                 pool->epp_idle_idx = IDLE_IDX_MAX;
348                 spin_unlock(&pool->epp_lock);
349         }
350
351         LASSERT(pool->epp_idle_idx <= IDLE_IDX_MAX);
352
353         return sc->nr_to_scan;
354 }
355
356 #ifndef HAVE_SHRINKER_COUNT
357 /*
358  * could be called frequently for query (@nr_to_scan == 0).
359  * we try to keep at least PTLRPC_MAX_BRW_PAGES pages in the pool.
360  */
361 static int enc_pools_shrink(struct shrinker *shrinker,
362                             struct shrink_control *sc)
363 {
364         enc_pools_shrink_scan(shrinker, sc);
365
366         return enc_pools_shrink_count(shrinker, sc);
367 }
368 #endif /* HAVE_SHRINKER_COUNT */
369
370 static inline
371 int npages_to_npools(unsigned long npages)
372 {
373         return (int) ((npages + PAGES_PER_POOL - 1) / PAGES_PER_POOL);
374 }
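/*
 * annotation (not in the original source): a ceiling division; with
 * PAGES_PER_POOL == 512, 512 pages need 1 pool and 513 pages need 2.
 */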
375
376 /*
377  * return how many pages cleaned up.
378  */
379 static unsigned long enc_pools_cleanup(void ***pools, int npools, int pool_idx)
380 {
381         unsigned long cleaned = 0;
382         int i, j;
383
384         for (i = 0; i < npools; i++) {
385                 if (pools[i]) {
386                         for (j = 0; j < PAGES_PER_POOL; j++) {
387                                 if (pools[i][j]) {
388                                         if (pool_idx == 0) {
389                                                 __free_page(pools[i][j]);
390                                         } else {
391                                                 OBD_FREE_LARGE(pools[i][j],
392                                                         ELEMENT_SIZE(pool_idx));
393                                         }
394                                         cleaned++;
395                                 }
396                         }
397                         OBD_FREE(pools[i], PAGE_SIZE);
398                         pools[i] = NULL;
399                 }
400         }
401
402         return cleaned;
403 }
404
405 /*
406  * merge @npools pointed by @pools which contains @npages new pages
407  * into current pools.
408  *
409  * we could avoid most of the memory copying with some tricks, but we choose
410  * the simplest way to keep complexity down, since this is not called often.
411  */
412 static void enc_pools_insert(void ***pools, int npools, int npages,
413                              unsigned int pool_idx)
414 {
415         int freeslot;
416         int op_idx, np_idx, og_idx, ng_idx;
417         int cur_npools, end_npools;
418         struct ptlrpc_enc_page_pool *page_pool = page_pools[pool_idx];
419
420         LASSERT(npages > 0);
421         LASSERT(page_pool->epp_total_pages+npages <= page_pool->epp_max_pages);
422         LASSERT(npages_to_npools(npages) == npools);
423         LASSERT(page_pool->epp_growing);
424
425         spin_lock(&page_pool->epp_lock);
426
427         /*
428          * (1) fill all the free slots of current pools.
429          */
430         /*
431          * free slots are those left by rented pages, plus the extra ones
432          * with index >= total_pages located at the tail of the last pool.
433          */
434         freeslot = page_pool->epp_total_pages % PAGES_PER_POOL;
435         if (freeslot != 0)
436                 freeslot = PAGES_PER_POOL - freeslot;
437         freeslot += page_pool->epp_total_pages - page_pool->epp_free_pages;
438
439         op_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
440         og_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
441         np_idx = npools - 1;
442         ng_idx = (npages - 1) % PAGES_PER_POOL;
443
444         while (freeslot) {
445                 LASSERT(page_pool->epp_pools[op_idx][og_idx] == NULL);
446                 LASSERT(pools[np_idx][ng_idx] != NULL);
447
448                 page_pool->epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
449                 pools[np_idx][ng_idx] = NULL;
450
451                 freeslot--;
452
453                 if (++og_idx == PAGES_PER_POOL) {
454                         op_idx++;
455                         og_idx = 0;
456                 }
457                 if (--ng_idx < 0) {
458                         if (np_idx == 0)
459                                 break;
460                         np_idx--;
461                         ng_idx = PAGES_PER_POOL - 1;
462                 }
463         }
464
465         /*
466          * (2) add pools if needed.
467          */
468         cur_npools = (page_pool->epp_total_pages + PAGES_PER_POOL - 1) /
469                       PAGES_PER_POOL;
470         end_npools = (page_pool->epp_total_pages + npages +
471                       PAGES_PER_POOL - 1) / PAGES_PER_POOL;
472         LASSERT(end_npools <= page_pool->epp_max_pools);
473
474         np_idx = 0;
475         while (cur_npools < end_npools) {
476                 LASSERT(page_pool->epp_pools[cur_npools] == NULL);
477                 LASSERT(np_idx < npools);
478                 LASSERT(pools[np_idx] != NULL);
479
480                 page_pool->epp_pools[cur_npools++] = pools[np_idx];
481                 pools[np_idx++] = NULL;
482         }
483
484         /*
485          * (3) free useless source pools
486          */
487         while (np_idx < npools) {
488                 LASSERT(pools[np_idx] != NULL);
489                 CDEBUG(D_SEC, "Free useless pool buffer: %i, %p\n", np_idx,
490                        pools[np_idx]);
491                 OBD_FREE(pools[np_idx], PAGE_SIZE);
492                 pools[np_idx++] = NULL;
493         }
494
495         page_pool->epp_total_pages += npages;
496         page_pool->epp_free_pages += npages;
497         page_pool->epp_st_lowfree = page_pool->epp_free_pages;
498
499         if (page_pool->epp_total_pages > page_pool->epp_st_max_pages)
500                 page_pool->epp_st_max_pages = page_pool->epp_total_pages;
501
502         CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
503                page_pool->epp_total_pages);
504
505         spin_unlock(&page_pool->epp_lock);
506 }
507
508 #define POOL_INIT_SIZE (PTLRPC_MAX_BRW_SIZE / 4)
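/*
 * annotation (assumption, not in the original source): with the common
 * 16MiB PTLRPC_MAX_BRW_SIZE, POOL_INIT_SIZE is 4MiB, so the minimum grow
 * step below is 1024 pages for the 4KiB page pool (pool_index 0) or, say,
 * 64 elements for the 64KiB pool (pool_index 1).
 */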
509 static int enc_pools_add_pages(int npages, int pool_index)
510 {
511         void ***pools;
512         int npools, alloced = 0;
513         int i, j, rc = -ENOMEM;
514         struct ptlrpc_enc_page_pool *page_pool = page_pools[pool_index];
515
516         if (pool_index == 0) {
517                 if (npages < POOL_INIT_SIZE >> PAGE_SHIFT)
518                         npages = POOL_INIT_SIZE >> PAGE_SHIFT;
519         } else {
520                 if (npages < POOL_INIT_SIZE / ELEMENT_SIZE(pool_index))
521                         npages = POOL_INIT_SIZE / ELEMENT_SIZE(pool_index);
522         }
523
524         mutex_lock(&page_pool->add_pages_mutex);
525
526         if (npages + page_pool->epp_total_pages > page_pool->epp_max_pages)
527                 npages = page_pool->epp_max_pages - page_pool->epp_total_pages;
528         LASSERT(npages > 0);
529
530         page_pool->epp_st_grows++;
531
532         npools = npages_to_npools(npages);
533         OBD_ALLOC_PTR_ARRAY(pools, npools);
534         if (pools == NULL)
535                 goto out;
536
537         for (i = 0; i < npools; i++) {
538                 OBD_ALLOC(pools[i], PAGE_SIZE);
539                 if (pools[i] == NULL)
540                         goto out_pools;
541
542                 for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
543                         if (pool_index == 0)
544                                 pools[i][j] = alloc_page(GFP_NOFS |
545                                         __GFP_HIGHMEM);
546                         else {
547                                 OBD_ALLOC_LARGE(pools[i][j],
548                                         ELEMENT_SIZE(pool_index));
549                         }
550                         if (pools[i][j] == NULL)
551                                 goto out_pools;
552
553                         alloced++;
554                 }
555         }
556         LASSERT(alloced == npages);
557
558         enc_pools_insert(pools, npools, npages, pool_index);
559         CDEBUG(D_SEC, "added %d pages into pools\n", npages);
560         OBD_FREE_PTR_ARRAY(pools, npools);
561         rc = 0;
562
563 out_pools:
564         if (rc) {
565                 enc_pools_cleanup(pools, npools, pool_index);
566         }
567 out:
568         if (rc) {
569                 page_pool->epp_st_grow_fails++;
570                 CERROR("Failed to allocate %d enc pages\n", npages);
571         }
572
573         mutex_unlock(&page_pool->add_pages_mutex);
574         return rc;
575 }
576
577 static inline void enc_pools_wakeup(unsigned int pool)
578 {
579         assert_spin_locked(&page_pools[pool]->epp_lock);
580
581         /* lockless check whether anyone is actually waiting */
582         if (unlikely(waitqueue_active(&page_pools[pool]->epp_waitq)))
583                 wake_up_all(&page_pools[pool]->epp_waitq);
584 }
585
586 static int enc_pools_should_grow(int page_needed, time64_t now,
587                                  unsigned int pool_index)
588 {
589         /*
590          * don't grow if someone else is growing the pools right now,
591          * or the pool has reached its full capacity
592          */
593         if (page_pools[pool_index]->epp_growing ||
594             page_pools[pool_index]->epp_total_pages ==
595             page_pools[pool_index]->epp_max_pages)
596                 return 0;
597
598         /* if the total number of pages is not enough, we need to grow */
599         if (page_pools[pool_index]->epp_total_pages < page_needed)
600                 return 1;
601         /*
602          * we wanted to return 0 here if a shrink happened just a moment
603          * ago, but this may cause a deadlock if both client and OST
604          * live on a single node.
605          */
606
607         /*
608          * we should perhaps also consider other factors here, such as
609          * wait queue length, idle index, etc.
610          */
611
612         /* grow the pool in all other cases */
613         return 1;
614 }
615
616 /*
617  * Export the number of free pages in the pool
618  */
619 int sptlrpc_enc_pool_get_free_pages(unsigned int pool)
620 {
621         return page_pools[pool]->epp_free_pages;
622 }
623 EXPORT_SYMBOL(sptlrpc_enc_pool_get_free_pages);
624
625 /*
626  * Let the outside world know whether the enc_pool is at full capacity
627  */
628 int __pool_is_at_full_capacity(unsigned int pool)
629 {
630         return (page_pools[pool]->epp_total_pages ==
631                 page_pools[pool]->epp_max_pages);
632 }
633
634 /*
635  * Let the outside world know whether the enc_pool is at full capacity
636  */
637 int pool_is_at_full_capacity(void)
638 {
639         return __pool_is_at_full_capacity(PAGES_POOL);
640 }
641 EXPORT_SYMBOL(pool_is_at_full_capacity);
642
643 static inline void **page_from_bulkdesc(void *array, int index)
644 {
645         struct ptlrpc_bulk_desc *desc = (struct ptlrpc_bulk_desc *)array;
646
647         return (void **)&desc->bd_enc_vec[index].bv_page;
648 }
649
650 static inline void **page_from_pagearray(void *array, int index)
651 {
652         struct page **pa = (struct page **)array;
653
654         return (void **)&pa[index];
655 }
656
657 static inline void **page_from_bufarray(void *array, int index)
658 {
659         return (void **)array;
660 }
661
662 /*
663  * we allocate the requested pages atomically.
664  */
665 static inline int __sptlrpc_enc_pool_get_pages(void *array, unsigned int count,
666                                         unsigned int pool,
667                                         void **(*page_from)(void *, int))
668 {
669         struct ptlrpc_enc_page_pool *page_pool = page_pools[pool];
670         wait_queue_entry_t waitlink;
671         unsigned long this_idle = -1;
672         u64 tick_ns = 0;
673         time64_t now;
674         int p_idx, g_idx;
675         int i, rc = 0;
676
677         if (pool)
678                 count = 1;
679
680         if (!array || count <= 0 || count > page_pool->epp_max_pages)
681                 return -EINVAL;
682
683         spin_lock(&page_pool->epp_lock);
684
685         page_pool->epp_st_access++;
686 again:
687         if (unlikely(page_pool->epp_free_pages < count)) {
688                 if (tick_ns == 0)
689                         tick_ns = ktime_get_ns();
690
691                 now = ktime_get_real_seconds();
692
693                 page_pool->epp_st_missings++;
694                 page_pool->epp_pages_short += count;
695
696                 if (enc_pools_should_grow(count, now, pool)) {
697                         page_pool->epp_growing = 1;
698
699                         spin_unlock(&page_pool->epp_lock);
700                         CDEBUG(D_SEC, "epp_pages_short: %lu\n", page_pool->epp_pages_short);
701                         enc_pools_add_pages(8, pool);
702                         spin_lock(&page_pool->epp_lock);
703
704                         page_pool->epp_growing = 0;
705
706                         enc_pools_wakeup(pool);
707                 } else {
708                         if (page_pool->epp_growing) {
709                                 if (++page_pool->epp_waitqlen >
710                                     page_pool->epp_st_max_wqlen)
711                                         page_pool->epp_st_max_wqlen =
712                                                 page_pool->epp_waitqlen;
713
714                                 set_current_state(TASK_UNINTERRUPTIBLE);
715                                 init_wait(&waitlink);
716                                 add_wait_queue(&page_pool->epp_waitq,
717                                                &waitlink);
718
719                                 spin_unlock(&page_pool->epp_lock);
720                                 schedule();
721                                 remove_wait_queue(&page_pool->epp_waitq,
722                                                   &waitlink);
723                                 spin_lock(&page_pool->epp_lock);
724                                 page_pool->epp_waitqlen--;
725                         } else {
726                                 /*
727                                  * ptlrpcd thread should not sleep in that case,
728                                  * or deadlock may occur!
729                                  * Instead, return -ENOMEM so that upper layers
730                                  * will put request back in queue.
731                                  */
732                                 page_pool->epp_st_outofmem++;
733                                 GOTO(out_unlock, rc = -ENOMEM);
734                         }
735                 }
736
737                 if (page_pool->epp_pages_short < count)
738                         GOTO(out_unlock, rc = -EPROTO);
739                 page_pool->epp_pages_short -= count;
740
741                 this_idle = 0;
742                 goto again;
743         }
744
745         /* record max wait time */
746         if (unlikely(tick_ns)) {
747                 ktime_t tick = ktime_sub_ns(ktime_get(), tick_ns);
748
749                 if (ktime_after(tick, page_pool->epp_st_max_wait))
750                         page_pool->epp_st_max_wait = tick;
751         }
752
753         /* proceed with rest of allocation */
754         page_pool->epp_free_pages -= count;
755
756         p_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
757         g_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
758
759         for (i = 0; i < count; i++) {
760                 void **pagep = page_from(array, i);
761
762                 if (page_pool->epp_pools[p_idx][g_idx] == NULL)
763                         GOTO(out_unlock, rc = -EPROTO);
764                 *pagep = page_pool->epp_pools[p_idx][g_idx];
765                 page_pool->epp_pools[p_idx][g_idx] = NULL;
766
767                 if (++g_idx == PAGES_PER_POOL) {
768                         p_idx++;
769                         g_idx = 0;
770                 }
771         }
772
773         if (page_pool->epp_free_pages < page_pool->epp_st_lowfree)
774                 page_pool->epp_st_lowfree =
775                         page_pool->epp_free_pages;
776
777         /*
778          * new idle index = (old * weight + new) / (weight + 1)
779          */
780         if (this_idle == -1) {
781                 this_idle = page_pool->epp_free_pages * IDLE_IDX_MAX /
782                         page_pool->epp_total_pages;
783         }
784         page_pool->epp_idle_idx = (page_pool->epp_idle_idx *
785                         IDLE_IDX_WEIGHT + this_idle) /
786                         (IDLE_IDX_WEIGHT + 1);
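        /*
         * annotation (not in the original source): with IDLE_IDX_WEIGHT == 3
         * this is an exponential moving average; e.g. an old index of 40 and
         * an instantaneous idleness of 100 give (40 * 3 + 100) / 4 == 55.
         */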
787
788         page_pool->epp_last_access = ktime_get_seconds();
789
790 out_unlock:
791         spin_unlock(&page_pool->epp_lock);
792         return rc;
793 }
794
795 int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
796 {
797         int rc;
798
799         LASSERT(desc->bd_iov_count > 0);
800         LASSERT(desc->bd_iov_count <= page_pools[PAGES_POOL]->epp_max_pages);
801
802         /* resent bulk, enc iov might have been allocated previously */
803         if (desc->bd_enc_vec != NULL)
804                 return 0;
805
806         OBD_ALLOC_LARGE(desc->bd_enc_vec,
807                         desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
808         if (desc->bd_enc_vec == NULL)
809                 return -ENOMEM;
810
811         rc = __sptlrpc_enc_pool_get_pages((void *)desc, desc->bd_iov_count,
812                                           PAGES_POOL, page_from_bulkdesc);
813         if (rc) {
814                 OBD_FREE_LARGE(desc->bd_enc_vec,
815                                desc->bd_iov_count *
816                                sizeof(*desc->bd_enc_vec));
817                 desc->bd_enc_vec = NULL;
818         }
819         return rc;
820 }
821 EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
822
823 int sptlrpc_enc_pool_get_pages_array(struct page **pa, unsigned int count)
824 {
825         return __sptlrpc_enc_pool_get_pages((void *)pa, count, PAGES_POOL,
826                                             page_from_pagearray);
827 }
828 EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages_array);
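/*
 * annotation (a minimal usage sketch, not in the original source; error
 * handling and the consumer of the pages are elided):
 *
 *   struct page *pages[4];
 *
 *   if (sptlrpc_enc_pool_get_pages_array(pages, 4) == 0) {
 *           // ... use the pages as bounce pages for bulk encryption ...
 *           sptlrpc_enc_pool_put_pages_array(pages, 4);
 *   }
 */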
829
830 int sptlrpc_enc_pool_get_buf(void **buf, unsigned int size_bits)
831 {
832         return __sptlrpc_enc_pool_get_pages((void *)buf, 0,
833                                             PPOOL_SIZE_TO_INDEX(size_bits),
834                                             page_from_bufarray);
835 }
836 EXPORT_SYMBOL(sptlrpc_enc_pool_get_buf);
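/*
 * annotation (a minimal usage sketch, not in the original source): the
 * sized pools hand out one element per call, selected by its power-of-two
 * size in bits; the same size_bits must be passed back on release:
 *
 *   void *buf = NULL;
 *
 *   if (sptlrpc_enc_pool_get_buf(&buf, 16) == 0) {  // one 64KiB element
 *           // ... use buf ...
 *           sptlrpc_enc_pool_put_buf(buf, 16);
 *   }
 */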
837
838 static int __sptlrpc_enc_pool_put_pages(void *array, unsigned int count,
839                                         unsigned int pool,
840                                         void **(*page_from)(void *, int))
841 {
842         int p_idx, g_idx;
843         int i, rc = 0;
844         struct ptlrpc_enc_page_pool *page_pool;
845
846         LASSERTF(pool < POOLS_COUNT, "count %u, pool %u\n", count, pool);
847         if (!array || pool >= POOLS_COUNT) {
848                 CERROR("Faled to put %u pages, from pull %u\n", count, pool);
849                 return -EINVAL;
850         }
851
852         page_pool = page_pools[pool];
853         LASSERTF(page_pool != NULL, "count %u, pool %u\n", count, pool);
854
855         spin_lock(&page_pool->epp_lock);
856
857         p_idx = page_pool->epp_free_pages / PAGES_PER_POOL;
858         g_idx = page_pool->epp_free_pages % PAGES_PER_POOL;
859
860         if (page_pool->epp_free_pages + count > page_pool->epp_total_pages)
861                 GOTO(out_unlock, rc = -EPROTO);
862         if (!page_pool->epp_pools[p_idx])
863                 GOTO(out_unlock, rc = -EPROTO);
864
865         for (i = 0; i < count; i++) {
866                 void **pagep = page_from(array, i);
867
868                 if (!*pagep ||
869                     page_pool->epp_pools[p_idx][g_idx] != NULL)
870                         GOTO(out_unlock, rc = -EPROTO);
871
872                 page_pool->epp_pools[p_idx][g_idx] = *pagep;
873                 if (++g_idx == PAGES_PER_POOL) {
874                         p_idx++;
875                         g_idx = 0;
876                 }
877         }
878
879         page_pool->epp_free_pages += count;
880         enc_pools_wakeup(pool);
881
882 out_unlock:
883         spin_unlock(&page_pool->epp_lock);
884         return rc;
885 }
886
887 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
888 {
889         int rc;
890
891         if (desc->bd_enc_vec == NULL)
892                 return;
893
894         rc = __sptlrpc_enc_pool_put_pages((void *)desc, desc->bd_iov_count,
895                                           PAGES_POOL, page_from_bulkdesc);
896         if (rc)
897                 CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
898
899         OBD_FREE_LARGE(desc->bd_enc_vec,
900                        desc->bd_iov_count * sizeof(*desc->bd_enc_vec));
901         desc->bd_enc_vec = NULL;
902 }
903
904 void sptlrpc_enc_pool_put_pages_array(struct page **pa, unsigned int count)
905 {
906         int rc;
907
908         rc = __sptlrpc_enc_pool_put_pages((void *)pa, count, PAGES_POOL,
909                                           page_from_pagearray);
910
911         if (rc)
912                 CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
913 }
914 EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages_array);
915
916 void sptlrpc_enc_pool_put_buf(void *buf, unsigned int size_bits)
917 {
918         int rc;
919
920         rc = __sptlrpc_enc_pool_put_pages(buf, 1,
921                                           PPOOL_SIZE_TO_INDEX(size_bits),
922                                           page_from_bufarray);
923         if (rc)
924                 CDEBUG(D_SEC, "error putting pages in enc pool: %d\n", rc);
925 }
926 EXPORT_SYMBOL(sptlrpc_enc_pool_put_buf);
927
928
929 /*
930  * we don't do much for add_user/del_user anymore, except adding some
931  * initial pages in add_user() if the current pools are empty; the rest
932  * is handled by the pools' self-adaptation.
933  */
934 int sptlrpc_enc_pool_add_user(void)
935 {
936         int need_grow = 0;
937
938         spin_lock(&page_pools[PAGES_POOL]->epp_lock);
939         if (page_pools[PAGES_POOL]->epp_growing == 0 &&
940                 page_pools[PAGES_POOL]->epp_total_pages == 0) {
941                 page_pools[PAGES_POOL]->epp_growing = 1;
942                 need_grow = 1;
943         }
944         spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
945
946
947         if (need_grow) {
948                 enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES +
949                                     PTLRPC_MAX_BRW_PAGES, 0);
950
951                 spin_lock(&page_pools[PAGES_POOL]->epp_lock);
952                 page_pools[PAGES_POOL]->epp_growing = 0;
953                 enc_pools_wakeup(PAGES_POOL);
954                 spin_unlock(&page_pools[PAGES_POOL]->epp_lock);
955         }
956         return 0;
957 }
958 EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);
959
960 static inline void enc_pools_alloc(struct ptlrpc_enc_page_pool *pool)
961 {
962         LASSERT(pool->epp_max_pools);
963         OBD_ALLOC_LARGE(pool->epp_pools,
964                         pool->epp_max_pools *
965                         sizeof(*pool->epp_pools));
966 }
967
968 static inline void enc_pools_free(unsigned int i)
969 {
970         LASSERT(page_pools[i]->epp_max_pools);
971         LASSERT(page_pools[i]->epp_pools);
972
973         OBD_FREE_LARGE(page_pools[i]->epp_pools,
974                        page_pools[i]->epp_max_pools *
975                        sizeof(*page_pools[i]->epp_pools));
976 }
977
978 int sptlrpc_enc_pool_init(void)
979 {
980         int pool_index = 0, to_revert;
981         int rc = 0;
982         struct ptlrpc_enc_page_pool *pool;
983
984         ENTRY;
985         OBD_ALLOC(page_pools, POOLS_COUNT * sizeof(*page_pools));
986         if (page_pools == NULL)
987                 RETURN(-ENOMEM);
988         for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
989                 OBD_ALLOC(page_pools[pool_index], sizeof(**page_pools));
990                 if (page_pools[pool_index] == NULL)
991                         GOTO(fail, rc = -ENOMEM);
992
993                 pool = page_pools[pool_index];
994                 pool->epp_max_pages =
995                         cfs_totalram_pages() / POOLS_COUNT;
996                 if (enc_pool_max_memory_mb > 0 &&
997                     enc_pool_max_memory_mb <= (cfs_totalram_pages() >> mult))
998                         pool->epp_max_pages =
999                                 enc_pool_max_memory_mb << mult;
1000
1001                 pool->epp_max_pools =
1002                         npages_to_npools(pool->epp_max_pages);
1003
1004                 init_waitqueue_head(&pool->epp_waitq);
1005                 pool->epp_last_shrink = ktime_get_seconds();
1006                 pool->epp_last_access = ktime_get_seconds();
1007
1008                 spin_lock_init(&pool->epp_lock);
1009                 pool->epp_st_max_wait = ktime_set(0, 0);
1010
1011                 enc_pools_alloc(pool);
1012                 CDEBUG(D_SEC, "Allocated pool %i\n", pool_index);
1013                 if (pool->epp_pools == NULL)
1014                         GOTO(fail, rc = -ENOMEM);
1015                 /* encode the pool index in the shrinker's seeks value */
1016 #ifdef HAVE_SHRINKER_COUNT
1017                 pool->pool_shrinker.count_objects = enc_pools_shrink_count;
1018                 pool->pool_shrinker.scan_objects = enc_pools_shrink_scan;
1019 #else
1020                 pool->pool_shrinker.shrink = enc_pools_shrink;
1021 #endif
1022                 pool->pool_shrinker.seeks = INDEX_TO_SEEKS(pool_index);
1023
1024                 rc = register_shrinker(&pool->pool_shrinker);
1025                 if (rc)
1026                         GOTO(fail, rc);
1027
1028                 mutex_init(&pool->add_pages_mutex);
1029         }
1030
1031         RETURN(0);
1032 fail:
1033         to_revert = pool_index;
1034         for (pool_index = 0; pool_index <= to_revert; pool_index++) {
1035                 pool = page_pools[pool_index];
1036                 if (pool) {
1037                         if (pool->epp_pools)
1038                                 enc_pools_free(pool_index);
1039                         OBD_FREE(pool, sizeof(**page_pools));
1040                 }
1041         }
1042         OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));
1043
1044         RETURN(rc);
1045 }
1046
1047 void sptlrpc_enc_pool_fini(void)
1048 {
1049         unsigned long cleaned, npools;
1050         int pool_index;
1051         struct ptlrpc_enc_page_pool *pool;
1052
1053         for (pool_index = 0; pool_index < POOLS_COUNT; pool_index++) {
1054                 pool = page_pools[pool_index];
1055                 unregister_shrinker(&pool->pool_shrinker);
1056                 LASSERT(pool->epp_pools);
1057                 LASSERT(pool->epp_total_pages == pool->epp_free_pages);
1058
1059                 npools = npages_to_npools(pool->epp_total_pages);
1060                 cleaned = enc_pools_cleanup(pool->epp_pools,
1061                                             npools, pool_index);
1062                 LASSERT(cleaned == pool->epp_total_pages);
1063
1064                 enc_pools_free(pool_index);
1065
1066                 if (pool->epp_st_access > 0) {
1067                         CDEBUG(D_SEC,
1068                                "max pages %lu, grows %u, grow fails %u, shrinks %u, access %lu, missing %lu, max qlen %u, max wait ms %lld, out of mem %lu\n",
1069                                pool->epp_st_max_pages,
1070                                pool->epp_st_grows,
1071                                pool->epp_st_grow_fails,
1072                                pool->epp_st_shrinks,
1073                                pool->epp_st_access,
1074                                pool->epp_st_missings,
1075                                pool->epp_st_max_wqlen,
1076                                ktime_to_ms(pool->epp_st_max_wait),
1077                                pool->epp_st_outofmem);
1078                 }
1079
1080                 OBD_FREE(pool, sizeof(**page_pools));
1081         }
1082
1083         OBD_FREE(page_pools, POOLS_COUNT * sizeof(*page_pools));
1084 }
1085
1086 static int cfs_hash_alg_id[] = {
1087         [BULK_HASH_ALG_NULL]    = CFS_HASH_ALG_NULL,
1088         [BULK_HASH_ALG_ADLER32] = CFS_HASH_ALG_ADLER32,
1089         [BULK_HASH_ALG_CRC32]   = CFS_HASH_ALG_CRC32,
1090         [BULK_HASH_ALG_MD5]     = CFS_HASH_ALG_MD5,
1091         [BULK_HASH_ALG_SHA1]    = CFS_HASH_ALG_SHA1,
1092         [BULK_HASH_ALG_SHA256]  = CFS_HASH_ALG_SHA256,
1093         [BULK_HASH_ALG_SHA384]  = CFS_HASH_ALG_SHA384,
1094         [BULK_HASH_ALG_SHA512]  = CFS_HASH_ALG_SHA512,
1095 };
1096 const char *sptlrpc_get_hash_name(__u8 hash_alg)
1097 {
1098         return cfs_crypto_hash_name(cfs_hash_alg_id[hash_alg]);
1099 }
1100
1101 __u8 sptlrpc_get_hash_alg(const char *algname)
1102 {
1103         return cfs_crypto_hash_alg(algname);
1104 }
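/*
 * annotation (not in the original source): these wrappers just translate
 * sptlrpc bulk hash identifiers to libcfs crypto ones, so e.g.
 * sptlrpc_get_hash_name(BULK_HASH_ALG_SHA256) should return the name
 * cfs_crypto_hash_name() uses for CFS_HASH_ALG_SHA256 ("sha256").
 */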
1105
1106 int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset, int swabbed)
1107 {
1108         struct ptlrpc_bulk_sec_desc *bsd;
1109         int size = msg->lm_buflens[offset];
1110
1111         bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
1112         if (bsd == NULL) {
1113                 CERROR("Invalid bulk sec desc: size %d\n", size);
1114                 return -EINVAL;
1115         }
1116
1117         if (swabbed)
1118                 __swab32s(&bsd->bsd_nob);
1119
1120         if (unlikely(bsd->bsd_version != 0)) {
1121                 CERROR("Unexpected version %u\n", bsd->bsd_version);
1122                 return -EPROTO;
1123         }
1124
1125         if (unlikely(bsd->bsd_type >= SPTLRPC_BULK_MAX)) {
1126                 CERROR("Invalid type %u\n", bsd->bsd_type);
1127                 return -EPROTO;
1128         }
1129
1130         /* FIXME more sanity check here */
1131
1132         if (unlikely(bsd->bsd_svc != SPTLRPC_BULK_SVC_NULL &&
1133                      bsd->bsd_svc != SPTLRPC_BULK_SVC_INTG &&
1134                      bsd->bsd_svc != SPTLRPC_BULK_SVC_PRIV)) {
1135                 CERROR("Invalid svc %u\n", bsd->bsd_svc);
1136                 return -EPROTO;
1137         }
1138
1139         return 0;
1140 }
1141 EXPORT_SYMBOL(bulk_sec_desc_unpack);
1142
1143 /*
1144  * Compute the checksum of an RPC buffer payload.  If the return buffer
1145  * \a buflen is not large enough, truncate the result to fit.  This makes
1146  * it possible to use a hash function with a large digest space while
1147  * consuming only part of the resulting hash.
1148  */
1149 int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
1150                               void *buf, int buflen)
1151 {
1152         struct ahash_request *req;
1153         int hashsize;
1154         unsigned int bufsize;
1155         int i, err;
1156
1157         LASSERT(alg > BULK_HASH_ALG_NULL && alg < BULK_HASH_ALG_MAX);
1158         LASSERT(buflen >= 4);
1159
1160         req = cfs_crypto_hash_init(cfs_hash_alg_id[alg], NULL, 0);
1161         if (IS_ERR(req)) {
1162                 CERROR("Unable to initialize checksum hash %s\n",
1163                        cfs_crypto_hash_name(cfs_hash_alg_id[alg]));
1164                 return PTR_ERR(req);
1165         }
1166
1167         hashsize = cfs_crypto_hash_digestsize(cfs_hash_alg_id[alg]);
1168
1169         for (i = 0; i < desc->bd_iov_count; i++) {
1170                 cfs_crypto_hash_update_page(req,
1171                                   desc->bd_vec[i].bv_page,
1172                                   desc->bd_vec[i].bv_offset &
1173                                               ~PAGE_MASK,
1174                                   desc->bd_vec[i].bv_len);
1175         }
1176
1177         if (hashsize > buflen) {
1178                 unsigned char hashbuf[CFS_CRYPTO_HASH_DIGESTSIZE_MAX];
1179
1180                 bufsize = sizeof(hashbuf);
1181                 LASSERTF(bufsize >= hashsize, "bufsize = %u < hashsize %u\n",
1182                          bufsize, hashsize);
1183                 err = cfs_crypto_hash_final(req, hashbuf, &bufsize);
1184                 memcpy(buf, hashbuf, buflen);
1185         } else {
1186                 bufsize = buflen;
1187                 err = cfs_crypto_hash_final(req, buf, &bufsize);
1188         }
1189
1190         return err;
1191 }
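/*
 * annotation (not in the original source): an example of the truncation
 * described above: requesting a SHA-512 checksum (64-byte digest) with
 * buflen == 16 computes the full digest into a stack buffer and copies
 * only the first 16 bytes into @buf.
 */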