Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / sec.c
index 2aab749..76b40c5 100644 (file)
@@ -30,6 +30,7 @@
 #include <libcfs/list.h>
 #else
 #include <linux/crypto.h>
+#include <linux/key.h>
 #endif
 
 #include <obd.h>
 
 #include "ptlrpc_internal.h"
 
-static void sptlrpc_sec_destroy(struct ptlrpc_sec *sec);
-static int sptlrpc_sec_destroy_ctx(struct ptlrpc_sec *sec,
-                                   struct ptlrpc_cli_ctx *ctx);
-static void sptlrpc_ctx_refresh(struct ptlrpc_cli_ctx *ctx);
-
 /***********************************************
  * policy registers                            *
  ***********************************************/
@@ -129,7 +125,9 @@ again:
 #ifdef CONFIG_KMOD
         /* if failure, try to load gss module, once */
         if (unlikely(policy == NULL) &&
-            number == SPTLRPC_POLICY_GSS && flag == 0) {
+            flag == 0 &&
+            (number == SPTLRPC_POLICY_GSS ||
+             number == SPTLRPC_POLICY_GSS_PIPEFS)) {
                 mutex_down(&load_mutex);
                 if (atomic_read(&loaded) == 0) {
                         if (request_module("ptlrpc_gss") != 0)
@@ -187,359 +185,18 @@ char *sptlrpc_flavor2name(ptlrpc_sec_flavor_t flavor)
 }
 EXPORT_SYMBOL(sptlrpc_flavor2name);
 
-/***********************************************
- * context helpers                             *
- * internal APIs                               *
- * cache management                            *
- ***********************************************/
-
-static inline
-unsigned long ctx_status(struct ptlrpc_cli_ctx *ctx)
-{
-        smp_mb();
-        return (ctx->cc_flags & PTLRPC_CTX_STATUS_MASK);
-}
-
-static inline
-int ctx_is_uptodate(struct ptlrpc_cli_ctx *ctx)
-{
-        return (ctx_status(ctx) == PTLRPC_CTX_UPTODATE);
-}
-
-static inline
-int ctx_is_refreshed(struct ptlrpc_cli_ctx *ctx)
-{
-        return (ctx_status(ctx) != 0);
-}
-
-static inline
-int ctx_is_dead(struct ptlrpc_cli_ctx *ctx)
-{
-        smp_mb();
-        return ((ctx->cc_flags & (PTLRPC_CTX_DEAD | PTLRPC_CTX_ERROR)) != 0);
-}
-
-static inline
-int ctx_is_eternal(struct ptlrpc_cli_ctx *ctx)
-{
-        smp_mb();
-        return ((ctx->cc_flags & PTLRPC_CTX_ETERNAL) != 0);
-}
-
-static
-int ctx_expire(struct ptlrpc_cli_ctx *ctx)
-{
-        LASSERT(atomic_read(&ctx->cc_refcount));
-
-        if (!test_and_set_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags)) {
-                cfs_time_t now = cfs_time_current_sec();
-
-                smp_mb();
-                clear_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags);
-
-                if (ctx->cc_expire && cfs_time_aftereq(now, ctx->cc_expire))
-                        CWARN("ctx %p(%u->%s): get expired (%lds exceeds)\n",
-                              ctx, ctx->cc_vcred.vc_uid,
-                              sec2target_str(ctx->cc_sec),
-                              cfs_time_sub(now, ctx->cc_expire));
-                else
-                        CWARN("ctx %p(%u->%s): force to die (%lds remains)\n",
-                              ctx, ctx->cc_vcred.vc_uid,
-                              sec2target_str(ctx->cc_sec),
-                              ctx->cc_expire == 0 ? 0 :
-                              cfs_time_sub(ctx->cc_expire, now));
-
-                return 1;
-        }
-        return 0;
-}
-
-static
-void ctx_enhash(struct ptlrpc_cli_ctx *ctx, struct hlist_head *hash)
-{
-        set_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags);
-        atomic_inc(&ctx->cc_refcount);
-        hlist_add_head(&ctx->cc_hash, hash);
-}
-
-static
-void ctx_unhash(struct ptlrpc_cli_ctx *ctx, struct hlist_head *freelist)
-{
-        LASSERT_SPIN_LOCKED(&ctx->cc_sec->ps_lock);
-        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-        LASSERT(test_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags));
-        LASSERT(!hlist_unhashed(&ctx->cc_hash));
-
-        clear_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags);
-
-        if (atomic_dec_and_test(&ctx->cc_refcount)) {
-                __hlist_del(&ctx->cc_hash);
-                hlist_add_head(&ctx->cc_hash, freelist);
-        } else
-                hlist_del_init(&ctx->cc_hash);
-}
-
-/*
- * return 1 if the context is dead.
- */
-static
-int ctx_check_death(struct ptlrpc_cli_ctx *ctx, struct hlist_head *freelist)
-{
-        if (unlikely(ctx_is_dead(ctx)))
-                goto unhash;
-
-        /* expire is 0 means never expire. a newly created gss context
-         * which during upcall also has 0 expiration
-         */
-        smp_mb();
-        if (ctx->cc_expire == 0)
-                return 0;
-
-        /* check real expiration */
-        smp_mb();
-        if (cfs_time_after(ctx->cc_expire, cfs_time_current_sec()))
-                return 0;
-
-        ctx_expire(ctx);
-
-unhash:
-        if (freelist)
-                ctx_unhash(ctx, freelist);
-
-        return 1;
-}
-
-static inline
-int ctx_check_death_locked(struct ptlrpc_cli_ctx *ctx,
-                           struct hlist_head *freelist)
-{
-        LASSERT(ctx->cc_sec);
-        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-        LASSERT_SPIN_LOCKED(&ctx->cc_sec->ps_lock);
-        LASSERT(test_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags));
-
-        return ctx_check_death(ctx, freelist);
-}
-
-static
-int ctx_check_uptodate(struct ptlrpc_cli_ctx *ctx)
-{
-        LASSERT(ctx->cc_sec);
-        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-
-        if (!ctx_check_death(ctx, NULL) && ctx_is_uptodate(ctx))
-                return 1;
-        return 0;
-}
-
-static inline
-int ctx_match(struct ptlrpc_cli_ctx *ctx, struct vfs_cred *vcred)
-{
-        /* a little bit optimization for null policy */
-        if (!ctx->cc_ops->match)
-                return 1;
-
-        return ctx->cc_ops->match(ctx, vcred);
-}
-
-static
-void ctx_list_destroy(struct hlist_head *head)
-{
-        struct ptlrpc_cli_ctx *ctx;
-
-        while (!hlist_empty(head)) {
-                ctx = hlist_entry(head->first, struct ptlrpc_cli_ctx, cc_hash);
-
-                LASSERT(atomic_read(&ctx->cc_refcount) == 0);
-                LASSERT(test_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags) == 0);
-
-                hlist_del_init(&ctx->cc_hash);
-                sptlrpc_sec_destroy_ctx(ctx->cc_sec, ctx);
-        }
-}
-
-static
-void ctx_cache_gc(struct ptlrpc_sec *sec, struct hlist_head *freelist)
-{
-        struct ptlrpc_cli_ctx *ctx;
-        struct hlist_node *pos, *next;
-        int i;
-        ENTRY;
-
-        CDEBUG(D_SEC, "do gc on sec %s@%p\n", sec->ps_policy->sp_name, sec);
-
-        for (i = 0; i < sec->ps_ccache_size; i++) {
-                hlist_for_each_entry_safe(ctx, pos, next,
-                                          &sec->ps_ccache[i], cc_hash)
-                        ctx_check_death_locked(ctx, freelist);
-        }
-
-        sec->ps_gc_next = cfs_time_current_sec() + sec->ps_gc_interval;
-        EXIT;
-}
-
-/*
- * @uid: which user. "-1" means flush all.
- * @grace: mark context DEAD, allow graceful destroy like notify
- *         server side, etc.
- * @force: also flush busy entries.
- *
- * return the number of busy context encountered.
- *
- * In any cases, never touch "eternal" contexts.
- */
-static
-int ctx_cache_flush(struct ptlrpc_sec *sec, uid_t uid, int grace, int force)
-{
-        struct ptlrpc_cli_ctx *ctx;
-        struct hlist_node *pos, *next;
-        HLIST_HEAD(freelist);
-        int i, busy = 0;
-        ENTRY;
-
-        might_sleep_if(grace);
-
-        spin_lock(&sec->ps_lock);
-        for (i = 0; i < sec->ps_ccache_size; i++) {
-                hlist_for_each_entry_safe(ctx, pos, next,
-                                          &sec->ps_ccache[i], cc_hash) {
-                        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-
-                        if (ctx_is_eternal(ctx))
-                                continue;
-                        if (uid != -1 && uid != ctx->cc_vcred.vc_uid)
-                                continue;
-
-                        if (atomic_read(&ctx->cc_refcount) > 1) {
-                                busy++;
-                                if (!force)
-                                        continue;
-
-                                CWARN("flush busy(%d) ctx %p(%u->%s) by force, "
-                                      "grace %d\n",
-                                      atomic_read(&ctx->cc_refcount),
-                                      ctx, ctx->cc_vcred.vc_uid,
-                                      sec2target_str(ctx->cc_sec), grace);
-                        }
-                        ctx_unhash(ctx, &freelist);
-
-                        set_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags);
-                        if (!grace)
-                                clear_bit(PTLRPC_CTX_UPTODATE_BIT,
-                                          &ctx->cc_flags);
-                }
-        }
-        spin_unlock(&sec->ps_lock);
-
-        ctx_list_destroy(&freelist);
-        RETURN(busy);
-}
-
-static inline
-unsigned int ctx_hash_index(struct ptlrpc_sec *sec, __u64 key)
-{
-        return (unsigned int) (key & (sec->ps_ccache_size - 1));
-}
+/**************************************************
+ * client context APIs                            *
+ **************************************************/
 
