Whamcloud - gitweb
Revert my use of the configure_flags macro in the lustre.spec file and just
[fs/lustre-release.git] / lustre / sec / gss / svcsec_gss.c
index f40544e..fedaafd 100644 (file)
@@ -105,10 +105,11 @@ static inline unsigned long hash_mem(char *buf, int length, int bits)
 
 struct rsi {
         struct cache_head       h;
+        __u32                   lustre_svc;
         __u32                   naltype;
         __u32                   netid;
         __u64                   nid;
-        rawobj_t                in_handle, in_token;
+        rawobj_t                in_handle, in_token, in_srv_type;
         rawobj_t                out_handle, out_token;
         int                     major_status, minor_status;
 };
@@ -127,7 +128,9 @@ static void rsi_free(struct rsi *rsii)
 static void rsi_put(struct cache_head *item, struct cache_detail *cd)
 {
         struct rsi *rsii = container_of(item, struct rsi, h);
+        LASSERT(atomic_read(&item->refcnt) > 0);
         if (cache_put(item, cd)) {
+                LASSERT(item->next == NULL);
                 rsi_free(rsii);
                 OBD_FREE(rsii, sizeof(*rsii));
         }
@@ -135,8 +138,8 @@ static void rsi_put(struct cache_head *item, struct cache_detail *cd)
 
 static inline int rsi_hash(struct rsi *item)
 {
-        return hash_mem(item->in_handle.data, item->in_handle.len, RSI_HASHBITS)
-              ^ hash_mem(item->in_token.data, item->in_token.len, RSI_HASHBITS);
+        return hash_mem((char *)item->in_handle.data, item->in_handle.len, RSI_HASHBITS)
+                ^ hash_mem((char *)item->in_token.data, item->in_token.len, RSI_HASHBITS);
 }
 
 static inline int rsi_match(struct rsi *item, struct rsi *tmp)
@@ -151,6 +154,8 @@ static void rsi_request(struct cache_detail *cd,
 {
         struct rsi *rsii = container_of(h, struct rsi, h);
 
+        qword_addhex(bpp, blen, (char *) &rsii->lustre_svc,
+                     sizeof(rsii->lustre_svc));
         qword_addhex(bpp, blen, (char *) &rsii->naltype, sizeof(rsii->naltype));
         qword_addhex(bpp, blen, (char *) &rsii->netid, sizeof(rsii->netid));
         qword_addhex(bpp, blen, (char *) &rsii->nid, sizeof(rsii->nid));
@@ -177,6 +182,7 @@ gssd_reply(struct rsi *item)
                         tmp->h.next = NULL;
                         rsi_cache.entries--;
                         if (test_bit(CACHE_VALID, &tmp->h.flags)) {
+                                CERROR("rsi is valid\n");
                                 write_unlock(&rsi_cache.hash_lock);
                                 rsi_put(&tmp->h, &rsi_cache);
                                 RETURN(-EINVAL);
@@ -229,18 +235,19 @@ gssd_upcall(struct rsi *item, struct cache_req *chandle)
                         return tmp;
                 }
         }
-        // cache_get(&item->h);
+        cache_get(&item->h);
         set_bit(CACHE_HASHED, &item->h.flags);
         item->h.next = *head;
         *head = &item->h;
         rsi_cache.entries++;
         read_unlock(&rsi_cache.hash_lock);
-        cache_get(&item->h);
+        //cache_get(&item->h);
 
         cache_check(&rsi_cache, &item->h, chandle);
         starttime = get_seconds();
         do {
-                yield();
+                set_current_state(TASK_UNINTERRUPTIBLE);
+                schedule_timeout(HZ/2);
                 read_lock(&rsi_cache.hash_lock);
                 for (hp = head; *hp != NULL; hp = &tmp->h.next) {
                         tmp = container_of(*hp, struct rsi, h);
@@ -261,8 +268,9 @@ gssd_upcall(struct rsi *item, struct cache_req *chandle)
                         }
                 }
                 read_unlock(&rsi_cache.hash_lock);
-        } while ((get_seconds() - starttime) <= 5);
-        CERROR("5s timeout while waiting cache refill\n");
+        } while ((get_seconds() - starttime) <= SVCSEC_UPCALL_TIMEOUT);
+        CERROR("%ds timeout while waiting cache refill\n",
+               SVCSEC_UPCALL_TIMEOUT);
         return NULL;
 }
 
