Whamcloud - gitweb
adjust gss upcall timeout value just be secinit timeout, service gss upcall
[fs/lustre-release.git] / lustre / sec / gss / sec_gss.c
index d82e3ae..33b4291 100644 (file)
@@ -76,22 +76,17 @@ struct rpc_clnt;
 #include "gss_internal.h"
 #include "gss_api.h"
 
-#define GSS_CREDCACHE_EXPIRE    (60)               /* 1 minute */
-#define GSS_CRED_EXPIRE         (8 * 60 * 60)      /* 8 hours */
-#define GSS_CRED_SIGN_SIZE      (1024)
-#define GSS_CRED_VERIFY_SIZE    (56)
-
 #define LUSTRE_PIPEDIR          "/lustre"
 
+#define GSS_CREDCACHE_EXPIRE    (30 * 60)          /* 30 minute */
+
 /**********************************************
  * gss security init/fini helper              *
  **********************************************/
 
-#define SECINIT_RPC_TIMEOUT     (10)
-#define SECFINI_RPC_TIMEOUT     (10)
-
 static int secinit_compose_request(struct obd_import *imp,
                                    char *buf, int bufsize,
+                                   int lustre_srv,
                                    uid_t uid, gid_t gid,
                                    long token_size,
                                    char __user *token)
