1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004-2006 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #ifndef EXPORT_SYMTAB
23 #define EXPORT_SYMTAB
24 #endif
25 #define DEBUG_SUBSYSTEM S_SEC
26
27 #include <libcfs/libcfs.h>
28 #ifndef __KERNEL__
29 #include <liblustre.h>
30 #include <libcfs/list.h>
31 #else
32 #include <linux/crypto.h>
33 #include <linux/key.h>
34 #endif
35
36 #include <obd.h>
37 #include <obd_class.h>
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_import.h>
41 #include <lustre_dlm.h>
42 #include <lustre_sec.h>
43
44 #include "ptlrpc_internal.h"
45
46 /***********************************************
47  * policy registers                            *
48  ***********************************************/
49
50 static rwlock_t policy_lock = RW_LOCK_UNLOCKED;
51 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
52         NULL,
53 };
54
55 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
56 {
57         __u32 number = policy->sp_policy;
58
59         LASSERT(policy->sp_name);
60         LASSERT(policy->sp_cops);
61         LASSERT(policy->sp_sops);
62
63         if (number >= SPTLRPC_POLICY_MAX)
64                 return -EINVAL;
65
66         write_lock(&policy_lock);
67         if (unlikely(policies[number])) {
68                 write_unlock(&policy_lock);
69                 return -EALREADY;
70         }
71         policies[number] = policy;
72         write_unlock(&policy_lock);
73
74         CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
75         return 0;
76 }
77 EXPORT_SYMBOL(sptlrpc_register_policy);
78
79 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
80 {
81         __u32 number = policy->sp_policy;
82
83         LASSERT(number < SPTLRPC_POLICY_MAX);
84
85         write_lock(&policy_lock);
86         if (unlikely(policies[number] == NULL)) {
87                 write_unlock(&policy_lock);
88                 CERROR("%s: already unregistered\n", policy->sp_name);
89                 return -EINVAL;
90         }
91
92         LASSERT(policies[number] == policy);
93         policies[number] = NULL;
94         write_unlock(&policy_lock);
95
96         CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
97         return 0;
98 }
99 EXPORT_SYMBOL(sptlrpc_unregister_policy);
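/*
 * Usage sketch, not part of this file: a policy module (ptlrpc_gss, for
 * instance) is expected to register itself at module load and unregister at
 * unload.  Apart from the two register calls and the struct field names,
 * every identifier below is a made-up placeholder:
 *
 *      static struct ptlrpc_sec_policy example_policy = {
 *              .sp_owner  = THIS_MODULE,
 *              .sp_name   = "sec.example",
 *              .sp_policy = SPTLRPC_POLICY_PLAIN, // any slot < SPTLRPC_POLICY_MAX
 *              .sp_cops   = &example_cli_ops,
 *              .sp_sops   = &example_svc_ops,
 *      };
 *
 *      static int __init example_init(void)
 *      {
 *              return sptlrpc_register_policy(&example_policy);
 *      }
 *
 *      static void __exit example_fini(void)
 *      {
 *              sptlrpc_unregister_policy(&example_policy);
 *      }
 */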
100
101 static
102 struct ptlrpc_sec_policy * sptlrpc_flavor2policy(ptlrpc_sec_flavor_t flavor)
103 {
104 #ifdef CONFIG_KMOD
105         static DECLARE_MUTEX(load_mutex);
106 #endif
107         static atomic_t         loaded = ATOMIC_INIT(0);
108         struct                  ptlrpc_sec_policy *policy;
109         __u32                   number = SEC_FLAVOR_POLICY(flavor), flag = 0;
110
111         if (number >= SPTLRPC_POLICY_MAX)
112                 return NULL;
113
114 #ifdef CONFIG_KMOD
115 again:
116 #endif
117         read_lock(&policy_lock);
118         policy = policies[number];
119         if (policy && !try_module_get(policy->sp_owner))
120                 policy = NULL;
121         if (policy == NULL)
122                 flag = atomic_read(&loaded);
123         read_unlock(&policy_lock);
124
125 #ifdef CONFIG_KMOD
126         /* if lookup failed, try to load the gss module, but only once */
127         if (unlikely(policy == NULL) &&
128             flag == 0 &&
129             (number == SPTLRPC_POLICY_GSS ||
130              number == SPTLRPC_POLICY_GSS_PIPEFS)) {
131                 mutex_down(&load_mutex);
132                 if (atomic_read(&loaded) == 0) {
133                         if (request_module("ptlrpc_gss") != 0)
134                                 CERROR("Unable to load module ptlrpc_gss\n");
135                         else
136                                 CWARN("module ptlrpc_gss loaded\n");
137
138                         atomic_set(&loaded, 1);
139                 }
140                 mutex_up(&load_mutex);
141
142                 goto again;
143         }
144 #endif
145
146         return policy;
147 }
148
149 ptlrpc_sec_flavor_t sptlrpc_name2flavor(const char *name)
150 {
151         if (!strcmp(name, "null"))
152                 return SPTLRPC_FLVR_NULL;
153         if (!strcmp(name, "plain"))
154                 return SPTLRPC_FLVR_PLAIN;
155         if (!strcmp(name, "krb5n"))
156                 return SPTLRPC_FLVR_KRB5N;
157         if (!strcmp(name, "krb5i"))
158                 return SPTLRPC_FLVR_KRB5I;
159         if (!strcmp(name, "krb5p"))
160                 return SPTLRPC_FLVR_KRB5P;
161
162         return SPTLRPC_FLVR_INVALID;
163 }
164 EXPORT_SYMBOL(sptlrpc_name2flavor);
165
166 char *sptlrpc_flavor2name(ptlrpc_sec_flavor_t flavor)
167 {
168         switch (flavor) {
169         case SPTLRPC_FLVR_NULL:
170                 return "null";
171         case SPTLRPC_FLVR_PLAIN:
172                 return "plain";
173         case SPTLRPC_FLVR_KRB5N:
174                 return "krb5n";
175         case SPTLRPC_FLVR_KRB5A:
176                 return "krb5a";
177         case SPTLRPC_FLVR_KRB5I:
178                 return "krb5i";
179         case SPTLRPC_FLVR_KRB5P:
180                 return "krb5p";
181         default:
182                 CERROR("invalid flavor 0x%x(p%u,s%u,v%u)\n", flavor,
183                        SEC_FLAVOR_POLICY(flavor), SEC_FLAVOR_MECH(flavor),
184                        SEC_FLAVOR_SVC(flavor));
185         }
186         return "UNKNOWN";
187 }
188 EXPORT_SYMBOL(sptlrpc_flavor2name);
189
190 /**************************************************
191  * client context APIs                            *
192  **************************************************/
193
194 static
195 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
196 {
197         struct vfs_cred vcred;
198         int create = 1, remove_dead = 1;
199
200         LASSERT(sec);
201         LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
202
203         if (sec->ps_flags & (PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY)) {
204                 vcred.vc_uid = 0;
205                 vcred.vc_gid = 0;
206                 if (sec->ps_flags & PTLRPC_SEC_FL_REVERSE) {
207                         create = 0;
208                         remove_dead = 0;
209                 }
210         } else {
211                 vcred.vc_uid = cfs_current()->uid;
212                 vcred.vc_gid = cfs_current()->gid;
213         }
214
215         return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
216                                                    create, remove_dead);
217 }
218
219 struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
220 {
221         LASSERT(atomic_read(&ctx->cc_refcount) > 0);
222         atomic_inc(&ctx->cc_refcount);
223         return ctx;
224 }
225 EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
226
227 void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
228 {
229         struct ptlrpc_sec *sec = ctx->cc_sec;
230
231         LASSERT(sec);
232         LASSERT(atomic_read(&ctx->cc_refcount));
233
234         if (!atomic_dec_and_test(&ctx->cc_refcount))
235                 return;
236
237         sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
238 }
239 EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
240
241 /*
242  * expire the context immediately.
243  * the caller must hold at least 1 ref on the ctx.
244  */
245 void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
246 {
247         LASSERT(ctx->cc_ops->die);
248         ctx->cc_ops->die(ctx, 0);
249 }
250 EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
251
252 void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
253 {
254         struct ptlrpc_request *req, *next;
255
256         spin_lock(&ctx->cc_lock);
257         list_for_each_entry_safe(req, next, &ctx->cc_req_list, rq_ctx_chain) {
258                 list_del_init(&req->rq_ctx_chain);
259                 ptlrpc_wake_client_req(req);
260         }
261         spin_unlock(&ctx->cc_lock);
262 }
263 EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
264
265 int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
266 {
267         LASSERT(ctx->cc_ops);
268
269         if (ctx->cc_ops->display == NULL)
270                 return 0;
271
272         return ctx->cc_ops->display(ctx, buf, bufsize);
273 }
274
275 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
276 {
277         struct obd_import *imp = req->rq_import;
278         ENTRY;
279
280         LASSERT(!req->rq_cli_ctx);
281         LASSERT(imp);
282
283         if (imp->imp_sec == NULL) {
284                 CERROR("import %p (%s) with no sec pointer\n",
285                        imp, ptlrpc_import_state_name(imp->imp_state));
286                 RETURN(-EACCES);
287         }
288
289         req->rq_cli_ctx = get_my_ctx(imp->imp_sec);
290
291         if (!req->rq_cli_ctx) {
292                 CERROR("req %p: fail to get context\n", req);
293                 RETURN(-ENOMEM);
294         }
295
296         RETURN(0);
297 }
298
299 /*
300  * if @sync == 0, this function should return quickly without sleeping;
301  * otherwise it might trigger a context-destroying rpc to the server.
302  */
303 void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
304 {
305         ENTRY;
306
307         LASSERT(req);
308         LASSERT(req->rq_cli_ctx);
309
310         /* the request might be asked to release its context early while
311          * it is still on the context waiting list.
312          */
313         if (!list_empty(&req->rq_ctx_chain)) {
314                 spin_lock(&req->rq_cli_ctx->cc_lock);
315                 list_del_init(&req->rq_ctx_chain);
316                 spin_unlock(&req->rq_cli_ctx->cc_lock);
317         }
318
319         sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
320         req->rq_cli_ctx = NULL;
321         EXIT;
322 }
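
/*
 * Rough per-request lifecycle sketch of the context API above.  The real
 * call sites live in the ptlrpc client code outside this file, so treat the
 * ordering as orientation only:
 *
 *      sptlrpc_req_get_ctx(req);               // at request initialization
 *      sptlrpc_req_set_flavor(req, opcode);    // pick rpc/bulk flavor bits
 *      sptlrpc_req_refresh_ctx(req, timeout);  // before (re)sending
 *      sptlrpc_cli_wrap_request(req);          // sign or seal the request
 *      ...
 *      sptlrpc_cli_unwrap_reply(req);          // verify or unseal the reply
 *      sptlrpc_req_put_ctx(req, sync);         // when the request is released
 */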
323
324 /*
325  * the request must have a context; if we fail to get a new context,
326  * just restore the old one
327  */
328 int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
329 {
330         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
331         int rc;
332         ENTRY;
333
334         LASSERT(ctx);
335         LASSERT(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags));
336
337         /* make sure not on context waiting list */
338         spin_lock(&ctx->cc_lock);
339         list_del_init(&req->rq_ctx_chain);
340         spin_unlock(&ctx->cc_lock);
341
342         sptlrpc_cli_ctx_get(ctx);
343         sptlrpc_req_put_ctx(req, 0);
344         rc = sptlrpc_req_get_ctx(req);
345         if (!rc) {
346                 LASSERT(req->rq_cli_ctx);
347                 sptlrpc_cli_ctx_put(ctx, 1);
348         } else {
349                 LASSERT(!req->rq_cli_ctx);
350                 req->rq_cli_ctx = ctx;
351         }
352         RETURN(rc);
353 }
354 EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
355
356 static
357 int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
358 {
359         if (cli_ctx_is_refreshed(ctx))
360                 return 1;
361         return 0;
362 }
363
364 static
365 int ctx_refresh_timeout(void *data)
366 {
367         struct ptlrpc_request *req = data;
368         int rc;
369
370         /* conn_cnt is needed in expire_one_request */
371         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
372
373         rc = ptlrpc_expire_one_request(req);
374         /* if we started recovery, we should mark this ctx dead; otherwise,
375          * if lgssd died, nobody would retire this ctx and subsequent connect
376          * attempts would keep finding the same ctx, causing a deadlock.
377          * we assume the expiry time of the request is later than the
378          * context refresh expiry time.
379          */
380         if (rc == 0)
381                 req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
382         return rc;
383 }
384
385 static
386 void ctx_refresh_interrupt(void *data)
387 {
388         struct ptlrpc_request *req = data;
389
390         spin_lock(&req->rq_lock);
391         req->rq_intr = 1;
392         spin_unlock(&req->rq_lock);
393 }
394
395 static
396 void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
397 {
398         spin_lock(&ctx->cc_lock);
399         if (!list_empty(&req->rq_ctx_chain))
400                 list_del_init(&req->rq_ctx_chain);
401         spin_unlock(&ctx->cc_lock);
402 }
403
404 /*
405  * the status of the context may be changed by other threads at any time;
406  * we allow this race. but once we return 0, the caller may assume the
407  * context is up to date and keep using it until the owning rpc is done.
408  *
409  * @timeout:
410  *    < 0  - don't wait
411  *    = 0  - wait until success or a fatal error occurs
412  *    > 0  - timeout value, in seconds
413  *
414  * return 0 only if the context is up to date.
415  */
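/*
 * Caller sketch for the @timeout convention above (values are illustrative;
 * sptlrpc_import_check_ctx() below passes 0):
 *
 *      rc = sptlrpc_req_refresh_ctx(req, -1); // poll only, -EWOULDBLOCK if not ready
 *      rc = sptlrpc_req_refresh_ctx(req, 0);  // wait until refreshed or fatal error
 *      rc = sptlrpc_req_refresh_ctx(req, 30); // wait at most 30 seconds
 */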
416 int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
417 {
418         struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
419         struct l_wait_info      lwi;
420         int                     rc;
421         ENTRY;
422
423         LASSERT(ctx);
424
425         /* skip special ctxs */
426         if (cli_ctx_is_eternal(ctx) || req->rq_ctx_init || req->rq_ctx_fini)
427                 RETURN(0);
428
429         if (test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags)) {
430                 LASSERT(ctx->cc_ops->refresh);
431                 ctx->cc_ops->refresh(ctx);
432         }
433         LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
434
435 again:
436         LASSERT(ctx->cc_ops->validate);
437         if (ctx->cc_ops->validate(ctx) == 0) {
438                 req_off_ctx_list(req, ctx);
439                 RETURN(0);
440         }
441
442         if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
443                 req->rq_err = 1;
444                 req_off_ctx_list(req, ctx);
445                 RETURN(-EPERM);
446         }
447
448         /* This is subtle. For a resent message we have to keep the original
449          * context to survive the following situation:
450          *  1. the request is sent to the server
451          *  2. recovery is kicked off
452          *  3. recovery finishes, the request is marked as resent
453          *  4. the request is resent
454          *  5. the old reply from the server is received (the xid is the same)
455          *  6. the reply is verified (this has to succeed)
456          *  7. the new reply from the server arrives, and lnet drops it
457          *
458          * Note we can't simply change the xid of a resent request because
459          * the server relies on it for reply reconstruction.
460          *
461          * Commonly the original context should be uptodate because we
462          * have a nice expiry time; and the server will keep its half of the
463          * context because we hold at least one ref on the old context, which
464          * prevents the context destroy RPC from being sent. So the server can
465          * still accept the request and finish the RPC. Two cases:
466          *  1. If the server side context has been trimmed, NO_CONTEXT will
467          *     be returned, and gss_cli_ctx_verify/unseal will switch to the
468          *     new context by force.
469          *  2. If the current context was never refreshed, we are fine: we
470          *     never really sent a request with the old context before.
471          */
472         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
473             unlikely(req->rq_reqmsg) &&
474             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
475                 req_off_ctx_list(req, ctx);
476                 RETURN(0);
477         }
478
479         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
480                 /* don't have to, but we don't want to release it too soon */
481                 sptlrpc_cli_ctx_get(ctx);
482
483                 rc = sptlrpc_req_replace_dead_ctx(req);
484                 if (rc) {
485                         LASSERT(ctx == req->rq_cli_ctx);
486                         CERROR("req %p: failed to replace dead ctx %p\n",
487                                 req, ctx);
488                         req->rq_err = 1;
489                         LASSERT(list_empty(&req->rq_ctx_chain));
490                         sptlrpc_cli_ctx_put(ctx, 1);
491                         RETURN(-ENOMEM);
492                 }
493
494                 /* FIXME
495                  * if the ctx didn't really switch, the cpu might be busy or
496                  * something similar; just relax a little bit.
497                  */
498                 if (ctx == req->rq_cli_ctx)
499                         schedule();
500
501                 CWARN("req %p: replace dead ctx %p(%u->%s) => %p\n",
502                       req, ctx, ctx->cc_vcred.vc_uid,
503                       sec2target_str(ctx->cc_sec), req->rq_cli_ctx);
504
505                 sptlrpc_cli_ctx_put(ctx, 1);
506                 ctx = req->rq_cli_ctx;
507                 LASSERT(list_empty(&req->rq_ctx_chain));
508
509                 goto again;
510         }
511
512         /* Now we're sure this context is in the middle of an upcall; add
513          * ourselves to the waiting list
514          */
515         spin_lock(&ctx->cc_lock);
516         if (list_empty(&req->rq_ctx_chain))
517                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
518         spin_unlock(&ctx->cc_lock);
519
520         if (timeout < 0) {
521                 RETURN(-EWOULDBLOCK);
522         }
523
524         /* Clear any flags that may be present from previous sends */
525         LASSERT(req->rq_receiving_reply == 0);
526         spin_lock(&req->rq_lock);
527         req->rq_err = 0;
528         req->rq_timedout = 0;
529         req->rq_resend = 0;
530         req->rq_restart = 0;
531         spin_unlock(&req->rq_lock);
532
533         lwi = LWI_TIMEOUT_INTR(timeout * HZ, ctx_refresh_timeout,
534                                ctx_refresh_interrupt, req);
535         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
536
537         /* five ways to get here:
538          * 1. successfully refreshed;
539          * 2. someone else marked this ctx dead by force;
540          * 3. interrupted;
541          * 4. timed out, and we don't want to recover from the failure;
542          * 5. timed out, and woken up when recovery finished;
543          */
544         if (!cli_ctx_is_refreshed(ctx)) {
545                 /* timed out or interrupted */
546                 req_off_ctx_list(req, ctx);
547
548                 LASSERT(rc != 0);
549                 RETURN(rc);
550         }
551
552         goto again;
553 }
554
555 void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
556 {
557         struct sec_flavor_config *conf;
558
559         LASSERT(req->rq_import);
560         LASSERT(req->rq_import->imp_sec);
561         LASSERT(req->rq_cli_ctx);
562         LASSERT(req->rq_cli_ctx->cc_sec);
563         LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
564
565         /* special security flags according to opcode */
566         switch (opcode) {
567         case OST_READ:
568                 req->rq_bulk_read = 1;
569                 break;
570         case OST_WRITE:
571                 req->rq_bulk_write = 1;
572                 break;
573         case SEC_CTX_INIT:
574                 req->rq_ctx_init = 1;
575                 break;
576         case SEC_CTX_FINI:
577                 req->rq_ctx_fini = 1;
578                 break;
579         }
580
581         req->rq_sec_flavor = req->rq_cli_ctx->cc_sec->ps_flavor;
582
583         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
584          * destruction rpc
585          */
586         if (unlikely(req->rq_ctx_init)) {
587                 req->rq_sec_flavor = SEC_MAKE_RPC_FLAVOR(
588                                 SEC_FLAVOR_POLICY(req->rq_sec_flavor),
589                                 SEC_FLAVOR_MECH(req->rq_sec_flavor),
590                                 SPTLRPC_SVC_NULL);
591         } else if (unlikely(req->rq_ctx_fini)) {
592                 req->rq_sec_flavor = SEC_MAKE_RPC_FLAVOR(
593                                 SEC_FLAVOR_POLICY(req->rq_sec_flavor),
594                                 SEC_FLAVOR_MECH(req->rq_sec_flavor),
595                                 SPTLRPC_SVC_INTG);
596         }
597
598         conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
599
600         /* set the user descriptor flag, except for ROOTONLY which doesn't
601          * need it and null security which can't carry it
602          */
603         if ((conf->sfc_flags & PTLRPC_SEC_FL_ROOTONLY) == 0 &&
604             req->rq_sec_flavor != SPTLRPC_FLVR_NULL)
605                 req->rq_sec_flavor |= SEC_FLAVOR_FL_USER;
606
607         /* bulk security flag */
608         if ((req->rq_bulk_read || req->rq_bulk_write) &&
609             (conf->sfc_bulk_priv != BULK_PRIV_ALG_NULL ||
610              conf->sfc_bulk_csum != BULK_CSUM_ALG_NULL))
611                 req->rq_sec_flavor |= SEC_FLAVOR_FL_BULK;
612 }
613
614 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
615 {
616         if (SEC_FLAVOR_SVC(req->rq_sec_flavor) != SPTLRPC_SVC_PRIV)
617                 return;
618
619         LASSERT(req->rq_clrbuf);
620         if (req->rq_pool || !req->rq_reqbuf)
621                 return;
622
623         OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
624         req->rq_reqbuf = NULL;
625         req->rq_reqbuf_len = 0;
626 }
627
628 /*
629  * check whether the current user has a valid context for an import or not.
630  * may retry repeatedly in case of non-fatal errors.
631  * return 0 on success, < 0 on failure
632  */
633 int sptlrpc_import_check_ctx(struct obd_import *imp)
634 {
635         struct ptlrpc_cli_ctx *ctx;
636         struct ptlrpc_request *req = NULL;
637         int rc;
638         ENTRY;
639
640         might_sleep();
641
642         ctx = get_my_ctx(imp->imp_sec);
643         if (!ctx)
644                 RETURN(1);
645
646         if (cli_ctx_is_eternal(ctx) ||
647             ctx->cc_ops->validate(ctx) == 0) {
648                 sptlrpc_cli_ctx_put(ctx, 1);
649                 RETURN(0);
650         }
651
652         OBD_ALLOC_PTR(req);
653         if (!req)
654                 RETURN(-ENOMEM);
655
656         spin_lock_init(&req->rq_lock);
657         atomic_set(&req->rq_refcount, 10000); /* fake req, freed explicitly below */
658         CFS_INIT_LIST_HEAD(&req->rq_ctx_chain);
659         init_waitqueue_head(&req->rq_reply_waitq);
660         req->rq_import = imp;
661         req->rq_cli_ctx = ctx;
662
663         rc = sptlrpc_req_refresh_ctx(req, 0);
664         LASSERT(list_empty(&req->rq_ctx_chain));
665         sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
666         OBD_FREE_PTR(req);
667
668         RETURN(rc);
669 }
670
671 int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
672 {
673         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
674         int rc = 0;
675         ENTRY;
676
677         LASSERT(ctx);
678         LASSERT(ctx->cc_sec);
679         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
680
681         /* we wrap the bulk request here because now we can be sure
682          * the context is up to date.
683          */
684         if (req->rq_bulk) {
685                 rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
686                 if (rc)
687                         RETURN(rc);
688         }
689
690         switch (SEC_FLAVOR_SVC(req->rq_sec_flavor)) {
691         case SPTLRPC_SVC_NULL:
692         case SPTLRPC_SVC_AUTH:
693         case SPTLRPC_SVC_INTG:
694                 LASSERT(ctx->cc_ops->sign);
695                 rc = ctx->cc_ops->sign(ctx, req);
696                 break;
697         case SPTLRPC_SVC_PRIV:
698                 LASSERT(ctx->cc_ops->seal);
699                 rc = ctx->cc_ops->seal(ctx, req);
700                 break;
701         default:
702                 LBUG();
703         }
704
705         if (rc == 0) {
706                 LASSERT(req->rq_reqdata_len);
707                 LASSERT(req->rq_reqdata_len % 8 == 0);
708                 LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
709         }
710
711         RETURN(rc);
712 }
713
714 /*
715  * rq_nob_received is the actual received data length
716  */
717 int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
718 {
719         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
720         int rc;
721         ENTRY;
722
723         LASSERT(ctx);
724         LASSERT(ctx->cc_sec);
725         LASSERT(ctx->cc_ops);
726         LASSERT(req->rq_repbuf);
727
728         req->rq_repdata_len = req->rq_nob_received;
729
730         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
731                 CERROR("reply data length %d too small\n",
732                        req->rq_nob_received);
733                 RETURN(-EPROTO);
734         }
735
736         if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V1 ||
737             req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) {
738                 /* it must be null flavor, so our request should also be
739                  * in null flavor */
740                 if (SEC_FLAVOR_POLICY(req->rq_sec_flavor) !=
741                     SPTLRPC_POLICY_NULL) {
742                         CERROR("request flavor is %x but reply with null\n",
743                                req->rq_sec_flavor);
744                         RETURN(-EPROTO);
745                 }
746         } else {
747                 /* v2 message... */
748                 ptlrpc_sec_flavor_t tmpf = req->rq_repbuf->lm_secflvr;
749
750                 if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
751                         __swab32s(&tmpf);
752
753                 if (SEC_FLAVOR_POLICY(tmpf) !=
754                     SEC_FLAVOR_POLICY(req->rq_sec_flavor)) {
755                         CERROR("request policy %u but reply uses %u\n",
756                                SEC_FLAVOR_POLICY(req->rq_sec_flavor),
757                                SEC_FLAVOR_POLICY(tmpf));
758                         RETURN(-EPROTO);
759                 }
760
761                 if ((SEC_FLAVOR_POLICY(req->rq_sec_flavor) !=
762                      SPTLRPC_POLICY_NULL) &&
763                     lustre_unpack_msg(req->rq_repbuf, req->rq_nob_received))
764                         RETURN(-EPROTO);
765         }
766
767         switch (SEC_FLAVOR_SVC(req->rq_sec_flavor)) {
768         case SPTLRPC_SVC_NULL:
769         case SPTLRPC_SVC_AUTH:
770         case SPTLRPC_SVC_INTG:
771                 LASSERT(ctx->cc_ops->verify);
772                 rc = ctx->cc_ops->verify(ctx, req);
773                 break;
774         case SPTLRPC_SVC_PRIV:
775                 LASSERT(ctx->cc_ops->unseal);
776                 rc = ctx->cc_ops->unseal(ctx, req);
777                 break;
778         default:
779                 LBUG();
780         }
781
782         LASSERT(rc || req->rq_repmsg || req->rq_resend);
783         RETURN(rc);
784 }
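
/*
 * Summary of the client-side dispatch in the two functions above, taken
 * straight from their switch statements:
 *
 *      service level                   request path    reply path
 *      NULL / AUTH / INTG              cc_ops->sign    cc_ops->verify
 *      PRIV                            cc_ops->seal    cc_ops->unseal
 */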
785
786 /**************************************************
787  * client side high-level security APIs           *
788  **************************************************/
789
790 static
791 void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
792 {
793         struct ptlrpc_sec_policy *policy = sec->ps_policy;
794
795         LASSERT(atomic_read(&sec->ps_refcount) == 0);
796         LASSERT(atomic_read(&sec->ps_busy) == 0);
797         LASSERT(policy->sp_cops->destroy_sec);
798
799         CDEBUG(D_SEC, "%s@%p: being destroyed\n", sec->ps_policy->sp_name, sec);
800
801         policy->sp_cops->destroy_sec(sec);
802         sptlrpc_policy_put(policy);
803 }
804
805 static
806 int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
807                             int grace, int force)
808 {
809         struct ptlrpc_sec_policy *policy = sec->ps_policy;
810
811         LASSERT(policy->sp_cops);
812         LASSERT(policy->sp_cops->flush_ctx_cache);
813
814         return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
815 }
816
817 void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
818 {
819         sec_cop_destroy_sec(sec);
820 }
821 EXPORT_SYMBOL(sptlrpc_sec_destroy);
822
823 /*
824  * let the policy module determine whether to take a reference on the
825  * import or not.
826  */
827 static
828 struct ptlrpc_sec * import_create_sec(struct obd_import *imp,
829                                       struct ptlrpc_svc_ctx *ctx,
830                                       __u32 flavor,
831                                       unsigned long flags)
832 {
833         struct ptlrpc_sec_policy *policy;
834         struct ptlrpc_sec *sec;
835         ENTRY;
836
837         flavor = SEC_FLAVOR_RPC(flavor);
838
839         if (ctx) {
840                 LASSERT(imp->imp_dlm_fake == 1);
841
842                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
843                        imp->imp_obd->obd_type->typ_name,
844                        imp->imp_obd->obd_name,
845                        sptlrpc_flavor2name(flavor));
846
847                 policy = sptlrpc_policy_get(ctx->sc_policy);
848                 flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
849         } else {
850                 LASSERT(imp->imp_dlm_fake == 0);
851
852                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
853                        imp->imp_obd->obd_type->typ_name,
854                        imp->imp_obd->obd_name,
855                        sptlrpc_flavor2name(flavor));
856
857                 policy = sptlrpc_flavor2policy(flavor);
858                 if (!policy) {
859                         CERROR("invalid flavor 0x%x\n", flavor);
860                         RETURN(NULL);
861                 }
862         }
863
864         sec = policy->sp_cops->create_sec(imp, ctx, flavor, flags);
865         if (sec) {
866                 atomic_inc(&sec->ps_refcount);
867
868                 /* take 1 busy count on behalf of sec itself,
869                  * balanced in sptlrpc_import_put_sec()
870                  */
871                 atomic_inc(&sec->ps_busy);
872
873                 if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
874                         sptlrpc_gc_add_sec(sec);
875         } else
876                 sptlrpc_policy_put(policy);
877
878         RETURN(sec);
879 }
880
881 int sptlrpc_import_get_sec(struct obd_import *imp,
882                            struct ptlrpc_svc_ctx *ctx,
883                            __u32 flavor,
884                            unsigned long flags)
885 {
886         might_sleep();
887
888         /* the old sec might still be there when reconnecting */
889         if (imp->imp_sec)
890                 return 0;
891
892         imp->imp_sec = import_create_sec(imp, ctx, flavor, flags);
893         if (!imp->imp_sec)
894                 return -EINVAL;
895
896         return 0;
897 }
898
899 void sptlrpc_import_put_sec(struct obd_import *imp)
900 {
901         struct ptlrpc_sec        *sec;
902         struct ptlrpc_sec_policy *policy;
903
904         might_sleep();
905
906         if (imp->imp_sec == NULL)
907                 return;
908
909         sec = imp->imp_sec;
910         policy = sec->ps_policy;
911
912         if (atomic_dec_and_test(&sec->ps_refcount)) {
913                 sec_cop_flush_ctx_cache(sec, -1, 1, 1);
914                 sptlrpc_gc_del_sec(sec);
915
916                 if (atomic_dec_and_test(&sec->ps_busy))
917                         sec_cop_destroy_sec(sec);
918                 else {
919                         CWARN("delay destroying busy sec %s %p\n",
920                               policy->sp_name, sec);
921                 }
922         } else {
923                 sptlrpc_policy_put(policy);
924         }
925
926         imp->imp_sec = NULL;
927 }
928
929 void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
930 {
931         if (imp == NULL || imp->imp_sec == NULL)
932                 return;
933
934         /* it's important to use grace mode, see the explanation in
935          * sptlrpc_req_refresh_ctx()
936          */
937         sec_cop_flush_ctx_cache(imp->imp_sec, 0, 1, 1);
938 }
939
940 void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
941 {
942         if (imp == NULL || imp->imp_sec == NULL)
943                 return;
944
945         sec_cop_flush_ctx_cache(imp->imp_sec, cfs_current()->uid, 1, 1);
946 }
947 EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
948
949 void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
950 {
951         if (imp == NULL || imp->imp_sec == NULL)
952                 return;
953
954         sec_cop_flush_ctx_cache(imp->imp_sec, -1, 1, 1);
955 }
956 EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
957
958 /*
959  * when this completes successfully, req->rq_reqmsg should point to the
960  * right place.
961  */
962 int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
963 {
964         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
965         struct ptlrpc_sec_policy *policy;
966         int rc;
967
968         LASSERT(ctx);
969         LASSERT(atomic_read(&ctx->cc_refcount));
970         LASSERT(ctx->cc_sec);
971         LASSERT(ctx->cc_sec->ps_policy);
972         LASSERT(req->rq_reqmsg == NULL);
973
974         policy = ctx->cc_sec->ps_policy;
975         rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
976         if (!rc) {
977                 LASSERT(req->rq_reqmsg);
978                 LASSERT(req->rq_reqbuf || req->rq_clrbuf);
979
980                 /* zeroing preallocated buffer */
981                 if (req->rq_pool)
982                         memset(req->rq_reqmsg, 0, msgsize);
983         }
984
985         return rc;
986 }
987
988 void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
989 {
990         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
991         struct ptlrpc_sec_policy *policy;
992
993         LASSERT(ctx);
994         LASSERT(atomic_read(&ctx->cc_refcount));
995         LASSERT(ctx->cc_sec);
996         LASSERT(ctx->cc_sec->ps_policy);
997         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
998
999         policy = ctx->cc_sec->ps_policy;
1000         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1001 }
1002
1003 /*
1004  * NOTE caller must guarantee the buffer size is enough for the enlargement
1005  */
1006 void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1007                                   int segment, int newsize)
1008 {
1009         void   *src, *dst;
1010         int     oldsize, oldmsg_size, movesize;
1011
1012         LASSERT(segment < msg->lm_bufcount);
1013         LASSERT(msg->lm_buflens[segment] <= newsize);
1014
1015         if (msg->lm_buflens[segment] == newsize)
1016                 return;
1017
1018         /* nothing to do if we are enlarging the last segment */
1019         if (segment == msg->lm_bufcount - 1) {
1020                 msg->lm_buflens[segment] = newsize;
1021                 return;
1022         }
1023
1024         oldsize = msg->lm_buflens[segment];
1025
1026         src = lustre_msg_buf(msg, segment + 1, 0);
1027         msg->lm_buflens[segment] = newsize;
1028         dst = lustre_msg_buf(msg, segment + 1, 0);
1029         msg->lm_buflens[segment] = oldsize;
1030
1031         /* move from segment + 1 to end segment */
1032         LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1033         oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1034         movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1035         LASSERT(movesize >= 0);
1036
1037         if (movesize)
1038                 memmove(dst, src, movesize);
1039
1040         /* note we don't clear the area where the old data lived, not secret */
1041
1042         /* finally set new segment size */
1043         msg->lm_buflens[segment] = newsize;
1044 }
1045 EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
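
/*
 * Worked example with made-up sizes: for a three-segment message with
 * lm_buflens = { 128, 8, 64 }, a caller that has reserved enough buffer
 * space for the post-enlargement lustre_msg_size_v2() may do
 *
 *      _sptlrpc_enlarge_msg_inplace(msg, 1, 24);
 *
 * which memmove()s segment 2 toward the tail by the size-rounded growth of
 * segment 1 and then sets lm_buflens[1] = 24; segment 0 is untouched.
 */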
1046
1047 /*
1048  * enlarge @segment of the upper message req->rq_reqmsg to @newsize; all data
1049  * will be preserved after enlargement. this must only be called after
1050  * rq_reqmsg has been initialized.
1051  *
1052  * caller's attention: upon return, rq_reqmsg and rq_reqlen might have
1053  * been changed.
1054  */
1055 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1056                                int segment, int newsize)
1057 {
1058         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1059         struct ptlrpc_sec_cops   *cops;
1060         struct lustre_msg        *msg = req->rq_reqmsg;
1061
1062         LASSERT(ctx);
1063         LASSERT(msg);
1064         LASSERT(msg->lm_bufcount > segment);
1065         LASSERT(msg->lm_buflens[segment] <= newsize);
1066
1067         if (msg->lm_buflens[segment] == newsize)
1068                 return 0;
1069
1070         cops = ctx->cc_sec->ps_policy->sp_cops;
1071         LASSERT(cops->enlarge_reqbuf);
1072         return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1073 }
1074 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
1075
1076 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1077 {
1078         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1079         struct ptlrpc_sec_policy *policy;
1080         ENTRY;
1081
1082         LASSERT(ctx);
1083         LASSERT(atomic_read(&ctx->cc_refcount));
1084         LASSERT(ctx->cc_sec);
1085         LASSERT(ctx->cc_sec->ps_policy);
1086
1087         if (req->rq_repbuf)
1088                 RETURN(0);
1089
1090         policy = ctx->cc_sec->ps_policy;
1091         RETURN(policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize));
1092 }
1093
1094 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1095 {
1096         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1097         struct ptlrpc_sec_policy *policy;
1098         ENTRY;
1099
1100         LASSERT(ctx);
1101         LASSERT(atomic_read(&ctx->cc_refcount));
1102         LASSERT(ctx->cc_sec);
1103         LASSERT(ctx->cc_sec->ps_policy);
1104         LASSERT(req->rq_repbuf);
1105
1106         policy = ctx->cc_sec->ps_policy;
1107         policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1108         EXIT;
1109 }
1110
1111 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1112                                 struct ptlrpc_cli_ctx *ctx)
1113 {
1114         struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1115
1116         if (!policy->sp_cops->install_rctx)
1117                 return 0;
1118         return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1119 }
1120
1121 int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1122                                 struct ptlrpc_svc_ctx *ctx)
1123 {
1124         struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1125
1126         if (!policy->sp_sops->install_rctx)
1127                 return 0;
1128         return policy->sp_sops->install_rctx(imp, ctx);
1129 }
1130
1131 /****************************************
1132  * server side security                 *
1133  ****************************************/
1134
1135 int sptlrpc_target_export_check(struct obd_export *exp,
1136                                 struct ptlrpc_request *req)
1137 {
1138         if (!req->rq_auth_gss ||
1139             (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt))
1140                 return 0;
1141
1142         if (!req->rq_ctx_init)
1143                 return 0;
1144
1145         LASSERT(exp->exp_imp_reverse);
1146         sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse, req->rq_svc_ctx);
1147         return 0;
1148 }
1149
1150 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
1151 {
1152         struct ptlrpc_sec_policy *policy;
1153         struct lustre_msg *msg = req->rq_reqbuf;
1154         int rc;
1155         ENTRY;
1156
1157         LASSERT(msg);
1158         LASSERT(req->rq_reqmsg == NULL);
1159         LASSERT(req->rq_repmsg == NULL);
1160
1161         /*
1162          * in any case we avoid calling unpack_msg() for requests of null
1163          * flavor; that will be done later by ptlrpc_server_handle_request().
1164          */
1165         if (req->rq_reqdata_len < sizeof(struct lustre_msg)) {
1166                 CERROR("request size %d too small\n", req->rq_reqdata_len);
1167                 RETURN(SECSVC_DROP);
1168         }
1169
1170         if (msg->lm_magic == LUSTRE_MSG_MAGIC_V1 ||
1171             msg->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) {
1172                 req->rq_sec_flavor = SPTLRPC_FLVR_NULL;
1173         } else {
1174                 req->rq_sec_flavor = msg->lm_secflvr;
1175
1176                 if (msg->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
1177                         __swab32s(&req->rq_sec_flavor);
1178
1179                 if ((SEC_FLAVOR_POLICY(req->rq_sec_flavor) !=
1180                      SPTLRPC_POLICY_NULL) &&
1181                     lustre_unpack_msg(msg, req->rq_reqdata_len))
1182                         RETURN(SECSVC_DROP);
1183         }
1184
1185         policy = sptlrpc_flavor2policy(req->rq_sec_flavor);
1186         if (!policy) {
1187                 CERROR("unsupported security flavor %x\n", req->rq_sec_flavor);
1188                 RETURN(SECSVC_DROP);
1189         }
1190
1191         LASSERT(policy->sp_sops->accept);
1192         rc = policy->sp_sops->accept(req);
1193
1194         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
1195         sptlrpc_policy_put(policy);
1196
1197         /* FIXME move to proper place */
1198         if (rc == SECSVC_OK) {
1199                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1200
1201                 if (opc == OST_WRITE)
1202                         req->rq_bulk_write = 1;
1203                 else if (opc == OST_READ)
1204                         req->rq_bulk_read = 1;
1205         }
1206
1207         LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
1208         RETURN(rc);
1209 }
1210
1211 int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req,
1212                          int msglen)
1213 {
1214         struct ptlrpc_sec_policy *policy;
1215         struct ptlrpc_reply_state *rs;
1216         int rc;
1217         ENTRY;
1218
1219         LASSERT(req->rq_svc_ctx);
1220         LASSERT(req->rq_svc_ctx->sc_policy);
1221
1222         policy = req->rq_svc_ctx->sc_policy;
1223         LASSERT(policy->sp_sops->alloc_rs);
1224
1225         rc = policy->sp_sops->alloc_rs(req, msglen);
1226         if (unlikely(rc == -ENOMEM)) {
1227                 /* failed alloc, try emergency pool */
1228                 rs = lustre_get_emerg_rs(req->rq_rqbd->rqbd_service);
1229                 if (rs == NULL)
1230                         RETURN(-ENOMEM);
1231
1232                 req->rq_reply_state = rs;
1233                 rc = policy->sp_sops->alloc_rs(req, msglen);
1234                 if (rc) {
1235                         lustre_put_emerg_rs(rs);
1236                         req->rq_reply_state = NULL;
1237                 }
1238         }
1239
1240         LASSERT(rc != 0 ||
1241                 (req->rq_reply_state && req->rq_reply_state->rs_msg));
1242
1243         RETURN(rc);
1244 }
1245
1246 int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
1247 {
1248         struct ptlrpc_sec_policy *policy;
1249         int rc;
1250         ENTRY;
1251
1252         LASSERT(req->rq_svc_ctx);
1253         LASSERT(req->rq_svc_ctx->sc_policy);
1254
1255         policy = req->rq_svc_ctx->sc_policy;
1256         LASSERT(policy->sp_sops->authorize);
1257
1258         rc = policy->sp_sops->authorize(req);
1259         LASSERT(rc || req->rq_reply_state->rs_repdata_len);
1260
1261         RETURN(rc);
1262 }
1263
1264 void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
1265 {
1266         struct ptlrpc_sec_policy *policy;
1267         unsigned int prealloc;
1268         ENTRY;
1269
1270         LASSERT(rs->rs_svc_ctx);
1271         LASSERT(rs->rs_svc_ctx->sc_policy);
1272
1273         policy = rs->rs_svc_ctx->sc_policy;
1274         LASSERT(policy->sp_sops->free_rs);
1275
1276         prealloc = rs->rs_prealloc;
1277         policy->sp_sops->free_rs(rs);
1278
1279         if (prealloc)
1280                 lustre_put_emerg_rs(rs);
1281         EXIT;
1282 }
1283
1284 void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
1285 {
1286         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1287
1288         if (ctx == NULL)
1289                 return;
1290
1291         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1292         atomic_inc(&ctx->sc_refcount);
1293 }
1294
1295 void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
1296 {
1297         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1298
1299         if (ctx == NULL)
1300                 return;
1301
1302         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1303         if (atomic_dec_and_test(&ctx->sc_refcount)) {
1304                 if (ctx->sc_policy->sp_sops->free_ctx)
1305                         ctx->sc_policy->sp_sops->free_ctx(ctx);
1306         }
1307         req->rq_svc_ctx = NULL;
1308 }
1309
1310 void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
1311 {
1312         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1313
1314         if (ctx == NULL)
1315                 return;
1316
1317         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1318         if (ctx->sc_policy->sp_sops->invalidate_ctx)
1319                 ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
1320 }
1321 EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
1322
1323 /****************************************
1324  * bulk security                        *
1325  ****************************************/
1326
1327 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
1328                           struct ptlrpc_bulk_desc *desc)
1329 {
1330         struct ptlrpc_cli_ctx *ctx;
1331
1332         if (!SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor))
1333                 return 0;
1334
1335         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
1336
1337         ctx = req->rq_cli_ctx;
1338         if (ctx->cc_ops->wrap_bulk)
1339                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
1340         return 0;
1341 }
1342 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
1343
1344 static
1345 void pga_to_bulk_desc(int nob, obd_count pg_count, struct brw_page **pga,
1346                       struct ptlrpc_bulk_desc *desc)
1347 {
1348         int i;
1349
1350         LASSERT(pga);
1351         LASSERT(*pga);
1352
1353         for (i = 0; i < pg_count && nob > 0; i++) {
1354 #ifdef __KERNEL__
1355                 desc->bd_iov[i].kiov_page = pga[i]->pg;
1356                 desc->bd_iov[i].kiov_len = pga[i]->count > nob ?
1357                                            nob : pga[i]->count;
1358                 desc->bd_iov[i].kiov_offset = pga[i]->off & ~CFS_PAGE_MASK;
1359 #else
1360 #warning FIXME for liblustre!
1361                 desc->bd_iov[i].iov_base = pga[i]->pg->addr;
1362                 desc->bd_iov[i].iov_len = pga[i]->count > nob ?
1363                                            nob : pga[i]->count;
1364 #endif
1365
1366                 desc->bd_iov_count++;
1367                 nob -= pga[i]->count;
1368         }
1369 }
1370
1371 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
1372                                  int nob, obd_count pg_count,
1373                                  struct brw_page **pga)
1374 {
1375         struct ptlrpc_bulk_desc *desc;
1376         struct ptlrpc_cli_ctx *ctx;
1377         int rc = 0;
1378
1379         if (!SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor))
1380                 return 0;
1381
1382         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
1383
1384         OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
1385         if (desc == NULL) {
1386                 CERROR("out of memory, can't verify bulk read data\n");
1387                 return -ENOMEM;
1388         }
1389
1390         pga_to_bulk_desc(nob, pg_count, pga, desc);
1391
1392         ctx = req->rq_cli_ctx;
1393         if (ctx->cc_ops->unwrap_bulk)
1394                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
1395
1396         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
1397
1398         return rc;
1399 }
1400 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
1401
1402 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
1403                                   struct ptlrpc_bulk_desc *desc)
1404 {
1405         struct ptlrpc_cli_ctx *ctx;
1406
1407         if (!SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor))
1408                 return 0;
1409
1410         LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
1411
1412         ctx = req->rq_cli_ctx;
1413         if (ctx->cc_ops->unwrap_bulk)
1414                 return ctx->cc_ops->unwrap_bulk(ctx, req, desc);
1415
1416         return 0;
1417 }
1418 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
1419
1420 int sptlrpc_svc_wrap_bulk(struct ptlrpc_request *req,
1421                           struct ptlrpc_bulk_desc *desc)
1422 {
1423         struct ptlrpc_svc_ctx *ctx;
1424
1425         if (!SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor))
1426                 return 0;
1427
1428         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
1429
1430         ctx = req->rq_svc_ctx;
1431         if (ctx->sc_policy->sp_sops->wrap_bulk)
1432                 return ctx->sc_policy->sp_sops->wrap_bulk(req, desc);
1433
1434         return 0;
1435 }
1436 EXPORT_SYMBOL(sptlrpc_svc_wrap_bulk);
1437
1438 int sptlrpc_svc_unwrap_bulk(struct ptlrpc_request *req,
1439                             struct ptlrpc_bulk_desc *desc)
1440 {
1441         struct ptlrpc_svc_ctx *ctx;
1442
1443         if (!SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor))
1444                 return 0;
1445
1446         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
1447
1448         ctx = req->rq_svc_ctx;
1449         if (ctx->sc_policy->sp_sops->unwrap_bulk)
1450                 return ctx->sc_policy->sp_sops->unwrap_bulk(req, desc);
1451
1452         return 0;
1453 }
1454 EXPORT_SYMBOL(sptlrpc_svc_unwrap_bulk);
1455
1456
1457 /****************************************
1458  * user descriptor helpers              *
1459  ****************************************/
1460
1461 int sptlrpc_current_user_desc_size(void)
1462 {
1463         int ngroups;
1464
1465 #ifdef __KERNEL__
1466         ngroups = current_ngroups;
1467
1468         if (ngroups > LUSTRE_MAX_GROUPS)
1469                 ngroups = LUSTRE_MAX_GROUPS;
1470 #else
1471         ngroups = 0;
1472 #endif
1473         return sptlrpc_user_desc_size(ngroups);
1474 }
1475 EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
1476
1477 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
1478 {
1479         struct ptlrpc_user_desc *pud;
1480
1481         pud = lustre_msg_buf(msg, offset, 0);
1482
1483         pud->pud_uid = cfs_current()->uid;
1484         pud->pud_gid = cfs_current()->gid;
1485         pud->pud_fsuid = cfs_current()->fsuid;
1486         pud->pud_fsgid = cfs_current()->fsgid;
1487         pud->pud_cap = cfs_current()->cap_effective;
1488         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
1489
1490 #ifdef __KERNEL__
1491         task_lock(current);
1492         if (pud->pud_ngroups > current_ngroups)
1493                 pud->pud_ngroups = current_ngroups;
1494         memcpy(pud->pud_groups, cfs_current()->group_info->blocks[0],
1495                pud->pud_ngroups * sizeof(__u32));
1496         task_unlock(current);
1497 #endif
1498
1499         return 0;
1500 }
1501 EXPORT_SYMBOL(sptlrpc_pack_user_desc);
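
/*
 * Illustrative caller sketch (the offset and surrounding buffer setup are
 * assumptions, not taken from real call sites): the segment at @offset is
 * expected to be sized with sptlrpc_current_user_desc_size(), so that
 * pud_ngroups, derived from lm_buflens[offset] above, matches the caller's
 * group count:
 *
 *      buflens[offset] = sptlrpc_current_user_desc_size();
 *      // ... allocate and pack the lustre_msg with these buflens ...
 *      sptlrpc_pack_user_desc(msg, offset);
 */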
1502
1503 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset)
1504 {
1505         struct ptlrpc_user_desc *pud;
1506         int                      i;
1507
1508         pud = lustre_msg_buf(msg, offset, sizeof(*pud));
1509         if (!pud)
1510                 return -EINVAL;
1511
1512         if (lustre_msg_swabbed(msg)) {
1513                 __swab32s(&pud->pud_uid);
1514                 __swab32s(&pud->pud_gid);
1515                 __swab32s(&pud->pud_fsuid);
1516                 __swab32s(&pud->pud_fsgid);
1517                 __swab32s(&pud->pud_cap);
1518                 __swab32s(&pud->pud_ngroups);
1519         }
1520
1521         if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
1522                 CERROR("%u groups is too large\n", pud->pud_ngroups);
1523                 return -EINVAL;
1524         }
1525
1526         if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
1527             msg->lm_buflens[offset]) {
1528                 CERROR("%u groups are claimed but bufsize only %u\n",
1529                        pud->pud_ngroups, msg->lm_buflens[offset]);
1530                 return -EINVAL;
1531         }
1532
1533         if (lustre_msg_swabbed(msg)) {
1534                 for (i = 0; i < pud->pud_ngroups; i++)
1535                         __swab32s(&pud->pud_groups[i]);
1536         }
1537
1538         return 0;
1539 }
1540 EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
1541
1542 /****************************************
1543  * user supplied flavor string parsing  *
1544  ****************************************/
1545
1546 static
1547 int get_default_flavor(enum lustre_part to_part, struct sec_flavor_config *conf)
1548 {
1549         conf->sfc_bulk_priv = BULK_PRIV_ALG_NULL;
1550         conf->sfc_bulk_csum = BULK_CSUM_ALG_NULL;
1551         conf->sfc_flags = 0;
1552
1553         switch (to_part) {
1554         case LUSTRE_MDT:
1555                 conf->sfc_rpc_flavor = SPTLRPC_FLVR_PLAIN;
1556                 return 0;
1557         case LUSTRE_OST:
1558                 conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;
1559                 return 0;
1560         default:
1561                 CERROR("Unknown target lustre part %d, applying defaults\n", to_part);
1562                 conf->sfc_rpc_flavor = SPTLRPC_FLVR_NULL;
1563                 return -EINVAL;
1564         }
1565 }
1566
1567 static
1568 void get_flavor_by_rpc(__u32 rpc_flavor, struct sec_flavor_config *conf)
1569 {
1570         conf->sfc_rpc_flavor = rpc_flavor;
1571         conf->sfc_bulk_priv = BULK_PRIV_ALG_NULL;
1572         conf->sfc_bulk_csum = BULK_CSUM_ALG_NULL;
1573         conf->sfc_flags = 0;
1574
1575         switch (rpc_flavor) {
1576         case SPTLRPC_FLVR_NULL:
1577         case SPTLRPC_FLVR_PLAIN:
1578         case SPTLRPC_FLVR_KRB5N:
1579         case SPTLRPC_FLVR_KRB5A:
1580                 break;
1581         case SPTLRPC_FLVR_KRB5P:
1582                 conf->sfc_bulk_priv = BULK_PRIV_ALG_ARC4;
1583                 /* fall through */
1584         case SPTLRPC_FLVR_KRB5I:
1585                 conf->sfc_bulk_csum = BULK_CSUM_ALG_SHA1;
1586                 break;
1587         default:
1588                 LBUG();
1589         }
1590 }
1591
1592 static
1593 void get_flavor_by_rpc_bulk(__u32 rpc_flavor, int bulk_priv,
1594                             struct sec_flavor_config *conf)
1595 {
1596         if (bulk_priv)
1597                 conf->sfc_bulk_priv = BULK_PRIV_ALG_ARC4;
1598         else
1599                 conf->sfc_bulk_priv = BULK_PRIV_ALG_NULL;
1600
1601         switch (rpc_flavor) {
1602         case SPTLRPC_FLVR_PLAIN:
1603                 conf->sfc_bulk_csum = BULK_CSUM_ALG_MD5;
1604                 break;
1605         case SPTLRPC_FLVR_KRB5N:
1606         case SPTLRPC_FLVR_KRB5A:
1607         case SPTLRPC_FLVR_KRB5I:
1608         case SPTLRPC_FLVR_KRB5P:
1609                 conf->sfc_bulk_csum = BULK_CSUM_ALG_SHA1;
1610                 break;
1611         default:
1612                 LBUG();
1613         }
1614 }
1615
1616 static __u32 __flavors[] = {
1617         SPTLRPC_FLVR_NULL,
1618         SPTLRPC_FLVR_PLAIN,
1619         SPTLRPC_FLVR_KRB5N,
1620         SPTLRPC_FLVR_KRB5A,
1621         SPTLRPC_FLVR_KRB5I,
1622         SPTLRPC_FLVR_KRB5P,
1623 };
1624
1625 #define __nflavors      (sizeof(__flavors)/sizeof(__u32))
1626
1627 /*
1628  * flavor string format: rpc[-bulk{n|i|p}[:cksum/enc]]
1629  * examples:
1630  *  null
1631  *  plain-bulki
1632  *  krb5p-bulkn
1633  *  krb5i-bulkp
1634  *  krb5i-bulkp:sha512/arc4
1635  */
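/*
 * Illustrative parse results (checksum algorithm names are matched against
 * sptlrpc_bulk_csum_alg2name(), so exact spellings depend on that table):
 *
 *      "null"                  -> SPTLRPC_FLVR_NULL, no bulk csum/priv
 *      "krb5i"                 -> SPTLRPC_FLVR_KRB5I, bulk csum defaults to SHA1
 *      "krb5p-bulkn"           -> SPTLRPC_FLVR_KRB5P, bulk csum/priv disabled
 *      "krb5i-bulkp:sha1/arc4" -> SPTLRPC_FLVR_KRB5I, bulk csum SHA1, priv ARC4
 */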
1636 int sptlrpc_parse_flavor(enum lustre_part from_part, enum lustre_part to_part,
1637                          char *str, struct sec_flavor_config *conf)
1638 {
1639         char   *f, *bulk, *alg, *enc;
1640         char    buf[64];
1641         int     i, bulk_priv;
1642         ENTRY;
1643
1644         if (str == NULL) {
1645                 if (get_default_flavor(to_part, conf))
1646                         return -EINVAL;
1647                 goto set_flags;
1648         }
1649
1650         for (i = 0; i < __nflavors; i++) {
1651                 f = sptlrpc_flavor2name(__flavors[i]);
1652                 if (strncmp(str, f, strlen(f)) == 0)
1653                         break;
1654         }
1655
1656         if (i >= __nflavors)
1657                 GOTO(invalid, -EINVAL);
1658
1659         /* prepare a local buffer so we can modify it as we want */
1660         strncpy(buf, str, 64);
1661         buf[64 - 1] = '\0';
1662
1663         /* find bulk string */
1664         bulk = strchr(buf, '-');
1665         if (bulk)
1666                 *bulk++ = '\0';
1667
1668         /* now the first part must equal the rpc flavor name */
1669         if (strcmp(buf, f) != 0)
1670                 GOTO(invalid, -EINVAL);
1671
1672         get_flavor_by_rpc(__flavors[i], conf);
1673
1674         if (bulk == NULL)
1675                 goto set_flags;
1676
1677         /* null flavor should not have any suffix */
1678         if (__flavors[i] == SPTLRPC_FLVR_NULL)
1679                 GOTO(invalid, -EINVAL);
1680
1681         /* find bulk algorithm string */
1682         alg = strchr(bulk, ':');
1683         if (alg)
1684                 *alg++ = '\0';
1685
1686         /* verify bulk section */
1687         if (strcmp(bulk, "bulkn") == 0) {
1688                 conf->sfc_bulk_csum = BULK_CSUM_ALG_NULL;
1689                 conf->sfc_bulk_priv = BULK_PRIV_ALG_NULL;
1690                 goto set_flags;
1691         }
1692
1693         if (strcmp(bulk, "bulki") == 0)
1694                 bulk_priv = 0;
1695         else if (strcmp(bulk, "bulkp") == 0)
1696                 bulk_priv = 1;
1697         else
1698                 GOTO(invalid, -EINVAL);
1699
1700         /* plain policy doesn't support bulk encryption */
1701         if (bulk_priv && __flavors[i] == SPTLRPC_FLVR_PLAIN)
1702                 GOTO(invalid, -EINVAL);
1703
1704         get_flavor_by_rpc_bulk(__flavors[i], bulk_priv, conf);
1705
1706         if (alg == NULL)
1707                 goto set_flags;
1708
1709         /* find encryption algorithm string */
1710         enc = strchr(alg, '/');
1711         if (enc)
1712                 *enc++ = '\0';
1713
1714         /* bulk combination sanity check */
1715         if ((bulk_priv && enc == NULL) || (bulk_priv == 0 && enc))
1716                 GOTO(invalid, -EINVAL);
1717
1718         /* checksum algorithm */
1719         for (i = 0; i < BULK_CSUM_ALG_MAX; i++) {
1720                 if (strcmp(alg, sptlrpc_bulk_csum_alg2name(i)) == 0) {
1721                         conf->sfc_bulk_csum = i;
1722                         break;
1723                 }
1724         }
1725         if (i >= BULK_CSUM_ALG_MAX)
1726                 GOTO(invalid, -EINVAL);
1727
1728         /* privacy algorithm */
1729         if (enc) {
1730                 if (strcmp(enc, "arc4") != 0)
1731                         GOTO(invalid, -EINVAL);
1732                 conf->sfc_bulk_priv = BULK_PRIV_ALG_ARC4;
1733         }
1734
1735 set_flags:
1736         /* * set ROOTONLY flag:
1737          *   - to OST
1738          *   - from MDT to MDT
1739          * * set BULK flag for:
1740          *   - from CLI to OST
1741          */
1742         if (to_part == LUSTRE_OST ||
1743             (from_part == LUSTRE_MDT && to_part == LUSTRE_MDT))
1744                 conf->sfc_flags |= PTLRPC_SEC_FL_ROOTONLY;
1745         if (from_part == LUSTRE_CLI && to_part == LUSTRE_OST)
1746                 conf->sfc_flags |= PTLRPC_SEC_FL_BULK;
1747
1748 #ifdef __BIG_ENDIAN
1749         __swab32s(&conf->sfc_rpc_flavor);
1750         __swab32s(&conf->sfc_bulk_csum);
1751         __swab32s(&conf->sfc_bulk_priv);
1752         __swab32s(&conf->sfc_flags);
1753 #endif
1754         return 0;
1755 invalid:
1756         CERROR("invalid flavor string: %s\n", str);
1757         return -EINVAL;
1758 }
1759 EXPORT_SYMBOL(sptlrpc_parse_flavor);
1760
1761 /****************************************
1762  * misc helpers                         *
1763  ****************************************/
1764
1765 const char * sec2target_str(struct ptlrpc_sec *sec)
1766 {
1767         if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
1768                 return "*";
1769         if (sec_is_reverse(sec))
1770                 return "c";
1771         return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
1772 }
1773 EXPORT_SYMBOL(sec2target_str);
1774
1775 /****************************************
1776  * initialize/finalize                  *
1777  ****************************************/
1778
1779 int __init sptlrpc_init(void)
1780 {
1781         int rc;
1782
1783         rc = sptlrpc_gc_start_thread();
1784         if (rc)
1785                 goto out;
1786
1787         rc = sptlrpc_enc_pool_init();
1788         if (rc)
1789                 goto out_gc;
1790
1791         rc = sptlrpc_null_init();
1792         if (rc)
1793                 goto out_pool;
1794
1795         rc = sptlrpc_plain_init();
1796         if (rc)
1797                 goto out_null;
1798
1799         rc = sptlrpc_lproc_init();
1800         if (rc)
1801                 goto out_plain;
1802
1803         return 0;
1804
1805 out_plain:
1806         sptlrpc_plain_fini();
1807 out_null:
1808         sptlrpc_null_fini();
1809 out_pool:
1810         sptlrpc_enc_pool_fini();
1811 out_gc:
1812         sptlrpc_gc_stop_thread();
1813 out:
1814         return rc;
1815 }
1816
1817 void __exit sptlrpc_fini(void)
1818 {
1819         sptlrpc_lproc_fini();
1820         sptlrpc_plain_fini();
1821         sptlrpc_null_fini();
1822         sptlrpc_enc_pool_fini();
1823         sptlrpc_gc_stop_thread();
1824 }