-/*
- * return matched context. If it's a newly created one, we also give the
- * first push to refresh. return NULL if error happens.
- */
 static
-struct ptlrpc_cli_ctx * ctx_cache_lookup(struct ptlrpc_sec *sec,
-                                         struct vfs_cred *vcred,
-                                         int create, int remove_dead)
-{
-        struct ptlrpc_cli_ctx *ctx = NULL, *new = NULL;
-        struct hlist_head *hash_head;
-        struct hlist_node *pos, *next;
-        HLIST_HEAD(freelist);
-        unsigned int hash, gc = 0, found = 0;
-        ENTRY;
-
-        might_sleep();
-
-        hash = ctx_hash_index(sec, (__u64) vcred->vc_uid);
-        LASSERT(hash < sec->ps_ccache_size);
-        hash_head = &sec->ps_ccache[hash];
-
-retry:
-        spin_lock(&sec->ps_lock);
-
-        /* gc_next == 0 means never do gc */
-        if (remove_dead && sec->ps_gc_next &&
-            cfs_time_after(cfs_time_current_sec(), sec->ps_gc_next)) {
-                ctx_cache_gc(sec, &freelist);
-                gc = 1;
-        }
-
-        hlist_for_each_entry_safe(ctx, pos, next, hash_head, cc_hash) {
-                if (gc == 0 &&
-                    ctx_check_death_locked(ctx, remove_dead ? &freelist : NULL))
-                        continue;
-
-                if (ctx_match(ctx, vcred)) {
-                        found = 1;
-                        break;
-                }
-        }
-
-        if (found) {
-                if (new && new != ctx) {
-                        /* lost the race, just free it */
-                        hlist_add_head(&new->cc_hash, &freelist);
-                        new = NULL;
-                }
-
-                /* hot node, move to head */
-                if (hash_head->first != &ctx->cc_hash) {
-                        __hlist_del(&ctx->cc_hash);
-                        hlist_add_head(&ctx->cc_hash, hash_head);
-                }
-        } else {
-                /* don't allocate for reverse sec */
-                if (sec->ps_flags & PTLRPC_SEC_FL_REVERSE) {
-                        spin_unlock(&sec->ps_lock);
-                        RETURN(NULL);
-                }
-
-                if (new) {
-                        ctx_enhash(new, hash_head);
-                        ctx = new;
-                } else if (create) {
-                        spin_unlock(&sec->ps_lock);
-                        new = sec->ps_policy->sp_cops->create_ctx(sec, vcred);
-                        if (new) {
-                                atomic_inc(&sec->ps_busy);
-                                goto retry;
-                        }
-                } else
-                        ctx = NULL;
-        }
-
-        /* hold a ref */
-        if (ctx)
-                atomic_inc(&ctx->cc_refcount);
-
-        spin_unlock(&sec->ps_lock);
-
-        /* the allocator of the context must give the first push to refresh */
-        if (new) {
-                LASSERT(new == ctx);
-                sptlrpc_ctx_refresh(new);
-        }
-
-        ctx_list_destroy(&freelist);
-        RETURN(ctx);
-}
-
-static inline
 struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
 {
         struct vfs_cred vcred;
         int create = 1, remove_dead = 1;
 
         LASSERT(sec);
+        LASSERT(sec->ps_policy->sp_cops->lookup_ctx);
 
         if (sec->ps_flags & (PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY)) {
                 vcred.vc_uid = 0;
@@ -553,34 +210,19 @@ struct ptlrpc_cli_ctx *get_my_ctx(struct ptlrpc_sec *sec)
                 vcred.vc_gid = cfs_current()->gid;
         }
 
-        if (sec->ps_policy->sp_cops->lookup_ctx)
-                return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred);
-        else
-                return ctx_cache_lookup(sec, &vcred, create, remove_dead);
-}
-
-/**************************************************
- * client context APIs                            *
- **************************************************/
-
-static
-void sptlrpc_ctx_refresh(struct ptlrpc_cli_ctx *ctx)
-{
-        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-
-        if (!ctx_is_refreshed(ctx) && ctx->cc_ops->refresh)
-                ctx->cc_ops->refresh(ctx);
+        return sec->ps_policy->sp_cops->lookup_ctx(sec, &vcred,
+                                                   create, remove_dead);
 }
 
