Whamcloud - gitweb
land b_colibri_devel on HEAD:
[fs/lustre-release.git] / lustre / ptlrpc / sec_plain.c
index e15a69e..5c75f66 100644 (file)
@@ -1,7 +1,7 @@
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- * Copyright (C) 2006 Cluster File Systems, Inc.
+ * Copyright (C) 2006-2007 Cluster File Systems, Inc.
  *   Author: Eric Mei <ericm@clusterfs.com>
  *
  *   This file is part of Lustre, http://www.lustre.org.
 #include <lustre_net.h>
 #include <lustre_sec.h>
 
+struct plain_sec {
+        struct ptlrpc_sec       pls_base;
+        rwlock_t                pls_lock;
+        struct ptlrpc_cli_ctx  *pls_ctx;
+};
+
+static inline struct plain_sec *sec2plsec(struct ptlrpc_sec *sec)
+{
+        return container_of(sec, struct plain_sec, pls_base);
+}
+
 static struct ptlrpc_sec_policy plain_policy;
 static struct ptlrpc_ctx_ops    plain_ctx_ops;
-static struct ptlrpc_sec        plain_sec;
-static struct ptlrpc_cli_ctx    plain_cli_ctx;
 static struct ptlrpc_svc_ctx    plain_svc_ctx;
 
+/*
+ * flavor flags (maximum 8 flags)
+ */
+#define PLAIN_WFLVR_FLAGS_OFFSET        (12)
+#define PLAIN_WFLVR_FLAG_BULK           (1 << (0 + PLAIN_WFLVR_FLAGS_OFFSET))
+#define PLAIN_WFLVR_FLAG_USER           (1 << (1 + PLAIN_WFLVR_FLAGS_OFFSET))
+
+#define PLAIN_WFLVR_HAS_BULK(wflvr)      \
+        (((wflvr) & PLAIN_WFLVR_FLAG_BULK) != 0)
+#define PLAIN_WFLVR_HAS_USER(wflvr)      \
+        (((wflvr) & PLAIN_WFLVR_FLAG_USER) != 0)
+
+#define PLAIN_WFLVR_TO_RPC(wflvr)       \
+        ((wflvr) & ((1 << PLAIN_WFLVR_FLAGS_OFFSET) - 1))
+
+/*
+ * similar to null sec, temporarily use the third byte of lm_secflvr to identify
+ * the source sec part.
+ */
+static inline
+void plain_encode_sec_part(struct lustre_msg *msg, enum lustre_sec_part sp)
+{
+        msg->lm_secflvr |= (((__u32) sp) & 0xFF) << 16;
+}
+
+static inline
+enum lustre_sec_part plain_decode_sec_part(struct lustre_msg *msg)
+{
+        return (msg->lm_secflvr >> 16) & 0xFF;
+}
+
+/*
+ * for simplicity, plain policy rpc use fixed layout.
+ */
+#define PLAIN_PACK_SEGMENTS             (3)
+
+#define PLAIN_PACK_MSG_OFF              (0)
+#define PLAIN_PACK_USER_OFF             (1)
+#define PLAIN_PACK_BULK_OFF             (2)
+
 /****************************************
  * cli_ctx apis                         *
  ****************************************/
@@ -53,12 +102,25 @@ int plain_ctx_refresh(struct ptlrpc_cli_ctx *ctx)
 }
 
 static
+int plain_ctx_validate(struct ptlrpc_cli_ctx *ctx)
+{
+        return 0;
+}
+
+static
 int plain_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
         struct lustre_msg_v2 *msg = req->rq_reqbuf;
         ENTRY;
 
-        msg->lm_secflvr = req->rq_sec_flavor;
+        msg->lm_secflvr = req->rq_flvr.sf_rpc;
+        if (req->rq_pack_bulk)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
+        if (req->rq_pack_udesc)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_USER;
+
+        plain_encode_sec_part(msg, ctx->cc_sec->ps_part);
+
         req->rq_reqdata_len = lustre_msg_size_v2(msg->lm_bufcount,
                                                  msg->lm_buflens);
         RETURN(0);