@@ -99,7 +94,7 @@ static int secinit_compose_request(struct obd_import *imp,
         struct ptlrpcs_wire_hdr *hdr;
         struct lustre_msg       *lmsg;
         struct mds_req_sec_desc *secdesc;
-        __u32                    size = sizeof(*secdesc);
+        int                      size = sizeof(*secdesc);
         __u32                    lmsg_size, *p;
         int                      rc;
 
@@ -112,10 +107,9 @@ static int secinit_compose_request(struct obd_import *imp,
 
         /* security wire hdr */
         hdr = buf_to_sec_hdr(buf);
-        hdr->flavor  = cpu_to_le32(PTLRPC_SEC_GSS);
-        hdr->sectype = cpu_to_le32(PTLRPC_SEC_TYPE_NONE);
+        hdr->flavor  = cpu_to_le32(PTLRPCS_FLVR_GSS_NONE);
         hdr->msg_len = cpu_to_le32(lmsg_size);
-        hdr->sec_len = cpu_to_le32(7 * 4 + token_size);
+        hdr->sec_len = cpu_to_le32(8 * 4 + token_size);
 
         /* lustre message & secdesc */
         lmsg = buf_to_lustre_msg(buf);
@@ -136,12 +130,15 @@ static int secinit_compose_request(struct obd_import *imp,
 
         /* gss hdr */
         *p++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);     /* gss version */
-        *p++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5I);       /* subflavor */
-        *p++ = cpu_to_le32(PTLRPC_GSS_PROC_INIT);       /* proc */
+        *p++ = cpu_to_le32(PTLRPCS_FLVR_KRB5I);         /* subflavor */
+        *p++ = cpu_to_le32(PTLRPCS_GSS_PROC_INIT);      /* proc */
         *p++ = cpu_to_le32(0);                          /* seq */
-        *p++ = cpu_to_le32(PTLRPC_GSS_SVC_NONE);        /* service */
+        *p++ = cpu_to_le32(PTLRPCS_GSS_SVC_NONE);       /* service */
         *p++ = cpu_to_le32(0);                          /* context handle */
 
+        /* plus lustre svc type */
+        *p++ = cpu_to_le32(lustre_srv);
+
         /* now the token part */
         *p++ = cpu_to_le32((__u32) token_size);
         LASSERT(((char *)p - buf) + token_size <= bufsize);
@@ -171,7 +168,6 @@ static int secinit_parse_reply(char *repbuf, int replen,
         }
 
         hdr->flavor = le32_to_cpu(hdr->flavor);
-        hdr->sectype = le32_to_cpu(hdr->sectype);
         hdr->msg_len = le32_to_cpu(hdr->msg_len);
         hdr->sec_len = le32_to_cpu(hdr->sec_len);
 
@@ -179,8 +175,7 @@ static int secinit_parse_reply(char *repbuf, int replen,
         sec_len = le32_to_cpu(p[3]);
 
         /* sanity checks */
-        if (hdr->flavor != PTLRPC_SEC_GSS ||
-            hdr->sectype != PTLRPC_SEC_TYPE_NONE) {
+        if (hdr->flavor != PTLRPCS_FLVR_GSS_NONE) {
                 CERROR("unexpected reply\n");
                 return -EINVAL;
         }
@@ -197,10 +192,12 @@ static int secinit_parse_reply(char *repbuf, int replen,
         p = (__u32 *) buf_to_sec_data(repbuf);
         effective = 0;
 
-        status = le32_to_cpu(*p++);
+        p += 2; /* skip the leading unused bytes */
+        seq = le32_to_cpu(*p++);
         major = le32_to_cpu(*p++);
         minor = le32_to_cpu(*p++);
-        seq = le32_to_cpu(*p++);
+        status = 0;
+
         effective += 4 * 4;
 
         if (copy_to_user(outbuf, &status, 4))
@@ -245,6 +242,7 @@ static int secinit_parse_reply(char *repbuf, int replen,
 struct lgssd_ioctl_param {
         int             version;        /* in   */
         char           *uuid;           /* in   */
+        int             lustre_svc;     /* in   */
         uid_t           uid;            /* in   */
         gid_t           gid;            /* in   */
         long            send_token_size;/* in   */
@@ -258,6 +256,7 @@ struct lgssd_ioctl_param {
 static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
 {
         struct obd_import        *imp;
+        struct ptlrpc_request    *request = NULL;
         struct lgssd_ioctl_param  param;
         const int                 reqbuf_size = 1024;
         const int                 repbuf_size = 1024;
@@ -268,11 +267,14 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
         int                       rc, reqlen, replen;
 
         if (count != sizeof(param)) {
-                CERROR("partial write\n");
+                CERROR("ioctl size %lu, expect %d, please check lgssd version\n",
+                        count, sizeof(param));
                 RETURN(-EINVAL);
         }
-        if (copy_from_user(&param, buffer, sizeof(param)))
+        if (copy_from_user(&param, buffer, sizeof(param))) {
+                CERROR("failed copy data from lgssd\n");
                 RETURN(-EFAULT);
+        }
 
         if (param.version != GSSD_INTERFACE_VERSION) {
                 CERROR("gssd interface version %d (expect %d)\n",
@@ -292,11 +294,6 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
                 CERROR("no such obd %s\n", obdname);
                 RETURN(-EINVAL);
         }
-        if (strcmp(obd->obd_type->typ_name, "mdc") &&
-            strcmp(obd->obd_type->typ_name, "osc")) {
-                CERROR("%s not a mdc/osc device\n", obdname);
-                RETURN(-EINVAL);
-        }
 
         imp = class_import_get(obd->u.cli.cl_import);
 
@@ -305,69 +302,62 @@ static int gss_send_secinit_rpc(__user char *buffer, unsigned long count)
 
         if (!reqbuf || !repbuf) {
                 CERROR("Can't alloc buffer: %p/%p\n", reqbuf, repbuf);
-                GOTO(out_free, rc = -ENOMEM);
+                param.status = -ENOMEM;
+                goto out_copy;
         }
 
         /* get token */
         reqlen = secinit_compose_request(imp, reqbuf, reqbuf_size,
+                                         param.lustre_svc,
                                          param.uid, param.gid,
                                          param.send_token_size,
                                          param.send_token);
-        if (reqlen < 0)
-                GOTO(out_free, rc = reqlen);
+        if (reqlen < 0) {
+                param.status = reqlen;
+                goto out_copy;
+        }
 
-        replen = repbuf_size;
-        rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen,
-                              repbuf, &replen, SECINIT_RPC_TIMEOUT);
-        if (rc)
-                GOTO(out_free, rc);
+        request = ptl_do_rawrpc(imp, reqbuf, reqbuf_size, reqlen,
+                                repbuf, repbuf_size, &replen,
+                                SECINIT_RPC_TIMEOUT, &rc);
+        if (request == NULL || rc) {
+                param.status = rc;
+                goto out_copy;
+        }
 
         if (replen > param.reply_buf_size) {
                 CERROR("output buffer size %ld too small, need %d\n",
                         param.reply_buf_size, replen);
-                GOTO(out_free, rc = -EINVAL);
+                param.status = -EINVAL;
+                goto out_copy;
         }
 
         lsize = secinit_parse_reply(repbuf, replen,
                                     param.reply_buf, param.reply_buf_size);
-        if (lsize < 0)
-                GOTO(out_free, rc = (int)lsize);
+        if (lsize < 0) {
+                param.status = (int) lsize;
+                goto out_copy;
+        }
 
         param.status = 0;
         param.reply_length = lsize;
 
+out_copy:
         if (copy_to_user(buffer, &param, sizeof(param)))
                 rc = -EFAULT;
         else
                 rc = 0;
-out_free:
-        class_import_put(imp);
-        if (repbuf)
-                OBD_FREE(repbuf, repbuf_size);
-        if (reqbuf)
-                OBD_FREE(reqbuf, reqbuf_size);
-        RETURN(rc);
-}
 
-static int gss_send_secfini_rpc(struct obd_import *imp,
-                                char *reqbuf, int reqlen)
-{
-        const int repbuf_size = 1024;
-        char *repbuf;
-        int replen = repbuf_size;
-        int rc;
-
-        OBD_ALLOC(repbuf, repbuf_size);
-        if (!repbuf) {
-                CERROR("Out of memory\n");
-                return -ENOMEM;
+        class_import_put(imp);
+        if (request == NULL) {
+                if (repbuf)
+                        OBD_FREE(repbuf, repbuf_size);
+                if (reqbuf)
+                        OBD_FREE(reqbuf, reqbuf_size);
+        } else {
+                rawrpc_req_finished(request);
         }
-
-        rc = ptlrpc_do_rawrpc(imp, reqbuf, reqlen, repbuf, &replen,
-                              SECFINI_RPC_TIMEOUT);
-
-        OBD_FREE(repbuf, repbuf_size);
-        return rc;
+        RETURN(rc);
 }
 
 /**********************************************
@@ -376,19 +366,14 @@ static int gss_send_secfini_rpc(struct obd_import *imp,
 struct gss_sec {
         struct ptlrpc_sec       gs_base;
         struct gss_api_mech    *gs_mech;
-#ifdef __KERNEL__
         spinlock_t              gs_lock;
         struct list_head        gs_upcalls;
-        char                    gs_pipepath[64];
+        char                   *gs_pipepath;
         struct dentry          *gs_depipe;
-#endif
 };
 
-#ifdef __KERNEL__
-
-static rwlock_t gss_ctx_lock = RW_LOCK_UNLOCKED;
-
 struct gss_upcall_msg_data {
+        __u64                           gum_pag;
         __u32                           gum_uid;
         __u32                           gum_svc;
         __u32                           gum_nal;
@@ -406,6 +391,8 @@ struct gss_upcall_msg {
         struct gss_upcall_msg_data      gum_data;
 };
 
+#ifdef __KERNEL__
+static rwlock_t gss_ctx_lock = RW_LOCK_UNLOCKED;
 /**********************************************
  * rpc_pipe upcall helpers                    *
  **********************************************/
@@ -422,6 +409,20 @@ void gss_release_msg(struct gss_upcall_msg *gmsg)
                 return;
         }
         LASSERT(list_empty(&gmsg->gum_list));
+#if 0
+        LASSERT(list_empty(&gmsg->gum_base.list));
+#else
+        /* XXX */
+        if (!list_empty(&gmsg->gum_base.list)) {
+                int error = gmsg->gum_base.errno;
+                
+                CWARN("msg %p: list: %p/%p/%p, copied %d, err %d, wq %d\n",
+                      gmsg, &gmsg->gum_base.list, gmsg->gum_base.list.prev,
+                      gmsg->gum_base.list.next, gmsg->gum_base.copied, error,
+                      list_empty(&gmsg->gum_waitq.task_list));
+                LBUG();
+        }
+#endif
         OBD_FREE(gmsg, sizeof(*gmsg));
         EXIT;
 }
@@ -429,24 +430,15 @@ void gss_release_msg(struct gss_upcall_msg *gmsg)
 static void
 gss_unhash_msg_nolock(struct gss_upcall_msg *gmsg)
 {
-        ENTRY;
-        if (list_empty(&gmsg->gum_list)) {
-                EXIT;
+        LASSERT_SPIN_LOCKED(&gmsg->gum_gsec->gs_lock);
+
+        if (list_empty(&gmsg->gum_list))
                 return;
-        }
-        /* FIXME should not do this. when we in upper upcall queue,
-         * downcall will call unhash_msg, thus later put_msg might
-         * free msg buffer while it's not dequeued XXX */
-        list_del_init(&gmsg->gum_base.list);
-        /* FIXME */
 
         list_del_init(&gmsg->gum_list);
         wake_up(&gmsg->gum_waitq);
+        LASSERT(atomic_read(&gmsg->gum_refcount) > 1);
         atomic_dec(&gmsg->gum_refcount);
-        CDEBUG(D_SEC, "gmsg %p refcount now %d\n",
-               gmsg, atomic_read(&gmsg->gum_refcount));
-        LASSERT(atomic_read(&gmsg->gum_refcount) > 0);
-        EXIT;
 }
 
 static void
@@ -467,11 +459,14 @@ struct gss_upcall_msg * gss_find_upcall(struct gss_sec *gsec,
         struct gss_upcall_msg *gmsg;
         ENTRY;
 
+        LASSERT_SPIN_LOCKED(&gsec->gs_lock);
+
         list_for_each_entry(gmsg, &gsec->gs_upcalls, gum_list) {
                 if (memcmp(&gmsg->gum_data, gmd, sizeof(*gmd)))
                         continue;
                 if (strcmp(gmsg->gum_obdname, obdname))
                         continue;
+                LASSERT(atomic_read(&gmsg->gum_refcount) > 0);
                 atomic_inc(&gmsg->gum_refcount);
                 CDEBUG(D_SEC, "found gmsg at %p: obdname %s, uid %d, ref %d\n",
                        gmsg, obdname, gmd->gum_uid,
@@ -497,14 +492,19 @@ static void gss_init_upcall_msg(struct gss_upcall_msg *gmsg,
         memcpy(&gmsg->gum_data, gmd, sizeof(*gmd));
 
         rpcmsg = &gmsg->gum_base;
+        INIT_LIST_HEAD(&rpcmsg->list);
         rpcmsg->data = &gmsg->gum_data;
         rpcmsg->len = sizeof(gmsg->gum_data);
+        rpcmsg->copied = 0;
+        rpcmsg->errno = 0;
         EXIT;
 }
 #endif /* __KERNEL__ */
 
+/* this seems to be used only from userspace code */
+#ifndef __KERNEL__
 /********************************************
- * gss cred manupulation helpers            *
+ * gss cred manipulation helpers            *
  ********************************************/
 static
 int gss_cred_is_uptodate_ctx(struct ptlrpc_cred *cred)
@@ -513,14 +513,17 @@ int gss_cred_is_uptodate_ctx(struct ptlrpc_cred *cred)
         int res = 0;
 
         read_lock(&gss_ctx_lock);
-        if ((cred->pc_flags & PTLRPC_CRED_UPTODATE) && gcred->gc_ctx)
+        if (((cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) ==
+             PTLRPC_CRED_UPTODATE) &&
+            gcred->gc_ctx)
                 res = 1;
         read_unlock(&gss_ctx_lock);
         return res;
 }
+#endif
 
 static inline
-struct gss_cl_ctx * gss_get_ctx(struct gss_cl_ctx *ctx)
+struct gss_cl_ctx *gss_get_ctx(struct gss_cl_ctx *ctx)
 {
         atomic_inc(&ctx->gc_refcount);
         return ctx;
@@ -580,12 +583,13 @@ void gss_cred_set_ctx(struct ptlrpc_cred *cred, struct gss_cl_ctx *ctx)
         write_lock(&gss_ctx_lock);
         old = gcred->gc_ctx;
         gcred->gc_ctx = ctx;
-        cred->pc_flags |= PTLRPC_CRED_UPTODATE;
+        set_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags);
         write_unlock(&gss_ctx_lock);
         if (old)
                 gss_put_ctx(old);
 
-        CWARN("client refreshed gss cred %p(uid %u)\n", cred, cred->pc_uid);
+        CDEBUG(D_SEC, "client refreshed gss cred %p(uid %u)\n",
+               cred, cred->pc_uid);
         EXIT;
 }
 
@@ -615,15 +619,16 @@ int gss_parse_init_downcall(struct gss_api_mech *gm, rawobj_t *buf,
                             struct gss_cl_ctx **gc,
                             struct gss_upcall_msg_data *gmd, int *gss_err)
 {
-        char *p = buf->data;
-        __u32 len = buf->len;
+        char *p = (char *)buf->data;
         struct gss_cl_ctx *ctx;
-        rawobj_t tmp_buf;
+        __u32 len = buf->len;
         unsigned int timeout;
-        int err = -EIO;
+        rawobj_t tmp_buf;
+        int err = -EPERM;
         ENTRY;
 
         *gc = NULL;
+        *gss_err = 0;
 
         OBD_ALLOC(ctx, sizeof(*ctx));
         if (!ctx)
@@ -634,46 +639,59 @@ int gss_parse_init_downcall(struct gss_api_mech *gm, rawobj_t *buf,
         spin_lock_init(&ctx->gc_seq_lock);
         atomic_set(&ctx->gc_refcount,1);
 
+        if (simple_get_bytes(&p, &len, &gmd->gum_pag, sizeof(gmd->gum_pag)))
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &gmd->gum_uid, sizeof(gmd->gum_uid)))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &gmd->gum_svc, sizeof(gmd->gum_svc)))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &gmd->gum_nal, sizeof(gmd->gum_nal)))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &gmd->gum_netid, sizeof(gmd->gum_netid)))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &gmd->gum_nid, sizeof(gmd->gum_nid)))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         /* FIXME: discarded timeout for now */
         if (simple_get_bytes(&p, &len, &timeout, sizeof(timeout)))
