Whamcloud - gitweb
Branch HEAD
[fs/lustre-release.git] / lustre / sec / sec.c
index 070724c..40f892a 100644 (file)
 #include <linux/lustre_sec.h>
 
 static spinlock_t sectypes_lock = SPIN_LOCK_UNLOCKED;
-static struct ptlrpc_sec_type *sectypes[PTLRPC_SEC_MAX_FLAVORS] = {
+static struct ptlrpc_sec_type *sectypes[PTLRPCS_FLVR_MAJOR_MAX] = {
         NULL,
 };
 
 int ptlrpcs_register(struct ptlrpc_sec_type *type)
 {
-        __u32 flavor = type->pst_flavor.flavor;
+        __u32 flavor = type->pst_flavor;
 
         LASSERT(type->pst_name);
         LASSERT(type->pst_ops);
 
-        if (flavor >= PTLRPC_SEC_MAX_FLAVORS)
+        if (flavor >= PTLRPCS_FLVR_MAJOR_MAX)
                 return -EINVAL;
 
         spin_lock(&sectypes_lock);
@@ -64,49 +64,46 @@ int ptlrpcs_register(struct ptlrpc_sec_type *type)
         atomic_set(&type->pst_inst, 0);
         spin_unlock(&sectypes_lock);
 
-        CWARN("Security module %s registered\n", type->pst_name);
+        CWARN("%s: registered\n", type->pst_name);
         return 0;
 }
 
 int ptlrpcs_unregister(struct ptlrpc_sec_type *type)
 {
-        __u32 flavor = type->pst_flavor.flavor;
+        __u32 major = type->pst_flavor;
 
-        if (flavor >= PTLRPC_SEC_MAX_FLAVORS)
-                return -EINVAL;
+        LASSERT(major < PTLRPCS_FLVR_MAJOR_MAX);
 
         spin_lock(&sectypes_lock);
-        if (!sectypes[flavor]) {
+        if (!sectypes[major]) {
                 spin_unlock(&sectypes_lock);
+                CERROR("%s: already unregistered?\n", type->pst_name);
                 return -EINVAL;
         }
 
-        if (sectypes[flavor] != type) {
-                CERROR("invalid unregister\n");
-                return -EINVAL;
-        }
+        LASSERT(sectypes[major] == type);
 
         if (atomic_read(&type->pst_inst)) {
-                CERROR("sec module %s still have instance %d\n",
+                CERROR("%s: still have %d instances\n",
                        type->pst_name, atomic_read(&type->pst_inst));
                 spin_unlock(&sectypes_lock);
                 return -EINVAL;
         }
 
-        CDEBUG(D_SEC, "Security module %s unregistered\n", type->pst_name);
-        sectypes[flavor] = NULL;
+        sectypes[major] = NULL;
         spin_unlock(&sectypes_lock);
 
+        CDEBUG(D_SEC, "%s: unregistered\n", type->pst_name);
         return 0;
 }
 
 static
-struct ptlrpc_sec_type * ptlrpcs_flavor2type(ptlrpcs_flavor_t *flavor)
+struct ptlrpc_sec_type * ptlrpcs_flavor2type(__u32 flavor)
 {
         struct ptlrpc_sec_type *type;
-        __u32 major = flavor->flavor;
+        __u32 major = SEC_FLAVOR_MAJOR(flavor);
 
-        if (major >= PTLRPC_SEC_MAX_FLAVORS)
+        if (major >= PTLRPCS_FLVR_MAJOR_MAX)
                 return NULL;
 
         spin_lock(&sectypes_lock);
@@ -123,6 +120,37 @@ void ptlrpcs_type_put(struct ptlrpc_sec_type *type)
         module_put(type->pst_owner);
 }
 
+__u32 ptlrpcs_name2flavor(const char *name)
+{
+        if (!strcmp(name, "null"))
+                return PTLRPCS_FLVR_NULL;
+        if (!strcmp(name, "krb5"))
+                return PTLRPCS_FLVR_KRB5;
+        if (!strcmp(name, "krb5i"))
+                return PTLRPCS_FLVR_KRB5I;
+        if (!strcmp(name, "krb5p"))
+                return PTLRPCS_FLVR_KRB5P;
+
+        return PTLRPCS_FLVR_INVALID;
+}
+
+char *ptlrpcs_flavor2name(__u32 flavor)
+{
+        switch (flavor) {
+        case PTLRPCS_FLVR_NULL:
+                return "null";
+        case PTLRPCS_FLVR_KRB5:
+                return "krb5";
+        case PTLRPCS_FLVR_KRB5I:
+                return "krb5i";
+        case PTLRPCS_FLVR_KRB5P:
+                return "krb5p";
+        default:
+                CERROR("invalid flavor 0x%x\n", flavor);
+        }
+        return "unknown";
+}
+
 /***********************************************
  * credential cache helpers                    *
  ***********************************************/
@@ -132,19 +160,35 @@ void ptlrpcs_init_credcache(struct ptlrpc_sec *sec)
         int i;
         for (i = 0; i < PTLRPC_CREDCACHE_NR; i++)
                 INIT_LIST_HEAD(&sec->ps_credcache[i]);
-        sec->ps_nextgc = get_seconds() + (sec->ps_expire >> 1);
+
+        /* ps_nextgc == 0 means never do gc */
+        if (sec->ps_nextgc)
+                sec->ps_nextgc = get_seconds() + (sec->ps_expire >> 1);
 }
 
-static void ptlrpcs_cred_destroy(struct ptlrpc_cred *cred)
+/*
+ * return 1 means we should also destroy the sec structure.
+ * normally return 0
+ */
+static int ptlrpcs_cred_destroy(struct ptlrpc_cred *cred)
 {
         struct ptlrpc_sec *sec = cred->pc_sec;
+        int rc = 0;
 
         LASSERT(cred->pc_sec);
         LASSERT(atomic_read(&cred->pc_refcount) == 0);
         LASSERT(list_empty(&cred->pc_hash));
 
         cred->pc_ops->destroy(cred);
-        atomic_dec(&sec->ps_credcount);
+
+        /* spinlock to protect against ptlrpcs_sec_put() */
+        LASSERT(atomic_read(&sec->ps_credcount));
+        spin_lock(&sec->ps_lock);
+        if (atomic_dec_and_test(&sec->ps_credcount) &&
+            !atomic_read(&sec->ps_refcount))
+                rc = 1;
+        spin_unlock(&sec->ps_lock);
+        return rc;
 }
 
 static void ptlrpcs_destroy_credlist(struct list_head *head)
@@ -159,29 +203,55 @@ static void ptlrpcs_destroy_credlist(struct list_head *head)
 }
 
 static
-int ptlrpcs_cred_unlink_expired(struct ptlrpc_cred *cred,
-                                struct list_head *freelist)
+int cred_check_dead(struct ptlrpc_cred *cred,
+                    struct list_head *freelist, int removal)
 {
-        LASSERT(cred->pc_sec);
+        /* here we do the exact thing as asked. but an alternative
+         * way is remove dead entries immediately without be asked
+         * remove, since dead entry will not lead to further rpcs.
+         */
+        if (unlikely(ptlrpcs_cred_is_dead(cred))) {
+                /* don't try to destroy a busy entry */
+                if (atomic_read(&cred->pc_refcount))
+                        return 1;
+                goto out;
+        }
 
-        /* only unlink non-busy entries */
+        /* a busy non-dead entry is considered as "good" one.
+         * Note in a very busy client where cred always busy, we
+         * will not be able to find the expire here, but some other
+         * part will, e.g. checking during refresh, or got error
+         * notification from server, etc. We don't touch busy cred
+         * here is because a busy cred's flag might be changed at
+         * anytime by the owner, we don't want to compete with them.
+         */
         if (atomic_read(&cred->pc_refcount) != 0)
                 return 0;
+
         /* expire is 0 means never expire. a newly created gss cred
          * which during upcall also has 0 expiration
          */
         if (cred->pc_expire == 0)
                 return 0;
+
         /* check real expiration */
         if (time_after(cred->pc_expire, get_seconds()))
                 return 0;
 
-        LASSERT((cred->pc_flags & PTLRPC_CRED_FLAGS_MASK) ==
-                PTLRPC_CRED_UPTODATE);
-        CWARN("cred %p: get expired, unlink\n", cred);
+        /* although we'v checked the bit right above, there's still
+         * possibility that somebody else set the bit elsewhere.
+         */
+        ptlrpcs_cred_expire(cred);
+
+out:
+        if (removal) {
+                LASSERT(atomic_read(&cred->pc_refcount) >= 0);
+                LASSERT(cred->pc_sec);
+                LASSERT(spin_is_locked(&cred->pc_sec->ps_lock));
+                LASSERT(freelist);
 
-        list_del(&cred->pc_hash);
-        list_add(&cred->pc_hash, freelist);
+                list_move(&cred->pc_hash, freelist);
+        }
         return 1;
 }
 
@@ -196,51 +266,63 @@ void ptlrpcs_credcache_gc(struct ptlrpc_sec *sec,
         CDEBUG(D_SEC, "do gc on sec %s\n", sec->ps_type->pst_name);
         for (i = 0; i < PTLRPC_CREDCACHE_NR; i++) {
                 list_for_each_entry_safe(cred, n, &sec->ps_credcache[i],
-                                         pc_hash) {
-                        if (cred->pc_flags & (PTLRPC_CRED_DEAD |
-                                              PTLRPC_CRED_ERROR)) {
-                                LASSERT(atomic_read(&cred->pc_refcount));
-                                continue;
-                        }
-                        ptlrpcs_cred_unlink_expired(cred, freelist);
-                }
+                                         pc_hash)
+                        cred_check_dead(cred, freelist, 1);
         }
         sec->ps_nextgc = get_seconds() + sec->ps_expire;
         EXIT;
 }
 
 /*
- * grace: mark cred DEAD, allow graceful destroy like notify
- *        server side, etc.
- * force: flush all entries, otherwise only free ones be flushed.
+ * @uid: which user. "-1" means flush all.
+ * @grace: mark cred DEAD, allow graceful destroy like notify
+ *         server side, etc.
+ * @force: flush all entries, otherwise only free ones be flushed.
  */
 static