@@ -68,23 +130,37 @@ static
 int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
         struct lustre_msg *msg = req->rq_repbuf;
+        __u16              wflvr;
         ENTRY;
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount != 2) {
-                        CERROR("Protocol error: invalid buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(-EPROTO);
-                }
+        if (msg->lm_bufcount != PLAIN_PACK_SEGMENTS) {
+                CERROR("unexpected reply buf count %u\n", msg->lm_bufcount);
+                RETURN(-EPROTO);
+        }
 
-                if (bulk_sec_desc_unpack(msg, 1)) {
-                        CERROR("Mal-formed bulk checksum reply\n");
-                        RETURN(-EINVAL);
-                }
+        wflvr = WIRE_FLVR_RPC(msg->lm_secflvr);
+
+        /* expect no user desc in reply */
+        if (PLAIN_WFLVR_HAS_USER(wflvr)) {
+                CERROR("Unexpected udesc flag in reply\n");
+                RETURN(-EPROTO);
+        }
+
+        /* whether we sent with bulk or not, we expect the same in reply */
+        if (!equi(req->rq_pack_bulk == 1, PLAIN_WFLVR_HAS_BULK(wflvr))) {
+                CERROR("%s bulk checksum in reply\n",
+                       req->rq_pack_bulk ? "Missing" : "Unexpected");
+                RETURN(-EPROTO);
         }
 
-        req->rq_repmsg = lustre_msg_buf(msg, 0, 0);
-        req->rq_replen = msg->lm_buflens[0];
+        if (req->rq_pack_bulk &&
+            bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
+                CERROR("Mal-formed bulk checksum reply\n");
+                RETURN(-EINVAL);
+        }
+
+        req->rq_repmsg = lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0);
+        req->rq_replen = msg->lm_buflens[PLAIN_PACK_MSG_OFF];
         RETURN(0);
 }
 
@@ -93,17 +169,13 @@ int plain_cli_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                         struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
-        struct sec_flavor_config *conf;
-
-        LASSERT(req->rq_import);
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
-        LASSERT(req->rq_reqbuf->lm_bufcount >= 2);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
         return bulk_csum_cli_request(desc, req->rq_bulk_read,
-                                     conf->sfc_bulk_csum,
+                                     req->rq_flvr.sf_bulk_csum,
                                      req->rq_reqbuf,
-                                     req->rq_reqbuf->lm_bufcount - 1);
+                                     PLAIN_PACK_BULK_OFF);
 }
 
 static
@@ -111,15 +183,13 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
-        LASSERT(SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor));
-        LASSERT(req->rq_reqbuf->lm_bufcount >= 2);
-        LASSERT(req->rq_repbuf->lm_bufcount >= 2);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
+        LASSERT(req->rq_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_cli_reply(desc, req->rq_bulk_read,
-                                   req->rq_reqbuf,
-                                   req->rq_reqbuf->lm_bufcount - 1,
-                                   req->rq_repbuf,
-                                   req->rq_repbuf->lm_bufcount - 1);
+                                   req->rq_reqbuf, PLAIN_PACK_BULK_OFF,
+                                   req->rq_repbuf, PLAIN_PACK_BULK_OFF);
 }
 
 /****************************************
@@ -127,40 +197,178 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
  ****************************************/
 
 static