-                GOTO(err_free_ctx, err);
-        *gss_err = 0;
+                goto err_free_ctx;
         if (simple_get_bytes(&p, &len, &ctx->gc_win, sizeof(ctx->gc_win)))
-                GOTO(err_free_ctx, err);
-        /* gssd signals an error by passing ctx->gc_win = 0: */
+                goto err_free_ctx;
+
+        /* lgssd signals an error by passing ctx->gc_win = 0: */
         if (!ctx->gc_win) {
-                /* in which case the next int is an error code: */
-                if (simple_get_bytes(&p, &len, gss_err, sizeof(*gss_err)))
-                        GOTO(err_free_ctx, err);
-                if (*gss_err == 0) {
-                        CERROR("error downcall pass no gss error\n");
-                        GOTO(err_free_ctx, err);
+                /* in which case the next 2 int are:
+                 * - rpc error
+                 * - gss error
+                 */
+                if (simple_get_bytes(&p, &len, &err, sizeof(err))) {
+                        err = -EPERM;
+                        goto err_free_ctx;
+                }
+                if (simple_get_bytes(&p, &len, gss_err, sizeof(*gss_err))) {
+                        err = -EPERM;
+                        goto err_free_ctx;
                 }
-                GOTO(err_free_ctx, err = 0);
+                if (err == 0 && *gss_err == 0) {
+                        CERROR("no error passed from downcall\n");
+                        err = -EPERM;
+                }
+                goto err_free_ctx;
         }
+
         if (rawobj_extract_local(&tmp_buf, (__u32 **) ((void *)&p), &len))
-                GOTO(err_free_ctx, err);
+                goto err_free_ctx;
         if (rawobj_dup(&ctx->gc_wire_ctx, &tmp_buf)) {
-                GOTO(err_free_ctx, err = -ENOMEM);
+                err = -ENOMEM;
+                goto err_free_ctx;
         }
         if (rawobj_extract_local(&tmp_buf, (__u32 **) ((void *)&p), &len))
-                GOTO(err_free_wire_ctx, err);
+                goto err_free_wire_ctx;
         if (len) {
                 CERROR("unexpected trailing %u bytes\n", len);
-                GOTO(err_free_wire_ctx, err);
+                goto err_free_wire_ctx;
         }
         if (kgss_import_sec_context(&tmp_buf, gm, &ctx->gc_gss_ctx))
-                GOTO(err_free_wire_ctx, err);
+                goto err_free_wire_ctx;
 
         *gc = ctx;
         RETURN(0);
@@ -691,7 +709,6 @@ err_free_ctx:
  * cred APIs                           *
  ***************************************/
 #ifdef __KERNEL__
-#define CRED_REFRESH_UPCALL_TIMEOUT     (20)
 static int gss_cred_refresh(struct ptlrpc_cred *cred)
 {
         struct obd_import          *import;
@@ -701,11 +718,13 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         struct dentry              *dentry;
         char                       *obdname, *obdtype;
         wait_queue_t                wait;
-        uid_t                       uid = cred->pc_uid;
         int                         res;
         ENTRY;
 
-        if (ptlrpcs_cred_is_uptodate(cred))
+        might_sleep();
+
+        /* any flags means it has been handled, do nothing */
+        if (cred->pc_flags & PTLRPC_CRED_FLAGS_MASK)
                 RETURN(0);
 
         LASSERT(cred->pc_sec);
@@ -718,16 +737,17 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
                 RETURN(-EINVAL);
         }
 
-        gmd.gum_uid = uid;
+        gmd.gum_pag = cred->pc_pag;
+        gmd.gum_uid = cred->pc_uid;
         gmd.gum_nal = import->imp_connection->c_peer.peer_ni->pni_number;
         gmd.gum_netid = 0;
         gmd.gum_nid = import->imp_connection->c_peer.peer_id.nid;
 
         obdtype = import->imp_obd->obd_type->typ_name;
-        if (!strcmp(obdtype, "mdc"))
-                gmd.gum_svc = 0;
-        else if (!strcmp(obdtype, "osc"))
-                gmd.gum_svc = 1;
+        if (!strcmp(obdtype, OBD_MDC_DEVICENAME))
+                gmd.gum_svc = LUSTRE_GSS_SVC_MDS;
+        else if (!strcmp(obdtype, OBD_OSC_DEVICENAME))
+                gmd.gum_svc = LUSTRE_GSS_SVC_OSS;
         else {
                 CERROR("gss on %s?\n", obdtype);
                 RETURN(-EINVAL);
@@ -739,35 +759,39 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         gss_new = NULL;
         res = 0;
 
-        CWARN("Initiate gss context %p(%u@%s)\n",
+        CDEBUG(D_SEC, "Initiate gss context %p(%u@%s)\n",
                container_of(cred, struct gss_cred, gc_base),
-               uid, import->imp_target_uuid.uuid);
+               cred->pc_uid, import->imp_target_uuid.uuid);
 
 again:
         spin_lock(&gsec->gs_lock);
         gss_msg = gss_find_upcall(gsec, obdname, &gmd);
         if (gss_msg) {
-                spin_unlock(&gsec->gs_lock);
+                if (gss_new) {
+                        OBD_FREE(gss_new, sizeof(*gss_new));
+                        gss_new = NULL;
+                }
                 GOTO(waiting, res);
         }
+
         if (!gss_new) {
                 spin_unlock(&gsec->gs_lock);
                 OBD_ALLOC(gss_new, sizeof(*gss_new));
-                if (!gss_new) {
-                        CERROR("fail to alloc memory\n");
+                if (!gss_new)
                         RETURN(-ENOMEM);
-                }
                 goto again;
         }
         /* so far we'v created gss_new */
         gss_init_upcall_msg(gss_new, gsec, obdname, &gmd);
 
-        if (gss_cred_is_uptodate_ctx(cred)) {
-                /* someone else had done it for us, simply cancel
-                 * our own upcall */
-                CDEBUG(D_SEC, "cred("LPU64"/%u) has been refreshed by someone "
-                       "else, simply drop our request\n",
-                       cred->pc_pag, cred->pc_uid);
+        /* we'v created upcall msg, nobody else should touch the
+         * flag of this cred, unless be set as dead/expire by
+         * administrator via lctl etc.
+         */
+        if (cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) {
+                CWARN("cred %p("LPU64"/%u) was set flags %lx unexpectedly\n",
+                      cred, cred->pc_pag, cred->pc_uid, cred->pc_flags);
+                cred->pc_flags |= PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR;
                 gss_unhash_msg_nolock(gss_new);
                 spin_unlock(&gsec->gs_lock);
                 gss_release_msg(gss_new);
@@ -781,36 +805,62 @@ again:
                 CERROR("rpc_queue_upcall failed: %d\n", res);
                 gss_unhash_msg(gss_new);
                 gss_release_msg(gss_new);
+                cred->pc_flags |= PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR;
                 RETURN(res);
         }
         gss_msg = gss_new;
+        spin_lock(&gsec->gs_lock);
 
 waiting:
+        /* upcall might finish quickly */
+        if (list_empty(&gss_msg->gum_list)) {
+                spin_unlock(&gsec->gs_lock);
+                res = 0;
+                goto out;
+        }
+
         init_waitqueue_entry(&wait, current);
-        spin_lock(&gsec->gs_lock);
-        add_wait_queue(&gss_msg->gum_waitq, &wait);
         set_current_state(TASK_INTERRUPTIBLE);
+        add_wait_queue(&gss_msg->gum_waitq, &wait);
         spin_unlock(&gsec->gs_lock);
 
-        res = schedule_timeout(CRED_REFRESH_UPCALL_TIMEOUT * HZ);
+        if (gss_new)
+                res = schedule_timeout(CRED_REFRESH_UPCALL_TIMEOUT * HZ);
+        else {
+                schedule();
+                res = 0;
+        }
 
         remove_wait_queue(&gss_msg->gum_waitq, &wait);
+
+        /* - the one who refresh the cred for us should also be responsible
+         *   to set the status of cred, we can simply return.
+         * - if cred flags has been set, we also don't need to do that again,
+         *   no matter signal pending or timeout etc.
+         */
+        if (!gss_new || cred->pc_flags & PTLRPC_CRED_FLAGS_MASK)
+                goto out;
+
         if (signal_pending(current)) {
-                CERROR("interrupted gss upcall: cred %p\n", cred);
+                CERROR("%s: cred %p: interrupted upcall\n",
+                       current->comm, cred);
+                cred->pc_flags |= PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR;
                 res = -EINTR;
         } else if (res == 0) {
-                CERROR("gss upcall timeout: cred %p\n", cred);
+                CERROR("cred %p: upcall timedout\n", cred);
+                set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
                 res = -ETIMEDOUT;
         } else
                 res = 0;
 
+out:
         gss_release_msg(gss_msg);
+
         RETURN(res);
 }
 #else /* !__KERNEL__ */
 extern int lgss_handle_krb5_upcall(uid_t uid, __u32 dest_ip,
-                                   char *obd_name,
-                                   char *buf, int bufsize,
+                                   char *obd_name, char *buf, int bufsize,
                                    int (*callback)(char*, unsigned long));
 
 static int gss_cred_refresh(struct ptlrpc_cred *cred)
@@ -821,11 +871,11 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         struct gss_sec         *gsec;
         struct gss_api_mech    *mech;
         struct gss_cl_ctx      *ctx = NULL;
-        struct vfs_cred         vcred = { 0 };
         ptl_nid_t               peer_nid;
         __u32                   dest_ip;
         __u32                   subflavor;
         int                     rc, gss_err;
+        struct gss_upcall_msg_data gmd = { 0 };
 
         LASSERT(cred);
         LASSERT(cred->pc_sec);
@@ -838,9 +888,9 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         imp = cred->pc_sec->ps_import;
         peer_nid = imp->imp_connection->c_peer.peer_id.nid;
         dest_ip = (__u32) (peer_nid & 0xFFFFFFFF);
-        subflavor = cred->pc_sec->ps_flavor.subflavor;
+        subflavor = cred->pc_sec->ps_flavor;
 
-        if (subflavor != PTLRPC_SEC_GSS_KRB5I) {
+        if (subflavor != PTLRPCS_SUBFLVR_KRB5I) {
                 CERROR("unknown subflavor %u\n", subflavor);
                 GOTO(err_out, rc = -EINVAL);
         }
@@ -859,16 +909,15 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
         gsec = container_of(cred->pc_sec, struct gss_sec, gs_base);
         mech = gsec->gs_mech;
         LASSERT(mech);
-        rc = gss_parse_init_downcall(mech, &obj, &ctx, &vcred, &dest_ip,
-                                     &gss_err);
-        if (rc) {
-                CERROR("parse init downcall error %d\n", rc);
-                goto err_out;
-        }
 
-        if (gss_err) {
-                CERROR("cred fresh got gss error %x\n", gss_err);
-                rc = -EINVAL;
+        rc = gss_parse_init_downcall(mech, &obj, &ctx, &gmd,
+                                     &gss_err);
+        if (rc || gss_err) {
+                CERROR("parse init downcall: rpc %d, gss 0x%x\n", rc, gss_err);
+                if (rc != -ERESTART || gss_err != 0)
+                        set_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags);
+                if (rc == 0)
+                        rc = -EPERM;
                 goto err_out;
         }
 
@@ -878,13 +927,12 @@ static int gss_cred_refresh(struct ptlrpc_cred *cred)
 
         return 0;
 err_out:
-        cred->pc_flags |= PTLRPC_CRED_DEAD;
+        set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
         return rc;
 }
 #endif
 
 static int gss_cred_match(struct ptlrpc_cred *cred,
-                          struct ptlrpc_request *req,
                           struct vfs_cred *vcred)
 {
         RETURN(cred->pc_pag == vcred->vc_pag);
@@ -931,10 +979,10 @@ static int gss_cred_sign(struct ptlrpc_cred *cred,
         spin_unlock(&ctx->gc_seq_lock);
 
         *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);    /* version */
-        *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5I);      /* subflavor */
+        *vp++ = cpu_to_le32(PTLRPCS_FLVR_KRB5I);        /* subflavor */
         *vp++ = cpu_to_le32(ctx->gc_proc);              /* proc */
         *vp++ = cpu_to_le32(seqnum);                    /* seq */
-        *vp++ = cpu_to_le32(PTLRPC_GSS_SVC_INTEGRITY);  /* service */
+        *vp++ = cpu_to_le32(PTLRPCS_GSS_SVC_INTEGRITY); /* service */
         vlen -= 5 * 4;
 
         if (rawobj_serialize(&ctx->gc_wire_ctx, &vp, &vlen)) {
@@ -947,13 +995,14 @@ static int gss_cred_sign(struct ptlrpc_cred *cred,
         vlen -= 4;
 
         mic.len = vlen;
-        mic.data = (char *) vp;
+        mic.data = (unsigned char *)vp;
 
         CDEBUG(D_SEC, "reqbuf at %p, lmsg at %p, len %d, mic at %p, len %d\n",
                req->rq_reqbuf, lmsg.data, lmsg.len, mic.data, mic.len);
         major = kgss_get_mic(ctx->gc_gss_ctx, GSS_C_QOP_DEFAULT, &lmsg, &mic);
         if (major) {
-                CERROR("gss compute mic error, major %x\n", major);
+                CERROR("cred %p: req %p compute mic error, major %x\n",
+                       cred, req, major);
                 rc = -EACCES;
                 goto out;
         }
@@ -1002,10 +1051,10 @@ static int gss_cred_verify(struct ptlrpc_cred *cred,
         vlen -= 3 * 4;
 
         switch (proc) {
-        case PTLRPC_GSS_PROC_DATA:
+        case PTLRPCS_GSS_PROC_DATA:
                 seq = le32_to_cpu(*vp++);
                 svc = le32_to_cpu(*vp++);
-                if (svc != PTLRPC_GSS_SVC_INTEGRITY) {
+                if (svc != PTLRPCS_GSS_SVC_INTEGRITY) {
                         CERROR("Unknown svc %d\n", svc);
                         RETURN(-EPROTO);
                 }
@@ -1019,7 +1068,7 @@ static int gss_cred_verify(struct ptlrpc_cred *cred,
                         CERROR("vlen %d, mic.len %d\n", vlen, mic.len);
                         RETURN(-EINVAL);
                 }
-                mic.data = (char *) vp;
+                mic.data = (unsigned char *)vp;
 
                 gcred = container_of(cred, struct gss_cred, gc_base);
                 ctx = gss_cred_get_ctx(cred);
@@ -1030,8 +1079,18 @@ static int gss_cred_verify(struct ptlrpc_cred *cred,
 
                 major = kgss_verify_mic(ctx->gc_gss_ctx, &lmsg, &mic, NULL);
                 if (major != GSS_S_COMPLETE) {
-                        CERROR("gss verify mic error: major %x\n", major);
-                        GOTO(proc_data_out, rc = -EINVAL);
+                        CERROR("cred %p: req %p verify mic error: major %x\n",
+                               cred, req, major);
+
+                        if (major == GSS_S_CREDENTIALS_EXPIRED ||
+                            major == GSS_S_CONTEXT_EXPIRED) {
+                                ptlrpcs_cred_expire(cred);
+                                req->rq_ptlrpcs_restart = 1;
+                                rc = 0;
+                        } else
+                                rc = -EINVAL;
+
+                        GOTO(proc_data_out, rc);
                 }
 
                 req->rq_repmsg = (struct lustre_msg *) lmsg.data;
@@ -1045,7 +1104,7 @@ static int gss_cred_verify(struct ptlrpc_cred *cred,
 proc_data_out:
                 gss_put_ctx(ctx);
                 break;
-        case PTLRPC_GSS_PROC_ERR:
+        case PTLRPCS_GSS_PROC_ERR:
                 major = le32_to_cpu(*vp++);
                 minor = le32_to_cpu(*vp++);
                 /* server return NO_CONTEXT might be caused by context expire
@@ -1060,19 +1119,16 @@ proc_data_out:
                  */
                 if (major == GSS_S_NO_CONTEXT ||
                     major == GSS_S_BAD_SIG) {
-                        CWARN("req %p: server report cred %p %s, expired?\n",
+                        CWARN("req %p: server report cred %p %s\n",
                                req, cred, (major == GSS_S_NO_CONTEXT) ?
                                            "NO_CONTEXT" : "BAD_SIG");
 
-                        ptlrpcs_cred_die(cred);
-                        rc = ptlrpcs_req_replace_dead_cred(req);
-                        if (!rc)
-                                req->rq_ptlrpcs_restart = 1;
-                        else
-                                CERROR("replace dead cred failed %d\n", rc);
+                        ptlrpcs_cred_expire(cred);
+                        req->rq_ptlrpcs_restart = 1;
+                        rc = 0;
                 } else {
-                        CERROR("Unrecognized gss error (%x/%x)\n",
-                                major, minor);
+                        CERROR("req %p: unrecognized gss error (%x/%x)\n",
+                                req, major, minor);
                         rc = -EACCES;
                 }
                 break;
@@ -1123,10 +1179,10 @@ static int gss_cred_seal(struct ptlrpc_cred *cred,
         spin_unlock(&ctx->gc_seq_lock);
 
         *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_VERSION);    /* version */
-        *vp++ = cpu_to_le32(PTLRPC_SEC_GSS_KRB5P);      /* subflavor */
+        *vp++ = cpu_to_le32(PTLRPCS_FLVR_KRB5P);        /* subflavor */
         *vp++ = cpu_to_le32(ctx->gc_proc);              /* proc */
         *vp++ = cpu_to_le32(seqnum);                    /* seq */
-        *vp++ = cpu_to_le32(PTLRPC_GSS_SVC_PRIVACY);    /* service */
+        *vp++ = cpu_to_le32(PTLRPCS_GSS_SVC_PRIVACY);   /* service */
         vlen -= 5 * 4;
 
         if (rawobj_serialize(&ctx->gc_wire_ctx, &vp, &vlen)) {
@@ -1139,7 +1195,8 @@ static int gss_cred_seal(struct ptlrpc_cred *cred,
         vlen -= 4;
 
         msg_buf.buf = (__u8 *) req->rq_reqmsg - GSS_PRIVBUF_PREFIX_LEN;
-        msg_buf.buflen = req->rq_reqlen + GSS_PRIVBUF_PREFIX_LEN + GSS_PRIVBUF_SUFFIX_LEN;
+        msg_buf.buflen = req->rq_reqlen + GSS_PRIVBUF_PREFIX_LEN +
+                                          GSS_PRIVBUF_SUFFIX_LEN;
         msg_buf.dataoff = GSS_PRIVBUF_PREFIX_LEN;
         msg_buf.datalen = req->rq_reqlen;
 
@@ -1149,7 +1206,7 @@ static int gss_cred_seal(struct ptlrpc_cred *cred,
         major = kgss_wrap(ctx->gc_gss_ctx, GSS_C_QOP_DEFAULT,
                           &msg_buf, &cipher_buf);
         if (major) {
-                CERROR("error wrap: major 0x%x\n", major);
+                CERROR("cred %p: error wrap: major 0x%x\n", cred, major);
                 GOTO(out, rc = -EINVAL);
         }
 
@@ -1175,7 +1232,7 @@ static int gss_cred_unseal(struct ptlrpc_cred *cred,
         struct ptlrpcs_wire_hdr *sec_hdr;
         rawobj_t                cipher_text, plain_text;
         __u32                   *vp, vlen, subflavor, proc, seq, svc;
-        int                     rc;
+        __u32                   major, rc;
         ENTRY;
 
         LASSERT(req->rq_repbuf);
@@ -1206,8 +1263,8 @@ static int gss_cred_unseal(struct ptlrpc_cred *cred,
         vlen -= 5 * 4;
 
         switch (proc) {
-        case PTLRPC_GSS_PROC_DATA:
-                if (svc != PTLRPC_GSS_SVC_PRIVACY) {
+        case PTLRPCS_GSS_PROC_DATA:
+                if (svc != PTLRPCS_GSS_SVC_PRIVACY) {
                         CERROR("Unknown svc %d\n", svc);
                         RETURN(-EPROTO);
                 }
@@ -1233,11 +1290,21 @@ static int gss_cred_unseal(struct ptlrpc_cred *cred,
                 ctx = gss_cred_get_ctx(cred);
                 LASSERT(ctx);
 
-                rc = kgss_unwrap(ctx->gc_gss_ctx, GSS_C_QOP_DEFAULT,
-                                 &cipher_text, &plain_text);
-                if (rc) {
-                        CERROR("error unwrap: 0x%x\n", rc);
-                        GOTO(proc_out, rc = -EINVAL);
+                major = kgss_unwrap(ctx->gc_gss_ctx, GSS_C_QOP_DEFAULT,
+                                    &cipher_text, &plain_text);
+                if (major) {
+                        CERROR("cred %p: error unwrap: major 0x%x\n",
+                               cred, major);
+
+                        if (major == GSS_S_CREDENTIALS_EXPIRED ||
+                            major == GSS_S_CONTEXT_EXPIRED) {
+                                ptlrpcs_cred_expire(cred);
+                                req->rq_ptlrpcs_restart = 1;
+                                rc = 0;
+                        } else
+                                rc = -EINVAL;
+
+                        GOTO(proc_out, rc);
                 }
 
                 req->rq_repmsg = (struct lustre_msg *) vp;
@@ -1263,30 +1330,39 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
         struct ptlrpc_request    req;
         struct obd_import       *imp;
         __u32                   *vp, lmsg_size;
+        struct ptlrpc_request   *raw_req = NULL;
+        const int                repbuf_len = 256;
+        char                    *repbuf;
+        int                      replen, rc;
         ENTRY;
 
-        /* cred's refcount is 0, steal one */
-        atomic_inc(&cred->pc_refcount);
-
-        gcred = container_of(cred, struct gss_cred, gc_base);
-        gcred->gc_ctx->gc_proc = PTLRPC_GSS_PROC_DESTROY;
         imp = cred->pc_sec->ps_import;
         LASSERT(imp);
 
-        if (!(cred->pc_flags & PTLRPC_CRED_UPTODATE)) {
-                CWARN("Destroy a dead gss cred %p(%u@%s), don't send rpc\n",
-                       gcred, cred->pc_uid, imp->imp_target_uuid.uuid);
-                atomic_dec(&cred->pc_refcount);
+        if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags) ||
+            !test_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags)) {
+                CDEBUG(D_SEC, "Destroy dead cred %p(%u@%s)\n",
+                       cred, cred->pc_uid, imp->imp_target_uuid.uuid);
                 EXIT;
                 return;
         }
 
-        CWARN("client destroy gss cred %p(%u@%s)\n",
+        might_sleep();
+
+        /* cred's refcount is 0, steal one */
+        atomic_inc(&cred->pc_refcount);
+
+        gcred = container_of(cred, struct gss_cred, gc_base);
+        gcred->gc_ctx->gc_proc = PTLRPCS_GSS_PROC_DESTROY;
+
+        CDEBUG(D_SEC, "client destroy gss cred %p(%u@%s)\n",
                gcred, cred->pc_uid, imp->imp_target_uuid.uuid);
 
         lmsg_size = lustre_msg_size(0, NULL);
+        req.rq_req_secflvr = cred->pc_sec->ps_flavor;
+        req.rq_cred = cred;
         req.rq_reqbuf_len = sizeof(*hdr) + lmsg_size +
-                            ptlrpcs_est_req_payload(cred->pc_sec, lmsg_size);
+                            ptlrpcs_est_req_payload(&req, lmsg_size);
 
         OBD_ALLOC(req.rq_reqbuf, req.rq_reqbuf_len);
         if (!req.rq_reqbuf) {
@@ -1298,8 +1374,7 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
 
         /* wire hdr */
         hdr = buf_to_sec_hdr(req.rq_reqbuf);
-        hdr->flavor  = cpu_to_le32(PTLRPC_SEC_GSS);
-        hdr->sectype = cpu_to_le32(PTLRPC_SEC_TYPE_AUTH);
+        hdr->flavor  = cpu_to_le32(PTLRPCS_FLVR_GSS_AUTH);
         hdr->msg_len = cpu_to_le32(lmsg_size);
         hdr->sec_len = cpu_to_le32(0);
 
@@ -1329,10 +1404,21 @@ static void destroy_gss_context(struct ptlrpc_cred *cred)
         }
         atomic_dec(&cred->pc_refcount);
 
-        /* send out */
-        gss_send_secfini_rpc(imp, req.rq_reqbuf, req.rq_reqdata_len);
+        OBD_ALLOC(repbuf, repbuf_len);
+        if (!repbuf)
+                goto exit;
+
+        raw_req = ptl_do_rawrpc(imp, req.rq_reqbuf, req.rq_reqbuf_len,
+                                req.rq_reqdata_len, repbuf, repbuf_len, &replen,
+                                SECFINI_RPC_TIMEOUT, &rc);
+        if (!raw_req)
+                OBD_FREE(repbuf, repbuf_len);
+
 exit:
-        OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len);
+        if (raw_req == NULL)
+                OBD_FREE(req.rq_reqbuf, req.rq_reqbuf_len);
+        else
+                rawrpc_req_finished(raw_req);
         EXIT;
 }
 
@@ -1350,7 +1436,7 @@ static void gss_cred_destroy(struct ptlrpc_cred *cred)
                 gss_put_ctx(gcred->gc_ctx);
         }
 
-        CDEBUG(D_SEC, "GSS_SEC: destroy cred %p\n", gcred);
+        CDEBUG(D_SEC, "sec.gss %p: destroy cred %p\n", cred->pc_sec, gcred);
 
         OBD_FREE(gcred, sizeof(*gcred));
         EXIT;
@@ -1429,7 +1515,7 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         if (left)
                 GOTO(err_free, err = -EFAULT);
 
-        obj.data = buf;
+        obj.data = (unsigned char *)buf;
         obj.len = mlen;
 
         LASSERT(rpci->private);
@@ -1445,18 +1531,22 @@ gss_pipe_downcall(struct file *filp, const char *src, size_t mlen)
         if (err)
                 CERROR("parse init downcall err %d\n", err);
 
+        vcred.vc_pag = gmd.gum_pag;
         vcred.vc_uid = gmd.gum_uid;
-        vcred.vc_pag = vcred.vc_uid; /* FIXME */
 
         cred = ptlrpcs_cred_lookup(sec, &vcred);
         if (!cred) {
                 CWARN("didn't find cred for uid %u\n", vcred.vc_uid);
-                GOTO(err, err);
+                GOTO(err, err = -EINVAL);
         }
+
         if (err || gss_err) {
-                CERROR("got err %d, gss err %d, set cred %p dead\n",
-                        err, gss_err, cred);
-                cred->pc_flags |= PTLRPC_CRED_DEAD;
+                set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
+                if (err != -ERESTART || gss_err != 0)
+                        set_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags);
+                CERROR("cred %p: rpc err %d, gss err 0x%x, fatal %d\n",
+                       cred, err, gss_err,
+                       (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags) != 0));
         } else {
                 CDEBUG(D_SEC, "get initial ctx:\n");
                 gss_cred_set_ctx(cred, ctx);
@@ -1492,6 +1582,8 @@ void gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
         static unsigned long ratelimit;
         ENTRY;
 
+        LASSERT(list_empty(&msg->list));
+
         if (msg->errno >= 0) {
                 EXIT;
                 return;
@@ -1499,12 +1591,13 @@ void gss_pipe_destroy_msg(struct rpc_pipe_msg *msg)
 
         gmsg = container_of(msg, struct gss_upcall_msg, gum_base);
         CDEBUG(D_SEC, "destroy gmsg %p\n", gmsg);
+        LASSERT(atomic_read(&gmsg->gum_refcount) > 0);
         atomic_inc(&gmsg->gum_refcount);
         gss_unhash_msg(gmsg);
         if (msg->errno == -ETIMEDOUT || msg->errno == -EPIPE) {
                 unsigned long now = get_seconds();
                 if (time_after(now, ratelimit)) {
-                        CWARN("GSS_SEC upcall timed out.\n"
+                        CWARN("sec.gss upcall timed out.\n"
                               "Please check user daemon is running!\n");
                         ratelimit = now + 15;
                 }
@@ -1529,6 +1622,7 @@ void gss_pipe_release(struct inode *inode)
 
                 gmsg = list_entry(gsec->gs_upcalls.next,
                                   struct gss_upcall_msg, gum_list);
+                LASSERT(list_empty(&gmsg->gum_base.list));
                 gmsg->gum_base.errno = -EPIPE;
                 atomic_inc(&gmsg->gum_refcount);
                 gss_unhash_msg_nolock(gmsg);
@@ -1551,18 +1645,21 @@ static struct rpc_pipe_ops gss_upcall_ops = {
  *********************************************/
 
 static
-struct ptlrpc_sec* gss_create_sec(ptlrpcs_flavor_t *flavor,
+struct ptlrpc_sec* gss_create_sec(__u32 flavor,
                                   const char *pipe_dir,
                                   void *pipe_data)
 {
         struct gss_sec *gsec;
         struct ptlrpc_sec *sec;
+        uid_t save_uid;
+
 #ifdef __KERNEL__
         char *pos;
+        int   pipepath_len;
 #endif
         ENTRY;
 
-        LASSERT(flavor->flavor == PTLRPC_SEC_GSS);
+        LASSERT(SEC_FLAVOR_MAJOR(flavor) == PTLRPCS_FLVR_MAJOR_GSS);
 
         OBD_ALLOC(gsec, sizeof(*gsec));
         if (!gsec) {
@@ -1570,9 +1667,9 @@ struct ptlrpc_sec* gss_create_sec(ptlrpcs_flavor_t *flavor,
                 RETURN(NULL);
         }
 
-        gsec->gs_mech = kgss_subflavor_to_mech(flavor->subflavor);
+        gsec->gs_mech = kgss_subflavor_to_mech(SEC_FLAVOR_SUB(flavor));
         if (!gsec->gs_mech) {
-                CERROR("subflavor %d not found\n", flavor->subflavor);
+                CERROR("subflavor 0x%x not found\n", flavor);
                 goto err_free;
         }
 
@@ -1581,15 +1678,24 @@ struct ptlrpc_sec* gss_create_sec(ptlrpcs_flavor_t *flavor,
         INIT_LIST_HEAD(&gsec->gs_upcalls);
         spin_lock_init(&gsec->gs_lock);
 
-        snprintf(gsec->gs_pipepath, sizeof(gsec->gs_pipepath),
-                 LUSTRE_PIPEDIR"/%s", pipe_dir);
+        pipepath_len = strlen(LUSTRE_PIPEDIR) + strlen(pipe_dir) +
+                       strlen(gsec->gs_mech->gm_name) + 3;
+        OBD_ALLOC(gsec->gs_pipepath, pipepath_len);
+        if (!gsec->gs_pipepath)
+                goto err_mech_put;
+
+        /* pipe rpc require root permission */
+        save_uid = current->fsuid;
+        current->fsuid = 0;
+
+        sprintf(gsec->gs_pipepath, LUSTRE_PIPEDIR"/%s", pipe_dir);
         if (IS_ERR(rpc_mkdir(gsec->gs_pipepath, NULL))) {
                 CERROR("can't make pipedir %s\n", gsec->gs_pipepath);
-                goto err_mech_put;
+                goto err_free_path;
         }
 
-        snprintf(gsec->gs_pipepath, sizeof(gsec->gs_pipepath),
-                 LUSTRE_PIPEDIR"/%s/%s", pipe_dir, gsec->gs_mech->gm_name); 
+        sprintf(gsec->gs_pipepath, LUSTRE_PIPEDIR"/%s/%s", pipe_dir,
+                gsec->gs_mech->gm_name); 
         gsec->gs_depipe = rpc_mkpipe(gsec->gs_pipepath, gsec,
                                      &gss_upcall_ops, RPC_PIPE_WAIT_FOR_OPEN);
         if (IS_ERR(gsec->gs_depipe)) {
@@ -1601,24 +1707,13 @@ struct ptlrpc_sec* gss_create_sec(ptlrpcs_flavor_t *flavor,
 #endif
 
         sec = &gsec->gs_base;
-
-        switch (flavor->subflavor) {
-        case PTLRPC_SEC_GSS_KRB5I:
-                sec->ps_sectype = PTLRPC_SEC_TYPE_AUTH;
-                break;
-        case PTLRPC_SEC_GSS_KRB5P:
-                sec->ps_sectype = PTLRPC_SEC_TYPE_PRIV;
-                break;
-        default:
-                LBUG();
-        }
-
         sec->ps_expire = GSS_CREDCACHE_EXPIRE;
         sec->ps_nextgc = get_seconds() + sec->ps_expire;
         sec->ps_flags = 0;
 
-        CDEBUG(D_SEC, "Create GSS security instance at %p(external %p)\n",
-               gsec, sec);
+        current->fsuid = save_uid;
+
+        CDEBUG(D_SEC, "Create sec.gss %p\n", gsec);
         RETURN(sec);
 
 #ifdef __KERNEL__
@@ -1627,6 +1722,9 @@ err_rmdir:
         LASSERT(pos);
         *pos = 0;
         rpc_rmdir(gsec->gs_pipepath);
+err_free_path:
+        current->fsuid = save_uid;
+        OBD_FREE(gsec->gs_pipepath, pipepath_len);
 err_mech_put:
 #endif
         kgss_mech_put(gsec->gs_mech);
@@ -1641,21 +1739,24 @@ void gss_destroy_sec(struct ptlrpc_sec *sec)
         struct gss_sec *gsec;
 #ifdef __KERNEL__
         char *pos;
+        int   pipepath_len;
 #endif
         ENTRY;
 
         gsec = container_of(sec, struct gss_sec, gs_base);
-        CDEBUG(D_SEC, "Destroy GSS security instance at %p\n", gsec);
+        CDEBUG(D_SEC, "Destroy sec.gss %p\n", gsec);
 
         LASSERT(gsec->gs_mech);
         LASSERT(!atomic_read(&sec->ps_refcount));
         LASSERT(!atomic_read(&sec->ps_credcount));
 #ifdef __KERNEL__
+        pipepath_len = strlen(gsec->gs_pipepath) + 1;
         rpc_unlink(gsec->gs_pipepath);
         pos = strrchr(gsec->gs_pipepath, '/');
         LASSERT(pos);
         *pos = 0;
         rpc_rmdir(gsec->gs_pipepath);
+        OBD_FREE(gsec->gs_pipepath, pipepath_len);
 #endif
 
         kgss_mech_put(gsec->gs_mech);
@@ -1665,7 +1766,6 @@ void gss_destroy_sec(struct ptlrpc_sec *sec)
 
 static
 struct ptlrpc_cred * gss_create_cred(struct ptlrpc_sec *sec,
-                                     struct ptlrpc_request *req,
                                      struct vfs_cred *vcred)
 {
         struct gss_cred *gcred;
@@ -1681,8 +1781,7 @@ struct ptlrpc_cred * gss_create_cred(struct ptlrpc_sec *sec,
         atomic_set(&cred->pc_refcount, 0);
         cred->pc_sec = sec;
         cred->pc_ops = &gss_credops;
-        cred->pc_req = req;
-        cred->pc_expire = get_seconds() + GSS_CRED_EXPIRE;
+        cred->pc_expire = 0;
         cred->pc_flags = 0;
         cred->pc_pag = vcred->vc_pag;
         cred->pc_uid = vcred->vc_uid;
@@ -1692,12 +1791,14 @@ struct ptlrpc_cred * gss_create_cred(struct ptlrpc_sec *sec,
         RETURN(cred);
 }
 
-static int gss_estimate_payload(struct ptlrpc_sec *sec, int msgsize)
+static int gss_estimate_payload(struct ptlrpc_sec *sec,
+                                struct ptlrpc_request *req,
+                                int msgsize)
 {
-        switch (sec->ps_sectype) {
-        case PTLRPC_SEC_TYPE_AUTH:
+        switch (SEC_FLAVOR_SVC(req->rq_req_secflvr)) {
+        case PTLRPCS_SVC_AUTH:
                 return GSS_MAX_AUTH_PAYLOAD;
-        case PTLRPC_SEC_TYPE_PRIV:
+        case PTLRPCS_SVC_PRIV:
                 return size_round16(GSS_MAX_AUTH_PAYLOAD + msgsize +
                                     GSS_PRIVBUF_PREFIX_LEN +
                                     GSS_PRIVBUF_SUFFIX_LEN);
@@ -1718,9 +1819,9 @@ static int gss_alloc_reqbuf(struct ptlrpc_sec *sec,
         /* In PRIVACY mode, lustre message is always 0 (already encoded into
          * security payload).
          */
-        privacy = sec->ps_sectype == PTLRPC_SEC_TYPE_PRIV;
+        privacy = (SEC_FLAVOR_SVC(req->rq_req_secflvr) == PTLRPCS_SVC_PRIV);
         msg_payload = privacy ? 0 : lmsg_size;
-        sec_payload = gss_estimate_payload(sec, lmsg_size);
+        sec_payload = gss_estimate_payload(sec, req, lmsg_size);
 
         rc = sec_alloc_reqbuf(sec, req, msg_payload, sec_payload);
         if (rc)
@@ -1754,7 +1855,7 @@ static void gss_free_reqbuf(struct ptlrpc_sec *sec,
         LASSERT(req->rq_reqmsg);
         LASSERT(req->rq_reqlen);
 
-        privacy = sec->ps_sectype == PTLRPC_SEC_TYPE_PRIV;
+        privacy = SEC_FLAVOR_SVC(req->rq_req_secflvr) == PTLRPCS_SVC_PRIV;
         if (privacy) {
                 buf = (char *) req->rq_reqmsg - GSS_PRIVBUF_PREFIX_LEN;
                 LASSERT(buf < req->rq_reqbuf ||
@@ -1779,9 +1880,9 @@ static struct ptlrpc_secops gss_secops = {
 
 static struct ptlrpc_sec_type gss_type = {
         .pst_owner      = THIS_MODULE,
-        .pst_name       = "GSS_SEC",
+        .pst_name       = "sec.gss",
         .pst_inst       = ATOMIC_INIT(0),
-        .pst_flavor     = {PTLRPC_SEC_GSS, 0},
+        .pst_flavor     = PTLRPCS_FLVR_MAJOR_GSS,
         .pst_ops        = &gss_secops,
 };