Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / sec_bulk.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2006 Cluster File Systems, Inc.
5  *   Author: Eric Mei <ericm@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #ifndef EXPORT_SYMTAB
24 #define EXPORT_SYMTAB
25 #endif
26 #define DEBUG_SUBSYSTEM S_SEC
27
28 #include <libcfs/libcfs.h>
29 #ifndef __KERNEL__
30 #include <liblustre.h>
31 #include <libcfs/list.h>
32 #else
33 #include <linux/crypto.h>
34 #endif
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_import.h>
41 #include <lustre_dlm.h>
42 #include <lustre_sec.h>
43
44 #include "ptlrpc_internal.h"
45
46 /****************************************
47  * bulk encryption page pools           *
48  ****************************************/
49
50 #ifdef __KERNEL__
51
52 #define PTRS_PER_PAGE   (CFS_PAGE_SIZE / sizeof(void *))
53 #define PAGES_PER_POOL  (PTRS_PER_PAGE)
54
55 static struct ptlrpc_enc_page_pool {
56         /*
57          * constants
58          */
59         unsigned long    epp_max_pages;   /* maximum pages can hold, const */
60         unsigned int     epp_max_pools;   /* number of pools, const */
61         /*
62          * users of the pools. the capacity grow as more user added,
63          * but doesn't shrink when users gone -- just current policy.
64          * during failover there might be user add/remove activities.
65          */
66         atomic_t         epp_users;       /* shared by how many users (osc) */
67         atomic_t         epp_users_gone;  /* users removed */
68         /*
69          * wait queue in case of not enough free pages.
70          */
71         cfs_waitq_t      epp_waitq;       /* waiting threads */
72         unsigned int     epp_waitqlen;    /* wait queue length */
73         unsigned long    epp_pages_short; /* # of pages wanted of in-q users */
74         unsigned long    epp_adding:1,    /* during adding pages */
75                          epp_full:1;      /* pools are all full */
76         /*
77          * in-pool pages bookkeeping
78          */
79         spinlock_t       epp_lock;        /* protect following fields */
80         unsigned long    epp_total_pages; /* total pages in pools */
81         unsigned long    epp_free_pages;  /* current pages available */
82         /*
83          * statistics
84          */
85         unsigned int     epp_st_adds;
86         unsigned int     epp_st_failadds; /* # of add pages failures */
87         unsigned long    epp_st_reqs;     /* # of get_pages requests */
88         unsigned long    epp_st_missings; /* # of cache missing */
89         unsigned long    epp_st_lowfree;  /* lowest free pages ever reached */
90         unsigned long    epp_st_max_wqlen;/* highest waitqueue length ever */
91         cfs_time_t       epp_st_max_wait; /* in jeffies */
92         /*
93          * pointers to pools
94          */
95         cfs_page_t    ***epp_pools;
96 } page_pools;
97
98 int sptlrpc_proc_read_enc_pool(char *page, char **start, off_t off, int count,
99                                int *eof, void *data)
100 {
101         int     rc;
102
103         spin_lock(&page_pools.epp_lock);
104
105         rc = snprintf(page, count,
106                       "physical pages:          %lu\n"
107                       "pages per pool:          %lu\n"
108                       "max pages:               %lu\n"
109                       "max pools:               %u\n"
110                       "users:                   %d - %d\n"
111                       "current waitqueue len:   %u\n"
112                       "current pages in short:  %lu\n"
113                       "total pages:             %lu\n"
114                       "total free:              %lu\n"
115                       "add page times:          %u\n"
116                       "add page failed times:   %u\n"
117                       "total requests:          %lu\n"
118                       "cache missing:           %lu\n"
119                       "lowest free pages:       %lu\n"
120                       "max waitqueue depth:     %lu\n"
121                       "max wait time:           "CFS_TIME_T"\n"
122                       ,
123                       num_physpages,
124                       PAGES_PER_POOL,
125                       page_pools.epp_max_pages,
126                       page_pools.epp_max_pools,
127                       atomic_read(&page_pools.epp_users),
128                       atomic_read(&page_pools.epp_users_gone),
129                       page_pools.epp_waitqlen,
130                       page_pools.epp_pages_short,
131                       page_pools.epp_total_pages,
132                       page_pools.epp_free_pages,
133                       page_pools.epp_st_adds,
134                       page_pools.epp_st_failadds,
135                       page_pools.epp_st_reqs,
136                       page_pools.epp_st_missings,
137                       page_pools.epp_st_lowfree,
138                       page_pools.epp_st_max_wqlen,
139                       page_pools.epp_st_max_wait
140                      );
141
142         spin_unlock(&page_pools.epp_lock);
143         return rc;
144 }
145
146 static inline
147 int npages_to_npools(unsigned long npages)
148 {
149         return (int) ((npages + PAGES_PER_POOL - 1) / PAGES_PER_POOL);
150 }
151
152 /*
153  * return how many pages cleaned up.
154  */
155 static unsigned long enc_cleanup_pools(cfs_page_t ***pools, int npools)
156 {
157         unsigned long cleaned = 0;
158         int           i, j;
159
160         for (i = 0; i < npools; i++) {
161                 if (pools[i]) {
162                         for (j = 0; j < PAGES_PER_POOL; j++) {
163                                 if (pools[i][j]) {
164                                         cfs_free_page(pools[i][j]);
165                                         cleaned++;
166                                 }
167                         }
168                         OBD_FREE(pools[i], CFS_PAGE_SIZE);
169                         pools[i] = NULL;
170                 }
171         }
172
173         return cleaned;
174 }
175
176 /*
177  * merge @npools pointed by @pools which contains @npages new pages
178  * into current pools.
179  *
180  * we have options to avoid most memory copy with some tricks. but we choose
181  * the simplest way to avoid complexity. It's not frequently called.
182  */
183 static void enc_insert_pool(cfs_page_t ***pools, int npools, int npages)
184 {
185         int     freeslot;
186         int     op_idx, np_idx, og_idx, ng_idx;
187         int     cur_npools, end_npools;
188
189         LASSERT(npages > 0);
190         LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages);
191         LASSERT(npages_to_npools(npages) == npools);
192
193         spin_lock(&page_pools.epp_lock);
194
195         /*
196          * (1) fill all the free slots of current pools.
197          */
198         /*
199          * free slots are those left by rent pages, and the extra ones with
200          * index >= eep_total_pages, locate at the tail of last pool.
201          */
202         freeslot = page_pools.epp_total_pages % PAGES_PER_POOL;
203         if (freeslot != 0)
204                 freeslot = PAGES_PER_POOL - freeslot;
205         freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages;
206
207         op_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
208         og_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
209         np_idx = npools - 1;
210         ng_idx = (npages - 1) % PAGES_PER_POOL;
211
212         while (freeslot) {
213                 LASSERT(page_pools.epp_pools[op_idx][og_idx] == NULL);
214                 LASSERT(pools[np_idx][ng_idx] != NULL);
215
216                 page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
217                 pools[np_idx][ng_idx] = NULL;
218
219                 freeslot--;
220
221                 if (++og_idx == PAGES_PER_POOL) {
222                         op_idx++;
223                         og_idx = 0;
224                 }
225                 if (--ng_idx < 0) {
226                         if (np_idx == 0)
227                                 break;
228                         np_idx--;
229                         ng_idx = PAGES_PER_POOL - 1;
230                 }
231         }
232
233         /*
234          * (2) add pools if needed.
235          */
236         cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) /
237                      PAGES_PER_POOL;
238         end_npools = (page_pools.epp_total_pages + npages + PAGES_PER_POOL -1) /
239                      PAGES_PER_POOL;
240         LASSERT(end_npools <= page_pools.epp_max_pools);
241
242         np_idx = 0;
243         while (cur_npools < end_npools) {
244                 LASSERT(page_pools.epp_pools[cur_npools] == NULL);
245                 LASSERT(np_idx < npools);
246                 LASSERT(pools[np_idx] != NULL);
247
248                 page_pools.epp_pools[cur_npools++] = pools[np_idx];
249                 pools[np_idx++] = NULL;
250         }
251
252         page_pools.epp_total_pages += npages;
253         page_pools.epp_free_pages += npages;
254         page_pools.epp_st_lowfree = page_pools.epp_free_pages;
255
256         if (page_pools.epp_total_pages == page_pools.epp_max_pages)
257                 page_pools.epp_full = 1;
258
259         CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
260                page_pools.epp_total_pages);
261
262         spin_unlock(&page_pools.epp_lock);
263 }
264
265 static int enc_pools_add_pages(int npages)
266 {
267         static DECLARE_MUTEX(sem_add_pages);
268         cfs_page_t   ***pools;
269         int             npools, alloced = 0;
270         int             i, j, rc = -ENOMEM;
271
272         down(&sem_add_pages);
273
274         if (npages > page_pools.epp_max_pages - page_pools.epp_total_pages)
275                 npages = page_pools.epp_max_pages - page_pools.epp_total_pages;
276         if (npages == 0) {
277                 rc = 0;
278                 goto out;
279         }
280
281         page_pools.epp_st_adds++;
282
283         npools = npages_to_npools(npages);
284         OBD_ALLOC(pools, npools * sizeof(*pools));
285         if (pools == NULL)
286                 goto out;
287
288         for (i = 0; i < npools; i++) {
289                 OBD_ALLOC(pools[i], CFS_PAGE_SIZE);
290                 if (pools[i] == NULL)
291                         goto out_pools;
292
293                 for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
294                         pools[i][j] = cfs_alloc_page(CFS_ALLOC_IO |
295                                                      CFS_ALLOC_HIGH);
296                         if (pools[i][j] == NULL)
297                                 goto out_pools;
298
299                         alloced++;
300                 }
301         }
302
303         enc_insert_pool(pools, npools, npages);
304         CDEBUG(D_SEC, "add %d pages into enc page pools\n", npages);
305         rc = 0;
306
307 out_pools:
308         enc_cleanup_pools(pools, npools);
309         OBD_FREE(pools, npools * sizeof(*pools));
310 out:
311         if (rc) {
312                 page_pools.epp_st_failadds++;
313                 CERROR("Failed to pre-allocate %d enc pages\n", npages);
314         }
315
316         up(&sem_add_pages);
317         return rc;
318 }
319
320 /*
321  * both "max bulk rpcs inflight" and "lnet MTU" are tunable, we use the
322  * default fixed value initially.
323  */
324 int sptlrpc_enc_pool_add_user(void)
325 {
326         int page_plus = PTLRPC_MAX_BRW_PAGES * OSC_MAX_RIF_DEFAULT;
327         int users, users_gone, shift, rc;
328
329         LASSERT(!in_interrupt());
330         LASSERT(atomic_read(&page_pools.epp_users) >= 0);
331
332         users_gone = atomic_dec_return(&page_pools.epp_users_gone);
333         if (users_gone >= 0) {
334                 CWARN("%d users gone, skip\n", users_gone + 1);
335                 return 0;
336         }
337         atomic_inc(&page_pools.epp_users_gone);
338
339         /*
340          * prepare full pages for first 2 users; 1/2 for next 2 users;
341          * 1/4 for next 4 users; 1/8 for next 8 users; 1/16 for next 16 users;
342          * ...
343          */
344         users = atomic_add_return(1, &page_pools.epp_users);
345         shift = fls(users - 1);
346         shift = shift > 1 ? shift - 1 : 0;
347         page_plus = page_plus >> shift;
348         page_plus = page_plus > 2 ? page_plus : 2;
349
350         rc = enc_pools_add_pages(page_plus);
351         return 0;
352 }
353 EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);
354
355 int sptlrpc_enc_pool_del_user(void)
356 {
357         atomic_inc(&page_pools.epp_users_gone);
358         return 0;
359 }
360 EXPORT_SYMBOL(sptlrpc_enc_pool_del_user);
361
362 /*
363  * we allocate the requested pages atomically.
364  */
365 int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
366 {
367         cfs_waitlink_t  waitlink;
368         cfs_time_t      tick1 = 0, tick2;
369         int             p_idx, g_idx;
370         int             i;
371
372         LASSERT(desc->bd_max_iov > 0);
373         LASSERT(desc->bd_max_iov <= page_pools.epp_total_pages);
374
375         /* resent bulk, enc pages might have been allocated previously */
376         if (desc->bd_enc_pages != NULL)
377                 return 0;
378
379         OBD_ALLOC(desc->bd_enc_pages,
380                   desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
381         if (desc->bd_enc_pages == NULL)
382                 return -ENOMEM;
383
384         spin_lock(&page_pools.epp_lock);
385 again:
386         page_pools.epp_st_reqs++;
387
388         if (unlikely(page_pools.epp_free_pages < desc->bd_max_iov)) {
389                 if (tick1 == 0)
390                         tick1 = cfs_time_current();
391
392                 page_pools.epp_st_missings++;
393                 page_pools.epp_pages_short += desc->bd_max_iov;
394
395                 if (++page_pools.epp_waitqlen > page_pools.epp_st_max_wqlen)
396                         page_pools.epp_st_max_wqlen = page_pools.epp_waitqlen;
397                 /*
398                  * we just wait if someone else is adding more pages, or
399                  * wait queue length is not deep enough. otherwise try to
400                  * add more pages in the pools.
401                  *
402                  * FIXME the policy of detecting resource tight & growing pool
403                  * need to be reconsidered.
404                  */
405                 if (page_pools.epp_adding || page_pools.epp_waitqlen < 2 ||
406                     page_pools.epp_full) {
407                         set_current_state(TASK_UNINTERRUPTIBLE);
408                         cfs_waitlink_init(&waitlink);
409                         cfs_waitq_add(&page_pools.epp_waitq, &waitlink);
410
411                         spin_unlock(&page_pools.epp_lock);
412                         cfs_schedule();
413                         spin_lock(&page_pools.epp_lock);
414                 } else {
415                         page_pools.epp_adding = 1;
416
417                         spin_unlock(&page_pools.epp_lock);
418                         enc_pools_add_pages(page_pools.epp_pages_short / 2);
419                         spin_lock(&page_pools.epp_lock);
420
421                         page_pools.epp_adding = 0;
422                 }
423
424                 LASSERT(page_pools.epp_pages_short >= desc->bd_max_iov);
425                 LASSERT(page_pools.epp_waitqlen > 0);
426                 page_pools.epp_pages_short -= desc->bd_max_iov;
427                 page_pools.epp_waitqlen--;
428
429                 goto again;
430         }
431         /*
432          * record max wait time
433          */
434         if (unlikely(tick1 != 0)) {
435                 tick2 = cfs_time_current();
436                 if (tick2 - tick1 > page_pools.epp_st_max_wait)
437                         page_pools.epp_st_max_wait = tick2 - tick1;
438         }
439         /*
440          * proceed with rest of allocation
441          */
442         page_pools.epp_free_pages -= desc->bd_max_iov;
443
444         p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
445         g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
446
447         for (i = 0; i < desc->bd_max_iov; i++) {
448                 LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
449                 desc->bd_enc_pages[i] = page_pools.epp_pools[p_idx][g_idx];
450                 page_pools.epp_pools[p_idx][g_idx] = NULL;
451
452                 if (++g_idx == PAGES_PER_POOL) {
453                         p_idx++;
454                         g_idx = 0;
455                 }
456         }
457
458         if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
459                 page_pools.epp_st_lowfree = page_pools.epp_free_pages;
460
461         spin_unlock(&page_pools.epp_lock);
462         return 0;
463 }
464 EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);
465
466 void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
467 {
468         int     p_idx, g_idx;
469         int     i;
470
471         if (desc->bd_enc_pages == NULL)
472                 return;
473         if (desc->bd_max_iov == 0)
474                 return;
475
476         spin_lock(&page_pools.epp_lock);
477
478         p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
479         g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
480
481         LASSERT(page_pools.epp_free_pages + desc->bd_max_iov <=
482                 page_pools.epp_total_pages);
483         LASSERT(page_pools.epp_pools[p_idx]);
484
485         for (i = 0; i < desc->bd_max_iov; i++) {
486                 LASSERT(desc->bd_enc_pages[i] != NULL);
487                 LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
488                 LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
489
490                 page_pools.epp_pools[p_idx][g_idx] = desc->bd_enc_pages[i];
491
492                 if (++g_idx == PAGES_PER_POOL) {
493                         p_idx++;
494                         g_idx = 0;
495                 }
496         }
497
498         page_pools.epp_free_pages += desc->bd_max_iov;
499
500         if (unlikely(page_pools.epp_waitqlen)) {
501                 LASSERT(page_pools.epp_waitqlen > 0);
502                 LASSERT(cfs_waitq_active(&page_pools.epp_waitq));
503                 cfs_waitq_broadcast(&page_pools.epp_waitq);
504         }
505
506         spin_unlock(&page_pools.epp_lock);
507
508         OBD_FREE(desc->bd_enc_pages,
509                  desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
510         desc->bd_enc_pages = NULL;
511 }
512 EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages);
513
514 int sptlrpc_enc_pool_init(void)
515 {
516         /* constants */
517         page_pools.epp_max_pages = num_physpages / 4;
518         page_pools.epp_max_pools = npages_to_npools(page_pools.epp_max_pages);
519
520         atomic_set(&page_pools.epp_users, 0);
521         atomic_set(&page_pools.epp_users_gone, 0);
522
523         cfs_waitq_init(&page_pools.epp_waitq);
524         page_pools.epp_waitqlen = 0;
525         page_pools.epp_pages_short = 0;
526
527         page_pools.epp_adding = 0;
528         page_pools.epp_full = 0;
529
530         spin_lock_init(&page_pools.epp_lock);
531         page_pools.epp_total_pages = 0;
532         page_pools.epp_free_pages = 0;
533
534         page_pools.epp_st_adds = 0;
535         page_pools.epp_st_failadds = 0;
536         page_pools.epp_st_reqs = 0;
537         page_pools.epp_st_missings = 0;
538         page_pools.epp_st_lowfree = 0;
539         page_pools.epp_st_max_wqlen = 0;
540         page_pools.epp_st_max_wait = 0;
541
542         OBD_ALLOC(page_pools.epp_pools,
543                   page_pools.epp_max_pools * sizeof(*page_pools.epp_pools));
544         if (page_pools.epp_pools == NULL)
545                 return -ENOMEM;
546
547         return 0;
548 }
549
550 void sptlrpc_enc_pool_fini(void)
551 {
552         unsigned long cleaned, npools;
553
554         LASSERT(page_pools.epp_pools);
555         LASSERT(page_pools.epp_total_pages == page_pools.epp_free_pages);
556
557         npools = npages_to_npools(page_pools.epp_total_pages);
558         cleaned = enc_cleanup_pools(page_pools.epp_pools, npools);
559         LASSERT(cleaned == page_pools.epp_total_pages);
560
561         OBD_FREE(page_pools.epp_pools,
562                  page_pools.epp_max_pools * sizeof(*page_pools.epp_pools));
563 }
564
565 #else /* !__KERNEL__ */
566
/* non-kernel build: there are no page pools, report success and do nothing */
int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
{
        return 0;
}
571
/* non-kernel build: there are no page pools, nothing to give back */
void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
{
}
575
/* non-kernel build: there are no page pools to set up, always return 0 */
int sptlrpc_enc_pool_init(void)
{
        return 0;
}
580
/* non-kernel build: there are no page pools to tear down */
void sptlrpc_enc_pool_fini(void)
{
}
584 #endif
585
/****************************************
 * Helpers to assist policy modules to  *
 * implement checksum functionality     *
 ****************************************/