-struct ptlrpc_sec* plain_create_sec(struct obd_import *imp,
-                                    struct ptlrpc_svc_ctx *ctx,
-                                    __u32 flavor,
-                                    unsigned long flags)
+struct ptlrpc_cli_ctx *plain_sec_install_ctx(struct plain_sec *plsec)
 {
-        ENTRY;
-        LASSERT(SEC_FLAVOR_POLICY(flavor) == SPTLRPC_POLICY_PLAIN);
-        RETURN(&plain_sec);
+        struct ptlrpc_cli_ctx  *ctx, *ctx_new;
+
+        OBD_ALLOC_PTR(ctx_new);
+
+        write_lock(&plsec->pls_lock);
+
+        ctx = plsec->pls_ctx;
+        if (ctx) {
+                atomic_inc(&ctx->cc_refcount);
+
+                if (ctx_new)
+                        OBD_FREE_PTR(ctx_new);
+        } else if (ctx_new) {
+                ctx = ctx_new;
+
+                atomic_set(&ctx->cc_refcount, 1); /* for cache */
+                ctx->cc_sec = &plsec->pls_base;
+                ctx->cc_ops = &plain_ctx_ops;
+                ctx->cc_expire = 0;
+                ctx->cc_flags = PTLRPC_CTX_CACHED | PTLRPC_CTX_UPTODATE;
+                ctx->cc_vcred.vc_uid = 0;
+                spin_lock_init(&ctx->cc_lock);
+                INIT_LIST_HEAD(&ctx->cc_req_list);
+                INIT_LIST_HEAD(&ctx->cc_gc_chain);
+
+                plsec->pls_ctx = ctx;
+                atomic_inc(&plsec->pls_base.ps_nctx);
+                atomic_inc(&plsec->pls_base.ps_refcount);
+
+                atomic_inc(&ctx->cc_refcount); /* for caller */
+        }
+
+        write_unlock(&plsec->pls_lock);
+
+        return ctx;
 }
 
 static
 void plain_destroy_sec(struct ptlrpc_sec *sec)
 {
+        struct plain_sec       *plsec = sec2plsec(sec);
         ENTRY;
-        LASSERT(sec == &plain_sec);
+
+        LASSERT(sec->ps_policy == &plain_policy);
+        LASSERT(sec->ps_import);
+        LASSERT(atomic_read(&sec->ps_refcount) == 0);
+        LASSERT(atomic_read(&sec->ps_nctx) == 0);
+        LASSERT(plsec->pls_ctx == NULL);
+
+        class_import_put(sec->ps_import);
+
+        OBD_FREE_PTR(plsec);
         EXIT;
 }
 
 static
