Whamcloud - gitweb
LU-1146 build: batch update copyright messages
[fs/lustre-release.git] / lustre / ptlrpc / gss / gss_svc_upcall.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Modifications for Lustre
5  *
6  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
7  *
8  * Copyright (c) 2012, Whamcloud, Inc.
9  *
10  * Author: Eric Mei <ericm@clusterfs.com>
11  */
12
13 /*
14  * Neil Brown <neilb@cse.unsw.edu.au>
15  * J. Bruce Fields <bfields@umich.edu>
16  * Andy Adamson <andros@umich.edu>
17  * Dug Song <dugsong@monkey.org>
18  *
19  * RPCSEC_GSS server authentication.
20  * This implements RPCSEC_GSS as defined in rfc2203 (rpcsec_gss) and rfc2078
21  * (gssapi)
22  *
23  * The RPCSEC_GSS involves three stages:
24  *  1/ context creation
25  *  2/ data exchange
26  *  3/ context destruction
27  *
28  * Context creation is handled largely by upcalls to user-space.
29  *  In particular, GSS_Accept_sec_context is handled by an upcall
30  * Data exchange is handled entirely within the kernel
31  *  In particular, GSS_GetMIC, GSS_VerifyMIC, GSS_Seal, GSS_Unseal are in-kernel.
32  * Context destruction is handled in-kernel
33  *  GSS_Delete_sec_context is in-kernel
34  *
35  * Context creation is initiated by a RPCSEC_GSS_INIT request arriving.
36  * The context handle and gss_token are used as a key into the rpcsec_init cache.
37  * The content of this cache includes some of the outputs of GSS_Accept_sec_context,
38  * being major_status, minor_status, context_handle, reply_token.
39  * These are sent back to the client.
40  * Sequence window management is handled by the kernel.  The window size if currently
41  * a compile time constant.
42  *
43  * When user-space is happy that a context is established, it places an entry
44  * in the rpcsec_context cache. The key for this cache is the context_handle.
45  * The content includes:
46  *   uid/gidlist - for determining access rights
47  *   mechanism type
48  *   mechanism specific information, such as a key
49  *
50  */
51
52 #define DEBUG_SUBSYSTEM S_SEC
53 #ifdef __KERNEL__
54 #include <linux/types.h>
55 #include <linux/init.h>
56 #include <linux/module.h>
57 #include <linux/slab.h>
58 #include <linux/hash.h>
59 #include <linux/mutex.h>
60 #include <linux/sunrpc/cache.h>
61 #else
62 #include <liblustre.h>
63 #endif
64
65 #include <obd.h>
66 #include <obd_class.h>
67 #include <obd_support.h>
68 #include <lustre/lustre_idl.h>
69 #include <lustre_net.h>
70 #include <lustre_import.h>
71 #include <lustre_sec.h>
72
73 #include "gss_err.h"
74 #include "gss_internal.h"
75 #include "gss_api.h"
76
77 #define GSS_SVC_UPCALL_TIMEOUT  (20)
78
79 static cfs_spinlock_t __ctx_index_lock;
80 static __u64 __ctx_index;
81
82 __u64 gss_get_next_ctx_index(void)
83 {
84         __u64 idx;
85
86         cfs_spin_lock(&__ctx_index_lock);
87         idx = __ctx_index++;
88         cfs_spin_unlock(&__ctx_index_lock);
89
90         return idx;
91 }
92
93 static inline unsigned long hash_mem(char *buf, int length, int bits)
94 {
95         unsigned long hash = 0;
96         unsigned long l = 0;
97         int len = 0;
98         unsigned char c;
99
100         do {
101                 if (len == length) {
102                         c = (char) len;
103                         len = -1;
104                 } else
105                         c = *buf++;
106
107                 l = (l << 8) | c;
108                 len++;
109
110                 if ((len & (BITS_PER_LONG/8-1)) == 0)
111                         hash = cfs_hash_long(hash^l, BITS_PER_LONG);
112         } while (len);
113
114         return hash >> (BITS_PER_LONG - bits);
115 }
116
117 /****************************************
118  * rsi cache                            *
119  ****************************************/
120
121 #define RSI_HASHBITS    (6)
122 #define RSI_HASHMAX     (1 << RSI_HASHBITS)
123 #define RSI_HASHMASK    (RSI_HASHMAX - 1)
124
125 struct rsi {
126         struct cache_head       h;
127         __u32                   lustre_svc;
128         __u64                   nid;
129         cfs_waitq_t             waitq;
130         rawobj_t                in_handle, in_token;
131         rawobj_t                out_handle, out_token;
132         int                     major_status, minor_status;
133 };
134
135 static struct cache_head *rsi_table[RSI_HASHMAX];
136 static struct cache_detail rsi_cache;
137 #ifdef HAVE_SUNRPC_CACHE_V2
138 static struct rsi *rsi_update(struct rsi *new, struct rsi *old);
139 static struct rsi *rsi_lookup(struct rsi *item);
140 #else
141 static struct rsi *rsi_lookup(struct rsi *item, int set);
142 #endif
143
144 static inline int rsi_hash(struct rsi *item)
145 {
146         return hash_mem((char *)item->in_handle.data, item->in_handle.len,
147                         RSI_HASHBITS) ^
148                hash_mem((char *)item->in_token.data, item->in_token.len,
149                         RSI_HASHBITS);
150 }
151
152 static inline int __rsi_match(struct rsi *item, struct rsi *tmp)
153 {
154         return (rawobj_equal(&item->in_handle, &tmp->in_handle) &&
155                 rawobj_equal(&item->in_token, &tmp->in_token));
156 }
157
158 static void rsi_free(struct rsi *rsi)
159 {
160         rawobj_free(&rsi->in_handle);
161         rawobj_free(&rsi->in_token);
162         rawobj_free(&rsi->out_handle);
163         rawobj_free(&rsi->out_token);
164 }
165
166 static void rsi_request(struct cache_detail *cd,
167                         struct cache_head *h,
168                         char **bpp, int *blen)
169 {
170         struct rsi *rsi = container_of(h, struct rsi, h);
171         __u64 index = 0;
172
173         /* if in_handle is null, provide kernel suggestion */
174         if (rsi->in_handle.len == 0)
175                 index = gss_get_next_ctx_index();
176
177         qword_addhex(bpp, blen, (char *) &rsi->lustre_svc,
178                      sizeof(rsi->lustre_svc));
179         qword_addhex(bpp, blen, (char *) &rsi->nid, sizeof(rsi->nid));
180         qword_addhex(bpp, blen, (char *) &index, sizeof(index));
181         qword_addhex(bpp, blen, rsi->in_handle.data, rsi->in_handle.len);
182         qword_addhex(bpp, blen, rsi->in_token.data, rsi->in_token.len);
183         (*bpp)[-1] = '\n';
184 }
185
186 #ifdef HAVE_CACHE_UPCALL
187 static int rsi_upcall(struct cache_detail *cd, struct cache_head *h)
188 {
189         return sunrpc_cache_pipe_upcall(cd, h, rsi_request);
190 }
191 #endif
192
193 static inline void __rsi_init(struct rsi *new, struct rsi *item)
194 {
195         new->out_handle = RAWOBJ_EMPTY;
196         new->out_token = RAWOBJ_EMPTY;
197
198         new->in_handle = item->in_handle;
199         item->in_handle = RAWOBJ_EMPTY;
200         new->in_token = item->in_token;
201         item->in_token = RAWOBJ_EMPTY;
202
203         new->lustre_svc = item->lustre_svc;
204         new->nid = item->nid;
205         cfs_waitq_init(&new->waitq);
206 }
207
208 static inline void __rsi_update(struct rsi *new, struct rsi *item)
209 {
210         LASSERT(new->out_handle.len == 0);
211         LASSERT(new->out_token.len == 0);
212
213         new->out_handle = item->out_handle;
214         item->out_handle = RAWOBJ_EMPTY;
215         new->out_token = item->out_token;
216         item->out_token = RAWOBJ_EMPTY;
217
218         new->major_status = item->major_status;
219         new->minor_status = item->minor_status;
220 }
221
222 #ifdef HAVE_SUNRPC_CACHE_V2
223
224 static void rsi_put(struct kref *ref)
225 {
226         struct rsi *rsi = container_of(ref, struct rsi, h.ref);
227
228         LASSERT(rsi->h.next == NULL);
229         rsi_free(rsi);
230         OBD_FREE_PTR(rsi);
231 }
232
233 static int rsi_match(struct cache_head *a, struct cache_head *b)
234 {
235         struct rsi *item = container_of(a, struct rsi, h);
236         struct rsi *tmp = container_of(b, struct rsi, h);
237
238         return __rsi_match(item, tmp);
239 }
240
241 static void rsi_init(struct cache_head *cnew, struct cache_head *citem)
242 {
243         struct rsi *new = container_of(cnew, struct rsi, h);
244         struct rsi *item = container_of(citem, struct rsi, h);
245
246         __rsi_init(new, item);
247 }
248
249 static void update_rsi(struct cache_head *cnew, struct cache_head *citem)
250 {
251         struct rsi *new = container_of(cnew, struct rsi, h);
252         struct rsi *item = container_of(citem, struct rsi, h);
253
254         __rsi_update(new, item);
255 }
256
257 static struct cache_head *rsi_alloc(void)
258 {
259         struct rsi *rsi;
260
261         OBD_ALLOC_PTR(rsi);
262         if (rsi) 
263                 return &rsi->h;
264         else
265                 return NULL;
266 }
267
268 static int rsi_parse(struct cache_detail *cd, char *mesg, int mlen)
269 {
270         char           *buf = mesg;
271         char           *ep;
272         int             len;
273         struct rsi      rsii, *rsip = NULL;
274         time_t          expiry;
275         int             status = -EINVAL;
276         ENTRY;
277
278
279         memset(&rsii, 0, sizeof(rsii));
280
281         /* handle */
282         len = qword_get(&mesg, buf, mlen);
283         if (len < 0)
284                 goto out;
285         if (rawobj_alloc(&rsii.in_handle, buf, len)) {
286                 status = -ENOMEM;
287                 goto out;
288         }
289
290         /* token */
291         len = qword_get(&mesg, buf, mlen);
292         if (len < 0)
293                 goto out;
294         if (rawobj_alloc(&rsii.in_token, buf, len)) {
295                 status = -ENOMEM;
296                 goto out;
297         }
298
299         rsip = rsi_lookup(&rsii);
300         if (!rsip)
301                 goto out;
302
303         rsii.h.flags = 0;
304         /* expiry */
305         expiry = get_expiry(&mesg);
306         if (expiry == 0)
307                 goto out;
308
309         len = qword_get(&mesg, buf, mlen);
310         if (len <= 0)
311                 goto out;
312
313         /* major */
314         rsii.major_status = simple_strtol(buf, &ep, 10);
315         if (*ep)
316                 goto out;
317
318         /* minor */
319         len = qword_get(&mesg, buf, mlen);
320         if (len <= 0)
321                 goto out;
322         rsii.minor_status = simple_strtol(buf, &ep, 10);
323         if (*ep)
324                 goto out;
325
326         /* out_handle */
327         len = qword_get(&mesg, buf, mlen);
328         if (len < 0)
329                 goto out;
330         if (rawobj_alloc(&rsii.out_handle, buf, len)) {
331                 status = -ENOMEM;
332                 goto out;
333         }
334
335         /* out_token */
336         len = qword_get(&mesg, buf, mlen);
337         if (len < 0)
338                 goto out;
339         if (rawobj_alloc(&rsii.out_token, buf, len)) {
340                 status = -ENOMEM;
341                 goto out;
342         }
343
344         rsii.h.expiry_time = expiry;
345         rsip = rsi_update(&rsii, rsip);
346         status = 0;
347 out:
348         rsi_free(&rsii);
349         if (rsip) {
350                 cfs_waitq_broadcast(&rsip->waitq);
351                 cache_put(&rsip->h, &rsi_cache);
352         } else {
353                 status = -ENOMEM;
354         }
355
356         if (status)
357                 CERROR("rsi parse error %d\n", status);
358         RETURN(status);
359 }
360
361 #else /* !HAVE_SUNRPC_CACHE_V2 */
362
363 static void rsi_put(struct cache_head *item, struct cache_detail *cd)
364 {
365         struct rsi *rsi = container_of(item, struct rsi, h);
366
367         LASSERT(cfs_atomic_read(&item->refcnt) > 0);
368
369         if (cache_put(item, cd)) {
370                 LASSERT(item->next == NULL);
371                 rsi_free(rsi);
372                 kfree(rsi); /* created by cache mgmt using kmalloc */
373         }
374 }
375
376 static inline int rsi_match(struct rsi *item, struct rsi *tmp)
377 {
378         return __rsi_match(item, tmp);
379 }
380
381 static inline void rsi_init(struct rsi *new, struct rsi *item)
382 {
383         __rsi_init(new, item);
384 }
385
386 static inline void rsi_update(struct rsi *new, struct rsi *item)
387 {
388         __rsi_update(new, item);
389 }
390
391 static int rsi_parse(struct cache_detail *cd, char *mesg, int mlen)
392 {
393         char           *buf = mesg;
394         char           *ep;
395         int             len;
396         struct rsi      rsii, *rsip = NULL;
397         time_t          expiry;
398         int             status = -EINVAL;
399         ENTRY;
400
401
402         memset(&rsii, 0, sizeof(rsii));
403
404         /* handle */
405         len = qword_get(&mesg, buf, mlen);
406         if (len < 0)
407                 goto out;
408         if (rawobj_alloc(&rsii.in_handle, buf, len)) {
409                 status = -ENOMEM;
410                 goto out;
411         }
412
413         /* token */
414         len = qword_get(&mesg, buf, mlen);
415         if (len < 0)
416                 goto out;
417         if (rawobj_alloc(&rsii.in_token, buf, len)) {
418                 status = -ENOMEM;
419                 goto out;
420         }
421
422         /* expiry */
423         expiry = get_expiry(&mesg);
424         if (expiry == 0)
425                 goto out;
426
427         len = qword_get(&mesg, buf, mlen);
428         if (len <= 0)
429                 goto out;
430
431         /* major */
432         rsii.major_status = simple_strtol(buf, &ep, 10);
433         if (*ep)
434                 goto out;
435
436         /* minor */
437         len = qword_get(&mesg, buf, mlen);
438         if (len <= 0)
439                 goto out;
440         rsii.minor_status = simple_strtol(buf, &ep, 10);
441         if (*ep)
442                 goto out;
443
444         /* out_handle */
445         len = qword_get(&mesg, buf, mlen);
446         if (len < 0)
447                 goto out;
448         if (rawobj_alloc(&rsii.out_handle, buf, len)) {
449                 status = -ENOMEM;
450                 goto out;
451         }
452
453         /* out_token */
454         len = qword_get(&mesg, buf, mlen);
455         if (len < 0)
456                 goto out;
457         if (rawobj_alloc(&rsii.out_token, buf, len)) {
458                 status = -ENOMEM;
459                 goto out;
460         }
461
462         rsii.h.expiry_time = expiry;
463         rsip = rsi_lookup(&rsii, 1);
464         status = 0;
465 out:
466         rsi_free(&rsii);
467         if (rsip) {
468                 cfs_waitq_broadcast(&rsip->waitq);
469                 rsi_put(&rsip->h, &rsi_cache);
470         }
471
472         if (status)
473                 CERROR("rsi parse error %d\n", status);
474         RETURN(status);
475 }
476
477 #endif /* HAVE_SUNRPC_CACHE_V2 */
478
479 static struct cache_detail rsi_cache = {
480         .hash_size      = RSI_HASHMAX,
481         .hash_table     = rsi_table,
482         .name           = "auth.sptlrpc.init",
483         .cache_put      = rsi_put,
484 #ifdef HAVE_CACHE_UPCALL
485         .cache_upcall   = rsi_upcall,
486 #else
487         .cache_request  = rsi_request,
488 #endif
489         .cache_parse    = rsi_parse,
490 #ifdef HAVE_SUNRPC_CACHE_V2
491         .match          = rsi_match,
492         .init           = rsi_init,
493         .update         = update_rsi,
494         .alloc          = rsi_alloc,
495 #endif
496 };
497
498 #ifdef HAVE_SUNRPC_CACHE_V2
499
500 static struct rsi *rsi_lookup(struct rsi *item)
501 {
502         struct cache_head *ch;
503         int hash = rsi_hash(item);
504
505         ch = sunrpc_cache_lookup(&rsi_cache, &item->h, hash);
506         if (ch)
507                 return container_of(ch, struct rsi, h);
508         else
509                 return NULL;
510 }
511
512 static struct rsi *rsi_update(struct rsi *new, struct rsi *old)
513 {
514         struct cache_head *ch;
515         int hash = rsi_hash(new);
516
517         ch = sunrpc_cache_update(&rsi_cache, &new->h, &old->h, hash);
518         if (ch)
519                 return container_of(ch, struct rsi, h);
520         else
521                 return NULL;
522 }
523
524 #else
525
526 static DefineSimpleCacheLookup(rsi, 0)
527
528 #endif
529
530 /****************************************
531  * rsc cache                            *
532  ****************************************/
533
534 #define RSC_HASHBITS    (10)
535 #define RSC_HASHMAX     (1 << RSC_HASHBITS)
536 #define RSC_HASHMASK    (RSC_HASHMAX - 1)
537
538 struct rsc {
539         struct cache_head       h;
540         struct obd_device      *target;
541         rawobj_t                handle;
542         struct gss_svc_ctx      ctx;
543 };
544
545 static struct cache_head *rsc_table[RSC_HASHMAX];
546 static struct cache_detail rsc_cache;
547 #ifdef HAVE_SUNRPC_CACHE_V2
548 static struct rsc *rsc_update(struct rsc *new, struct rsc *old);
549 static struct rsc *rsc_lookup(struct rsc *item);
550 #else
551 static struct rsc *rsc_lookup(struct rsc *item, int set);
552 #endif
553
554 static void rsc_free(struct rsc *rsci)
555 {
556         rawobj_free(&rsci->handle);
557         rawobj_free(&rsci->ctx.gsc_rvs_hdl);
558         lgss_delete_sec_context(&rsci->ctx.gsc_mechctx);
559 }
560
561 static inline int rsc_hash(struct rsc *rsci)
562 {
563         return hash_mem((char *)rsci->handle.data,
564                         rsci->handle.len, RSC_HASHBITS);
565 }
566
567 static inline int __rsc_match(struct rsc *new, struct rsc *tmp)
568 {
569         return rawobj_equal(&new->handle, &tmp->handle);
570 }
571
572 static inline void __rsc_init(struct rsc *new, struct rsc *tmp)
573 {
574         new->handle = tmp->handle;
575         tmp->handle = RAWOBJ_EMPTY;
576
577         new->target = NULL;
578         memset(&new->ctx, 0, sizeof(new->ctx));
579         new->ctx.gsc_rvs_hdl = RAWOBJ_EMPTY;
580 }
581
582 static inline void __rsc_update(struct rsc *new, struct rsc *tmp)
583 {
584         new->ctx = tmp->ctx;
585         tmp->ctx.gsc_rvs_hdl = RAWOBJ_EMPTY;
586         tmp->ctx.gsc_mechctx = NULL;
587
588         memset(&new->ctx.gsc_seqdata, 0, sizeof(new->ctx.gsc_seqdata));
589         cfs_spin_lock_init(&new->ctx.gsc_seqdata.ssd_lock);
590 }
591
592 #ifdef HAVE_SUNRPC_CACHE_V2
593
594 static void rsc_put(struct kref *ref)
595 {
596         struct rsc *rsci = container_of(ref, struct rsc, h.ref);
597
598         LASSERT(rsci->h.next == NULL);
599         rsc_free(rsci);
600         OBD_FREE_PTR(rsci);
601 }
602
603 static int rsc_match(struct cache_head *a, struct cache_head *b)
604 {
605         struct rsc *new = container_of(a, struct rsc, h);
606         struct rsc *tmp = container_of(b, struct rsc, h);
607
608         return __rsc_match(new, tmp);
609 }
610
611 static void rsc_init(struct cache_head *cnew, struct cache_head *ctmp)
612 {
613         struct rsc *new = container_of(cnew, struct rsc, h);
614         struct rsc *tmp = container_of(ctmp, struct rsc, h);
615
616         __rsc_init(new, tmp);
617 }
618
619 static void update_rsc(struct cache_head *cnew, struct cache_head *ctmp)
620 {
621         struct rsc *new = container_of(cnew, struct rsc, h);
622         struct rsc *tmp = container_of(ctmp, struct rsc, h);
623
624         __rsc_update(new, tmp);
625 }
626
627 static struct cache_head * rsc_alloc(void)
628 {
629         struct rsc *rsc;
630
631         OBD_ALLOC_PTR(rsc);
632         if (rsc)
633                 return &rsc->h;
634         else
635                 return NULL;
636 }
637
638 static int rsc_parse(struct cache_detail *cd, char *mesg, int mlen)
639 {
640         char                *buf = mesg;
641         int                  len, rv, tmp_int;
642         struct rsc           rsci, *rscp = NULL;
643         time_t               expiry;
644         int                  status = -EINVAL;
645         struct gss_api_mech *gm = NULL;
646
647         memset(&rsci, 0, sizeof(rsci));
648
649         /* context handle */
650         len = qword_get(&mesg, buf, mlen);
651         if (len < 0) goto out;
652         status = -ENOMEM;
653         if (rawobj_alloc(&rsci.handle, buf, len))
654                 goto out;
655
656         rsci.h.flags = 0;
657         /* expiry */
658         expiry = get_expiry(&mesg);
659         status = -EINVAL;
660         if (expiry == 0)
661                 goto out;
662
663         /* remote flag */
664         rv = get_int(&mesg, &tmp_int);
665         if (rv) {
666                 CERROR("fail to get remote flag\n");
667                 goto out;
668         }
669         rsci.ctx.gsc_remote = (tmp_int != 0);
670
671         /* root user flag */
672         rv = get_int(&mesg, &tmp_int);
673         if (rv) {
674                 CERROR("fail to get oss user flag\n");
675                 goto out;
676         }
677         rsci.ctx.gsc_usr_root = (tmp_int != 0);
678
679         /* mds user flag */
680         rv = get_int(&mesg, &tmp_int);
681         if (rv) {
682                 CERROR("fail to get mds user flag\n");
683                 goto out;
684         }
685         rsci.ctx.gsc_usr_mds = (tmp_int != 0);
686
687         /* oss user flag */
688         rv = get_int(&mesg, &tmp_int);
689         if (rv) {
690                 CERROR("fail to get oss user flag\n");
691                 goto out;
692         }
693         rsci.ctx.gsc_usr_oss = (tmp_int != 0);
694
695         /* mapped uid */
696         rv = get_int(&mesg, (int *) &rsci.ctx.gsc_mapped_uid);
697         if (rv) {
698                 CERROR("fail to get mapped uid\n");
699                 goto out;
700         }
701
702         rscp = rsc_lookup(&rsci);
703         if (!rscp)
704                 goto out;
705
706         /* uid, or NEGATIVE */
707         rv = get_int(&mesg, (int *) &rsci.ctx.gsc_uid);
708         if (rv == -EINVAL)
709                 goto out;
710         if (rv == -ENOENT) {
711                 CERROR("NOENT? set rsc entry negative\n");
712                 cfs_set_bit(CACHE_NEGATIVE, &rsci.h.flags);
713         } else {
714                 rawobj_t tmp_buf;
715                 unsigned long ctx_expiry;
716
717                 /* gid */
718                 if (get_int(&mesg, (int *) &rsci.ctx.gsc_gid))
719                         goto out;
720
721                 /* mech name */
722                 len = qword_get(&mesg, buf, mlen);
723                 if (len < 0)
724                         goto out;
725                 gm = lgss_name_to_mech(buf);
726                 status = -EOPNOTSUPP;
727                 if (!gm)
728                         goto out;
729
730                 status = -EINVAL;
731                 /* mech-specific data: */
732                 len = qword_get(&mesg, buf, mlen);
733                 if (len < 0)
734                         goto out;
735
736                 tmp_buf.len = len;
737                 tmp_buf.data = (unsigned char *)buf;
738                 if (lgss_import_sec_context(&tmp_buf, gm,
739                                             &rsci.ctx.gsc_mechctx))
740                         goto out;
741
742                 /* currently the expiry time passed down from user-space
743                  * is invalid, here we retrive it from mech. */
744                 if (lgss_inquire_context(rsci.ctx.gsc_mechctx, &ctx_expiry)) {
745                         CERROR("unable to get expire time, drop it\n");
746                         goto out;
747                 }
748                 expiry = (time_t) ctx_expiry;
749         }
750
751         rsci.h.expiry_time = expiry;
752         rscp = rsc_update(&rsci, rscp);
753         status = 0;
754 out:
755         if (gm)
756                 lgss_mech_put(gm);
757         rsc_free(&rsci);
758         if (rscp)
759                 cache_put(&rscp->h, &rsc_cache);
760         else
761                 status = -ENOMEM;
762
763         if (status)
764                 CERROR("parse rsc error %d\n", status);
765         return status;
766 }
767
768 #else /* !HAVE_SUNRPC_CACHE_V2 */
769
770 static void rsc_put(struct cache_head *item, struct cache_detail *cd)
771 {
772         struct rsc *rsci = container_of(item, struct rsc, h);
773
774         LASSERT(cfs_atomic_read(&item->refcnt) > 0);
775
776         if (cache_put(item, cd)) {
777                 LASSERT(item->next == NULL);
778                 rsc_free(rsci);
779                 kfree(rsci); /* created by cache mgmt using kmalloc */
780         }
781 }
782
783 static inline int rsc_match(struct rsc *new, struct rsc *tmp)
784 {
785         return __rsc_match(new, tmp);
786 }
787
788 static inline void rsc_init(struct rsc *new, struct rsc *tmp)
789 {
790         __rsc_init(new, tmp);
791 }
792
793 static inline void rsc_update(struct rsc *new, struct rsc *tmp)
794 {
795         __rsc_update(new, tmp);
796 }
797
798 static int rsc_parse(struct cache_detail *cd, char *mesg, int mlen)
799 {
800         char       *buf = mesg;
801         int         len, rv, tmp_int;
802         struct rsc  rsci, *rscp = NULL;
803         time_t      expiry;
804         int         status = -EINVAL;
805
806         memset(&rsci, 0, sizeof(rsci));
807
808         /* context handle */
809         len = qword_get(&mesg, buf, mlen);
810         if (len < 0) goto out;
811         status = -ENOMEM;
812         if (rawobj_alloc(&rsci.handle, buf, len))
813                 goto out;
814
815         rsci.h.flags = 0;
816         /* expiry */
817         expiry = get_expiry(&mesg);
818         status = -EINVAL;
819         if (expiry == 0)
820                 goto out;
821
822         /* remote flag */
823         rv = get_int(&mesg, &tmp_int);
824         if (rv) {
825                 CERROR("fail to get remote flag\n");
826                 goto out;
827         }
828         rsci.ctx.gsc_remote = (tmp_int != 0);
829
830         /* root user flag */
831         rv = get_int(&mesg, &tmp_int);
832         if (rv) {
833                 CERROR("fail to get oss user flag\n");
834                 goto out;
835         }
836         rsci.ctx.gsc_usr_root = (tmp_int != 0);
837
838         /* mds user flag */
839         rv = get_int(&mesg, &tmp_int);
840         if (rv) {
841                 CERROR("fail to get mds user flag\n");
842                 goto out;
843         }
844         rsci.ctx.gsc_usr_mds = (tmp_int != 0);
845
846         /* oss user flag */
847         rv = get_int(&mesg, &tmp_int);
848         if (rv) {
849                 CERROR("fail to get oss user flag\n");
850                 goto out;
851         }
852         rsci.ctx.gsc_usr_oss = (tmp_int != 0);
853
854         /* mapped uid */
855         rv = get_int(&mesg, (int *) &rsci.ctx.gsc_mapped_uid);
856         if (rv) {
857                 CERROR("fail to get mapped uid\n");
858                 goto out;
859         }
860
861         /* uid, or NEGATIVE */
862         rv = get_int(&mesg, (int *) &rsci.ctx.gsc_uid);
863         if (rv == -EINVAL)
864                 goto out;
865         if (rv == -ENOENT) {
866                 CERROR("NOENT? set rsc entry negative\n");
867                 cfs_set_bit(CACHE_NEGATIVE, &rsci.h.flags);
868         } else {
869                 struct gss_api_mech *gm;
870                 rawobj_t tmp_buf;
871                 unsigned long ctx_expiry;
872
873                 /* gid */
874                 if (get_int(&mesg, (int *) &rsci.ctx.gsc_gid))
875                         goto out;
876
877                 /* mech name */
878                 len = qword_get(&mesg, buf, mlen);
879                 if (len < 0)
880                         goto out;
881                 gm = lgss_name_to_mech(buf);
882                 status = -EOPNOTSUPP;
883                 if (!gm)
884                         goto out;
885
886                 status = -EINVAL;
887                 /* mech-specific data: */
888                 len = qword_get(&mesg, buf, mlen);
889                 if (len < 0) {
890                         lgss_mech_put(gm);
891                         goto out;
892                 }
893                 tmp_buf.len = len;
894                 tmp_buf.data = (unsigned char *)buf;
895                 if (lgss_import_sec_context(&tmp_buf, gm,
896                                             &rsci.ctx.gsc_mechctx)) {
897                         lgss_mech_put(gm);
898                         goto out;
899                 }
900
901                 /* currently the expiry time passed down from user-space
902                  * is invalid, here we retrive it from mech. */
903                 if (lgss_inquire_context(rsci.ctx.gsc_mechctx, &ctx_expiry)) {
904                         CERROR("unable to get expire time, drop it\n");
905                         lgss_mech_put(gm);
906                         goto out;
907                 }
908                 expiry = (time_t) ctx_expiry;
909
910                 lgss_mech_put(gm);
911         }
912
913         rsci.h.expiry_time = expiry;
914         rscp = rsc_lookup(&rsci, 1);
915         status = 0;
916 out:
917         rsc_free(&rsci);
918         if (rscp)
919                 rsc_put(&rscp->h, &rsc_cache);
920
921         if (status)
922                 CERROR("parse rsc error %d\n", status);
923         return status;
924 }
925
926 #endif /* HAVE_SUNRPC_CACHE_V2 */
927
928
929 static struct cache_detail rsc_cache = {
930         .hash_size      = RSC_HASHMAX,
931         .hash_table     = rsc_table,
932         .name           = "auth.sptlrpc.context",
933         .cache_put      = rsc_put,
934         .cache_parse    = rsc_parse,
935 #ifdef HAVE_SUNRPC_CACHE_V2
936         .match          = rsc_match,
937         .init           = rsc_init,
938         .update         = update_rsc,
939         .alloc          = rsc_alloc,
940 #endif
941 };
942
943 #ifdef HAVE_SUNRPC_CACHE_V2
944
945 static struct rsc *rsc_lookup(struct rsc *item)
946 {
947         struct cache_head *ch;
948         int                hash = rsc_hash(item);
949
950         ch = sunrpc_cache_lookup(&rsc_cache, &item->h, hash);
951         if (ch)
952                 return container_of(ch, struct rsc, h);
953         else
954                 return NULL;
955 }
956
957 static struct rsc *rsc_update(struct rsc *new, struct rsc *old)
958 {
959         struct cache_head *ch;
960         int                hash = rsc_hash(new);
961
962         ch = sunrpc_cache_update(&rsc_cache, &new->h, &old->h, hash);
963         if (ch)
964                 return container_of(ch, struct rsc, h);
965         else
966                 return NULL;
967 }
968
969 #define COMPAT_RSC_PUT(item, cd)        cache_put((item), (cd))
970
971 #else
972
973 static DefineSimpleCacheLookup(rsc, 0);
974
975 #define COMPAT_RSC_PUT(item, cd)        rsc_put((item), (cd))
976
977 #endif
978
979 /****************************************
980  * rsc cache flush                      *
981  ****************************************/
982
983 typedef int rsc_entry_match(struct rsc *rscp, long data);
984
985 static void rsc_flush(rsc_entry_match *match, long data)
986 {
987         struct cache_head **ch;
988         struct rsc *rscp;
989         int n;
990         ENTRY;
991
992         cfs_write_lock(&rsc_cache.hash_lock);
993         for (n = 0; n < RSC_HASHMAX; n++) {
994                 for (ch = &rsc_cache.hash_table[n]; *ch;) {
995                         rscp = container_of(*ch, struct rsc, h);
996
997                         if (!match(rscp, data)) {
998                                 ch = &((*ch)->next);
999                                 continue;
1000                         }
1001
1002                         /* it seems simply set NEGATIVE doesn't work */
1003                         *ch = (*ch)->next;
1004                         rscp->h.next = NULL;
1005                         cache_get(&rscp->h);
1006                         cfs_set_bit(CACHE_NEGATIVE, &rscp->h.flags);
1007                         COMPAT_RSC_PUT(&rscp->h, &rsc_cache);
1008                         rsc_cache.entries--;
1009                 }
1010         }
1011         cfs_write_unlock(&rsc_cache.hash_lock);
1012         EXIT;
1013 }
1014
1015 static int match_uid(struct rsc *rscp, long uid)
1016 {
1017         if ((int) uid == -1)
1018                 return 1;
1019         return ((int) rscp->ctx.gsc_uid == (int) uid);
1020 }
1021
1022 static int match_target(struct rsc *rscp, long target)
1023 {
1024         return (rscp->target == (struct obd_device *) target);
1025 }
1026
1027 static inline void rsc_flush_uid(int uid)
1028 {
1029         if (uid == -1)
1030                 CWARN("flush all gss contexts...\n");
1031
1032         rsc_flush(match_uid, (long) uid);
1033 }
1034
1035 static inline void rsc_flush_target(struct obd_device *target)
1036 {
1037         rsc_flush(match_target, (long) target);
1038 }
1039
1040 void gss_secsvc_flush(struct obd_device *target)
1041 {
1042         rsc_flush_target(target);
1043 }
1044 EXPORT_SYMBOL(gss_secsvc_flush);
1045
1046 static struct rsc *gss_svc_searchbyctx(rawobj_t *handle)
1047 {
1048         struct rsc  rsci;
1049         struct rsc *found;
1050
1051         memset(&rsci, 0, sizeof(rsci));
1052         if (rawobj_dup(&rsci.handle, handle))
1053                 return NULL;
1054
1055 #ifdef HAVE_SUNRPC_CACHE_V2
1056         found = rsc_lookup(&rsci);
1057 #else
1058         found = rsc_lookup(&rsci, 0);
1059 #endif
1060         rsc_free(&rsci);
1061         if (!found)
1062                 return NULL;
1063         if (cache_check(&rsc_cache, &found->h, NULL))
1064                 return NULL;
1065         return found;
1066 }
1067
1068 #ifdef HAVE_SUNRPC_CACHE_V2
1069
1070 int gss_svc_upcall_install_rvs_ctx(struct obd_import *imp,
1071                                    struct gss_sec *gsec,
1072                                    struct gss_cli_ctx *gctx)
1073 {
1074         struct rsc      rsci, *rscp = NULL;
1075         unsigned long   ctx_expiry;
1076         __u32           major;
1077         int             rc;
1078         ENTRY;
1079
1080         memset(&rsci, 0, sizeof(rsci));
1081
1082         if (rawobj_alloc(&rsci.handle, (char *) &gsec->gs_rvs_hdl,
1083                          sizeof(gsec->gs_rvs_hdl)))
1084                 GOTO(out, rc = -ENOMEM);
1085
1086         rscp = rsc_lookup(&rsci);
1087         if (rscp == NULL)
1088                 GOTO(out, rc = -ENOMEM);
1089
1090         major = lgss_copy_reverse_context(gctx->gc_mechctx,
1091                                           &rsci.ctx.gsc_mechctx);
1092         if (major != GSS_S_COMPLETE)
1093                 GOTO(out, rc = -ENOMEM);
1094
1095         if (lgss_inquire_context(rsci.ctx.gsc_mechctx, &ctx_expiry)) {
1096                 CERROR("unable to get expire time, drop it\n");
1097                 GOTO(out, rc = -EINVAL);
1098         }
1099         rsci.h.expiry_time = (time_t) ctx_expiry;
1100
1101         if (strcmp(imp->imp_obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0)
1102                 rsci.ctx.gsc_usr_mds = 1;
1103         else if (strcmp(imp->imp_obd->obd_type->typ_name, LUSTRE_OSC_NAME) == 0)
1104                 rsci.ctx.gsc_usr_oss = 1;
1105         else
1106                 rsci.ctx.gsc_usr_root = 1;
1107
1108         rscp = rsc_update(&rsci, rscp);
1109         if (rscp == NULL)
1110                 GOTO(out, rc = -ENOMEM);
1111
1112         rscp->target = imp->imp_obd;
1113         rawobj_dup(&gctx->gc_svc_handle, &rscp->handle);
1114
1115         CWARN("create reverse svc ctx %p to %s: idx "LPX64"\n",
1116               &rscp->ctx, obd2cli_tgt(imp->imp_obd), gsec->gs_rvs_hdl);
1117         rc = 0;
1118 out:
1119         if (rscp)
1120                 cache_put(&rscp->h, &rsc_cache);
1121         rsc_free(&rsci);
1122
1123         if (rc)
1124                 CERROR("create reverse svc ctx: idx "LPX64", rc %d\n",
1125                        gsec->gs_rvs_hdl, rc);
1126         RETURN(rc);
1127 }
1128
1129 #else /* !HAVE_SUNRPC_CACHE_V2 */
1130
1131 int gss_svc_upcall_install_rvs_ctx(struct obd_import *imp,
1132                                    struct gss_sec *gsec,
1133                                    struct gss_cli_ctx *gctx)
1134 {
1135         struct rsc      rsci, *rscp;
1136         unsigned long   ctx_expiry;
1137         __u32           major;
1138         int             rc;
1139         ENTRY;
1140
1141         memset(&rsci, 0, sizeof(rsci));
1142
1143         if (rawobj_alloc(&rsci.handle, (char *) &gsec->gs_rvs_hdl,
1144                          sizeof(gsec->gs_rvs_hdl)))
1145                 GOTO(out, rc = -ENOMEM);
1146
1147         major = lgss_copy_reverse_context(gctx->gc_mechctx,
1148                                           &rsci.ctx.gsc_mechctx);
1149         if (major != GSS_S_COMPLETE)
1150                 GOTO(out, rc = -ENOMEM);
1151
1152         if (lgss_inquire_context(rsci.ctx.gsc_mechctx, &ctx_expiry)) {
1153                 CERROR("unable to get expire time, drop it\n");
1154                 GOTO(out, rc = -ENOMEM);
1155         }
1156         rsci.h.expiry_time = (time_t) ctx_expiry;
1157
1158         if (strcmp(imp->imp_obd->obd_type->typ_name, LUSTRE_MDC_NAME) == 0)
1159                 rsci.ctx.gsc_usr_mds = 1;
1160         else if (strcmp(imp->imp_obd->obd_type->typ_name, LUSTRE_OSC_NAME) == 0)
1161                 rsci.ctx.gsc_usr_oss = 1;
1162         else
1163                 rsci.ctx.gsc_usr_root = 1;
1164
1165         rscp = rsc_lookup(&rsci, 1);
1166         if (rscp == NULL) {
1167                 CERROR("rsc lookup failed\n");
1168                 GOTO(out, rc = -ENOMEM);
1169         }
1170
1171         rscp->target = imp->imp_obd;
1172         rawobj_dup(&gctx->gc_svc_handle, &rscp->handle);
1173
1174         CWARN("create reverse svc ctx %p to %s: idx "LPX64"\n",
1175               &rscp->ctx, obd2cli_tgt(imp->imp_obd), gsec->gs_rvs_hdl);
1176         rsc_put(&rscp->h, &rsc_cache);
1177         rc = 0;
1178 out:
1179         rsc_free(&rsci);
1180         if (rc)
1181                 CERROR("create reverse svc ctx: idx "LPX64", rc %d\n",
1182                        gsec->gs_rvs_hdl, rc);
1183         RETURN(rc);
1184 }
1185
1186 #endif /* HAVE_SUNRPC_CACHE_V2 */
1187
1188 int gss_svc_upcall_expire_rvs_ctx(rawobj_t *handle)
1189 {
1190         const cfs_time_t        expire = 20;
1191         struct rsc             *rscp;
1192
1193         rscp = gss_svc_searchbyctx(handle);
1194         if (rscp) {
1195                 CDEBUG(D_SEC, "reverse svcctx %p (rsc %p) expire soon\n",
1196                        &rscp->ctx, rscp);
1197
1198                 rscp->h.expiry_time = cfs_time_current_sec() + expire;
1199                 COMPAT_RSC_PUT(&rscp->h, &rsc_cache);
1200         }
1201         return 0;
1202 }
1203
1204 int gss_svc_upcall_dup_handle(rawobj_t *handle, struct gss_svc_ctx *ctx)
1205 {
1206         struct rsc *rscp = container_of(ctx, struct rsc, ctx);
1207
1208         return rawobj_dup(handle, &rscp->handle);
1209 }
1210
1211 int gss_svc_upcall_update_sequence(rawobj_t *handle, __u32 seq)
1212 {
1213         struct rsc             *rscp;
1214
1215         rscp = gss_svc_searchbyctx(handle);
1216         if (rscp) {
1217                 CDEBUG(D_SEC, "reverse svcctx %p (rsc %p) update seq to %u\n",
1218                        &rscp->ctx, rscp, seq + 1);
1219
1220                 rscp->ctx.gsc_rvs_seq = seq + 1;
1221                 COMPAT_RSC_PUT(&rscp->h, &rsc_cache);
1222         }
1223         return 0;
1224 }
1225
1226 static struct cache_deferred_req* cache_upcall_defer(struct cache_req *req)
1227 {
1228         return NULL;
1229 }
1230 static struct cache_req cache_upcall_chandle = { cache_upcall_defer };
1231
1232 int gss_svc_upcall_handle_init(struct ptlrpc_request *req,
1233                                struct gss_svc_reqctx *grctx,
1234                                struct gss_wire_ctx *gw,
1235                                struct obd_device *target,
1236                                __u32 lustre_svc,
1237                                rawobj_t *rvs_hdl,
1238                                rawobj_t *in_token)
1239 {
1240         struct ptlrpc_reply_state *rs;
1241         struct rsc                *rsci = NULL;
1242         struct rsi                *rsip = NULL, rsikey;
1243         cfs_waitlink_t             wait;
1244         int                        replen = sizeof(struct ptlrpc_body);
1245         struct gss_rep_header     *rephdr;
1246         int                        first_check = 1;
1247         int                        rc = SECSVC_DROP;
1248         ENTRY;
1249
1250         memset(&rsikey, 0, sizeof(rsikey));
1251         rsikey.lustre_svc = lustre_svc;
1252         rsikey.nid = (__u64) req->rq_peer.nid;
1253
1254         /* duplicate context handle. for INIT it always 0 */
1255         if (rawobj_dup(&rsikey.in_handle, &gw->gw_handle)) {
1256                 CERROR("fail to dup context handle\n");
1257                 GOTO(out, rc);
1258         }
1259
1260         if (rawobj_dup(&rsikey.in_token, in_token)) {
1261                 CERROR("can't duplicate token\n");
1262                 rawobj_free(&rsikey.in_handle);
1263                 GOTO(out, rc);
1264         }
1265
1266 #ifdef HAVE_SUNRPC_CACHE_V2
1267         rsip = rsi_lookup(&rsikey);
1268 #else
1269         rsip = rsi_lookup(&rsikey, 0);
1270 #endif
1271         rsi_free(&rsikey);
1272         if (!rsip) {
1273                 CERROR("error in rsi_lookup.\n");
1274
1275                 if (!gss_pack_err_notify(req, GSS_S_FAILURE, 0))
1276                         rc = SECSVC_COMPLETE;
1277
1278                 GOTO(out, rc);
1279         }
1280
1281         cache_get(&rsip->h); /* take an extra ref */
1282         cfs_waitq_init(&rsip->waitq);
1283         cfs_waitlink_init(&wait);
1284         cfs_waitq_add(&rsip->waitq, &wait);
1285
1286 cache_check:
1287         /* Note each time cache_check() will drop a reference if return
1288          * non-zero. We hold an extra reference on initial rsip, but must
1289          * take care of following calls. */
1290         rc = cache_check(&rsi_cache, &rsip->h, &cache_upcall_chandle);
1291         switch (rc) {
1292         case -EAGAIN: {
1293                 int valid;
1294
1295                 if (first_check) {
1296                         first_check = 0;
1297
1298                         read_lock(&rsi_cache.hash_lock);
1299                         valid = cfs_test_bit(CACHE_VALID, &rsip->h.flags);
1300                         if (valid == 0)
1301                                 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
1302                         read_unlock(&rsi_cache.hash_lock);
1303
1304                         if (valid == 0)
1305                                 cfs_schedule_timeout(GSS_SVC_UPCALL_TIMEOUT *
1306                                                      CFS_HZ);
1307
1308                         cache_get(&rsip->h);
1309                         goto cache_check;
1310                 }
1311                 CWARN("waited %ds timeout, drop\n", GSS_SVC_UPCALL_TIMEOUT);
1312                 break;
1313         }
1314         case -ENOENT:
1315                 CWARN("cache_check return ENOENT, drop\n");
1316                 break;
1317         case 0:
1318                 /* if not the first check, we have to release the extra
1319                  * reference we just added on it. */
1320                 if (!first_check)
1321                         cache_put(&rsip->h, &rsi_cache);
1322                 CDEBUG(D_SEC, "cache_check is good\n");
1323                 break;
1324         }
1325
1326         cfs_waitq_del(&rsip->waitq, &wait);
1327         cache_put(&rsip->h, &rsi_cache);
1328
1329         if (rc)
1330                 GOTO(out, rc = SECSVC_DROP);
1331
1332         rc = SECSVC_DROP;
1333         rsci = gss_svc_searchbyctx(&rsip->out_handle);
1334         if (!rsci) {
1335                 CERROR("authentication failed\n");
1336
1337                 if (!gss_pack_err_notify(req, GSS_S_FAILURE, 0))
1338                         rc = SECSVC_COMPLETE;
1339
1340                 GOTO(out, rc);
1341         } else {
1342                 cache_get(&rsci->h);
1343                 grctx->src_ctx = &rsci->ctx;
1344         }
1345
1346         if (rawobj_dup(&rsci->ctx.gsc_rvs_hdl, rvs_hdl)) {
1347                 CERROR("failed duplicate reverse handle\n");
1348                 GOTO(out, rc);
1349         }
1350
1351         rsci->target = target;
1352
1353         CDEBUG(D_SEC, "server create rsc %p(%u->%s)\n",
1354                rsci, rsci->ctx.gsc_uid, libcfs_nid2str(req->rq_peer.nid));
1355
1356         if (rsip->out_handle.len > PTLRPC_GSS_MAX_HANDLE_SIZE) {
1357                 CERROR("handle size %u too large\n", rsip->out_handle.len);
1358                 GOTO(out, rc = SECSVC_DROP);
1359         }
1360
1361         grctx->src_init = 1;
1362         grctx->src_reserve_len = cfs_size_round4(rsip->out_token.len);
1363
1364         rc = lustre_pack_reply_v2(req, 1, &replen, NULL, 0);
1365         if (rc) {
1366                 CERROR("failed to pack reply: %d\n", rc);
1367                 GOTO(out, rc = SECSVC_DROP);
1368         }
1369
1370         rs = req->rq_reply_state;
1371         LASSERT(rs->rs_repbuf->lm_bufcount == 3);
1372         LASSERT(rs->rs_repbuf->lm_buflens[0] >=
1373                 sizeof(*rephdr) + rsip->out_handle.len);
1374         LASSERT(rs->rs_repbuf->lm_buflens[2] >= rsip->out_token.len);
1375
1376         rephdr = lustre_msg_buf(rs->rs_repbuf, 0, 0);
1377         rephdr->gh_version = PTLRPC_GSS_VERSION;
1378         rephdr->gh_flags = 0;
1379         rephdr->gh_proc = PTLRPC_GSS_PROC_ERR;
1380         rephdr->gh_major = rsip->major_status;
1381         rephdr->gh_minor = rsip->minor_status;
1382         rephdr->gh_seqwin = GSS_SEQ_WIN;
1383         rephdr->gh_handle.len = rsip->out_handle.len;
1384         memcpy(rephdr->gh_handle.data, rsip->out_handle.data,
1385                rsip->out_handle.len);
1386
1387         memcpy(lustre_msg_buf(rs->rs_repbuf, 2, 0), rsip->out_token.data,
1388                rsip->out_token.len);
1389
1390         rs->rs_repdata_len = lustre_shrink_msg(rs->rs_repbuf, 2,
1391                                                rsip->out_token.len, 0);
1392
1393         rc = SECSVC_OK;
1394
1395 out:
1396         /* it looks like here we should put rsip also, but this mess up
1397          * with NFS cache mgmt code... FIXME */
1398 #if 0
1399         if (rsip)
1400                 rsi_put(&rsip->h, &rsi_cache);
1401 #endif
1402
1403         if (rsci) {
1404                 /* if anything went wrong, we don't keep the context too */
1405                 if (rc != SECSVC_OK)
1406                         cfs_set_bit(CACHE_NEGATIVE, &rsci->h.flags);
1407                 else
1408                         CDEBUG(D_SEC, "create rsc with idx "LPX64"\n",
1409                                gss_handle_to_u64(&rsci->handle));
1410
1411                 COMPAT_RSC_PUT(&rsci->h, &rsc_cache);
1412         }
1413         RETURN(rc);
1414 }
1415
1416 struct gss_svc_ctx *gss_svc_upcall_get_ctx(struct ptlrpc_request *req,
1417                                            struct gss_wire_ctx *gw)
1418 {
1419         struct rsc *rsc;
1420
1421         rsc = gss_svc_searchbyctx(&gw->gw_handle);
1422         if (!rsc) {
1423                 CWARN("Invalid gss ctx idx "LPX64" from %s\n",
1424                       gss_handle_to_u64(&gw->gw_handle),
1425                       libcfs_nid2str(req->rq_peer.nid));
1426                 return NULL;
1427         }
1428
1429         return &rsc->ctx;
1430 }
1431
1432 void gss_svc_upcall_put_ctx(struct gss_svc_ctx *ctx)
1433 {
1434         struct rsc *rsc = container_of(ctx, struct rsc, ctx);
1435
1436         COMPAT_RSC_PUT(&rsc->h, &rsc_cache);
1437 }
1438
1439 void gss_svc_upcall_destroy_ctx(struct gss_svc_ctx *ctx)
1440 {
1441         struct rsc *rsc = container_of(ctx, struct rsc, ctx);
1442
1443         /* can't be found */
1444         cfs_set_bit(CACHE_NEGATIVE, &rsc->h.flags);
1445         /* to be removed at next scan */
1446         rsc->h.expiry_time = 1;
1447 }
1448
1449 int __init gss_init_svc_upcall(void)
1450 {
1451         int     i;
1452
1453         cfs_spin_lock_init(&__ctx_index_lock);
1454         /*
1455          * this helps reducing context index confliction. after server reboot,
1456          * conflicting request from clients might be filtered out by initial
1457          * sequence number checking, thus no chance to sent error notification
1458          * back to clients.
1459          */
1460         cfs_get_random_bytes(&__ctx_index, sizeof(__ctx_index));
1461
1462
1463         cache_register(&rsi_cache);
1464         cache_register(&rsc_cache);
1465
1466         /* FIXME this looks stupid. we intend to give lsvcgssd a chance to open
1467          * the init upcall channel, otherwise there's big chance that the first
1468          * upcall issued before the channel be opened thus nfsv4 cache code will
1469          * drop the request direclty, thus lead to unnecessary recovery time.
1470          * here we wait at miximum 1.5 seconds. */
1471         for (i = 0; i < 6; i++) {
1472                 if (atomic_read(&rsi_cache.readers) > 0)
1473                         break;
1474                 cfs_set_current_state(TASK_UNINTERRUPTIBLE);
1475                 LASSERT(CFS_HZ >= 4);
1476                 cfs_schedule_timeout(CFS_HZ / 4);
1477         }
1478
1479         if (atomic_read(&rsi_cache.readers) == 0)
1480                 CWARN("Init channel is not opened by lsvcgssd, following "
1481                       "request might be dropped until lsvcgssd is active\n");
1482
1483         return 0;
1484 }
1485
1486 void __exit gss_exit_svc_upcall(void)
1487 {
1488         cache_purge(&rsi_cache);
1489         cache_unregister(&rsi_cache);
1490
1491         cache_purge(&rsc_cache);
1492         cache_unregister(&rsc_cache);
1493 }