lustre/ptlrpc/sec_bulk.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2006 Cluster File Systems, Inc.
 *
 *   This file is part of Lustre, http://www.lustre.org.
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#ifndef EXPORT_SYMTAB
#define EXPORT_SYMTAB
#endif
#define DEBUG_SUBSYSTEM S_SEC

#include <libcfs/libcfs.h>
#ifndef __KERNEL__
#include <liblustre.h>
#include <libcfs/list.h>
#else
#include <linux/crypto.h>
#endif

#include <obd.h>
#include <obd_class.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <lustre_import.h>
#include <lustre_dlm.h>
#include <lustre_sec.h>

#include "ptlrpc_internal.h"

/****************************************
 * bulk encryption page pools           *
 ****************************************/

#ifdef __KERNEL__

#define PTRS_PER_PAGE   (CFS_PAGE_SIZE / sizeof(void *))
#define PAGES_PER_POOL  (PTRS_PER_PAGE)

static struct ptlrpc_enc_page_pool {
        /*
         * constants
         */
        unsigned long    epp_max_pages;   /* maximum pages the pools can hold, const */
        unsigned int     epp_max_pools;   /* number of pools, const */
        /*
         * users of the pools. the capacity grows as more users are added,
         * but doesn't shrink when users leave -- just the current policy.
         * during failover there might be user add/remove activity.
         */
        atomic_t         epp_users;       /* shared by how many users (osc) */
        atomic_t         epp_users_gone;  /* users removed */
        /*
         * wait queue in case of not enough free pages.
         */
        cfs_waitq_t      epp_waitq;       /* waiting threads */
        unsigned int     epp_waitqlen;    /* wait queue length */
        unsigned long    epp_pages_short; /* # of pages wanted by queued users */
        unsigned long    epp_adding:1,    /* currently adding pages */
                         epp_full:1;      /* pools are all full */
        /*
         * in-pool pages bookkeeping
         */
        spinlock_t       epp_lock;        /* protect following fields */
        unsigned long    epp_total_pages; /* total pages in pools */
        unsigned long    epp_free_pages;  /* current pages available */
        /*
         * statistics
         */
        unsigned int     epp_st_adds;     /* # of add-pages attempts */
        unsigned int     epp_st_failadds; /* # of add-pages failures */
        unsigned long    epp_st_reqs;     /* # of get_pages requests */
        unsigned long    epp_st_missings; /* # of cache misses */
        unsigned long    epp_st_lowfree;  /* lowest free pages ever reached */
        unsigned long    epp_st_max_wqlen;/* highest waitqueue length ever */
        cfs_time_t       epp_st_max_wait; /* in jiffies */
        /*
         * pointers to pools
         */
        cfs_page_t    ***epp_pools;
} page_pools;
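
/*
 * Added illustration (not part of the original source): the pools form a
 * two-level array, so logical page index i lives at
 * epp_pools[i / PAGES_PER_POOL][i % PAGES_PER_POOL].  With 4K pages and
 * 8-byte pointers PAGES_PER_POOL is 512, i.e. one pool bookkeeps 512
 * pages (2MB) using a single page of pointers.
 */
#if 0   /* illustration only, never compiled */
static inline cfs_page_t *enc_page_at(unsigned long i)
{
        return page_pools.epp_pools[i / PAGES_PER_POOL]
                                   [i % PAGES_PER_POOL];
}
#endif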

int sptlrpc_proc_read_enc_pool(char *page, char **start, off_t off, int count,
                               int *eof, void *data)
{
        int     rc;

        spin_lock(&page_pools.epp_lock);

        rc = snprintf(page, count,
                      "physical pages:          %lu\n"
                      "pages per pool:          %lu\n"
                      "max pages:               %lu\n"
                      "max pools:               %u\n"
                      "users:                   %d - %d\n"
                      "current waitqueue len:   %u\n"
                      "current pages in short:  %lu\n"
                      "total pages:             %lu\n"
                      "total free:              %lu\n"
                      "add page times:          %u\n"
                      "add page failed times:   %u\n"
                      "total requests:          %lu\n"
                      "cache misses:            %lu\n"
                      "lowest free pages:       %lu\n"
                      "max waitqueue depth:     %lu\n"
                      "max wait time:           "CFS_TIME_T"\n"
                      ,
                      num_physpages,
                      PAGES_PER_POOL,
                      page_pools.epp_max_pages,
                      page_pools.epp_max_pools,
                      atomic_read(&page_pools.epp_users),
                      atomic_read(&page_pools.epp_users_gone),
                      page_pools.epp_waitqlen,
                      page_pools.epp_pages_short,
                      page_pools.epp_total_pages,
                      page_pools.epp_free_pages,
                      page_pools.epp_st_adds,
                      page_pools.epp_st_failadds,
                      page_pools.epp_st_reqs,
                      page_pools.epp_st_missings,
                      page_pools.epp_st_lowfree,
                      page_pools.epp_st_max_wqlen,
                      page_pools.epp_st_max_wait
                     );

        spin_unlock(&page_pools.epp_lock);
        return rc;
}

static inline
int npages_to_npools(unsigned long npages)
{
        return (int) ((npages + PAGES_PER_POOL - 1) / PAGES_PER_POOL);
}
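
/*
 * Added worked example: npages_to_npools() is a ceiling division, so with
 * PAGES_PER_POOL == 512, 512 pages need 1 pool while 513 pages need 2.
 */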

/*
 * return how many pages cleaned up.
 */
static unsigned long enc_cleanup_pools(cfs_page_t ***pools, int npools)
{
        unsigned long cleaned = 0;
        int           i, j;

        for (i = 0; i < npools; i++) {
                if (pools[i]) {
                        for (j = 0; j < PAGES_PER_POOL; j++) {
                                if (pools[i][j]) {
                                        cfs_free_page(pools[i][j]);
                                        cleaned++;
                                }
                        }
                        OBD_FREE(pools[i], CFS_PAGE_SIZE);
                        pools[i] = NULL;
                }
        }

        return cleaned;
}

/*
 * merge @npools pools pointed to by @pools, containing @npages new pages,
 * into the current pools.
 *
 * we could avoid most of the memory copying with some tricks, but we choose
 * the simplest way to avoid complexity. this is not called frequently.
 */
static void enc_insert_pool(cfs_page_t ***pools, int npools, int npages)
{
        int     freeslot;
        int     op_idx, np_idx, og_idx, ng_idx;
        int     cur_npools, end_npools;

        LASSERT(npages > 0);
        LASSERT(page_pools.epp_total_pages+npages <= page_pools.epp_max_pages);
        LASSERT(npages_to_npools(npages) == npools);

        spin_lock(&page_pools.epp_lock);

        /*
         * (1) fill all the free slots of current pools.
         */
        /*
         * free slots are those left by pages currently lent out, plus the
         * extra ones with index >= epp_total_pages, located at the tail of
         * the last pool.
         */
        freeslot = page_pools.epp_total_pages % PAGES_PER_POOL;
        if (freeslot != 0)
                freeslot = PAGES_PER_POOL - freeslot;
        freeslot += page_pools.epp_total_pages - page_pools.epp_free_pages;
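
        /*
         * Added worked example: with PAGES_PER_POOL == 512,
         * epp_total_pages == 1000 and epp_free_pages == 900, the last pool
         * has 512 - (1000 % 512) = 24 unused tail slots and 100 pages are
         * lent out, so freeslot == 124.
         */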

        op_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
        og_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
        np_idx = npools - 1;
        ng_idx = (npages - 1) % PAGES_PER_POOL;

        while (freeslot) {
                LASSERT(page_pools.epp_pools[op_idx][og_idx] == NULL);
                LASSERT(pools[np_idx][ng_idx] != NULL);

                page_pools.epp_pools[op_idx][og_idx] = pools[np_idx][ng_idx];
                pools[np_idx][ng_idx] = NULL;

                freeslot--;

                if (++og_idx == PAGES_PER_POOL) {
                        op_idx++;
                        og_idx = 0;
                }
                if (--ng_idx < 0) {
                        if (np_idx == 0)
                                break;
                        np_idx--;
                        ng_idx = PAGES_PER_POOL - 1;
                }
        }

        /*
         * (2) add pools if needed.
         */
        cur_npools = (page_pools.epp_total_pages + PAGES_PER_POOL - 1) /
                     PAGES_PER_POOL;
        end_npools = (page_pools.epp_total_pages + npages + PAGES_PER_POOL -1) /
                     PAGES_PER_POOL;
        LASSERT(end_npools <= page_pools.epp_max_pools);

        np_idx = 0;
        while (cur_npools < end_npools) {
                LASSERT(page_pools.epp_pools[cur_npools] == NULL);
                LASSERT(np_idx < npools);
                LASSERT(pools[np_idx] != NULL);

                page_pools.epp_pools[cur_npools++] = pools[np_idx];
                pools[np_idx++] = NULL;
        }

        page_pools.epp_total_pages += npages;
        page_pools.epp_free_pages += npages;
        page_pools.epp_st_lowfree = page_pools.epp_free_pages;

        if (page_pools.epp_total_pages == page_pools.epp_max_pages)
                page_pools.epp_full = 1;

        CDEBUG(D_SEC, "add %d pages to total %lu\n", npages,
               page_pools.epp_total_pages);

        spin_unlock(&page_pools.epp_lock);
}

static int enc_pools_add_pages(int npages)
{
        static DECLARE_MUTEX(sem_add_pages);
        cfs_page_t   ***pools;
        int             npools, alloced = 0;
        int             i, j, rc = -ENOMEM;

        down(&sem_add_pages);

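        /*
         * Added note: the clamp below must be evaluated while holding
         * sem_add_pages, so that concurrent growers cannot collectively
         * overshoot epp_max_pages.
         */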
        if (npages > page_pools.epp_max_pages - page_pools.epp_total_pages)
                npages = page_pools.epp_max_pages - page_pools.epp_total_pages;
        if (npages == 0) {
                rc = 0;
                goto out;
        }

        page_pools.epp_st_adds++;

        npools = npages_to_npools(npages);
        OBD_ALLOC(pools, npools * sizeof(*pools));
        if (pools == NULL)
                goto out;

        for (i = 0; i < npools; i++) {
                OBD_ALLOC(pools[i], CFS_PAGE_SIZE);
                if (pools[i] == NULL)
                        goto out_pools;

                for (j = 0; j < PAGES_PER_POOL && alloced < npages; j++) {
                        pools[i][j] = cfs_alloc_page(CFS_ALLOC_IO |
                                                     CFS_ALLOC_HIGH);
                        if (pools[i][j] == NULL)
                                goto out_pools;

                        alloced++;
                }
        }

        enc_insert_pool(pools, npools, npages);
        CDEBUG(D_SEC, "add %d pages into enc page pools\n", npages);
        rc = 0;

out_pools:
        enc_cleanup_pools(pools, npools);
        OBD_FREE(pools, npools * sizeof(*pools));
out:
        if (rc) {
                page_pools.epp_st_failadds++;
                CERROR("Failed to pre-allocate %d enc pages\n", npages);
        }

        up(&sem_add_pages);
        return rc;
}

/*
 * both "max bulk rpcs inflight" and "lnet MTU" are tunable; we use their
 * default fixed values for the initial estimate.
 */
int sptlrpc_enc_pool_add_user(void)
{
        int page_plus = PTLRPC_MAX_BRW_PAGES * OSC_MAX_RIF_DEFAULT;
        int users, users_gone, shift;

        LASSERT(!in_interrupt());
        LASSERT(atomic_read(&page_pools.epp_users) >= 0);

        users_gone = atomic_dec_return(&page_pools.epp_users_gone);
        if (users_gone >= 0) {
                CWARN("%d users gone, skip\n", users_gone + 1);
                return 0;
        }
        atomic_inc(&page_pools.epp_users_gone);

        /*
         * prepare full pages for first 2 users; 1/2 for next 2 users;
         * 1/4 for next 4 users; 1/8 for next 8 users; 1/16 for next 16 users;
         * ...
         */
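        /*
         * Added worked example: with page_plus == P initially, users 1-2
         * get shift 0 (P pages each), users 3-4 get shift 1 (P/2), users
         * 5-8 get shift 2 (P/4), users 9-16 get shift 3 (P/8), and so on.
         */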
        users = atomic_add_return(1, &page_pools.epp_users);
        shift = fls(users - 1);
        shift = shift > 1 ? shift - 1 : 0;
        page_plus = page_plus >> shift;
        page_plus = page_plus > 2 ? page_plus : 2;

        /* failure to pre-allocate here is not fatal; the pools can still
         * grow on demand in sptlrpc_enc_pool_get_pages() */
        enc_pools_add_pages(page_plus);
        return 0;
}
EXPORT_SYMBOL(sptlrpc_enc_pool_add_user);

int sptlrpc_enc_pool_del_user(void)
{
        atomic_inc(&page_pools.epp_users_gone);
        return 0;
}
EXPORT_SYMBOL(sptlrpc_enc_pool_del_user);

/*
 * allocate the requested pages "atomically", i.e. all-or-nothing: the
 * caller either gets every page it asked for or blocks until they become
 * available. this may sleep, so it must not be called in atomic context.
 */
int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
{
        cfs_waitlink_t  waitlink;
        cfs_time_t      tick1 = 0, tick2;
        int             p_idx, g_idx;
        int             i;

        LASSERT(desc->bd_max_iov > 0);
        LASSERT(desc->bd_max_iov <= page_pools.epp_total_pages);

        /* resent bulk, enc pages might have been allocated previously */
        if (desc->bd_enc_pages != NULL)
                return 0;

        OBD_ALLOC(desc->bd_enc_pages,
                  desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
        if (desc->bd_enc_pages == NULL)
                return -ENOMEM;

        spin_lock(&page_pools.epp_lock);
again:
        page_pools.epp_st_reqs++;

        if (unlikely(page_pools.epp_free_pages < desc->bd_max_iov)) {
                if (tick1 == 0)
                        tick1 = cfs_time_current();

                page_pools.epp_st_missings++;
                page_pools.epp_pages_short += desc->bd_max_iov;

                if (++page_pools.epp_waitqlen > page_pools.epp_st_max_wqlen)
                        page_pools.epp_st_max_wqlen = page_pools.epp_waitqlen;
                /*
                 * we just wait if someone else is adding more pages, or if
                 * the wait queue is not deep enough yet. otherwise we try
                 * to add more pages to the pools ourselves.
                 *
                 * FIXME the policy for detecting resource tightness and
                 * growing the pool needs to be reconsidered.
                 */
                if (page_pools.epp_adding || page_pools.epp_waitqlen < 2 ||
                    page_pools.epp_full) {
                        set_current_state(TASK_UNINTERRUPTIBLE);
                        cfs_waitlink_init(&waitlink);
                        cfs_waitq_add(&page_pools.epp_waitq, &waitlink);

                        spin_unlock(&page_pools.epp_lock);
                        cfs_schedule();
                        spin_lock(&page_pools.epp_lock);

                        /* added: take ourselves off the waitq, otherwise
                         * the on-stack waitlink would stay linked after
                         * this function returns */
                        cfs_waitq_del(&page_pools.epp_waitq, &waitlink);
                } else {
                        page_pools.epp_adding = 1;

                        spin_unlock(&page_pools.epp_lock);
                        enc_pools_add_pages(page_pools.epp_pages_short / 2);
                        spin_lock(&page_pools.epp_lock);

                        page_pools.epp_adding = 0;
                }

                LASSERT(page_pools.epp_pages_short >= desc->bd_max_iov);
                LASSERT(page_pools.epp_waitqlen > 0);
                page_pools.epp_pages_short -= desc->bd_max_iov;
                page_pools.epp_waitqlen--;

                goto again;
        }
        /*
         * record max wait time
         */
        if (unlikely(tick1 != 0)) {
                tick2 = cfs_time_current();
                if (tick2 - tick1 > page_pools.epp_st_max_wait)
                        page_pools.epp_st_max_wait = tick2 - tick1;
        }
        /*
         * proceed with the rest of allocation
         */
        page_pools.epp_free_pages -= desc->bd_max_iov;

        p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
        g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;

        for (i = 0; i < desc->bd_max_iov; i++) {
                LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
                desc->bd_enc_pages[i] = page_pools.epp_pools[p_idx][g_idx];
                page_pools.epp_pools[p_idx][g_idx] = NULL;

                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
                        g_idx = 0;
                }
        }

        if (page_pools.epp_free_pages < page_pools.epp_st_lowfree)
                page_pools.epp_st_lowfree = page_pools.epp_free_pages;

        spin_unlock(&page_pools.epp_lock);
        return 0;
}
EXPORT_SYMBOL(sptlrpc_enc_pool_get_pages);

void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
{
        int     p_idx, g_idx;
        int     i;

        if (desc->bd_enc_pages == NULL)
                return;
        if (desc->bd_max_iov == 0)
                return;

        spin_lock(&page_pools.epp_lock);

        p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
        g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;

        LASSERT(page_pools.epp_free_pages + desc->bd_max_iov <=
                page_pools.epp_total_pages);
        LASSERT(page_pools.epp_pools[p_idx]);

        for (i = 0; i < desc->bd_max_iov; i++) {
                LASSERT(desc->bd_enc_pages[i] != NULL);
                LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
                LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);

                page_pools.epp_pools[p_idx][g_idx] = desc->bd_enc_pages[i];

                if (++g_idx == PAGES_PER_POOL) {
                        p_idx++;
                        g_idx = 0;
                }
        }

        page_pools.epp_free_pages += desc->bd_max_iov;

        if (unlikely(page_pools.epp_waitqlen)) {
                LASSERT(page_pools.epp_waitqlen > 0);
                LASSERT(cfs_waitq_active(&page_pools.epp_waitq));
                cfs_waitq_broadcast(&page_pools.epp_waitq);
        }

        spin_unlock(&page_pools.epp_lock);

        OBD_FREE(desc->bd_enc_pages,
                 desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
        desc->bd_enc_pages = NULL;
}
EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages);
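
/*
 * Added usage sketch (illustration only): a bulk security policy pairs
 * the two calls above around the actual transfer; "demo_bulk_priv" and
 * the encryption step are hypothetical placeholders.
 */
#if 0   /* illustration only, never compiled */
static int demo_bulk_priv(struct ptlrpc_bulk_desc *desc)
{
        int rc;

        rc = sptlrpc_enc_pool_get_pages(desc);  /* may block for free pages */
        if (rc)
                return rc;

        /* ... encrypt desc->bd_iov[] into desc->bd_enc_pages[] ... */

        sptlrpc_enc_pool_put_pages(desc);       /* return pages, wake waiters */
        return 0;
}
#endif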

int sptlrpc_enc_pool_init(void)
{
        /* constants */
        page_pools.epp_max_pages = num_physpages / 4;
        page_pools.epp_max_pools = npages_to_npools(page_pools.epp_max_pages);

        atomic_set(&page_pools.epp_users, 0);
        atomic_set(&page_pools.epp_users_gone, 0);

        cfs_waitq_init(&page_pools.epp_waitq);
        page_pools.epp_waitqlen = 0;
        page_pools.epp_pages_short = 0;

        page_pools.epp_adding = 0;
        page_pools.epp_full = 0;

        spin_lock_init(&page_pools.epp_lock);
        page_pools.epp_total_pages = 0;
        page_pools.epp_free_pages = 0;

        page_pools.epp_st_adds = 0;
        page_pools.epp_st_failadds = 0;
        page_pools.epp_st_reqs = 0;
        page_pools.epp_st_missings = 0;
        page_pools.epp_st_lowfree = 0;
        page_pools.epp_st_max_wqlen = 0;
        page_pools.epp_st_max_wait = 0;

        OBD_ALLOC(page_pools.epp_pools,
                  page_pools.epp_max_pools * sizeof(*page_pools.epp_pools));
        if (page_pools.epp_pools == NULL)
                return -ENOMEM;

        return 0;
}

void sptlrpc_enc_pool_fini(void)
{
        unsigned long cleaned, npools;

        LASSERT(page_pools.epp_pools);
        LASSERT(page_pools.epp_total_pages == page_pools.epp_free_pages);

        npools = npages_to_npools(page_pools.epp_total_pages);
        cleaned = enc_cleanup_pools(page_pools.epp_pools, npools);
        LASSERT(cleaned == page_pools.epp_total_pages);

        OBD_FREE(page_pools.epp_pools,
                 page_pools.epp_max_pools * sizeof(*page_pools.epp_pools));
}

#else /* !__KERNEL__ */

int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
{
        return 0;
}

void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
{
}

int sptlrpc_enc_pool_init(void)
{
        return 0;
}

void sptlrpc_enc_pool_fini(void)
{
}
#endif

/****************************************
 * Helpers to assist policy modules to  *
 * implement checksum functionality     *
 ****************************************/

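/* supported checksum algorithms; "size" is the digest length in bytes */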
static struct {
        char    *name;
        int      size;
} csum_types[] = {
        [BULK_CSUM_ALG_NULL]    = { "null",     0 },
        [BULK_CSUM_ALG_CRC32]   = { "crc32",    4 },
        [BULK_CSUM_ALG_MD5]     = { "md5",     16 },
        [BULK_CSUM_ALG_SHA1]    = { "sha1",    20 },
        [BULK_CSUM_ALG_SHA256]  = { "sha256",  32 },
        [BULK_CSUM_ALG_SHA384]  = { "sha384",  48 },
        [BULK_CSUM_ALG_SHA512]  = { "sha512",  64 },
};

const char * sptlrpc_bulk_csum_alg2name(__u32 csum_alg)
{
        if (csum_alg < BULK_CSUM_ALG_MAX)
                return csum_types[csum_alg].name;
        return "unknown_cksum";
}
EXPORT_SYMBOL(sptlrpc_bulk_csum_alg2name);

int bulk_sec_desc_size(__u32 csum_alg, int request, int read)
{
        int size = sizeof(struct ptlrpc_bulk_sec_desc);

        LASSERT(csum_alg < BULK_CSUM_ALG_MAX);

        /* read requests don't need extra checksum space */
        if (!(read && request))
                size += csum_types[csum_alg].size;

        return size;
}
EXPORT_SYMBOL(bulk_sec_desc_size);
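
/*
 * Added example: with MD5, a bulk-write request reserves
 * sizeof(struct ptlrpc_bulk_sec_desc) + 16 bytes, while a bulk-read
 * *request* carries no checksum and reserves just the descriptor.
 */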

int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset)
{
        struct ptlrpc_bulk_sec_desc *bsd;
        int    size = msg->lm_buflens[offset];

        bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
        if (bsd == NULL) {
                CERROR("Invalid bulk sec desc: size %d\n", size);
                return -EINVAL;
        }

        if (lustre_msg_swabbed(msg)) {
                __swab32s(&bsd->bsd_version);
                __swab32s(&bsd->bsd_pad);
                __swab32s(&bsd->bsd_csum_alg);
                __swab32s(&bsd->bsd_priv_alg);
        }

        if (bsd->bsd_version != 0) {
                CERROR("Unexpected version %u\n", bsd->bsd_version);
                return -EPROTO;
        }

        if (bsd->bsd_csum_alg >= BULK_CSUM_ALG_MAX) {
                CERROR("Unsupported checksum algorithm %u\n",
                       bsd->bsd_csum_alg);
                return -EINVAL;
        }
        if (bsd->bsd_priv_alg >= BULK_PRIV_ALG_MAX) {
                CERROR("Unsupported cipher algorithm %u\n",
                       bsd->bsd_priv_alg);
                return -EINVAL;
        }

        if (size > sizeof(*bsd) &&
            size < sizeof(*bsd) + csum_types[bsd->bsd_csum_alg].size) {
                CERROR("Mal-formed checksum data: csum alg %u, size %d\n",
                       bsd->bsd_csum_alg, size);
                return -EINVAL;
        }

        return 0;
}
EXPORT_SYMBOL(bulk_sec_desc_unpack);

#ifdef __KERNEL__
static
int do_bulk_checksum_crc32(struct ptlrpc_bulk_desc *desc, void *buf)
{
        struct page *page;
        int off;
        char *ptr;
        __u32 crc32 = ~0;
        int len, i;

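        /* added note: the CRC is seeded with ~0 but, unlike the usual
         * CRC-32 presentation, not post-inverted; both peers use the same
         * convention, so the comparison remains valid. */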
        for (i = 0; i < desc->bd_iov_count; i++) {
                page = desc->bd_iov[i].kiov_page;
                off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
                ptr = cfs_kmap(page) + off;
                len = desc->bd_iov[i].kiov_len;

                crc32 = crc32_le(crc32, ptr, len);

                cfs_kunmap(page);
        }

        *((__u32 *) buf) = crc32;
        return 0;
}

static
int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
{
        struct crypto_tfm *tfm;
        struct scatterlist *sl;
        int i, rc = 0;

        LASSERT(alg > BULK_CSUM_ALG_NULL &&
                alg < BULK_CSUM_ALG_MAX);

        if (alg == BULK_CSUM_ALG_CRC32)
                return do_bulk_checksum_crc32(desc, buf);

        tfm = crypto_alloc_tfm(csum_types[alg].name, 0);
        if (tfm == NULL) {
                CERROR("Unable to allocate tfm %s\n", csum_types[alg].name);
                return -ENOMEM;
        }

        OBD_ALLOC(sl, sizeof(*sl) * desc->bd_iov_count);
        if (sl == NULL) {
                rc = -ENOMEM;
                goto out_tfm;
        }

        for (i = 0; i < desc->bd_iov_count; i++) {
                sl[i].page = desc->bd_iov[i].kiov_page;
                sl[i].offset = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
                sl[i].length = desc->bd_iov[i].kiov_len;
        }

        crypto_digest_init(tfm);
        crypto_digest_update(tfm, sl, desc->bd_iov_count);
        crypto_digest_final(tfm, buf);

        OBD_FREE(sl, sizeof(*sl) * desc->bd_iov_count);

out_tfm:
        crypto_free_tfm(tfm);
        return rc;
}

#else /* !__KERNEL__ */
static
int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
{
        __u32 crc32 = ~0;
        int i;

        LASSERT(alg == BULK_CSUM_ALG_CRC32);

        for (i = 0; i < desc->bd_iov_count; i++) {
                char *ptr = desc->bd_iov[i].iov_base;
                int len = desc->bd_iov[i].iov_len;

                crc32 = crc32_le(crc32, ptr, len);
        }

        *((__u32 *) buf) = crc32;
        return 0;
}
#endif

/*
 * perform checksum algorithm @alg on @desc and store the result in
 * @bsd->bsd_csum. if anything goes wrong, leave bsd_csum_alg set to
 * BULK_CSUM_ALG_NULL.
 */
static
int generate_bulk_csum(struct ptlrpc_bulk_desc *desc, __u32 alg,
                       struct ptlrpc_bulk_sec_desc *bsd, int bsdsize)
{
        int rc;

        LASSERT(bsd);
        LASSERT(alg < BULK_CSUM_ALG_MAX);

        bsd->bsd_csum_alg = BULK_CSUM_ALG_NULL;

        if (alg == BULK_CSUM_ALG_NULL)
                return 0;

        LASSERT(bsdsize >= sizeof(*bsd) + csum_types[alg].size);

        rc = do_bulk_checksum(desc, alg, bsd->bsd_csum);
        if (rc == 0)
                bsd->bsd_csum_alg = alg;

        return rc;
}

static
int verify_bulk_csum(struct ptlrpc_bulk_desc *desc, int read,
                     struct ptlrpc_bulk_sec_desc *bsdv, int bsdvsize,
                     struct ptlrpc_bulk_sec_desc *bsdr, int bsdrsize)
{
        char *csum_p;
        char *buf = NULL;
        int   csum_size, rc = 0;

        LASSERT(bsdv);
        LASSERT(bsdv->bsd_csum_alg < BULK_CSUM_ALG_MAX);

        if (bsdr)
                bsdr->bsd_csum_alg = BULK_CSUM_ALG_NULL;

        if (bsdv->bsd_csum_alg == BULK_CSUM_ALG_NULL)
                return 0;

        /* for all supported algorithms */
        csum_size = csum_types[bsdv->bsd_csum_alg].size;

        if (bsdvsize < sizeof(*bsdv) + csum_size) {
                CERROR("verifier size %d too small, requires %d\n",
                       bsdvsize, (int) sizeof(*bsdv) + csum_size);
                return -EINVAL;
        }

        if (bsdr) {
                LASSERT(bsdrsize >= sizeof(*bsdr) + csum_size);
                csum_p = (char *) bsdr->bsd_csum;
        } else {
                OBD_ALLOC(buf, csum_size);
                if (buf == NULL)
                        return -ENOMEM;
                csum_p = buf;
        }

        rc = do_bulk_checksum(desc, bsdv->bsd_csum_alg, csum_p);

        if (memcmp(bsdv->bsd_csum, csum_p, csum_size)) {
                CERROR("BAD %s CHECKSUM (%s), data mutated during "
                       "transfer!\n", read ? "READ" : "WRITE",
                       csum_types[bsdv->bsd_csum_alg].name);
                rc = -EINVAL;
        } else {
                CDEBUG(D_SEC, "bulk %s checksum (%s) verified\n",
                      read ? "read" : "write",
                      csum_types[bsdv->bsd_csum_alg].name);
        }

        if (bsdr) {
                bsdr->bsd_csum_alg = bsdv->bsd_csum_alg;
                memcpy(bsdr->bsd_csum, csum_p, csum_size);
        } else {
                LASSERT(buf);
                OBD_FREE(buf, csum_size);
        }

        return rc;
}

int bulk_csum_cli_request(struct ptlrpc_bulk_desc *desc, int read,
                          __u32 alg, struct lustre_msg *rmsg, int roff)
{
        struct ptlrpc_bulk_sec_desc *bsdr;
        int    rsize, rc = 0;

        rsize = rmsg->lm_buflens[roff];
        bsdr = lustre_msg_buf(rmsg, roff, sizeof(*bsdr));

        LASSERT(bsdr);
        LASSERT(rsize >= sizeof(*bsdr));
        LASSERT(alg < BULK_CSUM_ALG_MAX);

        if (read)
                bsdr->bsd_csum_alg = alg;
        else {
                rc = generate_bulk_csum(desc, alg, bsdr, rsize);
                if (rc) {
                        CERROR("client bulk write: failed to perform "
                               "checksum: %d\n", rc);
                }
        }

        return rc;
}
EXPORT_SYMBOL(bulk_csum_cli_request);

int bulk_csum_cli_reply(struct ptlrpc_bulk_desc *desc, int read,
                        struct lustre_msg *rmsg, int roff,
                        struct lustre_msg *vmsg, int voff)
{
        struct ptlrpc_bulk_sec_desc *bsdv, *bsdr;
        int    rsize, vsize;

        rsize = rmsg->lm_buflens[roff];
        vsize = vmsg->lm_buflens[voff];
        bsdr = lustre_msg_buf(rmsg, roff, 0);
        bsdv = lustre_msg_buf(vmsg, voff, 0);

        if (bsdv == NULL || vsize < sizeof(*bsdv)) {
                CERROR("Invalid checksum verifier from server: size %d\n",
                       vsize);
                return -EINVAL;
        }

        LASSERT(bsdr);
        LASSERT(rsize >= sizeof(*bsdr));
        LASSERT(vsize >= sizeof(*bsdv));

        if (bsdr->bsd_csum_alg != bsdv->bsd_csum_alg) {
                CERROR("bulk %s: checksum algorithm mismatch: client "
                       "requested %s but server replied with %s; using the "
                       "server's algorithm for verification\n",
                       read ? "read" : "write",
                       csum_types[bsdr->bsd_csum_alg].name,
                       csum_types[bsdv->bsd_csum_alg].name);
        }

        if (read)
                return verify_bulk_csum(desc, 1, bsdv, vsize, NULL, 0);
        else {
                char *cli, *srv, *new = NULL;
                int csum_size = csum_types[bsdr->bsd_csum_alg].size;

                LASSERT(bsdr->bsd_csum_alg < BULK_CSUM_ALG_MAX);
                if (bsdr->bsd_csum_alg == BULK_CSUM_ALG_NULL)
                        return 0;

                if (vsize < sizeof(*bsdv) + csum_size) {
                        CERROR("verifier size %d too small, requires %d\n",
                               vsize, (int) sizeof(*bsdv) + csum_size);
                        return -EINVAL;
                }

                cli = (char *) (bsdr + 1);
                srv = (char *) (bsdv + 1);

                if (!memcmp(cli, srv, csum_size)) {
                        /* checksum confirmed */
                        CDEBUG(D_SEC, "bulk write checksum (%s) confirmed\n",
                              csum_types[bsdr->bsd_csum_alg].name);
                        return 0;
                }

                /* checksum mismatch: re-compute a new one, compare it with
                 * the others, and give out the proper warning.
                 */
                OBD_ALLOC(new, csum_size);
                if (new == NULL)
                        return -ENOMEM;

                do_bulk_checksum(desc, bsdr->bsd_csum_alg, new);

                if (!memcmp(new, srv, csum_size)) {
                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
                               "on the client after we checksummed them\n",
                               csum_types[bsdr->bsd_csum_alg].name);
                } else if (!memcmp(new, cli, csum_size)) {
                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
                               "in transit\n",
                               csum_types[bsdr->bsd_csum_alg].name);
                } else {
                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
                               "in transit, and the current page contents "
                               "don't match the originals or what the server "
                               "received\n",
                               csum_types[bsdr->bsd_csum_alg].name);
                }
                OBD_FREE(new, csum_size);

                return -EINVAL;
        }
}
EXPORT_SYMBOL(bulk_csum_cli_reply);

int bulk_csum_svc(struct ptlrpc_bulk_desc *desc, int read,
                  struct lustre_msg *vmsg, int voff,
                  struct lustre_msg *rmsg, int roff)
{
        struct ptlrpc_bulk_sec_desc *bsdv, *bsdr;
        int    vsize, rsize, rc;

        vsize = vmsg->lm_buflens[voff];
        rsize = rmsg->lm_buflens[roff];
        bsdv = lustre_msg_buf(vmsg, voff, 0);
        bsdr = lustre_msg_buf(rmsg, roff, 0);

        LASSERT(vsize >= sizeof(*bsdv));
        LASSERT(rsize >= sizeof(*bsdr));
        LASSERT(bsdv && bsdr);

        if (read) {
                rc = generate_bulk_csum(desc, bsdv->bsd_csum_alg, bsdr, rsize);
                if (rc)
                        CERROR("bulk read: server failed to generate %s "
                               "checksum: %d\n",
                               csum_types[bsdv->bsd_csum_alg].name, rc);
        } else
                rc = verify_bulk_csum(desc, 0, bsdv, vsize, bsdr, rsize);

        return rc;
}
EXPORT_SYMBOL(bulk_csum_svc);

/****************************************
 * Helpers to assist policy modules to  *
 * implement encryption functionality   *
 ****************************************/

/*
 * NOTE: these algorithms must be stream ciphers, i.e. length-preserving,
 * so the ciphertext occupies exactly the same space as the plaintext.
 */
static struct {
        char    *name;
        __u32    flags;
} priv_types[] = {
        [BULK_PRIV_ALG_NULL]   = { "null", 0   },
        [BULK_PRIV_ALG_ARC4]   = { "arc4", 0   },
};

const char * sptlrpc_bulk_priv_alg2name(__u32 priv_alg)
{
        if (priv_alg < BULK_PRIV_ALG_MAX)
                return priv_types[priv_alg].name;
        return "unknown_priv";
}
EXPORT_SYMBOL(sptlrpc_bulk_priv_alg2name);