-int ptlrpcs_flush_credcache(struct ptlrpc_sec *sec, int grace, int force)
+int flush_credcache(struct ptlrpc_sec *sec, unsigned long pag, uid_t uid,
+                    int grace, int force)
 {
         struct ptlrpc_cred *cred, *n;
         LIST_HEAD(freelist);
         int i, busy = 0;
         ENTRY;
 
+        might_sleep_if(grace);
+
         spin_lock(&sec->ps_lock);
         for (i = 0; i < PTLRPC_CREDCACHE_NR; i++) {
                 list_for_each_entry_safe(cred, n, &sec->ps_credcache[i],
                                          pc_hash) {
                         LASSERT(atomic_read(&cred->pc_refcount) >= 0);
+
+                        if (sec->ps_flags & PTLRPC_SEC_FL_PAG) {
+                                if (pag != -1 && pag != cred->pc_pag)
+                                        continue;
+                        } else {
+                                if (uid != -1 && uid != cred->pc_uid)
+                                        continue;
+                        }
+
                         if (atomic_read(&cred->pc_refcount)) {
                                 busy = 1;
                                 if (!force)
                                         continue;
                                 list_del_init(&cred->pc_hash);
+                                CDEBUG(D_SEC, "sec %p: flush busy(%d) cred %p "
+                                       "by force\n", sec,
+                                       atomic_read(&cred->pc_refcount), cred);
                         } else
                                 list_move(&cred->pc_hash, &freelist);
 
-                        cred->pc_flags |= PTLRPC_CRED_DEAD;
+                        set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags);
                         if (!grace)
-                                cred->pc_flags &= ~PTLRPC_CRED_UPTODATE;
+                                clear_bit(PTLRPC_CRED_UPTODATE_BIT,
+                                          &cred->pc_flags);
                 }
         }
         spin_unlock(&sec->ps_lock);