+void plain_kill_sec(struct ptlrpc_sec *sec)
+{
+        sec->ps_dying = 1;
+}
+
+static
+struct ptlrpc_sec *plain_create_sec(struct obd_import *imp,
+                                    struct ptlrpc_svc_ctx *svc_ctx,
+                                    struct sptlrpc_flavor *sf)
+{
+        struct plain_sec       *plsec;
+        struct ptlrpc_sec      *sec;
+        struct ptlrpc_cli_ctx  *ctx;
+        ENTRY;
+
+        LASSERT(RPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN);
+
+        if (sf->sf_bulk_priv != BULK_PRIV_ALG_NULL) {
+                CERROR("plain policy don't support bulk encryption: %u\n",
+                       sf->sf_bulk_priv);
+                RETURN(NULL);
+        }
+
+        OBD_ALLOC_PTR(plsec);
+        if (plsec == NULL)
+                RETURN(NULL);
+
+        /*
+         * initialize plain_sec
+         */
+        plsec->pls_lock = RW_LOCK_UNLOCKED;
+        plsec->pls_ctx = NULL;
+
+        sec = &plsec->pls_base;
+        sec->ps_policy = &plain_policy;
+        atomic_set(&sec->ps_refcount, 0);
+        atomic_set(&sec->ps_nctx, 0);
+        sec->ps_id = sptlrpc_get_next_secid();
+        sec->ps_import = class_import_get(imp);
+        sec->ps_flvr = *sf;
+        sec->ps_lock = SPIN_LOCK_UNLOCKED;
+        INIT_LIST_HEAD(&sec->ps_gc_list);
+        sec->ps_gc_interval = 0;
+        sec->ps_gc_next = 0;
+
+        /* install ctx immediately if this is a reverse sec */
+        if (svc_ctx) {
+                ctx = plain_sec_install_ctx(plsec);
+                if (ctx == NULL) {
+                        plain_destroy_sec(sec);
+                        RETURN(NULL);
+                }
+                sptlrpc_cli_ctx_put(ctx, 1);
+        }
+
+        RETURN(sec);
+}
+
+static
 struct ptlrpc_cli_ctx *plain_lookup_ctx(struct ptlrpc_sec *sec,
                                         struct vfs_cred *vcred,
                                         int create, int remove_dead)
 {
+        struct plain_sec       *plsec = sec2plsec(sec);
+        struct ptlrpc_cli_ctx  *ctx;
         ENTRY;
-        atomic_inc(&plain_cli_ctx.cc_refcount);
-        RETURN(&plain_cli_ctx);
+
+        read_lock(&plsec->pls_lock);
+        ctx = plsec->pls_ctx;
+        if (ctx)
+                atomic_inc(&ctx->cc_refcount);
+        read_unlock(&plsec->pls_lock);
+
+        if (unlikely(ctx == NULL))
+                ctx = plain_sec_install_ctx(plsec);
+
+        RETURN(ctx);
+}
+
+static
+void plain_release_ctx(struct ptlrpc_sec *sec,
+                       struct ptlrpc_cli_ctx *ctx, int sync)
+{
+        LASSERT(atomic_read(&sec->ps_refcount) > 0);
+        LASSERT(atomic_read(&sec->ps_nctx) > 0);
+        LASSERT(atomic_read(&ctx->cc_refcount) == 0);
+        LASSERT(ctx->cc_sec == sec);
+
+        OBD_FREE_PTR(ctx);
+
+        atomic_dec(&sec->ps_nctx);
+        sptlrpc_sec_put(sec);
 }
 
 static
 int plain_flush_ctx_cache(struct ptlrpc_sec *sec,
-                          uid_t uid,
-                          int grace, int force)
+                          uid_t uid, int grace, int force)
 {
-        return 0;
+        struct plain_sec       *plsec = sec2plsec(sec);
+        struct ptlrpc_cli_ctx  *ctx;
+        ENTRY;
+
+        /* do nothing unless caller want to flush for 'all' */
+        if (uid != -1)
+                RETURN(0);
+
+        write_lock(&plsec->pls_lock);
+        ctx = plsec->pls_ctx;
+        plsec->pls_ctx = NULL;
+        write_unlock(&plsec->pls_lock);
+
+        if (ctx)
+                sptlrpc_cli_ctx_put(ctx, 1);
+        RETURN(0);
 }
 
 static
@@ -168,24 +376,24 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
                        struct ptlrpc_request *req,
                        int msgsize)
 {
-        struct sec_flavor_config *conf;
-        int bufcnt = 1, buflens[2], alloc_len;
+        int buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int alloc_len;
         ENTRY;
 
-        buflens[0] = msgsize;
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor))
-                buflens[bufcnt++] = sptlrpc_current_user_desc_size();
+        if (req->rq_pack_udesc)
+                buflens[PLAIN_PACK_USER_OFF] = sptlrpc_current_user_desc_size();
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
+        if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-                conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
-                buflens[bufcnt++] = bulk_sec_desc_size(conf->sfc_bulk_csum, 1,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                req->rq_flvr.sf_bulk_csum, 1,
+                                                req->rq_bulk_read);
         }
 
-        alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+        alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
 
         if (!req->rq_reqbuf) {
                 LASSERT(!req->rq_pool);
@@ -202,11 +410,11 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
                 memset(req->rq_reqbuf, 0, alloc_len);
         }
 
-        lustre_init_msg_v2(req->rq_reqbuf, bufcnt, buflens, NULL);
+        lustre_init_msg_v2(req->rq_reqbuf, PLAIN_PACK_SEGMENTS, buflens, NULL);
         req->rq_reqmsg = lustre_msg_buf_v2(req->rq_reqbuf, 0, 0);
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor))
-                sptlrpc_pack_user_desc(req->rq_reqbuf, 1);
+        if (req->rq_pack_udesc)
+                sptlrpc_pack_user_desc(req->rq_reqbuf, PLAIN_PACK_USER_OFF);
 
         RETURN(0);
 }
