1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2014, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ptlrpc/sec.c
37  *
38  * Author: Eric Mei <ericm@clusterfs.com>
39  */
40
41 #define DEBUG_SUBSYSTEM S_SEC
42
43 #include <linux/user_namespace.h>
44 #ifdef HAVE_UIDGID_HEADER
45 # include <linux/uidgid.h>
46 #endif
47 #include <linux/crypto.h>
48 #include <linux/key.h>
49
50 #include <libcfs/libcfs.h>
51 #include <obd.h>
52 #include <obd_class.h>
53 #include <obd_support.h>
54 #include <lustre_net.h>
55 #include <lustre_import.h>
56 #include <lustre_dlm.h>
57 #include <lustre_sec.h>
58
59 #include "ptlrpc_internal.h"
60
61 /***********************************************
62  * policy registers                            *
63  ***********************************************/
64
65 static rwlock_t policy_lock;
66 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
67         NULL,
68 };
69
70 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
71 {
72         __u16 number = policy->sp_policy;
73
74         LASSERT(policy->sp_name);
75         LASSERT(policy->sp_cops);
76         LASSERT(policy->sp_sops);
77
78         if (number >= SPTLRPC_POLICY_MAX)
79                 return -EINVAL;
80
81         write_lock(&policy_lock);
82         if (unlikely(policies[number])) {
83                 write_unlock(&policy_lock);
84                 return -EALREADY;
85         }
86         policies[number] = policy;
87         write_unlock(&policy_lock);
88
89         CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
90         return 0;
91 }
92 EXPORT_SYMBOL(sptlrpc_register_policy);
93
94 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
95 {
96         __u16 number = policy->sp_policy;
97
98         LASSERT(number < SPTLRPC_POLICY_MAX);
99
100         write_lock(&policy_lock);
101         if (unlikely(policies[number] == NULL)) {
102                 write_unlock(&policy_lock);
103                 CERROR("%s: already unregistered\n", policy->sp_name);
104                 return -EINVAL;
105         }
106
107         LASSERT(policies[number] == policy);
108         policies[number] = NULL;
109         write_unlock(&policy_lock);
110
111         CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
112         return 0;
113 }
114 EXPORT_SYMBOL(sptlrpc_unregister_policy);
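
A policy module plugs into sptlrpc through the two calls above. The sketch below shows roughly how a hypothetical policy module might wire itself up at module load/unload; the my_sec_* names and the chosen policy slot are illustrative only, and the operation tables are assumed to be defined elsewhere.

        /* hypothetical slot; must be an unused value below SPTLRPC_POLICY_MAX */
        #define MY_SEC_POLICY_NUMBER    3

        static struct ptlrpc_sec_cops my_sec_cops;      /* client-side ops */
        static struct ptlrpc_sec_sops my_sec_sops;      /* server-side ops */

        static struct ptlrpc_sec_policy my_sec_policy = {
                .sp_owner  = THIS_MODULE,
                .sp_name   = "sec.mypolicy",
                .sp_policy = MY_SEC_POLICY_NUMBER,
                .sp_cops   = &my_sec_cops,
                .sp_sops   = &my_sec_sops,
        };

        static int __init my_sec_init(void)
        {
                /* returns -EALREADY if the slot is already registered */
                return sptlrpc_register_policy(&my_sec_policy);
        }

        static void __exit my_sec_exit(void)
        {
                sptlrpc_unregister_policy(&my_sec_policy);
        }
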
115
116 static
117 struct ptlrpc_sec_policy * sptlrpc_wireflavor2policy(__u32 flavor)
118 {
119         static DEFINE_MUTEX(load_mutex);
120         static atomic_t           loaded = ATOMIC_INIT(0);
121         struct ptlrpc_sec_policy *policy;
122         __u16                     number = SPTLRPC_FLVR_POLICY(flavor);
123         __u16                     flag = 0;
124
125         if (number >= SPTLRPC_POLICY_MAX)
126                 return NULL;
127
128         while (1) {
129                 read_lock(&policy_lock);
130                 policy = policies[number];
131                 if (policy && !try_module_get(policy->sp_owner))
132                         policy = NULL;
133                 if (policy == NULL)
134                         flag = atomic_read(&loaded);
135                 read_unlock(&policy_lock);
136
137                 if (policy != NULL || flag != 0 ||
138                     number != SPTLRPC_POLICY_GSS)
139                         break;
140
141                 /* try to load gss module, once */
142                 mutex_lock(&load_mutex);
143                 if (atomic_read(&loaded) == 0) {
144                         if (request_module("ptlrpc_gss") == 0)
145                                 CDEBUG(D_SEC,
146                                        "module ptlrpc_gss loaded on demand\n");
147                         else
148                                 CERROR("Unable to load module ptlrpc_gss\n");
149
150                         atomic_set(&loaded, 1);
151                 }
152                 mutex_unlock(&load_mutex);
153         }
154
155         return policy;
156 }
157
158 __u32 sptlrpc_name2flavor_base(const char *name)
159 {
160         if (!strcmp(name, "null"))
161                 return SPTLRPC_FLVR_NULL;
162         if (!strcmp(name, "plain"))
163                 return SPTLRPC_FLVR_PLAIN;
164         if (!strcmp(name, "gssnull"))
165                 return SPTLRPC_FLVR_GSSNULL;
166         if (!strcmp(name, "krb5n"))
167                 return SPTLRPC_FLVR_KRB5N;
168         if (!strcmp(name, "krb5a"))
169                 return SPTLRPC_FLVR_KRB5A;
170         if (!strcmp(name, "krb5i"))
171                 return SPTLRPC_FLVR_KRB5I;
172         if (!strcmp(name, "krb5p"))
173                 return SPTLRPC_FLVR_KRB5P;
174         if (!strcmp(name, "ski"))
175                 return SPTLRPC_FLVR_SKI;
176         if (!strcmp(name, "skpi"))
177                 return SPTLRPC_FLVR_SKPI;
178
179         return SPTLRPC_FLVR_INVALID;
180 }
181 EXPORT_SYMBOL(sptlrpc_name2flavor_base);
182
183 const char *sptlrpc_flavor2name_base(__u32 flvr)
184 {
185         __u32   base = SPTLRPC_FLVR_BASE(flvr);
186
187         if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL))
188                 return "null";
189         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN))
190                 return "plain";
191         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_GSSNULL))
192                 return "gssnull";
193         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5N))
194                 return "krb5n";
195         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5A))
196                 return "krb5a";
197         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5I))
198                 return "krb5i";
199         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5P))
200                 return "krb5p";
201         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_SKI))
202                 return "ski";
203         else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_SKPI))
204                 return "skpi";
205
206         CERROR("invalid wire flavor 0x%x\n", flvr);
207         return "invalid";
208 }
209 EXPORT_SYMBOL(sptlrpc_flavor2name_base);
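
These two helpers are inverses over the base flavor names; a quick sketch of the round trip (the variable names are illustrative):

        const char *name = "krb5i";
        __u32 flvr = sptlrpc_name2flavor_base(name);

        if (flvr != SPTLRPC_FLVR_INVALID)
                CDEBUG(D_SEC, "flavor 0x%x maps back to %s\n",
                       flvr, sptlrpc_flavor2name_base(flvr));
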
210
211 char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
212                                char *buf, int bufsize)
213 {
214         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN)
215                 snprintf(buf, bufsize, "hash:%s",
216                          sptlrpc_get_hash_name(sf->u_bulk.hash.hash_alg));
217         else
218                 snprintf(buf, bufsize, "%s",
219                          sptlrpc_flavor2name_base(sf->sf_rpc));
220
221         buf[bufsize - 1] = '\0';
222         return buf;
223 }
224 EXPORT_SYMBOL(sptlrpc_flavor2name_bulk);
225
226 char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
227 {
228         snprintf(buf, bufsize, "%s", sptlrpc_flavor2name_base(sf->sf_rpc));
229
230         /*
231          * currently we don't support customized bulk specification for
232          * flavors other than plain
233          */
234         if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN) {
235                 char bspec[16];
236
237                 bspec[0] = '-';
238                 sptlrpc_flavor2name_bulk(sf, &bspec[1], sizeof(bspec) - 1);
239                 strncat(buf, bspec, bufsize);
240         }
241
242         buf[bufsize - 1] = '\0';
243         return buf;
244 }
245 EXPORT_SYMBOL(sptlrpc_flavor2name);
246
247 char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
248 {
249         buf[0] = '\0';
250
251         if (flags & PTLRPC_SEC_FL_REVERSE)
252                 strlcat(buf, "reverse,", bufsize);
253         if (flags & PTLRPC_SEC_FL_ROOTONLY)
254                 strlcat(buf, "rootonly,", bufsize);
255         if (flags & PTLRPC_SEC_FL_UDESC)
256                 strlcat(buf, "udesc,", bufsize);
257         if (flags & PTLRPC_SEC_FL_BULK)
258                 strlcat(buf, "bulk,", bufsize);
259         if (buf[0] == '\0')
260                 strlcat(buf, "-,", bufsize);
261
262         return buf;
263 }
264 EXPORT_SYMBOL(sptlrpc_secflags2str);
265
266 /**************************************************
267  * client context APIs                            *
268  **************************************************/
269
270 static
271 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
272 {
273         struct vfs_cred vcred;
274         int create = 1, remove_dead = 1;
275
276         LASSERT(sec);
277         LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
278
279         if (sec->ps_flvr.sf_flags & (PTLRPC_SEC_FL_REVERSE |
280                                      PTLRPC_SEC_FL_ROOTONLY)) {
281                 vcred.vc_uid = 0;
282                 vcred.vc_gid = 0;
283                 if (sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_REVERSE) {
284                         create = 0;
285                         remove_dead = 0;
286                 }
287         } else {
288                 vcred.vc_uid = from_kuid(&init_user_ns, current_uid());
289                 vcred.vc_gid = from_kgid(&init_user_ns, current_gid());
290         }
291
292         return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred, create,
293                                                    remove_dead);
294 }
295
296 struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
297 {
298         atomic_inc(&ctx->cc_refcount);
299         return ctx;
300 }
301 EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
302
303 void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
304 {
305         struct ptlrpc_sec *sec = ctx->cc_sec;
306
307         LASSERT(sec);
308         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
309
310         if (!atomic_dec_and_test(&ctx->cc_refcount))
311                 return;
312
313         sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
314 }
315 EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
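
The get/put pair above implements plain reference counting on a client context; a minimal sketch of the discipline (the helper name is hypothetical):

        static void borrow_ctx(struct ptlrpc_cli_ctx *ctx)
        {
                /* pin the context while it is in use */
                sptlrpc_cli_ctx_get(ctx);

                /* ... use ctx->cc_ops, ctx->cc_sec, ... */

                /* drop the reference; with sync == 1 the final put releases
                 * the context synchronously via the policy's release_ctx() */
                sptlrpc_cli_ctx_put(ctx, 1);
        }
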
316
317 /**
318  * Expire the client context immediately.
319  *
320  * \pre Caller must hold at least 1 reference on the \a ctx.
321  */
322 void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
323 {
324         LASSERT(ctx->cc_ops->die);
325         ctx->cc_ops->die(ctx, 0);
326 }
327 EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
328
329 /**
330  * Wake up the threads that are waiting for this client context. Called
331  * after some status change on \a ctx.
332  */
333 void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
334 {
335         struct ptlrpc_request *req, *next;
336
337         spin_lock(&ctx->cc_lock);
338         list_for_each_entry_safe(req, next, &ctx->cc_req_list,
339                                      rq_ctx_chain) {
340                 list_del_init(&req->rq_ctx_chain);
341                 ptlrpc_client_wake_req(req);
342         }
343         spin_unlock(&ctx->cc_lock);
344 }
345 EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
346
347 int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
348 {
349         LASSERT(ctx->cc_ops);
350
351         if (ctx->cc_ops->display == NULL)
352                 return 0;
353
354         return ctx->cc_ops->display(ctx, buf, bufsize);
355 }
356
357 static int import_sec_check_expire(struct obd_import *imp)
358 {
359         int     adapt = 0;
360
361         spin_lock(&imp->imp_lock);
362         if (imp->imp_sec_expire &&
363             imp->imp_sec_expire < cfs_time_current_sec()) {
364                 adapt = 1;
365                 imp->imp_sec_expire = 0;
366         }
367         spin_unlock(&imp->imp_lock);
368
369         if (!adapt)
370                 return 0;
371
372         CDEBUG(D_SEC, "found delayed sec adapt expired, do it now\n");
373         return sptlrpc_import_sec_adapt(imp, NULL, NULL);
374 }
375
376 /**
377  * Get and validate the client side ptlrpc security facilities from
378  * \a imp. There is a race condition on client reconnect when the import is
379  * being destroyed while there are outstanding client bound requests. In
380  * this case do not output any error messages if import security is not
381  * found.
382  *
383  * \param[in] imp obd import associated with client
384  * \param[out] sec client side ptlrpc security
385  *
386  * \retval 0 if security retrieved successfully
387  * \retval -ve errno if there was a problem
388  */
389 static int import_sec_validate_get(struct obd_import *imp,
390                                    struct ptlrpc_sec **sec)
391 {
392         int     rc;
393
394         if (unlikely(imp->imp_sec_expire)) {
395                 rc = import_sec_check_expire(imp);
396                 if (rc)
397                         return rc;
398         }
399
400         *sec = sptlrpc_import_sec_ref(imp);
401         /* Only output an error when the import is still active */
402         if (*sec == NULL) {
403                 if (list_empty(&imp->imp_zombie_chain))
404                         CERROR("import %p (%s) with no sec\n",
405                                 imp, ptlrpc_import_state_name(imp->imp_state));
406                 return -EACCES;
407         }
408
409         if (unlikely((*sec)->ps_dying)) {
410                 CERROR("attempt to use dying sec %p\n", *sec);
411                 sptlrpc_sec_put(*sec);
412                 return -EACCES;
413         }
414
415         return 0;
416 }
417
418 /**
419  * Given a \a req, find or allocate an appropriate context for it.
420  * \pre req->rq_cli_ctx == NULL.
421  *
422  * \retval 0 on success, and req->rq_cli_ctx is set.
423  * \retval -ve error number, and req->rq_cli_ctx == NULL.
424  */
425 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
426 {
427         struct obd_import *imp = req->rq_import;
428         struct ptlrpc_sec *sec;
429         int                rc;
430         ENTRY;
431
432         LASSERT(!req->rq_cli_ctx);
433         LASSERT(imp);
434
435         rc = import_sec_validate_get(imp, &sec);
436         if (rc)
437                 RETURN(rc);
438
439         req->rq_cli_ctx = get_my_ctx(sec);
440
441         sptlrpc_sec_put(sec);
442
443         if (!req->rq_cli_ctx) {
444                 CERROR("req %p: fail to get context\n", req);
445                 RETURN(-ECONNREFUSED);
446         }
447
448         RETURN(0);
449 }
450
451 /**
452  * Drop the context for \a req.
453  * \pre req->rq_cli_ctx != NULL.
454  * \post req->rq_cli_ctx == NULL.
455  *
456  * If \a sync == 0, this function should return quickly without sleep;
457  * otherwise it might trigger and wait for the whole process of sending
458  * a context-destroying RPC to the server.
459  */
460 void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
461 {
462         ENTRY;
463
464         LASSERT(req);
465         LASSERT(req->rq_cli_ctx);
466
467         /* the request might be asked to release its context early while
468          * still on the context waiting list.
469          */
470         if (!list_empty(&req->rq_ctx_chain)) {
471                 spin_lock(&req->rq_cli_ctx->cc_lock);
472                 list_del_init(&req->rq_ctx_chain);
473                 spin_unlock(&req->rq_cli_ctx->cc_lock);
474         }
475
476         sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
477         req->rq_cli_ctx = NULL;
478         EXIT;
479 }
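
Taken together, sptlrpc_req_get_ctx() and sptlrpc_req_put_ctx() bracket the lifetime of a request's context; a simplified sketch of the pairing (the surrounding helper is hypothetical and error handling is trimmed):

        static int with_request_ctx(struct ptlrpc_request *req)
        {
                int rc;

                /* find or allocate a context; sets req->rq_cli_ctx on success */
                rc = sptlrpc_req_get_ctx(req);
                if (rc)
                        return rc;

                /* ... build, send and wait for the request ... */

                /* drop the context; sync == 0 avoids sleeping here */
                sptlrpc_req_put_ctx(req, 0);
                return 0;
        }
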
480
481 static
482 int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
483                            struct ptlrpc_cli_ctx *oldctx,
484                            struct ptlrpc_cli_ctx *newctx)
485 {
486         struct sptlrpc_flavor   old_flvr;
487         char                   *reqmsg = NULL; /* to workaround old gcc */
488         int                     reqmsg_size;
489         int                     rc = 0;
490
491         LASSERT(req->rq_reqmsg);
492         LASSERT(req->rq_reqlen);
493         LASSERT(req->rq_replen);
494
495         CDEBUG(D_SEC, "req %p: switch ctx %p(%u->%s) -> %p(%u->%s), "
496                "switch sec %p(%s) -> %p(%s)\n", req,
497                oldctx, oldctx->cc_vcred.vc_uid, sec2target_str(oldctx->cc_sec),
498                newctx, newctx->cc_vcred.vc_uid, sec2target_str(newctx->cc_sec),
499                oldctx->cc_sec, oldctx->cc_sec->ps_policy->sp_name,
500                newctx->cc_sec, newctx->cc_sec->ps_policy->sp_name);
501
502         /* save flavor */
503         old_flvr = req->rq_flvr;
504
505         /* save request message */
506         reqmsg_size = req->rq_reqlen;
507         if (reqmsg_size != 0) {
508                 OBD_ALLOC_LARGE(reqmsg, reqmsg_size);
509                 if (reqmsg == NULL)
510                         return -ENOMEM;
511                 memcpy(reqmsg, req->rq_reqmsg, reqmsg_size);
512         }
513
514         /* release old req/rep buf */
515         req->rq_cli_ctx = oldctx;
516         sptlrpc_cli_free_reqbuf(req);
517         sptlrpc_cli_free_repbuf(req);
518         req->rq_cli_ctx = newctx;
519
520         /* recalculate the flavor */
521         sptlrpc_req_set_flavor(req, 0);
522
523         /* alloc new request buffer
524          * we don't need to alloc reply buffer here, leave it to the
525          * rest procedure of ptlrpc */
526         if (reqmsg_size != 0) {
527                 rc = sptlrpc_cli_alloc_reqbuf(req, reqmsg_size);
528                 if (!rc) {
529                         LASSERT(req->rq_reqmsg);
530                         memcpy(req->rq_reqmsg, reqmsg, reqmsg_size);
531                 } else {
532                         CWARN("failed to alloc reqbuf: %d\n", rc);
533                         req->rq_flvr = old_flvr;
534                 }
535
536                 OBD_FREE_LARGE(reqmsg, reqmsg_size);
537         }
538         return rc;
539 }
540
541 /**
542  * If the current context of \a req is dead somehow, e.g. we just switched
543  * flavor and thus marked the original contexts dead, find a new context for
544  * it. If no switch is needed, \a req will end up with the same context.
545  *
546  * \note a request must have a context, to keep other parts of the code happy.
547  * On any failure during the switch, we must restore the old one.
548  */
549 int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
550 {
551         struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
552         struct ptlrpc_cli_ctx *newctx;
553         int                    rc;
554         ENTRY;
555
556         LASSERT(oldctx);
557
558         sptlrpc_cli_ctx_get(oldctx);
559         sptlrpc_req_put_ctx(req, 0);
560
561         rc = sptlrpc_req_get_ctx(req);
562         if (unlikely(rc)) {
563                 LASSERT(!req->rq_cli_ctx);
564
565                 /* restore old ctx */
566                 req->rq_cli_ctx = oldctx;
567                 RETURN(rc);
568         }
569
570         newctx = req->rq_cli_ctx;
571         LASSERT(newctx);
572
573         if (unlikely(newctx == oldctx &&
574                      test_bit(PTLRPC_CTX_DEAD_BIT, &oldctx->cc_flags))) {
575                 /*
576                  * still got the old dead ctx, which usually means the system is too busy
577                  */
578                 CDEBUG(D_SEC,
579                        "ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
580                        newctx, newctx->cc_flags);
581
582                 set_current_state(TASK_INTERRUPTIBLE);
583                 schedule_timeout(msecs_to_jiffies(MSEC_PER_SEC));
584         } else if (unlikely(test_bit(PTLRPC_CTX_UPTODATE_BIT, &newctx->cc_flags)
585                             == 0)) {
586                 /*
587                  * new ctx not up to date yet
588                  */
589                 CDEBUG(D_SEC,
590                        "ctx (%p, fl %lx) doesn't switch, not up to date yet\n",
591                        newctx, newctx->cc_flags);
592         } else {
593                 /*
594                  * it's possible newctx == oldctx if we're switching
595                  * subflavor with the same sec.
596                  */
597                 rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
598                 if (rc) {
599                         /* restore old ctx */
600                         sptlrpc_req_put_ctx(req, 0);
601                         req->rq_cli_ctx = oldctx;
602                         RETURN(rc);
603                 }
604
605                 LASSERT(req->rq_cli_ctx == newctx);
606         }
607
608         sptlrpc_cli_ctx_put(oldctx, 1);
609         RETURN(0);
610 }
611 EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
612
613 static
614 int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
615 {
616         if (cli_ctx_is_refreshed(ctx))
617                 return 1;
618         return 0;
619 }
620
621 static
622 int ctx_refresh_timeout(void *data)
623 {
624         struct ptlrpc_request *req = data;
625         int rc;
626
627         /* conn_cnt is needed in expire_one_request */
628         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
629
630         rc = ptlrpc_expire_one_request(req, 1);
631         /* if we started recovery, we should mark this ctx dead; otherwise,
632          * in case lgssd died, nobody would retire this ctx and a subsequent
633          * connect would still find the same ctx, causing a deadlock.
634          * There's an assumption that the expiry time of the request is
635          * later than the context refresh expiry time.
636          */
637         if (rc == 0)
638                 req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
639         return rc;
640 }
641
642 static
643 void ctx_refresh_interrupt(void *data)
644 {
645         struct ptlrpc_request *req = data;
646
647         spin_lock(&req->rq_lock);
648         req->rq_intr = 1;
649         spin_unlock(&req->rq_lock);
650 }
651
652 static
653 void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
654 {
655         spin_lock(&ctx->cc_lock);
656         if (!list_empty(&req->rq_ctx_chain))
657                 list_del_init(&req->rq_ctx_chain);
658         spin_unlock(&ctx->cc_lock);
659 }
660
661 /**
662  * Refresh the context of \a req, if it's not up to date.
663  * \param timeout
664  * - < 0: don't wait
665  * - = 0: wait until success or a fatal error occurs
666  * - > 0: timeout value (in seconds)
667  *
668  * The status of the context could be changed by other threads at any time.
669  * We allow this race, but once we return 0, the caller assumes the context
670  * is up to date and keeps using it until the owning RPC is done.
671  *
672  * \retval 0 only if the context is up to date.
673  * \retval -ve error number.
674  */
675 int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
676 {
677         struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
678         struct ptlrpc_sec      *sec;
679         struct l_wait_info      lwi;
680         int                     rc;
681         ENTRY;
682
683         LASSERT(ctx);
684
685         if (req->rq_ctx_init || req->rq_ctx_fini)
686                 RETURN(0);
687
688         /*
689          * during the process a request's context might even change type
690          * (e.g. from a gss ctx to a null ctx), so on each loop we need to
691          * re-check everything
692          */
693 again:
694         rc = import_sec_validate_get(req->rq_import, &sec);
695         if (rc)
696                 RETURN(rc);
697
698         if (sec->ps_flvr.sf_rpc != req->rq_flvr.sf_rpc) {
699                 CDEBUG(D_SEC, "req %p: flavor has changed %x -> %x\n",
700                       req, req->rq_flvr.sf_rpc, sec->ps_flvr.sf_rpc);
701                 req_off_ctx_list(req, ctx);
702                 sptlrpc_req_replace_dead_ctx(req);
703                 ctx = req->rq_cli_ctx;
704         }
705         sptlrpc_sec_put(sec);
706
707         if (cli_ctx_is_eternal(ctx))
708                 RETURN(0);
709
710         if (unlikely(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags))) {
711                 LASSERT(ctx->cc_ops->refresh);
712                 ctx->cc_ops->refresh(ctx);
713         }
714         LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
715
716         LASSERT(ctx->cc_ops->validate);
717         if (ctx->cc_ops->validate(ctx) == 0) {
718                 req_off_ctx_list(req, ctx);
719                 RETURN(0);
720         }
721
722         if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
723                 spin_lock(&req->rq_lock);
724                 req->rq_err = 1;
725                 spin_unlock(&req->rq_lock);
726                 req_off_ctx_list(req, ctx);
727                 RETURN(-EPERM);
728         }
729
730         /*
731          * There's a subtle issue for resending RPCs; suppose the following
732          * situation:
733          *  1. the request was sent to the server.
734          *  2. recovery was kicked off; after it finished, the request was
735          *     marked as resent.
736          *  3. the request is resent.
737          *  4. the old reply from the server is received; we accept and verify
738          *     the reply. This has to succeed, otherwise the error would be
739          *     visible to the application.
740          *  5. the new reply from the server is received and dropped by LNet.
741          *
742          * Note the xids of the old & new requests are the same. We can't
743          * simply change the xid for the resent request because the server
744          * relies on it for reply reconstruction.
745          *
746          * Commonly the original context should be up to date because we
747          * still have plenty of time before expiry; the server will keep its
748          * context because we hold at least one ref on the old context, which
749          * keeps it from being destroyed while the RPC is in flight, so the
750          * server can still accept the request and finish the RPC. But if not:
751          *  1. If the server side context has been trimmed, NO_CONTEXT will
752          *     be returned; gss_cli_ctx_verify/unseal will switch to the new
753          *     context by force.
754          *  2. If the current context was never refreshed, we are fine: we
755          *     never actually sent a request with the old context before.
756          */
757         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
758             unlikely(req->rq_reqmsg) &&
759             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
760                 req_off_ctx_list(req, ctx);
761                 RETURN(0);
762         }
763
764         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
765                 req_off_ctx_list(req, ctx);
766                 /*
767                  * don't switch ctx if import was deactivated
768                  */
769                 if (req->rq_import->imp_deactive) {
770                         spin_lock(&req->rq_lock);
771                         req->rq_err = 1;
772                         spin_unlock(&req->rq_lock);
773                         RETURN(-EINTR);
774                 }
775
776                 rc = sptlrpc_req_replace_dead_ctx(req);
777                 if (rc) {
778                         LASSERT(ctx == req->rq_cli_ctx);
779                         CERROR("req %p: failed to replace dead ctx %p: %d\n",
780                                req, ctx, rc);
781                         spin_lock(&req->rq_lock);
782                         req->rq_err = 1;
783                         spin_unlock(&req->rq_lock);
784                         RETURN(rc);
785                 }
786
787                 ctx = req->rq_cli_ctx;
788                 goto again;
789         }
790
791         /*
792          * Now we're sure this context is undergoing an upcall; add ourselves
793          * to the waiting list
794          */
795         spin_lock(&ctx->cc_lock);
796         if (list_empty(&req->rq_ctx_chain))
797                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
798         spin_unlock(&ctx->cc_lock);
799
800         if (timeout < 0)
801                 RETURN(-EWOULDBLOCK);
802
803         /* Clear any flags that may be present from previous sends */
804         LASSERT(req->rq_receiving_reply == 0);
805         spin_lock(&req->rq_lock);
806         req->rq_err = 0;
807         req->rq_timedout = 0;
808         req->rq_resend = 0;
809         req->rq_restart = 0;
810         spin_unlock(&req->rq_lock);
811
812         lwi = LWI_TIMEOUT_INTR(msecs_to_jiffies(timeout * MSEC_PER_SEC),
813                                ctx_refresh_timeout,
814                                ctx_refresh_interrupt, req);
815         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
816
817         /*
818          * the following cases could lead us here:
819          * - successfully refreshed;
820          * - interrupted;
821          * - timed out, and we don't want to recover from the failure;
822          * - timed out, and woken up when recovery finished;
823          * - someone else marked this ctx dead by force;
824          * - someone invalidated the req and called ptlrpc_client_wake_req(),
825          *   e.g. ptlrpc_abort_inflight();
826          */
827         if (!cli_ctx_is_refreshed(ctx)) {
828                 /* timed out or interrupted */
829                 req_off_ctx_list(req, ctx);
830
831                 LASSERT(rc != 0);
832                 RETURN(rc);
833         }
834
835         goto again;
836 }
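
A sketch of how a caller might drive the refresh, exercising the timeout modes documented above (the helper name is hypothetical):

        static int refresh_ctx_example(struct ptlrpc_request *req)
        {
                int rc;

                /* timeout < 0: don't wait; returns -EWOULDBLOCK if an
                 * upcall is still in flight */
                rc = sptlrpc_req_refresh_ctx(req, -1);
                if (rc != -EWOULDBLOCK)
                        return rc;

                /* timeout > 0: wait up to 30 seconds for the refresh;
                 * timeout == 0 would wait until success or a fatal error */
                return sptlrpc_req_refresh_ctx(req, 30);
        }
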
837
838 /**
839  * Initialize flavor settings for \a req, according to \a opcode.
840  *
841  * \note this could be called in two situations:
842  * - new request from ptlrpc_pre_req(), with proper @opcode
843  * - old request which changed ctx in the middle, with @opcode == 0
844  */
845 void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
846 {
847         struct ptlrpc_sec *sec;
848
849         LASSERT(req->rq_import);
850         LASSERT(req->rq_cli_ctx);
851         LASSERT(req->rq_cli_ctx->cc_sec);
852         LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
853
854         /* special security flags according to opcode */
855         switch (opcode) {
856         case OST_READ:
857         case MDS_READPAGE:
858         case MGS_CONFIG_READ:
859         case OBD_IDX_READ:
860                 req->rq_bulk_read = 1;
861                 break;
862         case OST_WRITE:
863         case MDS_WRITEPAGE:
864                 req->rq_bulk_write = 1;
865                 break;
866         case SEC_CTX_INIT:
867                 req->rq_ctx_init = 1;
868                 break;
869         case SEC_CTX_FINI:
870                 req->rq_ctx_fini = 1;
871                 break;
872         case 0:
873                 /* init/fini rpc won't be resent, so can't get here */
874                 LASSERT(req->rq_ctx_init == 0);
875                 LASSERT(req->rq_ctx_fini == 0);
876
877                 /* cleanup flags, which should be recalculated */
878                 req->rq_pack_udesc = 0;
879                 req->rq_pack_bulk = 0;
880                 break;
881         }
882
883         sec = req->rq_cli_ctx->cc_sec;
884
885         spin_lock(&sec->ps_lock);
886         req->rq_flvr = sec->ps_flvr;
887         spin_unlock(&sec->ps_lock);
888
889         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
890          * destruction rpc */
891         if (unlikely(req->rq_ctx_init))
892                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
893         else if (unlikely(req->rq_ctx_fini))
894                 flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
895
896         /* user descriptor flag, null security can't do it anyway */
897         if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
898             (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL))
899                 req->rq_pack_udesc = 1;
900
901         /* bulk security flag */
902         if ((req->rq_bulk_read || req->rq_bulk_write) &&
903             sptlrpc_flavor_has_bulk(&req->rq_flvr))
904                 req->rq_pack_bulk = 1;
905 }
906
907 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
908 {
909         if (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
910                 return;
911
912         LASSERT(req->rq_clrbuf);
913         if (req->rq_pool || !req->rq_reqbuf)
914                 return;
915
916         OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
917         req->rq_reqbuf = NULL;
918         req->rq_reqbuf_len = 0;
919 }
920
921 /**
922  * Given an import \a imp, check whether the current user has a valid context
923  * or not. We may create a new context, try to refresh it, and retry
924  * repeatedly in case of non-fatal errors. Return 0 means success.
925  */
926 int sptlrpc_import_check_ctx(struct obd_import *imp)
927 {
928         struct ptlrpc_sec     *sec;
929         struct ptlrpc_cli_ctx *ctx;
930         struct ptlrpc_request *req = NULL;
931         int rc;
932         ENTRY;
933
934         might_sleep();
935
936         sec = sptlrpc_import_sec_ref(imp);
937         ctx = get_my_ctx(sec);
938         sptlrpc_sec_put(sec);
939
940         if (!ctx)
941                 RETURN(-ENOMEM);
942
943         if (cli_ctx_is_eternal(ctx) ||
944             ctx->cc_ops->validate(ctx) == 0) {
945                 sptlrpc_cli_ctx_put(ctx, 1);
946                 RETURN(0);
947         }
948
949         if (cli_ctx_is_error(ctx)) {
950                 sptlrpc_cli_ctx_put(ctx, 1);
951                 RETURN(-EACCES);
952         }
953
954         req = ptlrpc_request_cache_alloc(GFP_NOFS);
955         if (!req)
956                 RETURN(-ENOMEM);
957
958         ptlrpc_cli_req_init(req);
959         atomic_set(&req->rq_refcount, 10000);
960
961         req->rq_import = imp;
962         req->rq_flvr = sec->ps_flvr;
963         req->rq_cli_ctx = ctx;
964
965         rc = sptlrpc_req_refresh_ctx(req, 0);
966         LASSERT(list_empty(&req->rq_ctx_chain));
967         sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
968         ptlrpc_request_cache_free(req);
969
970         RETURN(rc);
971 }
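
Callers typically use this as a gate before issuing RPCs on an import; a minimal sketch (the surrounding function is hypothetical):

        static int check_before_send(struct obd_import *imp)
        {
                int rc;

                /* make sure the current user has a usable context */
                rc = sptlrpc_import_check_ctx(imp);
                if (rc)
                        return rc;      /* e.g. -EACCES on a bad context */

                /* ... allocate and send the real request ... */
                return 0;
        }
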
972
973 /**
974  * Used by ptlrpc client, to perform the pre-defined security transformation
975  * upon the request message of \a req. After this function is called,
976  * req->rq_reqmsg is still accessible as clear text.
977  */
978 int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
979 {
980         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
981         int rc = 0;
982         ENTRY;
983
984         LASSERT(ctx);
985         LASSERT(ctx->cc_sec);
986         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
987
988         /* we wrap the bulk request here because now we can be sure
989          * the context is up to date.
990          */
991         if (req->rq_bulk) {
992                 rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
993                 if (rc)
994                         RETURN(rc);
995         }
996
997         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
998         case SPTLRPC_SVC_NULL:
999         case SPTLRPC_SVC_AUTH:
1000         case SPTLRPC_SVC_INTG:
1001                 LASSERT(ctx->cc_ops->sign);
1002                 rc = ctx->cc_ops->sign(ctx, req);
1003                 break;
1004         case SPTLRPC_SVC_PRIV:
1005                 LASSERT(ctx->cc_ops->seal);
1006                 rc = ctx->cc_ops->seal(ctx, req);
1007                 break;
1008         default:
1009                 LBUG();
1010         }
1011
1012         if (rc == 0) {
1013                 LASSERT(req->rq_reqdata_len);
1014                 LASSERT(req->rq_reqdata_len % 8 == 0);
1015                 LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
1016         }
1017
1018         RETURN(rc);
1019 }
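
The ptlrpc client invokes the security hooks in a fixed order around each request. The condensed sketch below only illustrates that order; the driver function is hypothetical, and the real client code in ptlrpc also deals with resends, reply buffers, pools and error paths:

        static int secure_rpc_roundtrip(struct ptlrpc_request *req,
                                        int opcode, int msgsize)
        {
                int rc;

                /* pick rpc/bulk flavor bits for this opcode */
                sptlrpc_req_set_flavor(req, opcode);

                /* after this, req->rq_reqmsg is usable */
                rc = sptlrpc_cli_alloc_reqbuf(req, msgsize);
                if (rc)
                        return rc;

                /* ... fill in req->rq_reqmsg ... */

                /* sign or seal the request according to the flavor */
                rc = sptlrpc_cli_wrap_request(req);
                if (rc)
                        return rc;

                /* ... the request is sent and a reply is received ... */

                /* verify or unseal the reply; rq_repmsg is clear text after */
                return sptlrpc_cli_unwrap_reply(req);
        }
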
1020
1021 static int do_cli_unwrap_reply(struct ptlrpc_request *req)
1022 {
1023         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1024         int                    rc;
1025         ENTRY;
1026
1027         LASSERT(ctx);
1028         LASSERT(ctx->cc_sec);
1029         LASSERT(req->rq_repbuf);
1030         LASSERT(req->rq_repdata);
1031         LASSERT(req->rq_repmsg == NULL);
1032
1033         req->rq_rep_swab_mask = 0;
1034
1035         rc = __lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len);
1036         switch (rc) {
1037         case 1:
1038                 lustre_set_rep_swabbed(req, MSG_PTLRPC_HEADER_OFF);
1039         case 0:
1040                 break;
1041         default:
1042                 CERROR("failed unpack reply: x"LPU64"\n", req->rq_xid);
1043                 RETURN(-EPROTO);
1044         }
1045
1046         if (req->rq_repdata_len < sizeof(struct lustre_msg)) {
1047                 CERROR("replied data length %d too small\n",
1048                        req->rq_repdata_len);
1049                 RETURN(-EPROTO);
1050         }
1051
1052         if (SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr) !=
1053             SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
1054                 CERROR("reply policy %u doesn't match request policy %u\n",
1055                        SPTLRPC_FLVR_POLICY(req->rq_repdata->lm_secflvr),
1056                        SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc));
1057                 RETURN(-EPROTO);
1058         }
1059
1060         switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
1061         case SPTLRPC_SVC_NULL:
1062         case SPTLRPC_SVC_AUTH:
1063         case SPTLRPC_SVC_INTG:
1064                 LASSERT(ctx->cc_ops->verify);
1065                 rc = ctx->cc_ops->verify(ctx, req);
1066                 break;
1067         case SPTLRPC_SVC_PRIV:
1068                 LASSERT(ctx->cc_ops->unseal);
1069                 rc = ctx->cc_ops->unseal(ctx, req);
1070                 break;
1071         default:
1072                 LBUG();
1073         }
1074         LASSERT(rc || req->rq_repmsg || req->rq_resend);
1075
1076         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL &&
1077             !req->rq_ctx_init)
1078                 req->rq_rep_swab_mask = 0;
1079         RETURN(rc);
1080 }
1081
1082 /**
1083  * Used by ptlrpc client, to perform security transformation upon the reply
1084  * message of \a req. After return successfully, req->rq_repmsg points to
1085  * the reply message in clear text.
1086  *
1087  * \pre the reply buffer should have been un-posted from LNet, so nothing is
1088  * going to change.
1089  */
1090 int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
1091 {
1092         LASSERT(req->rq_repbuf);
1093         LASSERT(req->rq_repdata == NULL);
1094         LASSERT(req->rq_repmsg == NULL);
1095         LASSERT(req->rq_reply_off + req->rq_nob_received <= req->rq_repbuf_len);
1096
1097         if (req->rq_reply_off == 0 &&
1098             (lustre_msghdr_get_flags(req->rq_reqmsg) & MSGHDR_AT_SUPPORT)) {
1099                 CERROR("real reply with offset 0\n");
1100                 return -EPROTO;
1101         }
1102
1103         if (req->rq_reply_off % 8 != 0) {
1104                 CERROR("reply at odd offset %u\n", req->rq_reply_off);
1105                 return -EPROTO;
1106         }
1107
1108         req->rq_repdata = (struct lustre_msg *)
1109                                 (req->rq_repbuf + req->rq_reply_off);
1110         req->rq_repdata_len = req->rq_nob_received;
1111
1112         return do_cli_unwrap_reply(req);
1113 }
1114
1115 /**
1116  * Used by ptlrpc client, to perform security transformation upon the early
1117  * reply message of \a req. We expect rq_reply_off to be 0 and
1118  * rq_nob_received to be the early reply size.
1119  *
1120  * Because the receive buffer might still be posted, the reply data might
1121  * change at any time, whether or not we hold rq_lock. For this reason
1122  * we allocate a separate ptlrpc_request and reply buffer for early reply
1123  * processing.
1124  *
1125  * \retval 0 success, \a req_ret is filled with a duplicated ptlrpc_request.
1126  * Later the caller must call sptlrpc_cli_finish_early_reply() on the returned
1127  * \a *req_ret to release it.
1128  * \retval -ve error number, and \a req_ret will not be set.
1129  */
1130 int sptlrpc_cli_unwrap_early_reply(struct ptlrpc_request *req,
1131                                    struct ptlrpc_request **req_ret)
1132 {
1133         struct ptlrpc_request  *early_req;
1134         char                   *early_buf;
1135         int                     early_bufsz, early_size;
1136         int                     rc;
1137         ENTRY;
1138
1139         early_req = ptlrpc_request_cache_alloc(GFP_NOFS);
1140         if (early_req == NULL)
1141                 RETURN(-ENOMEM);
1142
1143         ptlrpc_cli_req_init(early_req);
1144
1145         early_size = req->rq_nob_received;
1146         early_bufsz = size_roundup_power2(early_size);
1147         OBD_ALLOC_LARGE(early_buf, early_bufsz);
1148         if (early_buf == NULL)
1149                 GOTO(err_req, rc = -ENOMEM);
1150
1151         /* sanity checks and copy data out; do it inside the spinlock */
1152         spin_lock(&req->rq_lock);
1153
1154         if (req->rq_replied) {
1155                 spin_unlock(&req->rq_lock);
1156                 GOTO(err_buf, rc = -EALREADY);
1157         }
1158
1159         LASSERT(req->rq_repbuf);
1160         LASSERT(req->rq_repdata == NULL);
1161         LASSERT(req->rq_repmsg == NULL);
1162
1163         if (req->rq_reply_off != 0) {
1164                 CERROR("early reply with offset %u\n", req->rq_reply_off);
1165                 spin_unlock(&req->rq_lock);
1166                 GOTO(err_buf, rc = -EPROTO);
1167         }
1168
1169         if (req->rq_nob_received != early_size) {
1170                 /* even if another early reply arrived, the size should be the same */
1171                 CERROR("data size has changed from %u to %u\n",
1172                        early_size, req->rq_nob_received);
1173                 spin_unlock(&req->rq_lock);
1174                 GOTO(err_buf, rc = -EINVAL);
1175         }
1176
1177         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
1178                 CERROR("early reply length %d too small\n",
1179                        req->rq_nob_received);
1180                 spin_unlock(&req->rq_lock);
1181                 GOTO(err_buf, rc = -EALREADY);
1182         }
1183
1184         memcpy(early_buf, req->rq_repbuf, early_size);
1185         spin_unlock(&req->rq_lock);
1186
1187         early_req->rq_cli_ctx = sptlrpc_cli_ctx_get(req->rq_cli_ctx);
1188         early_req->rq_flvr = req->rq_flvr;
1189         early_req->rq_repbuf = early_buf;
1190         early_req->rq_repbuf_len = early_bufsz;
1191         early_req->rq_repdata = (struct lustre_msg *) early_buf;
1192         early_req->rq_repdata_len = early_size;
1193         early_req->rq_early = 1;
1194         early_req->rq_reqmsg = req->rq_reqmsg;
1195
1196         rc = do_cli_unwrap_reply(early_req);
1197         if (rc) {
1198                 DEBUG_REQ(D_ADAPTTO, early_req,
1199                           "error %d unwrap early reply", rc);
1200                 GOTO(err_ctx, rc);
1201         }
1202
1203         LASSERT(early_req->rq_repmsg);
1204         *req_ret = early_req;
1205         RETURN(0);
1206
1207 err_ctx:
1208         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1209 err_buf:
1210         OBD_FREE_LARGE(early_buf, early_bufsz);
1211 err_req:
1212         ptlrpc_request_cache_free(early_req);
1213         RETURN(rc);
1214 }
1215
1216 /**
1217  * Used by ptlrpc client, to release a processed early reply \a early_req.
1218  *
1219  * \pre \a early_req was obtained from calling sptlrpc_cli_unwrap_early_reply().
1220  */
1221 void sptlrpc_cli_finish_early_reply(struct ptlrpc_request *early_req)
1222 {
1223         LASSERT(early_req->rq_repbuf);
1224         LASSERT(early_req->rq_repdata);
1225         LASSERT(early_req->rq_repmsg);
1226
1227         sptlrpc_cli_ctx_put(early_req->rq_cli_ctx, 1);
1228         OBD_FREE_LARGE(early_req->rq_repbuf, early_req->rq_repbuf_len);
1229         ptlrpc_request_cache_free(early_req);
1230 }
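
The two early-reply helpers are always used as a pair; a minimal sketch (hypothetical caller, assuming an early reply has arrived for req):

        static void peek_early_reply(struct ptlrpc_request *req)
        {
                struct ptlrpc_request *early_req;
                int rc;

                rc = sptlrpc_cli_unwrap_early_reply(req, &early_req);
                if (rc)
                        return;

                /* ... read fields from early_req->rq_repmsg ... */

                /* release the duplicated request and its private buffer */
                sptlrpc_cli_finish_early_reply(early_req);
        }
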
1231
1232 /**************************************************
1233  * sec ID                                         *
1234  **************************************************/
1235
1236 /*
1237  * "fixed" secs (e.g. null) use sec_id < 0
1238  */
1239 static atomic_t sptlrpc_sec_id = ATOMIC_INIT(1);
1240
1241 int sptlrpc_get_next_secid(void)
1242 {
1243         return atomic_inc_return(&sptlrpc_sec_id);
1244 }
1245 EXPORT_SYMBOL(sptlrpc_get_next_secid);
1246
1247 /**************************************************
1248  * client side high-level security APIs           *
1249  **************************************************/
1250
1251 static int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
1252                                    int grace, int force)
1253 {
1254         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1255
1256         LASSERT(policy->sp_cops);
1257         LASSERT(policy->sp_cops->flush_ctx_cache);
1258
1259         return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
1260 }
1261
1262 static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
1263 {
1264         struct ptlrpc_sec_policy *policy = sec->ps_policy;
1265
1266         LASSERT_ATOMIC_ZERO(&sec->ps_refcount);
1267         LASSERT_ATOMIC_ZERO(&sec->ps_nctx);
1268         LASSERT(policy->sp_cops->destroy_sec);
1269
1270         CDEBUG(D_SEC, "%s@%p: being destroyed\n", sec->ps_policy->sp_name, sec);
1271
1272         policy->sp_cops->destroy_sec(sec);
1273         sptlrpc_policy_put(policy);
1274 }
1275
1276 void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
1277 {
1278         sec_cop_destroy_sec(sec);
1279 }
1280 EXPORT_SYMBOL(sptlrpc_sec_destroy);
1281
1282 static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
1283 {
1284         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1285
1286         if (sec->ps_policy->sp_cops->kill_sec) {
1287                 sec->ps_policy->sp_cops->kill_sec(sec);
1288
1289                 sec_cop_flush_ctx_cache(sec, -1, 1, 1);
1290         }
1291 }
1292
1293 struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
1294 {
1295         if (sec)
1296                 atomic_inc(&sec->ps_refcount);
1297
1298         return sec;
1299 }
1300 EXPORT_SYMBOL(sptlrpc_sec_get);
1301
1302 void sptlrpc_sec_put(struct ptlrpc_sec *sec)
1303 {
1304         if (sec) {
1305                 LASSERT_ATOMIC_POS(&sec->ps_refcount);
1306
1307                 if (atomic_dec_and_test(&sec->ps_refcount)) {
1308                         sptlrpc_gc_del_sec(sec);
1309                         sec_cop_destroy_sec(sec);
1310                 }
1311         }
1312 }
1313 EXPORT_SYMBOL(sptlrpc_sec_put);
1314
1315 /*
1316  * the policy module is responsible for taking a reference on the import
1317  */
1318 static
1319 struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
1320                                        struct ptlrpc_svc_ctx *svc_ctx,
1321                                        struct sptlrpc_flavor *sf,
1322                                        enum lustre_sec_part sp)
1323 {
1324         struct ptlrpc_sec_policy *policy;
1325         struct ptlrpc_sec        *sec;
1326         char                      str[32];
1327         ENTRY;
1328
1329         if (svc_ctx) {
1330                 LASSERT(imp->imp_dlm_fake == 1);
1331
1332                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
1333                        imp->imp_obd->obd_type->typ_name,
1334                        imp->imp_obd->obd_name,
1335                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1336
1337                 policy = sptlrpc_policy_get(svc_ctx->sc_policy);
1338                 sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1339         } else {
1340                 LASSERT(imp->imp_dlm_fake == 0);
1341
1342                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
1343                        imp->imp_obd->obd_type->typ_name,
1344                        imp->imp_obd->obd_name,
1345                        sptlrpc_flavor2name(sf, str, sizeof(str)));
1346
1347                 policy = sptlrpc_wireflavor2policy(sf->sf_rpc);
1348                 if (!policy) {
1349                         CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
1350                         RETURN(NULL);
1351                 }
1352         }
1353
1354         sec = policy->sp_cops->create_sec(imp, svc_ctx, sf);
1355         if (sec) {
1356                 atomic_inc(&sec->ps_refcount);
1357
1358                 sec->ps_part = sp;
1359
1360                 if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
1361                         sptlrpc_gc_add_sec(sec);
1362         } else {
1363                 sptlrpc_policy_put(policy);
1364         }
1365
1366         RETURN(sec);
1367 }
1368
1369 struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp)
1370 {
1371         struct ptlrpc_sec *sec;
1372
1373         spin_lock(&imp->imp_lock);
1374         sec = sptlrpc_sec_get(imp->imp_sec);
1375         spin_unlock(&imp->imp_lock);
1376
1377         return sec;
1378 }
1379 EXPORT_SYMBOL(sptlrpc_import_sec_ref);
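
The reference taken here must always be balanced with sptlrpc_sec_put(); a short sketch (hypothetical helper):

        static void inspect_import_sec(struct obd_import *imp)
        {
                struct ptlrpc_sec *sec;

                sec = sptlrpc_import_sec_ref(imp);      /* may return NULL */
                if (sec == NULL)
                        return;

                /* ... read sec->ps_flvr, sec->ps_part, ... */

                sptlrpc_sec_put(sec);                   /* drop the reference */
        }
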
1380
1381 static void sptlrpc_import_sec_install(struct obd_import *imp,
1382                                        struct ptlrpc_sec *sec)
1383 {
1384         struct ptlrpc_sec *old_sec;
1385
1386         LASSERT_ATOMIC_POS(&sec->ps_refcount);
1387
1388         spin_lock(&imp->imp_lock);
1389         old_sec = imp->imp_sec;
1390         imp->imp_sec = sec;
1391         spin_unlock(&imp->imp_lock);
1392
1393         if (old_sec) {
1394                 sptlrpc_sec_kill(old_sec);
1395
1396                 /* balance the ref taken by this import */
1397                 sptlrpc_sec_put(old_sec);
1398         }
1399 }
1400
1401 static inline
1402 int flavor_equal(struct sptlrpc_flavor *sf1, struct sptlrpc_flavor *sf2)
1403 {
1404         return (memcmp(sf1, sf2, sizeof(*sf1)) == 0);
1405 }
1406
1407 static inline
1408 void flavor_copy(struct sptlrpc_flavor *dst, struct sptlrpc_flavor *src)
1409 {
1410         *dst = *src;
1411 }
1412
1413 /**
1414  * To get an appropriate ptlrpc_sec for the \a imp, according to the current
1415  * configuration. When called, imp->imp_sec may or may not be NULL.
1416  *
1417  *  - regular import: \a svc_ctx should be NULL and \a flvr is ignored;
1418  *  - reverse import: \a svc_ctx and \a flvr are obtained from incoming request.
1419  */
1420 int sptlrpc_import_sec_adapt(struct obd_import *imp,
1421                              struct ptlrpc_svc_ctx *svc_ctx,
1422                              struct sptlrpc_flavor *flvr)
1423 {
1424         struct ptlrpc_connection   *conn;
1425         struct sptlrpc_flavor       sf;
1426         struct ptlrpc_sec          *sec, *newsec;
1427         enum lustre_sec_part        sp;
1428         char                        str[24];
1429         int                         rc = 0;
1430         ENTRY;
1431
1432         might_sleep();
1433
1434         if (imp == NULL)
1435                 RETURN(0);
1436
1437         conn = imp->imp_connection;
1438
1439         if (svc_ctx == NULL) {
1440                 struct client_obd *cliobd = &imp->imp_obd->u.cli;
1441                 /*
1442                  * normal import: determine the flavor from the rule set,
1443                  * except for the MGC, whose flavor is predetermined.
1444                  */
1445                 if (cliobd->cl_sp_me == LUSTRE_SP_MGC)
1446                         sf = cliobd->cl_flvr_mgc;
1447                 else 
1448                         sptlrpc_conf_choose_flavor(cliobd->cl_sp_me,
1449                                                    cliobd->cl_sp_to,
1450                                                    &cliobd->cl_target_uuid,
1451                                                    conn->c_self, &sf);
1452
1453                 sp = imp->imp_obd->u.cli.cl_sp_me;
1454         } else {
1455                 /* reverse import: determine the flavor from the incoming request */
1456                 sf = *flvr;
1457
1458                 if (sf.sf_rpc != SPTLRPC_FLVR_NULL)
1459                         sf.sf_flags = PTLRPC_SEC_FL_REVERSE |
1460                                       PTLRPC_SEC_FL_ROOTONLY;
1461
1462                 sp = sptlrpc_target_sec_part(imp->imp_obd);
1463         }
1464
1465         sec = sptlrpc_import_sec_ref(imp);
1466         if (sec) {
1467                 char    str2[24];
1468
1469                 if (flavor_equal(&sf, &sec->ps_flvr))
1470                         GOTO(out, rc);
1471
1472                 CDEBUG(D_SEC, "import %s->%s: changing flavor %s -> %s\n",
1473                        imp->imp_obd->obd_name,
1474                        obd_uuid2str(&conn->c_remote_uuid),
1475                        sptlrpc_flavor2name(&sec->ps_flvr, str, sizeof(str)),
1476                        sptlrpc_flavor2name(&sf, str2, sizeof(str2)));
1477         } else if (SPTLRPC_FLVR_BASE(sf.sf_rpc) !=
1478                    SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL)) {
1479                 CDEBUG(D_SEC, "import %s->%s netid %x: select flavor %s\n",
1480                        imp->imp_obd->obd_name,
1481                        obd_uuid2str(&conn->c_remote_uuid),
1482                        LNET_NIDNET(conn->c_self),
1483                        sptlrpc_flavor2name(&sf, str, sizeof(str)));
1484         }
1485
1486         mutex_lock(&imp->imp_sec_mutex);
1487
1488         newsec = sptlrpc_sec_create(imp, svc_ctx, &sf, sp);
1489         if (newsec) {
1490                 sptlrpc_import_sec_install(imp, newsec);
1491         } else {
1492                 CERROR("import %s->%s: failed to create new sec\n",
1493                        imp->imp_obd->obd_name,
1494                        obd_uuid2str(&conn->c_remote_uuid));
1495                 rc = -EPERM;
1496         }
1497
1498         mutex_unlock(&imp->imp_sec_mutex);
1499 out:
1500         sptlrpc_sec_put(sec);
1501         RETURN(rc);
1502 }
1503
1504 void sptlrpc_import_sec_put(struct obd_import *imp)
1505 {
1506         if (imp->imp_sec) {
1507                 sptlrpc_sec_kill(imp->imp_sec);
1508
1509                 sptlrpc_sec_put(imp->imp_sec);
1510                 imp->imp_sec = NULL;
1511         }
1512 }
1513
1514 static void import_flush_ctx_common(struct obd_import *imp,
1515                                     uid_t uid, int grace, int force)
1516 {
1517         struct ptlrpc_sec *sec;
1518
1519         if (imp == NULL)
1520                 return;
1521
1522         sec = sptlrpc_import_sec_ref(imp);
1523         if (sec == NULL)
1524                 return;
1525
1526         sec_cop_flush_ctx_cache(sec, uid, grace, force);
1527         sptlrpc_sec_put(sec);
1528 }
1529
1530 void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
1531 {
1532         /* it's important to use grace mode, see the explanation in
1533          * sptlrpc_req_refresh_ctx() */
1534         import_flush_ctx_common(imp, 0, 1, 1);
1535 }
1536
1537 void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
1538 {
1539         import_flush_ctx_common(imp, from_kuid(&init_user_ns, current_uid()),
1540                                 1, 1);
1541 }
1542 EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
1543
1544 void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
1545 {
1546         import_flush_ctx_common(imp, -1, 1, 1);
1547 }
1548 EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
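
A trivial usage sketch of the flush wrappers; when each variant is called is up to the surrounding client code (the helper below is hypothetical):

        static void flush_import_contexts(struct obd_import *imp, bool everyone)
        {
                if (everyone)
                        sptlrpc_import_flush_all_ctx(imp);  /* uid == -1: all users */
                else
                        sptlrpc_import_flush_my_ctx(imp);   /* current uid only */
        }
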
1549
1550 /**
1551  * Used by ptlrpc client to allocate the request buffer of \a req. On
1552  * successful return, req->rq_reqmsg points to a buffer of size \a msgsize.
1553  */
1554 int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
1555 {
1556         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1557         struct ptlrpc_sec_policy *policy;
1558         int rc;
1559
1560         LASSERT(ctx);
1561         LASSERT(ctx->cc_sec);
1562         LASSERT(ctx->cc_sec->ps_policy);
1563         LASSERT(req->rq_reqmsg == NULL);
1564         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1565
1566         policy = ctx->cc_sec->ps_policy;
1567         rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
1568         if (!rc) {
1569                 LASSERT(req->rq_reqmsg);
1570                 LASSERT(req->rq_reqbuf || req->rq_clrbuf);
1571
1572                 /* zeroing preallocated buffer */
1573                 if (req->rq_pool)
1574                         memset(req->rq_reqmsg, 0, msgsize);
1575         }
1576
1577         return rc;
1578 }
1579
1580 /**
1581  * Used by ptlrpc client to free request buffer of \a req. After this
1582  * req->rq_reqmsg is set to NULL and should not be accessed anymore.
1583  */
1584 void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
1585 {
1586         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1587         struct ptlrpc_sec_policy *policy;
1588
1589         LASSERT(ctx);
1590         LASSERT(ctx->cc_sec);
1591         LASSERT(ctx->cc_sec->ps_policy);
1592         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1593
1594         if (req->rq_reqbuf == NULL && req->rq_clrbuf == NULL)
1595                 return;
1596
1597         policy = ctx->cc_sec->ps_policy;
1598         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1599         req->rq_reqmsg = NULL;
1600 }
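
/*
 * Usage sketch for the two helpers above (added example, not part of the
 * original code): a hypothetical client-side caller pairs the allocation
 * with the free, and only touches req->rq_reqmsg in between:
 *
 *	rc = sptlrpc_cli_alloc_reqbuf(req, msgsize);
 *	if (rc)
 *		return rc;
 *	... fill req->rq_reqmsg ...
 *	sptlrpc_cli_free_reqbuf(req);
 */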
1601
1602 /*
1603  * NOTE: the caller must guarantee the buffer size is enough for the enlargement
1604  */
1605 void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1606                                   int segment, int newsize)
1607 {
1608         void   *src, *dst;
1609         int     oldsize, oldmsg_size, movesize;
1610
1611         LASSERT(segment < msg->lm_bufcount);
1612         LASSERT(msg->lm_buflens[segment] <= newsize);
1613
1614         if (msg->lm_buflens[segment] == newsize)
1615                 return;
1616
1617         /* nothing to do if we are enlarging the last segment */
1618         if (segment == msg->lm_bufcount - 1) {
1619                 msg->lm_buflens[segment] = newsize;
1620                 return;
1621         }
1622
1623         oldsize = msg->lm_buflens[segment];
1624
1625         src = lustre_msg_buf(msg, segment + 1, 0);
1626         msg->lm_buflens[segment] = newsize;
1627         dst = lustre_msg_buf(msg, segment + 1, 0);
1628         msg->lm_buflens[segment] = oldsize;
1629
1630         /* move from segment + 1 to end segment */
1631         LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1632         oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1633         movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1634         LASSERT(movesize >= 0);
1635
1636         if (movesize)
1637                 memmove(dst, src, movesize);
1638
1639         /* note we don't clear the areas where the old data lived, not secret */
1640
1641         /* finally set new segment size */
1642         msg->lm_buflens[segment] = newsize;
1643 }
1644 EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
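
/*
 * Worked example (added; hypothetical sizes, assuming the usual 8-byte
 * alignment of lustre_msg_v2 segment offsets): for a message with segment
 * lengths {128, 40, 64}, calling
 *
 *	_sptlrpc_enlarge_msg_inplace(msg, 1, 48);
 *
 * shifts segment 2 forward by 8 bytes and leaves the lengths as
 * {128, 48, 64}.  As noted above, the caller must have reserved enough
 * room after the current message for that shift.
 */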
1645
1646 /**
1647  * Used by ptlrpc client to enlarge the \a segment of request message pointed
1648  * by req->rq_reqmsg to size \a newsize. All previously filled-in data will be
1649  * preserved after the enlargement. This must be called after the original
1650  * request buffer has been allocated.
1651  *
1652  * \note After this is called, rq_reqmsg and rq_reqlen might have changed,
1653  * so the caller should refresh its local pointers if needed.
1654  */
1655 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1656                                int segment, int newsize)
1657 {
1658         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1659         struct ptlrpc_sec_cops   *cops;
1660         struct lustre_msg        *msg = req->rq_reqmsg;
1661
1662         LASSERT(ctx);
1663         LASSERT(msg);
1664         LASSERT(msg->lm_bufcount > segment);
1665         LASSERT(msg->lm_buflens[segment] <= newsize);
1666
1667         if (msg->lm_buflens[segment] == newsize)
1668                 return 0;
1669
1670         cops = ctx->cc_sec->ps_policy->sp_cops;
1671         LASSERT(cops->enlarge_reqbuf);
1672         return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1673 }
1674 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
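
/*
 * Caller-side sketch (added example): since the policy's enlarge_reqbuf
 * hook may reallocate the underlying buffer, any cached pointer into
 * rq_reqmsg must be re-fetched after the call, e.g.:
 *
 *	rc = sptlrpc_cli_enlarge_reqbuf(req, segment, newsize);
 *	if (rc)
 *		return rc;
 *	body = lustre_msg_buf(req->rq_reqmsg, segment, 0);
 */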
1675
1676 /**
1677  * Used by ptlrpc client to allocate reply buffer of \a req.
1678  *
1679  * \note After this, req->rq_repmsg is still not accessible.
1680  */
1681 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1682 {
1683         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1684         struct ptlrpc_sec_policy *policy;
1685         ENTRY;
1686
1687         LASSERT(ctx);
1688         LASSERT(ctx->cc_sec);
1689         LASSERT(ctx->cc_sec->ps_policy);
1690
1691         if (req->rq_repbuf)
1692                 RETURN(0);
1693
1694         policy = ctx->cc_sec->ps_policy;
1695         RETURN(policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize));
1696 }
1697
1698 /**
1699  * Used by ptlrpc client to free reply buffer of \a req. After this
1700  * req->rq_repmsg is set to NULL and should not be accessed anymore.
1701  */
1702 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1703 {
1704         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1705         struct ptlrpc_sec_policy *policy;
1706         ENTRY;
1707
1708         LASSERT(ctx);
1709         LASSERT(ctx->cc_sec);
1710         LASSERT(ctx->cc_sec->ps_policy);
1711         LASSERT_ATOMIC_POS(&ctx->cc_refcount);
1712
1713         if (req->rq_repbuf == NULL)
1714                 return;
1715         LASSERT(req->rq_repbuf_len);
1716
1717         policy = ctx->cc_sec->ps_policy;
1718         policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1719         req->rq_repmsg = NULL;
1720         EXIT;
1721 }
1722
1723 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1724                                 struct ptlrpc_cli_ctx *ctx)
1725 {
1726         struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1727
1728         if (!policy->sp_cops->install_rctx)
1729                 return 0;
1730         return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1731 }
1732
1733 int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1734                                 struct ptlrpc_svc_ctx *ctx)
1735 {
1736         struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1737
1738         if (!policy->sp_sops->install_rctx)
1739                 return 0;
1740         return policy->sp_sops->install_rctx(imp, ctx);
1741 }
1742
1743 /****************************************
1744  * server side security                 *
1745  ****************************************/
1746
1747 static int flavor_allowed(struct sptlrpc_flavor *exp,
1748                           struct ptlrpc_request *req)
1749 {
1750         struct sptlrpc_flavor *flvr = &req->rq_flvr;
1751
1752         if (exp->sf_rpc == SPTLRPC_FLVR_ANY || exp->sf_rpc == flvr->sf_rpc)
1753                 return 1;
1754
1755         if ((req->rq_ctx_init || req->rq_ctx_fini) &&
1756             SPTLRPC_FLVR_POLICY(exp->sf_rpc) ==
1757             SPTLRPC_FLVR_POLICY(flvr->sf_rpc) &&
1758             SPTLRPC_FLVR_MECH(exp->sf_rpc) == SPTLRPC_FLVR_MECH(flvr->sf_rpc))
1759                 return 1;
1760
1761         return 0;
1762 }
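
/*
 * In other words (added note): a ctx init/fini request is accepted as long
 * as the policy and mechanism parts of the flavor match, even if the
 * service part differs -- presumably, e.g., two gss/krb5 flavors that
 * differ only in their protection level.
 */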
1763
1764 #define EXP_FLVR_UPDATE_EXPIRE      (OBD_TIMEOUT_DEFAULT + 10)
1765
1766 /**
1767  * Given an export \a exp, check whether the flavor of incoming \a req
1768  * is allowed by the export \a exp. The main logic deals with changing
1769  * configurations. Returns 0 on success.
1770  */
1771 int sptlrpc_target_export_check(struct obd_export *exp,
1772                                 struct ptlrpc_request *req)
1773 {
1774         struct sptlrpc_flavor   flavor;
1775
1776         if (exp == NULL)
1777                 return 0;
1778
1779         /* client side export has no imp_reverse, skip
1780          * FIXME maybe we should check the flavor as well? */
1781         if (exp->exp_imp_reverse == NULL)
1782                 return 0;
1783
1784         /* don't care about ctx fini rpc */
1785         if (req->rq_ctx_fini)
1786                 return 0;
1787
1788         spin_lock(&exp->exp_lock);
1789
1790         /* if the flavor has just changed (exp->exp_flvr_changed != 0), we
1791          * wait for the first req with the new flavor, then treat it as the
1792          * current flavor and adapt the reverse sec according to it.
1793          * note the first rpc with the new flavor might not carry a root ctx,
1794          * in which case we delay the sec_adapt by leaving exp_flvr_adapt == 1. */
1795         if (unlikely(exp->exp_flvr_changed) &&
1796             flavor_allowed(&exp->exp_flvr_old[1], req)) {
1797                 /* make the new flavor the "current" one, and the old ones
1798                  * about-to-expire */
1799                 CDEBUG(D_SEC, "exp %p: just changed: %x->%x\n", exp,
1800                        exp->exp_flvr.sf_rpc, exp->exp_flvr_old[1].sf_rpc);
1801                 flavor = exp->exp_flvr_old[1];
1802                 exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
1803                 exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
1804                 exp->exp_flvr_old[0] = exp->exp_flvr;
1805                 exp->exp_flvr_expire[0] = cfs_time_current_sec() +
1806                                           EXP_FLVR_UPDATE_EXPIRE;
1807                 exp->exp_flvr = flavor;
1808
1809                 /* flavor change finished */
1810                 exp->exp_flvr_changed = 0;
1811                 LASSERT(exp->exp_flvr_adapt == 1);
1812
1813                 /* if it's gss, we are only interested in root ctx init */
1814                 if (req->rq_auth_gss &&
1815                     !(req->rq_ctx_init &&
1816                       (req->rq_auth_usr_root || req->rq_auth_usr_mdt ||
1817                        req->rq_auth_usr_ost))) {
1818                         spin_unlock(&exp->exp_lock);
1819                         CDEBUG(D_SEC, "is good but not root(%d:%d:%d:%d:%d)\n",
1820                                req->rq_auth_gss, req->rq_ctx_init,
1821                                req->rq_auth_usr_root, req->rq_auth_usr_mdt,
1822                                req->rq_auth_usr_ost);
1823                         return 0;
1824                 }
1825
1826                 exp->exp_flvr_adapt = 0;
1827                 spin_unlock(&exp->exp_lock);
1828
1829                 return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1830                                                 req->rq_svc_ctx, &flavor);
1831         }
1832
1833         /* if it equals the current flavor, we accept it, but still need to
1834          * deal with the reverse sec/ctx */
1835         if (likely(flavor_allowed(&exp->exp_flvr, req))) {
1836                 /* most cases return here; we are only interested in
1837                  * gss root ctx init */
1838                 if (!req->rq_auth_gss || !req->rq_ctx_init ||
1839                     (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
1840                      !req->rq_auth_usr_ost)) {
1841                         spin_unlock(&exp->exp_lock);
1842                         return 0;
1843                 }
1844
1845                 /* if the flavor has just changed, we should not proceed;
1846                  * just leave it, the current flavor will be discovered and
1847                  * replaced shortly, and let _this_ rpc pass through */
1848                 if (exp->exp_flvr_changed) {
1849                         LASSERT(exp->exp_flvr_adapt);
1850                         spin_unlock(&exp->exp_lock);
1851                         return 0;
1852                 }
1853
1854                 if (exp->exp_flvr_adapt) {
1855                         exp->exp_flvr_adapt = 0;
1856                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): do delayed adapt\n",
1857                                exp, exp->exp_flvr.sf_rpc,
1858                                exp->exp_flvr_old[0].sf_rpc,
1859                                exp->exp_flvr_old[1].sf_rpc);
1860                         flavor = exp->exp_flvr;
1861                         spin_unlock(&exp->exp_lock);
1862
1863                         return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1864                                                         req->rq_svc_ctx,
1865                                                         &flavor);
1866                 } else {
1867                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
1868                                "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
1869                                exp->exp_flvr_old[0].sf_rpc,
1870                                exp->exp_flvr_old[1].sf_rpc);
1871                         spin_unlock(&exp->exp_lock);
1872
1873                         return sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse,
1874                                                            req->rq_svc_ctx);
1875                 }
1876         }
1877
1878         if (exp->exp_flvr_expire[0]) {
1879                 if (exp->exp_flvr_expire[0] >= cfs_time_current_sec()) {
1880                         if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
1881                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1882                                        "middle one ("CFS_DURATION_T")\n", exp,
1883                                        exp->exp_flvr.sf_rpc,
1884                                        exp->exp_flvr_old[0].sf_rpc,
1885                                        exp->exp_flvr_old[1].sf_rpc,
1886                                        exp->exp_flvr_expire[0] -
1887                                                 cfs_time_current_sec());
1888                                 spin_unlock(&exp->exp_lock);
1889                                 return 0;
1890                         }
1891                 } else {
1892                         CDEBUG(D_SEC, "mark middle expired\n");
1893                         exp->exp_flvr_expire[0] = 0;
1894                 }
1895                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match middle\n", exp,
1896                        exp->exp_flvr.sf_rpc,
1897                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1898                        req->rq_flvr.sf_rpc);
1899         }
1900
1901         /* now that it doesn't match the current flavor, the only chance to
1902          * accept it is matching one of the old flavors which has not expired. */
1903         if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
1904                 if (exp->exp_flvr_expire[1] >= cfs_time_current_sec()) {
1905                         if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
1906                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1907                                        "oldest one ("CFS_DURATION_T")\n", exp,
1908                                        exp->exp_flvr.sf_rpc,
1909                                        exp->exp_flvr_old[0].sf_rpc,
1910                                        exp->exp_flvr_old[1].sf_rpc,
1911                                        exp->exp_flvr_expire[1] -
1912                                                 cfs_time_current_sec());
1913                                 spin_unlock(&exp->exp_lock);
1914                                 return 0;
1915                         }
1916                 } else {
1917                         CDEBUG(D_SEC, "mark oldest expired\n");
1918                         exp->exp_flvr_expire[1] = 0;
1919                 }
1920                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match found\n",
1921                        exp, exp->exp_flvr.sf_rpc,
1922                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1923                        req->rq_flvr.sf_rpc);
1924         } else {
1925                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): skip the last one\n",
1926                        exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc,
1927                        exp->exp_flvr_old[1].sf_rpc);
1928         }
1929
1930         spin_unlock(&exp->exp_lock);
1931
1932         CWARN("exp %p(%s): req %p (%u|%u|%u|%u|%u|%u) with "
1933               "unauthorized flavor %x, expect %x|%x(%+ld)|%x(%+ld)\n",
1934               exp, exp->exp_obd->obd_name,
1935               req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
1936               req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_auth_usr_ost,
1937               req->rq_flvr.sf_rpc,
1938               exp->exp_flvr.sf_rpc,
1939               exp->exp_flvr_old[0].sf_rpc,
1940               exp->exp_flvr_expire[0] ?
1941               (unsigned long) (exp->exp_flvr_expire[0] -
1942                                cfs_time_current_sec()) : 0,
1943               exp->exp_flvr_old[1].sf_rpc,
1944               exp->exp_flvr_expire[1] ?
1945               (unsigned long) (exp->exp_flvr_expire[1] -
1946                                cfs_time_current_sec()) : 0);
1947         return -EACCES;
1948 }
1949 EXPORT_SYMBOL(sptlrpc_target_export_check);
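
/*
 * Summary of the flavor bookkeeping used above (added note): exp_flvr is
 * the current flavor, exp_flvr_old[0] holds the previous ("middle") flavor
 * and exp_flvr_old[1] the oldest one.  On each rotation exp_flvr_expire[0]
 * is set to now + EXP_FLVR_UPDATE_EXPIRE and the previous expiry moves to
 * slot 1, so requests still using a recently replaced flavor are accepted
 * during a short grace window.
 */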
1950
1951 void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
1952                                       struct sptlrpc_rule_set *rset)
1953 {
1954         struct obd_export       *exp;
1955         struct sptlrpc_flavor    new_flvr;
1956
1957         LASSERT(obd);
1958
1959         spin_lock(&obd->obd_dev_lock);
1960
1961         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1962                 if (exp->exp_connection == NULL)
1963                         continue;
1964
1965                 /* note if this export's flavor had just been updated
1966                  * (exp_flvr_changed == 1), this will override the
1967                  * previous one. */
1968                 spin_lock(&exp->exp_lock);
1969                 sptlrpc_target_choose_flavor(rset, exp->exp_sp_peer,
1970                                              exp->exp_connection->c_peer.nid,
1971                                              &new_flvr);
1972                 if (exp->exp_flvr_changed ||
1973                     !flavor_equal(&new_flvr, &exp->exp_flvr)) {
1974                         exp->exp_flvr_old[1] = new_flvr;
1975                         exp->exp_flvr_expire[1] = 0;
1976                         exp->exp_flvr_changed = 1;
1977                         exp->exp_flvr_adapt = 1;
1978
1979                         CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
1980                                exp, sptlrpc_part2name(exp->exp_sp_peer),
1981                                exp->exp_flvr.sf_rpc,
1982                                exp->exp_flvr_old[1].sf_rpc);
1983                 }
1984                 spin_unlock(&exp->exp_lock);
1985         }
1986
1987         spin_unlock(&obd->obd_dev_lock);
1988 }
1989 EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
1990
1991 static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
1992 {
1993         /* peer's claim is unreliable unless gss is being used */
1994         if (!req->rq_auth_gss || svc_rc == SECSVC_DROP)
1995                 return svc_rc;
1996
1997         switch (req->rq_sp_from) {
1998         case LUSTRE_SP_CLI:
1999                 if (req->rq_auth_usr_mdt || req->rq_auth_usr_ost) {
2000                         DEBUG_REQ(D_ERROR, req, "faked source CLI");
2001                         svc_rc = SECSVC_DROP;
2002                 }
2003                 break;
2004         case LUSTRE_SP_MDT:
2005                 if (!req->rq_auth_usr_mdt) {
2006                         DEBUG_REQ(D_ERROR, req, "faked source MDT");
2007                         svc_rc = SECSVC_DROP;
2008                 }
2009                 break;
2010         case LUSTRE_SP_OST:
2011                 if (!req->rq_auth_usr_ost) {
2012                         DEBUG_REQ(D_ERROR, req, "faked source OST");
2013                         svc_rc = SECSVC_DROP;
2014                 }
2015                 break;
2016         case LUSTRE_SP_MGS:
2017         case LUSTRE_SP_MGC:
2018                 if (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt &&
2019                     !req->rq_auth_usr_ost) {
2020                         DEBUG_REQ(D_ERROR, req, "faked source MGC/MGS");
2021                         svc_rc = SECSVC_DROP;
2022                 }
2023                 break;
2024         case LUSTRE_SP_ANY:
2025         default:
2026                 DEBUG_REQ(D_ERROR, req, "invalid source %u", req->rq_sp_from);
2027                 svc_rc = SECSVC_DROP;
2028         }
2029
2030         return svc_rc;
2031 }
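
/*
 * The checks above boil down to (added summary): the peer type claimed in
 * rq_sp_from must be consistent with the gss-authenticated identity flags:
 *
 *	LUSTRE_SP_CLI		must not be authenticated as mdt or ost
 *	LUSTRE_SP_MDT		must be authenticated as mdt
 *	LUSTRE_SP_OST		must be authenticated as ost
 *	LUSTRE_SP_MGS/MGC	must be authenticated as root, mdt or ost
 */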
2032
2033 /**
2034  * Used by ptlrpc server, to perform transformation upon request message of
2035  * incoming \a req. This must be the first thing to do with an incoming
2036  * request in the ptlrpc layer.
2037  *
2038  * \retval SECSVC_OK success, and req->rq_reqmsg points to the request message
2039  * in clear text, of size req->rq_reqlen; also req->rq_svc_ctx is set.
2040  * \retval SECSVC_COMPLETE success, the request has been fully processed, and
2041  * reply message has been prepared.
2042  * \retval SECSVC_DROP failed, this request should be dropped.
2043  */
2044 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
2045 {
2046         struct ptlrpc_sec_policy *policy;
2047         struct lustre_msg        *msg = req->rq_reqbuf;
2048         int                       rc;
2049         ENTRY;
2050
2051         LASSERT(msg);
2052         LASSERT(req->rq_reqmsg == NULL);
2053         LASSERT(req->rq_repmsg == NULL);
2054         LASSERT(req->rq_svc_ctx == NULL);
2055
2056         req->rq_req_swab_mask = 0;
2057
2058         rc = __lustre_unpack_msg(msg, req->rq_reqdata_len);
2059         switch (rc) {
2060         case 1:
2061                 lustre_set_req_swabbed(req, MSG_PTLRPC_HEADER_OFF);
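                /* fall through */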
2062         case 0:
2063                 break;
2064         default:
2065                 CERROR("error unpacking request from %s x"LPU64"\n",
2066                        libcfs_id2str(req->rq_peer), req->rq_xid);
2067                 RETURN(SECSVC_DROP);
2068         }
2069
2070         req->rq_flvr.sf_rpc = WIRE_FLVR(msg->lm_secflvr);
2071         req->rq_sp_from = LUSTRE_SP_ANY;
2072         req->rq_auth_uid = -1;          /* set to INVALID_UID */
2073         req->rq_auth_mapped_uid = -1;
2074
2075         policy = sptlrpc_wireflavor2policy(req->rq_flvr.sf_rpc);
2076         if (!policy) {
2077                 CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
2078                 RETURN(SECSVC_DROP);
2079         }
2080
2081         LASSERT(policy->sp_sops->accept);
2082         rc = policy->sp_sops->accept(req);
2083         sptlrpc_policy_put(policy);
2084         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
2085         LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
2086
2087         /*
2088          * if it's not the null flavor (which means an embedded packed msg),
2089          * reset the swab mask for the coming inner msg unpacking.
2090          */
2091         if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL)
2092                 req->rq_req_swab_mask = 0;
2093
2094         /* sanity check for the request source */
2095         rc = sptlrpc_svc_check_from(req, rc);
2096         RETURN(rc);
2097 }
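
/*
 * Server-side flow sketch (added, hedged; the actual call sites live in the
 * ptlrpc service code): an incoming request is first passed through
 * sptlrpc_svc_unwrap_request(), then a reply state is obtained with
 * sptlrpc_svc_alloc_rs(), the reply is transformed for the wire with
 * sptlrpc_svc_wrap_reply(), and the reply state is finally released via
 * sptlrpc_svc_free_rs().
 */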
2098
2099 /**
2100  * Used by ptlrpc server, to allocate the reply buffer for \a req. On success,
2101  * req->rq_reply_state is set, and req->rq_reply_state->rs_msg points to
2102  * a buffer of \a msglen size.
2103  */
2104 int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
2105 {
2106         struct ptlrpc_sec_policy *policy;
2107         struct ptlrpc_reply_state *rs;
2108         int rc;
2109         ENTRY;
2110
2111         LASSERT(req->rq_svc_ctx);
2112         LASSERT(req->rq_svc_ctx->sc_policy);
2113
2114         policy = req->rq_svc_ctx->sc_policy;
2115         LASSERT(policy->sp_sops->alloc_rs);
2116
2117         rc = policy->sp_sops->alloc_rs(req, msglen);
2118         if (unlikely(rc == -ENOMEM)) {
2119                 struct ptlrpc_service_part *svcpt = req->rq_rqbd->rqbd_svcpt;
2120                 if (svcpt->scp_service->srv_max_reply_size <
2121                    msglen + sizeof(struct ptlrpc_reply_state)) {
2122                         /* Just return failure if the size is too big */
2123                         CERROR("size of message is too big (%zu), %d allowed\n",
2124                                 msglen + sizeof(struct ptlrpc_reply_state),
2125                                 svcpt->scp_service->srv_max_reply_size);
2126                         RETURN(-ENOMEM);
2127                 }
2128
2129                 /* failed alloc, try emergency pool */
2130                 rs = lustre_get_emerg_rs(svcpt);
2131                 if (rs == NULL)
2132                         RETURN(-ENOMEM);
2133
2134                 req->rq_reply_state = rs;
2135                 rc = policy->sp_sops->alloc_rs(req, msglen);
2136                 if (rc) {
2137                         lustre_put_emerg_rs(rs);
2138                         req->rq_reply_state = NULL;
2139                 }
2140         }
2141
2142         LASSERT(rc != 0 ||
2143                 (req->rq_reply_state && req->rq_reply_state->rs_msg));
2144
2145         RETURN(rc);
2146 }
2147
2148 /**
2149  * Used by ptlrpc server, to perform transformation upon reply message.
2150  *
2151  * \post req->rq_reply_off is set to the appropriate server-controlled reply offset.
2152  * \post req->rq_repmsg and req->rq_reply_state->rs_msg become inaccessible.
2153  */
2154 int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
2155 {
2156         struct ptlrpc_sec_policy *policy;
2157         int rc;
2158         ENTRY;
2159
2160         LASSERT(req->rq_svc_ctx);
2161         LASSERT(req->rq_svc_ctx->sc_policy);
2162
2163         policy = req->rq_svc_ctx->sc_policy;
2164         LASSERT(policy->sp_sops->authorize);
2165
2166         rc = policy->sp_sops->authorize(req);
2167         LASSERT(rc || req->rq_reply_state->rs_repdata_len);
2168
2169         RETURN(rc);
2170 }
2171
2172 /**
2173  * Used by ptlrpc server, to free reply_state.
2174  */
2175 void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
2176 {
2177         struct ptlrpc_sec_policy *policy;
2178         unsigned int prealloc;
2179         ENTRY;
2180
2181         LASSERT(rs->rs_svc_ctx);
2182         LASSERT(rs->rs_svc_ctx->sc_policy);
2183
2184         policy = rs->rs_svc_ctx->sc_policy;
2185         LASSERT(policy->sp_sops->free_rs);
2186
2187         prealloc = rs->rs_prealloc;
2188         policy->sp_sops->free_rs(rs);
2189
2190         if (prealloc)
2191                 lustre_put_emerg_rs(rs);
2192         EXIT;
2193 }
2194
2195 void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
2196 {
2197         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2198
2199         if (ctx != NULL)
2200                 atomic_inc(&ctx->sc_refcount);
2201 }
2202
2203 void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
2204 {
2205         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2206
2207         if (ctx == NULL)
2208                 return;
2209
2210         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2211         if (atomic_dec_and_test(&ctx->sc_refcount)) {
2212                 if (ctx->sc_policy->sp_sops->free_ctx)
2213                         ctx->sc_policy->sp_sops->free_ctx(ctx);
2214         }
2215         req->rq_svc_ctx = NULL;
2216 }
2217
2218 void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
2219 {
2220         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
2221
2222         if (ctx == NULL)
2223                 return;
2224
2225         LASSERT_ATOMIC_POS(&ctx->sc_refcount);
2226         if (ctx->sc_policy->sp_sops->invalidate_ctx)
2227                 ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
2228 }
2229 EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
2230
2231 /****************************************
2232  * bulk security                        *
2233  ****************************************/
2234
2235 /**
2236  * Perform transformation upon the bulk data pointed to by \a desc. This is
2237  * called before transforming the request message.
2238  */
2239 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
2240                           struct ptlrpc_bulk_desc *desc)
2241 {
2242         struct ptlrpc_cli_ctx *ctx;
2243
2244         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2245
2246         if (!req->rq_pack_bulk)
2247                 return 0;
2248
2249         ctx = req->rq_cli_ctx;
2250         if (ctx->cc_ops->wrap_bulk)
2251                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
2252         return 0;
2253 }
2254 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
2255
2256 /**
2257  * This is called after unwrapping the reply message.
2258  * Return the number of bytes of actual plain text received, or an error code.
2259  */
2260 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
2261                                  struct ptlrpc_bulk_desc *desc,
2262                                  int nob)
2263 {
2264         struct ptlrpc_cli_ctx  *ctx;
2265         int                     rc;
2266
2267         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
2268
2269         if (!req->rq_pack_bulk)
2270                 return desc->bd_nob_transferred;
2271
2272         ctx = req->rq_cli_ctx;
2273         if (ctx->cc_ops->unwrap_bulk) {
2274                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2275                 if (rc < 0)
2276                         return rc;
2277         }
2278         return desc->bd_nob_transferred;
2279 }
2280 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
2281
2282 /**
2283  * This is called after unwrapping the reply message.
2284  * Return 0 on success or an error code.
2285  */
2286 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
2287                                   struct ptlrpc_bulk_desc *desc)
2288 {
2289         struct ptlrpc_cli_ctx  *ctx;
2290         int                     rc;
2291
2292         LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
2293
2294         if (!req->rq_pack_bulk)
2295                 return 0;
2296
2297         ctx = req->rq_cli_ctx;
2298         if (ctx->cc_ops->unwrap_bulk) {
2299                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2300                 if (rc < 0)
2301                         return rc;
2302         }
2303
2304         /*
2305          * if everything went right, nob should equal nob_transferred.
2306          * in case of privacy mode, nob_transferred needs to be adjusted.
2307          */
2308         if (desc->bd_nob != desc->bd_nob_transferred) {
2309                 CERROR("nob %d doesn't match transferred nob %d\n",
2310                        desc->bd_nob, desc->bd_nob_transferred);
2311                 return -EPROTO;
2312         }
2313
2314         return 0;
2315 }
2316 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
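
/*
 * Client-side bulk flow sketch (added, hedged; exact call sites are in the
 * ptlrpc client code): when rq_pack_bulk is set, the descriptor is wrapped
 * with sptlrpc_cli_wrap_bulk() before the request is sent, and after the
 * reply has been unwrapped either sptlrpc_cli_unwrap_bulk_read() or
 * sptlrpc_cli_unwrap_bulk_write() is applied, depending on the transfer
 * direction.
 */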
2317
2318 #ifdef HAVE_SERVER_SUPPORT
2319 /**
2320  * Perform transformation upon outgoing bulk read.
2321  */
2322 int sptlrpc_svc_wrap_bulk(struct ptlrpc_request *req,
2323                           struct ptlrpc_bulk_desc *desc)
2324 {
2325         struct ptlrpc_svc_ctx *ctx;
2326
2327         LASSERT(req->rq_bulk_read);
2328
2329         if (!req->rq_pack_bulk)
2330                 return 0;
2331
2332         ctx = req->rq_svc_ctx;
2333         if (ctx->sc_policy->sp_sops->wrap_bulk)
2334                 return ctx->sc_policy->sp_sops->wrap_bulk(req, desc);
2335
2336         return 0;
2337 }
2338 EXPORT_SYMBOL(sptlrpc_svc_wrap_bulk);
2339
2340 /**
2341  * Perform transformation upon incoming bulk write.
2342  */
2343 int sptlrpc_svc_unwrap_bulk(struct ptlrpc_request *req,
2344                             struct ptlrpc_bulk_desc *desc)
2345 {
2346         struct ptlrpc_svc_ctx *ctx;
2347         int                    rc;
2348
2349         LASSERT(req->rq_bulk_write);
2350
2351         /*
2352          * if it's in privacy mode, transferred should be >= expected; otherwise
2353          * transferred should be == expected.
2354          */
2355         if (desc->bd_nob_transferred < desc->bd_nob ||
2356             (desc->bd_nob_transferred > desc->bd_nob &&
2357              SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc) !=
2358              SPTLRPC_BULK_SVC_PRIV)) {
2359                 DEBUG_REQ(D_ERROR, req, "truncated bulk GET %d(%d)",
2360                           desc->bd_nob_transferred, desc->bd_nob);
2361                 return -ETIMEDOUT;
2362         }
2363
2364         if (!req->rq_pack_bulk)
2365                 return 0;
2366
2367         ctx = req->rq_svc_ctx;
2368         if (ctx->sc_policy->sp_sops->unwrap_bulk) {
2369                 rc = ctx->sc_policy->sp_sops->unwrap_bulk(req, desc);
2370                 if (rc)
2371                         CERROR("error unwrap bulk: %d\n", rc);
2372         }
2373
2374         /* return 0 to allow the reply to be sent */
2375         return 0;
2376 }
2377 EXPORT_SYMBOL(sptlrpc_svc_unwrap_bulk);
2378
2379 /**
2380  * Prepare buffers for incoming bulk write.
2381  */
2382 int sptlrpc_svc_prep_bulk(struct ptlrpc_request *req,
2383                           struct ptlrpc_bulk_desc *desc)
2384 {
2385         struct ptlrpc_svc_ctx *ctx;
2386
2387         LASSERT(req->rq_bulk_write);
2388
2389         if (!req->rq_pack_bulk)
2390                 return 0;
2391
2392         ctx = req->rq_svc_ctx;
2393         if (ctx->sc_policy->sp_sops->prep_bulk)
2394                 return ctx->sc_policy->sp_sops->prep_bulk(req, desc);
2395
2396         return 0;
2397 }
2398 EXPORT_SYMBOL(sptlrpc_svc_prep_bulk);
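
/*
 * Server-side bulk flow sketch (added, hedged): for a bulk write the server
 * calls sptlrpc_svc_prep_bulk() before receiving the data and
 * sptlrpc_svc_unwrap_bulk() afterwards; for a bulk read it calls
 * sptlrpc_svc_wrap_bulk() before sending the data out.
 */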
2399
2400 #endif /* HAVE_SERVER_SUPPORT */
2401
2402 /****************************************
2403  * user descriptor helpers              *
2404  ****************************************/
2405
2406 int sptlrpc_current_user_desc_size(void)
2407 {
2408         int ngroups;
2409
2410         ngroups = current_ngroups;
2411
2412         if (ngroups > LUSTRE_MAX_GROUPS)
2413                 ngroups = LUSTRE_MAX_GROUPS;
2414         return sptlrpc_user_desc_size(ngroups);
2415 }
2416 EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
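
/*
 * Size relationship assumed here (added note; sptlrpc_user_desc_size()
 * itself is defined alongside the other sptlrpc helpers in the headers):
 * the descriptor is a fixed struct ptlrpc_user_desc header followed by one
 * __u32 per supplementary group, i.e. roughly
 *
 *	size = sizeof(struct ptlrpc_user_desc) + ngroups * sizeof(__u32);
 *
 * which matches the checks done by the pack/unpack helpers below.
 */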
2417
2418 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
2419 {
2420         struct ptlrpc_user_desc *pud;
2421
2422         pud = lustre_msg_buf(msg, offset, 0);
2423
2424         pud->pud_uid = from_kuid(&init_user_ns, current_uid());
2425         pud->pud_gid = from_kgid(&init_user_ns, current_gid());
2426         pud->pud_fsuid = from_kuid(&init_user_ns, current_fsuid());
2427         pud->pud_fsgid = from_kgid(&init_user_ns, current_fsgid());
2428         pud->pud_cap = cfs_curproc_cap_pack();
2429         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
2430
2431         task_lock(current);
2432         if (pud->pud_ngroups > current_ngroups)
2433                 pud->pud_ngroups = current_ngroups;
2434         memcpy(pud->pud_groups, current_cred()->group_info->blocks[0],
2435                pud->pud_ngroups * sizeof(__u32));
2436         task_unlock(current);
2437
2438         return 0;
2439 }
2440 EXPORT_SYMBOL(sptlrpc_pack_user_desc);
2441
2442 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset, int swabbed)
2443 {
2444         struct ptlrpc_user_desc *pud;
2445         int                      i;
2446
2447         pud = lustre_msg_buf(msg, offset, sizeof(*pud));
2448         if (!pud)
2449                 return -EINVAL;
2450
2451         if (swabbed) {
2452                 __swab32s(&pud->pud_uid);
2453                 __swab32s(&pud->pud_gid);
2454                 __swab32s(&pud->pud_fsuid);
2455                 __swab32s(&pud->pud_fsgid);
2456                 __swab32s(&pud->pud_cap);
2457                 __swab32s(&pud->pud_ngroups);
2458         }
2459
2460         if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
2461                 CERROR("%u groups is too large\n", pud->pud_ngroups);
2462                 return -EINVAL;
2463         }
2464
2465         if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
2466             msg->lm_buflens[offset]) {
2467                 CERROR("%u groups are claimed but bufsize only %u\n",
2468                        pud->pud_ngroups, msg->lm_buflens[offset]);
2469                 return -EINVAL;
2470         }
2471
2472         if (swabbed) {
2473                 for (i = 0; i < pud->pud_ngroups; i++)
2474                         __swab32s(&pud->pud_groups[i]);
2475         }
2476
2477         return 0;
2478 }
2479 EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
2480
2481 /****************************************
2482  * misc helpers                         *
2483  ****************************************/
2484
2485 const char *sec2target_str(struct ptlrpc_sec *sec)
2486 {
2487         if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
2488                 return "*";
2489         if (sec_is_reverse(sec))
2490                 return "c";
2491         return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
2492 }
2493 EXPORT_SYMBOL(sec2target_str);
2494
2495 /*
2496  * return true if the bulk data is protected
2497  */
2498 int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
2499 {
2500         switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
2501         case SPTLRPC_BULK_SVC_INTG:
2502         case SPTLRPC_BULK_SVC_PRIV:
2503                 return 1;
2504         default:
2505                 return 0;
2506         }
2507 }
2508 EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
2509
2510 /****************************************
2511  * crypto API helper/alloc blkcipher    *
2512  ****************************************/
2513
2514 /****************************************
2515  * initialize/finalize                  *
2516  ****************************************/
2517
2518 int sptlrpc_init(void)
2519 {
2520         int rc;
2521
2522         rwlock_init(&policy_lock);
2523
2524         rc = sptlrpc_gc_init();
2525         if (rc)
2526                 goto out;
2527
2528         rc = sptlrpc_conf_init();
2529         if (rc)
2530                 goto out_gc;
2531
2532         rc = sptlrpc_enc_pool_init();
2533         if (rc)
2534                 goto out_conf;
2535
2536         rc = sptlrpc_null_init();
2537         if (rc)
2538                 goto out_pool;
2539
2540         rc = sptlrpc_plain_init();
2541         if (rc)
2542                 goto out_null;
2543
2544         rc = sptlrpc_lproc_init();
2545         if (rc)
2546                 goto out_plain;
2547
2548         return 0;
2549
2550 out_plain:
2551         sptlrpc_plain_fini();
2552 out_null:
2553         sptlrpc_null_fini();
2554 out_pool:
2555         sptlrpc_enc_pool_fini();
2556 out_conf:
2557         sptlrpc_conf_fini();
2558 out_gc:
2559         sptlrpc_gc_fini();
2560 out:
2561         return rc;
2562 }
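
/*
 * Note (added): the error unwinding above and sptlrpc_fini() below tear the
 * subsystems down in exactly the reverse order of initialization:
 * lproc, plain, null, enc_pool, conf, gc.
 */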
2563
2564 void sptlrpc_fini(void)
2565 {
2566         sptlrpc_lproc_fini();
2567         sptlrpc_plain_fini();
2568         sptlrpc_null_fini();
2569         sptlrpc_enc_pool_fini();
2570         sptlrpc_conf_fini();
2571         sptlrpc_gc_fini();
2572 }