[fs/lustre-release.git] / lustre / ptlrpc / sec.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004-2007 Cluster File Systems, Inc.
5  *   Author: Eric Mei <ericm@clusterfs.com>
6  *
7  *   This file is part of Lustre, http://www.lustre.org.
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 #ifndef EXPORT_SYMTAB
24 #define EXPORT_SYMTAB
25 #endif
26 #define DEBUG_SUBSYSTEM S_SEC
27
28 #include <libcfs/libcfs.h>
29 #ifndef __KERNEL__
30 #include <liblustre.h>
31 #include <libcfs/list.h>
32 #else
33 #include <linux/crypto.h>
34 #include <linux/key.h>
35 #endif
36
37 #include <obd.h>
38 #include <obd_class.h>
39 #include <obd_support.h>
40 #include <lustre_net.h>
41 #include <lustre_import.h>
42 #include <lustre_dlm.h>
43 #include <lustre_sec.h>
44
45 #include "ptlrpc_internal.h"
46
47 /***********************************************
48  * policy registers                            *
49  ***********************************************/
50
51 static rwlock_t policy_lock = RW_LOCK_UNLOCKED;
52 static struct ptlrpc_sec_policy *policies[SPTLRPC_POLICY_MAX] = {
53         NULL,
54 };
55
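/*
 * Register a security policy in the global policy table, indexed by its
 * sp_policy number. Returns -EALREADY if the slot is already taken.
 */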
56 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy)
57 {
58         __u16 number = policy->sp_policy;
59
60         LASSERT(policy->sp_name);
61         LASSERT(policy->sp_cops);
62         LASSERT(policy->sp_sops);
63
64         if (number >= SPTLRPC_POLICY_MAX)
65                 return -EINVAL;
66
67         write_lock(&policy_lock);
68         if (unlikely(policies[number])) {
69                 write_unlock(&policy_lock);
70                 return -EALREADY;
71         }
72         policies[number] = policy;
73         write_unlock(&policy_lock);
74
75         CDEBUG(D_SEC, "%s: registered\n", policy->sp_name);
76         return 0;
77 }
78 EXPORT_SYMBOL(sptlrpc_register_policy);
79
80 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
81 {
82         __u16 number = policy->sp_policy;
83
84         LASSERT(number < SPTLRPC_POLICY_MAX);
85
86         write_lock(&policy_lock);
87         if (unlikely(policies[number] == NULL)) {
88                 write_unlock(&policy_lock);
89                 CERROR("%s: already unregistered\n", policy->sp_name);
90                 return -EINVAL;
91         }
92
93         LASSERT(policies[number] == policy);
94         policies[number] = NULL;
95         write_unlock(&policy_lock);
96
97         CDEBUG(D_SEC, "%s: unregistered\n", policy->sp_name);
98         return 0;
99 }
100 EXPORT_SYMBOL(sptlrpc_unregister_policy);
101
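/*
 * Map an rpc flavor to its registered policy and take a module reference
 * on it. If the gss policy is not yet registered, try once to load the
 * ptlrpc_gss module on demand before giving up.
 */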
102 static
103 struct ptlrpc_sec_policy * sptlrpc_rpcflavor2policy(__u16 flavor)
104 {
105         static DECLARE_MUTEX(load_mutex);
106         static atomic_t           loaded = ATOMIC_INIT(0);
107         struct ptlrpc_sec_policy *policy;
108         __u16                     number = RPC_FLVR_POLICY(flavor), flag = 0;
109
110         if (number >= SPTLRPC_POLICY_MAX)
111                 return NULL;
112
113 again:
114         read_lock(&policy_lock);
115         policy = policies[number];
116         if (policy && !try_module_get(policy->sp_owner))
117                 policy = NULL;
118         if (policy == NULL)
119                 flag = atomic_read(&loaded);
120         read_unlock(&policy_lock);
121
122         /* on failure, try to load the gss module, once */
123         if (unlikely(policy == NULL) && flag == 0 &&
124             number == SPTLRPC_POLICY_GSS) {
125                 mutex_down(&load_mutex);
126                 if (atomic_read(&loaded) == 0) {
127                         if (request_module("ptlrpc_gss") != 0)
128                                 CERROR("Unable to load module ptlrpc_gss\n");
129                         else
130                                 CWARN("module ptlrpc_gss loaded on demand\n");
131
132                         atomic_set(&loaded, 1);
133                 }
134                 mutex_up(&load_mutex);
135
136                 goto again;
137         }
138
139         return policy;
140 }
141
142 __u16 sptlrpc_name2rpcflavor(const char *name)
143 {
144         if (!strcmp(name, "null"))
145                 return SPTLRPC_FLVR_NULL;
146         if (!strcmp(name, "plain"))
147                 return SPTLRPC_FLVR_PLAIN;
148         if (!strcmp(name, "krb5n"))
149                 return SPTLRPC_FLVR_KRB5N;
150         if (!strcmp(name, "krb5i"))
151                 return SPTLRPC_FLVR_KRB5I;
152         if (!strcmp(name, "krb5p"))
153                 return SPTLRPC_FLVR_KRB5P;
154
155         return SPTLRPC_FLVR_INVALID;
156 }
157 EXPORT_SYMBOL(sptlrpc_name2rpcflavor);
158
159 const char *sptlrpc_rpcflavor2name(__u16 flavor)
160 {
161         switch (flavor) {
162         case SPTLRPC_FLVR_NULL:
163                 return "null";
164         case SPTLRPC_FLVR_PLAIN:
165                 return "plain";
166         case SPTLRPC_FLVR_KRB5N:
167                 return "krb5n";
168         case SPTLRPC_FLVR_KRB5A:
169                 return "krb5a";
170         case SPTLRPC_FLVR_KRB5I:
171                 return "krb5i";
172         case SPTLRPC_FLVR_KRB5P:
173                 return "krb5p";
174         default:
175                 CERROR("invalid rpc flavor 0x%x(p%u,s%u,v%u)\n", flavor,
176                        RPC_FLVR_POLICY(flavor), RPC_FLVR_MECH(flavor),
177                        RPC_FLVR_SVC(flavor));
178         }
179         return "unknown";
180 }
181 EXPORT_SYMBOL(sptlrpc_rpcflavor2name);
182
183 int sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
184 {
185         char           *bulk;
186
187         if (sf->sf_bulk_ciph != BULK_CIPH_ALG_NULL)
188                 bulk = "bulkp";
189         else if (sf->sf_bulk_hash != BULK_HASH_ALG_NULL)
190                 bulk = "bulki";
191         else
192                 bulk = "bulkn";
193
194         snprintf(buf, bufsize, "%s-%s:%s/%s",
195                  sptlrpc_rpcflavor2name(sf->sf_rpc), bulk,
196                  sptlrpc_get_hash_name(sf->sf_bulk_hash),
197                  sptlrpc_get_ciph_name(sf->sf_bulk_ciph));
198         return 0;
199 }
200 EXPORT_SYMBOL(sptlrpc_flavor2name);
201
202 /**************************************************
203  * client context APIs                            *
204  **************************************************/
205
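/*
 * Look up (and possibly create) a client context matching the calling
 * thread's credentials. Reverse and root-only secs always use the root
 * credential; reverse secs additionally never create new contexts or
 * reclaim dead ones here.
 */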
206 static
207 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
208 {
209         struct vfs_cred vcred;
210         int create = 1, remove_dead = 1;
211
212         LASSERT(sec);
213         LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
214
215         if (sec->ps_flvr.sf_flags & (PTLRPC_SEC_FL_REVERSE |
216                                      PTLRPC_SEC_FL_ROOTONLY)) {
217                 vcred.vc_uid = 0;
218                 vcred.vc_gid = 0;
219                 if (sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_REVERSE) {
220                         create = 0;
221                         remove_dead = 0;
222                 }
223         } else {
224                 vcred.vc_uid = cfs_current()->uid;
225                 vcred.vc_gid = cfs_current()->gid;
226         }
227
228         return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
229                                                    create, remove_dead);
230 }
231
232 struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
233 {
234         LASSERT(atomic_read(&ctx->cc_refcount) > 0);
235         atomic_inc(&ctx->cc_refcount);
236         return ctx;
237 }
238 EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
239
240 void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
241 {
242         struct ptlrpc_sec *sec = ctx->cc_sec;
243
244         LASSERT(sec);
245         LASSERT(atomic_read(&ctx->cc_refcount));
246
247         if (!atomic_dec_and_test(&ctx->cc_refcount))
248                 return;
249
250         sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
251 }
252 EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
253
254 /*
255  * expire the context immediately.
256  * the caller must hold at least 1 ref on the ctx.
257  */
258 void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
259 {
260         LASSERT(ctx->cc_ops->die);
261         ctx->cc_ops->die(ctx, 0);
262 }
263 EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
264
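/*
 * Wake up every request currently waiting on this context and detach it
 * from the context's waiting list.
 */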
265 void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
266 {
267         struct ptlrpc_request *req, *next;
268
269         spin_lock(&ctx->cc_lock);
270         list_for_each_entry_safe(req, next, &ctx->cc_req_list, rq_ctx_chain) {
271                 list_del_init(&req->rq_ctx_chain);
272                 ptlrpc_wake_client_req(req);
273         }
274         spin_unlock(&ctx->cc_lock);
275 }
276 EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
277
278 int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
279 {
280         LASSERT(ctx->cc_ops);
281
282         if (ctx->cc_ops->display == NULL)
283                 return 0;
284
285         return ctx->cc_ops->display(ctx, buf, bufsize);
286 }
287
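/*
 * If a delayed sec adapt on this import has expired, clear the expiry
 * stamp and perform the adaptation now.
 */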
288 static int sptlrpc_import_sec_check_expire(struct obd_import *imp)
289 {
290         int     adapt = 0;
291
292         spin_lock(&imp->imp_lock);
293         if (imp->imp_sec_expire &&
294             imp->imp_sec_expire < cfs_time_current_sec()) {
295                 adapt = 1;
296                 imp->imp_sec_expire = 0;
297         }
298         spin_unlock(&imp->imp_lock);
299
300         if (!adapt)
301                 return 0;
302
303         CDEBUG(D_SEC, "found delayed sec adapt expired, do it now\n");
304         return sptlrpc_import_sec_adapt(imp, NULL, 0);
305 }
306
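/*
 * Attach a client context to @req, obtained from the import's current
 * sec. Fails if the import has no sec, the sec is dying, or no context
 * can be found or created.
 */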
307 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
308 {
309         struct obd_import *imp = req->rq_import;
310         struct ptlrpc_sec *sec;
311         int                rc;
312         ENTRY;
313
314         LASSERT(!req->rq_cli_ctx);
315         LASSERT(imp);
316
317         if (unlikely(imp->imp_sec_expire)) {
318                 rc = sptlrpc_import_sec_check_expire(imp);
319                 if (rc)
320                         RETURN(rc);
321         }
322
323         sec = sptlrpc_import_sec_ref(imp);
324         if (sec == NULL) {
325                 CERROR("import %p (%s) with no ptlrpc_sec\n",
326                        imp, ptlrpc_import_state_name(imp->imp_state));
327                 RETURN(-EACCES);
328         }
329
330         if (unlikely(sec->ps_dying)) {
331                 CERROR("attempt to use dying sec %p\n", sec);
332                 return -EACCES;
333         }
334
335         req->rq_cli_ctx = get_my_ctx(sec);
336
337         sptlrpc_sec_put(sec);
338
339         if (!req->rq_cli_ctx) {
340                 CERROR("req %p: fail to get context\n", req);
341                 RETURN(-ENOMEM);
342         }
343
344         RETURN(0);
345 }
346
347 /*
348  * if @sync == 0, this function should return quickly without sleep;
349  * otherwise might trigger ctx destroying rpc to server.
350  */
351 void sptlrpc_req_put_ctx(struct ptlrpc_request *req, int sync)
352 {
353         ENTRY;
354
355         LASSERT(req);
356         LASSERT(req->rq_cli_ctx);
357
358         /* the request might be asked to release its context early
359          * while it is still on the context's waiting list.
360          */
361         if (!list_empty(&req->rq_ctx_chain)) {
362                 spin_lock(&req->rq_cli_ctx->cc_lock);
363                 list_del_init(&req->rq_ctx_chain);
364                 spin_unlock(&req->rq_cli_ctx->cc_lock);
365         }
366
367         sptlrpc_cli_ctx_put(req->rq_cli_ctx, sync);
368         req->rq_cli_ctx = NULL;
369         EXIT;
370 }
371
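/*
 * Move @req from @oldctx to @newctx when the two belong to different
 * secs: the request message is saved, the old req/rep buffers are freed,
 * the flavor is recalculated, and a new request buffer is allocated and
 * refilled. On failure the old flavor is restored.
 */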
372 static
373 int sptlrpc_req_ctx_switch(struct ptlrpc_request *req,
374                            struct ptlrpc_cli_ctx *oldctx,
375                            struct ptlrpc_cli_ctx *newctx)
376 {
377         struct sptlrpc_flavor   old_flvr;
378         char                   *reqmsg;
379         int                     reqmsg_size;
380         int                     rc;
381
382         if (likely(oldctx->cc_sec == newctx->cc_sec))
383                 return 0;
384
385         LASSERT(req->rq_reqmsg);
386         LASSERT(req->rq_reqlen);
387         LASSERT(req->rq_replen);
388
389         CWARN("req %p: switch ctx %p -> %p, switch sec %p(%s) -> %p(%s)\n",
390               req, oldctx, newctx,
391               oldctx->cc_sec, oldctx->cc_sec->ps_policy->sp_name,
392               newctx->cc_sec, newctx->cc_sec->ps_policy->sp_name);
393
394         /* save flavor */
395         old_flvr = req->rq_flvr;
396
397         /* save request message */
398         reqmsg_size = req->rq_reqlen;
399         OBD_ALLOC(reqmsg, reqmsg_size);
400         if (reqmsg == NULL)
401                 return -ENOMEM;
402         memcpy(reqmsg, req->rq_reqmsg, reqmsg_size);
403
404         /* release old req/rep buf */
405         req->rq_cli_ctx = oldctx;
406         sptlrpc_cli_free_reqbuf(req);
407         sptlrpc_cli_free_repbuf(req);
408         req->rq_cli_ctx = newctx;
409
410         /* recalculate the flavor */
411         sptlrpc_req_set_flavor(req, 0);
412
413         /* allocate a new request buffer;
414          * we don't need to allocate the reply buffer here, leave that
415          * to the rest of the ptlrpc processing
416          */
417         rc = sptlrpc_cli_alloc_reqbuf(req, reqmsg_size);
418         if (!rc) {
419                 LASSERT(req->rq_reqmsg);
420                 memcpy(req->rq_reqmsg, reqmsg, reqmsg_size);
421         } else {
422                 CWARN("failed to alloc reqbuf: %d\n", rc);
423                 req->rq_flvr = old_flvr;
424         }
425
426         OBD_FREE(reqmsg, reqmsg_size);
427         return rc;
428 }
429
430 /*
431  * the request must have a context; in any case of failure, restore
432  * the old one, since a request must always have a ctx.
433  */
434 int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
435 {
436         struct ptlrpc_cli_ctx *oldctx = req->rq_cli_ctx;
437         struct ptlrpc_cli_ctx *newctx;
438         int                    rc;
439         ENTRY;
440
441         LASSERT(oldctx);
442         LASSERT(test_bit(PTLRPC_CTX_DEAD_BIT, &oldctx->cc_flags));
443
444         sptlrpc_cli_ctx_get(oldctx);
445         sptlrpc_req_put_ctx(req, 0);
446
447         rc = sptlrpc_req_get_ctx(req);
448         if (unlikely(rc)) {
449                 LASSERT(!req->rq_cli_ctx);
450
451                 /* restore old ctx */
452                 req->rq_cli_ctx = oldctx;
453                 RETURN(rc);
454         }
455
456         newctx = req->rq_cli_ctx;
457         LASSERT(newctx);
458
459         if (unlikely(newctx == oldctx)) {
460                 /*
461                  * still got the old ctx, which usually means the system is busy
462                  */
463                 CWARN("ctx (%p, fl %lx) doesn't switch, relax a little bit\n",
464                       newctx, newctx->cc_flags);
465
466                 schedule_timeout(HZ);
467         } else {
468                 rc = sptlrpc_req_ctx_switch(req, oldctx, newctx);
469                 if (rc) {
470                         /* restore old ctx */
471                         sptlrpc_req_put_ctx(req, 0);
472                         req->rq_cli_ctx = oldctx;
473                         RETURN(rc);
474                 }
475
476                 LASSERT(req->rq_cli_ctx == newctx);
477         }
478
479         sptlrpc_cli_ctx_put(oldctx, 1);
480         RETURN(0);
481 }
482 EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
483
484 static
485 int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
486 {
487         if (cli_ctx_is_refreshed(ctx))
488                 return 1;
489         return 0;
490 }
491
492 static
493 int ctx_refresh_timeout(void *data)
494 {
495         struct ptlrpc_request *req = data;
496         int rc;
497
498         /* conn_cnt is needed in expire_one_request */
499         lustre_msg_set_conn_cnt(req->rq_reqmsg, req->rq_import->imp_conn_cnt);
500
501         rc = ptlrpc_expire_one_request(req);
502         /* if we started recovery, we should mark this ctx dead; otherwise,
503          * if lgssd died, nobody would retire this ctx and a following
504          * connect would still find the same ctx, causing a deadlock.
505          * this assumes that the expiry time of the request is later
506          * than the context refresh expiry time.
507          */
508         if (rc == 0)
509                 req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
510         return rc;
511 }
512
513 static
514 void ctx_refresh_interrupt(void *data)
515 {
516         struct ptlrpc_request *req = data;
517
518         spin_lock(&req->rq_lock);
519         req->rq_intr = 1;
520         spin_unlock(&req->rq_lock);
521 }
522
523 static
524 void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
525 {
526         spin_lock(&ctx->cc_lock);
527         if (!list_empty(&req->rq_ctx_chain))
528                 list_del_init(&req->rq_ctx_chain);
529         spin_unlock(&ctx->cc_lock);
530 }
531
532 /*
533  * the status of the context may be changed by other threads at any
534  * time. we allow this race. but once we return 0, the caller will
535  * assume it is up to date and keep using it until the owning rpc is done.
536  *
537  * @timeout:
538  *    < 0  - don't wait
539  *    = 0  - wait until success or fatal error occur
540  *    > 0  - timeout value
541  *
542  * return 0 only if the context is up to date.
543  */
544 int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
545 {
546         struct ptlrpc_cli_ctx  *ctx = req->rq_cli_ctx;
547         struct l_wait_info      lwi;
548         int                     rc;
549         ENTRY;
550
551         LASSERT(ctx);
552
553         /*
554          * during the process a request's context might even change type
555          * (e.g. from a gss ctx to a plain ctx), so on each loop we need to
556          * re-check everything
557          */
558 again:
559         /* skip special ctxs */
560         if (cli_ctx_is_eternal(ctx) || req->rq_ctx_init || req->rq_ctx_fini)
561                 RETURN(0);
562
563         if (test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags)) {
564                 LASSERT(ctx->cc_ops->refresh);
565                 ctx->cc_ops->refresh(ctx);
566         }
567         LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
568
569         LASSERT(ctx->cc_ops->validate);
570         if (ctx->cc_ops->validate(ctx) == 0) {
571                 req_off_ctx_list(req, ctx);
572                 RETURN(0);
573         }
574
575         if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
576                 req->rq_err = 1;
577                 req_off_ctx_list(req, ctx);
578                 RETURN(-EPERM);
579         }
580
581         /* This is subtle. For a resent message we have to keep the original
582          * context to survive the following situation:
583          *  1. the request is sent to the server
584          *  2. recovery is kick-started
585          *  3. recovery finishes, the request is marked as resent
586          *  4. the request is resent
587          *  5. the old reply from the server is received (same xid)
588          *  6. the reply is verified (this has to succeed)
589          *  7. a new reply from the server is received; lnet drops it
590          *
591          * Note we can't simply change the xid of a resent request because
592          * the server relies on it for reply reconstruction.
593          *
594          * Commonly the original context should be up to date because we have
595          * a nice expiry time; and the server will keep its half of the
596          * context because we hold at least one reference on the old context,
597          * which prevents the context-destroy RPC from being sent. So the
598          * server can still accept the request and finish the RPC. Two cases:
599          *  1. If the server-side context has been trimmed, NO_CONTEXT will
600          *     be returned, and gss_cli_ctx_verify/unseal will switch to the
601          *     new context by force.
602          *  2. If the current context was never refreshed, we are fine: we
603          *     never really sent a request with the old context before.
604          */
605         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
606             unlikely(req->rq_reqmsg) &&
607             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
608                 req_off_ctx_list(req, ctx);
609                 RETURN(0);
610         }
611
612         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
613                 rc = sptlrpc_req_replace_dead_ctx(req);
614                 if (rc) {
615                         LASSERT(ctx == req->rq_cli_ctx);
616                         CERROR("req %p: failed to replace dead ctx %p: %d\n",
617                                 req, ctx, rc);
618                         req->rq_err = 1;
619                         LASSERT(list_empty(&req->rq_ctx_chain));
620                         RETURN(rc);
621                 }
622
623                 CWARN("req %p: replace dead ctx %p => ctx %p (%u->%s)\n",
624                       req, ctx, req->rq_cli_ctx,
625                       req->rq_cli_ctx->cc_vcred.vc_uid,
626                       sec2target_str(req->rq_cli_ctx->cc_sec));
627
628                 ctx = req->rq_cli_ctx;
629                 LASSERT(list_empty(&req->rq_ctx_chain));
630
631                 goto again;
632         }
633
634         /* Now we're sure this context is in the middle of an upcall;
635          * add ourselves to the waiting list
636          */
637         spin_lock(&ctx->cc_lock);
638         if (list_empty(&req->rq_ctx_chain))
639                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
640         spin_unlock(&ctx->cc_lock);
641
642         if (timeout < 0) {
643                 RETURN(-EWOULDBLOCK);
644         }
645
646         /* Clear any flags that may be present from previous sends */
647         LASSERT(req->rq_receiving_reply == 0);
648         spin_lock(&req->rq_lock);
649         req->rq_err = 0;
650         req->rq_timedout = 0;
651         req->rq_resend = 0;
652         req->rq_restart = 0;
653         spin_unlock(&req->rq_lock);
654
655         lwi = LWI_TIMEOUT_INTR(timeout * HZ, ctx_refresh_timeout,
656                                ctx_refresh_interrupt, req);
657         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
658
659         /* we could be here in the following cases:
660          * - successfully refreshed;
661          * - interrupted;
662          * - timed out, and we don't want to recover from the failure;
663          * - timed out, and woken up when recovery finished;
664          * - someone else marked this ctx dead by force;
665          * - someone invalidated the req and called wake_client_req(),
666          *   e.g. ptlrpc_abort_inflight();
667          */
668         if (!cli_ctx_is_refreshed(ctx)) {
669                 /* timed out or interrupted */
670                 req_off_ctx_list(req, ctx);
671
672                 LASSERT(rc != 0);
673                 RETURN(rc);
674         }
675
676         goto again;
677 }
678
679 /*
680  * Note this could be called in two situations:
681  * - new request from ptlrpc_prep_req(), with proper @opcode
682  * - old request which changed ctx in the middle, with @opcode == 0
683  */
684 void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
685 {
686         struct ptlrpc_sec *sec;
687
688         LASSERT(req->rq_import);
689         LASSERT(req->rq_cli_ctx);
690         LASSERT(req->rq_cli_ctx->cc_sec);
691         LASSERT(req->rq_bulk_read == 0 || req->rq_bulk_write == 0);
692
693         /* special security flags according to opcode */
694         switch (opcode) {
695         case OST_READ:
696                 req->rq_bulk_read = 1;
697                 break;
698         case OST_WRITE:
699                 req->rq_bulk_write = 1;
700                 break;
701         case SEC_CTX_INIT:
702                 req->rq_ctx_init = 1;
703                 break;
704         case SEC_CTX_FINI:
705                 req->rq_ctx_fini = 1;
706                 break;
707         case 0:
708                 /* init/fini rpcs won't be resent, so they can't get here */
709                 LASSERT(req->rq_ctx_init == 0);
710                 LASSERT(req->rq_ctx_fini == 0);
711
712                 /* cleanup flags, which should be recalculated */
713                 req->rq_pack_udesc = 0;
714                 req->rq_pack_bulk = 0;
715                 break;
716         }
717
718         sec = req->rq_cli_ctx->cc_sec;
719
720         spin_lock(&sec->ps_lock);
721         req->rq_flvr = sec->ps_flvr;
722         spin_unlock(&sec->ps_lock);
723
724         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
725          * destruction rpc */
726         if (unlikely(req->rq_ctx_init))
727                 rpc_flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
728         else if (unlikely(req->rq_ctx_fini))
729                 rpc_flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
730
731         /* user descriptor flag, null security can't do it anyway */
732         if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
733             (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL))
734                 req->rq_pack_udesc = 1;
735
736         /* bulk security flag */
737         if ((req->rq_bulk_read || req->rq_bulk_write) &&
738             (req->rq_flvr.sf_bulk_ciph != BULK_CIPH_ALG_NULL ||
739              req->rq_flvr.sf_bulk_hash != BULK_HASH_ALG_NULL))
740                 req->rq_pack_bulk = 1;
741 }
742
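/*
 * Called once the request is out on the wire. For privacy-mode rpcs not
 * using a preallocated pool buffer, the wire buffer rq_reqbuf is no
 * longer needed and is freed here; the cleartext buffer is kept.
 */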
743 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
744 {
745         if (RPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
746                 return;
747
748         LASSERT(req->rq_clrbuf);
749         if (req->rq_pool || !req->rq_reqbuf)
750                 return;
751
752         OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
753         req->rq_reqbuf = NULL;
754         req->rq_reqbuf_len = 0;
755 }
756
757 /*
758  * check whether the current user has a valid context for an import.
759  * might retry repeatedly in case of non-fatal errors.
760  * return 0 on success, < 0 on failure
761  */
762 int sptlrpc_import_check_ctx(struct obd_import *imp)
763 {
764         struct ptlrpc_sec     *sec;
765         struct ptlrpc_cli_ctx *ctx;
766         struct ptlrpc_request *req = NULL;
767         int rc;
768         ENTRY;
769
770         might_sleep();
771
772         sec = sptlrpc_import_sec_ref(imp);
773         ctx = get_my_ctx(sec);
774         sptlrpc_sec_put(sec);
775
776         if (!ctx)
777                 RETURN(1);
778
779         if (cli_ctx_is_eternal(ctx) ||
780             ctx->cc_ops->validate(ctx) == 0) {
781                 sptlrpc_cli_ctx_put(ctx, 1);
782                 RETURN(0);
783         }
784
785         OBD_ALLOC_PTR(req);
786         if (!req)
787                 RETURN(-ENOMEM);
788
789         spin_lock_init(&req->rq_lock);
790         atomic_set(&req->rq_refcount, 10000);
791         CFS_INIT_LIST_HEAD(&req->rq_ctx_chain);
792         init_waitqueue_head(&req->rq_reply_waitq);
793         req->rq_import = imp;
794         req->rq_cli_ctx = ctx;
795
796         rc = sptlrpc_req_refresh_ctx(req, 0);
797         LASSERT(list_empty(&req->rq_ctx_chain));
798         sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
799         OBD_FREE_PTR(req);
800
801         RETURN(rc);
802 }
803
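/*
 * Transform the request for the wire according to the service level of
 * its flavor: NULL/AUTH/INTG requests are signed, PRIV requests are
 * sealed. Bulk descriptors, if any, are wrapped first.
 */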
804 int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
805 {
806         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
807         int rc = 0;
808         ENTRY;
809
810         LASSERT(ctx);
811         LASSERT(ctx->cc_sec);
812         LASSERT(req->rq_reqbuf || req->rq_clrbuf);
813
814         /* we wrap the bulk request here because now we can be sure
815          * the context is up to date.
816          */
817         if (req->rq_bulk) {
818                 rc = sptlrpc_cli_wrap_bulk(req, req->rq_bulk);
819                 if (rc)
820                         RETURN(rc);
821         }
822
823         switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
824         case SPTLRPC_SVC_NULL:
825         case SPTLRPC_SVC_AUTH:
826         case SPTLRPC_SVC_INTG:
827                 LASSERT(ctx->cc_ops->sign);
828                 rc = ctx->cc_ops->sign(ctx, req);
829                 break;
830         case SPTLRPC_SVC_PRIV:
831                 LASSERT(ctx->cc_ops->seal);
832                 rc = ctx->cc_ops->seal(ctx, req);
833                 break;
834         default:
835                 LBUG();
836         }
837
838         if (rc == 0) {
839                 LASSERT(req->rq_reqdata_len);
840                 LASSERT(req->rq_reqdata_len % 8 == 0);
841                 LASSERT(req->rq_reqdata_len <= req->rq_reqbuf_len);
842         }
843
844         RETURN(rc);
845 }
846
847 /*
848  * rq_nob_received is the actual received data length
849  */
850 int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
851 {
852         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
853         int                    rc;
854         ENTRY;
855
856         LASSERT(ctx);
857         LASSERT(ctx->cc_sec);
858         LASSERT(ctx->cc_ops);
859         LASSERT(req->rq_repbuf);
860
861         req->rq_repdata_len = req->rq_nob_received;
862
863         if (req->rq_nob_received < sizeof(struct lustre_msg)) {
864                 CERROR("replied data length %d too small\n",
865                        req->rq_nob_received);
866                 RETURN(-EPROTO);
867         }
868
869
870         if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V1 ||
871             req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) {
872                 /*
873                  * a v1 message must be of null flavor, so our request
874                  * should also be in null flavor
875                  */
876                 if (RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) !=
877                     SPTLRPC_POLICY_NULL) {
878                         CERROR("request was %s but reply with null\n",
879                                sptlrpc_rpcflavor2name(req->rq_flvr.sf_rpc));
880                         RETURN(-EPROTO);
881                 }
882         } else {
883                 /*
884                  * v2 message, check request/reply policy match
885                  */
886                 __u16 rpc_flvr = WIRE_FLVR_RPC(req->rq_repbuf->lm_secflvr);
887
888                 if (req->rq_repbuf->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
889                         __swab16s(&rpc_flvr);
890
891                 if (RPC_FLVR_POLICY(rpc_flvr) !=
892                     RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
893                         CERROR("request policy was %u while reply with %u\n",
894                                RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
895                                RPC_FLVR_POLICY(rpc_flvr));
896                         RETURN(-EPROTO);
897                 }
898
899                 /* do nothing if it's null policy; otherwise unpack the
900                  * wrapper message
901                  */
902                 if (RPC_FLVR_POLICY(rpc_flvr) != SPTLRPC_POLICY_NULL &&
903                     lustre_unpack_msg(req->rq_repbuf, req->rq_nob_received))
904                         RETURN(-EPROTO);
905         }
906
907         switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
908         case SPTLRPC_SVC_NULL:
909         case SPTLRPC_SVC_AUTH:
910         case SPTLRPC_SVC_INTG:
911                 LASSERT(ctx->cc_ops->verify);
912                 rc = ctx->cc_ops->verify(ctx, req);
913                 break;
914         case SPTLRPC_SVC_PRIV:
915                 LASSERT(ctx->cc_ops->unseal);
916                 rc = ctx->cc_ops->unseal(ctx, req);
917                 break;
918         default:
919                 LBUG();
920         }
921
922         LASSERT(rc || req->rq_repmsg || req->rq_resend);
923         RETURN(rc);
924 }
925
926 /**************************************************
927  * sec ID                                         *
928  **************************************************/
929
930 /*
931  * "fixed" secs (e.g. null) use sec_id < 0
932  */
933 static atomic_t sptlrpc_sec_id = ATOMIC_INIT(1);
934
935 int sptlrpc_get_next_secid(void)
936 {
937         return atomic_inc_return(&sptlrpc_sec_id);
938 }
939 EXPORT_SYMBOL(sptlrpc_get_next_secid);
940
941 /**************************************************
942  * client side high-level security APIs           *
943  **************************************************/
944
945 static int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
946                                    int grace, int force)
947 {
948         struct ptlrpc_sec_policy *policy = sec->ps_policy;
949
950         LASSERT(policy->sp_cops);
951         LASSERT(policy->sp_cops->flush_ctx_cache);
952
953         return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
954 }
955
956 static void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
957 {
958         struct ptlrpc_sec_policy *policy = sec->ps_policy;
959
960         LASSERT(atomic_read(&sec->ps_refcount) == 0);
961         LASSERT(atomic_read(&sec->ps_nctx) == 0);
962         LASSERT(policy->sp_cops->destroy_sec);
963
964         CDEBUG(D_SEC, "%s@%p: being destroyed\n", sec->ps_policy->sp_name, sec);
965
966         policy->sp_cops->destroy_sec(sec);
967         sptlrpc_policy_put(policy);
968 }
969
970 void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
971 {
972         sec_cop_destroy_sec(sec);
973 }
974 EXPORT_SYMBOL(sptlrpc_sec_destroy);
975
976 static void sptlrpc_sec_kill(struct ptlrpc_sec *sec)
977 {
978         LASSERT(atomic_read(&sec->ps_refcount) > 0);
979
980         if (sec->ps_policy->sp_cops->kill_sec) {
981                 sec->ps_policy->sp_cops->kill_sec(sec);
982
983                 sec_cop_flush_ctx_cache(sec, -1, 1, 1);
984         }
985 }
986
987 struct ptlrpc_sec *sptlrpc_sec_get(struct ptlrpc_sec *sec)
988 {
989         if (sec) {
990                 LASSERT(atomic_read(&sec->ps_refcount) > 0);
991                 atomic_inc(&sec->ps_refcount);
992         }
993
994         return sec;
995 }
996 EXPORT_SYMBOL(sptlrpc_sec_get);
997
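/*
 * Drop a reference on @sec; when the last reference is gone the sec is
 * removed from the garbage collector and destroyed.
 */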
998 void sptlrpc_sec_put(struct ptlrpc_sec *sec)
999 {
1000         if (sec) {
1001                 LASSERT(atomic_read(&sec->ps_refcount) > 0);
1002
1003                 if (atomic_dec_and_test(&sec->ps_refcount)) {
1004                         LASSERT(atomic_read(&sec->ps_nctx) == 0);
1005
1006                         sptlrpc_gc_del_sec(sec);
1007                         sec_cop_destroy_sec(sec);
1008                 }
1009         }
1010 }
1011 EXPORT_SYMBOL(sptlrpc_sec_put);
1012
1013 /*
1014  * the policy module is responsible for taking a reference on the import
1015  */
1016 static
1017 struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
1018                                        struct ptlrpc_svc_ctx *svc_ctx,
1019                                        struct sptlrpc_flavor *sf,
1020                                        enum lustre_sec_part sp)
1021 {
1022         struct ptlrpc_sec_policy *policy;
1023         struct ptlrpc_sec        *sec;
1024         ENTRY;
1025
1026         if (svc_ctx) {
1027                 LASSERT(imp->imp_dlm_fake == 1);
1028
1029                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
1030                        imp->imp_obd->obd_type->typ_name,
1031                        imp->imp_obd->obd_name,
1032                        sptlrpc_rpcflavor2name(sf->sf_rpc));
1033
1034                 policy = sptlrpc_policy_get(svc_ctx->sc_policy);
1035                 sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1036         } else {
1037                 LASSERT(imp->imp_dlm_fake == 0);
1038
1039                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
1040                        imp->imp_obd->obd_type->typ_name,
1041                        imp->imp_obd->obd_name,
1042                        sptlrpc_rpcflavor2name(sf->sf_rpc));
1043
1044                 policy = sptlrpc_rpcflavor2policy(sf->sf_rpc);
1045                 if (!policy) {
1046                         CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
1047                         RETURN(NULL);
1048                 }
1049         }
1050
1051         sec = policy->sp_cops->create_sec(imp, svc_ctx, sf);
1052         if (sec) {
1053                 atomic_inc(&sec->ps_refcount);
1054
1055                 sec->ps_part = sp;
1056
1057                 if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
1058                         sptlrpc_gc_add_sec(sec);
1059         } else {
1060                 sptlrpc_policy_put(policy);
1061         }
1062
1063         RETURN(sec);
1064 }
1065
1066 struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp)
1067 {
1068         struct ptlrpc_sec *sec;
1069
1070         spin_lock(&imp->imp_lock);
1071         sec = sptlrpc_sec_get(imp->imp_sec);
1072         spin_unlock(&imp->imp_lock);
1073
1074         return sec;
1075 }
1076 EXPORT_SYMBOL(sptlrpc_import_sec_ref);
1077
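/*
 * Install @sec as the import's current sec. Any previously installed sec
 * is killed (its context cache flushed) and the import's reference on it
 * is dropped.
 */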
1078 static void sptlrpc_import_sec_install(struct obd_import *imp,
1079                                        struct ptlrpc_sec *sec)
1080 {
1081         struct ptlrpc_sec *old_sec;
1082
1083         LASSERT(atomic_read(&sec->ps_refcount) > 0);
1084
1085         spin_lock(&imp->imp_lock);
1086         old_sec = imp->imp_sec;
1087         imp->imp_sec = sec;
1088         spin_unlock(&imp->imp_lock);
1089
1090         if (old_sec) {
1091                 sptlrpc_sec_kill(old_sec);
1092
1093                 /* balance the ref taken by this import */
1094                 sptlrpc_sec_put(old_sec);
1095         }
1096 }
1097
1098 static void sptlrpc_import_sec_adapt_inplace(struct obd_import *imp,
1099                                              struct ptlrpc_sec *sec,
1100                                              struct sptlrpc_flavor *sf)
1101 {
1102         if (sf->sf_bulk_ciph != sec->ps_flvr.sf_bulk_ciph ||
1103             sf->sf_bulk_hash != sec->ps_flvr.sf_bulk_hash) {
1104                 CWARN("imp %p (%s->%s): changing bulk flavor %s/%s -> %s/%s\n",
1105                       imp, imp->imp_obd->obd_name,
1106                       obd_uuid2str(&imp->imp_connection->c_remote_uuid),
1107                       sptlrpc_get_ciph_name(sec->ps_flvr.sf_bulk_ciph),
1108                       sptlrpc_get_hash_name(sec->ps_flvr.sf_bulk_hash),
1109                       sptlrpc_get_ciph_name(sf->sf_bulk_ciph),
1110                       sptlrpc_get_hash_name(sf->sf_bulk_hash));
1111
1112                 spin_lock(&sec->ps_lock);
1113                 sec->ps_flvr.sf_bulk_ciph = sf->sf_bulk_ciph;
1114                 sec->ps_flvr.sf_bulk_hash = sf->sf_bulk_hash;
1115                 spin_unlock(&sec->ps_lock);
1116         }
1117
1118         if (!equi(sf->sf_flags & PTLRPC_SEC_FL_UDESC,
1119                   sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC)) {
1120                 CWARN("imp %p (%s->%s): %s shipping user descriptor\n",
1121                       imp, imp->imp_obd->obd_name,
1122                       obd_uuid2str(&imp->imp_connection->c_remote_uuid),
1123                       (sf->sf_flags & PTLRPC_SEC_FL_UDESC) ? "start" : "stop");
1124
1125                 spin_lock(&sec->ps_lock);
1126                 sec->ps_flvr.sf_flags &= ~PTLRPC_SEC_FL_UDESC;
1127                 sec->ps_flvr.sf_flags |= sf->sf_flags & PTLRPC_SEC_FL_UDESC;
1128                 spin_unlock(&sec->ps_lock);
1129         }
1130 }
1131
1132 /*
1133  * for a normal import, @svc_ctx should be NULL and @rpc_flavor is ignored;
1134  * for a reverse import, @svc_ctx and @rpc_flavor come from the incoming request.
1135  */
1136 int sptlrpc_import_sec_adapt(struct obd_import *imp,
1137                              struct ptlrpc_svc_ctx *svc_ctx,
1138                              __u16 rpc_flavor)
1139 {
1140         struct ptlrpc_connection   *conn;
1141         struct sptlrpc_flavor       sf;
1142         struct ptlrpc_sec          *sec, *newsec;
1143         enum lustre_sec_part        sp;
1144         int                         rc;
1145
1146         if (imp == NULL)
1147                 return 0;
1148
1149         conn = imp->imp_connection;
1150
1151         if (svc_ctx == NULL) {
1152                 /* normal import, determine flavor from rule set */
1153                 sptlrpc_rule_set_choose(&imp->imp_obd->u.cli.cl_sptlrpc_rset,
1154                                         LUSTRE_SP_ANY, conn->c_self, &sf);
1155
1156                 sp = imp->imp_obd->u.cli.cl_sec_part;
1157         } else {
1158                 /* reverse import, determine flavor from incoming request */
1159                 sf.sf_rpc = rpc_flavor;
1160                 sf.sf_bulk_ciph = BULK_CIPH_ALG_NULL;
1161                 sf.sf_bulk_hash = BULK_HASH_ALG_NULL;
1162                 sf.sf_flags = PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
1163
1164                 sp = sptlrpc_target_sec_part(imp->imp_obd);
1165         }
1166
1167         sec = sptlrpc_import_sec_ref(imp);
1168         if (sec) {
1169                 if (svc_ctx == NULL) {
1170                         /* normal import: only check the rpc flavor; if just
1171                          * the bulk flavor or flags changed, we can handle it
1172                          * on the fly without switching sec. */
1173                         if (sf.sf_rpc == sec->ps_flvr.sf_rpc) {
1174                                 sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
1175
1176                                 rc = 0;
1177                                 goto out;
1178                         }
1179                 } else {
1180                         /* reverse import, do not compare bulk flavor */
1181                         if (sf.sf_rpc == sec->ps_flvr.sf_rpc) {
1182                                 rc = 0;
1183                                 goto out;
1184                         }
1185                 }
1186
1187                 CWARN("%simport %p (%s%s%s): changing flavor "
1188                       "(%s, %s/%s) -> (%s, %s/%s)\n",
1189                       svc_ctx ? "reverse " : "",
1190                       imp, imp->imp_obd->obd_name,
1191                       svc_ctx == NULL ? "->" : "<-",
1192                       obd_uuid2str(&conn->c_remote_uuid),
1193                       sptlrpc_rpcflavor2name(sec->ps_flvr.sf_rpc),
1194                       sptlrpc_get_hash_name(sec->ps_flvr.sf_bulk_hash),
1195                       sptlrpc_get_ciph_name(sec->ps_flvr.sf_bulk_ciph),
1196                       sptlrpc_rpcflavor2name(sf.sf_rpc),
1197                       sptlrpc_get_hash_name(sf.sf_bulk_hash),
1198                       sptlrpc_get_ciph_name(sf.sf_bulk_ciph));
1199         } else {
1200                 CWARN("%simport %p (%s%s%s) netid %x: "
1201                       "select initial flavor (%s, %s/%s)\n",
1202                       svc_ctx == NULL ? "" : "reverse ",
1203                       imp, imp->imp_obd->obd_name,
1204                       svc_ctx == NULL ? "->" : "<-",
1205                       obd_uuid2str(&conn->c_remote_uuid),
1206                       LNET_NIDNET(conn->c_self),
1207                       sptlrpc_rpcflavor2name(sf.sf_rpc),
1208                       sptlrpc_get_hash_name(sf.sf_bulk_hash),
1209                       sptlrpc_get_ciph_name(sf.sf_bulk_ciph));
1210         }
1211
1212         mutex_down(&imp->imp_sec_mutex);
1213
1214         newsec = sptlrpc_sec_create(imp, svc_ctx, &sf, sp);
1215         if (newsec) {
1216                 sptlrpc_import_sec_install(imp, newsec);
1217                 rc = 0;
1218         } else {
1219                 CERROR("%simport %p (%s): failed to create new sec\n",
1220                        svc_ctx == NULL ? "" : "reverse ",
1221                        imp, obd_uuid2str(&conn->c_remote_uuid));
1222                 rc = -EPERM;
1223         }
1224
1225         mutex_up(&imp->imp_sec_mutex);
1226
1227 out:
1228         sptlrpc_sec_put(sec);
1229         return rc;
1230 }
1231
1232 void sptlrpc_import_sec_put(struct obd_import *imp)
1233 {
1234         if (imp->imp_sec) {
1235                 sptlrpc_sec_kill(imp->imp_sec);
1236
1237                 sptlrpc_sec_put(imp->imp_sec);
1238                 imp->imp_sec = NULL;
1239         }
1240 }
1241
1242 static void import_flush_ctx_common(struct obd_import *imp,
1243                                     uid_t uid, int grace, int force)
1244 {
1245         struct ptlrpc_sec *sec;
1246
1247         if (imp == NULL)
1248                 return;
1249
1250         sec = sptlrpc_import_sec_ref(imp);
1251         if (sec == NULL)
1252                 return;
1253
1254         sec_cop_flush_ctx_cache(sec, uid, grace, force);
1255         sptlrpc_sec_put(sec);
1256 }
1257
1258 void sptlrpc_import_inval_all_ctx(struct obd_import *imp)
1259 {
1260         /* use grace == 0 */
1261         import_flush_ctx_common(imp, -1, 0, 1);
1262 }
1263
1264 void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
1265 {
1266         /* it's important to use grace mode, see the explanation in
1267          * sptlrpc_req_refresh_ctx() */
1268         import_flush_ctx_common(imp, 0, 1, 1);
1269 }
1270
1271 void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
1272 {
1273         import_flush_ctx_common(imp, cfs_current()->uid, 1, 1);
1274 }
1275 EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
1276
1277 void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
1278 {
1279         import_flush_ctx_common(imp, -1, 1, 1);
1280 }
1281 EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
1282
1283 /*
1284  * when this completes successfully, req->rq_reqmsg should point to the
1285  * right place.
1286  */
1287 int sptlrpc_cli_alloc_reqbuf(struct ptlrpc_request *req, int msgsize)
1288 {
1289         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1290         struct ptlrpc_sec_policy *policy;
1291         int rc;
1292
1293         LASSERT(ctx);
1294         LASSERT(atomic_read(&ctx->cc_refcount));
1295         LASSERT(ctx->cc_sec);
1296         LASSERT(ctx->cc_sec->ps_policy);
1297         LASSERT(req->rq_reqmsg == NULL);
1298
1299         policy = ctx->cc_sec->ps_policy;
1300         rc = policy->sp_cops->alloc_reqbuf(ctx->cc_sec, req, msgsize);
1301         if (!rc) {
1302                 LASSERT(req->rq_reqmsg);
1303                 LASSERT(req->rq_reqbuf || req->rq_clrbuf);
1304
1305                 /* zeroing preallocated buffer */
1306                 if (req->rq_pool)
1307                         memset(req->rq_reqmsg, 0, msgsize);
1308         }
1309
1310         return rc;
1311 }
1312
1313 void sptlrpc_cli_free_reqbuf(struct ptlrpc_request *req)
1314 {
1315         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1316         struct ptlrpc_sec_policy *policy;
1317
1318         LASSERT(ctx);
1319         LASSERT(atomic_read(&ctx->cc_refcount));
1320         LASSERT(ctx->cc_sec);
1321         LASSERT(ctx->cc_sec->ps_policy);
1322
1323         if (req->rq_reqbuf == NULL && req->rq_clrbuf == NULL)
1324                 return;
1325
1326         policy = ctx->cc_sec->ps_policy;
1327         policy->sp_cops->free_reqbuf(ctx->cc_sec, req);
1328 }
1329
1330 /*
1331  * NOTE caller must guarantee the buffer size is enough for the enlargement
1332  */
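/*
 * Grow @segment of @msg to @newsize in place: all following segments are
 * shifted towards the end of the buffer so their contents are preserved,
 * then the segment length is updated.
 */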
1333 void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
1334                                   int segment, int newsize)
1335 {
1336         void   *src, *dst;
1337         int     oldsize, oldmsg_size, movesize;
1338
1339         LASSERT(segment < msg->lm_bufcount);
1340         LASSERT(msg->lm_buflens[segment] <= newsize);
1341
1342         if (msg->lm_buflens[segment] == newsize)
1343                 return;
1344
1345         /* no data needs to move if we are enlarging the last segment */
1346         if (segment == msg->lm_bufcount - 1) {
1347                 msg->lm_buflens[segment] = newsize;
1348                 return;
1349         }
1350
1351         oldsize = msg->lm_buflens[segment];
1352
1353         src = lustre_msg_buf(msg, segment + 1, 0);
1354         msg->lm_buflens[segment] = newsize;
1355         dst = lustre_msg_buf(msg, segment + 1, 0);
1356         msg->lm_buflens[segment] = oldsize;
1357
1358         /* move from segment + 1 to end segment */
1359         LASSERT(msg->lm_magic == LUSTRE_MSG_MAGIC_V2);
1360         oldmsg_size = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
1361         movesize = oldmsg_size - ((unsigned long) src - (unsigned long) msg);
1362         LASSERT(movesize >= 0);
1363
1364         if (movesize)
1365                 memmove(dst, src, movesize);
1366
1367         /* note we don't clear the area where the old data lived; it's not secret */
1368
1369         /* finally set new segment size */
1370         msg->lm_buflens[segment] = newsize;
1371 }
1372 EXPORT_SYMBOL(_sptlrpc_enlarge_msg_inplace);
1373
1374 /*
1375  * enlarge @segment of the upper message req->rq_reqmsg to @newsize; all data
1376  * will be preserved after enlargement. this must be called only after
1377  * rq_reqmsg has been initialized.
1378  *
1379  * note to callers: upon return, rq_reqmsg and rq_reqlen might have
1380  * been changed.
1381  */
1382 int sptlrpc_cli_enlarge_reqbuf(struct ptlrpc_request *req,
1383                                int segment, int newsize)
1384 {
1385         struct ptlrpc_cli_ctx    *ctx = req->rq_cli_ctx;
1386         struct ptlrpc_sec_cops   *cops;
1387         struct lustre_msg        *msg = req->rq_reqmsg;
1388
1389         LASSERT(ctx);
1390         LASSERT(msg);
1391         LASSERT(msg->lm_bufcount > segment);
1392         LASSERT(msg->lm_buflens[segment] <= newsize);
1393
1394         if (msg->lm_buflens[segment] == newsize)
1395                 return 0;
1396
1397         cops = ctx->cc_sec->ps_policy->sp_cops;
1398         LASSERT(cops->enlarge_reqbuf);
1399         return cops->enlarge_reqbuf(ctx->cc_sec, req, segment, newsize);
1400 }
1401 EXPORT_SYMBOL(sptlrpc_cli_enlarge_reqbuf);
1402
1403 int sptlrpc_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
1404 {
1405         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1406         struct ptlrpc_sec_policy *policy;
1407         ENTRY;
1408
1409         LASSERT(ctx);
1410         LASSERT(atomic_read(&ctx->cc_refcount));
1411         LASSERT(ctx->cc_sec);
1412         LASSERT(ctx->cc_sec->ps_policy);
1413
1414         if (req->rq_repbuf)
1415                 RETURN(0);
1416
1417         policy = ctx->cc_sec->ps_policy;
1418         RETURN(policy->sp_cops->alloc_repbuf(ctx->cc_sec, req, msgsize));
1419 }
1420
1421 void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
1422 {
1423         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
1424         struct ptlrpc_sec_policy *policy;
1425         ENTRY;
1426
1427         LASSERT(ctx);
1428         LASSERT(atomic_read(&ctx->cc_refcount));
1429         LASSERT(ctx->cc_sec);
1430         LASSERT(ctx->cc_sec->ps_policy);
1431
1432         if (req->rq_repbuf == NULL)
1433                 return;
1434         LASSERT(req->rq_repbuf_len);
1435
1436         policy = ctx->cc_sec->ps_policy;
1437         policy->sp_cops->free_repbuf(ctx->cc_sec, req);
1438         EXIT;
1439 }
1440
1441 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
1442                                 struct ptlrpc_cli_ctx *ctx)
1443 {
1444         struct ptlrpc_sec_policy *policy = ctx->cc_sec->ps_policy;
1445
1446         if (!policy->sp_cops->install_rctx)
1447                 return 0;
1448         return policy->sp_cops->install_rctx(imp, ctx->cc_sec, ctx);
1449 }
1450
1451 int sptlrpc_svc_install_rvs_ctx(struct obd_import *imp,
1452                                 struct ptlrpc_svc_ctx *ctx)
1453 {
1454         struct ptlrpc_sec_policy *policy = ctx->sc_policy;
1455
1456         if (!policy->sp_sops->install_rctx)
1457                 return 0;
1458         return policy->sp_sops->install_rctx(imp, ctx);
1459 }
1460
1461 /****************************************
1462  * server side security                 *
1463  ****************************************/
1464
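/*
 * Check whether the flavor carried by @req is acceptable against the
 * export flavor @exp: either an exact rpc flavor match, or, for ctx
 * init/fini rpcs, a match on policy and mechanism only.
 */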
1465 static int flavor_allowed(struct sptlrpc_flavor *exp,
1466                           struct ptlrpc_request *req)
1467 {
1468         struct sptlrpc_flavor *flvr = &req->rq_flvr;
1469
1470         if (exp->sf_rpc == flvr->sf_rpc)
1471                 return 1;
1472
1473         if ((req->rq_ctx_init || req->rq_ctx_fini) &&
1474             RPC_FLVR_POLICY(exp->sf_rpc) == RPC_FLVR_POLICY(flvr->sf_rpc) &&
1475             RPC_FLVR_MECH(exp->sf_rpc) == RPC_FLVR_MECH(flvr->sf_rpc))
1476                 return 1;
1477
1478         return 0;
1479 }
1480
1481 #define EXP_FLVR_UPDATE_EXPIRE      (OBD_TIMEOUT_DEFAULT + 10)
1482
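/*
 * Verify that an incoming request's flavor is acceptable for @exp. The
 * export tracks the current flavor plus up to two recently replaced ones
 * with expiry times, so requests in flight across a flavor change can
 * still be accepted; reverse sec adaptation is triggered when needed.
 */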
1483 int sptlrpc_target_export_check(struct obd_export *exp,
1484                                 struct ptlrpc_request *req)
1485 {
1486         struct sptlrpc_flavor   flavor;
1487
1488         if (exp == NULL)
1489                 return 0;
1490
1491         /* a client side export has no imp_reverse, skip it.
1492          * FIXME maybe we should check the flavor in this case as well??? */
1493         if (exp->exp_imp_reverse == NULL)
1494                 return 0;
1495
1496         /* don't care about ctx fini rpc */
1497         if (req->rq_ctx_fini)
1498                 return 0;
1499
1500         spin_lock(&exp->exp_lock);
1501
1502         /* if the flavor just changed (exp->exp_flvr_changed != 0), we wait
1503          * for the first req with the new flavor, then treat it as the current
1504          * flavor and adapt the reverse sec according to it.
1505          * note the first rpc with the new flavor might not use a root ctx, in
1506          * which case we delay the sec_adapt by leaving exp_flvr_adapt == 1. */
1507         if (unlikely(exp->exp_flvr_changed) &&
1508             flavor_allowed(&exp->exp_flvr_old[1], req)) {
1509                 /* make the new flavor "current", and the old ones
1510                  * about-to-expire */
1511                 CDEBUG(D_SEC, "exp %p: just changed: %x->%x\n", exp,
1512                        exp->exp_flvr.sf_rpc, exp->exp_flvr_old[1].sf_rpc);
1513                 flavor = exp->exp_flvr_old[1];
1514                 exp->exp_flvr_old[1] = exp->exp_flvr_old[0];
1515                 exp->exp_flvr_expire[1] = exp->exp_flvr_expire[0];
1516                 exp->exp_flvr_old[0] = exp->exp_flvr;
1517                 exp->exp_flvr_expire[0] = cfs_time_current_sec() +
1518                                           EXP_FLVR_UPDATE_EXPIRE;
1519                 exp->exp_flvr = flavor;
1520
1521                 /* flavor change finished */
1522                 exp->exp_flvr_changed = 0;
1523                 LASSERT(exp->exp_flvr_adapt == 1);
1524
1525                 /* if it's gss, we are only interested in root ctx init */
1526                 if (req->rq_auth_gss &&
1527                     !(req->rq_ctx_init && (req->rq_auth_usr_root ||
1528                                            req->rq_auth_usr_mdt))) {
1529                         spin_unlock(&exp->exp_lock);
1530                         CDEBUG(D_SEC, "is good but not root(%d:%d:%d:%d)\n",
1531                                req->rq_auth_gss, req->rq_ctx_init,
1532                                req->rq_auth_usr_root, req->rq_auth_usr_mdt);
1533                         return 0;
1534                 }
1535
1536                 exp->exp_flvr_adapt = 0;
1537                 spin_unlock(&exp->exp_lock);
1538
1539                 return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1540                                                 req->rq_svc_ctx, flavor.sf_rpc);
1541         }
1542
1543         /* if it equals the current flavor, we accept it, but we still need
1544          * to deal with the reverse sec/ctx */
1545         if (likely(flavor_allowed(&exp->exp_flvr, req))) {
1546                 /* most cases return here; we are only interested in
1547                  * gss root ctx init */
1548                 if (!req->rq_auth_gss || !req->rq_ctx_init ||
1549                     (!req->rq_auth_usr_root && !req->rq_auth_usr_mdt)) {
1550                         spin_unlock(&exp->exp_lock);
1551                         return 0;
1552                 }
1553
1554                 /* if the flavor just changed, we should not proceed; just
1555                  * leave it, the current flavor will be discovered and replaced
1556                  * shortly, and let _this_ rpc pass through */
1557                 if (exp->exp_flvr_changed) {
1558                         LASSERT(exp->exp_flvr_adapt);
1559                         spin_unlock(&exp->exp_lock);
1560                         return 0;
1561                 }
1562
1563                 if (exp->exp_flvr_adapt) {
1564                         exp->exp_flvr_adapt = 0;
1565                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): do delayed adapt\n",
1566                                exp, exp->exp_flvr.sf_rpc,
1567                                exp->exp_flvr_old[0].sf_rpc,
1568                                exp->exp_flvr_old[1].sf_rpc);
1569                         flavor = exp->exp_flvr;
1570                         spin_unlock(&exp->exp_lock);
1571
1572                         return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
1573                                                         req->rq_svc_ctx,
1574                                                         flavor.sf_rpc);
1575                 } else {
1576                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
1577                                "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
1578                                exp->exp_flvr_old[0].sf_rpc,
1579                                exp->exp_flvr_old[1].sf_rpc);
1580                         spin_unlock(&exp->exp_lock);
1581
1582                         return sptlrpc_svc_install_rvs_ctx(exp->exp_imp_reverse,
1583                                                            req->rq_svc_ctx);
1584                 }
1585         }
1586
1587         if (exp->exp_flvr_expire[0]) {
1588                 if (exp->exp_flvr_expire[0] >= cfs_time_current_sec()) {
1589                         if (flavor_allowed(&exp->exp_flvr_old[0], req)) {
1590                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1591                                        "middle one (%lu)\n", exp,
1592                                        exp->exp_flvr.sf_rpc,
1593                                        exp->exp_flvr_old[0].sf_rpc,
1594                                        exp->exp_flvr_old[1].sf_rpc,
1595                                        exp->exp_flvr_expire[0] -
1596                                                 cfs_time_current_sec());
1597                                 spin_unlock(&exp->exp_lock);
1598                                 return 0;
1599                         }
1600                 } else {
1601                         CDEBUG(D_SEC, "mark middle expired\n");
1602                         exp->exp_flvr_expire[0] = 0;
1603                 }
1604                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x not match middle\n", exp,
1605                        exp->exp_flvr.sf_rpc,
1606                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1607                        req->rq_flvr.sf_rpc);
1608         }
1609
1610         /* it doesn't match the current flavor; the only chance to accept it
1611          * is to match one of the old flavors which has not yet expired. */
1612         if (exp->exp_flvr_changed == 0 && exp->exp_flvr_expire[1]) {
1613                 if (exp->exp_flvr_expire[1] >= cfs_time_current_sec()) {
1614                         if (flavor_allowed(&exp->exp_flvr_old[1], req)) {
1615                                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): match the "
1616                                        "oldest one (%lu)\n", exp,
1617                                        exp->exp_flvr.sf_rpc,
1618                                        exp->exp_flvr_old[0].sf_rpc,
1619                                        exp->exp_flvr_old[1].sf_rpc,
1620                                        exp->exp_flvr_expire[1] -
1621                                                 cfs_time_current_sec());
1622                                 spin_unlock(&exp->exp_lock);
1623                                 return 0;
1624                         }
1625                 } else {
1626                         CDEBUG(D_SEC, "mark oldest expired\n");
1627                         exp->exp_flvr_expire[1] = 0;
1628                 }
1629                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): %x no match found\n",
1630                        exp, exp->exp_flvr.sf_rpc,
1631                        exp->exp_flvr_old[0].sf_rpc, exp->exp_flvr_old[1].sf_rpc,
1632                        req->rq_flvr.sf_rpc);
1633         } else {
1634                 CDEBUG(D_SEC, "exp %p (%x|%x|%x): skip the last one\n",
1635                        exp, exp->exp_flvr.sf_rpc, exp->exp_flvr_old[0].sf_rpc,
1636                        exp->exp_flvr_old[1].sf_rpc);
1637         }
1638
1639         spin_unlock(&exp->exp_lock);
1640
1641         CWARN("req %p: (%u|%u|%u|%u|%u) with unauthorized flavor %x\n",
1642               req, req->rq_auth_gss, req->rq_ctx_init, req->rq_ctx_fini,
1643               req->rq_auth_usr_root, req->rq_auth_usr_mdt, req->rq_flvr.sf_rpc);
1644         return -EACCES;
1645 }
1646
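/*
 * Re-evaluate the sptlrpc rules for every export of this obd device.  For
 * each connected export we pick the flavor prescribed by the (presumably
 * just updated) rule set; if it differs from the export's current flavor,
 * or an earlier change is still pending, the new flavor is parked in
 * exp_flvr_old[1] and exp_flvr_changed/exp_flvr_adapt are raised so the
 * request-time check above can promote it once the first matching rpc
 * arrives.
 */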
1647 void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
1648                                       struct sptlrpc_rule_set *rset)
1649 {
1650         struct obd_export       *exp;
1651         struct sptlrpc_flavor    new_flvr;
1652
1653         LASSERT(obd);
1654
1655         spin_lock(&obd->obd_dev_lock);
1656
1657         list_for_each_entry(exp, &obd->obd_exports, exp_obd_chain) {
1658                 if (exp->exp_connection == NULL)
1659                         continue;
1660
1661                 /* note if this export's flavor has just been updated
1662                  * (exp_flvr_changed == 1), this will override the
1663                  * previous update. */
1664                 spin_lock(&exp->exp_lock);
1665                 sptlrpc_rule_set_choose(rset, exp->exp_sp_peer,
1666                                         exp->exp_connection->c_peer.nid,
1667                                         &new_flvr);
1668                 if (exp->exp_flvr_changed ||
1669                     memcmp(&new_flvr, &exp->exp_flvr, sizeof(new_flvr))) {
1670                         exp->exp_flvr_old[1] = new_flvr;
1671                         exp->exp_flvr_expire[1] = 0;
1672                         exp->exp_flvr_changed = 1;
1673                         exp->exp_flvr_adapt = 1;
1674                         CDEBUG(D_SEC, "exp %p (%s): updated flavor %x->%x\n",
1675                                exp, sptlrpc_part2name(exp->exp_sp_peer),
1676                                exp->exp_flvr.sf_rpc,
1677                                exp->exp_flvr_old[1].sf_rpc);
1678                 }
1679                 spin_unlock(&exp->exp_lock);
1680         }
1681
1682         spin_unlock(&obd->obd_dev_lock);
1683 }
1684 EXPORT_SYMBOL(sptlrpc_target_update_exp_flavor);
1685
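/*
 * Sanity-check the source part claimed by an incoming request against what
 * the security layer actually authenticated.  Non-gss requests are passed
 * through unchanged; for gss, a request claiming to come from an MDT (or
 * from an OST issuing callbacks) must carry mdt/root credentials, otherwise
 * it is dropped.
 */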
1686 static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
1687 {
1688         if (svc_rc == SECSVC_DROP)
1689                 return SECSVC_DROP;
1690
1691         switch (req->rq_sp_from) {
1692         case LUSTRE_SP_CLI:
1693         case LUSTRE_SP_MDT:
1694         case LUSTRE_SP_OST:
1695         case LUSTRE_SP_MGS:
1696         case LUSTRE_SP_ANY:
1697                 break;
1698         default:
1699                 DEBUG_REQ(D_ERROR, req, "invalid source %u", req->rq_sp_from);
1700                 return SECSVC_DROP;
1701         }
1702
1703         if (!req->rq_auth_gss)
1704                 return svc_rc;
1705
1706         if (unlikely(req->rq_sp_from == LUSTRE_SP_ANY)) {
1707                 CERROR("source part not specified\n");
1708                 return SECSVC_DROP;
1709         }
1710
1711         /* from MDT, must be authenticated as MDT */
1712         if (unlikely(req->rq_sp_from == LUSTRE_SP_MDT &&
1713                      !req->rq_auth_usr_mdt)) {
1714                 DEBUG_REQ(D_ERROR, req, "fake source MDT");
1715                 return SECSVC_DROP;
1716         }
1717
1718         /* from OST, must be a callback to MDT or CLI; the reverse sec
1719          * was from the mdt/root keytab, so it should be MDT or root. FIXME */
1720         if (unlikely(req->rq_sp_from == LUSTRE_SP_OST &&
1721                      !req->rq_auth_usr_mdt && !req->rq_auth_usr_root)) {
1722                 DEBUG_REQ(D_ERROR, req, "fake source OST");
1723                 return SECSVC_DROP;
1724         }
1725
1726         return svc_rc;
1727 }
1728
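/*
 * Server-side entry point for an incoming request buffer: work out the rpc
 * flavor from the wire message (v1 messages are treated as null), look up
 * the matching policy and let its accept() callback verify/unseal the
 * request, then vet the claimed source part.  On success rq_reqmsg points
 * at the plain request message and the bulk read/write hints are set from
 * the opcode.
 */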
1729 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
1730 {
1731         struct ptlrpc_sec_policy *policy;
1732         struct lustre_msg *msg = req->rq_reqbuf;
1733         int rc;
1734         ENTRY;
1735
1736         LASSERT(msg);
1737         LASSERT(req->rq_reqmsg == NULL);
1738         LASSERT(req->rq_repmsg == NULL);
1739
1740         req->rq_sp_from = LUSTRE_SP_ANY;
1741         req->rq_auth_uid = INVALID_UID;
1742         req->rq_auth_mapped_uid = INVALID_UID;
1743
1744         if (req->rq_reqdata_len < sizeof(struct lustre_msg)) {
1745                 CERROR("request size %d too small\n", req->rq_reqdata_len);
1746                 RETURN(SECSVC_DROP);
1747         }
1748
1749         if (msg->lm_magic == LUSTRE_MSG_MAGIC_V1 ||
1750             msg->lm_magic == LUSTRE_MSG_MAGIC_V1_SWABBED) {
1751                 /*
1752                  * v1 message, treat it as the null flavor
1753                  */
1754                 req->rq_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
1755         } else {
1756                 /*
1757                  * v2 message.
1758                  */
1759                 if (msg->lm_magic == LUSTRE_MSG_MAGIC_V2)
1760                         req->rq_flvr.sf_rpc = WIRE_FLVR_RPC(msg->lm_secflvr);
1761                 else
1762                         req->rq_flvr.sf_rpc = WIRE_FLVR_RPC(
1763                                                 __swab32(msg->lm_secflvr));
1764
1765                 /* unpack the wrapper message if the policy is not null */
1766                 if ((RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) !=
1767                      SPTLRPC_POLICY_NULL) &&
1768                     lustre_unpack_msg(msg, req->rq_reqdata_len))
1769                         RETURN(SECSVC_DROP);
1770         }
1771
1772         policy = sptlrpc_rpcflavor2policy(req->rq_flvr.sf_rpc);
1773         if (!policy) {
1774                 CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
1775                 RETURN(SECSVC_DROP);
1776         }
1777
1778         LASSERT(policy->sp_sops->accept);
1779         rc = policy->sp_sops->accept(req);
1780
1781         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
1782         sptlrpc_policy_put(policy);
1783
1784         /* sanity check for the request source */
1785         rc = sptlrpc_svc_check_from(req, rc);
1786
1787         /* FIXME move to proper place */
1788         if (rc == SECSVC_OK) {
1789                 __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1790
1791                 if (opc == OST_WRITE)
1792                         req->rq_bulk_write = 1;
1793                 else if (opc == OST_READ)
1794                         req->rq_bulk_read = 1;
1795         }
1796
1797         LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
1798         RETURN(rc);
1799 }
1800
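/*
 * Allocate the reply state through the policy's alloc_rs() hook.  If that
 * fails with -ENOMEM we fall back to the service's emergency reply-state
 * pool and retry, releasing the emergency buffer again if the second
 * attempt also fails.
 */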
1801 int sptlrpc_svc_alloc_rs(struct ptlrpc_request *req,
1802                          int msglen)
1803 {
1804         struct ptlrpc_sec_policy *policy;
1805         struct ptlrpc_reply_state *rs;
1806         int rc;
1807         ENTRY;
1808
1809         LASSERT(req->rq_svc_ctx);
1810         LASSERT(req->rq_svc_ctx->sc_policy);
1811
1812         policy = req->rq_svc_ctx->sc_policy;
1813         LASSERT(policy->sp_sops->alloc_rs);
1814
1815         rc = policy->sp_sops->alloc_rs(req, msglen);
1816         if (unlikely(rc == -ENOMEM)) {
1817                 /* failed alloc, try emergency pool */
1818                 rs = lustre_get_emerg_rs(req->rq_rqbd->rqbd_service);
1819                 if (rs == NULL)
1820                         RETURN(-ENOMEM);
1821
1822                 req->rq_reply_state = rs;
1823                 rc = policy->sp_sops->alloc_rs(req, msglen);
1824                 if (rc) {
1825                         lustre_put_emerg_rs(rs);
1826                         req->rq_reply_state = NULL;
1827                 }
1828         }
1829
1830         LASSERT(rc != 0 ||
1831                 (req->rq_reply_state && req->rq_reply_state->rs_msg));
1832
1833         RETURN(rc);
1834 }
1835
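/*
 * Let the policy's authorize() hook pack/seal the reply built in the reply
 * state; on success rs_repdata_len is expected to be non-zero.
 */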
1836 int sptlrpc_svc_wrap_reply(struct ptlrpc_request *req)
1837 {
1838         struct ptlrpc_sec_policy *policy;
1839         int rc;
1840         ENTRY;
1841
1842         LASSERT(req->rq_svc_ctx);
1843         LASSERT(req->rq_svc_ctx->sc_policy);
1844
1845         policy = req->rq_svc_ctx->sc_policy;
1846         LASSERT(policy->sp_sops->authorize);
1847
1848         rc = policy->sp_sops->authorize(req);
1849         LASSERT(rc || req->rq_reply_state->rs_repdata_len);
1850
1851         RETURN(rc);
1852 }
1853
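/*
 * Release a reply state via the owning policy's free_rs() hook; if it came
 * from the emergency pool (rs_prealloc set) hand it back there as well.
 */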
1854 void sptlrpc_svc_free_rs(struct ptlrpc_reply_state *rs)
1855 {
1856         struct ptlrpc_sec_policy *policy;
1857         unsigned int prealloc;
1858         ENTRY;
1859
1860         LASSERT(rs->rs_svc_ctx);
1861         LASSERT(rs->rs_svc_ctx->sc_policy);
1862
1863         policy = rs->rs_svc_ctx->sc_policy;
1864         LASSERT(policy->sp_sops->free_rs);
1865
1866         prealloc = rs->rs_prealloc;
1867         policy->sp_sops->free_rs(rs);
1868
1869         if (prealloc)
1870                 lustre_put_emerg_rs(rs);
1871         EXIT;
1872 }
1873
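/*
 * Reference counting for the per-request service security context: addref
 * and decref bracket the lifetime of rq_svc_ctx, and the policy's
 * free_ctx() hook runs when the last reference is dropped.
 */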
1874 void sptlrpc_svc_ctx_addref(struct ptlrpc_request *req)
1875 {
1876         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1877
1878         if (ctx == NULL)
1879                 return;
1880
1881         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1882         atomic_inc(&ctx->sc_refcount);
1883 }
1884
1885 void sptlrpc_svc_ctx_decref(struct ptlrpc_request *req)
1886 {
1887         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1888
1889         if (ctx == NULL)
1890                 return;
1891
1892         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1893         if (atomic_dec_and_test(&ctx->sc_refcount)) {
1894                 if (ctx->sc_policy->sp_sops->free_ctx)
1895                         ctx->sc_policy->sp_sops->free_ctx(ctx);
1896         }
1897         req->rq_svc_ctx = NULL;
1898 }
1899
1900 void sptlrpc_svc_ctx_invalidate(struct ptlrpc_request *req)
1901 {
1902         struct ptlrpc_svc_ctx *ctx = req->rq_svc_ctx;
1903
1904         if (ctx == NULL)
1905                 return;
1906
1907         LASSERT(atomic_read(&ctx->sc_refcount) > 0);
1908         if (ctx->sc_policy->sp_sops->invalidate_ctx)
1909                 ctx->sc_policy->sp_sops->invalidate_ctx(ctx);
1910 }
1911 EXPORT_SYMBOL(sptlrpc_svc_ctx_invalidate);
1912
1913 /****************************************
1914  * bulk security                        *
1915  ****************************************/
1916
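/*
 * The bulk hooks below are no-ops unless rq_pack_bulk is set on the request
 * (presumably because the negotiated flavor asks for bulk data protection);
 * rq_bulk_read/rq_bulk_write are derived from the opcode during request
 * unwrapping above.
 */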
1917 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
1918                           struct ptlrpc_bulk_desc *desc)
1919 {
1920         struct ptlrpc_cli_ctx *ctx;
1921
1922         if (!req->rq_pack_bulk)
1923                 return 0;
1924
1925         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
1926
1927         ctx = req->rq_cli_ctx;
1928         if (ctx->cc_ops->wrap_bulk)
1929                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
1930         return 0;
1931 }
1932 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
1933
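/*
 * Build a throwaway bulk descriptor from a brw_page array so that the
 * per-context unwrap_bulk() hook can verify bulk read data; only the iov
 * entries are filled in, capped at 'nob' bytes in total.
 */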
1934 static
1935 void pga_to_bulk_desc(int nob, obd_count pg_count, struct brw_page **pga,
1936                       struct ptlrpc_bulk_desc *desc)
1937 {
1938         int i;
1939
1940         LASSERT(pga);
1941         LASSERT(*pga);
1942
1943         for (i = 0; i < pg_count && nob > 0; i++) {
1944 #ifdef __KERNEL__
1945                 desc->bd_iov[i].kiov_page = pga[i]->pg;
1946                 desc->bd_iov[i].kiov_len = pga[i]->count > nob ?
1947                                            nob : pga[i]->count;
1948                 desc->bd_iov[i].kiov_offset = pga[i]->off & ~CFS_PAGE_MASK;
1949 #else
1950 #warning FIXME for liblustre!
1951                 desc->bd_iov[i].iov_base = pga[i]->pg->addr;
1952                 desc->bd_iov[i].iov_len = pga[i]->count > nob ?
1953                                            nob : pga[i]->count;
1954 #endif
1955
1956                 desc->bd_iov_count++;
1957                 nob -= pga[i]->count;
1958         }
1959 }
1960
1961 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
1962                                  int nob, obd_count pg_count,
1963                                  struct brw_page **pga)
1964 {
1965         struct ptlrpc_bulk_desc *desc;
1966         struct ptlrpc_cli_ctx *ctx;
1967         int rc = 0;
1968
1969         if (!req->rq_pack_bulk)
1970                 return 0;
1971
1972         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
1973
1974         OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
1975         if (desc == NULL) {
1976                 CERROR("out of memory, can't verify bulk read data\n");
1977                 return -ENOMEM;
1978         }
1979
1980         pga_to_bulk_desc(nob, pg_count, pga, desc);
1981
1982         ctx = req->rq_cli_ctx;
1983         if (ctx->cc_ops->unwrap_bulk)
1984                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
1985
1986         OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
1987
1988         return rc;
1989 }
1990 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
1991
1992 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
1993                                   struct ptlrpc_bulk_desc *desc)
1994 {
1995         struct ptlrpc_cli_ctx *ctx;
1996
1997         if (!req->rq_pack_bulk)
1998                 return 0;
1999
2000         LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
2001
2002         ctx = req->rq_cli_ctx;
2003         if (ctx->cc_ops->unwrap_bulk)
2004                 return ctx->cc_ops->unwrap_bulk(ctx, req, desc);
2005
2006         return 0;
2007 }
2008 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_write);
2009
2010 int sptlrpc_svc_wrap_bulk(struct ptlrpc_request *req,
2011                           struct ptlrpc_bulk_desc *desc)
2012 {
2013         struct ptlrpc_svc_ctx *ctx;
2014
2015         if (!req->rq_pack_bulk)
2016                 return 0;
2017
2018         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2019
2020         ctx = req->rq_svc_ctx;
2021         if (ctx->sc_policy->sp_sops->wrap_bulk)
2022                 return ctx->sc_policy->sp_sops->wrap_bulk(req, desc);
2023
2024         return 0;
2025 }
2026 EXPORT_SYMBOL(sptlrpc_svc_wrap_bulk);
2027
2028 int sptlrpc_svc_unwrap_bulk(struct ptlrpc_request *req,
2029                             struct ptlrpc_bulk_desc *desc)
2030 {
2031         struct ptlrpc_svc_ctx *ctx;
2032
2033         if (!req->rq_pack_bulk)
2034                 return 0;
2035
2036         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
2037
2038         ctx = req->rq_svc_ctx;
2039         if (ctx->sc_policy->sp_sops->unwrap_bulk)
2040                 return ctx->sc_policy->sp_sops->unwrap_bulk(req, desc);
2041
2042         return 0;
2043 }
2044 EXPORT_SYMBOL(sptlrpc_svc_unwrap_bulk);
2045
2046
2047 /****************************************
2048  * user descriptor helpers              *
2049  ****************************************/
2050
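/*
 * A user descriptor is a struct ptlrpc_user_desc followed by the caller's
 * supplementary group list, so its buffer size works out to
 * sizeof(struct ptlrpc_user_desc) + ngroups * sizeof(__u32); for example,
 * with 8 supplementary groups that is sizeof(*pud) + 32 bytes.  The group
 * count is capped at LUSTRE_MAX_GROUPS.
 */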
2051 int sptlrpc_current_user_desc_size(void)
2052 {
2053         int ngroups;
2054
2055 #ifdef __KERNEL__
2056         ngroups = current_ngroups;
2057
2058         if (ngroups > LUSTRE_MAX_GROUPS)
2059                 ngroups = LUSTRE_MAX_GROUPS;
2060 #else
2061         ngroups = 0;
2062 #endif
2063         return sptlrpc_user_desc_size(ngroups);
2064 }
2065 EXPORT_SYMBOL(sptlrpc_current_user_desc_size);
2066
2067 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset)
2068 {
2069         struct ptlrpc_user_desc *pud;
2070
2071         pud = lustre_msg_buf(msg, offset, 0);
2072
2073         pud->pud_uid = cfs_current()->uid;
2074         pud->pud_gid = cfs_current()->gid;
2075         pud->pud_fsuid = cfs_current()->fsuid;
2076         pud->pud_fsgid = cfs_current()->fsgid;
2077         pud->pud_cap = cfs_current()->cap_effective;
2078         pud->pud_ngroups = (msg->lm_buflens[offset] - sizeof(*pud)) / 4;
2079
2080 #ifdef __KERNEL__
2081         task_lock(current);
2082         if (pud->pud_ngroups > current_ngroups)
2083                 pud->pud_ngroups = current_ngroups;
2084         memcpy(pud->pud_groups, cfs_current()->group_info->blocks[0],
2085                pud->pud_ngroups * sizeof(__u32));
2086         task_unlock(current);
2087 #endif
2088
2089         return 0;
2090 }
2091 EXPORT_SYMBOL(sptlrpc_pack_user_desc);
2092
2093 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset)
2094 {
2095         struct ptlrpc_user_desc *pud;
2096         int                      i;
2097
2098         pud = lustre_msg_buf(msg, offset, sizeof(*pud));
2099         if (!pud)
2100                 return -EINVAL;
2101
2102         if (lustre_msg_swabbed(msg)) {
2103                 __swab32s(&pud->pud_uid);
2104                 __swab32s(&pud->pud_gid);
2105                 __swab32s(&pud->pud_fsuid);
2106                 __swab32s(&pud->pud_fsgid);
2107                 __swab32s(&pud->pud_cap);
2108                 __swab32s(&pud->pud_ngroups);
2109         }
2110
2111         if (pud->pud_ngroups > LUSTRE_MAX_GROUPS) {
2112                 CERROR("too many groups: %u\n", pud->pud_ngroups);
2113                 return -EINVAL;
2114         }
2115
2116         if (sizeof(*pud) + pud->pud_ngroups * sizeof(__u32) >
2117             msg->lm_buflens[offset]) {
2118                 CERROR("%u groups claimed, but buffer size is only %u\n",
2119                        pud->pud_ngroups, msg->lm_buflens[offset]);
2120                 return -EINVAL;
2121         }
2122
2123         if (lustre_msg_swabbed(msg)) {
2124                 for (i = 0; i < pud->pud_ngroups; i++)
2125                         __swab32s(&pud->pud_groups[i]);
2126         }
2127
2128         return 0;
2129 }
2130 EXPORT_SYMBOL(sptlrpc_unpack_user_desc);
2131
2132 /****************************************
2133  * misc helpers                         *
2134  ****************************************/
2135
2136 const char * sec2target_str(struct ptlrpc_sec *sec)
2137 {
2138         if (!sec || !sec->ps_import || !sec->ps_import->imp_obd)
2139                 return "*";
2140         if (sec_is_reverse(sec))
2141                 return "c";
2142         return obd_uuid2str(&sec->ps_import->imp_obd->u.cli.cl_target_uuid);
2143 }
2144 EXPORT_SYMBOL(sec2target_str);
2145
2146 /****************************************
2147  * initialize/finalize                  *
2148  ****************************************/
2149
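/*
 * Module init brings up the pieces in dependency order (gc thread,
 * encryption page pool, the built-in null and plain policies, then the
 * lprocfs entries) and unwinds them in reverse on failure; sptlrpc_fini()
 * tears the same pieces down in reverse order.
 */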
2150 int __init sptlrpc_init(void)
2151 {
2152         int rc;
2153
2154         rc = sptlrpc_gc_start_thread();
2155         if (rc)
2156                 goto out;
2157
2158         rc = sptlrpc_enc_pool_init();
2159         if (rc)
2160                 goto out_gc;
2161
2162         rc = sptlrpc_null_init();
2163         if (rc)
2164                 goto out_pool;
2165
2166         rc = sptlrpc_plain_init();
2167         if (rc)
2168                 goto out_null;
2169
2170         rc = sptlrpc_lproc_init();
2171         if (rc)
2172                 goto out_plain;
2173
2174         return 0;
2175
2176 out_plain:
2177         sptlrpc_plain_fini();
2178 out_null:
2179         sptlrpc_null_fini();
2180 out_pool:
2181         sptlrpc_enc_pool_fini();
2182 out_gc:
2183         sptlrpc_gc_stop_thread();
2184 out:
2185         return rc;
2186 }
2187
2188 void __exit sptlrpc_fini(void)
2189 {
2190         sptlrpc_lproc_fini();
2191         sptlrpc_plain_fini();
2192         sptlrpc_null_fini();
2193         sptlrpc_enc_pool_fini();
2194         sptlrpc_gc_stop_thread();
2195 }