-struct ptlrpc_cli_ctx *sptlrpc_ctx_get(struct ptlrpc_cli_ctx *ctx)
+struct ptlrpc_cli_ctx *sptlrpc_cli_ctx_get(struct ptlrpc_cli_ctx *ctx)
 {
         LASSERT(atomic_read(&ctx->cc_refcount) > 0);
         atomic_inc(&ctx->cc_refcount);
         return ctx;
 }
-EXPORT_SYMBOL(sptlrpc_ctx_get);
+EXPORT_SYMBOL(sptlrpc_cli_ctx_get);
 
-void sptlrpc_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
+void sptlrpc_cli_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
 {
         struct ptlrpc_sec *sec = ctx->cc_sec;
 
@@ -590,85 +232,43 @@ void sptlrpc_ctx_put(struct ptlrpc_cli_ctx *ctx, int sync)
         if (!atomic_dec_and_test(&ctx->cc_refcount))
                 return;
 
-        LASSERT(test_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags) == 0);
-        LASSERT(hlist_unhashed(&ctx->cc_hash));
-
-        /* if required async, we must clear the UPTODATE bit to prevent extra
-         * rpcs during destroy procedure.
-         */
-        if (!sync)
-                clear_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags);
-
-        /* destroy this context */
-        if (!sptlrpc_sec_destroy_ctx(sec, ctx))
-                return;
-
-        CWARN("%s@%p: put last ctx, also destroy the sec\n",
-              sec->ps_policy->sp_name, sec);
-
-        sptlrpc_sec_destroy(sec);
+        sec->ps_policy->sp_cops->release_ctx(sec, ctx, sync);
 }
