Whamcloud - gitweb
branch: HEAD
[fs/lustre-release.git] / lustre / ptlrpc / gss / gss_keyring.c
index 305d4c5..5c6722c 100644 (file)
@@ -144,7 +144,7 @@ void ctx_start_timer_kr(struct ptlrpc_cli_ctx *ctx, long timeout)
 
         LASSERT(timer);
 
-        CWARN("ctx %p: start timer %lds\n", ctx, timeout);
+        CDEBUG(D_SEC, "ctx %p: start timer %lds\n", ctx, timeout);
         timeout = timeout * HZ + cfs_time_current();
 
         init_timer(timer);
@@ -155,16 +155,20 @@ void ctx_start_timer_kr(struct ptlrpc_cli_ctx *ctx, long timeout)
         add_timer(timer);
 }
 
+/*
+ * caller should make sure no race with other threads
+ */
 static
 void ctx_clear_timer_kr(struct ptlrpc_cli_ctx *ctx)
 {
         struct gss_cli_ctx_keyring *gctx_kr = ctx2gctx_keyring(ctx);
         struct timer_list          *timer = gctx_kr->gck_timer;
 
-        CWARN("ctx %p, key %p\n", ctx, gctx_kr->gck_key);
         if (timer == NULL)
                 return;
 
+        CDEBUG(D_SEC, "ctx %p, key %p\n", ctx, gctx_kr->gck_key);
+
         gctx_kr->gck_timer = NULL;
 
         del_singleshot_timer_sync(timer);
@@ -211,7 +215,7 @@ static void ctx_destroy_kr(struct ptlrpc_cli_ctx *ctx)
         struct gss_cli_ctx_keyring *gctx_kr = ctx2gctx_keyring(ctx);
         int                         rc;
 
-        CWARN("destroying ctx %p\n", ctx);
+        CDEBUG(D_SEC, "destroying ctx %p\n", ctx);
 
         /* at this time the association with key has been broken. */
         LASSERT(sec);
@@ -279,7 +283,7 @@ void ctx_enlist_kr(struct ptlrpc_cli_ctx *ctx, int is_root, int locked)
 
         atomic_inc(&ctx->cc_refcount);
         set_bit(PTLRPC_CTX_CACHED_BIT, &ctx->cc_flags);
-        hlist_add_head(&ctx->cc_hash, &gsec_kr->gsk_clist);
+        hlist_add_head(&ctx->cc_cache, &gsec_kr->gsk_clist);
         if (is_root)
                 gsec_kr->gsk_root_ctx = ctx;
 
@@ -305,8 +309,6 @@ int ctx_unlist_kr(struct ptlrpc_cli_ctx *ctx, int locked)
         if (test_and_clear_bit(PTLRPC_CTX_CACHED_BIT, &ctx->cc_flags) == 0)
                 return 0;
 
-        CWARN("ctx %p(%d) unlist\n", ctx, atomic_read(&ctx->cc_refcount));
-
         /*
          * drop ref inside spin lock to prevent race with other operations
          */
@@ -314,7 +316,7 @@ int ctx_unlist_kr(struct ptlrpc_cli_ctx *ctx, int locked)
 
         if (gsec_kr->gsk_root_ctx == ctx)
                 gsec_kr->gsk_root_ctx = NULL;
-        hlist_del_init(&ctx->cc_hash);
+        hlist_del_init(&ctx->cc_cache);
         atomic_dec(&ctx->cc_refcount);
 
         spin_unlock_if(&sec->ps_lock, !locked);
@@ -419,17 +421,23 @@ static void kill_key_locked(struct key *key)
 }
 
 /*
- * since this called, nobody else could touch the ctx in @freelist
+ * caller should hold one ref on contexts in freelist.
  */
 static void dispose_ctx_list_kr(struct hlist_head *freelist)
 {
         struct hlist_node      *pos, *next;
         struct ptlrpc_cli_ctx  *ctx;
 
-        hlist_for_each_entry_safe(ctx, pos, next, freelist, cc_hash) {
-                hlist_del_init(&ctx->cc_hash);
+        hlist_for_each_entry_safe(ctx, pos, next, freelist, cc_cache) {
+                hlist_del_init(&ctx->cc_cache);
+
+                /*
+                 * we need to wakeup waiting reqs here. the context might
+                 * be forced released before upcall finished, then the
+                 * late-arrived downcall can't find the ctx even.
+                 */
+                sptlrpc_cli_ctx_wakeup(ctx);
 
-                atomic_inc(&ctx->cc_refcount);
                 unbind_ctx_kr(ctx);
                 ctx_put_kr(ctx);
         }
@@ -448,6 +456,25 @@ struct ptlrpc_cli_ctx * sec_lookup_root_ctx_kr(struct ptlrpc_sec *sec)
         spin_lock(&sec->ps_lock);
 
         ctx = gsec_kr->gsk_root_ctx;
+
+        if (ctx == NULL && unlikely(sec_is_reverse(sec))) {
+                struct hlist_node      *node;
+                struct ptlrpc_cli_ctx  *tmp;
+                /*
+                 * reverse ctx, search root ctx in list, choose the one
+                 * with shortest expire time, which is most possibly have
+                 * an established peer ctx at client side.
+                 */
+                hlist_for_each_entry(tmp, node, &gsec_kr->gsk_clist, cc_cache) {
+                        if (ctx == NULL || ctx->cc_expire == 0 ||
+                            ctx->cc_expire > tmp->cc_expire) {
+                                ctx = tmp;
+                                /* promote to be root_ctx */
+                                gsec_kr->gsk_root_ctx = ctx;
+                        }
+                }
+        }
+
         if (ctx) {
                 LASSERT(atomic_read(&ctx->cc_refcount) > 0);
                 LASSERT(!hlist_empty(&gsec_kr->gsk_clist));
@@ -459,40 +486,44 @@ struct ptlrpc_cli_ctx * sec_lookup_root_ctx_kr(struct ptlrpc_sec *sec)
         return ctx;
 }
 
-static void sec_replace_root_ctx_kr(struct ptlrpc_sec *sec,
-                                    struct ptlrpc_cli_ctx *new_ctx,
-                                    struct key *key)
+#define RVS_CTX_EXPIRE_NICE    (10)
+
+static
+void rvs_sec_install_root_ctx_kr(struct ptlrpc_sec *sec,
+                                 struct ptlrpc_cli_ctx *new_ctx,
+                                 struct key *key)
 {
         struct gss_sec_keyring *gsec_kr = sec2gsec_keyring(sec);
-        struct ptlrpc_cli_ctx  *root_ctx;
-        struct hlist_head       freelist = HLIST_HEAD_INIT;
+        struct hlist_node      *hnode;
+        struct ptlrpc_cli_ctx  *ctx;
+        cfs_time_t              now;
         ENTRY;
 
-        spin_lock(&sec->ps_lock);
+        LASSERT(sec_is_reverse(sec));
 
-        if (gsec_kr->gsk_root_ctx) {
-                root_ctx = gsec_kr->gsk_root_ctx;
+        spin_lock(&sec->ps_lock);
 
-                set_bit(PTLRPC_CTX_DEAD_BIT, &root_ctx->cc_flags);
+        now = cfs_time_current_sec();
 
-                if (ctx_unlist_kr(root_ctx, 1))
-                        hlist_add_head(&root_ctx->cc_hash, &freelist);
+        /* set all existing ctxs short expiry */
+        hlist_for_each_entry(ctx, hnode, &gsec_kr->gsk_clist, cc_cache) {
+                if (ctx->cc_expire > now + RVS_CTX_EXPIRE_NICE) {
+                        ctx->cc_early_expire = 1;
+                        ctx->cc_expire = now + RVS_CTX_EXPIRE_NICE;
+                }
         }
 
-        /*
-         * at this time, we can't guarantee the gsk_root_ctx is NULL, because
-         * another thread might clear the HASHED flag of root ctx earlier,
-         * and waiting for spinlock which is held by us. But anyway we just
-         * install the new root ctx.
+        /* if there's root_ctx there, instead obsolete the current
+         * immediately, we leave it continue operating for a little while.
+         * hopefully when the first backward rpc with newest ctx send out,
+         * the client side already have the peer ctx well established.
          */
-        ctx_enlist_kr(new_ctx, 1, 1);
+        ctx_enlist_kr(new_ctx, gsec_kr->gsk_root_ctx ? 0 : 1, 1);
 
         if (key)
                 bind_key_ctx(key, new_ctx);
 
         spin_unlock(&sec->ps_lock);
-
-        dispose_ctx_list_kr(&freelist);
 }
 
 static void construct_key_desc(void *buf, int bufsize,
@@ -522,7 +553,7 @@ struct ptlrpc_sec * gss_sec_create_kr(struct obd_import *imp,
                 RETURN(NULL);
 
         gsec_kr->gsk_id = atomic_inc_return(&gss_sec_id_kr);
-        INIT_HLIST_HEAD(&gsec_kr->gsk_clist);
+        CFS_INIT_HLIST_HEAD(&gsec_kr->gsk_clist);
         gsec_kr->gsk_root_ctx = NULL;
         mutex_init(&gsec_kr->gsk_root_uc_lock);
 #ifdef HAVE_KEYRING_UPCALL_SERIALIZED
@@ -553,7 +584,7 @@ void gss_sec_destroy_kr(struct ptlrpc_sec *sec)
         struct gss_sec          *gsec = sec2gsec(sec);
         struct gss_sec_keyring  *gsec_kr = sec2gsec_keyring(sec);
 
-        CWARN("destroy %s@%p\n", sec->ps_policy->sp_name, sec);
+        CDEBUG(D_SEC, "destroy %s@%p\n", sec->ps_policy->sp_name, sec);
 
         LASSERT(hlist_empty(&gsec_kr->gsk_clist));
         LASSERT(gsec_kr->gsk_root_ctx == NULL);
@@ -648,7 +679,7 @@ struct ptlrpc_cli_ctx * gss_sec_lookup_ctx_kr(struct ptlrpc_sec *sec,
                  * Only lookup directly for REVERSE sec, which should
                  * always succeed.
                  */
-                if (ctx || (sec->ps_flags & PTLRPC_SEC_FL_REVERSE))
+                if (ctx || sec_is_reverse(sec))
                         RETURN(ctx);
         }
 
@@ -727,8 +758,8 @@ struct ptlrpc_cli_ctx * gss_sec_lookup_ctx_kr(struct ptlrpc_sec *sec,
 
                         ctx_start_timer_kr(ctx, KEYRING_UPCALL_TIMEOUT);
 
-                        CWARN("installed key %p <-> ctx %p (sec %p)\n",
-                              key, ctx, sec);
+                        CDEBUG(D_SEC, "installed key %p <-> ctx %p (sec %p)\n",
+                               key, ctx, sec);
                 } else {
                         /*
                          * we'd prefer to call key_revoke(), but we more like
@@ -757,8 +788,14 @@ void gss_sec_release_ctx_kr(struct ptlrpc_sec *sec,
                             struct ptlrpc_cli_ctx *ctx,
                             int sync)
 {
-        CWARN("ctx %p\n", ctx);
-        ctx_destroy_kr(ctx);
+        LASSERT(atomic_read(&ctx->cc_refcount) == 0);
+
+        if (sync)
+                ctx_destroy_kr(ctx);
+        else {
+                atomic_inc(&ctx->cc_refcount);
+                sptlrpc_gc_add_ctx(ctx);
+        }
 }
 
 /*
@@ -777,7 +814,7 @@ void flush_user_ctx_cache_kr(struct ptlrpc_sec *sec,
         char                     desc[24];
 
         /* nothing to do for reverse or rootonly sec */
-        if (sec->ps_flags & (PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY))
+        if (sec_is_reverse(sec) || sec_is_rootonly(sec))
                 return;
 
         construct_key_desc(desc, sizeof(desc), sec, uid);
@@ -794,7 +831,6 @@ void flush_user_ctx_cache_kr(struct ptlrpc_sec *sec,
 
                 down_write(&key->sem);
 
-                CWARN("invalidating key %p - ctx %p\n", key, key->payload.data);
                 kill_key_locked(key);
 
                 /* kill_key_locked() should usually revoke the key, but we
@@ -818,7 +854,7 @@ void flush_spec_ctx_cache_kr(struct ptlrpc_sec *sec,
                              int grace, int force)
 {
         struct gss_sec_keyring *gsec_kr;
-        struct hlist_head       freelist = HLIST_HEAD_INIT;
+        struct hlist_head       freelist = CFS_HLIST_HEAD_INIT;
         struct hlist_node      *pos, *next;
         struct ptlrpc_cli_ctx  *ctx;
         ENTRY;
@@ -827,7 +863,7 @@ void flush_spec_ctx_cache_kr(struct ptlrpc_sec *sec,
 
         spin_lock(&sec->ps_lock);
         hlist_for_each_entry_safe(ctx, pos, next,
-                                  &gsec_kr->gsk_clist, cc_hash) {
+                                  &gsec_kr->gsk_clist, cc_cache) {
                 LASSERT(atomic_read(&ctx->cc_refcount) > 0);
 
                 if (uid != -1 && uid != ctx->cc_vcred.vc_uid)
@@ -849,11 +885,14 @@ void flush_spec_ctx_cache_kr(struct ptlrpc_sec *sec,
                 if (!grace)
                         clear_bit(PTLRPC_CTX_UPTODATE_BIT, &ctx->cc_flags);
 
-                if (ctx_unlist_kr(ctx, 1)) {
-                        hlist_add_head(&ctx->cc_hash, &freelist);
-                        CWARN("unlisted ctx %p\n", ctx);
-                } else
-                        CWARN("ctx %p: unlist return 0, let it go\n", ctx);
+                atomic_inc(&ctx->cc_refcount);
+
+                if (ctx_unlist_kr(ctx, 1))
+                        hlist_add_head(&ctx->cc_cache, &freelist);
+                else {
+                        LASSERT(atomic_read(&ctx->cc_refcount) >= 2);
+                        atomic_dec(&ctx->cc_refcount);
+                }
 
         }
         spin_unlock(&sec->ps_lock);
@@ -869,9 +908,9 @@ int gss_sec_flush_ctx_cache_kr(struct ptlrpc_sec *sec,
 {
         ENTRY;
 
-        CWARN("sec %p(%d, busy %d), uid %d, grace %d, force %d\n",
-              sec, atomic_read(&sec->ps_refcount), atomic_read(&sec->ps_busy),
-              uid, grace, force);
+        CDEBUG(D_SEC, "sec %p(%d, busy %d), uid %d, grace %d, force %d\n",
+               sec, atomic_read(&sec->ps_refcount), atomic_read(&sec->ps_busy),
+               uid, grace, force);
 
         if (uid != -1 && uid != 0)
                 flush_user_ctx_cache_kr(sec, uid, grace, force);
@@ -885,7 +924,7 @@ static
 void gss_sec_gc_ctx_kr(struct ptlrpc_sec *sec)
 {
         struct gss_sec_keyring *gsec_kr = sec2gsec_keyring(sec);
-        struct hlist_head       freelist = HLIST_HEAD_INIT;
+        struct hlist_head       freelist = CFS_HLIST_HEAD_INIT;
         struct hlist_node      *pos, *next;
         struct ptlrpc_cli_ctx  *ctx;
         ENTRY;
@@ -894,12 +933,17 @@ void gss_sec_gc_ctx_kr(struct ptlrpc_sec *sec)
 
         spin_lock(&sec->ps_lock);
         hlist_for_each_entry_safe(ctx, pos, next,
-                                  &gsec_kr->gsk_clist, cc_hash) {
+                                  &gsec_kr->gsk_clist, cc_cache) {
                 LASSERT(atomic_read(&ctx->cc_refcount) > 0);
 
+                atomic_inc(&ctx->cc_refcount);
+
                 if (cli_ctx_check_death(ctx) && ctx_unlist_kr(ctx, 1)) {
-                        hlist_add_head(&ctx->cc_hash, &freelist);
+                        hlist_add_head(&ctx->cc_cache, &freelist);
                         CWARN("unhashed ctx %p\n", ctx);
+                } else {
+                        LASSERT(atomic_read(&ctx->cc_refcount) >= 2);
+                        atomic_dec(&ctx->cc_refcount);
                 }
         }
         spin_unlock(&sec->ps_lock);
@@ -924,19 +968,27 @@ int gss_sec_display_kr(struct ptlrpc_sec *sec, char *buf, int bufsize)
 
         spin_lock(&sec->ps_lock);
         hlist_for_each_entry_safe(ctx, pos, next,
-                                  &gsec_kr->gsk_clist, cc_hash) {
-                struct key *key;
-                int         len;
+                                  &gsec_kr->gsk_clist, cc_cache) {
+                struct gss_cli_ctx     *gctx;
+                struct key             *key;
+                char                    flags_str[40];
+                int                     len;
 
+                gctx = ctx2gctx(ctx);
                 key = ctx2gctx_keyring(ctx)->gck_key;
 
-                len = snprintf(buf, bufsize, "%p(%d): expire %ld(%ld), "
-                               "uid %u, flags 0x%lx, key %08x(%d)\n",
+                gss_cli_ctx_flags2str(ctx->cc_flags,
+                                      flags_str, sizeof(flags_str));
+
+                len = snprintf(buf, bufsize, "%p(%d): uid %u, exp %ld(%ld)s, "
+                               "fl %s, seq %d, win %u, key %08x(%d), ",
                                ctx, atomic_read(&ctx->cc_refcount),
+                               ctx->cc_vcred.vc_uid,
                                ctx->cc_expire,
                                ctx->cc_expire - cfs_time_current_sec(),
-                               ctx->cc_vcred.vc_uid,
-                               ctx->cc_flags,
+                               flags_str,
+                               atomic_read(&gctx->gc_seq),
+                               gctx->gc_win,
                                key ? key->serial : 0,
                                key ? atomic_read(&key->usage) : 0);
 
@@ -944,7 +996,19 @@ int gss_sec_display_kr(struct ptlrpc_sec *sec, char *buf, int bufsize)
                 buf += len;
                 bufsize -= len;
 
-                if (bufsize < len)
+                if (bufsize <= 0)
+                        break;
+
+                if (gctx->gc_mechctx)
+                        len = lgss_display(gctx->gc_mechctx, buf, bufsize);
+                else
+                        len = snprintf(buf, bufsize, "mech N/A\n");
+
+                written += len;
+                buf += len;
+                bufsize -= len;
+
+                if (bufsize <= 0)
                         break;
         }
         spin_unlock(&sec->ps_lock);
@@ -974,7 +1038,7 @@ int gss_cli_ctx_validate_kr(struct ptlrpc_cli_ctx *ctx)
                 return 1;
         }
 
-        if (cli_ctx_is_uptodate(ctx))
+        if (cli_ctx_is_ready(ctx))
                 return 0;
         return 1;
 }
@@ -1025,7 +1089,7 @@ int sec_install_rctx_kr(struct ptlrpc_sec *sec,
                 return rc;
         }
 
-        sec_replace_root_ctx_kr(sec, cli_ctx, NULL);
+        rvs_sec_install_root_ctx_kr(sec, cli_ctx, NULL);
 
         ctx_put_kr(cli_ctx);
 
@@ -1079,7 +1143,7 @@ int sec_install_rctx_kr(struct ptlrpc_sec *sec,
                 goto err_put;
         }
 
-        sec_replace_root_ctx_kr(sec, cli_ctx, key);
+        rvs_sec_install_root_ctx_kr(sec, cli_ctx, key);
 
         ctx_put_kr(cli_ctx);
         up_write(&key->sem);
@@ -1141,7 +1205,7 @@ int gss_kt_instantiate(struct key *key, const void *data, size_t datalen)
 
         /* XXX */
         key->perm |= KEY_POS_ALL | KEY_USR_ALL;
-        CWARN("key %p instantiated, ctx %p\n", key, key->payload.data);
+        CDEBUG(D_SEC, "key %p instantiated, ctx %p\n", key, key->payload.data);
         RETURN(0);
 }
 
@@ -1203,7 +1267,6 @@ int gss_kt_update(struct key *key, const void *data, size_t datalen)
                 goto out;
         }
 
-        CWARN("secwin is %d\n", gctx->gc_win);
         if (gctx->gc_win == 0) {
                 __u32   nego_rpc_err, nego_gss_err;
 
@@ -1254,15 +1317,16 @@ out:
         if (rc == 0) {
                 gss_cli_ctx_uptodate(gctx);
         } else {
+                /*
+                 * this will also revoke the key. has to be done before
+                 * wakeup waiters otherwise they can find the stale key
+                 */
+                kill_key_locked(key);
+
                 cli_ctx_expire(ctx);
 
                 if (rc != -ERESTART)
                         set_bit(PTLRPC_CTX_ERROR_BIT, &ctx->cc_flags);
-
-                /* this will also revoke the key. has to be done before
-                 * wakeup waiters otherwise they can find the stale key
-                 */
-                kill_key_locked(key);
         }
 
         sptlrpc_cli_ctx_wakeup(ctx);
@@ -1283,7 +1347,7 @@ void gss_kt_destroy(struct key *key)
 {
         ENTRY;
         LASSERT(key->payload.data == NULL);
-        CWARN("destroy key %p\n", key);
+        CDEBUG(D_SEC, "destroy key %p\n", key);
         EXIT;
 }
 
@@ -1316,7 +1380,6 @@ static struct ptlrpc_ctx_ops gss_keyring_ctxops = {
         .refresh                = gss_cli_ctx_refresh_kr,
         .validate               = gss_cli_ctx_validate_kr,
         .die                    = gss_cli_ctx_die_kr,
-        .display                = gss_cli_ctx_display,
         .sign                   = gss_cli_ctx_sign,
         .verify                 = gss_cli_ctx_verify,
         .seal                   = gss_cli_ctx_seal,