@@ -221,6 +429,8 @@ void plain_free_reqbuf(struct ptlrpc_sec *sec,
                 req->rq_reqbuf = NULL;
                 req->rq_reqbuf_len = 0;
         }
+
+        req->rq_reqmsg = NULL;
         EXIT;
 }
 
@@ -229,21 +439,21 @@ int plain_alloc_repbuf(struct ptlrpc_sec *sec,
                        struct ptlrpc_request *req,
                        int msgsize)
 {
-        struct sec_flavor_config *conf;
-        int bufcnt = 1, buflens[2], alloc_len;
+        int buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int alloc_len;
         ENTRY;
 
-        buflens[0] = msgsize;
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
+        if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-                conf = &req->rq_import->imp_obd->u.cli.cl_sec_conf;
-                buflens[bufcnt++] = bulk_sec_desc_size(conf->sfc_bulk_csum, 0,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                req->rq_flvr.sf_bulk_csum, 0,
+                                                req->rq_bulk_read);
         }
 
-        alloc_len = lustre_msg_size_v2(bufcnt, buflens);
+        alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
         alloc_len = size_roundup_power2(alloc_len);
 
         OBD_ALLOC(req->rq_repbuf, alloc_len);
@@ -262,6 +472,8 @@ void plain_free_repbuf(struct ptlrpc_sec *sec,
         OBD_FREE(req->rq_repbuf, req->rq_repbuf_len);
         req->rq_repbuf = NULL;
         req->rq_repbuf_len = 0;
+
+        req->rq_repmsg = NULL;
         EXIT;
 }
 
@@ -275,10 +487,10 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
         int                     newmsg_size, newbuf_size;
         ENTRY;
 
-        /* embedded msg always at seg 0 */
         LASSERT(req->rq_reqbuf);
         LASSERT(req->rq_reqbuf_len >= req->rq_reqlen);
-        LASSERT(lustre_msg_buf(req->rq_reqbuf, 0, 0) == req->rq_reqmsg);
+        LASSERT(lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_MSG_OFF, 0) ==
+                req->rq_reqmsg);
 
         /* compute new embedded msg size.  */
         oldsize = req->rq_reqmsg->lm_buflens[segment];
@@ -288,11 +500,11 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
         req->rq_reqmsg->lm_buflens[segment] = oldsize;
 
         /* compute new wrapper msg size.  */
-        oldsize = req->rq_reqbuf->lm_buflens[0];
-        req->rq_reqbuf->lm_buflens[0] = newmsg_size;
+        oldsize = req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF];
+        req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF] = newmsg_size;
         newbuf_size = lustre_msg_size_v2(req->rq_reqbuf->lm_bufcount,
                                          req->rq_reqbuf->lm_buflens);
-        req->rq_reqbuf->lm_buflens[0] = oldsize;
+        req->rq_reqbuf->lm_buflens[PLAIN_PACK_MSG_OFF] = oldsize;
 
         /* request from pool should always have enough buffer */
         LASSERT(!req->rq_pool || req->rq_reqbuf_len >= newbuf_size);
@@ -309,10 +521,12 @@ int plain_enlarge_reqbuf(struct ptlrpc_sec *sec,
                 OBD_FREE(req->rq_reqbuf, req->rq_reqbuf_len);
                 req->rq_reqbuf = newbuf;
                 req->rq_reqbuf_len = newbuf_size;
-                req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, 0, 0);
+                req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf,
+                                                PLAIN_PACK_MSG_OFF, 0);
         }
 
-        _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, 0, newmsg_size);
+        _sptlrpc_enlarge_msg_inplace(req->rq_reqbuf, PLAIN_PACK_MSG_OFF,
+                                     newmsg_size);
         _sptlrpc_enlarge_msg_inplace(req->rq_reqmsg, segment, newsize);
 
         req->rq_reqlen = newmsg_size;