-EXPORT_SYMBOL(sptlrpc_ctx_put);
+EXPORT_SYMBOL(sptlrpc_cli_ctx_put);
 
 /*
- * mark a ctx as DEAD, and pull it out from hash table.
- *
- * NOTE: the caller must hold at least 1 ref on the ctx.
+ * expire the context immediately.
+ * the caller must hold at least 1 ref on the ctx.
  */
-void sptlrpc_ctx_expire(struct ptlrpc_cli_ctx *ctx)
+void sptlrpc_cli_ctx_expire(struct ptlrpc_cli_ctx *ctx)
 {
-        LASSERT(ctx->cc_sec);
-        LASSERT(atomic_read(&ctx->cc_refcount) > 0);
-
-        ctx_expire(ctx);
-
-        spin_lock(&ctx->cc_sec->ps_lock);
-
-        if (test_and_clear_bit(PTLRPC_CTX_HASHED_BIT, &ctx->cc_flags)) {
-                LASSERT(!hlist_unhashed(&ctx->cc_hash));
-                LASSERT(atomic_read(&ctx->cc_refcount) > 1);
-
-                hlist_del_init(&ctx->cc_hash);
-                if (atomic_dec_and_test(&ctx->cc_refcount))
-                        LBUG();
-        }
-
-        spin_unlock(&ctx->cc_sec->ps_lock);
+        LASSERT(ctx->cc_ops->die);
+        ctx->cc_ops->die(ctx, 0);
 }
-EXPORT_SYMBOL(sptlrpc_ctx_expire);
+EXPORT_SYMBOL(sptlrpc_cli_ctx_expire);
 
-void sptlrpc_ctx_replace(struct ptlrpc_sec *sec, struct ptlrpc_cli_ctx *new)
+void sptlrpc_cli_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
 {
-        struct ptlrpc_cli_ctx *ctx;
-        struct hlist_node *pos, *next;
-        HLIST_HEAD(freelist);
-        unsigned int hash;
-        ENTRY;
-
-        hash = ctx_hash_index(sec, (__u64) new->cc_vcred.vc_uid);
-        LASSERT(hash < sec->ps_ccache_size);
-
-        spin_lock(&sec->ps_lock);
-
-        hlist_for_each_entry_safe(ctx, pos, next,
-                                  &sec->ps_ccache[hash], cc_hash) {
-                if (!ctx_match(ctx, &new->cc_vcred))
-                        continue;
+        struct ptlrpc_request *req, *next;
 
-                ctx_expire(ctx);
-                ctx_unhash(ctx, &freelist);
-                break;
+        spin_lock(&ctx->cc_lock);
+        list_for_each_entry_safe(req, next, &ctx->cc_req_list, rq_ctx_chain) {
+                list_del_init(&req->rq_ctx_chain);
+                ptlrpc_wake_client_req(req);
         }
+        spin_unlock(&ctx->cc_lock);
+}
+EXPORT_SYMBOL(sptlrpc_cli_ctx_wakeup);
 
-        ctx_enhash(new, &sec->ps_ccache[hash]);
-        atomic_inc(&sec->ps_busy);
+int sptlrpc_cli_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
+{
+        LASSERT(ctx->cc_ops);
 
-        spin_unlock(&sec->ps_lock);
+        if (ctx->cc_ops->display == NULL)
+                return 0;
 
-        ctx_list_destroy(&freelist);
-        EXIT;
+        return ctx->cc_ops->display(ctx, buf, bufsize);
 }
-EXPORT_SYMBOL(sptlrpc_ctx_replace);
 
 int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
 {
@@ -687,36 +287,13 @@ int sptlrpc_req_get_ctx(struct ptlrpc_request *req)
         req->rq_cli_ctx = get_my_ctx(imp->imp_sec);
 
         if (!req->rq_cli_ctx) {
-                CERROR("req %p: fail to get context from cache\n", req);
+                CERROR("req %p: fail to get context\n", req);
                 RETURN(-ENOMEM);
         }
 
         RETURN(0);
 }
 
-void sptlrpc_ctx_wakeup(struct ptlrpc_cli_ctx *ctx)
-{
-        struct ptlrpc_request *req, *next;
-
-        spin_lock(&ctx->cc_lock);
-        list_for_each_entry_safe(req, next, &ctx->cc_req_list, rq_ctx_chain) {
-                list_del_init(&req->rq_ctx_chain);
-                ptlrpc_wake_client_req(req);
-        }
-        spin_unlock(&ctx->cc_lock);
-}
-EXPORT_SYMBOL(sptlrpc_ctx_wakeup);
-
-int sptlrpc_ctx_display(struct ptlrpc_cli_ctx *ctx, char *buf, int bufsize)
-{
-        LASSERT(ctx->cc_ops);
-
-        if (ctx->cc_ops->display == NULL)
-                return 0;
-
-        return ctx->cc_ops->display(ctx, buf, bufsize);
-}
-
 void sptlrpc_req_put_ctx(struct ptlrpc_request *req)
 {
         ENTRY;
@@ -734,7 +311,7 @@ void sptlrpc_req_put_ctx(struct ptlrpc_request *req)
         }
 
         /* this could be called with spinlock hold, use async mode */
-        sptlrpc_ctx_put(req->rq_cli_ctx, 0);
+        sptlrpc_cli_ctx_put(req->rq_cli_ctx, 0);
         req->rq_cli_ctx = NULL;
         EXIT;
 }