+
         ptlrpcs_destroy_credlist(&freelist);
         RETURN(busy);
 }
@@ -256,33 +338,33 @@ int ptlrpcs_cred_get_hash(__u64 pag)
         return (pag & PTLRPC_CREDCACHE_MASK);
 }
 
+/*
+ * return an uptodate or newly created cred entry.
+ */
 static
 struct ptlrpc_cred * cred_cache_lookup(struct ptlrpc_sec *sec,
                                        struct vfs_cred *vcred,
-                                       int create)
+                                       int create, int remove_dead)
 {
         struct ptlrpc_cred *cred, *new = NULL, *n;
         LIST_HEAD(freelist);
         int hash, found = 0;
         ENTRY;
 
+        might_sleep();
+
         hash = ptlrpcs_cred_get_hash(vcred->vc_pag);
 
 retry:
         spin_lock(&sec->ps_lock);
+
         /* do gc if expired */
-        if (time_after(get_seconds(), sec->ps_nextgc))
+        if (remove_dead &&
+            sec->ps_nextgc && time_after(get_seconds(), sec->ps_nextgc))
                 ptlrpcs_credcache_gc(sec, &freelist);
 
         list_for_each_entry_safe(cred, n, &sec->ps_credcache[hash], pc_hash) {
-                /* for DEAD and ERROR entries, its final put will
-                 * release them, so we simply skip here.
-                 */
-                if (cred->pc_flags & (PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR)) {
-                        LASSERT(atomic_read(&cred->pc_refcount));
-                        continue;
-                }
-                if (ptlrpcs_cred_unlink_expired(cred, &freelist))
+                if (cred_check_dead(cred, &freelist, remove_dead))
                         continue;
                 if (cred->pc_ops->match(cred, vcred)) {
                         found = 1;
@@ -327,7 +409,7 @@ struct ptlrpc_cred * ptlrpcs_cred_lookup(struct ptlrpc_sec *sec,
         struct ptlrpc_cred *cred;
         ENTRY;
 
-        cred = cred_cache_lookup(sec, vcred, 0);
+        cred = cred_cache_lookup(sec, vcred, 0, 1);
         RETURN(cred);
 }
 
@@ -336,13 +418,20 @@ static struct ptlrpc_cred *get_cred(struct ptlrpc_sec *sec)
         struct vfs_cred vcred;
 
         LASSERT(sec);
-        /* XXX
-         * for now we simply let PAG == real uid
-         */
-        vcred.vc_pag = (__u64) current->uid;
-        vcred.vc_uid = current->uid;
 
-        return cred_cache_lookup(sec, &vcred, 1);
+        if (sec->ps_flags &
+            (PTLRPC_SEC_FL_MDS | PTLRPC_SEC_FL_OSS | PTLRPC_SEC_FL_REVERSE)) {
+                vcred.vc_pag = 0;
+                vcred.vc_uid = 0;
+        } else {
+                if (sec->ps_flags & PTLRPC_SEC_FL_PAG)
+                        vcred.vc_pag = (__u64) current->pag;
+                else
+                        vcred.vc_pag = (__u64) current->uid;
+                vcred.vc_uid = current->uid;
+        }
+
+        return cred_cache_lookup(sec, &vcred, 1, 1);
 }
 
 int ptlrpcs_req_get_cred(struct ptlrpc_request *req)
@@ -373,19 +462,19 @@ int ptlrpcs_check_cred(struct obd_import *imp)
         struct ptlrpc_cred *cred;
         ENTRY;
 
+        might_sleep();
 again:
         cred = get_cred(imp->imp_sec);
         if (!cred)
                 RETURN(0);
 
         if (ptlrpcs_cred_is_uptodate(cred)) {
-                if (!ptlrpcs_cred_check_expire(cred)) {
-                        ptlrpcs_cred_put(cred, 1);
-                        RETURN(0);
-                } else {
-                        ptlrpcs_cred_put(cred, 1);
-                        goto again;
-                }
+                /* get_cred() has done expire checking, so we don't
+                 * expect it could expire so quickly, and actually
+                 * we don't care.
+                 */
+                ptlrpcs_cred_put(cred, 1);
+                RETURN(0);
         }
 
         ptlrpcs_cred_refresh(cred);
@@ -415,27 +504,51 @@ void ptlrpcs_cred_put(struct ptlrpc_cred *cred, int sync)
 {
         struct ptlrpc_sec *sec = cred->pc_sec;
 
-        LASSERT(cred);
         LASSERT(sec);
         LASSERT(atomic_read(&cred->pc_refcount));
 
         spin_lock(&sec->ps_lock);
-        if (atomic_dec_and_test(&cred->pc_refcount) && sync &&
-            cred->pc_flags & (PTLRPC_CRED_DEAD | PTLRPC_CRED_ERROR)) {
+
+        /* this has to be protected by ps_lock, because cred cache
+         * management code might increase ref against a 0-refed cred.
+         */
+        if (!atomic_dec_and_test(&cred->pc_refcount)) {
+                spin_unlock(&sec->ps_lock);
+                return;
+        }
+
+        /* if sec already unused, we have to destroy the cred (prevent it
+         * hanging there for ever)
+         */
+        if (atomic_read(&sec->ps_refcount) == 0) {
+                if (!test_and_set_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags))
+                        CWARN("cred %p: force expire on a unused sec\n", cred);
+                list_del_init(&cred->pc_hash);
+        } else if (unlikely(sync && ptlrpcs_cred_is_dead(cred)))
                 list_del_init(&cred->pc_hash);
-                ptlrpcs_cred_destroy(cred);
-                if (!atomic_read(&sec->ps_credcount) &&
-                    !atomic_read(&sec->ps_refcount)) {
-                        CWARN("put last cred on a dead sec %p(%s), "
-                              "also destroy the sec\n", sec,
-                               sec->ps_type->pst_name);
-                        spin_unlock(&sec->ps_lock);
 
-                        ptlrpcs_sec_destroy(sec);
-                        return;
-                }
+        if (!list_empty(&cred->pc_hash)) {
+                spin_unlock(&sec->ps_lock);
+                return;
         }
+
+        /* if required async, and we reached here, we have to clear
+         * the UPTODATE bit, thus no rpc is needed in destroy procedure.
+         */
+        if (!sync)
+                clear_bit(PTLRPC_CRED_UPTODATE_BIT, &cred->pc_flags);
+
         spin_unlock(&sec->ps_lock);
+
+        /* destroy this cred */
+        if (!ptlrpcs_cred_destroy(cred))
+                return;
+
+        LASSERT(!atomic_read(&sec->ps_credcount));
+        LASSERT(!atomic_read(&sec->ps_refcount));
+
+        CWARN("sec %p(%s), put last cred, also destroy the sec\n",
+              sec, sec->ps_type->pst_name);
 }
 
 void ptlrpcs_req_drop_cred(struct ptlrpc_request *req)
@@ -446,10 +559,8 @@ void ptlrpcs_req_drop_cred(struct ptlrpc_request *req)
         LASSERT(req->rq_cred);
 
         if (req->rq_cred) {
-                /* We'd like to not use 'sync' mode, but might cause
-                 * some cred leak. Need more thinking here. FIXME
-                 */
-                ptlrpcs_cred_put(req->rq_cred, 1);
+                /* this could be called with spinlock hold, use async mode */
+                ptlrpcs_cred_put(req->rq_cred, 0);
                 req->rq_cred = NULL;
         } else
                 CDEBUG(D_SEC, "req %p have no cred\n", req);
@@ -467,7 +578,7 @@ int ptlrpcs_req_replace_dead_cred(struct ptlrpc_request *req)
         ENTRY;
 
         LASSERT(cred);
-        LASSERT(cred->pc_flags & PTLRPC_CRED_DEAD);
+        LASSERT(test_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags));
 
         ptlrpcs_cred_get(cred);
         ptlrpcs_req_drop_cred(req);
@@ -486,7 +597,10 @@ int ptlrpcs_req_replace_dead_cred(struct ptlrpc_request *req)
 
 /*
  * since there's no lock on the cred, its status could be changed
- * by other threads at any time, we allow this race.
+ * by other threads at any time, we allow this race. If an uptodate
+ * cred turn to dead quickly under us, we don't know and continue
+ * using it, that's fine. if necessary the later error handling code
+ * will catch it.
  */
 int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
 {
@@ -495,17 +609,15 @@ int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
 
         LASSERT(cred);
 
-        if (ptlrpcs_cred_is_uptodate(cred)) {
-                if (!ptlrpcs_cred_check_expire(cred))
-                        RETURN(0);
-        }
+        if (!ptlrpcs_cred_check_uptodate(cred))
+                RETURN(0);
 
-        if (cred->pc_flags & PTLRPC_CRED_ERROR) {
+        if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags)) {
                 req->rq_ptlrpcs_err = 1;
                 RETURN(-EPERM);
         }
 
-        if (cred->pc_flags & PTLRPC_CRED_DEAD) {
+        if (test_bit(PTLRPC_CRED_DEAD_BIT, &cred->pc_flags)) {
                 if (ptlrpcs_req_replace_dead_cred(req) == 0) {
                         LASSERT(cred != req->rq_cred);
                         CDEBUG(D_SEC, "req %p: replace cred %p => %p\n",
@@ -521,8 +633,9 @@ int ptlrpcs_req_refresh_cred(struct ptlrpc_request *req)
         }
 
         ptlrpcs_cred_refresh(cred);
+
         if (!ptlrpcs_cred_is_uptodate(cred)) {
-                if (cred->pc_flags & PTLRPC_CRED_ERROR)
+                if (test_bit(PTLRPC_CRED_ERROR_BIT, &cred->pc_flags))
                         req->rq_ptlrpcs_err = 1;
 
                 CERROR("req %p: failed to refresh cred %p, fatal %d\n",
@@ -551,9 +664,9 @@ int ptlrpcs_cli_wrap_request(struct ptlrpc_request *req)
         CDEBUG(D_SEC, "wrap req %p\n", req);
         cred = req->rq_cred;
 
-        switch (cred->pc_sec->ps_sectype) {
-        case PTLRPC_SEC_TYPE_NONE:
-        case PTLRPC_SEC_TYPE_AUTH:
+        switch (SEC_FLAVOR_SVC(req->rq_req_secflvr)) {
+        case PTLRPCS_SVC_NONE:
+        case PTLRPCS_SVC_AUTH:
                 if (req->rq_req_wrapped) {
                         CDEBUG(D_SEC, "req %p(o%u,x"LPU64",t"LPU64") "
                                "already signed, resend?\n", req,
@@ -570,7 +683,7 @@ int ptlrpcs_cli_wrap_request(struct ptlrpc_request *req)
                 if (!rc)
                         req->rq_req_wrapped = 1;
                 break;
-        case PTLRPC_SEC_TYPE_PRIV:
+        case PTLRPCS_SVC_PRIV:
                 if (req->rq_req_wrapped) {
                         CDEBUG(D_SEC, "req %p(o%u,x"LPU64",t"LPU64") "
                                "already encrypted, resend?\n", req,
@@ -619,17 +732,21 @@ int ptlrpcs_cli_unwrap_reply(struct ptlrpc_request *req)
 
         sec_hdr = (struct ptlrpcs_wire_hdr *) req->rq_repbuf;
         sec_hdr->flavor = le32_to_cpu(sec_hdr->flavor);
-        sec_hdr->sectype = le32_to_cpu(sec_hdr->sectype);
         sec_hdr->msg_len = le32_to_cpu(sec_hdr->msg_len);
         sec_hdr->sec_len = le32_to_cpu(sec_hdr->sec_len);
 
-        CDEBUG(D_SEC, "req %p, cred %p, flavor %u, sectype %u\n",
-               req, cred, sec_hdr->flavor, sec_hdr->sectype);
+        CDEBUG(D_SEC, "req %p, cred %p, flavor 0x%x\n",
+               req, cred, sec_hdr->flavor);
 
         sec = cred->pc_sec;
-        if (sec_hdr->flavor != sec->ps_flavor.flavor) {
-                CERROR("unmatched flavor %u while expect %u\n",
-                       sec_hdr->flavor, sec->ps_flavor.flavor);
+
+        /* only compare major flavor, reply might use different subflavor.
+         */
+        if (SEC_FLAVOR_MAJOR(sec_hdr->flavor) !=
+            SEC_FLAVOR_MAJOR(req->rq_req_secflvr)) {
+                CERROR("got major flavor %u while expect %u\n",
+                       SEC_FLAVOR_MAJOR(sec_hdr->flavor),
+                       SEC_FLAVOR_MAJOR(req->rq_req_secflvr));
                 RETURN(-EPROTO);
         }
 
@@ -641,14 +758,14 @@ int ptlrpcs_cli_unwrap_reply(struct ptlrpc_request *req)
                 RETURN(-EPROTO);
         }
 
-        switch (sec_hdr->sectype) {
-        case PTLRPC_SEC_TYPE_NONE:
-        case PTLRPC_SEC_TYPE_AUTH: {
+        switch (SEC_FLAVOR_SVC(sec_hdr->flavor)) {
+        case PTLRPCS_SVC_NONE:
+        case PTLRPCS_SVC_AUTH: {
                 LASSERT(cred->pc_ops->verify);
                 rc = cred->pc_ops->verify(cred, req);
                 LASSERT(rc || req->rq_repmsg || req->rq_ptlrpcs_restart);
                 break;
-        case PTLRPC_SEC_TYPE_PRIV:
+        case PTLRPCS_SVC_PRIV:
                 LASSERT(cred->pc_ops->unseal);
                 rc = cred->pc_ops->unseal(cred, req);
                 LASSERT(rc || req->rq_repmsg || req->rq_ptlrpcs_restart);
@@ -665,7 +782,8 @@ int ptlrpcs_cli_unwrap_reply(struct ptlrpc_request *req)
  * security APIs                                  *
  **************************************************/
 
-struct ptlrpc_sec * ptlrpcs_sec_create(ptlrpcs_flavor_t *flavor,
+struct ptlrpc_sec * ptlrpcs_sec_create(__u32 flavor,
+                                       unsigned long flags,
                                        struct obd_import *import,
                                        const char *pipe_dir,
                                        void *pipe_data)
@@ -676,7 +794,7 @@ struct ptlrpc_sec * ptlrpcs_sec_create(ptlrpcs_flavor_t *flavor,
 
         type = ptlrpcs_flavor2type(flavor);
         if (!type) {
-                CDEBUG(D_SEC, "invalid major flavor %u\n", flavor->flavor);
+                CERROR("invalid flavor 0x%x\n", flavor);
                 RETURN(NULL);
         }
 
@@ -685,7 +803,8 @@ struct ptlrpc_sec * ptlrpcs_sec_create(ptlrpcs_flavor_t *flavor,
                 spin_lock_init(&sec->ps_lock);
                 ptlrpcs_init_credcache(sec);
                 sec->ps_type = type;
-                sec->ps_flavor = *flavor;
+                sec->ps_flavor = flavor;
+                sec->ps_flags = flags;
                 sec->ps_import = class_import_get(import);
                 atomic_set(&sec->ps_refcount, 1);
                 atomic_set(&sec->ps_credcount, 0);
@@ -712,15 +831,22 @@ static void ptlrpcs_sec_destroy(struct ptlrpc_sec *sec)
 
 void ptlrpcs_sec_put(struct ptlrpc_sec *sec)
 {
+        int ncred;
+
         if (atomic_dec_and_test(&sec->ps_refcount)) {
-                ptlrpcs_flush_credcache(sec, 1, 1);
+                flush_credcache(sec, -1, -1, 1, 1);
+
+                /* this spinlock is protect against ptlrpcs_cred_destroy() */
+                spin_lock(&sec->ps_lock);
+                ncred = atomic_read(&sec->ps_credcount);
+                spin_unlock(&sec->ps_lock);
 
-                if (atomic_read(&sec->ps_credcount) == 0) {
+                if (ncred == 0) {
                         ptlrpcs_sec_destroy(sec);
                 } else {
-                        CWARN("sec %p(%s) is no usage while %d cred still "
+                        CWARN("%s %p is no usage while %d cred still "
                               "holded, destroy delayed\n",
-                               sec, sec->ps_type->pst_name,
+                               sec->ps_type->pst_name, sec,
                                atomic_read(&sec->ps_credcount));
                 }
         }
@@ -728,7 +854,7 @@ void ptlrpcs_sec_put(struct ptlrpc_sec *sec)
 
 void ptlrpcs_sec_invalidate_cache(struct ptlrpc_sec *sec)
 {
-        ptlrpcs_flush_credcache(sec, 0, 1);
+        flush_credcache(sec, -1, -1, 0, 1);
 }
 
 int sec_alloc_reqbuf(struct ptlrpc_sec *sec,
@@ -749,8 +875,7 @@ int sec_alloc_reqbuf(struct ptlrpc_sec *sec,
         }
 
         hdr = buf_to_sec_hdr(req->rq_reqbuf);
-        hdr->flavor = cpu_to_le32(sec->ps_flavor.flavor);
-        hdr->sectype = cpu_to_le32(sec->ps_sectype);
+        hdr->flavor = cpu_to_le32(req->rq_req_secflvr);
         hdr->msg_len = msgsize;
         /* security length will be filled later */
 
@@ -854,8 +979,9 @@ int ptlrpcs_cli_alloc_repbuf(struct ptlrpc_request *req, int msgsize)
                 RETURN(ops->alloc_repbuf(sec, req, msgsize));
 
         /* default allocation scheme */
-        msg_payload = sec->ps_sectype == PTLRPC_SEC_TYPE_PRIV ? 0 : msgsize;
-        sec_payload = size_round(ptlrpcs_est_rep_payload(sec, msgsize));
+        msg_payload = SEC_FLAVOR_SVC(req->rq_req_secflvr) == PTLRPCS_SVC_PRIV ?
+                      0 : msgsize;
+        sec_payload = size_round(ptlrpcs_est_rep_payload(req, msgsize));
 
         req->rq_repbuf_len = sizeof(struct ptlrpcs_wire_hdr) +
                              msg_payload + sec_payload;
@@ -898,7 +1024,8 @@ void ptlrpcs_cli_free_repbuf(struct ptlrpc_request *req)
 
 int ptlrpcs_import_get_sec(struct obd_import *imp)
 {
-        ptlrpcs_flavor_t flavor = {PTLRPC_SEC_NULL, 0};
+        __u32 flavor = PTLRPCS_FLVR_NULL;
+        unsigned long flags = 0;
         char *pipedir = NULL;
         ENTRY;
 
@@ -910,32 +1037,36 @@ int ptlrpcs_import_get_sec(struct obd_import *imp)
                 RETURN(0);
 
         /* find actual flavor for client obd. right now server side
-         * obd (reverse imp, etc) will simply use NULL.
-         */
-        if (!strcmp(imp->imp_obd->obd_type->typ_name, "mdc") ||
-            !strcmp(imp->imp_obd->obd_type->typ_name, "osc")) {
+         * obd (reverse imp, etc) will simply use NULL. */
+        if (!strcmp(imp->imp_obd->obd_type->typ_name, OBD_MDC_DEVICENAME) ||
+            !strcmp(imp->imp_obd->obd_type->typ_name, OBD_OSC_DEVICENAME)) {
                 struct client_obd *cli = &imp->imp_obd->u.cli;
 
-                if (cli->cl_sec_flavor == PTLRPC_SEC_GSS) {
-                        CWARN("select security gss/%s for %s(%s)\n",
-                               cli->cl_sec_subflavor == PTLRPC_SEC_GSS_KRB5I ?
-                               "krb5i" : "krb5p",
-                               imp->imp_obd->obd_type->typ_name,
-                               imp->imp_obd->obd_name);
-                        flavor.flavor = cli->cl_sec_flavor;
-                        flavor.subflavor = cli->cl_sec_subflavor;
-                        pipedir = imp->imp_obd->obd_name;
-                } else if (cli->cl_sec_flavor == PTLRPC_SEC_NULL) {
+                switch (SEC_FLAVOR_MAJOR(cli->cl_sec_flavor)) {
+                case PTLRPCS_FLVR_MAJOR_NULL:
                         CWARN("select security null for %s(%s)\n",
-                               imp->imp_obd->obd_type->typ_name,
-                               imp->imp_obd->obd_name);
-                } else {
-                        CWARN("unknown security flavor for mdc(%s), "
-                              "use 'null'\n", imp->imp_obd->obd_name);
+                              imp->imp_obd->obd_type->typ_name,
+                              imp->imp_obd->obd_name);
+                        break;
+                case PTLRPCS_FLVR_MAJOR_GSS:
+                        CWARN("select security %s for %s(%s)\n",
+                              ptlrpcs_flavor2name(cli->cl_sec_flavor),
+                              imp->imp_obd->obd_type->typ_name,
+                              imp->imp_obd->obd_name);
+                        flavor = cli->cl_sec_flavor;
+                        pipedir = imp->imp_obd->obd_name;
+                        break;
+                default:
+                        CWARN("unknown security flavor for %s(%s), "
+                              "use null\n",
+                              imp->imp_obd->obd_type->typ_name,
+                              imp->imp_obd->obd_name);
                 }
+
+                flags = cli->cl_sec_flags;
         }
 
-        imp->imp_sec = ptlrpcs_sec_create(&flavor, imp, pipedir, imp);
+        imp->imp_sec = ptlrpcs_sec_create(flavor, flags, imp, pipedir, imp);
         if (!imp->imp_sec)
                 RETURN(-EINVAL);
         else
@@ -952,6 +1083,16 @@ void ptlrpcs_import_drop_sec(struct obd_import *imp)
         EXIT;
 }
 
+void ptlrpcs_import_flush_current_creds(struct obd_import *imp)
+{
+        LASSERT(imp);
+
+        class_import_get(imp);
+        if (imp->imp_sec)
+                flush_credcache(imp->imp_sec, current->pag, current->uid, 1, 1);
+        class_import_put(imp);
+}
+
 int __init ptlrpc_sec_init(void)
 {
         int rc;
@@ -972,12 +1113,13 @@ int __init ptlrpc_sec_init(void)
         return 0;
 }
 
+#if defined __KERNEL__ && defined ENABLE_GSS
 static void __exit ptlrpc_sec_exit(void)
 {
         svcsec_null_exit();
         ptlrpcs_null_exit();
 }
-
+#endif
 
 EXPORT_SYMBOL(ptlrpcs_register);
 EXPORT_SYMBOL(ptlrpcs_unregister);
@@ -986,6 +1128,7 @@ EXPORT_SYMBOL(ptlrpcs_sec_put);
 EXPORT_SYMBOL(ptlrpcs_sec_invalidate_cache);
 EXPORT_SYMBOL(ptlrpcs_import_get_sec);
 EXPORT_SYMBOL(ptlrpcs_import_drop_sec);
+EXPORT_SYMBOL(ptlrpcs_import_flush_current_creds);
 EXPORT_SYMBOL(ptlrpcs_cred_lookup);
 EXPORT_SYMBOL(ptlrpcs_cred_put);
 EXPORT_SYMBOL(ptlrpcs_req_get_cred);
@@ -1013,6 +1156,9 @@ EXPORT_SYMBOL(svcsec_put);
 EXPORT_SYMBOL(svcsec_alloc_reply_state);
 EXPORT_SYMBOL(svcsec_free_reply_state);
 
+EXPORT_SYMBOL(ptlrpcs_name2flavor);
+EXPORT_SYMBOL(ptlrpcs_flavor2name);
+
 MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
 MODULE_DESCRIPTION("Lustre Security Support");
 MODULE_LICENSE("GPL");