590
591 static struct {
592         char    *name;
593         int      size;
594 } csum_types[] = {
595         [BULK_CSUM_ALG_NULL]    = { "null",     0 },
596         [BULK_CSUM_ALG_CRC32]   = { "crc32",    4 },
597         [BULK_CSUM_ALG_MD5]     = { "md5",     16 },
598         [BULK_CSUM_ALG_SHA1]    = { "sha1",    20 },
599         [BULK_CSUM_ALG_SHA256]  = { "sha256",  32 },
600         [BULK_CSUM_ALG_SHA384]  = { "sha384",  48 },
601         [BULK_CSUM_ALG_SHA512]  = { "sha512",  64 },
602 };
603
604 const char * sptlrpc_bulk_csum_alg2name(__u32 csum_alg)
605 {
606         if (csum_alg < BULK_CSUM_ALG_MAX)
607                 return csum_types[csum_alg].name;
608         return "unknown_cksum";
609 }
610 EXPORT_SYMBOL(sptlrpc_bulk_csum_alg2name);
611
612 int bulk_sec_desc_size(__u32 csum_alg, int request, int read)
613 {
614         int size = sizeof(struct ptlrpc_bulk_sec_desc);
615
616         LASSERT(csum_alg < BULK_CSUM_ALG_MAX);
617
618         /* read request don't need extra data */
619         if (!(read && request))
620                 size += csum_types[csum_alg].size;
621
622         return size;
623 }
624 EXPORT_SYMBOL(bulk_sec_desc_size);
625
626 int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset)
627 {
628         struct ptlrpc_bulk_sec_desc *bsd;
629         int    size = msg->lm_buflens[offset];
630
631         bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
632         if (bsd == NULL) {
633                 CERROR("Invalid bulk sec desc: size %d\n", size);
634                 return -EINVAL;
635         }
636
637         if (lustre_msg_swabbed(msg)) {
638                 __swab32s(&bsd->bsd_version);
639                 __swab32s(&bsd->bsd_pad);
640                 __swab32s(&bsd->bsd_csum_alg);
641                 __swab32s(&bsd->bsd_priv_alg);
642         }
643
644         if (bsd->bsd_version != 0) {
645                 CERROR("Unexpected version %u\n", bsd->bsd_version);
646                 return -EPROTO;
647         }
648
649         if (bsd->bsd_csum_alg >= BULK_CSUM_ALG_MAX) {
650                 CERROR("Unsupported checksum algorithm %u\n",
651                        bsd->bsd_csum_alg);
652                 return -EINVAL;
653         }
654         if (bsd->bsd_priv_alg >= BULK_PRIV_ALG_MAX) {
655                 CERROR("Unsupported cipher algorithm %u\n",
656                        bsd->bsd_priv_alg);
657                 return -EINVAL;
658         }
659
660         if (size > sizeof(*bsd) &&
661             size < sizeof(*bsd) + csum_types[bsd->bsd_csum_alg].size) {
662                 CERROR("Mal-formed checksum data: csum alg %u, size %d\n",
663                        bsd->bsd_csum_alg, size);
664                 return -EINVAL;
665         }
666
667         return 0;
668 }
669 EXPORT_SYMBOL(bulk_sec_desc_unpack);
670
671 #ifdef __KERNEL__
672 static
673 int do_bulk_checksum_crc32(struct ptlrpc_bulk_desc *desc, void *buf)
674 {
675         struct page *page;
676         int off;
677         char *ptr;
678         __u32 crc32 = ~0;
679         int len, i;
680
681         for (i = 0; i < desc->bd_iov_count; i++) {
682                 page = desc->bd_iov[i].kiov_page;
683                 off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
684                 ptr = cfs_kmap(page) + off;
685                 len = desc->bd_iov[i].kiov_len;
686
687                 crc32 = crc32_le(crc32, ptr, len);
688
689                 cfs_kunmap(page);
690         }
691
692         *((__u32 *) buf) = crc32;
693         return 0;
694 }
695
696 static
697 int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
698 {
699         struct crypto_tfm *tfm;
700         struct scatterlist *sl;
701         int i, rc = 0;
702
703         LASSERT(alg > BULK_CSUM_ALG_NULL &&
704                 alg < BULK_CSUM_ALG_MAX);
705
706         if (alg == BULK_CSUM_ALG_CRC32)
707                 return do_bulk_checksum_crc32(desc, buf);
708
709         tfm = crypto_alloc_tfm(csum_types[alg].name, 0);
710         if (tfm == NULL) {
711                 CERROR("Unable to allocate tfm %s\n", csum_types[alg].name);
712                 return -ENOMEM;
713         }
714
715         OBD_ALLOC(sl, sizeof(*sl) * desc->bd_iov_count);
716         if (sl == NULL) {
717                 rc = -ENOMEM;
718                 goto out_tfm;
719         }
720
721         for (i = 0; i < desc->bd_iov_count; i++) {
722                 sl[i].page = desc->bd_iov[i].kiov_page;
723                 sl[i].offset = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
724                 sl[i].length = desc->bd_iov[i].kiov_len;
725         }
726
727         crypto_digest_init(tfm);
728         crypto_digest_update(tfm, sl, desc->bd_iov_count);
729         crypto_digest_final(tfm, buf);
730
731         OBD_FREE(sl, sizeof(*sl) * desc->bd_iov_count);
732
733 out_tfm:
734         crypto_free_tfm(tfm);
735         return rc;
736 }
737                          
738 #else /* !__KERNEL__ */
739 static
740 int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
741 {
742         __u32 crc32 = ~0;
743         int i;
744
745         LASSERT(alg == BULK_CSUM_ALG_CRC32);
746
747         for (i = 0; i < desc->bd_iov_count; i++) {
748                 char *ptr = desc->bd_iov[i].iov_base;
749                 int len = desc->bd_iov[i].iov_len;
750
751                 crc32 = crc32_le(crc32, ptr, len);
752         }
753
754         *((__u32 *) buf) = crc32;
755         return 0;
756 }
757 #endif
758
759 /*
760  * perform algorithm @alg checksum on @desc, store result in @buf.
761  * if anything goes wrong, leave 'alg' be BULK_CSUM_ALG_NULL.
762  */
763 static
764 int generate_bulk_csum(struct ptlrpc_bulk_desc *desc, __u32 alg,
765                        struct ptlrpc_bulk_sec_desc *bsd, int bsdsize)
766 {
767         int rc;
768
769         LASSERT(bsd);
770         LASSERT(alg < BULK_CSUM_ALG_MAX);
771
772         bsd->bsd_csum_alg = BULK_CSUM_ALG_NULL;
773
774         if (alg == BULK_CSUM_ALG_NULL)
775                 return 0;
776
777         LASSERT(bsdsize >= sizeof(*bsd) + csum_types[alg].size);
778
779         rc = do_bulk_checksum(desc, alg, bsd->bsd_csum);
780         if (rc == 0)
781                 bsd->bsd_csum_alg = alg;
782
783         return rc;
784 }
785
786 static
787 int verify_bulk_csum(struct ptlrpc_bulk_desc *desc, int read,
788                      struct ptlrpc_bulk_sec_desc *bsdv, int bsdvsize,
789                      struct ptlrpc_bulk_sec_desc *bsdr, int bsdrsize)
790 {
791         char *csum_p;
792         char *buf = NULL;
793         int   csum_size, rc = 0;
794
795         LASSERT(bsdv);
796         LASSERT(bsdv->bsd_csum_alg < BULK_CSUM_ALG_MAX);
797
798         if (bsdr)
799                 bsdr->bsd_csum_alg = BULK_CSUM_ALG_NULL;
800
801         if (bsdv->bsd_csum_alg == BULK_CSUM_ALG_NULL)
802                 return 0;
803
804         /* for all supported algorithms */
805         csum_size = csum_types[bsdv->bsd_csum_alg].size;
806
807         if (bsdvsize < sizeof(*bsdv) + csum_size) {
808                 CERROR("verifier size %d too small, require %d\n",
809                        bsdvsize, (int) sizeof(*bsdv) + csum_size);
810                 return -EINVAL;
811         }
812
813         if (bsdr) {
814                 LASSERT(bsdrsize >= sizeof(*bsdr) + csum_size);
815                 csum_p = (char *) bsdr->bsd_csum;
816         } else {
817                 OBD_ALLOC(buf, csum_size);
818                 if (buf == NULL)
819                         return -EINVAL;
820                 csum_p = buf;
821         }
822
823         rc = do_bulk_checksum(desc, bsdv->bsd_csum_alg, csum_p);
824
825         if (memcmp(bsdv->bsd_csum, csum_p, csum_size)) {
826                 CERROR("BAD %s CHECKSUM (%s), data mutated during "
827                        "transfer!\n", read ? "READ" : "WRITE",
828                        csum_types[bsdv->bsd_csum_alg].name);
829                 rc = -EINVAL;
830         } else {
831                 CDEBUG(D_SEC, "bulk %s checksum (%s) verified\n",
832                       read ? "read" : "write",
833                       csum_types[bsdv->bsd_csum_alg].name);
834         }
835
836         if (bsdr) {
837                 bsdr->bsd_csum_alg = bsdv->bsd_csum_alg;
838                 memcpy(bsdr->bsd_csum, csum_p, csum_size);
839         } else {
840                 LASSERT(buf);
841                 OBD_FREE(buf, csum_size);
842         }
843
844         return rc;
845 }
846
847 int bulk_csum_cli_request(struct ptlrpc_bulk_desc *desc, int read,
848                           __u32 alg, struct lustre_msg *rmsg, int roff)
849 {
850         struct ptlrpc_bulk_sec_desc *bsdr;
851         int    rsize, rc = 0;
852
853         rsize = rmsg->lm_buflens[roff];
854         bsdr = lustre_msg_buf(rmsg, roff, sizeof(*bsdr));
855
856         LASSERT(bsdr);
857         LASSERT(rsize >= sizeof(*bsdr));
858         LASSERT(alg < BULK_CSUM_ALG_MAX);
859
860         if (read)
861                 bsdr->bsd_csum_alg = alg;
862         else {
863                 rc = generate_bulk_csum(desc, alg, bsdr, rsize);
864                 if (rc) {
865                         CERROR("client bulk write: failed to perform "
866                                "checksum: %d\n", rc);
867                 }
868         }
869
870         return rc;
871 }
872 EXPORT_SYMBOL(bulk_csum_cli_request);
873
874 int bulk_csum_cli_reply(struct ptlrpc_bulk_desc *desc, int read,
875                         struct lustre_msg *rmsg, int roff,
876                         struct lustre_msg *vmsg, int voff)
877 {
878         struct ptlrpc_bulk_sec_desc *bsdv, *bsdr;
879         int    rsize, vsize;
880
881         rsize = rmsg->lm_buflens[roff];
882         vsize = vmsg->lm_buflens[voff];
883         bsdr = lustre_msg_buf(rmsg, roff, 0);
884         bsdv = lustre_msg_buf(vmsg, voff, 0);
885
886         if (bsdv == NULL || vsize < sizeof(*bsdv)) {
887                 CERROR("Invalid checksum verifier from server: size %d\n",
888                        vsize);
889                 return -EINVAL;
890         }
891
892         LASSERT(bsdr);
893         LASSERT(rsize >= sizeof(*bsdr));
894         LASSERT(vsize >= sizeof(*bsdv));
895
896         if (bsdr->bsd_csum_alg != bsdv->bsd_csum_alg) {
897                 CERROR("bulk %s: checksum algorithm mismatch: client request "
898                        "%s but server reply with %s. try to use the new one "
899                        "for checksum verification\n",
900                        read ? "read" : "write",
901                        csum_types[bsdr->bsd_csum_alg].name,
902                        csum_types[bsdv->bsd_csum_alg].name);
903         }
904
905         if (read)
906                 return verify_bulk_csum(desc, 1, bsdv, vsize, NULL, 0);
907         else {
908                 char *cli, *srv, *new = NULL;
909                 int csum_size = csum_types[bsdr->bsd_csum_alg].size;
910
911                 LASSERT(bsdr->bsd_csum_alg < BULK_CSUM_ALG_MAX);
912                 if (bsdr->bsd_csum_alg == BULK_CSUM_ALG_NULL)
913                         return 0;
914
915                 if (vsize < sizeof(*bsdv) + csum_size) {
916                         CERROR("verifier size %d too small, require %d\n",
917                                vsize, (int) sizeof(*bsdv) + csum_size);
918                         return -EINVAL;
919                 }
920
921                 cli = (char *) (bsdr + 1);
922                 srv = (char *) (bsdv + 1);
923
924                 if (!memcmp(cli, srv, csum_size)) {
925                         /* checksum confirmed */
926                         CDEBUG(D_SEC, "bulk write checksum (%s) confirmed\n",
927                               csum_types[bsdr->bsd_csum_alg].name);
928                         return 0;
929                 }
930
931                 /* checksum mismatch, re-compute a new one and compare with
932                  * others, give out proper warnings.
933                  */
934                 OBD_ALLOC(new, csum_size);
935                 if (new == NULL)
936                         return -ENOMEM;
937
938                 do_bulk_checksum(desc, bsdr->bsd_csum_alg, new);
939
940                 if (!memcmp(new, srv, csum_size)) {
941                         CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
942                                "on the client after we checksummed them\n",
943                                csum_types[bsdr->bsd_csum_alg].name);
944                 } else if (!memcmp(new, cli, csum_size)) {
945                         CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
946                                "in transit\n",
947                                csum_types[bsdr->bsd_csum_alg].name);
948                 } else {
949                         CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
950                                "in transit, and the current page contents "
951                                "don't match the originals and what the server "
952                                "received\n",
953                                csum_types[bsdr->bsd_csum_alg].name);
954                 }
955                 OBD_FREE(new, csum_size);
956
957                 return -EINVAL;
958         }
959 }
960 EXPORT_SYMBOL(bulk_csum_cli_reply);
961
962 int bulk_csum_svc(struct ptlrpc_bulk_desc *desc, int read,
963                   struct ptlrpc_bulk_sec_desc *bsdv, int vsize,
964                   struct ptlrpc_bulk_sec_desc *bsdr, int rsize)
965 {
966         int    rc;
967
968         LASSERT(vsize >= sizeof(*bsdv));
969         LASSERT(rsize >= sizeof(*bsdr));
970         LASSERT(bsdv && bsdr);
971
972         if (read) {
973                 rc = generate_bulk_csum(desc, bsdv->bsd_csum_alg, bsdr, rsize);
974                 if (rc)
975                         CERROR("bulk read: server failed to generate %s "
976                                "checksum: %d\n",
977                                csum_types[bsdv->bsd_csum_alg].name, rc);
978         } else
979                 rc = verify_bulk_csum(desc, 0, bsdv, vsize, bsdr, rsize);
980
981         return rc;
982 }
983 EXPORT_SYMBOL(bulk_csum_svc);
984
985 /****************************************
986  * Helpers to assist policy modules to  *
 * implement encryption functionality   *
988  ****************************************/
989
990 /*
 * NOTE: These algorithms must be stream ciphers!
992  */
/*
 * Table of the known bulk "priv" algorithms, indexed by the
 * BULK_PRIV_ALG_* identifiers. It is read by
 * sptlrpc_bulk_priv_alg2name() and sptlrpc_bulk_priv_alg2flags().
 */
static struct {
        char    *name;          /* printable algorithm name */
        __u32    flags;         /* per-algorithm flags; 0 for every entry here */
} priv_types[] = {
        [BULK_PRIV_ALG_NULL]   = { "null", 0   },
        [BULK_PRIV_ALG_ARC4]   = { "arc4", 0   },
};
1000
1001 const char * sptlrpc_bulk_priv_alg2name(__u32 priv_alg)
1002 {
1003         if (priv_alg < BULK_PRIV_ALG_MAX)
1004                 return priv_types[priv_alg].name;
1005         return "unknown_priv";
1006 }
1007 EXPORT_SYMBOL(sptlrpc_bulk_priv_alg2name);
1008
1009 __u32 sptlrpc_bulk_priv_alg2flags(__u32 priv_alg)
1010 {
1011         if (priv_alg < BULK_PRIV_ALG_MAX)
1012                 return priv_types[priv_alg].flags;
1013         return 0;
1014 }
1015 EXPORT_SYMBOL(sptlrpc_bulk_priv_alg2flags);