@@ -757,13 +334,12 @@ int sptlrpc_req_replace_dead_ctx(struct ptlrpc_request *req)
         list_del_init(&req->rq_ctx_chain);
         spin_unlock(&ctx->cc_lock);
 
-        sptlrpc_ctx_get(ctx);
+        sptlrpc_cli_ctx_get(ctx);
         sptlrpc_req_put_ctx(req);
         rc = sptlrpc_req_get_ctx(req);
         if (!rc) {
                 LASSERT(req->rq_cli_ctx);
-                LASSERT(req->rq_cli_ctx != ctx);
-                sptlrpc_ctx_put(ctx, 1);
+                sptlrpc_cli_ctx_put(ctx, 1);
         } else {
                 LASSERT(!req->rq_cli_ctx);
                 req->rq_cli_ctx = ctx;
@@ -775,8 +351,7 @@ EXPORT_SYMBOL(sptlrpc_req_replace_dead_ctx);
 static
 int ctx_check_refresh(struct ptlrpc_cli_ctx *ctx)
 {
-        smp_mb();
-        if (ctx_is_refreshed(ctx))
+        if (cli_ctx_is_refreshed(ctx))
                 return 1;
         return 0;
 }
@@ -798,7 +373,7 @@ int ctx_refresh_timeout(void *data)
          * later than the context refresh expire time.
          */
         if (rc == 0)
-                ctx_expire(req->rq_cli_ctx);
+                req->rq_cli_ctx->cc_ops->die(req->rq_cli_ctx, 0);
         return rc;
 }
 
@@ -808,10 +383,19 @@ void ctx_refresh_interrupt(void *data)
         /* do nothing */
 }
 
+static
+void req_off_ctx_list(struct ptlrpc_request *req, struct ptlrpc_cli_ctx *ctx)
+{
+        spin_lock(&ctx->cc_lock);
+        if (!list_empty(&req->rq_ctx_chain))
+                list_del_init(&req->rq_ctx_chain);
+        spin_unlock(&ctx->cc_lock);
+}
+
 /*
  * the status of context could be subject to be changed by other threads at any
  * time. we allow this race. but once we return with 0, the caller will
- * suppose it's uptodated and keep using it until the affected rpc is done.
+ * assume it's up to date and keep using it until the owning rpc is done.
  *
  * @timeout:
  *    < 0  - don't wait
@@ -829,28 +413,24 @@ int sptlrpc_req_refresh_ctx(struct ptlrpc_request *req, long timeout)
 
         LASSERT(ctx);
 
-        /* special ctxs */
-        if (ctx_is_eternal(ctx) || req->rq_ctx_init || req->rq_ctx_fini)
-                RETURN(0);
-
-        /* reverse ctxs, don't refresh */
+        /* skip reverse ctxs */
         if (ctx->cc_sec->ps_flags & PTLRPC_SEC_FL_REVERSE)
                 RETURN(0);
 
-        spin_lock(&ctx->cc_lock);
-again:
-        if (ctx_check_uptodate(ctx)) {
-                if (!list_empty(&req->rq_ctx_chain))
-                        list_del_init(&req->rq_ctx_chain);
-                spin_unlock(&ctx->cc_lock);
+        /* skip special ctxs */
+        if (cli_ctx_is_eternal(ctx) || req->rq_ctx_init || req->rq_ctx_fini)
                 RETURN(0);
+
+        if (test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags)) {
+                LASSERT(ctx->cc_ops->refresh);
+                ctx->cc_ops->refresh(ctx);
         }
+        LASSERT(test_bit(PTLRPC_CTX_NEW_BIT, &ctx->cc_flags) == 0);
 