@@ -332,46 +546,43 @@ static
 int plain_accept(struct ptlrpc_request *req)
 {
         struct lustre_msg *msg = req->rq_reqbuf;
-        int                bufcnt = 1;
         ENTRY;
 
-        LASSERT(SEC_FLAVOR_POLICY(req->rq_sec_flavor) == SPTLRPC_POLICY_PLAIN);
+        LASSERT(RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) == SPTLRPC_POLICY_PLAIN);
 
-        if (SEC_FLAVOR_RPC(req->rq_sec_flavor) != SPTLRPC_FLVR_PLAIN) {
-                CERROR("Invalid flavor 0x%x\n", req->rq_sec_flavor);
-                return SECSVC_DROP;
+        if (msg->lm_bufcount < PLAIN_PACK_SEGMENTS) {
+                CERROR("unexpected request buf count %u\n", msg->lm_bufcount);
+                RETURN(SECSVC_DROP);
         }
 
-        if (SEC_FLAVOR_HAS_USER(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount < ++bufcnt) {
-                        CERROR("Protocal error: too small buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(SECSVC_DROP);
-                }
+        if (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_PLAIN) {
+                CERROR("Invalid rpc flavor %x\n", req->rq_flvr.sf_rpc);
+                RETURN(SECSVC_DROP);
+        }
+
+        req->rq_sp_from = plain_decode_sec_part(msg);
 
-                if (sptlrpc_unpack_user_desc(msg, bufcnt - 1)) {
+        if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
+                if (sptlrpc_unpack_user_desc(msg, PLAIN_PACK_USER_OFF)) {
                         CERROR("Mal-formed user descriptor\n");
                         RETURN(SECSVC_DROP);
                 }
 
-                req->rq_user_desc = lustre_msg_buf(msg, bufcnt - 1, 0);
+                req->rq_pack_udesc = 1;
+                req->rq_user_desc = lustre_msg_buf(msg, PLAIN_PACK_USER_OFF, 0);
         }
 
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor)) {
-                if (msg->lm_bufcount != ++bufcnt) {
-                        CERROR("Protocal error: invalid buf count %d\n",
-                               msg->lm_bufcount);
-                        RETURN(SECSVC_DROP);
-                }
-
-                if (bulk_sec_desc_unpack(msg, bufcnt - 1)) {
+        if (PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr)) {
+                if (bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
                         CERROR("Mal-formed bulk checksum request\n");
                         RETURN(SECSVC_DROP);
                 }
+
+                req->rq_pack_bulk = 1;
         }
 
-        req->rq_reqmsg = lustre_msg_buf(msg, 0, 0);
-        req->rq_reqlen = msg->lm_buflens[0];
+        req->rq_reqmsg = lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0);
+        req->rq_reqlen = msg->lm_buflens[PLAIN_PACK_MSG_OFF];
 
         req->rq_svc_ctx = &plain_svc_ctx;
         atomic_inc(&req->rq_svc_ctx->sc_refcount);
@@ -382,26 +593,26 @@ int plain_accept(struct ptlrpc_request *req)
 static
 int plain_alloc_rs(struct ptlrpc_request *req, int msgsize)
 {
-        struct ptlrpc_reply_state *rs;
+        struct ptlrpc_reply_state   *rs;
         struct ptlrpc_bulk_sec_desc *bsd;
-        int bufcnt = 1, buflens[2];
-        int rs_size = sizeof(*rs);
+        int                          buflens[PLAIN_PACK_SEGMENTS] = { 0, };
+        int                          rs_size = sizeof(*rs);
         ENTRY;
 
         LASSERT(msgsize % 8 == 0);
 
-        buflens[0] = msgsize;
-        if (SEC_FLAVOR_HAS_BULK(req->rq_sec_flavor) &&
-            (req->rq_bulk_read || req->rq_bulk_write)) {
+        buflens[PLAIN_PACK_MSG_OFF] = msgsize;
+
+        if (req->rq_pack_bulk && (req->rq_bulk_read || req->rq_bulk_write)) {
                 bsd = lustre_msg_buf(req->rq_reqbuf,
-                                     req->rq_reqbuf->lm_bufcount - 1,
-                                     sizeof(*bsd));
+                                     PLAIN_PACK_BULK_OFF, sizeof(*bsd));
                 LASSERT(bsd);
 
-                buflens[bufcnt++] = bulk_sec_desc_size(bsd->bsd_csum_alg, 0,
-                                                       req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
+                                                        bsd->bsd_csum_alg, 0,
+                                                        req->rq_bulk_read);
         }
-        rs_size += lustre_msg_size_v2(bufcnt, buflens);
+        rs_size += lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
 
         rs = req->rq_reply_state;
 
@@ -421,8 +632,8 @@ int plain_alloc_rs(struct ptlrpc_request *req, int msgsize)
         rs->rs_repbuf = (struct lustre_msg *) (rs + 1);
         rs->rs_repbuf_len = rs_size - sizeof(*rs);
 
-        lustre_init_msg_v2(rs->rs_repbuf, bufcnt, buflens, NULL);
-        rs->rs_msg = lustre_msg_buf_v2(rs->rs_repbuf, 0, 0);
+        lustre_init_msg_v2(rs->rs_repbuf, PLAIN_PACK_SEGMENTS, buflens, NULL);
+        rs->rs_msg = lustre_msg_buf_v2(rs->rs_repbuf, PLAIN_PACK_MSG_OFF, 0);
 
         req->rq_reply_state = rs;
         RETURN(0);
@@ -452,12 +663,16 @@ int plain_authorize(struct ptlrpc_request *req)
         LASSERT(rs);
         LASSERT(msg);
 
-        if (req->rq_replen != msg->lm_buflens[0])
-                len = lustre_shrink_msg(msg, 0, req->rq_replen, 1);
+        if (req->rq_replen != msg->lm_buflens[PLAIN_PACK_MSG_OFF])
+                len = lustre_shrink_msg(msg, PLAIN_PACK_MSG_OFF,
+                                        req->rq_replen, 1);
         else
                 len = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
 
-        msg->lm_secflvr = req->rq_sec_flavor;
+        msg->lm_secflvr = req->rq_flvr.sf_rpc;
+        if (req->rq_pack_bulk)
+                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
+
         rs->rs_repdata_len = len;
         RETURN(0);
 }
@@ -467,18 +682,21 @@ int plain_svc_unwrap_bulk(struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_reply_state      *rs = req->rq_reply_state;
-        int                             voff, roff;
 
         LASSERT(rs);
-
-        voff = req->rq_reqbuf->lm_bufcount - 1;
-        roff = rs->rs_repbuf->lm_bufcount - 1;
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
+        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf, voff, 0),
-                             lustre_msg_buflen(req->rq_reqbuf, voff),
-                             lustre_msg_buf(rs->rs_repbuf, roff, 0),
-                             lustre_msg_buflen(rs->rs_repbuf, roff));
+                             lustre_msg_buf(req->rq_reqbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(req->rq_reqbuf,
+                                               PLAIN_PACK_BULK_OFF),
+                             lustre_msg_buf(rs->rs_repbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(rs->rs_repbuf,
+                                               PLAIN_PACK_BULK_OFF));
 }
 
 static