@@ -279,69 +287,67 @@ static int rsi_parse(struct cache_detail *cd,
         ENTRY;
 
         OBD_ALLOC(rsii, sizeof(*rsii));
-        if (!rsii) {
-                CERROR("failed to alloc rsii\n");
+        if (!rsii)
                 RETURN(-ENOMEM);
-        }
         cache_init(&rsii->h);
 
         /* handle */
         len = qword_get(&mesg, buf, mlen);
         if (len < 0)
                 goto out;
-        status = -ENOMEM;
-        if (rawobj_alloc(&rsii->in_handle, buf, len))
+        if (rawobj_alloc(&rsii->in_handle, buf, len)) {
+                status = -ENOMEM;
                 goto out;
+        }
 
         /* token */
         len = qword_get(&mesg, buf, mlen);
-        status = -EINVAL;
         if (len < 0)
-                goto out;;
-        status = -ENOMEM;
-        if (rawobj_alloc(&rsii->in_token, buf, len))
                 goto out;
+        if (rawobj_alloc(&rsii->in_token, buf, len)) {
+                status = -ENOMEM;
+                goto out;
+        }
 
         /* expiry */
         expiry = get_expiry(&mesg);
-        status = -EINVAL;
         if (expiry == 0)
                 goto out;
 
-        /* major/minor */
+        /* major */
         len = qword_get(&mesg, buf, mlen);
-        if (len < 0)
+        if (len <= 0)
                 goto out;
-        if (len == 0) {
+        rsii->major_status = simple_strtol(buf, &ep, 10);
+        if (*ep)
                 goto out;
-        } else {
-                rsii->major_status = simple_strtoul(buf, &ep, 10);
-                if (*ep)
-                        goto out;
-                len = qword_get(&mesg, buf, mlen);
-                if (len <= 0)
-                        goto out;
-                rsii->minor_status = simple_strtoul(buf, &ep, 10);
-                if (*ep)
-                        goto out;
 
-                /* out_handle */
-                len = qword_get(&mesg, buf, mlen);
-                if (len < 0)
-                        goto out;
+        /* minor */
+        len = qword_get(&mesg, buf, mlen);
+        if (len <= 0)
+                goto out;
+        rsii->minor_status = simple_strtol(buf, &ep, 10);
+        if (*ep)
+                goto out;
+
+        /* out_handle */
+        len = qword_get(&mesg, buf, mlen);
+        if (len < 0)
+                goto out;
+        if (rawobj_alloc(&rsii->out_handle, buf, len)) {
                 status = -ENOMEM;
-                if (rawobj_alloc(&rsii->out_handle, buf, len))
-                        goto out;
+                goto out;
+        }
 
-                /* out_token */
-                len = qword_get(&mesg, buf, mlen);
-                status = -EINVAL;
-                if (len < 0)
-                        goto out;
+        /* out_token */
+        len = qword_get(&mesg, buf, mlen);
+        if (len < 0)
+                goto out;
+        if (rawobj_alloc(&rsii->out_token, buf, len)) {
                 status = -ENOMEM;
-                if (rawobj_alloc(&rsii->out_token, buf, len))
-                        goto out;
+                goto out;
         }
+
         rsii->h.expiry_time = expiry;
         status = gssd_reply(rsii);
 out:
@@ -384,7 +390,9 @@ struct gss_svc_seq_data {
 struct rsc {
         struct cache_head       h;
         rawobj_t                handle;
-        __u32                   remote_realm;
+        __u32                   remote_realm:1,
+                                auth_usr_mds:1,
+                                auth_usr_oss:1;
         struct vfs_cred         cred;
         uid_t                   mapped_uid;
         struct gss_svc_seq_data seqdata;
@@ -409,7 +417,9 @@ static void rsc_put(struct cache_head *item, struct cache_detail *cd)
 {
         struct rsc *rsci = container_of(item, struct rsc, h);
 
+        LASSERT(atomic_read(&item->refcnt) > 0);
         if (cache_put(item, cd)) {
+                LASSERT(item->next == NULL);
                 rsc_free(rsci);
                 OBD_FREE(rsci, sizeof(*rsci));
         }
@@ -418,7 +428,8 @@ static void rsc_put(struct cache_head *item, struct cache_detail *cd)
 static inline int
 rsc_hash(struct rsc *rsci)
 {
-        return hash_mem(rsci->handle.data, rsci->handle.len, RSC_HASHBITS);
+        return hash_mem((char *)rsci->handle.data,
+                        rsci->handle.len, RSC_HASHBITS);
 }
 
 static inline int
@@ -443,9 +454,8 @@ static struct rsc *rsc_lookup(struct rsc *item, int set)
                 if (!rsc_match(tmp, item))
                         continue;
                 cache_get(&tmp->h);
-                if (!set) {
+                if (!set)
                         goto out_noset;
-                }
                 *hp = tmp->h.next;
                 tmp->h.next = NULL;
                 clear_bit(CACHE_HASHED, &tmp->h.flags);
@@ -454,7 +464,7 @@ static struct rsc *rsc_lookup(struct rsc *item, int set)
         }
         /* Didn't find anything */
         if (!set)
-                goto out_noset;
+                goto out_nada;
         rsc_cache.entries++;
 out_set:
         set_bit(CACHE_HASHED, &item->h.flags);
@@ -464,17 +474,20 @@ out_set:
         cache_fresh(&rsc_cache, &item->h, item->h.expiry_time);
         cache_get(&item->h);
         RETURN(item);
+out_nada:
+        tmp = NULL;
 out_noset:
         read_unlock(&rsc_cache.hash_lock);
         RETURN(tmp);
 }
-                                                                                                                        
+
 static int rsc_parse(struct cache_detail *cd,
                      char *mesg, int mlen)
 {
-        /* contexthandle expiry [ uid gid N <n gids> mechname ...mechdata... ] */
+        /* contexthandle expiry [ uid gid N <n gids> mechname
+         * ...mechdata... ] */
         char *buf = mesg;
-        int len, rv;
+        int len, rv, tmp_int;
         struct rsc *rsci, *res = NULL;
         time_t expiry;
         int status = -EINVAL;
@@ -500,21 +513,38 @@ static int rsc_parse(struct cache_detail *cd,
                 goto out;
 
         /* remote flag */
-        rv = get_int(&mesg, &rsci->remote_realm);
+        rv = get_int(&mesg, &tmp_int);
         if (rv) {
                 CERROR("fail to get remote flag\n");
                 goto out;
         }
+        rsci->remote_realm = (tmp_int != 0);
+
+        /* mds user flag */
+        rv = get_int(&mesg, &tmp_int);
+        if (rv) {
+                CERROR("fail to get mds user flag\n");
+                goto out;
+        }
+        rsci->auth_usr_mds = (tmp_int != 0);
+
+        /* oss user flag */
+        rv = get_int(&mesg, &tmp_int);
+        if (rv) {
+                CERROR("fail to get oss user flag\n");
+                goto out;
+        }
+        rsci->auth_usr_oss = (tmp_int != 0);
 
         /* mapped uid */
-        rv = get_int(&mesg, &rsci->mapped_uid);
+        rv = get_int(&mesg, (int *)&rsci->mapped_uid);
         if (rv) {
                 CERROR("fail to get mapped uid\n");
                 goto out;
         }
 
         /* uid, or NEGATIVE */
-        rv = get_int(&mesg, &rsci->cred.vc_uid);
+        rv = get_int(&mesg, (int *)&rsci->cred.vc_uid);
         if (rv == -EINVAL)
                 goto out;
         if (rv == -ENOENT) {
@@ -526,7 +556,7 @@ static int rsc_parse(struct cache_detail *cd,
                 __u64 ctx_expiry;
 
                 /* gid */
-                if (get_int(&mesg, &rsci->cred.vc_gid))
+                if (get_int(&mesg, (int *)&rsci->cred.vc_gid))
                         goto out;
 
                 /* mech name */
@@ -546,7 +576,7 @@ static int rsc_parse(struct cache_detail *cd,
                         goto out;
                 }
                 tmp_buf.len = len;
-                tmp_buf.data = buf;
+                tmp_buf.data = (unsigned char *)buf;
                 if (kgss_import_sec_context(&tmp_buf, gm, &rsci->mechctx)) {
                         kgss_mech_put(gm);
                         goto out;
@@ -561,7 +591,7 @@ static int rsc_parse(struct cache_detail *cd,
                         kgss_mech_put(gm);
                         goto out;
                 }
-                expiry = (time_t) ctx_expiry;
+                expiry = (time_t) gss_roundup_expire_time(ctx_expiry);
 
                 kgss_mech_put(gm);
         }
@@ -588,24 +618,31 @@ static void rsc_flush(uid_t uid)
         int n;
         ENTRY;
 
+        if (uid == -1)
+                CWARN("flush all gss contexts\n");
+
         write_lock(&rsc_cache.hash_lock);
         for (n = 0; n < RSC_HASHMAX; n++) {
                 for (ch = &rsc_cache.hash_table[n]; *ch;) {
                         rscp = container_of(*ch, struct rsc, h);
-                        if (uid == -1 || rscp->cred.vc_uid == uid) {
-                                /* it seems simply set NEGATIVE doesn't work */
-                                *ch = (*ch)->next;
-                                rscp->h.next = NULL;
-                                cache_get(&rscp->h);
-                                set_bit(CACHE_NEGATIVE, &rscp->h.flags);
-                                clear_bit(CACHE_HASHED, &rscp->h.flags);
-                                CWARN("flush rsc %p for uid %u\n",
-                                       rscp, rscp->cred.vc_uid);
-                                rsc_put(&rscp->h, &rsc_cache);
-                                rsc_cache.entries--;
+
+                        if (uid != -1 && rscp->cred.vc_uid != uid) {
+                                ch = &((*ch)->next);
                                 continue;
                         }
-                        ch = &((*ch)->next);
+
+                        /* it seems simply set NEGATIVE doesn't work */
+                        *ch = (*ch)->next;
+                        rscp->h.next = NULL;
+                        cache_get(&rscp->h);
+                        set_bit(CACHE_NEGATIVE, &rscp->h.flags);
+                        clear_bit(CACHE_HASHED, &rscp->h.flags);
+                        if (uid != -1)
+                                CWARN("flush rsc %p(%u) for uid %u\n", rscp,
+                                      *((__u32 *) rscp->handle.data),
+                                      rscp->cred.vc_uid);
+                        rsc_put(&rscp->h, &rsc_cache);
+                        rsc_cache.entries--;
                 }
         }
         write_unlock(&rsc_cache.hash_lock);
@@ -637,17 +674,6 @@ gss_svc_searchbyctx(rawobj_t *handle)
         return found;
 }
 
-struct gss_svc_data {
-        /* decoded gss client cred: */
-        struct rpc_gss_wire_cred        clcred;
-        /* internal used status */
-        unsigned int                    is_init:1,
-                                        is_init_continue:1,
-                                        is_err_notify:1,
-                                        is_fini:1;
-        int                             reserve_len;
-};
-
 /* FIXME
  * again hacking: only try to give the svcgssd a chance to handle
  * upcalls.
@@ -660,6 +686,38 @@ struct cache_deferred_req* my_defer(struct cache_req *req)
 static struct cache_req my_chandle = {my_defer};
 
 /* Implements sequence number algorithm as specified in RFC 2203. */
+/*
+ * Debug helper: log the whole sequence window bitmap of @sd as one string
+ * of lowercase hex digits, two digits per byte, in memory order.
+ */
+static inline void __dbg_dump_seqwin(struct gss_svc_seq_data *sd)
+{
+        static const char hexdigits[] = "0123456789abcdef";
+        const __u8 *win = (const __u8 *) sd->sd_win;
+        char text[sizeof(sd->sd_win) * 2 + 1];
+        char *out = text;
+        unsigned int pos;
+
+        /* high nibble first, then low nibble, for every window byte */
+        for (pos = 0; pos < sizeof(sd->sd_win); pos++) {
+                *out++ = hexdigits[win[pos] >> 4];
+                *out++ = hexdigits[win[pos] & 0xf];
+        }
+        *out = '\0';
+
+        CWARN("dump seqwin: %s\n", text);
+}
+
+/*
+ * Debug helper: log that the sequence number jumps to @seq_num, together
+ * with the current maximum recorded in @sd, then dump the window bitmap.
+ */
+static inline void __dbg_seq_jump(struct gss_svc_seq_data *sd, __u32 seq_num)
+{
+        CWARN("seq jump to %u, cur max %u!\n",
+              seq_num,
+              sd->sd_max);
+
+        __dbg_dump_seqwin(sd);
+}
+
+/*
+ * Debug helper: look at the first (seq_num - sd_max) bits of the window of
+ * @sd; if at least one of them is clear, log the increase to @seq_num and
+ * dump the window bitmap. Stays silent when all of those bits are set.
+ *
+ * NOTE(review): bits are probed at indices 0..n-1, while the caller clears
+ * slot (sd_max % GSS_SEQ_WIN) as it advances -- confirm that probing from
+ * index 0 is the intended set of slots.
+ */
+static inline void __dbg_seq_increase(struct gss_svc_seq_data *sd, __u32 seq_num)
+{
+        int span = seq_num - sd->sd_max;
+        int bit = 0;
+
+        /* one clear bit is enough to trigger the dump, so stop at the first */
+        while (bit < span && test_bit(bit, sd->sd_win))
+                bit++;
+
+        /* ran off the end: every probed bit was set, nothing to report */
+        if (bit >= span)
+                return;
+
+        CWARN("seq increase to %u, cur max %u\n", seq_num, sd->sd_max);
+        __dbg_dump_seqwin(sd);
+}
+
+
 static int
 gss_check_seq_num(struct gss_svc_seq_data *sd, __u32 seq_num)
 {
@@ -668,9 +726,11 @@ gss_check_seq_num(struct gss_svc_seq_data *sd, __u32 seq_num)
         spin_lock(&sd->sd_lock);
         if (seq_num > sd->sd_max) {
                 if (seq_num >= sd->sd_max + GSS_SEQ_WIN) {
+                        __dbg_seq_jump(sd, seq_num);
                         memset(sd->sd_win, 0, sizeof(sd->sd_win));
                         sd->sd_max = seq_num;
                 } else {
+                        __dbg_seq_increase(sd, seq_num);
                         while(sd->sd_max < seq_num) {
                                 sd->sd_max++;
                                 __clear_bit(sd->sd_max % GSS_SEQ_WIN,
@@ -718,7 +778,7 @@ gss_svc_verify_request(struct ptlrpc_request *req,
         msg.data = (__u8 *)req->rq_reqmsg;
 
         mic.len = le32_to_cpu(*vp++);
-        mic.data = (char *) vp;
+        mic.data = (unsigned char *)vp;
         vlen -= 4;
 
         if (mic.len > vlen) {
@@ -739,8 +799,9 @@ gss_svc_verify_request(struct ptlrpc_request *req,
         }
 
         if (gss_check_seq_num(&rsci->seqdata, gc->gc_seq)) {
-                CERROR("discard request %p with old seq_num %u\n",
-                        req, gc->gc_seq);
+                CERROR("discard replayed request %p(o%u,x"LPU64",t"LPU64")\n",
+                        req, req->rq_reqmsg->opc, req->rq_xid,
+                        req->rq_reqmsg->transno);
                 RETURN(GSS_S_DUPLICATE_TOKEN);
         }
 
@@ -785,8 +846,9 @@ gss_svc_unseal_request(struct ptlrpc_request *req,
         }
 
         if (gss_check_seq_num(&rsci->seqdata, gc->gc_seq)) {
-                CERROR("discard request %p with old seq_num %u\n",
-                        req, gc->gc_seq);
+                CERROR("discard replayed request %p(o%u,x"LPU64",t"LPU64")\n",
+                        req, req->rq_reqmsg->opc, req->rq_xid,
+                        req->rq_reqmsg->transno);
                 RETURN(GSS_S_DUPLICATE_TOKEN);
         }
 
@@ -802,7 +864,7 @@ static int
 gss_pack_err_notify(struct ptlrpc_request *req,
                     __u32 major, __u32 minor)
 {
-        struct gss_svc_data *svcdata = req->rq_sec_svcdata;
+        struct gss_svc_data *svcdata = req->rq_svcsec_data;
         __u32 reslen, *resp, *reslenp;
         char  nidstr[PTL_NALFMT_SIZE];
         const __u32 secdata_len = 7 * 4;
@@ -827,8 +889,8 @@ gss_pack_err_notify(struct ptlrpc_request *req,
         resp = (__u32 *) req->rq_reply_state->rs_repbuf;
 
         /* header */
-        *resp++ = cpu_to_le32(PTLRPC_SEC_GSS);
-        *resp++ = cpu_to_le32(PTLRPC_SEC_TYPE_NONE);
+        *resp++ = cpu_to_le32(PTLRPCS_FLVR_GSS_NONE);
+        *resp++ = cpu_to_le32(PTLRPCS_SVC_NONE);
         *resp++ = cpu_to_le32(req->rq_replen);
         reslenp = resp++;
 
@@ -841,8 +903,8 @@ gss_pack_err_notify(struct ptlrpc_request *req,
          * obj1(fake), obj2(fake)
          */
         *resp++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);
-        *resp++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5I);
-        *resp++ = cpu_to_le32(PTLRPC_GSS_PROC_ERR);
+        *resp++ = cpu_to_le32(PTLRPCS_FLVR_KRB5I);
+        *resp++ = cpu_to_le32(PTLRPCS_GSS_PROC_ERR);
         *resp++ = cpu_to_le32(major);
         *resp++ = cpu_to_le32(minor);
         *resp++ = 0;
@@ -852,19 +914,37 @@ gss_pack_err_notify(struct ptlrpc_request *req,
         *reslenp = cpu_to_le32(secdata_len);
 
         req->rq_reply_state->rs_repdata_len += (secdata_len);
-        CWARN("prepare gss error notify(0x%x/0x%x) to %s\n", major, minor,
+        CDEBUG(D_SEC, "prepare gss error notify(0x%x/0x%x) to %s\n",
+               major, minor,
                portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                req->rq_peer.peer_id.nid, nidstr));
         RETURN(0);
 }
 
+/*
+ * Debug helper: log the reference count, flags, next pointer, expiry time
+ * and last refresh time of a cache entry header.
+ */
+static void dump_cache_head(struct cache_head *h)
+{
+        int refs = atomic_read(&h->refcnt);
+
+        CWARN("ref %d, fl %lx, n %p, t %ld, %ld\n",
+              refs,
+              h->flags,
+              h->next,
+              h->expiry_time,
+              h->last_refresh);
+}
+/*
+ * Debug helper: log an rsi entry -- its address, its cache header, the peer
+ * identity (naltype, netid, nid) and the length/data pointer of each raw
+ * object, in the order in_handle, in_token, out_handle, out_token.
+ */
+static void dump_rsi(struct rsi *rsi)
+{
+        CWARN("dump rsi %p\n", rsi);
+        dump_cache_head(&rsi->h);
+        /* nid is __u64, which is not "unsigned long long" on every
+         * architecture; cast so the argument always matches %llx */
+        CWARN("%x,%x,%llx\n", rsi->naltype, rsi->netid,
+              (unsigned long long) rsi->nid);
+        CWARN("len %d, d %p\n", rsi->in_handle.len, rsi->in_handle.data);
+        CWARN("len %d, d %p\n", rsi->in_token.len, rsi->in_token.data);
+        CWARN("len %d, d %p\n", rsi->out_handle.len, rsi->out_handle.data);
+        CWARN("len %d, d %p\n", rsi->out_token.len, rsi->out_token.data);
+}
+
 static int
 gss_svcsec_handle_init(struct ptlrpc_request *req,
                        struct rpc_gss_wire_cred *gc,
                        __u32 *secdata, __u32 seclen,
                        enum ptlrpcs_error *res)
 {
-        struct gss_svc_data *svcdata = req->rq_sec_svcdata;
+        struct gss_svc_data *svcdata = req->rq_svcsec_data;
         struct rsc          *rsci;
         struct rsi          *rsikey, *rsip;
         rawobj_t             tmpobj;
@@ -875,7 +955,7 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
 
         LASSERT(svcdata);
 
-        CWARN("processing gss init(%d) request from %s\n", gc->gc_proc,
+        CDEBUG(D_SEC, "processing gss init(%d) request from %s\n", gc->gc_proc,
                portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                req->rq_peer.peer_id.nid, nidstr));
 
@@ -896,10 +976,21 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
         }
         cache_init(&rsikey->h);
 
+        /* obtain lustre svc type */
+        if (seclen < 4) {
+                CERROR("sec size %d too small\n", seclen);
+                GOTO(out_rsikey, rc = SVC_DROP);
+        }
+        rsikey->lustre_svc = le32_to_cpu(*secdata++);
+        seclen -= 4;
+
+        /* duplicate context handle. currently always 0 */
         if (rawobj_dup(&rsikey->in_handle, &gc->gc_ctx)) {
                 CERROR("fail to dup context handle\n");
                 GOTO(out_rsikey, rc = SVC_DROP);
         }
+
+        /* extract token */
         *res = PTLRPCS_BADVERF;
         if (rawobj_extract(&tmpobj, &secdata, &seclen)) {
                 CERROR("can't extract token\n");
@@ -917,27 +1008,31 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
         rsip = gssd_upcall(rsikey, &my_chandle);
         if (!rsip) {
                 CERROR("error in gssd_upcall.\n");
-                GOTO(out_rsikey, rc = SVC_DROP);
+
+                rc = SVC_COMPLETE;
+                if (gss_pack_err_notify(req, GSS_S_FAILURE, 0))
+                        rc = SVC_DROP;
+
+                GOTO(out_rsikey, rc);
         }
 
         rsci = gss_svc_searchbyctx(&rsip->out_handle);
         if (!rsci) {
                 CERROR("rsci still not mature yet?\n");
 
+                rc = SVC_COMPLETE;
                 if (gss_pack_err_notify(req, GSS_S_FAILURE, 0))
                         rc = SVC_DROP;
-                else
-                        rc = SVC_COMPLETE;
 
                 GOTO(out_rsip, rc);
         }
-        CWARN("svcsec create gss context %p(%u@%s)\n",
+        CDEBUG(D_SEC, "svcsec create gss context %p(%u@%s)\n",
                rsci, rsci->cred.vc_uid,
                portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                req->rq_peer.peer_id.nid, nidstr));
 
         svcdata->is_init = 1;
-        svcdata->reserve_len = 6 * 4 +
+        svcdata->reserve_len = 7 * 4 +
                 size_round4(rsip->out_handle.len) +
                 size_round4(rsip->out_token.len);
 
@@ -950,28 +1045,35 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
 
         /* header */
         resp = (__u32 *) req->rq_reply_state->rs_repbuf;
-        *resp++ = cpu_to_le32(PTLRPC_SEC_GSS);
-        *resp++ = cpu_to_le32(PTLRPC_SEC_TYPE_NONE);
+        *resp++ = cpu_to_le32(PTLRPCS_FLVR_GSS_NONE);
+        *resp++ = cpu_to_le32(PTLRPCS_SVC_NONE);
         *resp++ = cpu_to_le32(req->rq_replen);
         reslenp = resp++;
 
         resp += req->rq_replen / 4;
         reslen = svcdata->reserve_len;
 
-        /* gss reply:
-         * status, major, minor, seq, out_handle, out_token
+        /* gss reply: (conform to err notify format)
+         * x, x, seq, major, minor, handle, token
          */
-        *resp++ = cpu_to_le32(PTLRPCS_OK);
+        *resp++ = 0;
+        *resp++ = 0;
+        *resp++ = cpu_to_le32(GSS_SEQ_WIN);
         *resp++ = cpu_to_le32(rsip->major_status);
         *resp++ = cpu_to_le32(rsip->minor_status);
-        *resp++ = cpu_to_le32(GSS_SEQ_WIN);
-        reslen -= (4 * 4);
+        reslen -= (5 * 4);
         if (rawobj_serialize(&rsip->out_handle,
-                             &resp, &reslen))
+                             &resp, &reslen)) {
+                dump_rsi(rsip);
+                dump_rsi(rsikey);
                 LBUG();
+        }
         if (rawobj_serialize(&rsip->out_token,
-                             &resp, &reslen))
+                             &resp, &reslen)) {
+                dump_rsi(rsip);
+                dump_rsi(rsikey);
                 LBUG();
+        }
         /* the actual sec data length */
         *reslenp = cpu_to_le32(svcdata->reserve_len - reslen);
 
@@ -983,10 +1085,23 @@ gss_svcsec_handle_init(struct ptlrpc_request *req,
 
         *res = PTLRPCS_OK;
 
-        req->rq_auth_uid = rsci->cred.vc_uid;
         req->rq_remote_realm = rsci->remote_realm;
+        req->rq_auth_usr_mds = rsci->auth_usr_mds;
+        req->rq_auth_usr_oss = rsci->auth_usr_oss;
+        req->rq_auth_uid = rsci->cred.vc_uid;
         req->rq_mapped_uid = rsci->mapped_uid;
 
+        if (req->rq_auth_usr_mds) {
+                CWARN("usr from %s authenticated as mds svc cred\n",
+                portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                req->rq_peer.peer_id.nid, nidstr));
+        }
+        if (req->rq_auth_usr_oss) {
+                CWARN("usr from %s authenticated as oss svc cred\n",
+                portals_nid2str(req->rq_peer.peer_ni->pni_number,
+                                req->rq_peer.peer_id.nid, nidstr));
+        }
+
         /* This is simplified since right now we doesn't support
          * INIT_CONTINUE yet.
          */
@@ -1035,7 +1150,7 @@ gss_svcsec_handle_data(struct ptlrpc_request *req,
         }
 
         switch (gc->gc_svc) {
-        case PTLRPC_GSS_SVC_INTEGRITY:
+        case PTLRPCS_GSS_SVC_INTEGRITY:
                 major = gss_svc_verify_request(req, rsci, gc, secdata, seclen);
                 if (major == GSS_S_COMPLETE)
                         break;
@@ -1044,7 +1159,7 @@ gss_svcsec_handle_data(struct ptlrpc_request *req,
                        portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                        req->rq_peer.peer_id.nid, nidstr));
                 goto notify_err;
-        case PTLRPC_GSS_SVC_PRIVACY:
+        case PTLRPCS_GSS_SVC_PRIVACY:
                 major = gss_svc_unseal_request(req, rsci, gc, secdata, seclen);
                 if (major == GSS_S_COMPLETE)
                         break;
@@ -1058,8 +1173,10 @@ gss_svcsec_handle_data(struct ptlrpc_request *req,
                 GOTO(out, rc = SVC_DROP);
         }
 
-        req->rq_auth_uid = rsci->cred.vc_uid;
         req->rq_remote_realm = rsci->remote_realm;
+        req->rq_auth_usr_mds = rsci->auth_usr_mds;
+        req->rq_auth_usr_oss = rsci->auth_usr_oss;
+        req->rq_auth_uid = rsci->cred.vc_uid;
         req->rq_mapped_uid = rsci->mapped_uid;
 
         *res = PTLRPCS_OK;
@@ -1082,7 +1199,7 @@ gss_svcsec_handle_destroy(struct ptlrpc_request *req,
                           __u32 *secdata, __u32 seclen,
                           enum ptlrpcs_error *res)
 {
-        struct gss_svc_data *svcdata = req->rq_sec_svcdata;
+        struct gss_svc_data *svcdata = req->rq_svcsec_data;
         struct rsc          *rsci;
         char                 nidstr[PTL_NALFMT_SIZE];
         int                  rc;
@@ -1097,7 +1214,7 @@ gss_svcsec_handle_destroy(struct ptlrpc_request *req,
                 RETURN(SVC_DROP);
         }
 
-        if (gc->gc_svc != PTLRPC_GSS_SVC_INTEGRITY) {
+        if (gc->gc_svc != PTLRPCS_GSS_SVC_INTEGRITY) {
                 CERROR("service %d is not supported in destroy.\n",
                         gc->gc_svc);
                 GOTO(out, rc = SVC_DROP);
@@ -1112,7 +1229,7 @@ gss_svcsec_handle_destroy(struct ptlrpc_request *req,
         if (lustre_pack_reply(req, 0, NULL, NULL))
                 GOTO(out, rc = SVC_DROP);
 
-        CWARN("svcsec destroy gss context %p(%u@%s)\n",
+        CDEBUG(D_SEC, "svcsec destroy gss context %p(%u@%s)\n",
                rsci, rsci->cred.vc_uid,
                portals_nid2str(req->rq_peer.peer_ni->pni_number,
                                req->rq_peer.peer_id.nid, nidstr));
@@ -1139,7 +1256,7 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
         struct gss_svc_data *svcdata;
         struct rpc_gss_wire_cred *gc;
         struct ptlrpcs_wire_hdr *sec_hdr;
-        __u32 seclen, *secdata, version, subflavor;
+        __u32 subflavor, seclen, *secdata, version;
         int rc;
         ENTRY;
 
@@ -1150,7 +1267,7 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
         *res = PTLRPCS_BADCRED;
 
         sec_hdr = buf_to_sec_hdr(req->rq_reqbuf);
-        LASSERT(sec_hdr->flavor == PTLRPC_SEC_GSS);
+        LASSERT(SEC_FLAVOR_MAJOR(sec_hdr->flavor) == PTLRPCS_FLVR_MAJOR_GSS);
 
         seclen = req->rq_reqbuf_len - sizeof(*sec_hdr) - sec_hdr->msg_len;
         secdata = (__u32 *) buf_to_sec_data(req->rq_reqbuf);
@@ -1166,13 +1283,13 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
                 RETURN(SVC_DROP);
         }
 
-        LASSERT(!req->rq_sec_svcdata);
+        LASSERT(!req->rq_svcsec_data);
         OBD_ALLOC(svcdata, sizeof(*svcdata));
         if (!svcdata) {
                 CERROR("fail to alloc svcdata\n");
                 RETURN(SVC_DROP);
         }
-        req->rq_sec_svcdata = svcdata;
+        req->rq_svcsec_data = svcdata;
         gc = &svcdata->clcred;
 
         /* Now secdata/seclen is what we want to parse
@@ -1185,7 +1302,8 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
         seclen -= 5 * 4;
 
         CDEBUG(D_SEC, "wire gss_hdr: %u/%u/%u/%u/%u\n",
-               version, subflavor, gc->gc_proc, gc->gc_seq, gc->gc_svc);
+               version, subflavor, gc->gc_proc,
+               gc->gc_seq, gc->gc_svc);
 
         if (version != PTLRPC_SEC_GSS_VERSION) {
                 CERROR("gss version mismatch: %d - %d\n",
@@ -1193,7 +1311,11 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
                 GOTO(err_free, rc = SVC_DROP);
         }
 
-        if (rawobj_extract(&gc->gc_ctx, &secdata, &seclen)) {
+        /* We _must_ alloc new storage for gc_ctx. In case of recovery
+         * request will be saved to delayed handling, at that time the
+         * incoming buffer might have already been released.
+         */
+        if (rawobj_extract_alloc(&gc->gc_ctx, &secdata, &seclen)) {
                 CERROR("fail to obtain gss context handle\n");
                 GOTO(err_free, rc = SVC_DROP);
         }
@@ -1216,9 +1338,9 @@ gss_svcsec_accept(struct ptlrpc_request *req, enum ptlrpcs_error *res)
         }
 
 err_free:
-        if (rc == SVC_DROP && req->rq_sec_svcdata) {
-                OBD_FREE(req->rq_sec_svcdata, sizeof(struct gss_svc_data));
-                req->rq_sec_svcdata = NULL;
+        if (rc == SVC_DROP && req->rq_svcsec_data) {
+                OBD_FREE(req->rq_svcsec_data, sizeof(struct gss_svc_data));
+                req->rq_svcsec_data = NULL;
         }
 
         RETURN(rc);
@@ -1228,7 +1350,7 @@ static int
 gss_svcsec_authorize(struct ptlrpc_request *req)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
-        struct gss_svc_data *gsd = (struct gss_svc_data *)req->rq_sec_svcdata;
+        struct gss_svc_data *gsd = (struct gss_svc_data *)req->rq_svcsec_data;
         struct rpc_gss_wire_cred  *gc = &gsd->clcred;
         struct rsc                *rscp;
         struct ptlrpcs_wire_hdr   *sec_hdr;
@@ -1257,13 +1379,14 @@ gss_svcsec_authorize(struct ptlrpc_request *req)
 
         rscp = gss_svc_searchbyctx(&gc->gc_ctx);
         if (!rscp) {
-                CERROR("ctx disapeared under us?\n");
+                CERROR("ctx %u disapeared under us\n",
+                       *((__u32 *) gc->gc_ctx.data));
                 RETURN(-EINVAL);
         }
 
         sec_hdr = (struct ptlrpcs_wire_hdr *) rs->rs_repbuf;
         switch (gc->gc_svc) {
-        case  PTLRPC_GSS_SVC_INTEGRITY:
+        case  PTLRPCS_GSS_SVC_INTEGRITY:
                 /* prepare various pointers */
                 lmsg.len = req->rq_replen;
                 lmsg.data = (__u8 *) (rs->rs_repbuf + sizeof(*sec_hdr));
@@ -1271,23 +1394,22 @@ gss_svcsec_authorize(struct ptlrpc_request *req)
                 vlen = rs->rs_repbuf_len - sizeof(*sec_hdr) - lmsg.len;
                 seclen = vlen;
 
-                sec_hdr->flavor = cpu_to_le32(PTLRPC_SEC_GSS);
-                sec_hdr->sectype = cpu_to_le32(PTLRPC_SEC_TYPE_AUTH);
+                sec_hdr->flavor = cpu_to_le32(PTLRPCS_FLVR_GSS_AUTH);
                 sec_hdr->msg_len = cpu_to_le32(req->rq_replen);
 
                 /* standard gss hdr */
                 LASSERT(vlen >= 7 * 4);
                 *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);
-                *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5I);
+                *vp++ = cpu_to_le32(PTLRPCS_FLVR_KRB5I);
                 *vp++ = cpu_to_le32(RPC_GSS_PROC_DATA);
                 *vp++ = cpu_to_le32(gc->gc_seq);
-                *vp++ = cpu_to_le32(PTLRPC_GSS_SVC_INTEGRITY);
+                *vp++ = cpu_to_le32(PTLRPCS_GSS_SVC_INTEGRITY);
                 *vp++ = 0;      /* fake ctx handle */
                 vpsave = vp++;  /* reserve size */
                 vlen -= 7 * 4;
 
                 mic.len = vlen;
-                mic.data = (char *) vp;
+                mic.data = (unsigned char *)vp;
 
                 major = kgss_get_mic(rscp->mechctx, 0, &lmsg, &mic);
                 if (major) {
@@ -1299,22 +1421,21 @@ gss_svcsec_authorize(struct ptlrpc_request *req)
                 sec_hdr->sec_len = cpu_to_le32(seclen);
                 rs->rs_repdata_len += size_round(seclen);
                 break;
-        case  PTLRPC_GSS_SVC_PRIVACY:
+        case  PTLRPCS_GSS_SVC_PRIVACY:
                 vp = (__u32 *) (rs->rs_repbuf + sizeof(*sec_hdr));
                 vlen = rs->rs_repbuf_len - sizeof(*sec_hdr);
                 seclen = vlen;
 
-                sec_hdr->flavor = cpu_to_le32(PTLRPC_SEC_GSS);
-                sec_hdr->sectype = cpu_to_le32(PTLRPC_SEC_TYPE_PRIV);
+                sec_hdr->flavor = cpu_to_le32(PTLRPCS_FLVR_GSS_PRIV);
                 sec_hdr->msg_len = cpu_to_le32(0);
 
                 /* standard gss hdr */
                 LASSERT(vlen >= 7 * 4);
                 *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);
-                *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5I);
+                *vp++ = cpu_to_le32(PTLRPCS_FLVR_KRB5I);
                 *vp++ = cpu_to_le32(RPC_GSS_PROC_DATA);
                 *vp++ = cpu_to_le32(gc->gc_seq);
-                *vp++ = cpu_to_le32(PTLRPC_GSS_SVC_PRIVACY);
+                *vp++ = cpu_to_le32(PTLRPCS_GSS_SVC_PRIVACY);
                 *vp++ = 0;      /* fake ctx handle */
                 vpsave = vp++;  /* reserve size */
                 vlen -= 7 * 4;
@@ -1355,18 +1476,18 @@ static
 void gss_svcsec_cleanup_req(struct ptlrpc_svcsec *svcsec,
                             struct ptlrpc_request *req)
 {
-        struct gss_svc_data *gsd = (struct gss_svc_data *) req->rq_sec_svcdata;
+        struct gss_svc_data *gsd = (struct gss_svc_data *) req->rq_svcsec_data;
 
         if (!gsd) {
                 CDEBUG(D_SEC, "no svc_data present. do nothing\n");
                 return;
         }
 
-        /* gsd->clclred.gc_ctx is NOT allocated, just set pointer
-         * to the incoming packet buffer, so don't need free it
-         */
+        /* gc_ctx is allocated, see gss_svcsec_accept() */
+        rawobj_free(&gsd->clcred.gc_ctx);
+
         OBD_FREE(gsd, sizeof(*gsd));
-        req->rq_sec_svcdata = NULL;
+        req->rq_svcsec_data = NULL;
         return;
 }
 
@@ -1375,7 +1496,7 @@ int gss_svcsec_est_payload(struct ptlrpc_svcsec *svcsec,
                            struct ptlrpc_request *req,
                            int msgsize)
 {
-        struct gss_svc_data *svcdata = req->rq_sec_svcdata;
+        struct gss_svc_data *svcdata = req->rq_svcsec_data;
         ENTRY;
 
         /* just return the pre-set reserve_len for init/fini/err cases.
@@ -1397,10 +1518,10 @@ int gss_svcsec_est_payload(struct ptlrpc_svcsec *svcsec,
                 CDEBUG(D_SEC, "is_fini, reserver size 0\n");
                 RETURN(0);
         } else {
-                if (svcdata->clcred.gc_svc == PTLRPC_GSS_SVC_NONE ||
-                    svcdata->clcred.gc_svc == PTLRPC_GSS_SVC_INTEGRITY)
+                if (svcdata->clcred.gc_svc == PTLRPCS_GSS_SVC_NONE ||
+                    svcdata->clcred.gc_svc == PTLRPCS_GSS_SVC_INTEGRITY)
                         RETURN(size_round(GSS_MAX_AUTH_PAYLOAD));
-                else if (svcdata->clcred.gc_svc == PTLRPC_GSS_SVC_PRIVACY)
+                else if (svcdata->clcred.gc_svc == PTLRPCS_GSS_SVC_PRIVACY)
                         RETURN(size_round16(GSS_MAX_AUTH_PAYLOAD + msgsize +
                                             GSS_PRIVBUF_PREFIX_LEN +
                                             GSS_PRIVBUF_SUFFIX_LEN));
@@ -1417,7 +1538,7 @@ int gss_svcsec_alloc_repbuf(struct ptlrpc_svcsec *svcsec,
                             struct ptlrpc_request *req,
                             int msgsize)
 {
-        struct gss_svc_data *gsd = (struct gss_svc_data *) req->rq_sec_svcdata;
+        struct gss_svc_data *gsd = (struct gss_svc_data *) req->rq_svcsec_data;
         struct ptlrpc_reply_state *rs;
         int msg_payload, sec_payload;
         int privacy, rc;
@@ -1430,7 +1551,7 @@ int gss_svcsec_alloc_repbuf(struct ptlrpc_svcsec *svcsec,
         LASSERT(gsd);
         if (!gsd->is_init && !gsd->is_init_continue &&
             !gsd->is_fini && !gsd->is_err_notify &&
-            gsd->clcred.gc_svc == PTLRPC_GSS_SVC_PRIVACY)
+            gsd->clcred.gc_svc == PTLRPCS_GSS_SVC_PRIVACY)
                 privacy = 1;
         else
                 privacy = 0;
@@ -1501,8 +1622,8 @@ void gss_svcsec_free_repbuf(struct ptlrpc_svcsec *svcsec,
 
 struct ptlrpc_svcsec svcsec_gss = {
         .pss_owner              = THIS_MODULE,
-        .pss_name               = "GSS_SVCSEC",
-        .pss_flavor             = {PTLRPC_SEC_GSS, 0},
+        .pss_name               = "svcsec.gss",
+        .pss_flavor             = PTLRPCS_FLVR_MAJOR_GSS,
         .accept                 = gss_svcsec_accept,
         .authorize              = gss_svcsec_authorize,
         .alloc_repbuf           = gss_svcsec_alloc_repbuf,
@@ -1539,8 +1660,19 @@ int gss_svc_init(void)
 void gss_svc_exit(void)
 {
         int rc;
-        if ((rc = cache_unregister(&rsi_cache)))
-                CERROR("unregister rsi cache: %d\n", rc);
+
+        /* XXX rsi doesn't take a module refcount. Unless it is really
+         * cleaned up we can't simply leave: later user-space operations
+         * would certainly cause an oops.
+         * User space might be slow or stuck on something, so wait for
+         * it a bit -- bad hack.
+         */
+        while ((rc = cache_unregister(&rsi_cache))) {
+                CERROR("unregister rsi cache: %d. Try again\n", rc);
+                schedule_timeout(2 * HZ);
+                cache_purge(&rsi_cache);
+        }
+
         if ((rc = cache_unregister(&rsc_cache)))
                 CERROR("unregister rsc cache: %d\n", rc);
         if ((rc = svcsec_unregister(&svcsec_gss)))