-        if (test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags)) {
+again:
+        if (unlikely(test_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags))) {
                 req->rq_err = 1;
-                if (!list_empty(&req->rq_ctx_chain))
-                        list_del_init(&req->rq_ctx_chain);
-                spin_unlock(&ctx->cc_lock);
+                req_off_ctx_list(req, ctx);
                 RETURN(-EPERM);
         }
 
@@ -879,19 +459,15 @@ again:
          *     never really send request with old context before.
          */
         if (test_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags) &&
-            req->rq_reqmsg &&
+            unlikely(req->rq_reqmsg) &&
             lustre_msg_get_flags(req->rq_reqmsg) & MSG_RESENT) {
-                if (!list_empty(&req->rq_ctx_chain))
-                        list_del_init(&req->rq_ctx_chain);
-                spin_unlock(&ctx->cc_lock);
+                req_off_ctx_list(req, ctx);
                 RETURN(0);
         }
 
         if (unlikely(test_bit(PTLRPC_CTX_DEAD_BIT, &ctx->cc_flags))) {
-                spin_unlock(&ctx->cc_lock);
-
                 /* don't have to, but we don't want to release it too soon */
-                sptlrpc_ctx_get(ctx);
+                sptlrpc_cli_ctx_get(ctx);
 
                 rc = sptlrpc_req_replace_dead_ctx(req);
                 if (rc) {
@@ -900,29 +476,40 @@ again:
                                 req, ctx);
                         req->rq_err = 1;
                         LASSERT(list_empty(&req->rq_ctx_chain));
-                        sptlrpc_ctx_put(ctx, 1);
+                        sptlrpc_cli_ctx_put(ctx, 1);
                         RETURN(-ENOMEM);
                 }
 
-                LASSERT(ctx != req->rq_cli_ctx);
+                /* FIXME
+                 * if the ctx didn't really switch (maybe the cpu is tight or
+                 * something similar), we just relax a little bit by yielding.
+                 */
+                if (ctx == req->rq_cli_ctx)
+                        schedule();
+
                 CWARN("req %p: replace dead ctx %p(%u->%s) => %p\n",
                       req, ctx, ctx->cc_vcred.vc_uid,
                       sec2target_str(ctx->cc_sec), req->rq_cli_ctx);
 
-                sptlrpc_ctx_put(ctx, 1);
+                sptlrpc_cli_ctx_put(ctx, 1);
                 ctx = req->rq_cli_ctx;
                 LASSERT(list_empty(&req->rq_ctx_chain));
 
-                spin_lock(&ctx->cc_lock);
                 goto again;
         }
 
+        LASSERT(ctx->cc_ops->validate);
+        if (ctx->cc_ops->validate(ctx) == 0) {
+                req_off_ctx_list(req, ctx);
+                RETURN(0);
+        }
+
         /* Now we're sure this context is during upcall, add myself into
          * waiting list
          */
+        spin_lock(&ctx->cc_lock);
         if (list_empty(&req->rq_ctx_chain))
                 list_add(&req->rq_ctx_chain, &ctx->cc_req_list);
-
         spin_unlock(&ctx->cc_lock);
 
         if (timeout < 0) {
@@ -942,7 +529,6 @@ again:
                                ctx_refresh_timeout, ctx_refresh_interrupt, req);
         rc = l_wait_event(req->rq_reply_waitq, ctx_check_refresh(ctx), &lwi);
 
-        spin_lock(&ctx->cc_lock);
         /* five cases we are here:
          * 1. successfully refreshed;
          * 2. someone else mark this ctx dead by force;
@@ -950,10 +536,9 @@ again:
          * 4. timedout, and we don't want recover from the failure;
          * 5. timedout, and waked up upon recovery finished;
          */