@@ -486,22 +704,26 @@ int plain_svc_wrap_bulk(struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_reply_state      *rs = req->rq_reply_state;
-        int                             voff, roff;
 
         LASSERT(rs);
-
-        voff = req->rq_reqbuf->lm_bufcount - 1;
-        roff = rs->rs_repbuf->lm_bufcount - 1;
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
+        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
         return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf, voff, 0),
-                             lustre_msg_buflen(req->rq_reqbuf, voff),
-                             lustre_msg_buf(rs->rs_repbuf, roff, 0),
-                             lustre_msg_buflen(rs->rs_repbuf, roff));
+                             lustre_msg_buf(req->rq_reqbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(req->rq_reqbuf,
+                                               PLAIN_PACK_BULK_OFF),
+                             lustre_msg_buf(rs->rs_repbuf,
+                                            PLAIN_PACK_BULK_OFF, 0),
+                             lustre_msg_buflen(rs->rs_repbuf,
+                                               PLAIN_PACK_BULK_OFF));
 }
 
 static struct ptlrpc_ctx_ops plain_ctx_ops = {
         .refresh                = plain_ctx_refresh,
+        .validate               = plain_ctx_validate,
         .sign                   = plain_ctx_sign,
         .verify                 = plain_ctx_verify,
         .wrap_bulk              = plain_cli_wrap_bulk,
@@ -511,7 +733,9 @@ static struct ptlrpc_ctx_ops plain_ctx_ops = {
 static struct ptlrpc_sec_cops plain_sec_cops = {
         .create_sec             = plain_create_sec,
         .destroy_sec            = plain_destroy_sec,
+        .kill_sec               = plain_kill_sec,
         .lookup_ctx             = plain_lookup_ctx,
+        .release_ctx            = plain_release_ctx,
         .flush_ctx_cache        = plain_flush_ctx_cache,
         .alloc_reqbuf           = plain_alloc_reqbuf,
         .alloc_repbuf           = plain_alloc_repbuf,
@@ -531,50 +755,19 @@ static struct ptlrpc_sec_sops plain_sec_sops = {
 
 static struct ptlrpc_sec_policy plain_policy = {
         .sp_owner               = THIS_MODULE,
-        .sp_name                = "sec.plain",
+        .sp_name                = "plain",
         .sp_policy              = SPTLRPC_POLICY_PLAIN,
         .sp_cops                = &plain_sec_cops,
         .sp_sops                = &plain_sec_sops,
 };
 
-static
-void plain_init_internal(void)
-{
-        static HLIST_HEAD(__list);
-
-        plain_sec.ps_policy = &plain_policy;
-        atomic_set(&plain_sec.ps_refcount, 1);     /* always busy */
-        plain_sec.ps_import = NULL;
-        plain_sec.ps_flavor = SPTLRPC_FLVR_PLAIN;
-        plain_sec.ps_flags = 0;
-        spin_lock_init(&plain_sec.ps_lock);
-        atomic_set(&plain_sec.ps_busy, 1);         /* for "plain_cli_ctx" */
-        CFS_INIT_LIST_HEAD(&plain_sec.ps_gc_list);
-        plain_sec.ps_gc_interval = 0;
-        plain_sec.ps_gc_next = 0;
-
-        hlist_add_head(&plain_cli_ctx.cc_cache, &__list);
-        atomic_set(&plain_cli_ctx.cc_refcount, 1);    /* for hash */
-        plain_cli_ctx.cc_sec = &plain_sec;
-        plain_cli_ctx.cc_ops = &plain_ctx_ops;
-        plain_cli_ctx.cc_expire = 0;
-        plain_cli_ctx.cc_flags = PTLRPC_CTX_CACHED | PTLRPC_CTX_ETERNAL |
-                                 PTLRPC_CTX_UPTODATE;
-        plain_cli_ctx.cc_vcred.vc_uid = 0;
-        spin_lock_init(&plain_cli_ctx.cc_lock);
-        CFS_INIT_LIST_HEAD(&plain_cli_ctx.cc_req_list);
-        CFS_INIT_LIST_HEAD(&plain_cli_ctx.cc_gc_chain);
-}
-
 int sptlrpc_plain_init(void)
 {
         int rc;
 
-        plain_init_internal();
-
         rc = sptlrpc_register_policy(&plain_policy);
         if (rc)
-                CERROR("failed to register sec.plain: %d\n", rc);
+                CERROR("failed to register: %d\n", rc);
 
         return rc;
 }
@@ -585,5 +778,5 @@ void sptlrpc_plain_fini(void)
 
         rc = sptlrpc_unregister_policy(&plain_policy);
         if (rc)
-                CERROR("cannot unregister sec.plain: %d\n", rc);
+                CERROR("cannot unregister: %d\n", rc);
 }