-        if (!ctx_is_refreshed(ctx)) {
+        if (!cli_ctx_is_refreshed(ctx)) {
                 /* timed out or interruptted */
-                list_del_init(&req->rq_ctx_chain);
-                spin_unlock(&ctx->cc_lock);
+                req_off_ctx_list(req, ctx);
 
                 LASSERT(rc != 0);
                 RETURN(rc);
@@ -1053,8 +638,9 @@ int sptlrpc_import_check_ctx(struct obd_import *imp)
         if (!ctx)
                 RETURN(1);
 
-        if (ctx_is_eternal(ctx)) {
-                sptlrpc_ctx_put(ctx, 1);
+        if (cli_ctx_is_eternal(ctx) ||
+            ctx->cc_ops->validate(ctx) == 0) {
+                sptlrpc_cli_ctx_put(ctx, 1);
                 RETURN(0);
         }
 
@@ -1071,7 +657,7 @@ int sptlrpc_import_check_ctx(struct obd_import *imp)
 
         rc = sptlrpc_req_refresh_ctx(req, 0);
         LASSERT(list_empty(&req->rq_ctx_chain));
-        sptlrpc_ctx_put(req->rq_cli_ctx, 1);
+        sptlrpc_cli_ctx_put(req->rq_cli_ctx, 1);
         OBD_FREE_PTR(req);
 
         RETURN(rc);
@@ -1191,18 +777,51 @@ int sptlrpc_cli_unwrap_reply(struct ptlrpc_request *req)
 }
 
 /**************************************************
- * security APIs                                  *
+ * client side high-level security APIs           *
  **************************************************/
 
+static
+void sec_cop_destroy_sec(struct ptlrpc_sec *sec)
+{
+        struct ptlrpc_sec_policy *policy = sec->ps_policy;
+
+        LASSERT(atomic_read(&sec->ps_refcount) == 0);
+        LASSERT(atomic_read(&sec->ps_busy) == 0);
+        LASSERT(policy->sp_cops->destroy_sec);
+
+        CWARN("%s@%p: being destroied\n", sec->ps_policy->sp_name, sec);
+
+        policy->sp_cops->destroy_sec(sec);
+        sptlrpc_policy_put(policy);
+}
+
+static
+int sec_cop_flush_ctx_cache(struct ptlrpc_sec *sec, uid_t uid,
+                            int grace, int force)
+{
+        struct ptlrpc_sec_policy *policy = sec->ps_policy;
+
+        LASSERT(policy->sp_cops);
+        LASSERT(policy->sp_cops->flush_ctx_cache);
+
+        return policy->sp_cops->flush_ctx_cache(sec, uid, grace, force);
+}
+
+void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
+{
+        sec_cop_destroy_sec(sec);
+}
+EXPORT_SYMBOL(sptlrpc_sec_destroy);
+
 /*
  * let policy module to determine whether take refrence of
  * import or not.
  */
 static
-struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
-                                       struct ptlrpc_svc_ctx *ctx,
-                                       __u32 flavor,
-                                       unsigned long flags)
+struct ptlrpc_sec * import_create_sec(struct obd_import *imp,
+                                      struct ptlrpc_svc_ctx *ctx,
+                                      __u32 flavor,
+                                      unsigned long flags)
 {
         struct ptlrpc_sec_policy *policy;
         struct ptlrpc_sec *sec;
@@ -1243,69 +862,93 @@ struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
                  * balanced in sptlrpc_set_put()
                  */
                 atomic_inc(&sec->ps_busy);
+
+                if (sec->ps_gc_interval && policy->sp_cops->gc_ctx)
+                        sptlrpc_gc_add_sec(sec);
         } else
                 sptlrpc_policy_put(policy);
 
         RETURN(sec);
 }
 
-static
-void sptlrpc_sec_destroy(struct ptlrpc_sec *sec)
+int sptlrpc_import_get_sec(struct obd_import *imp,
+                           struct ptlrpc_svc_ctx *ctx,
+                           __u32 flavor,
+                           unsigned long flags)
 {
-        struct ptlrpc_sec_policy *policy = sec->ps_policy;
+        might_sleep();
 
-        LASSERT(policy);
-        LASSERT(atomic_read(&sec->ps_refcount) == 0);
-        LASSERT(atomic_read(&sec->ps_busy) == 0);
-        LASSERT(policy->sp_cops->destroy_sec);
+        /* the old sec might still be there when reconnecting */
+        if (imp->imp_sec)
+                return 0;
 
-        policy->sp_cops->destroy_sec(sec);
-        sptlrpc_policy_put(policy);
+        imp->imp_sec = import_create_sec(imp, ctx, flavor, flags);
+        if (!imp->imp_sec)
+                return -EINVAL;
+
+        return 0;
 }
 
-static
-void sptlrpc_sec_put(struct ptlrpc_sec *sec)
+void sptlrpc_import_put_sec(struct obd_import *imp)
 {
-        struct ptlrpc_sec_policy *policy = sec->ps_policy;
+        struct ptlrpc_sec        *sec;
+        struct ptlrpc_sec_policy *policy;
+
+        might_sleep();
+
+        if (imp->imp_sec == NULL)
+                return;
+
+        sec = imp->imp_sec;
+        policy = sec->ps_policy;
 
         if (!atomic_dec_and_test(&sec->ps_refcount)) {
                 sptlrpc_policy_put(policy);
-                return;
+                goto out;
         }
 
-        ctx_cache_flush(sec, -1, 1, 1);
+        sec_cop_flush_ctx_cache(sec, -1, 1, 1);
+        sptlrpc_gc_del_sec(sec);
 
         if (atomic_dec_and_test(&sec->ps_busy))
-                sptlrpc_sec_destroy(sec);
-        else
+                sec_cop_destroy_sec(sec);
+        else {
                 CWARN("delay to destroy %s@%p: busy contexts\n",
                       policy->sp_name, sec);
+        }
+
+out:
+        imp->imp_sec = NULL;
 }
 
-/*
- * return 1 means we should also destroy the sec structure.
- * normally return 0
- */
-static
-int sptlrpc_sec_destroy_ctx(struct ptlrpc_sec *sec,
-                            struct ptlrpc_cli_ctx *ctx)
+void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
+{
+        if (imp == NULL || imp->imp_sec == NULL)
+                return;
+
+        /* it's important to use grace mode, see the explanation in
+         * sptlrpc_req_refresh_ctx()
+         */
+        sec_cop_flush_ctx_cache(imp->imp_sec, 0, 1, 1);
+}
+
+void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
 {
-        LASSERT(sec == ctx->cc_sec);
-        LASSERT(atomic_read(&sec->ps_busy));
-        LASSERT(atomic_read(&ctx->cc_refcount) == 0);
-        LASSERT(hlist_unhashed(&ctx->cc_hash));
-        LASSERT(list_empty(&ctx->cc_req_list));
-        LASSERT(sec->ps_policy->sp_cops->destroy_ctx);
+        if (imp == NULL || imp->imp_sec == NULL)
+                return;
 
-        sec->ps_policy->sp_cops->destroy_ctx(sec, ctx);
+        sec_cop_flush_ctx_cache(imp->imp_sec, cfs_current()->uid, 1, 1);
+}
+EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
 
-        if (atomic_dec_and_test(&sec->ps_busy)) {
-                LASSERT(atomic_read(&sec->ps_refcount) == 0);
-                return 1;
-        }
+void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
+{
+        if (imp == NULL || imp->imp_sec == NULL)
+                return;
 
-        return 0;
+        sec_cop_flush_ctx_cache(imp->imp_sec, -1, 0, 1);
 }
+EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
 
 /*
  * when complete successfully, req->rq_reqmsg should point to the
@@ -1460,66 +1103,6 @@ void sptlrpc_cli_free_repbuf(struct ptlrpc_request *req)
         EXIT;
 }
 
-int sptlrpc_import_get_sec(struct obd_import *imp,
-                           struct ptlrpc_svc_ctx *ctx,
-                           __u32 flavor,
-                           unsigned long flags)
-{
-        struct obd_device *obd = imp->imp_obd;
-        ENTRY;
-
-        LASSERT(obd);
-        LASSERT(obd->obd_type);
-
-        /* old sec might be still there in reconnecting */
-        if (imp->imp_sec)
-                RETURN(0);
-
-        imp->imp_sec = sptlrpc_sec_create(imp, ctx, flavor, flags);
-        if (!imp->imp_sec)
-                RETURN(-EINVAL);
-
-        RETURN(0);
-}
-
-void sptlrpc_import_put_sec(struct obd_import *imp)
-{
-        if (imp->imp_sec == NULL)
-                return;
-
-        sptlrpc_sec_put(imp->imp_sec);
-        imp->imp_sec = NULL;
-}
-
-void sptlrpc_import_flush_root_ctx(struct obd_import *imp)
-{
-        if (imp == NULL || imp->imp_sec == NULL)
-                return;
-
-        /* use 'grace' mode, it's crutial see explain in
-         * sptlrpc_req_refresh_ctx()
-         */
-        ctx_cache_flush(imp->imp_sec, 0, 1, 1);
-}
-
-void sptlrpc_import_flush_my_ctx(struct obd_import *imp)
-{
-        if (imp == NULL || imp->imp_sec == NULL)
-                return;
-
-        ctx_cache_flush(imp->imp_sec, cfs_current()->uid, 1, 1);
-}
-EXPORT_SYMBOL(sptlrpc_import_flush_my_ctx);
-
-void sptlrpc_import_flush_all_ctx(struct obd_import *imp)
-{
-        if (imp == NULL || imp->imp_sec == NULL)
-                return;
-
-        ctx_cache_flush(imp->imp_sec, -1, 0, 1);
-}
-EXPORT_SYMBOL(sptlrpc_import_flush_all_ctx);
-
 int sptlrpc_cli_install_rvs_ctx(struct obd_import *imp,
                                 struct ptlrpc_cli_ctx *ctx)
 {
@@ -2167,14 +1750,18 @@ EXPORT_SYMBOL(sec2target_str);
  * initialize/finalize                  *
  ****************************************/
 
-int sptlrpc_init(void)
+int __init sptlrpc_init(void)
 {
         int rc;
 
-        rc = sptlrpc_enc_pool_init();
+        rc = sptlrpc_gc_start_thread();
         if (rc)
                 goto out;
 
+        rc = sptlrpc_enc_pool_init();
+        if (rc)
+                goto out_gc;
+
         rc = sptlrpc_null_init();
         if (rc)
                 goto out_pool;
@@ -2195,14 +1782,17 @@ out_null:
         sptlrpc_null_fini();
 out_pool:
         sptlrpc_enc_pool_fini();
+out_gc:
+        sptlrpc_gc_stop_thread();
 out:
         return rc;
 }
 
-void sptlrpc_fini(void)
+void __exit sptlrpc_fini(void)
 {
         sptlrpc_lproc_fini();
         sptlrpc_plain_fini();
         sptlrpc_null_fini();
         sptlrpc_enc_pool_fini();
+        sptlrpc_gc_stop_thread();
 }