Whamcloud - gitweb
branch: HEAD
author     ericm <ericm>
Fri, 30 Jan 2009 17:35:56 +0000 (17:35 +0000)
committer  ericm <ericm>
Fri, 30 Jan 2009 17:35:56 +0000 (17:35 +0000)
Rewrite the bulk i/o security: use the same GSSAPI mechanisms as for rpc data
instead of inventing our own mechanism.
b=16830
r=fanyong
r=vitaly
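
The patch replaces the old 16-bit rpc flavor (with separate bulk hash/cipher fields) by a single 32-bit flavor that also encodes the bulk type and bulk service, and protects bulk pages with the same GSS mechanism context used for rpc messages. As a minimal, stand-alone user-space sketch of the new compose/extract macros added in lustre/include/lustre_sec.h below (the policy/mech/svc numeric values are illustrative placeholders; only the bulk type/svc values are taken from the patch):

    #include <stdio.h>

    /* offsets and macros mirrored from the lustre_sec.h hunk below */
    #define FLVR_POLICY_OFFSET        (0)
    #define FLVR_MECH_OFFSET          (4)
    #define FLVR_SVC_OFFSET           (8)
    #define FLVR_BULK_TYPE_OFFSET     (12)
    #define FLVR_BULK_SVC_OFFSET      (16)

    #define MAKE_FLVR(policy, mech, svc, btype, bsvc)             \
            (((unsigned)(policy) << FLVR_POLICY_OFFSET) |         \
             ((unsigned)(mech)   << FLVR_MECH_OFFSET)   |         \
             ((unsigned)(svc)    << FLVR_SVC_OFFSET)    |         \
             ((unsigned)(btype)  << FLVR_BULK_TYPE_OFFSET) |      \
             ((unsigned)(bsvc)   << FLVR_BULK_SVC_OFFSET))

    #define SPTLRPC_FLVR_BULK_SVC(flavor)                         \
            ((((unsigned)(flavor)) >> FLVR_BULK_SVC_OFFSET) & 0xF)

    int main(void)
    {
            /* policy/mech/svc values are placeholders; the bulk type/svc
             * values (0 = BULK_DEFAULT, 3 = BULK_SVC_PRIV) come from the
             * enums added by this patch */
            unsigned flvr = MAKE_FLVR(1, 1, 3, 0, 3);

            printf("flavor = 0x%05x, bulk svc = %u\n",
                   flvr, SPTLRPC_FLVR_BULK_SVC(flvr));
            return 0;
    }
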

29 files changed:
lustre/autoconf/lustre-core.m4
lustre/include/lustre_net.h
lustre/include/lustre_sec.h
lustre/ldlm/ldlm_lib.c
lustre/mdc/mdc_request.c
lustre/mdt/mdt_handler.c
lustre/mgs/mgs_llog.c
lustre/osc/osc_request.c
lustre/ost/ost_handler.c
lustre/ptlrpc/events.c
lustre/ptlrpc/gss/gss_api.h
lustre/ptlrpc/gss/gss_bulk.c
lustre/ptlrpc/gss/gss_internal.h
lustre/ptlrpc/gss/gss_keyring.c
lustre/ptlrpc/gss/gss_krb5_mech.c
lustre/ptlrpc/gss/gss_mech_switch.c
lustre/ptlrpc/gss/sec_gss.c
lustre/ptlrpc/niobuf.c
lustre/ptlrpc/pers.c
lustre/ptlrpc/sec.c
lustre/ptlrpc/sec_bulk.c
lustre/ptlrpc/sec_config.c
lustre/ptlrpc/sec_lproc.c
lustre/ptlrpc/sec_null.c
lustre/ptlrpc/sec_plain.c
lustre/ptlrpc/service.c
lustre/tests/sanity-gss.sh
lustre/tests/sanity.sh
lustre/tests/test-framework.sh

lustre/autoconf/lustre-core.m4
index 4e00bf6..04023c7 100644
@@ -623,7 +623,7 @@ dnl the AES symbol usually tied with arch, e.g. CRYPTO_AES_586
 dnl FIXME
 AC_DEFUN([LC_CONFIG_RMTCLIENT],
 [LB_LINUX_CONFIG_IM([CRYPTO_AES],[],[
-       AC_MSG_ERROR([Lustre remote client require that CONFIG_CRYPTO_AES is enabled in your kernel.])
+        AC_MSG_WARN([Lustre remote client requires that CONFIG_CRYPTO_AES is enabled in your kernel.])
 ])
 ])
 
@@ -654,19 +654,19 @@ AC_DEFUN([LC_CONFIG_SUNRPC],
 AC_DEFUN([LC_CONFIG_GSS_KEYRING],
 [AC_MSG_CHECKING([whether to enable gss keyring backend])
  AC_ARG_ENABLE([gss_keyring],
-              [AC_HELP_STRING([--disable-gss-keyring],
+               [AC_HELP_STRING([--disable-gss-keyring],
                                [disable gss keyring backend])],
-              [],[enable_gss_keyring='yes'])
+               [],[enable_gss_keyring='yes'])
  AC_MSG_RESULT([$enable_gss_keyring])
 
  if test x$enable_gss_keyring != xno; then
-       LB_LINUX_CONFIG_IM([KEYS],[],
+        LB_LINUX_CONFIG_IM([KEYS],[],
                            [AC_MSG_ERROR([GSS keyring backend require that CONFIG_KEYS be enabled in your kernel.])])
 
-       AC_CHECK_LIB([keyutils], [keyctl_search], [],
+        AC_CHECK_LIB([keyutils], [keyctl_search], [],
                      [AC_MSG_ERROR([libkeyutils is not found, which is required by gss keyring backend])],)
 
-       AC_DEFINE([HAVE_GSS_KEYRING], [1],
+        AC_DEFINE([HAVE_GSS_KEYRING], [1],
                   [Define this if you enable gss keyring backend])
  fi
 ])
@@ -685,37 +685,29 @@ AC_DEFUN([LC_CONFIG_GSS],
  AC_MSG_RESULT([$enable_gss])
 
  if test x$enable_gss == xyes; then
-       LC_CONFIG_GSS_KEYRING
+        LC_CONFIG_GSS_KEYRING
         LC_CONFIG_SUNRPC
 
+        AC_DEFINE([HAVE_GSS], [1], [Define this if you enable gss])
+
         LB_LINUX_CONFIG_IM([CRYPTO_MD5],[],
                            [AC_MSG_WARN([kernel MD5 support is recommended by using GSS.])])
-       LB_LINUX_CONFIG_IM([CRYPTO_SHA1],[],
+        LB_LINUX_CONFIG_IM([CRYPTO_SHA1],[],
                            [AC_MSG_WARN([kernel SHA1 support is recommended by using GSS.])])
-       LB_LINUX_CONFIG_IM([CRYPTO_SHA256],[],
+        LB_LINUX_CONFIG_IM([CRYPTO_SHA256],[],
                            [AC_MSG_WARN([kernel SHA256 support is recommended by using GSS.])])
-       LB_LINUX_CONFIG_IM([CRYPTO_SHA512],[],
+        LB_LINUX_CONFIG_IM([CRYPTO_SHA512],[],
                            [AC_MSG_WARN([kernel SHA512 support is recommended by using GSS.])])
-       LB_LINUX_CONFIG_IM([CRYPTO_WP512],[],
-                           [AC_MSG_WARN([kernel WP512 support is recommended by using GSS.])])
-       LB_LINUX_CONFIG_IM([CRYPTO_ARC4],[],
-                           [AC_MSG_WARN([kernel ARC4 support is recommended by using GSS.])])
-        LB_LINUX_CONFIG_IM([CRYPTO_DES],[],
-                           [AC_MSG_WARN([kernel DES support is recommended by using GSS.])])
-        LB_LINUX_CONFIG_IM([CRYPTO_TWOFISH],[],
-                           [AC_MSG_WARN([kernel TWOFISH support is recommended by using GSS.])])
-        LB_LINUX_CONFIG_IM([CRYPTO_CAST6],[],
-                           [AC_MSG_WARN([kernel CAST6 support is recommended by using GSS.])])
-
-       AC_CHECK_LIB([gssapi], [gss_init_sec_context],
+
+        AC_CHECK_LIB([gssapi], [gss_init_sec_context],
                      [GSSAPI_LIBS="$GSSAPI_LDFLAGS -lgssapi"],
                      [AC_CHECK_LIB([gssglue], [gss_init_sec_context],
                                    [GSSAPI_LIBS="$GSSAPI_LDFLAGS -lgssglue"],
                                    [AC_MSG_ERROR([libgssapi or libgssglue is not found, which is required by GSS.])])],)
 
-       AC_SUBST(GSSAPI_LIBS)
+        AC_SUBST(GSSAPI_LIBS)
 
-       AC_KERBEROS_V5
+        AC_KERBEROS_V5
  fi
 ])
 
lustre/include/lustre_net.h
index efef5b4..01754a8 100644
@@ -636,8 +636,12 @@ struct ptlrpc_bulk_desc {
         lnet_handle_md_t       bd_md_h;         /* associated MD */
         lnet_nid_t             bd_sender;       /* stash event::sender */
 
-        cfs_page_t           **bd_enc_pages;
 #if defined(__KERNEL__)
+        /*
+         * encrypt iov, size is either 0 or bd_iov_count.
+         */
+        lnet_kiov_t           *bd_enc_iov;
+
         lnet_kiov_t            bd_iov[0];
 #else
         lnet_md_iovec_t        bd_iov[0];
lustre/include/lustre_sec.h
index 57a58c7..50274fc 100644
@@ -94,99 +94,163 @@ enum sptlrpc_service_type {
         SPTLRPC_SVC_MAX,
 };
 
+enum sptlrpc_bulk_type {
+        SPTLRPC_BULK_DEFAULT            = 0,    /* follow rpc flavor */
+        SPTLRPC_BULK_HASH               = 1,    /* hash integrity */
+        SPTLRPC_BULK_MAX,
+};
+
+enum sptlrpc_bulk_service {
+        SPTLRPC_BULK_SVC_NULL           = 0,
+        SPTLRPC_BULK_SVC_AUTH           = 1,
+        SPTLRPC_BULK_SVC_INTG           = 2,
+        SPTLRPC_BULK_SVC_PRIV           = 3,
+        SPTLRPC_BULK_SVC_MAX,
+};
+
 /*
- * rpc flavor compose/extract, represented as 16 bits
+ * rpc flavor compose/extract, represented as 32 bits. currently the
+ * high 12 bits are unused and must be set to 0.
  *
- * 4b (reserved) | 4b (svc) | 4b (mech)  | 4b (policy)
+ * 4b (bulk svc) | 4b (bulk type) | 4b (svc) | 4b (mech)  | 4b (policy)
  */
-#define RPC_FLVR_POLICY_OFFSET        (0)
-#define RPC_FLVR_MECH_OFFSET          (4)
-#define RPC_FLVR_SVC_OFFSET           (8)
-
-#define MAKE_RPC_FLVR(policy, mech, svc)                                \
-        (((__u16)(policy) << RPC_FLVR_POLICY_OFFSET) |                  \
-         ((__u16)(mech) << RPC_FLVR_MECH_OFFSET) |                      \
-         ((__u16)(svc) << RPC_FLVR_SVC_OFFSET))
+#define FLVR_POLICY_OFFSET              (0)
+#define FLVR_MECH_OFFSET                (4)
+#define FLVR_SVC_OFFSET                 (8)
+#define FLVR_BULK_TYPE_OFFSET           (12)
+#define FLVR_BULK_SVC_OFFSET            (16)
+
+#define MAKE_FLVR(policy, mech, svc, btype, bsvc)                       \
+        (((__u32)(policy) << FLVR_POLICY_OFFSET) |                      \
+         ((__u32)(mech) << FLVR_MECH_OFFSET) |                          \
+         ((__u32)(svc) << FLVR_SVC_OFFSET) |                            \
+         ((__u32)(btype) << FLVR_BULK_TYPE_OFFSET) |                    \
+         ((__u32)(bsvc) << FLVR_BULK_SVC_OFFSET))
 
-#define MAKE_RPC_SUBFLVR(mech, svc)                                     \
-        ((__u16)(mech) |                                                \
-         ((__u16)(svc) << (RPC_FLVR_SVC_OFFSET - RPC_FLVR_MECH_OFFSET)))
-
-#define RPC_FLVR_SUB(flavor)                                            \
-        ((((__u16)(flavor)) >> RPC_FLVR_MECH_OFFSET) & 0xFF)
-
-#define RPC_FLVR_POLICY(flavor)                                         \
-        ((((__u16)(flavor)) >> RPC_FLVR_POLICY_OFFSET) & 0xF)
-#define RPC_FLVR_MECH(flavor)                                           \
-        ((((__u16)(flavor)) >> RPC_FLVR_MECH_OFFSET) & 0xF)
-#define RPC_FLVR_SVC(flavor)                                            \
-        ((((__u16)(flavor)) >> RPC_FLVR_SVC_OFFSET) & 0xF)
+/*
+ * extraction
+ */
+#define SPTLRPC_FLVR_POLICY(flavor)                                     \
+        ((((__u32)(flavor)) >> FLVR_POLICY_OFFSET) & 0xF)
+#define SPTLRPC_FLVR_MECH(flavor)                                       \
+        ((((__u32)(flavor)) >> FLVR_MECH_OFFSET) & 0xF)
+#define SPTLRPC_FLVR_SVC(flavor)                                        \
+        ((((__u32)(flavor)) >> FLVR_SVC_OFFSET) & 0xF)
+#define SPTLRPC_FLVR_BULK_TYPE(flavor)                                  \
+        ((((__u32)(flavor)) >> FLVR_BULK_TYPE_OFFSET) & 0xF)
+#define SPTLRPC_FLVR_BULK_SVC(flavor)                                   \
+        ((((__u32)(flavor)) >> FLVR_BULK_SVC_OFFSET) & 0xF)
+
+#define SPTLRPC_FLVR_BASE(flavor)                                       \
+        ((((__u32)(flavor)) >> FLVR_POLICY_OFFSET) & 0xFFF)
+#define SPTLRPC_FLVR_BASE_SUB(flavor)                                   \
+        ((((__u32)(flavor)) >> FLVR_MECH_OFFSET) & 0xFF)
 
 /*
  * gss subflavors
  */
+#define MAKE_BASE_SUBFLVR(mech, svc)                                    \
+        ((__u32)(mech) |                                                \
+         ((__u32)(svc) << (FLVR_SVC_OFFSET - FLVR_MECH_OFFSET)))
+
 #define SPTLRPC_SUBFLVR_KRB5N                                           \
-        MAKE_RPC_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_NULL)
+        MAKE_BASE_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_NULL)
 #define SPTLRPC_SUBFLVR_KRB5A                                           \
-        MAKE_RPC_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_AUTH)
+        MAKE_BASE_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_AUTH)
 #define SPTLRPC_SUBFLVR_KRB5I                                           \
-        MAKE_RPC_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_INTG)
+        MAKE_BASE_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_INTG)
 #define SPTLRPC_SUBFLVR_KRB5P                                           \
-        MAKE_RPC_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_PRIV)
+        MAKE_BASE_SUBFLVR(SPTLRPC_MECH_GSS_KRB5, SPTLRPC_SVC_PRIV)
 
 /*
  * "end user" flavors
  */
 #define SPTLRPC_FLVR_NULL                               \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_NULL,              \
-                      SPTLRPC_MECH_NULL,                \
-                      SPTLRPC_SVC_NULL)
+        MAKE_FLVR(SPTLRPC_POLICY_NULL,                  \
+                  SPTLRPC_MECH_NULL,                    \
+                  SPTLRPC_SVC_NULL,                     \
+                  SPTLRPC_BULK_DEFAULT,                 \
+                  SPTLRPC_BULK_SVC_NULL)
 #define SPTLRPC_FLVR_PLAIN                              \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_PLAIN,             \
-                      SPTLRPC_MECH_PLAIN,               \
-                      SPTLRPC_SVC_NULL)
+        MAKE_FLVR(SPTLRPC_POLICY_PLAIN,                 \
+                  SPTLRPC_MECH_PLAIN,                   \
+                  SPTLRPC_SVC_NULL,                     \
+                  SPTLRPC_BULK_HASH,                    \
+                  SPTLRPC_BULK_SVC_INTG)
 #define SPTLRPC_FLVR_KRB5N                              \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_GSS,               \
-                      SPTLRPC_MECH_GSS_KRB5,            \
-                      SPTLRPC_SVC_NULL)
+        MAKE_FLVR(SPTLRPC_POLICY_GSS,                   \
+                  SPTLRPC_MECH_GSS_KRB5,                \
+                  SPTLRPC_SVC_NULL,                     \
+                  SPTLRPC_BULK_DEFAULT,                 \
+                  SPTLRPC_BULK_SVC_NULL)
 #define SPTLRPC_FLVR_KRB5A                              \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_GSS,               \
-                      SPTLRPC_MECH_GSS_KRB5,            \
-                      SPTLRPC_SVC_AUTH)
+        MAKE_FLVR(SPTLRPC_POLICY_GSS,                   \
+                  SPTLRPC_MECH_GSS_KRB5,                \
+                  SPTLRPC_SVC_AUTH,                     \
+                  SPTLRPC_BULK_DEFAULT,                 \
+                  SPTLRPC_BULK_SVC_NULL)
 #define SPTLRPC_FLVR_KRB5I                              \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_GSS,               \
-                      SPTLRPC_MECH_GSS_KRB5,            \
-                      SPTLRPC_SVC_INTG)
+        MAKE_FLVR(SPTLRPC_POLICY_GSS,                   \
+                  SPTLRPC_MECH_GSS_KRB5,                \
+                  SPTLRPC_SVC_INTG,                     \
+                  SPTLRPC_BULK_DEFAULT,                 \
+                  SPTLRPC_BULK_SVC_INTG)
 #define SPTLRPC_FLVR_KRB5P                              \
-        MAKE_RPC_FLVR(SPTLRPC_POLICY_GSS,               \
-                      SPTLRPC_MECH_GSS_KRB5,            \
-                      SPTLRPC_SVC_PRIV)
-
-#define SPTLRPC_FLVR_ANY                ((__u16) 0xf000)
-#define SPTLRPC_FLVR_INVALID            ((__u16) 0xffff)
+        MAKE_FLVR(SPTLRPC_POLICY_GSS,                   \
+                  SPTLRPC_MECH_GSS_KRB5,                \
+                  SPTLRPC_SVC_PRIV,                     \
+                  SPTLRPC_BULK_DEFAULT,                 \
+                  SPTLRPC_BULK_SVC_PRIV)
 
 #define SPTLRPC_FLVR_DEFAULT            SPTLRPC_FLVR_NULL
 
+#define SPTLRPC_FLVR_INVALID            ((__u32) 0xFFFFFFFF)
+#define SPTLRPC_FLVR_ANY                ((__u32) 0xFFF00000)
+
 /*
- * 32 bits wire flavor (msg->lm_secflvr), lower 12 bits is the rpc flavor,
- * higher 20 bits is not defined right now.
+ * extract the useful part from wire flavor
  */
-#define WIRE_FLVR_RPC(wflvr)            (((__u16) (wflvr)) & 0x0FFF)
+#define WIRE_FLVR(wflvr)                (((__u32) (wflvr)) & 0x000FFFFF)
 
-static inline void rpc_flvr_set_svc(__u16 *flvr, __u16 svc)
+static inline void flvr_set_svc(__u32 *flvr, __u32 svc)
 {
         LASSERT(svc < SPTLRPC_SVC_MAX);
-        *flvr = MAKE_RPC_FLVR(RPC_FLVR_POLICY(*flvr),
-                              RPC_FLVR_MECH(*flvr),
-                              svc);
+        *flvr = MAKE_FLVR(SPTLRPC_FLVR_POLICY(*flvr),
+                          SPTLRPC_FLVR_MECH(*flvr),
+                          svc,
+                          SPTLRPC_FLVR_BULK_TYPE(*flvr),
+                          SPTLRPC_FLVR_BULK_SVC(*flvr));
 }
 
+static inline void flvr_set_bulk_svc(__u32 *flvr, __u32 svc)
+{
+        LASSERT(svc < SPTLRPC_BULK_SVC_MAX);
+        *flvr = MAKE_FLVR(SPTLRPC_FLVR_POLICY(*flvr),
+                          SPTLRPC_FLVR_MECH(*flvr),
+                          SPTLRPC_FLVR_SVC(*flvr),
+                          SPTLRPC_FLVR_BULK_TYPE(*flvr),
+                          svc);
+}
+
+struct bulk_spec_hash {
+        __u8    hash_alg;
+};
 
 struct sptlrpc_flavor {
-        __u16   sf_rpc;         /* rpc flavor */
-        __u8    sf_bulk_ciph;   /* bulk cipher alg */
-        __u8    sf_bulk_hash;   /* bulk hash alg */
+        __u32   sf_rpc;         /* wire flavor - should be renamed to sf_wire */
         __u32   sf_flags;       /* general flags */
+        /*
+         * rpc flavor specification
+         */
+        union {
+                /* nothing for now */
+        } u_rpc;
+        /*
+         * bulk flavor specification
+         */
+        union {
+                struct bulk_spec_hash hash;
+        } u_bulk;
 };
 
 enum lustre_sec_part {
@@ -216,6 +280,7 @@ struct sptlrpc_rule_set {
 };
 
 int sptlrpc_parse_flavor(const char *str, struct sptlrpc_flavor *flvr);
+int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr);
 
 static inline void sptlrpc_rule_set_init(struct sptlrpc_rule_set *set)
 {
@@ -223,10 +288,9 @@ static inline void sptlrpc_rule_set_init(struct sptlrpc_rule_set *set)
 }
 
 void sptlrpc_rule_set_free(struct sptlrpc_rule_set *set);
-int  sptlrpc_rule_set_expand(struct sptlrpc_rule_set *set, int expand);
+int  sptlrpc_rule_set_expand(struct sptlrpc_rule_set *set);
 int  sptlrpc_rule_set_merge(struct sptlrpc_rule_set *set,
-                            struct sptlrpc_rule *rule,
-                            int expand);
+                            struct sptlrpc_rule *rule);
 int sptlrpc_rule_set_choose(struct sptlrpc_rule_set *rset,
                             enum lustre_sec_part from,
                             enum lustre_sec_part to,
@@ -396,10 +460,12 @@ struct ptlrpc_sec_sops {
                                                 int msgsize);
         void                    (*free_rs)     (struct ptlrpc_reply_state *rs);
         void                    (*free_ctx)    (struct ptlrpc_svc_ctx *ctx);
-        /* reverse credential */
+        /* reverse context */
         int                     (*install_rctx)(struct obd_import *imp,
                                                 struct ptlrpc_svc_ctx *ctx);
         /* bulk transform */
+        int                     (*prep_bulk)   (struct ptlrpc_request *req,
+                                                struct ptlrpc_bulk_desc *desc);
         int                     (*unwrap_bulk) (struct ptlrpc_request *req,
                                                 struct ptlrpc_bulk_desc *desc);
         int                     (*wrap_bulk)   (struct ptlrpc_request *req,
@@ -481,55 +547,30 @@ enum sptlrpc_bulk_hash_alg {
         BULK_HASH_ALG_SHA256,
         BULK_HASH_ALG_SHA384,
         BULK_HASH_ALG_SHA512,
-        BULK_HASH_ALG_WP256,
-        BULK_HASH_ALG_WP384,
-        BULK_HASH_ALG_WP512,
         BULK_HASH_ALG_MAX
 };
 
-enum sptlrpc_bulk_cipher_alg {
-        BULK_CIPH_ALG_NULL      = 0,
-        BULK_CIPH_ALG_ARC4,
-        BULK_CIPH_ALG_AES128,
-        BULK_CIPH_ALG_AES192,
-        BULK_CIPH_ALG_AES256,
-        BULK_CIPH_ALG_CAST128,
-        BULK_CIPH_ALG_CAST256,
-        BULK_CIPH_ALG_TWOFISH128,
-        BULK_CIPH_ALG_TWOFISH256,
-        BULK_CIPH_ALG_MAX
-};
-
 struct sptlrpc_hash_type {
         char           *sht_name;
         char           *sht_tfm_name;
         unsigned int    sht_size;
 };
 
-struct sptlrpc_ciph_type {
-        char           *sct_name;
-        char           *sct_tfm_name;
-        __u32           sct_tfm_flags;
-        unsigned int    sct_ivsize;
-        unsigned int    sct_keysize;
-};
-
 const struct sptlrpc_hash_type *sptlrpc_get_hash_type(__u8 hash_alg);
 const char * sptlrpc_get_hash_name(__u8 hash_alg);
-const struct sptlrpc_ciph_type *sptlrpc_get_ciph_type(__u8 ciph_alg);
-const char *sptlrpc_get_ciph_name(__u8 ciph_alg);
+__u8 sptlrpc_get_hash_alg(const char *algname);
 
-#define CIPHER_MAX_BLKSIZE      (16)
-#define CIPHER_MAX_KEYSIZE      (64)
+enum {
+        BSD_FL_ERR      = 1,
+};
 
 struct ptlrpc_bulk_sec_desc {
-        __u8            bsd_version;
-        __u8            bsd_flags;
-        __u8            bsd_pad[4];
-        __u8            bsd_hash_alg;                /* hash algorithm */
-        __u8            bsd_ciph_alg;                /* cipher algorithm */
-        __u8            bsd_key[CIPHER_MAX_KEYSIZE]; /* encrypt key seed */
-        __u8            bsd_csum[0];
+        __u8            bsd_version;    /* 0 */
+        __u8            bsd_type;       /* SPTLRPC_BULK_XXX */
+        __u8            bsd_svc;        /* SPTLRPC_BULK_SVC_XXXX */
+        __u8            bsd_flags;      /* flags */
+        __u32           bsd_nob;        /* nob of bulk data */
+        __u8            bsd_data[0];    /* policy-specific token */
 };
 
 
@@ -567,9 +608,12 @@ void _sptlrpc_enlarge_msg_inplace(struct lustre_msg *msg,
 int sptlrpc_register_policy(struct ptlrpc_sec_policy *policy);
 int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy);
 
-__u16 sptlrpc_name2rpcflavor(const char *name);
-const char *sptlrpc_rpcflavor2name(__u16 flavor);
-int sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize);
+__u32 sptlrpc_name2flavor_base(const char *name);
+const char *sptlrpc_flavor2name_base(__u32 flvr);
+char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
+                               char *buf, int bufsize);
+char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize);
+char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize);
 
 static inline
 struct ptlrpc_sec_policy *sptlrpc_policy_get(struct ptlrpc_sec_policy *policy)
@@ -672,7 +716,7 @@ void sptlrpc_request_out_callback(struct ptlrpc_request *req);
  */
 int sptlrpc_import_sec_adapt(struct obd_import *imp,
                              struct ptlrpc_svc_ctx *ctx,
-                             __u16 rpc_flavor);
+                             struct sptlrpc_flavor *flvr);
 struct ptlrpc_sec *sptlrpc_import_sec_ref(struct obd_import *imp);
 void sptlrpc_import_sec_put(struct obd_import *imp);
 
@@ -737,15 +781,23 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc);
 int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc);
 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
-                                 int nob, obd_count pg_count,
-                                 struct brw_page **pga);
+                                 struct ptlrpc_bulk_desc *desc,
+                                 int nob);
 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
                                   struct ptlrpc_bulk_desc *desc);
+int sptlrpc_svc_prep_bulk(struct ptlrpc_request *req,
+                          struct ptlrpc_bulk_desc *desc);
 int sptlrpc_svc_wrap_bulk(struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc);
 int sptlrpc_svc_unwrap_bulk(struct ptlrpc_request *req,
                             struct ptlrpc_bulk_desc *desc);
 
+/* bulk helpers (internal use only by policies) */
+int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
+                              void *buf, int buflen);
+
+int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset);
+
 /* user descriptor helpers */
 static inline int sptlrpc_user_desc_size(int ngroups)
 {
@@ -756,18 +808,6 @@ int sptlrpc_current_user_desc_size(void);
 int sptlrpc_pack_user_desc(struct lustre_msg *msg, int offset);
 int sptlrpc_unpack_user_desc(struct lustre_msg *msg, int offset);
 
-/* bulk helpers (internal use only by policies) */
-int bulk_sec_desc_size(__u8 hash_alg, int request, int read);
-int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset);
-
-int bulk_csum_cli_request(struct ptlrpc_bulk_desc *desc, int read,
-                          __u32 alg, struct lustre_msg *rmsg, int roff);
-int bulk_csum_cli_reply(struct ptlrpc_bulk_desc *desc, int read,
-                        struct lustre_msg *rmsg, int roff,
-                        struct lustre_msg *vmsg, int voff);
-int bulk_csum_svc(struct ptlrpc_bulk_desc *desc, int read,
-                  struct ptlrpc_bulk_sec_desc *bsdv, int vsize,
-                  struct ptlrpc_bulk_sec_desc *bsdr, int rsize);
 
 #define CFS_CAP_CHOWN_MASK (1 << CFS_CAP_CHOWN)
 #define CFS_CAP_SYS_RESOURCE_MASK (1 << CFS_CAP_SYS_RESOURCE)
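
The fixed-size key/checksum fields of the old ptlrpc_bulk_sec_desc are gone; the new descriptor shown above carries a policy-specific token in a trailing flexible array and records the number of bulk bytes it covers. A small stand-alone sketch of how a sender might size and fill one (the 16-byte token length is only an example, nothing in the patch mandates it; __u8/__u32 are replaced with stdint types):

    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <stdint.h>

    /* mirrors the struct added above; bsd_data[0] written as a C99
     * flexible array member */
    struct ptlrpc_bulk_sec_desc {
            uint8_t  bsd_version;   /* 0 */
            uint8_t  bsd_type;      /* SPTLRPC_BULK_XXX */
            uint8_t  bsd_svc;       /* SPTLRPC_BULK_SVC_XXXX */
            uint8_t  bsd_flags;     /* flags */
            uint32_t bsd_nob;       /* nob of bulk data */
            uint8_t  bsd_data[];    /* policy-specific token */
    };

    int main(void)
    {
            size_t token_len = 16;                      /* example token size */
            struct ptlrpc_bulk_sec_desc *bsd;

            bsd = calloc(1, sizeof(*bsd) + token_len);  /* header + token */
            if (bsd == NULL)
                    return 1;

            bsd->bsd_version = 0;
            bsd->bsd_type    = 0;                       /* SPTLRPC_BULK_DEFAULT */
            bsd->bsd_svc     = 2;                       /* SPTLRPC_BULK_SVC_INTG */
            bsd->bsd_nob     = 1048576;                 /* bytes of bulk data covered */
            memset(bsd->bsd_data, 0xab, token_len);     /* token, e.g. a MIC */

            printf("descriptor size on the wire: %zu bytes\n",
                   sizeof(*bsd) + token_len);
            free(bsd);
            return 0;
    }
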
lustre/ldlm/ldlm_lib.c
index 4bf5368..3c1b45a 100644
@@ -513,7 +513,13 @@ int client_disconnect_export(struct obd_export *exp)
                 to_be_freed = obd->obd_namespace;
         }
 
+        /*
+         * there is no need to hold the sem while disconnecting an import,
+         * and it may actually cause a deadlock in gss.
+         */
+        up_write(&cli->cl_sem);
         rc = ptlrpc_disconnect_import(imp, 0);
+        down_write(&cli->cl_sem);
 
         ptlrpc_invalidate_import(imp);
         /* set obd_namespace to NULL only after invalidate, because we can have
@@ -994,8 +1000,7 @@ dont_check_exports:
         else
                 revimp->imp_msghdr_flags &= ~MSGHDR_AT_SUPPORT;
 
-        rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx,
-                                      req->rq_flvr.sf_rpc);
+        rc = sptlrpc_import_sec_adapt(revimp, req->rq_svc_ctx, &req->rq_flvr);
         if (rc) {
                 CERROR("Failed to get sec for reverse import: %d\n", rc);
                 export->exp_imp_reverse = NULL;
lustre/mdc/mdc_request.c
index 48c79ee..a8c902d 100644
@@ -961,7 +961,10 @@ int mdc_sendpage(struct obd_export *exp, const struct lu_fid *fid,
 
         ptlrpc_request_set_replen(req);
         rc = ptlrpc_queue_wait(req);
-        GOTO(out, rc);
+        if (rc)
+                GOTO(out, rc);
+
+        rc = sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk);
 out:
         ptlrpc_req_finished(req);
         return rc;
@@ -1011,6 +1014,13 @@ int mdc_readpage(struct obd_export *exp, const struct lu_fid *fid,
                 RETURN(rc);
         }
 
+        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk,
+                                          req->rq_bulk->bd_nob_transferred);
+        if (rc < 0) {
+                ptlrpc_req_finished(req);
+                RETURN(rc);
+        }
+
         if (req->rq_bulk->bd_nob_transferred != CFS_PAGE_SIZE) {
                 CERROR("Unexpected # bytes transferred: %d (%ld expected)\n",
                         req->rq_bulk->bd_nob_transferred, CFS_PAGE_SIZE);
lustre/mdt/mdt_handler.c
index 7acac5d..5c267db 100644
@@ -1168,6 +1168,10 @@ static int mdt_sendpage(struct mdt_thread_info *info,
         }
 
         LASSERT(desc->bd_nob == rdpg->rp_count);
+        rc = sptlrpc_svc_wrap_bulk(req, desc);
+        if (rc)
+                GOTO(free_desc, rc);
+
         rc = ptlrpc_start_bulk_transfer(desc);
         if (rc)
                 GOTO(free_desc, rc);
@@ -1327,6 +1331,9 @@ static int mdt_writepage(struct mdt_thread_info *info)
         ptlrpc_prep_bulk_page(desc, page, (int)reqbody->size,
                               (int)reqbody->nlink);
 
+        rc = sptlrpc_svc_prep_bulk(req, desc);
+        if (rc != 0)
+                GOTO(cleanup_page, rc);
         /*
          * Check if client was evicted while we were doing i/o before touching
          * network.
@@ -2771,6 +2778,15 @@ static int mdt_handle0(struct ptlrpc_request *req,
         if (likely(rc == 0)) {
                 rc = mdt_recovery(info);
                 if (likely(rc == +1)) {
+                        switch (lustre_msg_get_opc(msg)) {
+                        case MDS_READPAGE:
+                                req->rq_bulk_read = 1;
+                                break;
+                        case MDS_WRITEPAGE:
+                                req->rq_bulk_write = 1;
+                                break;
+                        }
+
                         h = mdt_handler_find(lustre_msg_get_opc(msg),
                                              supported);
                         if (likely(h != NULL)) {
lustre/mgs/mgs_llog.c
index e328f33..e5adb2f 100644
@@ -2032,7 +2032,7 @@ static int mgs_srpc_set_param_mem(struct fs_db *fsdb,
                 rset = &fsdb->fsdb_srpc_gen;
         }
 
-        rc = sptlrpc_rule_set_merge(rset, &rule, 1);
+        rc = sptlrpc_rule_set_merge(rset, &rule);
 
         RETURN(rc);
 }
@@ -2046,6 +2046,9 @@ static int mgs_srpc_set_param(struct obd_device *obd,
         int                     rc, copy_size;
         ENTRY;
 
+#ifndef HAVE_GSS
+        RETURN(-EINVAL);
+#endif
         /* keep a copy of original param, which could be destroied
          * during parsing */
         copy_size = strlen(param) + 1;
lustre/osc/osc_request.c
index 1812828..1f7e465 100644
@@ -1175,7 +1175,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
         /* size[REQ_REC_OFF] still sizeof (*body) */
         if (opc == OST_WRITE) {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         /* store cl_cksum_type in a local variable since
                          * it can be changed via lprocfs */
                         cksum_type_t cksum_type = cli->cl_cksum_type;
@@ -1204,7 +1204,7 @@ static int osc_brw_prep_request(int cmd, struct client_obd *cli,struct obdo *oa,
                                      sizeof(__u32) * niocount);
         } else {
                 if (unlikely(cli->cl_checksum) &&
-                    req->rq_flvr.sf_bulk_hash == BULK_HASH_ALG_NULL) {
+                    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
                                 body->oa.o_flags = 0;
                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
@@ -1331,6 +1331,9 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                 }
                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
 
+                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
+                        RETURN(-EAGAIN);
+
                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
                     check_write_checksum(&body->oa, peer, client_cksum,
                                          body->oa.o_cksum, aa->aa_requested_nob,
@@ -1338,15 +1341,17 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
                                          cksum_type_unpack(aa->aa_oa->o_flags)))
                         RETURN(-EAGAIN);
 
-                if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
-                        RETURN(-EAGAIN);
-
                 rc = check_write_rcs(req, aa->aa_requested_nob,aa->aa_nio_count,
                                      aa->aa_page_count, aa->aa_ppga);
                 GOTO(out, rc);
         }
 
         /* The rest of this function executes only for OST_READs */
+
+        rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
+        if (rc < 0)
+                GOTO(out, rc);
+
         if (rc > aa->aa_requested_nob) {
                 CERROR("Unexpected rc %d (%d requested)\n", rc,
                        aa->aa_requested_nob);
@@ -1362,10 +1367,6 @@ static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
         if (rc < aa->aa_requested_nob)
                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
 
-        if (sptlrpc_cli_unwrap_bulk_read(req, rc, aa->aa_page_count,
-                                         aa->aa_ppga))
-                GOTO(out, rc = -EAGAIN);
-
         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
                 static int cksum_counter;
                 __u32      server_cksum = body->oa.o_cksum;
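
osc now skips its own brw checksums whenever the flavor already provides bulk protection, as reported by the new sptlrpc_flavor_has_bulk() helper declared in lustre_sec.h earlier in this patch. Its body is not shown in these hunks; a plausible user-space approximation, based purely on the flavor bit layout and not the actual implementation, would be:

    #include <stdio.h>

    /* hypothetical approximation of sptlrpc_flavor_has_bulk() for
     * illustration only; the real helper lives in ptlrpc and is not
     * part of the hunks shown here */
    #define FLVR_BULK_SVC_OFFSET   (16)
    #define SPTLRPC_BULK_SVC_NULL  0

    static int flavor_has_bulk(unsigned sf_rpc)
    {
            return ((sf_rpc >> FLVR_BULK_SVC_OFFSET) & 0xF) !=
                   SPTLRPC_BULK_SVC_NULL;
    }

    int main(void)
    {
            printf("krb5p-style flavor: %d, null flavor: %d\n",
                   flavor_has_bulk(0x30311), flavor_has_bulk(0x00000));
            return 0;
    }
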
lustre/ost/ost_handler.c
index 28430a4..1ae7308 100644
@@ -751,9 +751,9 @@ static int ost_brw_read(struct ptlrpc_request *req, struct obd_trans_info *oti)
                 if (exp->exp_failed)
                         rc = -ENOTCONN;
                 else {
-                        sptlrpc_svc_wrap_bulk(req, desc);
-
-                        rc = ptlrpc_start_bulk_transfer(desc);
+                        rc = sptlrpc_svc_wrap_bulk(req, desc);
+                        if (rc == 0)
+                                rc = ptlrpc_start_bulk_transfer(desc);
                 }
 
                 if (rc == 0) {
@@ -978,6 +978,10 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                                       local_nb[i].offset & ~CFS_PAGE_MASK,
                                       local_nb[i].len);
 
+        rc = sptlrpc_svc_prep_bulk(req, desc);
+        if (rc != 0)
+                GOTO(out_lock, rc);
+
         /* Check if client was evicted while we were doing i/o before touching
            network */
         if (desc->bd_export->exp_failed)
@@ -1012,23 +1016,18 @@ static int ost_brw_write(struct ptlrpc_request *req, struct obd_trans_info *oti)
                         DEBUG_REQ(D_ERROR, req, "Eviction on bulk GET");
                         rc = -ENOTCONN;
                         ptlrpc_abort_bulk(desc);
-                } else if (!desc->bd_success ||
-                           desc->bd_nob_transferred != desc->bd_nob) {
-                        DEBUG_REQ(D_ERROR, req, "%s bulk GET %d(%d)",
-                                  desc->bd_success ?
-                                  "truncated" : "network error on",
-                                  desc->bd_nob_transferred, desc->bd_nob);
+                } else if (!desc->bd_success) {
+                        DEBUG_REQ(D_ERROR, req, "network error on bulk GET");
                         /* XXX should this be a different errno? */
                         rc = -ETIMEDOUT;
+                } else {
+                        rc = sptlrpc_svc_unwrap_bulk(req, desc);
                 }
         } else {
                 DEBUG_REQ(D_ERROR, req, "ptlrpc_bulk_get failed: rc %d", rc);
         }
         no_reply = rc != 0;
 
-        if (rc == 0)
-                sptlrpc_svc_unwrap_bulk(req, desc);
-
         repbody = lustre_msg_buf(req->rq_repmsg, REPLY_REC_OFF,
                                  sizeof(*repbody));
         memcpy(&repbody->oa, &body->oa, sizeof(repbody->oa));
lustre/ptlrpc/events.c
index 2d20ab7..92cdd7b 100644
@@ -194,7 +194,9 @@ void client_bulk_callback (lnet_event_t *ev)
                 desc->bd_sender = ev->sender;
         }
 
-        sptlrpc_enc_pool_put_pages(desc);
+        /* release the encrypted pages for write */
+        if (desc->bd_req->rq_bulk_write)
+                sptlrpc_enc_pool_put_pages(desc);
 
         /* NB don't unlock till after wakeup; desc can disappear under us
          * otherwise */
lustre/ptlrpc/gss/gss_api.h
index 11b1c37..3b20c99 100644
@@ -51,11 +51,15 @@ __u32 lgss_get_mic(
                 struct gss_ctx          *ctx,
                 int                      msgcnt,
                 rawobj_t                *msgs,
+                int                      iovcnt,
+                lnet_kiov_t             *iovs,
                 rawobj_t                *mic_token);
 __u32 lgss_verify_mic(
                 struct gss_ctx          *ctx,
                 int                      msgcnt,
                 rawobj_t                *msgs,
+                int                      iovcnt,
+                lnet_kiov_t             *iovs,
                 rawobj_t                *mic_token);
 __u32 lgss_wrap(
                 struct gss_ctx          *ctx,
@@ -68,12 +72,18 @@ __u32 lgss_unwrap(
                 rawobj_t                *gsshdr,
                 rawobj_t                *token,
                 rawobj_t                *out_msg);
-__u32 lgss_plain_encrypt(
-                struct gss_ctx          *ctx,
-                int                      decrypt,
-                int                      length,
-                void                    *in_buf,
-                void                    *out_buf);
+__u32 lgss_prep_bulk(
+                struct gss_ctx          *gctx,
+                struct ptlrpc_bulk_desc *desc);
+__u32 lgss_wrap_bulk(
+                struct gss_ctx          *gctx,
+                struct ptlrpc_bulk_desc *desc,
+                rawobj_t                *token,
+                int                      adj_nob);
+__u32 lgss_unwrap_bulk(
+                struct gss_ctx          *gctx,
+                struct ptlrpc_bulk_desc *desc,
+                rawobj_t                *token);
 __u32 lgss_delete_sec_context(
                 struct gss_ctx         **ctx);
 int lgss_display(
@@ -115,11 +125,15 @@ struct gss_api_ops {
                         struct gss_ctx         *ctx,
                         int                     msgcnt,
                         rawobj_t               *msgs,
+                        int                     iovcnt,
+                        lnet_kiov_t            *iovs,
                         rawobj_t               *mic_token);
         __u32 (*gss_verify_mic)(
                         struct gss_ctx         *ctx,
                         int                     msgcnt,
                         rawobj_t               *msgs,
+                        int                     iovcnt,
+                        lnet_kiov_t            *iovs,
                         rawobj_t               *mic_token);
         __u32 (*gss_wrap)(
                         struct gss_ctx         *ctx,
@@ -132,12 +146,18 @@ struct gss_api_ops {
                         rawobj_t               *gsshdr,
                         rawobj_t               *token,
                         rawobj_t               *out_msg);
-        __u32 (*gss_plain_encrypt)(
-                        struct gss_ctx         *ctx,
-                        int                     decrypt,
-                        int                     length,
-                        void                   *in_buf,
-                        void                   *out_buf);
+        __u32 (*gss_prep_bulk)(
+                        struct gss_ctx         *gctx,
+                        struct ptlrpc_bulk_desc *desc);
+        __u32 (*gss_wrap_bulk)(
+                        struct gss_ctx         *gctx,
+                        struct ptlrpc_bulk_desc *desc,
+                        rawobj_t               *token,
+                        int                     adj_nob);
+        __u32 (*gss_unwrap_bulk)(
+                        struct gss_ctx         *gctx,
+                        struct ptlrpc_bulk_desc *desc,
+                        rawobj_t               *token);
         void (*gss_delete_sec_context)(
                         void                   *ctx);
         int  (*gss_display)(
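
The mech API now takes an optional kiov vector in lgss_get_mic()/lgss_verify_mic(), so the same MIC code covers both inline rpc buffers and bulk pages, and the old lgss_plain_encrypt() is replaced by lgss_prep_bulk()/lgss_wrap_bulk()/lgss_unwrap_bulk(). A sketch of the integrity-mode calling convention, mirroring the client path in gss_bulk.c further below (it assumes the Lustre gss headers and is not compilable stand-alone):

    /* sign the bulk pages of a descriptor into bsd->bsd_data; msgcnt is 0
     * because nothing from the inline request buffers is covered here */
    static int sign_bulk_pages(struct gss_ctx *mechctx,
                               struct ptlrpc_bulk_desc *desc,
                               struct ptlrpc_bulk_sec_desc *bsd,
                               int max_token_len)
    {
            rawobj_t token;
            __u32    maj;

            token.data = bsd->bsd_data;
            token.len  = max_token_len;

            maj = lgss_get_mic(mechctx, 0, NULL,
                               desc->bd_iov_count, desc->bd_iov, &token);
            return maj == GSS_S_COMPLETE ? 0 : -EACCES;
    }
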
lustre/ptlrpc/gss/gss_bulk.c
index 03fd0ce..f8723f5 100644
 #include "gss_internal.h"
 #include "gss_api.h"
 
-static __u8 zero_iv[CIPHER_MAX_BLKSIZE] = { 0, };
-
-static void buf_to_sl(struct scatterlist *sl,
-                      void *buf, unsigned int len)
-{
-        sl->page = virt_to_page(buf);
-        sl->offset = offset_in_page(buf);
-        sl->length = len;
-}
-
-/*
- * CTS CBC encryption:
- * 1. X(n-1) = P(n-1)
- * 2. E(n-1) = Encrypt(K, X(n-1))
- * 3. C(n)   = HEAD(E(n-1))
- * 4. P      = P(n) | 0
- * 5. D(n)   = E(n-1) XOR P
- * 6. C(n-1) = Encrypt(K, D(n))
- *
- * CTS encryption using standard CBC interface:
- * 1. pad the last partial block with 0.
- * 2. do CBC encryption.
- * 3. swap the last two ciphertext blocks.
- * 4. truncate to original plaintext size.
- */
-static int cbc_cts_encrypt(struct ll_crypto_cipher *tfm,
-                           struct scatterlist      *sld,
-                           struct scatterlist      *sls)
-{
-        struct scatterlist      slst, sldt;
-        struct blkcipher_desc   desc;
-        void                   *data;
-        __u8                    sbuf[CIPHER_MAX_BLKSIZE];
-        __u8                    dbuf[CIPHER_MAX_BLKSIZE];
-        unsigned int            blksize, blks, tail;
-        int                     rc;
-
-        blksize = ll_crypto_blkcipher_blocksize(tfm);
-        blks = sls->length / blksize;
-        tail = sls->length % blksize;
-        LASSERT(blks > 0 && tail > 0);
-
-        /* pad tail block with 0, copy to sbuf */
-        data = cfs_kmap(sls->page);
-        memcpy(sbuf, data + sls->offset + blks * blksize, tail);
-        memset(sbuf + tail, 0, blksize - tail);
-        cfs_kunmap(sls->page);
-
-        buf_to_sl(&slst, sbuf, blksize);
-        buf_to_sl(&sldt, dbuf, blksize);
-        desc.tfm   = tfm;
-        desc.flags = 0;
-
-        /* encrypt head */
-        rc = ll_crypto_blkcipher_encrypt(&desc, sld, sls, sls->length - tail);
-        if (unlikely(rc)) {
-                CERROR("encrypt head (%u) data: %d\n", sls->length - tail, rc);
-                return rc;
-        }
-        /* encrypt tail */
-        rc = ll_crypto_blkcipher_encrypt(&desc, &sldt, &slst, blksize);
-        if (unlikely(rc)) {
-                CERROR("encrypt tail (%u) data: %d\n", slst.length, rc);
-                return rc;
-        }
-
-        /* swab C(n) and C(n-1), if n == 1, then C(n-1) is the IV */
-        data = cfs_kmap(sld->page);
-
-        memcpy(data + sld->offset + blks * blksize,
-               data + sld->offset + (blks - 1) * blksize, tail);
-        memcpy(data + sld->offset + (blks - 1) * blksize, dbuf, blksize);
-        cfs_kunmap(sld->page);
-
-        return 0;
-}
-
-/*
- * CTS CBC decryption:
- * 1. D(n)   = Decrypt(K, C(n-1))
- * 2. C      = C(n) | 0
- * 3. X(n)   = D(n) XOR C
- * 4. P(n)   = HEAD(X(n))
- * 5. E(n-1) = C(n) | TAIL(X(n))
- * 6. X(n-1) = Decrypt(K, E(n-1))
- * 7. P(n-1) = X(n-1) XOR C(n-2)
- *
- * CTS decryption using standard CBC interface:
- * 1. D(n)   = Decrypt(K, C(n-1))
- * 2. C(n)   = C(n) | TAIL(D(n))
- * 3. swap the last two ciphertext blocks.
- * 4. do CBC decryption.
- * 5. truncate to original ciphertext size.
- */
-static int cbc_cts_decrypt(struct ll_crypto_cipher *tfm,
-                           struct scatterlist *sld,
-                           struct scatterlist *sls)
-{
-        struct blkcipher_desc   desc;
-        struct scatterlist      slst, sldt;
-        void                   *data;
-        __u8                    sbuf[CIPHER_MAX_BLKSIZE];
-        __u8                    dbuf[CIPHER_MAX_BLKSIZE];
-        unsigned int            blksize, blks, tail;
-        int                     rc;
-
-        blksize = ll_crypto_blkcipher_blocksize(tfm);
-        blks = sls->length / blksize;
-        tail = sls->length % blksize;
-        LASSERT(blks > 0 && tail > 0);
-
-        /* save current IV, and set IV to zero */
-        ll_crypto_blkcipher_get_iv(tfm, sbuf, blksize);
-        ll_crypto_blkcipher_set_iv(tfm, zero_iv, blksize);
-
-        /* D(n) = Decrypt(K, C(n-1)) */
-        slst = *sls;
-        slst.offset += (blks - 1) * blksize;
-        slst.length = blksize;
-
-        buf_to_sl(&sldt, dbuf, blksize);
-        desc.tfm   = tfm;
-        desc.flags = 0;
-
-        rc = ll_crypto_blkcipher_decrypt(&desc, &sldt, &slst, blksize);
-        if (unlikely(rc)) {
-                CERROR("decrypt C(n-1) (%u): %d\n", slst.length, rc);
-                return rc;
-        }
-
-        /* restore IV */
-        ll_crypto_blkcipher_set_iv(tfm, sbuf, blksize);
-
-        data = cfs_kmap(sls->page);
-        /* C(n) = C(n) | TAIL(D(n)) */
-        memcpy(dbuf, data + sls->offset + blks * blksize, tail);
-        /* swab C(n) and C(n-1) */
-        memcpy(sbuf, data + sls->offset + (blks - 1) * blksize, blksize);
-        memcpy(data + sls->offset + (blks - 1) * blksize, dbuf, blksize);
-        cfs_kunmap(sls->page);
-
-        /* do cbc decrypt */
-        buf_to_sl(&slst, sbuf, blksize);
-        buf_to_sl(&sldt, dbuf, blksize);
-
-        /* decrypt head */
-        rc = ll_crypto_blkcipher_decrypt(&desc, sld, sls, sls->length - tail);
-        if (unlikely(rc)) {
-                CERROR("decrypt head (%u) data: %d\n", sls->length - tail, rc);
-                return rc;
-        }
-        /* decrypt tail */
-        rc = ll_crypto_blkcipher_decrypt(&desc, &sldt, &slst, blksize);
-        if (unlikely(rc)) {
-                CERROR("decrypt tail (%u) data: %d\n", slst.length, rc);
-                return rc;
-        }
-
-        /* truncate to original ciphertext size */
-        data = cfs_kmap(sld->page);
-        memcpy(data + sld->offset + blks * blksize, dbuf, tail);
-        cfs_kunmap(sld->page);
-
-        return 0;
-}
-
-static inline int do_cts_tfm(struct ll_crypto_cipher *tfm,
-                             int encrypt,
-                             struct scatterlist *sld,
-                             struct scatterlist *sls)
-{
-#ifndef HAVE_ASYNC_BLOCK_CIPHER
-        LASSERT(tfm->crt_cipher.cit_mode == CRYPTO_TFM_MODE_CBC);
-#endif
-
-        if (encrypt)
-                return cbc_cts_encrypt(tfm, sld, sls);
-        else
-                return cbc_cts_decrypt(tfm, sld, sls);
-}
-
-/*
- * normal encrypt/decrypt of data of even blocksize
- */
-static inline int do_cipher_tfm(struct ll_crypto_cipher *tfm,
-                                int encrypt,
-                                struct scatterlist *sld,
-                                struct scatterlist *sls)
-{
-        struct blkcipher_desc desc;
-        desc.tfm   = tfm;
-        desc.flags = 0;
-        if (encrypt)
-                return ll_crypto_blkcipher_encrypt(&desc, sld, sls, sls->length);
-        else
-                return ll_crypto_blkcipher_decrypt(&desc, sld, sls, sls->length);
-}
-
-static struct ll_crypto_cipher *get_stream_cipher(__u8 *key, unsigned int keylen)
-{
-        const struct sptlrpc_ciph_type *ct;
-        struct ll_crypto_cipher        *tfm;
-        int                             rc;
-
-        /* using ARC4, the only stream cipher in linux for now */
-        ct = sptlrpc_get_ciph_type(BULK_CIPH_ALG_ARC4);
-        LASSERT(ct);
-
-        tfm = ll_crypto_alloc_blkcipher(ct->sct_tfm_name, 0, 0);
-        if (tfm == NULL) {
-                CERROR("Failed to allocate stream TFM %s\n", ct->sct_name);
-                return NULL;
-        }
-        LASSERT(ll_crypto_blkcipher_blocksize(tfm));
-
-        if (keylen > ct->sct_keysize)
-                keylen = ct->sct_keysize;
-
-        LASSERT(keylen >= crypto_tfm_alg_min_keysize(tfm));
-        LASSERT(keylen <= crypto_tfm_alg_max_keysize(tfm));
-
-        rc = ll_crypto_blkcipher_setkey(tfm, key, keylen);
-        if (rc) {
-                CERROR("Failed to set key for TFM %s: %d\n", ct->sct_name, rc);
-                ll_crypto_free_blkcipher(tfm);
-                return NULL;
-        }
-
-        return tfm;
-}
-
-static int do_bulk_privacy(struct gss_ctx *gctx,
-                           struct ptlrpc_bulk_desc *desc,
-                           int encrypt, __u32 alg,
-                           struct ptlrpc_bulk_sec_desc *bsd)
-{
-        const struct sptlrpc_ciph_type *ct = sptlrpc_get_ciph_type(alg);
-        struct ll_crypto_cipher  *tfm;
-        struct ll_crypto_cipher  *stfm = NULL; /* backup stream cipher */
-        struct scatterlist        sls, sld, *sldp;
-        unsigned int              blksize, keygen_size;
-        int                       i, rc;
-        __u8                      key[CIPHER_MAX_KEYSIZE];
-
-        LASSERT(ct);
-
-        if (encrypt)
-                bsd->bsd_ciph_alg = BULK_CIPH_ALG_NULL;
-
-        if (alg == BULK_CIPH_ALG_NULL)
-                return 0;
-
-        if (desc->bd_iov_count <= 0) {
-                if (encrypt)
-                        bsd->bsd_ciph_alg = alg;
-                return 0;
-        }
-
-        tfm = ll_crypto_alloc_blkcipher(ct->sct_tfm_name, 0, 0 );
-        if (tfm == NULL) {
-                CERROR("Failed to allocate TFM %s\n", ct->sct_name);
-                return -ENOMEM;
-        }
-        blksize = ll_crypto_blkcipher_blocksize(tfm);
-
-        LASSERT(crypto_tfm_alg_max_keysize(tfm) >= ct->sct_keysize);
-        LASSERT(crypto_tfm_alg_min_keysize(tfm) <= ct->sct_keysize);
-        LASSERT(ct->sct_ivsize == 0 ||
-                ll_crypto_blkcipher_ivsize(tfm) == ct->sct_ivsize);
-        LASSERT(ct->sct_keysize <= CIPHER_MAX_KEYSIZE);
-        LASSERT(blksize <= CIPHER_MAX_BLKSIZE);
-
-        /* generate ramdom key seed and compute the secret key based on it.
-         * note determined by algorithm which lgss_plain_encrypt use, it
-         * might require the key size be its (blocksize * n). so here for
-         * simplicity, we force it's be n * MAX_BLKSIZE by padding 0 */
-        keygen_size = (ct->sct_keysize + CIPHER_MAX_BLKSIZE - 1) &
-                      ~(CIPHER_MAX_BLKSIZE - 1);
-        if (encrypt) {
-                get_random_bytes(bsd->bsd_key, ct->sct_keysize);
-                if (ct->sct_keysize < keygen_size)
-                        memset(bsd->bsd_key + ct->sct_keysize, 0,
-                               keygen_size - ct->sct_keysize);
-        }
-
-        rc = lgss_plain_encrypt(gctx, 0, keygen_size, bsd->bsd_key, key);
-        if (rc) {
-                CERROR("failed to compute secret key: %d\n", rc);
-                goto out;
-        }
-
-        rc = ll_crypto_blkcipher_setkey(tfm, key, ct->sct_keysize);
-        if (rc) {
-                CERROR("Failed to set key for TFM %s: %d\n", ct->sct_name, rc);
-                goto out;
-        }
-
-        /* stream cipher doesn't need iv */
-        if (blksize > 1)
-                ll_crypto_blkcipher_set_iv(tfm, zero_iv, blksize);
-
-        for (i = 0; i < desc->bd_iov_count; i++) {
-                sls.page = desc->bd_iov[i].kiov_page;
-                sls.offset = desc->bd_iov[i].kiov_offset;
-                sls.length = desc->bd_iov[i].kiov_len;
-
-                if (unlikely(sls.length == 0)) {
-                        CWARN("page %d with 0 length data?\n", i);
-                        continue;
-                }
-
-                if (unlikely(sls.offset % blksize)) {
-                        CERROR("page %d with odd offset %u, TFM %s\n",
-                               i, sls.offset, ct->sct_name);
-                        rc = -EINVAL;
-                        goto out;
-                }
-
-                if (desc->bd_enc_pages) {
-                        sld.page = desc->bd_enc_pages[i];
-                        sld.offset = desc->bd_iov[i].kiov_offset;
-                        sld.length = desc->bd_iov[i].kiov_len;
-
-                        sldp = &sld;
-                } else {
-                        sldp = &sls;
-                }
-
-                if (likely(sls.length % blksize == 0)) {
-                        /* data length is n * blocksize, do the normal tfm */
-                        rc = do_cipher_tfm(tfm, encrypt, sldp, &sls);
-                } else if (sls.length < blksize) {
-                        /* odd data length, and smaller than 1 block, CTS
-                         * doesn't work in this case because it requires
-                         * transfer a modified IV to peer. here we use a
-                         * "backup" stream cipher to do the tfm */
-                        if (stfm == NULL) {
-                                stfm = get_stream_cipher(key, ct->sct_keysize);
-                                if (tfm == NULL) {
-                                        rc = -ENOMEM;
-                                        goto out;
-                                }
-                        }
-                        rc = do_cipher_tfm(stfm, encrypt, sldp, &sls);
-                } else {
-                        /* odd data length but > 1 block, do CTS tfm */
-                        rc = do_cts_tfm(tfm, encrypt, sldp, &sls);
-                }
-
-                if (unlikely(rc)) {
-                        CERROR("error %s page %d/%d: %d\n",
-                               encrypt ? "encrypt" : "decrypt",
-                               i + 1, desc->bd_iov_count, rc);
-                        goto out;
-                }
-
-                if (desc->bd_enc_pages)
-                        desc->bd_iov[i].kiov_page = desc->bd_enc_pages[i];
-        }
-
-        if (encrypt)
-                bsd->bsd_ciph_alg = alg;
-
-out:
-        if (stfm)
-                ll_crypto_free_blkcipher(stfm);
-
-        ll_crypto_free_blkcipher(tfm);
-        return rc;
-}
-
 int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
         struct gss_cli_ctx              *gctx;
         struct lustre_msg               *msg;
-        struct ptlrpc_bulk_sec_desc     *bsdr;
-        int                              offset, rc;
+        struct ptlrpc_bulk_sec_desc     *bsd;
+        rawobj_t                         token;
+        __u32                            maj;
+        int                              offset;
+        int                              rc;
         ENTRY;
 
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+        gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
+        LASSERT(gctx->gc_mechctx);
+
+        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
                 LASSERT(req->rq_reqbuf->lm_bufcount >= 3);
                 msg = req->rq_reqbuf;
@@ -472,42 +107,68 @@ int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 LBUG();
         }
 
-        /* make checksum */
-        rc = bulk_csum_cli_request(desc, req->rq_bulk_read,
-                                   req->rq_flvr.sf_bulk_hash, msg, offset);
-        if (rc) {
-                CERROR("client bulk %s: failed to generate checksum: %d\n",
-                       req->rq_bulk_read ? "read" : "write", rc);
-                RETURN(rc);
-        }
+        bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
+        bsd->bsd_version = 0;
+        bsd->bsd_flags = 0;
+        bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
 
-        if (req->rq_flvr.sf_bulk_ciph == BULK_CIPH_ALG_NULL)
+        if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
                 RETURN(0);
 
-        /* previous bulk_csum_cli_request() has verified bsdr is good */
-        bsdr = lustre_msg_buf(msg, offset, 0);
+        LASSERT(bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
+                bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
 
         if (req->rq_bulk_read) {
-                bsdr->bsd_ciph_alg = req->rq_flvr.sf_bulk_ciph;
-                RETURN(0);
-        }
-
-        /* it turn out to be bulk write */
-        rc = sptlrpc_enc_pool_get_pages(desc);
-        if (rc) {
-                CERROR("bulk write: failed to allocate encryption pages\n");
-                RETURN(rc);
-        }
+                /*
+                 * bulk read: prepare receiving pages only for privacy mode.
+                 */
+                if (bsd->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
+                        return gss_cli_prep_bulk(req, desc);
+        } else {
+                /*
+                 * bulk write: sign or encrypt bulk pages.
+                 */
+                bsd->bsd_nob = desc->bd_nob;
+
+                if (bsd->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+                        /* integrity mode */
+                        token.data = bsd->bsd_data;
+                        token.len = lustre_msg_buflen(msg, offset) -
+                                    sizeof(*bsd);
+
+                        maj = lgss_get_mic(gctx->gc_mechctx, 0, NULL,
+                                           desc->bd_iov_count, desc->bd_iov,
+                                           &token);
+                        if (maj != GSS_S_COMPLETE) {
+                                CWARN("failed to sign bulk data: %x\n", maj);
+                                RETURN(-EACCES);
+                        }
+                } else {
+                        /* privacy mode */
+                        if (desc->bd_iov_count == 0)
+                                RETURN(0);
+
+                        rc = sptlrpc_enc_pool_get_pages(desc);
+                        if (rc) {
+                                CERROR("bulk write: failed to allocate "
+                                       "encryption pages: %d\n", rc);
+                                RETURN(rc);
+                        }
 
-        gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
-        LASSERT(gctx->gc_mechctx);
+                        token.data = bsd->bsd_data;
+                        token.len = lustre_msg_buflen(msg, offset) -
+                                    sizeof(*bsd);
 
-        rc = do_bulk_privacy(gctx->gc_mechctx, desc, 1,
-                             req->rq_flvr.sf_bulk_ciph, bsdr);
-        if (rc)
-                CERROR("bulk write: client failed to encrypt pages\n");
+                        maj = lgss_wrap_bulk(gctx->gc_mechctx, desc, &token, 0);
+                        if (maj != GSS_S_COMPLETE) {
+                                CWARN("failed to encrypt bulk data: %x\n", maj);
+                                RETURN(-EACCES);
+                        }
+                }
+        }
 
-        RETURN(rc);
+        RETURN(0);
 }
 
 int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
@@ -517,13 +178,15 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
         struct gss_cli_ctx              *gctx;
         struct lustre_msg               *rmsg, *vmsg;
         struct ptlrpc_bulk_sec_desc     *bsdr, *bsdv;
-        int                              roff, voff, rc;
+        rawobj_t                         token;
+        __u32                            maj;
+        int                              roff, voff;
         ENTRY;
 
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_bulk_read || req->rq_bulk_write);
 
-        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
                 vmsg = req->rq_repdata;
                 voff = vmsg->lm_bufcount - 1;
@@ -556,34 +219,158 @@ int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                 LBUG();
         }
 
-        if (req->rq_bulk_read) {
-                bsdr = lustre_msg_buf(rmsg, roff, 0);
-                if (bsdr->bsd_ciph_alg == BULK_CIPH_ALG_NULL)
-                        goto verify_csum;
-
-                bsdv = lustre_msg_buf(vmsg, voff, 0);
-                if (bsdr->bsd_ciph_alg != bsdv->bsd_ciph_alg) {
-                        CERROR("bulk read: cipher algorithm mismatch: client "
-                               "request %s but server reply with %s. try to "
-                               "use the new one for decryption\n",
-                               sptlrpc_get_ciph_name(bsdr->bsd_ciph_alg),
-                               sptlrpc_get_ciph_name(bsdv->bsd_ciph_alg));
+        bsdr = lustre_msg_buf(rmsg, roff, sizeof(*bsdr));
+        bsdv = lustre_msg_buf(vmsg, voff, sizeof(*bsdv));
+        LASSERT(bsdr && bsdv);
+
+        if (bsdr->bsd_version != bsdv->bsd_version ||
+            bsdr->bsd_type != bsdv->bsd_type ||
+            bsdr->bsd_svc != bsdv->bsd_svc) {
+                CERROR("bulk security descriptor mismatch: "
+                       "(%u,%u,%u) != (%u,%u,%u)\n",
+                       bsdr->bsd_version, bsdr->bsd_type, bsdr->bsd_svc,
+                       bsdv->bsd_version, bsdv->bsd_type, bsdv->bsd_svc);
+                RETURN(-EPROTO);
+        }
+
+        LASSERT(bsdv->bsd_svc == SPTLRPC_BULK_SVC_NULL ||
+                bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG ||
+                bsdv->bsd_svc == SPTLRPC_BULK_SVC_PRIV);
+
+        /*
+         * in privacy mode, when we return success, make sure
+         * bd_nob_transferred is the actual size of the clear text;
+         * otherwise the upper layer may be confused.
+         */
+        if (req->rq_bulk_write) {
+                if (bsdv->bsd_flags & BSD_FL_ERR) {
+                        CERROR("server reported bulk i/o failure\n");
+                        RETURN(-EIO);
                 }
 
+                if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_PRIV)
+                        desc->bd_nob_transferred = desc->bd_nob;
+        } else {
+                /*
+                 * bulk read: upon successful return, bd_nob_transferred is
+                 * the size of the plain text actually received.
+                 */
                 gctx = container_of(ctx, struct gss_cli_ctx, gc_base);
                 LASSERT(gctx->gc_mechctx);
 
-                rc = do_bulk_privacy(gctx->gc_mechctx, desc, 0,
-                                     bsdv->bsd_ciph_alg, bsdv);
-                if (rc) {
-                        CERROR("bulk read: client failed to decrypt data\n");
-                        RETURN(rc);
+                if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_INTG) {
+                        int i, nob;
+
+                        /* fix the actual data size */
+                        for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+                                if (desc->bd_iov[i].kiov_len + nob >
+                                    desc->bd_nob_transferred) {
+                                        desc->bd_iov[i].kiov_len =
+                                                desc->bd_nob_transferred - nob;
+                                }
+                                nob += desc->bd_iov[i].kiov_len;
+                        }
+
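+                        /* verify the MIC over the trimmed iov, i.e. only
+                         * the bytes that were actually received */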
+                        token.data = bsdv->bsd_data;
+                        token.len = lustre_msg_buflen(vmsg, voff) -
+                                    sizeof(*bsdv);
+
+                        maj = lgss_verify_mic(gctx->gc_mechctx, 0, NULL,
+                                              desc->bd_iov_count, desc->bd_iov,
+                                              &token);
+                        if (maj != GSS_S_COMPLETE) {
+                                CERROR("failed to verify bulk read: %x\n", maj);
+                                RETURN(-EACCES);
+                        }
+                } else if (bsdv->bsd_svc == SPTLRPC_BULK_SVC_PRIV) {
+                        desc->bd_nob = bsdv->bsd_nob;
+                        if (desc->bd_nob == 0)
+                                RETURN(0);
+
+                        token.data = bsdv->bsd_data;
+                        token.len = lustre_msg_buflen(vmsg, voff) -
+                                    sizeof(*bsdr);
+
+                        maj = lgss_unwrap_bulk(gctx->gc_mechctx, desc, &token);
+                        if (maj != GSS_S_COMPLETE) {
+                                CERROR("failed to decrypt bulk read: %x\n",
+                                       maj);
+                                RETURN(-EACCES);
+                        }
+
+                        desc->bd_nob_transferred = desc->bd_nob;
                 }
         }
 
-verify_csum:
-        rc = bulk_csum_cli_reply(desc, req->rq_bulk_read,
-                                 rmsg, roff, vmsg, voff);
+        RETURN(0);
+}
+
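+/*
+ * Grab pages from the encryption pool and let the mechanism pre-compute
+ * the layout (offsets/lengths) of the encrypted iov.
+ */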
+static int gss_prep_bulk(struct ptlrpc_bulk_desc *desc,
+                         struct gss_ctx *mechctx)
+{
+        int     rc;
+
+        if (desc->bd_iov_count == 0)
+                return 0;
+
+        rc = sptlrpc_enc_pool_get_pages(desc);
+        if (rc)
+                return rc;
+
+        if (lgss_prep_bulk(mechctx, desc) != GSS_S_COMPLETE)
+                return -EACCES;
+
+        return 0;
+}
+
+int gss_cli_prep_bulk(struct ptlrpc_request *req,
+                      struct ptlrpc_bulk_desc *desc)
+{
+        int             rc;
+        ENTRY;
+
+        LASSERT(req->rq_cli_ctx);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_bulk_read);
+
+        if (SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_BULK_SVC_PRIV)
+                RETURN(0);
+
+        rc = gss_prep_bulk(desc, ctx2gctx(req->rq_cli_ctx)->gc_mechctx);
+        if (rc)
+                CERROR("bulk read: failed to prepare encryption "
+                       "pages: %d\n", rc);
+
+        RETURN(rc);
+}
+
+int gss_svc_prep_bulk(struct ptlrpc_request *req,
+                      struct ptlrpc_bulk_desc *desc)
+{
+        struct gss_svc_reqctx        *grctx;
+        struct ptlrpc_bulk_sec_desc  *bsd;
+        int                           rc;
+        ENTRY;
+
+        LASSERT(req->rq_svc_ctx);
+        LASSERT(req->rq_pack_bulk);
+        LASSERT(req->rq_bulk_write);
+
+        grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
+        LASSERT(grctx->src_reqbsd);
+        LASSERT(grctx->src_repbsd);
+        LASSERT(grctx->src_ctx);
+        LASSERT(grctx->src_ctx->gsc_mechctx);
+
+        bsd = grctx->src_reqbsd;
+        if (bsd->bsd_svc != SPTLRPC_BULK_SVC_PRIV)
+                RETURN(0);
+
+        rc = gss_prep_bulk(desc, grctx->src_ctx->gsc_mechctx);
+        if (rc)
+                CERROR("bulk write: failed to prepare encryption "
+                       "pages: %d\n", rc);
+
         RETURN(rc);
 }
 
@@ -591,7 +378,9 @@ int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
         struct gss_svc_reqctx        *grctx;
-        int                           rc;
+        struct ptlrpc_bulk_sec_desc  *bsdr, *bsdv;
+        rawobj_t                      token;
+        __u32                         maj;
         ENTRY;
 
         LASSERT(req->rq_svc_ctx);
@@ -605,29 +394,64 @@ int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
         LASSERT(grctx->src_ctx);
         LASSERT(grctx->src_ctx->gsc_mechctx);
 
-        /* decrypt bulk data if it's encrypted */
-        if (grctx->src_reqbsd->bsd_ciph_alg != BULK_CIPH_ALG_NULL) {
-                rc = do_bulk_privacy(grctx->src_ctx->gsc_mechctx, desc, 0,
-                                     grctx->src_reqbsd->bsd_ciph_alg,
-                                     grctx->src_reqbsd);
-                if (rc) {
-                        CERROR("bulk write: server failed to decrypt data\n");
-                        RETURN(rc);
+        bsdr = grctx->src_reqbsd;
+        bsdv = grctx->src_repbsd;
+
+        /* bsdr has been sanity checked during unpacking */
+        bsdv->bsd_version = 0;
+        bsdv->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsdv->bsd_svc = bsdr->bsd_svc;
+        bsdv->bsd_flags = 0;
+
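+        /* the reply descriptor echoes the requested bulk service level so
+         * that the client can cross check it in unwrap_bulk */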
+        switch (bsdv->bsd_svc) {
+        case SPTLRPC_BULK_SVC_INTG:
+                token.data = bsdr->bsd_data;
+                token.len = grctx->src_reqbsd_size - sizeof(*bsdr);
+
+                maj = lgss_verify_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+                                      desc->bd_iov_count, desc->bd_iov, &token);
+                if (maj != GSS_S_COMPLETE) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("failed to verify bulk signature: %x\n", maj);
+                        RETURN(-EACCES);
+                }
+                break;
+        case SPTLRPC_BULK_SVC_PRIV:
+                if (bsdr->bsd_nob != desc->bd_nob) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("prepared nob %d doesn't match the actual "
+                               "nob %d\n", desc->bd_nob, bsdr->bsd_nob);
+                        RETURN(-EPROTO);
                 }
-        }
 
-        /* verify bulk data checksum */
-        rc = bulk_csum_svc(desc, req->rq_bulk_read,
-                           grctx->src_reqbsd, grctx->src_reqbsd_size,
-                           grctx->src_repbsd, grctx->src_repbsd_size);
+                if (desc->bd_iov_count == 0) {
+                        LASSERT(desc->bd_nob == 0);
+                        break;
+                }
 
-        RETURN(rc);
+                token.data = bsdr->bsd_data;
+                token.len = grctx->src_reqbsd_size - sizeof(*bsdr);
+
+                maj = lgss_unwrap_bulk(grctx->src_ctx->gsc_mechctx,
+                                       desc, &token);
+                if (maj != GSS_S_COMPLETE) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("failed decrypt bulk data: %x\n", maj);
+                        RETURN(-EACCES);
+                }
+                break;
+        }
+
+        RETURN(0);
 }
 
 int gss_svc_wrap_bulk(struct ptlrpc_request *req,
                       struct ptlrpc_bulk_desc *desc)
 {
         struct gss_svc_reqctx        *grctx;
+        struct ptlrpc_bulk_sec_desc  *bsdr, *bsdv;
+        rawobj_t                      token;
+        __u32                         maj;
         int                           rc;
         ENTRY;
 
@@ -642,22 +466,56 @@ int gss_svc_wrap_bulk(struct ptlrpc_request *req,
         LASSERT(grctx->src_ctx);
         LASSERT(grctx->src_ctx->gsc_mechctx);
 
-        /* generate bulk data checksum */
-        rc = bulk_csum_svc(desc, req->rq_bulk_read,
-                           grctx->src_reqbsd, grctx->src_reqbsd_size,
-                           grctx->src_repbsd, grctx->src_repbsd_size);
-        if (rc)
-                RETURN(rc);
-
-        /* encrypt bulk data if required */
-        if (grctx->src_reqbsd->bsd_ciph_alg != BULK_CIPH_ALG_NULL) {
-                rc = do_bulk_privacy(grctx->src_ctx->gsc_mechctx, desc, 1,
-                                     grctx->src_reqbsd->bsd_ciph_alg,
-                                     grctx->src_repbsd);
-                if (rc)
-                        CERROR("bulk read: server failed to encrypt data: "
-                               "rc %d\n", rc);
+        bsdr = grctx->src_reqbsd;
+        bsdv = grctx->src_repbsd;
+
+        /* bsdr has been sanity checked during unpacking */
+        bsdv->bsd_version = 0;
+        bsdv->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsdv->bsd_svc = bsdr->bsd_svc;
+        bsdv->bsd_flags = 0;
+
+        switch (bsdv->bsd_svc) {
+        case SPTLRPC_BULK_SVC_INTG:
+                token.data = bsdv->bsd_data;
+                token.len = grctx->src_repbsd_size - sizeof(*bsdv);
+
+                maj = lgss_get_mic(grctx->src_ctx->gsc_mechctx, 0, NULL,
+                                   desc->bd_iov_count, desc->bd_iov, &token);
+                if (maj != GSS_S_COMPLETE) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("failed to sign bulk data: %x\n", maj);
+                        RETURN(-EACCES);
+                }
+                break;
+        case SPTLRPC_BULK_SVC_PRIV:
+                bsdv->bsd_nob = desc->bd_nob;
+
+                if (desc->bd_iov_count == 0) {
+                        LASSERT(desc->bd_nob == 0);
+                        break;
+                }
+
+                rc = sptlrpc_enc_pool_get_pages(desc);
+                if (rc) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("bulk read: failed to allocate encryption "
+                               "pages: %d\n", rc);
+                        RETURN(rc);
+                }
+
+                token.data = bsdv->bsd_data;
+                token.len = grctx->src_repbsd_size - sizeof(*bsdv);
+
+                maj = lgss_wrap_bulk(grctx->src_ctx->gsc_mechctx,
+                                     desc, &token, 1);
+                if (maj != GSS_S_COMPLETE) {
+                        bsdv->bsd_flags |= BSD_FL_ERR;
+                        CERROR("failed to encrypt bulk data: %x\n", maj);
+                        RETURN(-EACCES);
+                }
+                break;
         }
 
-        RETURN(rc);
+        RETURN(0);
 }
index afbb614..66afd61 100644 (file)
@@ -433,12 +433,16 @@ int  __init gss_init_pipefs(void);
 void __exit gss_exit_pipefs(void);
 
 /* gss_bulk.c */
+int gss_cli_prep_bulk(struct ptlrpc_request *req,
+                      struct ptlrpc_bulk_desc *desc);
 int gss_cli_ctx_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc);
 int gss_cli_ctx_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                             struct ptlrpc_request *req,
                             struct ptlrpc_bulk_desc *desc);
+int gss_svc_prep_bulk(struct ptlrpc_request *req,
+                      struct ptlrpc_bulk_desc *desc);
 int gss_svc_unwrap_bulk(struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc);
 int gss_svc_wrap_bulk(struct ptlrpc_request *req,
index 74c786d..8906109 100644 (file)
@@ -1450,6 +1450,7 @@ static struct ptlrpc_sec_sops gss_sec_keyring_sops = {
         .authorize              = gss_svc_authorize,
         .free_rs                = gss_svc_free_rs,
         .free_ctx               = gss_svc_free_ctx,
+        .prep_bulk              = gss_svc_prep_bulk,
         .unwrap_bulk            = gss_svc_unwrap_bulk,
         .wrap_bulk              = gss_svc_wrap_bulk,
         .install_rctx           = gss_svc_install_rctx_kr,
index a9a5388..7eb0c95 100644 (file)
@@ -531,7 +531,7 @@ void gss_delete_sec_context_kerberos(void *internal_ctx)
 }
 
 static
-void buf_to_sg(struct scatterlist *sg, char *ptr, int len)
+void buf_to_sg(struct scatterlist *sg, void *ptr, int len)
 {
         sg->page = virt_to_page(ptr);
         sg->offset = offset_in_page(ptr);
@@ -582,13 +582,15 @@ out:
         return(ret);
 }
 
+#ifdef HAVE_ASYNC_BLOCK_CIPHER
+
 static inline
 int krb5_digest_hmac(struct ll_crypto_hash *tfm,
                      rawobj_t *key,
                      struct krb5_header *khdr,
                      int msgcnt, rawobj_t *msgs,
+                     int iovcnt, lnet_kiov_t *iovs,
                      rawobj_t *cksum)
-#ifdef HAVE_ASYNC_BLOCK_CIPHER
 {
         struct hash_desc   desc;
         struct scatterlist sg[1];
@@ -607,6 +609,15 @@ int krb5_digest_hmac(struct ll_crypto_hash *tfm,
                 ll_crypto_hash_update(&desc, sg, msgs[i].len);
         }
 
+        for (i = 0; i < iovcnt; i++) {
+                if (iovs[i].kiov_len == 0)
+                        continue;
+                sg[0].page = iovs[i].kiov_page;
+                sg[0].offset = iovs[i].kiov_offset;
+                sg[0].length = iovs[i].kiov_len;
+                ll_crypto_hash_update(&desc, sg, iovs[i].kiov_len);
+        }
+
         if (khdr) {
                 buf_to_sg(sg, (char *) khdr, sizeof(*khdr));
                 ll_crypto_hash_update(&desc, sg, sizeof(*khdr));
@@ -614,7 +625,16 @@ int krb5_digest_hmac(struct ll_crypto_hash *tfm,
 
         return ll_crypto_hash_final(&desc, cksum->data);
 }
-#else /* HAVE_ASYNC_BLOCK_CIPHER */
+
+#else /* ! HAVE_ASYNC_BLOCK_CIPHER */
+
+static inline
+int krb5_digest_hmac(struct ll_crypto_hash *tfm,
+                     rawobj_t *key,
+                     struct krb5_header *khdr,
+                     int msgcnt, rawobj_t *msgs,
+                     int iovcnt, lnet_kiov_t *iovs,
+                     rawobj_t *cksum)
 {
         struct scatterlist sg[1];
         __u32              keylen = key->len, i;
@@ -628,6 +648,15 @@ int krb5_digest_hmac(struct ll_crypto_hash *tfm,
                 crypto_hmac_update(tfm, sg, 1);
         }
 
+        for (i = 0; i < iovcnt; i++) {
+                if (iovs[i].kiov_len == 0)
+                        continue;
+                sg[0].page = iovs[i].kiov_page;
+                sg[0].offset = iovs[i].kiov_offset;
+                sg[0].length = iovs[i].kiov_len;
+                crypto_hmac_update(tfm, sg, 1);
+        }
+
         if (khdr) {
                 buf_to_sg(sg, (char *) khdr, sizeof(*khdr));
                 crypto_hmac_update(tfm, sg, 1);
@@ -636,6 +665,7 @@ int krb5_digest_hmac(struct ll_crypto_hash *tfm,
         crypto_hmac_final(tfm, key->data, &keylen, cksum->data);
         return 0;
 }
+
 #endif /* HAVE_ASYNC_BLOCK_CIPHER */
 
 static inline
@@ -643,6 +673,7 @@ int krb5_digest_norm(struct ll_crypto_hash *tfm,
                      struct krb5_keyblock *kb,
                      struct krb5_header *khdr,
                      int msgcnt, rawobj_t *msgs,
+                     int iovcnt, lnet_kiov_t *iovs,
                      rawobj_t *cksum)
 {
         struct hash_desc   desc;
@@ -662,6 +693,15 @@ int krb5_digest_norm(struct ll_crypto_hash *tfm,
                 ll_crypto_hash_update(&desc, sg, msgs[i].len);
         }
 
+        for (i = 0; i < iovcnt; i++) {
+                if (iovs[i].kiov_len == 0)
+                        continue;
+                sg[0].page = iovs[i].kiov_page;
+                sg[0].offset = iovs[i].kiov_offset;
+                sg[0].length = iovs[i].kiov_len;
+                ll_crypto_hash_update(&desc, sg, iovs[i].kiov_len);
+        }
+
         if (khdr) {
                 buf_to_sg(sg, (char *) khdr, sizeof(*khdr));
                 ll_crypto_hash_update(&desc, sg, sizeof(*khdr));
@@ -682,6 +722,7 @@ __s32 krb5_make_checksum(__u32 enctype,
                          struct krb5_keyblock *kb,
                          struct krb5_header *khdr,
                          int msgcnt, rawobj_t *msgs,
+                         int iovcnt, lnet_kiov_t *iovs,
                          rawobj_t *cksum)
 {
         struct krb5_enctype   *ke = &enctypes[enctype];
@@ -703,10 +744,10 @@ __s32 krb5_make_checksum(__u32 enctype,
 
         if (ke->ke_hash_hmac)
                 rc = krb5_digest_hmac(tfm, &kb->kb_key,
-                                      khdr, msgcnt, msgs, cksum);
+                                      khdr, msgcnt, msgs, iovcnt, iovs, cksum);
         else
                 rc = krb5_digest_norm(tfm, kb,
-                                      khdr, msgcnt, msgs, cksum);
+                                      khdr, msgcnt, msgs, iovcnt, iovs, cksum);
 
         if (rc == 0)
                 code = GSS_S_COMPLETE;
@@ -715,38 +756,96 @@ out_tfm:
         return code;
 }
 
+static void fill_krb5_header(struct krb5_ctx *kctx,
+                             struct krb5_header *khdr,
+                             int privacy)
+{
+        unsigned char acceptor_flag;
+
+        acceptor_flag = kctx->kc_initiate ? 0 : FLAG_SENDER_IS_ACCEPTOR;
+
+        if (privacy) {
+                khdr->kh_tok_id = cpu_to_be16(KG_TOK_WRAP_MSG);
+                khdr->kh_flags = acceptor_flag | FLAG_WRAP_CONFIDENTIAL;
+                khdr->kh_ec = cpu_to_be16(0);
+                khdr->kh_rrc = cpu_to_be16(0);
+        } else {
+                khdr->kh_tok_id = cpu_to_be16(KG_TOK_MIC_MSG);
+                khdr->kh_flags = acceptor_flag;
+                khdr->kh_ec = cpu_to_be16(0xffff);
+                khdr->kh_rrc = cpu_to_be16(0xffff);
+        }
+
+        khdr->kh_filler = 0xff;
+        spin_lock(&krb5_seq_lock);
+        khdr->kh_seq = cpu_to_be64(kctx->kc_seq_send++);
+        spin_unlock(&krb5_seq_lock);
+}
+
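+/*
+ * Sanity check a krb5 token header against the expected direction and
+ * token type: wrap (privacy) tokens must carry EC/RRC of 0 and the
+ * confidential flag, while MIC tokens carry the 0xffff filler values.
+ */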
+static __u32 verify_krb5_header(struct krb5_ctx *kctx,
+                                struct krb5_header *khdr,
+                                int privacy)
+{
+        unsigned char acceptor_flag;
+        __u16         tok_id, ec_rrc;
+
+        acceptor_flag = kctx->kc_initiate ? FLAG_SENDER_IS_ACCEPTOR : 0;
+
+        if (privacy) {
+                tok_id = KG_TOK_WRAP_MSG;
+                ec_rrc = 0x0;
+        } else {
+                tok_id = KG_TOK_MIC_MSG;
+                ec_rrc = 0xffff;
+        }
+
+        /* sanity checks */
+        if (be16_to_cpu(khdr->kh_tok_id) != tok_id) {
+                CERROR("bad token id\n");
+                return GSS_S_DEFECTIVE_TOKEN;
+        }
+        if ((khdr->kh_flags & FLAG_SENDER_IS_ACCEPTOR) != acceptor_flag) {
+                CERROR("bad direction flag\n");
+                return GSS_S_BAD_SIG;
+        }
+        if (privacy && (khdr->kh_flags & FLAG_WRAP_CONFIDENTIAL) == 0) {
+                CERROR("missing confidential flag\n");
+                return GSS_S_BAD_SIG;
+        }
+        if (khdr->kh_filler != 0xff) {
+                CERROR("bad filler\n");
+                return GSS_S_DEFECTIVE_TOKEN;
+        }
+        if (be16_to_cpu(khdr->kh_ec) != ec_rrc ||
+            be16_to_cpu(khdr->kh_rrc) != ec_rrc) {
+                CERROR("bad EC or RRC\n");
+                return GSS_S_DEFECTIVE_TOKEN;
+        }
+        return GSS_S_COMPLETE;
+}
+
 static
 __u32 gss_get_mic_kerberos(struct gss_ctx *gctx,
                            int msgcnt,
                            rawobj_t *msgs,
+                           int iovcnt,
+                           lnet_kiov_t *iovs,
                            rawobj_t *token)
 {
         struct krb5_ctx     *kctx = gctx->internal_ctx_id;
         struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
         struct krb5_header  *khdr;
-        unsigned char        acceptor_flag;
         rawobj_t             cksum = RAWOBJ_EMPTY;
-        __u32                rc = GSS_S_FAILURE;
-
-        acceptor_flag = kctx->kc_initiate ? 0 : FLAG_SENDER_IS_ACCEPTOR;
 
         /* fill krb5 header */
         LASSERT(token->len >= sizeof(*khdr));
         khdr = (struct krb5_header *) token->data;
-
-        khdr->kh_tok_id = cpu_to_be16(KG_TOK_MIC_MSG);
-        khdr->kh_flags = acceptor_flag;
-        khdr->kh_filler = 0xff;
-        khdr->kh_ec = cpu_to_be16(0xffff);
-        khdr->kh_rrc = cpu_to_be16(0xffff);
-        spin_lock(&krb5_seq_lock);
-        khdr->kh_seq = cpu_to_be64(kctx->kc_seq_send++);
-        spin_unlock(&krb5_seq_lock);
+        fill_krb5_header(kctx, khdr, 0);
 
         /* checksum */
         if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyc,
-                               khdr, msgcnt, msgs, &cksum))
-                goto out_err;
+                               khdr, msgcnt, msgs, iovcnt, iovs, &cksum))
+                return GSS_S_FAILURE;
 
         LASSERT(cksum.len >= ke->ke_hash_size);
         LASSERT(token->len >= sizeof(*khdr) + ke->ke_hash_size);
@@ -754,26 +853,23 @@ __u32 gss_get_mic_kerberos(struct gss_ctx *gctx,
                ke->ke_hash_size);
 
         token->len = sizeof(*khdr) + ke->ke_hash_size;
-        rc = GSS_S_COMPLETE;
-out_err:
         rawobj_free(&cksum);
-        return rc;
+        return GSS_S_COMPLETE;
 }
 
 static
 __u32 gss_verify_mic_kerberos(struct gss_ctx *gctx,
                               int msgcnt,
                               rawobj_t *msgs,
+                              int iovcnt,
+                              lnet_kiov_t *iovs,
                               rawobj_t *token)
 {
         struct krb5_ctx     *kctx = gctx->internal_ctx_id;
         struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
         struct krb5_header  *khdr;
-        unsigned char        acceptor_flag;
         rawobj_t             cksum = RAWOBJ_EMPTY;
-        __u32                rc = GSS_S_FAILURE;
-
-        acceptor_flag = kctx->kc_initiate ? FLAG_SENDER_IS_ACCEPTOR : 0;
+        __u32                major;
 
         if (token->len < sizeof(*khdr)) {
                 CERROR("short signature: %u\n", token->len);
@@ -782,47 +878,34 @@ __u32 gss_verify_mic_kerberos(struct gss_ctx *gctx,
 
         khdr = (struct krb5_header *) token->data;
 
-        /* sanity checks */
-        if (be16_to_cpu(khdr->kh_tok_id) != KG_TOK_MIC_MSG) {
-                CERROR("bad token id\n");
-                return GSS_S_DEFECTIVE_TOKEN;
-        }
-        if ((khdr->kh_flags & FLAG_SENDER_IS_ACCEPTOR) != acceptor_flag) {
-                CERROR("bad direction flag\n");
-                return GSS_S_BAD_SIG;
-        }
-        if (khdr->kh_filler != 0xff) {
-                CERROR("bad filler\n");
-                return GSS_S_DEFECTIVE_TOKEN;
-        }
-        if (be16_to_cpu(khdr->kh_ec) != 0xffff ||
-            be16_to_cpu(khdr->kh_rrc) != 0xffff) {
-                CERROR("bad EC or RRC\n");
-                return GSS_S_DEFECTIVE_TOKEN;
+        major = verify_krb5_header(kctx, khdr, 0);
+        if (major != GSS_S_COMPLETE) {
+                CERROR("bad krb5 header\n");
+                return major;
         }
 
         if (token->len < sizeof(*khdr) + ke->ke_hash_size) {
                 CERROR("short signature: %u, require %d\n",
                        token->len, (int) sizeof(*khdr) + ke->ke_hash_size);
-                goto out;
+                return GSS_S_FAILURE;
         }
 
         if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyc,
-                               khdr, msgcnt, msgs, &cksum))
+                               khdr, msgcnt, msgs, iovcnt, iovs, &cksum)) {
+                CERROR("failed to make checksum\n");
                 return GSS_S_FAILURE;
+        }
 
         LASSERT(cksum.len >= ke->ke_hash_size);
         if (memcmp(khdr + 1, cksum.data + cksum.len - ke->ke_hash_size,
                    ke->ke_hash_size)) {
                 CERROR("checksum mismatch\n");
-                rc = GSS_S_BAD_SIG;
-                goto out;
+                rawobj_free(&cksum);
+                return GSS_S_BAD_SIG;
         }
 
-        rc = GSS_S_COMPLETE;
-out:
         rawobj_free(&cksum);
-        return rc;
+        return GSS_S_COMPLETE;
 }
 
 static
@@ -902,6 +985,195 @@ int krb5_encrypt_rawobjs(struct ll_crypto_cipher *tfm,
 }
 
 static
+int krb5_encrypt_bulk(struct ll_crypto_cipher *tfm,
+                      struct krb5_header *khdr,
+                      char *confounder,
+                      struct ptlrpc_bulk_desc *desc,
+                      rawobj_t *cipher,
+                      int adj_nob)
+{
+        struct blkcipher_desc   ciph_desc;
+        __u8                    local_iv[16] = {0};
+        struct scatterlist      src, dst;
+        int                     blocksize, i, rc, nob = 0;
+
+        LASSERT(desc->bd_iov_count);
+        LASSERT(desc->bd_enc_iov);
+
+        blocksize = ll_crypto_blkcipher_blocksize(tfm);
+        LASSERT(blocksize > 1);
+        LASSERT(cipher->len == blocksize + sizeof(*khdr));
+
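+        /*
+         * the cipher buffer receives the encrypted confounder followed by
+         * the encrypted krb5 header; the data pages themselves are
+         * encrypted into bd_enc_iov. with adj_nob set, bd_nob is adjusted
+         * to the blocksize-rounded cipher text size on return.
+         */
+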
+        ciph_desc.tfm  = tfm;
+        ciph_desc.info = local_iv;
+        ciph_desc.flags = 0;
+
+        /* encrypt confounder */
+        buf_to_sg(&src, confounder, blocksize);
+        buf_to_sg(&dst, cipher->data, blocksize);
+
+        rc = ll_crypto_blkcipher_encrypt_iv(&ciph_desc, &dst, &src, blocksize);
+        if (rc) {
+                CERROR("error to encrypt confounder: %d\n", rc);
+                return rc;
+        }
+
+        /* encrypt clear pages */
+        for (i = 0; i < desc->bd_iov_count; i++) {
+                src.page = desc->bd_iov[i].kiov_page;
+                src.offset = desc->bd_iov[i].kiov_offset;
+                src.length = (desc->bd_iov[i].kiov_len + blocksize - 1) &
+                             (~(blocksize - 1));
+
+                if (adj_nob)
+                        nob += src.length;
+
+                dst.page = desc->bd_enc_iov[i].kiov_page;
+                dst.offset = src.offset;
+                dst.length = src.length;
+
+                desc->bd_enc_iov[i].kiov_offset = dst.offset;
+                desc->bd_enc_iov[i].kiov_len = dst.length;
+
+                rc = ll_crypto_blkcipher_encrypt_iv(&ciph_desc, &dst, &src,
+                                                    src.length);
+                if (rc) {
+                        CERROR("error to encrypt page: %d\n", rc);
+                        return rc;
+                }
+        }
+
+        /* encrypt krb5 header */
+        buf_to_sg(&src, khdr, sizeof(*khdr));
+        buf_to_sg(&dst, cipher->data + blocksize, sizeof(*khdr));
+
+        rc = ll_crypto_blkcipher_encrypt_iv(&ciph_desc,
+                                            &dst, &src, sizeof(*khdr));
+        if (rc) {
+                CERROR("error to encrypt krb5 header: %d\n", rc);
+                return rc;
+        }
+
+        if (adj_nob)
+                desc->bd_nob = nob;
+
+        return 0;
+}
+
+/*
+ * desc->bd_nob_transferred is the size of cipher text received.
+ * desc->bd_nob is the target size of plain text supposed to be.
+ */
+static
+int krb5_decrypt_bulk(struct ll_crypto_cipher *tfm,
+                      struct krb5_header *khdr,
+                      struct ptlrpc_bulk_desc *desc,
+                      rawobj_t *cipher,
+                      rawobj_t *plain)
+{
+        struct blkcipher_desc   ciph_desc;
+        __u8                    local_iv[16] = {0};
+        struct scatterlist      src, dst;
+        int                     ct_nob = 0, pt_nob = 0;
+        int                     blocksize, i, rc;
+
+        LASSERT(desc->bd_iov_count);
+        LASSERT(desc->bd_enc_iov);
+        LASSERT(desc->bd_nob_transferred);
+
+        blocksize = ll_crypto_blkcipher_blocksize(tfm);
+        LASSERT(blocksize > 1);
+        LASSERT(cipher->len == blocksize + sizeof(*khdr));
+
+        ciph_desc.tfm  = tfm;
+        ciph_desc.info = local_iv;
+        ciph_desc.flags = 0;
+
+        if (desc->bd_nob_transferred % blocksize) {
+                CERROR("odd transferred nob: %d\n", desc->bd_nob_transferred);
+                return -EPROTO;
+        }
+
+        /* decrypt head (confounder) */
+        buf_to_sg(&src, cipher->data, blocksize);
+        buf_to_sg(&dst, plain->data, blocksize);
+
+        rc = ll_crypto_blkcipher_decrypt_iv(&ciph_desc, &dst, &src, blocksize);
+        if (rc) {
+                CERROR("error to decrypt confounder: %d\n", rc);
+                return rc;
+        }
+
+        /*
+         * decrypt the data pages. note that bd_enc_iov was prepared by
+         * prep_bulk(), which has already done some sanity checks.
+         *
+         * desc->bd_nob is the plain text size that is supposed to be
+         * transferred; desc->bd_nob_transferred is the cipher text size
+         * actually received.
+         */
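+        /* decrypt in place inside the enc pages; when a clear page offset
+         * is block aligned we decrypt straight into it, otherwise decrypt
+         * into the enc page first and memcpy the plain text over */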
+        for (i = 0; i < desc->bd_iov_count && ct_nob < desc->bd_nob_transferred;
+             i++) {
+                if (desc->bd_enc_iov[i].kiov_len == 0)
+                        continue;
+
+                if (ct_nob + desc->bd_enc_iov[i].kiov_len >
+                    desc->bd_nob_transferred)
+                        desc->bd_enc_iov[i].kiov_len =
+                                desc->bd_nob_transferred - ct_nob;
+
+                desc->bd_iov[i].kiov_len = desc->bd_enc_iov[i].kiov_len;
+                if (pt_nob + desc->bd_enc_iov[i].kiov_len > desc->bd_nob)
+                        desc->bd_iov[i].kiov_len = desc->bd_nob - pt_nob;
+
+                src.page = desc->bd_enc_iov[i].kiov_page;
+                src.offset = desc->bd_enc_iov[i].kiov_offset;
+                src.length = desc->bd_enc_iov[i].kiov_len;
+
+                dst = src;
+
+                if (desc->bd_iov[i].kiov_offset % blocksize == 0)
+                        dst.page = desc->bd_iov[i].kiov_page;
+
+                rc = ll_crypto_blkcipher_decrypt_iv(&ciph_desc, &dst, &src,
+                                                    src.length);
+                if (rc) {
+                        CERROR("error to decrypt page: %d\n", rc);
+                        return rc;
+                }
+
+                if (desc->bd_iov[i].kiov_offset % blocksize) {
+                        memcpy(cfs_page_address(desc->bd_iov[i].kiov_page) +
+                               desc->bd_iov[i].kiov_offset,
+                               cfs_page_address(desc->bd_enc_iov[i].kiov_page) +
+                               desc->bd_iov[i].kiov_offset,
+                               desc->bd_iov[i].kiov_len);
+                }
+
+                ct_nob += desc->bd_enc_iov[i].kiov_len;
+                pt_nob += desc->bd_iov[i].kiov_len;
+        }
+
+        /* decrypt tail (krb5 header) */
+        buf_to_sg(&src, cipher->data + blocksize, sizeof(*khdr));
+        buf_to_sg(&dst, cipher->data + blocksize, sizeof(*khdr));
+
+        rc = ll_crypto_blkcipher_decrypt_iv(&ciph_desc,
+                                            &dst, &src, sizeof(*khdr));
+        if (rc) {
+                CERROR("error to decrypt tail: %d\n", rc);
+                return rc;
+        }
+
+        if (memcmp(cipher->data + blocksize, khdr, sizeof(*khdr))) {
+                CERROR("krb5 header doesn't match\n");
+                return -EACCES;
+        }
+
+        return 0;
+}
+
+static
 __u32 gss_wrap_kerberos(struct gss_ctx *gctx,
                         rawobj_t *gsshdr,
                         rawobj_t *msg,
@@ -911,12 +1183,11 @@ __u32 gss_wrap_kerberos(struct gss_ctx *gctx,
         struct krb5_ctx     *kctx = gctx->internal_ctx_id;
         struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
         struct krb5_header  *khdr;
-        unsigned char        acceptor_flag;
         int                  blocksize;
         rawobj_t             cksum = RAWOBJ_EMPTY;
-        rawobj_t             data_desc[4], cipher;
+        rawobj_t             data_desc[3], cipher;
         __u8                 conf[GSS_MAX_CIPHER_BLOCK];
-        int                  enc_rc = 0;
+        int                  rc = 0;
 
         LASSERT(ke);
         LASSERT(ke->ke_conf_size <= GSS_MAX_CIPHER_BLOCK);
@@ -934,16 +1205,7 @@ __u32 gss_wrap_kerberos(struct gss_ctx *gctx,
         /* fill krb5 header */
         LASSERT(token->len >= sizeof(*khdr));
         khdr = (struct krb5_header *) token->data;
-        acceptor_flag = kctx->kc_initiate ? 0 : FLAG_SENDER_IS_ACCEPTOR;
-
-        khdr->kh_tok_id = cpu_to_be16(KG_TOK_WRAP_MSG);
-        khdr->kh_flags = acceptor_flag | FLAG_WRAP_CONFIDENTIAL;
-        khdr->kh_filler = 0xff;
-        khdr->kh_ec = cpu_to_be16(0);
-        khdr->kh_rrc = cpu_to_be16(0);
-        spin_lock(&krb5_seq_lock);
-        khdr->kh_seq = cpu_to_be64(kctx->kc_seq_send++);
-        spin_unlock(&krb5_seq_lock);
+        fill_krb5_header(kctx, khdr, 1);
 
         /* generate confounder */
         get_random_bytes(conf, ke->ke_conf_size);
@@ -975,12 +1237,10 @@ __u32 gss_wrap_kerberos(struct gss_ctx *gctx,
         data_desc[1].len = gsshdr->len;
         data_desc[2].data = msg->data;
         data_desc[2].len = msg->len;
-        data_desc[3].data = (__u8 *) khdr;
-        data_desc[3].len = sizeof(*khdr);
 
         /* compute checksum */
         if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
-                               khdr, 4, data_desc, &cksum))
+                               khdr, 3, data_desc, 0, NULL, &cksum))
                 return GSS_S_FAILURE;
         LASSERT(cksum.len >= ke->ke_hash_size);
 
@@ -1007,26 +1267,26 @@ __u32 gss_wrap_kerberos(struct gss_ctx *gctx,
                 struct ll_crypto_cipher *arc4_tfm;
 
                 if (krb5_make_checksum(ENCTYPE_ARCFOUR_HMAC, &kctx->kc_keyi,
-                                       NULL, 1, &cksum, &arc4_keye)) {
+                                       NULL, 1, &cksum, 0, NULL, &arc4_keye)) {
                         CERROR("failed to obtain arc4 enc key\n");
-                        GOTO(arc4_out, enc_rc = -EACCES);
+                        GOTO(arc4_out, rc = -EACCES);
                 }
 
                 arc4_tfm = ll_crypto_alloc_blkcipher("ecb(arc4)", 0, 0);
                 if (arc4_tfm == NULL) {
                         CERROR("failed to alloc tfm arc4 in ECB mode\n");
-                        GOTO(arc4_out_key, enc_rc = -EACCES);
+                        GOTO(arc4_out_key, rc = -EACCES);
                 }
 
                 if (ll_crypto_blkcipher_setkey(arc4_tfm, arc4_keye.data,
                                                arc4_keye.len)) {
                         CERROR("failed to set arc4 key, len %d\n",
                                arc4_keye.len);
-                        GOTO(arc4_out_tfm, enc_rc = -EACCES);
+                        GOTO(arc4_out_tfm, rc = -EACCES);
                 }
 
-                enc_rc = krb5_encrypt_rawobjs(arc4_tfm, 1,
-                                              3, data_desc, &cipher, 1);
+                rc = krb5_encrypt_rawobjs(arc4_tfm, 1,
+                                          3, data_desc, &cipher, 1);
 arc4_out_tfm:
                 ll_crypto_free_blkcipher(arc4_tfm);
 arc4_out_key:
@@ -1034,11 +1294,155 @@ arc4_out_key:
 arc4_out:
                 do {} while(0); /* just to avoid compile warning */
         } else {
-                enc_rc = krb5_encrypt_rawobjs(kctx->kc_keye.kb_tfm, 0,
-                                              3, data_desc, &cipher, 1);
+                rc = krb5_encrypt_rawobjs(kctx->kc_keye.kb_tfm, 0,
+                                          3, data_desc, &cipher, 1);
+        }
+
+        if (rc != 0) {
+                rawobj_free(&cksum);
+                return GSS_S_FAILURE;
+        }
+
+        /* fill in checksum */
+        LASSERT(token->len >= sizeof(*khdr) + cipher.len + ke->ke_hash_size);
+        memcpy((char *)(khdr + 1) + cipher.len,
+               cksum.data + cksum.len - ke->ke_hash_size,
+               ke->ke_hash_size);
+        rawobj_free(&cksum);
+
+        /* final token length */
+        token->len = sizeof(*khdr) + cipher.len + ke->ke_hash_size;
+        return GSS_S_COMPLETE;
+}
+
+static
+__u32 gss_prep_bulk_kerberos(struct gss_ctx *gctx,
+                             struct ptlrpc_bulk_desc *desc)
+{
+        struct krb5_ctx     *kctx = gctx->internal_ctx_id;
+        int                  blocksize, i;
+
+        LASSERT(desc->bd_iov_count);
+        LASSERT(desc->bd_enc_iov);
+        LASSERT(kctx->kc_keye.kb_tfm);
+
+        blocksize = ll_crypto_blkcipher_blocksize(kctx->kc_keye.kb_tfm);
+
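+        /* each encrypted fragment keeps the clear page offset but is
+         * rounded up to cover a whole number of cipher blocks */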
+        for (i = 0; i < desc->bd_iov_count; i++) {
+                LASSERT(desc->bd_enc_iov[i].kiov_page);
+                /*
+                 * offsets should always start at a page boundary on both
+                 * the client and the server side.
+                 */
+                if (desc->bd_iov[i].kiov_offset & (blocksize - 1)) {
+                        CERROR("odd offset %d in page %d\n",
+                               desc->bd_iov[i].kiov_offset, i);
+                        return GSS_S_FAILURE;
+                }
+
+                desc->bd_enc_iov[i].kiov_offset = desc->bd_iov[i].kiov_offset;
+                desc->bd_enc_iov[i].kiov_len = (desc->bd_iov[i].kiov_len +
+                                                blocksize - 1) & (~(blocksize - 1));
+        }
+
+        return GSS_S_COMPLETE;
+}
+
+static
+__u32 gss_wrap_bulk_kerberos(struct gss_ctx *gctx,
+                             struct ptlrpc_bulk_desc *desc,
+                             rawobj_t *token, int adj_nob)
+{
+        struct krb5_ctx     *kctx = gctx->internal_ctx_id;
+        struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
+        struct krb5_header  *khdr;
+        int                  blocksize;
+        rawobj_t             cksum = RAWOBJ_EMPTY;
+        rawobj_t             data_desc[1], cipher;
+        __u8                 conf[GSS_MAX_CIPHER_BLOCK];
+        int                  rc = 0;
+
+        LASSERT(ke);
+        LASSERT(ke->ke_conf_size <= GSS_MAX_CIPHER_BLOCK);
+
+        /*
+         * final token format:
+         * --------------------------------------------------
+         * | krb5 header | head/tail cipher text | checksum |
+         * --------------------------------------------------
+         */
+
+        /* fill krb5 header */
+        LASSERT(token->len >= sizeof(*khdr));
+        khdr = (struct krb5_header *) token->data;
+        fill_krb5_header(kctx, khdr, 1);
+
+        /* generate confounder */
+        get_random_bytes(conf, ke->ke_conf_size);
+
+        /* get encryption blocksize. note kc_keye might not be associated
+         * with a tfm; currently that is only the case for arcfour-hmac */
+        if (kctx->kc_enctype == ENCTYPE_ARCFOUR_HMAC) {
+                LASSERT(kctx->kc_keye.kb_tfm == NULL);
+                blocksize = 1;
+        } else {
+                LASSERT(kctx->kc_keye.kb_tfm);
+                blocksize = ll_crypto_blkcipher_blocksize(kctx->kc_keye.kb_tfm);
+        }
+
+        /*
+         * we assume the size of krb5_header (16 bytes) is a multiple of
+         * blocksize, so the bulk token size is exactly sizeof(krb5_header) +
+         * blocksize + sizeof(krb5_header) + hashsize.
+         */
+        LASSERT(blocksize <= ke->ke_conf_size);
+        LASSERT(sizeof(*khdr) >= blocksize && sizeof(*khdr) % blocksize == 0);
+        LASSERT(token->len >= sizeof(*khdr) + blocksize + sizeof(*khdr) + 16);
+
+        /*
+         * clear text layout for checksum:
+         * ------------------------------------------
+         * | confounder | clear pages | krb5 header |
+         * ------------------------------------------
+         */
+        data_desc[0].data = conf;
+        data_desc[0].len = ke->ke_conf_size;
+
+        /* compute checksum */
+        if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+                               khdr, 1, data_desc,
+                               desc->bd_iov_count, desc->bd_iov,
+                               &cksum))
+                return GSS_S_FAILURE;
+        LASSERT(cksum.len >= ke->ke_hash_size);
+
+        /*
+         * clear text layout for encryption:
+         * ------------------------------------------
+         * | confounder | clear pages | krb5 header |
+         * ------------------------------------------
+         *        |              |             |
+         *        ----------  (cipher pages)   |
+         * result token:   |                   |
+         * -------------------------------------------
+         * | krb5 header | cipher text | cipher text |
+         * -------------------------------------------
+         */
+        data_desc[0].data = conf;
+        data_desc[0].len = ke->ke_conf_size;
+
+        cipher.data = (__u8 *) (khdr + 1);
+        cipher.len = blocksize + sizeof(*khdr);
+
+        if (kctx->kc_enctype == ENCTYPE_ARCFOUR_HMAC) {
+                LBUG();
+                rc = 0;
+        } else {
+                rc = krb5_encrypt_bulk(kctx->kc_keye.kb_tfm, khdr,
+                                       conf, desc, &cipher, adj_nob);
         }
 
-        if (enc_rc != 0) {
+        if (rc != 0) {
                 rawobj_free(&cksum);
                 return GSS_S_FAILURE;
         }
@@ -1064,18 +1468,16 @@ __u32 gss_unwrap_kerberos(struct gss_ctx  *gctx,
         struct krb5_ctx     *kctx = gctx->internal_ctx_id;
         struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
         struct krb5_header  *khdr;
-        unsigned char        acceptor_flag;
         unsigned char       *tmpbuf;
         int                  blocksize, bodysize;
         rawobj_t             cksum = RAWOBJ_EMPTY;
         rawobj_t             cipher_in, plain_out;
         rawobj_t             hash_objs[3];
-        __u32                rc = GSS_S_FAILURE, enc_rc = 0;
+        int                  rc = 0;
+        __u32                major;
 
         LASSERT(ke);
 
-        acceptor_flag = kctx->kc_initiate ? FLAG_SENDER_IS_ACCEPTOR : 0;
-
         if (token->len < sizeof(*khdr)) {
                 CERROR("short signature: %u\n", token->len);
                 return GSS_S_DEFECTIVE_TOKEN;
@@ -1083,27 +1485,10 @@ __u32 gss_unwrap_kerberos(struct gss_ctx  *gctx,
 
         khdr = (struct krb5_header *) token->data;
 
-        /* sanity check header */
-        if (be16_to_cpu(khdr->kh_tok_id) != KG_TOK_WRAP_MSG) {
-                CERROR("bad token id\n");
-                return GSS_S_DEFECTIVE_TOKEN;
-        }
-        if ((khdr->kh_flags & FLAG_SENDER_IS_ACCEPTOR) != acceptor_flag) {
-                CERROR("bad direction flag\n");
-                return GSS_S_BAD_SIG;
-        }
-        if ((khdr->kh_flags & FLAG_WRAP_CONFIDENTIAL) == 0) {
-                CERROR("missing confidential flag\n");
-                return GSS_S_BAD_SIG;
-        }
-        if (khdr->kh_filler != 0xff) {
-                CERROR("bad filler\n");
-                return GSS_S_DEFECTIVE_TOKEN;
-        }
-        if (be16_to_cpu(khdr->kh_ec) != 0x0 ||
-            be16_to_cpu(khdr->kh_rrc) != 0x0) {
-                CERROR("bad EC or RRC\n");
-                return GSS_S_DEFECTIVE_TOKEN;
+        major = verify_krb5_header(kctx, khdr, 1);
+        if (major != GSS_S_COMPLETE) {
+                CERROR("bad krb5 header\n");
+                return major;
         }
 
         /* block size */
@@ -1143,6 +1528,8 @@ __u32 gss_unwrap_kerberos(struct gss_ctx  *gctx,
         if (!tmpbuf)
                 return GSS_S_FAILURE;
 
+        major = GSS_S_FAILURE;
+
         cipher_in.data = (__u8 *) (khdr + 1);
         cipher_in.len = bodysize;
         plain_out.data = tmpbuf;
@@ -1156,26 +1543,26 @@ __u32 gss_unwrap_kerberos(struct gss_ctx  *gctx,
                 cksum.len = ke->ke_hash_size;
 
                 if (krb5_make_checksum(ENCTYPE_ARCFOUR_HMAC, &kctx->kc_keyi,
-                                       NULL, 1, &cksum, &arc4_keye)) {
+                                       NULL, 1, &cksum, 0, NULL, &arc4_keye)) {
                         CERROR("failed to obtain arc4 enc key\n");
-                        GOTO(arc4_out, enc_rc = -EACCES);
+                        GOTO(arc4_out, rc = -EACCES);
                 }
 
                 arc4_tfm = ll_crypto_alloc_blkcipher("ecb(arc4)", 0, 0);
                 if (arc4_tfm == NULL) {
                         CERROR("failed to alloc tfm arc4 in ECB mode\n");
-                        GOTO(arc4_out_key, enc_rc = -EACCES);
+                        GOTO(arc4_out_key, rc = -EACCES);
                 }
 
                 if (ll_crypto_blkcipher_setkey(arc4_tfm,
                                          arc4_keye.data, arc4_keye.len)) {
                         CERROR("failed to set arc4 key, len %d\n",
                                arc4_keye.len);
-                        GOTO(arc4_out_tfm, enc_rc = -EACCES);
+                        GOTO(arc4_out_tfm, rc = -EACCES);
                 }
 
-                enc_rc = krb5_encrypt_rawobjs(arc4_tfm, 1,
-                                              1, &cipher_in, &plain_out, 0);
+                rc = krb5_encrypt_rawobjs(arc4_tfm, 1,
+                                          1, &cipher_in, &plain_out, 0);
 arc4_out_tfm:
                 ll_crypto_free_blkcipher(arc4_tfm);
 arc4_out_key:
@@ -1183,11 +1570,11 @@ arc4_out_key:
 arc4_out:
                 cksum = RAWOBJ_EMPTY;
         } else {
-                enc_rc = krb5_encrypt_rawobjs(kctx->kc_keye.kb_tfm, 0,
-                                              1, &cipher_in, &plain_out, 0);
+                rc = krb5_encrypt_rawobjs(kctx->kc_keye.kb_tfm, 0,
+                                          1, &cipher_in, &plain_out, 0);
         }
 
-        if (enc_rc != 0) {
+        if (rc != 0) {
                 CERROR("error decrypt\n");
                 goto out_free;
         }
@@ -1215,46 +1602,119 @@ arc4_out:
         hash_objs[0].data = plain_out.data;
         hash_objs[1].len = gsshdr->len;
         hash_objs[1].data = gsshdr->data;
-        hash_objs[2].len = plain_out.len - ke->ke_conf_size;
+        hash_objs[2].len = plain_out.len - ke->ke_conf_size - sizeof(*khdr);
         hash_objs[2].data = plain_out.data + ke->ke_conf_size;
         if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
-                               khdr, 3, hash_objs, &cksum))
+                               khdr, 3, hash_objs, 0, NULL, &cksum))
                 goto out_free;
 
         LASSERT(cksum.len >= ke->ke_hash_size);
         if (memcmp((char *)(khdr + 1) + bodysize,
                    cksum.data + cksum.len - ke->ke_hash_size,
                    ke->ke_hash_size)) {
-                CERROR("cksum mismatch\n");
+                CERROR("checksum mismatch\n");
                 goto out_free;
         }
 
         msg->len =  bodysize - ke->ke_conf_size - sizeof(*khdr);
         memcpy(msg->data, tmpbuf + ke->ke_conf_size, msg->len);
 
-        rc = GSS_S_COMPLETE;
+        major = GSS_S_COMPLETE;
 out_free:
         OBD_FREE(tmpbuf, bodysize);
         rawobj_free(&cksum);
-        return rc;
+        return major;
 }
 
 static
-__u32 gss_plain_encrypt_kerberos(struct gss_ctx  *ctx,
-                                 int              decrypt,
-                                 int              length,
-                                 void            *in_buf,
-                                 void            *out_buf)
+__u32 gss_unwrap_bulk_kerberos(struct gss_ctx *gctx,
+                               struct ptlrpc_bulk_desc *desc,
+                               rawobj_t *token)
 {
-        struct krb5_ctx        *kctx = ctx->internal_ctx_id;
-        __u32                   rc;
+        struct krb5_ctx     *kctx = gctx->internal_ctx_id;
+        struct krb5_enctype *ke = &enctypes[kctx->kc_enctype];
+        struct krb5_header  *khdr;
+        int                  blocksize;
+        rawobj_t             cksum = RAWOBJ_EMPTY;
+        rawobj_t             cipher, plain;
+        rawobj_t             data_desc[1];
+        int                  rc;
+        __u32                major;
+
+        LASSERT(ke);
+
+        if (token->len < sizeof(*khdr)) {
+                CERROR("short signature: %u\n", token->len);
+                return GSS_S_DEFECTIVE_TOKEN;
+        }
+
+        khdr = (struct krb5_header *) token->data;
+
+        major = verify_krb5_header(kctx, khdr, 1);
+        if (major != GSS_S_COMPLETE) {
+                CERROR("bad krb5 header\n");
+                return major;
+        }
+
+        /* block size */
+        if (kctx->kc_enctype == ENCTYPE_ARCFOUR_HMAC) {
+                LASSERT(kctx->kc_keye.kb_tfm == NULL);
+                blocksize = 1;
+                LBUG();
+        } else {
+                LASSERT(kctx->kc_keye.kb_tfm);
+                blocksize = ll_crypto_blkcipher_blocksize(kctx->kc_keye.kb_tfm);
+        }
+        LASSERT(sizeof(*khdr) >= blocksize && sizeof(*khdr) % blocksize == 0);
+
+        /*
+         * token format is expected as:
+         * -----------------------------------------------
+         * | krb5 header | head/tail cipher text | cksum |
+         * -----------------------------------------------
+         */
+        if (token->len < sizeof(*khdr) + blocksize + sizeof(*khdr) +
+                         ke->ke_hash_size) {
+                CERROR("short token size: %u\n", token->len);
+                return GSS_S_DEFECTIVE_TOKEN;
+        }
+
+        cipher.data = (__u8 *) (khdr + 1);
+        cipher.len = blocksize + sizeof(*khdr);
+        plain.data = cipher.data;
+        plain.len = cipher.len;
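+        /* the confounder and the trailing krb5 header are decrypted in
+         * place inside the token buffer; the bulk pages are decrypted
+         * via desc */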
 
-        rc = krb5_encrypt(kctx->kc_keye.kb_tfm, decrypt,
-                          NULL, in_buf, out_buf, length);
+        rc = krb5_decrypt_bulk(kctx->kc_keye.kb_tfm, khdr,
+                               desc, &cipher, &plain);
         if (rc)
-                CERROR("plain encrypt error: %d\n", rc);
+                return GSS_S_DEFECTIVE_TOKEN;
+
+        /*
+         * verify checksum, compose clear text as layout:
+         * ------------------------------------------
+         * | confounder | clear pages | krb5 header |
+         * ------------------------------------------
+         */
+        data_desc[0].data = plain.data;
+        data_desc[0].len = blocksize;
+
+        if (krb5_make_checksum(kctx->kc_enctype, &kctx->kc_keyi,
+                               khdr, 1, data_desc,
+                               desc->bd_iov_count, desc->bd_iov,
+                               &cksum))
+                return GSS_S_FAILURE;
+        LASSERT(cksum.len >= ke->ke_hash_size);
+
+        if (memcmp(plain.data + blocksize + sizeof(*khdr),
+                   cksum.data + cksum.len - ke->ke_hash_size,
+                   ke->ke_hash_size)) {
+                CERROR("checksum mismatch\n");
+                rawobj_free(&cksum);
+                return GSS_S_BAD_SIG;
+        }
 
-        return rc;
+        rawobj_free(&cksum);
+        return GSS_S_COMPLETE;
 }
 
 int gss_display_kerberos(struct gss_ctx        *ctx,
@@ -1277,7 +1737,9 @@ static struct gss_api_ops gss_kerberos_ops = {
         .gss_verify_mic             = gss_verify_mic_kerberos,
         .gss_wrap                   = gss_wrap_kerberos,
         .gss_unwrap                 = gss_unwrap_kerberos,
-        .gss_plain_encrypt          = gss_plain_encrypt_kerberos,
+        .gss_prep_bulk              = gss_prep_bulk_kerberos,
+        .gss_wrap_bulk              = gss_wrap_bulk_kerberos,
+        .gss_unwrap_bulk            = gss_unwrap_bulk_kerberos,
         .gss_delete_sec_context     = gss_delete_sec_context_kerberos,
         .gss_display                = gss_display_kerberos,
 };
index 8a4e627..ca55fe8 100644 (file)
@@ -214,6 +214,8 @@ __u32 lgss_inquire_context(struct gss_ctx *context_handle,
 __u32 lgss_get_mic(struct gss_ctx *context_handle,
                    int msgcnt,
                    rawobj_t *msg,
+                   int iovcnt,
+                   lnet_kiov_t *iovs,
                    rawobj_t *mic_token)
 {
         LASSERT(context_handle);
@@ -225,6 +227,8 @@ __u32 lgss_get_mic(struct gss_ctx *context_handle,
                 ->gss_get_mic(context_handle,
                               msgcnt,
                               msg,
+                              iovcnt,
+                              iovs,
                               mic_token);
 }
 
@@ -232,6 +236,8 @@ __u32 lgss_get_mic(struct gss_ctx *context_handle,
 __u32 lgss_verify_mic(struct gss_ctx *context_handle,
                       int msgcnt,
                       rawobj_t *msg,
+                      int iovcnt,
+                      lnet_kiov_t *iovs,
                       rawobj_t *mic_token)
 {
         LASSERT(context_handle);
@@ -243,6 +249,8 @@ __u32 lgss_verify_mic(struct gss_ctx *context_handle,
                 ->gss_verify_mic(context_handle,
                                  msgcnt,
                                  msg,
+                                 iovcnt,
+                                 iovs,
                                  mic_token);
 }
 
@@ -276,19 +284,43 @@ __u32 lgss_unwrap(struct gss_ctx *context_handle,
 }
 
 
-__u32 lgss_plain_encrypt(struct gss_ctx *ctx,
-                         int decrypt,
-                         int length,
-                         void *in_buf,
-                         void *out_buf)
+__u32 lgss_prep_bulk(struct gss_ctx *context_handle,
+                     struct ptlrpc_bulk_desc *desc)
 {
-        LASSERT(ctx);
-        LASSERT(ctx->mech_type);
-        LASSERT(ctx->mech_type->gm_ops);
-        LASSERT(ctx->mech_type->gm_ops->gss_plain_encrypt);
+        LASSERT(context_handle);
+        LASSERT(context_handle->mech_type);
+        LASSERT(context_handle->mech_type->gm_ops);
+        LASSERT(context_handle->mech_type->gm_ops->gss_prep_bulk);
 
-        return ctx->mech_type->gm_ops
-                ->gss_plain_encrypt(ctx, decrypt, length, in_buf, out_buf);
+        return context_handle->mech_type->gm_ops
+                ->gss_prep_bulk(context_handle, desc);
+}
+
+__u32 lgss_wrap_bulk(struct gss_ctx *context_handle,
+                     struct ptlrpc_bulk_desc *desc,
+                     rawobj_t *token,
+                     int adj_nob)
+{
+        LASSERT(context_handle);
+        LASSERT(context_handle->mech_type);
+        LASSERT(context_handle->mech_type->gm_ops);
+        LASSERT(context_handle->mech_type->gm_ops->gss_wrap_bulk);
+
+        return context_handle->mech_type->gm_ops
+                ->gss_wrap_bulk(context_handle, desc, token, adj_nob);
+}
+
+__u32 lgss_unwrap_bulk(struct gss_ctx *context_handle,
+                       struct ptlrpc_bulk_desc *desc,
+                       rawobj_t *token)
+{
+        LASSERT(context_handle);
+        LASSERT(context_handle->mech_type);
+        LASSERT(context_handle->mech_type->gm_ops);
+        LASSERT(context_handle->mech_type->gm_ops->gss_unwrap_bulk);
+
+        return context_handle->mech_type->gm_ops
+                ->gss_unwrap_bulk(context_handle, desc, token);
 }
 
 /* gss_delete_sec_context: free all resources associated with context_handle.
index f3aae3f..9b531f2 100644 (file)
@@ -182,7 +182,7 @@ static int gss_sign_msg(struct lustre_msg *msg,
                         rawobj_t *handle)
 {
         struct gss_header      *ghdr;
-        rawobj_t                text[3], mic;
+        rawobj_t                text[4], mic;
         int                     textcnt, max_textcnt, mic_idx;
         __u32                   major;
 
@@ -223,7 +223,7 @@ static int gss_sign_msg(struct lustre_msg *msg,
         mic.len = msg->lm_buflens[mic_idx];
         mic.data = lustre_msg_buf(msg, mic_idx, 0);
 
-        major = lgss_get_mic(mechctx, textcnt, text, &mic);
+        major = lgss_get_mic(mechctx, textcnt, text, 0, NULL, &mic);
         if (major != GSS_S_COMPLETE) {
                 CERROR("fail to generate MIC: %08x\n", major);
                 return -EPERM;
@@ -241,7 +241,7 @@ __u32 gss_verify_msg(struct lustre_msg *msg,
                      struct gss_ctx *mechctx,
                      __u32 svc)
 {
-        rawobj_t        text[3], mic;
+        rawobj_t        text[4], mic;
         int             textcnt, max_textcnt;
         int             mic_idx;
         __u32           major;
@@ -262,7 +262,7 @@ __u32 gss_verify_msg(struct lustre_msg *msg,
         mic.len = msg->lm_buflens[mic_idx];
         mic.data = lustre_msg_buf(msg, mic_idx, 0);
 
-        major = lgss_verify_mic(mechctx, textcnt, text, &mic);
+        major = lgss_verify_mic(mechctx, textcnt, text, 0, NULL, &mic);
         if (major != GSS_S_COMPLETE)
                 CERROR("mic verify error: %08x\n", major);
 
@@ -584,6 +584,33 @@ static inline int gss_cli_payload(struct ptlrpc_cli_ctx *ctx,
         return gss_mech_payload(NULL, msgsize, privacy);
 }
 
+static int gss_cli_bulk_payload(struct ptlrpc_cli_ctx *ctx,
+                                struct sptlrpc_flavor *flvr,
+                                int reply, int read)
+{
+        int     payload = sizeof(struct ptlrpc_bulk_sec_desc);
+
+        LASSERT(SPTLRPC_FLVR_BULK_TYPE(flvr->sf_rpc) == SPTLRPC_BULK_DEFAULT);
+
+        if ((!reply && !read) || (reply && read)) {
+                switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
+                case SPTLRPC_BULK_SVC_NULL:
+                        break;
+                case SPTLRPC_BULK_SVC_INTG:
+                        payload += gss_cli_payload(ctx, 0, 0);
+                        break;
+                case SPTLRPC_BULK_SVC_PRIV:
+                        payload += gss_cli_payload(ctx, 0, 1);
+                        break;
+                case SPTLRPC_BULK_SVC_AUTH:
+                default:
+                        LBUG();
+                }
+        }
+
+        return payload;
+}
+
 int gss_cli_ctx_match(struct ptlrpc_cli_ctx *ctx, struct vfs_cred *vcred)
 {
         return (ctx->cc_vcred.vc_uid == vcred->vc_uid);
@@ -627,7 +654,7 @@ int gss_cli_ctx_sign(struct ptlrpc_cli_ctx *ctx,
         if (req->rq_ctx_init)
                 RETURN(0);
 
-        svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        svc = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc);
         if (req->rq_pack_bulk)
                 flags |= LUSTRE_GSS_PACK_BULK;
         if (req->rq_pack_udesc)
@@ -798,8 +825,10 @@ int gss_cli_ctx_verify(struct ptlrpc_cli_ctx *ctx,
                         gss_header_swabber(ghdr);
 
                 major = gss_verify_msg(msg, gctx->gc_mechctx, reqhdr->gh_svc);
-                if (major != GSS_S_COMPLETE)
+                if (major != GSS_S_COMPLETE) {
+                        CERROR("failed to verify reply: %x\n", major);
                         RETURN(-EPERM);
+                }
 
                 if (req->rq_early && reqhdr->gh_svc == SPTLRPC_SVC_NULL) {
                         __u32 cksum;
@@ -996,6 +1025,7 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
                 major = gss_unseal_msg(gctx->gc_mechctx, msg,
                                        &msglen, req->rq_repdata_len);
                 if (major != GSS_S_COMPLETE) {
+                        CERROR("failed to unwrap reply: %x\n", major);
                         rc = -EPERM;
                         break;
                 }
@@ -1018,7 +1048,7 @@ int gss_cli_ctx_unseal(struct ptlrpc_cli_ctx *ctx,
                         }
 
                         /* bulk checksum is the last segment */
-                        if (bulk_sec_desc_unpack(msg, msg->lm_bufcount-1))
+                        if (bulk_sec_desc_unpack(msg, msg->lm_bufcount - 1))
                                 RETURN(-EPROTO);
                 }
 
@@ -1067,12 +1097,13 @@ int gss_sec_create_common(struct gss_sec *gsec,
         struct ptlrpc_sec   *sec;
 
         LASSERT(imp);
-        LASSERT(RPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_GSS);
+        LASSERT(SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_GSS);
 
-        gsec->gs_mech = lgss_subflavor_to_mech(RPC_FLVR_SUB(sf->sf_rpc));
+        gsec->gs_mech = lgss_subflavor_to_mech(
+                                SPTLRPC_FLVR_BASE_SUB(sf->sf_rpc));
         if (!gsec->gs_mech) {
                 CERROR("gss backend 0x%x not found\n",
-                       RPC_FLVR_SUB(sf->sf_rpc));
+                       SPTLRPC_FLVR_BASE_SUB(sf->sf_rpc));
                 return -EOPNOTSUPP;
         }
 
@@ -1099,8 +1130,7 @@ int gss_sec_create_common(struct gss_sec *gsec,
                 sec->ps_gc_interval = 0;
         }
 
-        if (sec->ps_flvr.sf_bulk_ciph != BULK_CIPH_ALG_NULL &&
-            sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_BULK)
+        if (SPTLRPC_FLVR_BULK_SVC(sec->ps_flvr.sf_rpc) == SPTLRPC_BULK_SVC_PRIV)
                 sptlrpc_enc_pool_add_user();
 
         CDEBUG(D_SEC, "create %s%s@%p\n", (svcctx ? "reverse " : ""),
@@ -1124,8 +1154,7 @@ void gss_sec_destroy_common(struct gss_sec *gsec)
 
         class_import_put(sec->ps_import);
 
-        if (sec->ps_flvr.sf_bulk_ciph != BULK_CIPH_ALG_NULL &&
-            sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_BULK)
+        if (SPTLRPC_FLVR_BULK_SVC(sec->ps_flvr.sf_rpc) == SPTLRPC_BULK_SVC_PRIV)
                 sptlrpc_enc_pool_del_user();
 
         EXIT;
@@ -1247,9 +1276,9 @@ int gss_alloc_reqbuf_intg(struct ptlrpc_sec *sec,
         }
 
         if (req->rq_pack_bulk) {
-                buflens[bufcnt] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 1,
-                                                req->rq_bulk_read);
+                buflens[bufcnt] = gss_cli_bulk_payload(req->rq_cli_ctx,
+                                                       &req->rq_flvr,
+                                                       0, req->rq_bulk_read);
                 if (svc == SPTLRPC_SVC_INTG)
                         txtsize += buflens[bufcnt];
                 bufcnt++;
@@ -1313,9 +1342,9 @@ int gss_alloc_reqbuf_priv(struct ptlrpc_sec *sec,
         if (req->rq_pack_udesc)
                 ibuflens[ibufcnt++] = sptlrpc_current_user_desc_size();
         if (req->rq_pack_bulk)
-                ibuflens[ibufcnt++] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 1,
-                                                req->rq_bulk_read);
+                ibuflens[ibufcnt++] = gss_cli_bulk_payload(req->rq_cli_ctx,
+                                                           &req->rq_flvr, 0,
+                                                           req->rq_bulk_read);
 
         clearsize = lustre_msg_size_v2(ibufcnt, ibuflens);
         /* to allow append padding during encryption */
@@ -1375,7 +1404,7 @@ int gss_alloc_reqbuf(struct ptlrpc_sec *sec,
                      struct ptlrpc_request *req,
                      int msgsize)
 {
-        int     svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        int     svc = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc);
 
         LASSERT(!req->rq_pack_bulk ||
                 (req->rq_bulk_read || req->rq_bulk_write));
@@ -1400,7 +1429,7 @@ void gss_free_reqbuf(struct ptlrpc_sec *sec,
         ENTRY;
 
         LASSERT(!req->rq_pool || req->rq_reqbuf);
-        privacy = RPC_FLVR_SVC(req->rq_flvr.sf_rpc) == SPTLRPC_SVC_PRIV;
+        privacy = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) == SPTLRPC_SVC_PRIV;
 
         if (!req->rq_clrbuf)
                 goto release_reqbuf;
@@ -1477,9 +1506,9 @@ int gss_alloc_repbuf_intg(struct ptlrpc_sec *sec,
                 txtsize += buflens[1];
 
         if (req->rq_pack_bulk) {
-                buflens[bufcnt] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 0,
-                                                req->rq_bulk_read);
+                buflens[bufcnt] = gss_cli_bulk_payload(req->rq_cli_ctx,
+                                                       &req->rq_flvr,
+                                                       1, req->rq_bulk_read);
                 if (svc == SPTLRPC_SVC_INTG)
                         txtsize += buflens[bufcnt];
                 bufcnt++;
@@ -1513,9 +1542,9 @@ int gss_alloc_repbuf_priv(struct ptlrpc_sec *sec,
         buflens[0] = msgsize;
 
         if (req->rq_pack_bulk)
-                buflens[bufcnt++] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 0,
-                                                req->rq_bulk_read);
+                buflens[bufcnt++] = gss_cli_bulk_payload(req->rq_cli_ctx,
+                                                         &req->rq_flvr,
+                                                         1, req->rq_bulk_read);
         txtsize = lustre_msg_size_v2(bufcnt, buflens);
         txtsize += GSS_MAX_CIPHER_BLOCK;
 
@@ -1535,7 +1564,7 @@ int gss_alloc_repbuf(struct ptlrpc_sec *sec,
                      struct ptlrpc_request *req,
                      int msgsize)
 {
-        int     svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        int     svc = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc);
         ENTRY;
 
         LASSERT(!req->rq_pack_bulk ||
@@ -1771,7 +1800,7 @@ int gss_enlarge_reqbuf(struct ptlrpc_sec *sec,
                        struct ptlrpc_request *req,
                        int segment, int newsize)
 {
-        int     svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        int     svc = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc);
 
         LASSERT(!req->rq_ctx_init && !req->rq_ctx_fini);
 
@@ -2066,8 +2095,10 @@ int gss_svc_verify_request(struct ptlrpc_request *req,
         }
 
         *major = gss_verify_msg(msg, gctx->gsc_mechctx, gw->gw_svc);
-        if (*major != GSS_S_COMPLETE)
+        if (*major != GSS_S_COMPLETE) {
+                CERROR("failed to verify request: %x\n", *major);
                 RETURN(-EACCES);
+        }
 
         if (gctx->gsc_reverse == 0 &&
             gss_check_seq_num(&gctx->gsc_seqdata, gw->gw_seq, 1)) {
@@ -2094,10 +2125,10 @@ verified:
                 offset++;
         }
 
-        /* check bulk cksum data */
+        /* check bulk_sec_desc data */
         if (gw->gw_flags & LUSTRE_GSS_PACK_BULK) {
                 if (msg->lm_bufcount < (offset + 1)) {
-                        CERROR("no bulk checksum included\n");
+                        CERROR("missing bulk sec descriptor\n");
                         RETURN(-EINVAL);
                 }
 
@@ -2133,8 +2164,10 @@ int gss_svc_unseal_request(struct ptlrpc_request *req,
 
         *major = gss_unseal_msg(gctx->gsc_mechctx, msg,
                                &msglen, req->rq_reqdata_len);
-        if (*major != GSS_S_COMPLETE)
+        if (*major != GSS_S_COMPLETE) {
+                CERROR("failed to unwrap request: %x\n", *major);
                 RETURN(-EACCES);
+        }
 
         if (gss_check_seq_num(&gctx->gsc_seqdata, gw->gw_seq, 1)) {
                 CERROR("phase 1+: discard replayed req: seq %u\n", gw->gw_seq);
@@ -2405,6 +2438,31 @@ int gss_svc_payload(struct gss_svc_reqctx *grctx, int early,
         return gss_mech_payload(NULL, msgsize, privacy);
 }
 
+static int gss_svc_bulk_payload(struct gss_svc_ctx *gctx,
+                                struct sptlrpc_flavor *flvr,
+                                int read)
+{
+        int     payload = sizeof(struct ptlrpc_bulk_sec_desc);
+
+        if (read) {
+                switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
+                case SPTLRPC_BULK_SVC_NULL:
+                        break;
+                case SPTLRPC_BULK_SVC_INTG:
+                        payload += gss_mech_payload(NULL, 0, 0);
+                        break;
+                case SPTLRPC_BULK_SVC_PRIV:
+                        payload += gss_mech_payload(NULL, 0, 1);
+                        break;
+                case SPTLRPC_BULK_SVC_AUTH:
+                default:
+                        LBUG();
+                }
+        }
+
+        return payload;
+}
+
 int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
 {
         struct gss_svc_reqctx       *grctx;
@@ -2422,7 +2480,7 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
                 RETURN(-EPROTO);
         }
 
-        svc = RPC_FLVR_SVC(req->rq_flvr.sf_rpc);
+        svc = SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc);
         early = (req->rq_packed_final == 0);
 
         grctx = gss_svc_ctx2reqctx(req->rq_svc_ctx);
@@ -2440,9 +2498,10 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
                         LASSERT(grctx->src_reqbsd);
 
                         bsd_off = ibufcnt;
-                        ibuflens[ibufcnt++] = bulk_sec_desc_size(
-                                                grctx->src_reqbsd->bsd_hash_alg,
-                                                0, req->rq_bulk_read);
+                        ibuflens[ibufcnt++] = gss_svc_bulk_payload(
+                                                        grctx->src_ctx,
+                                                        &req->rq_flvr,
+                                                        req->rq_bulk_read);
                 }
 
                 txtsize = lustre_msg_size_v2(ibufcnt, ibuflens);
@@ -2465,9 +2524,10 @@ int gss_svc_alloc_rs(struct ptlrpc_request *req, int msglen)
                         LASSERT(grctx->src_reqbsd);
 
                         bsd_off = bufcnt;
-                        buflens[bufcnt] = bulk_sec_desc_size(
-                                                grctx->src_reqbsd->bsd_hash_alg,
-                                                0, req->rq_bulk_read);
+                        buflens[bufcnt] = gss_svc_bulk_payload(
+                                                        grctx->src_ctx,
+                                                        &req->rq_flvr,
+                                                        req->rq_bulk_read);
                         if (svc == SPTLRPC_SVC_INTG)
                                 txtsize += buflens[bufcnt];
                         bufcnt++;
index 11f6641..357e559 100644 (file)
@@ -529,6 +529,9 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
         lustre_msghdr_set_flags(request->rq_reqmsg,
                                 request->rq_import->imp_msghdr_flags);
 
+        if (request->rq_resend)
+                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);
+
         rc = sptlrpc_cli_wrap_request(request);
         if (rc)
                 RETURN(rc);
@@ -540,9 +543,6 @@ int ptl_send_rpc(struct ptlrpc_request *request, int noreply)
                         RETURN(rc);
         }
 
-        if (request->rq_resend)
-                lustre_msg_add_flags(request->rq_reqmsg, MSG_RESENT);
-
         if (!noreply) {
                 LASSERT (request->rq_replen != 0);
                 if (request->rq_repbuf == NULL) {
index d53d42c..1b5f1ed 100644 (file)
@@ -57,8 +57,11 @@ void ptlrpc_fill_bulk_md (lnet_md_t *md, struct ptlrpc_bulk_desc *desc)
         LASSERT (!(md->options & (LNET_MD_IOVEC | LNET_MD_KIOV | LNET_MD_PHYS)));
 
         md->options |= LNET_MD_KIOV;
-        md->start = &desc->bd_iov[0];
         md->length = desc->bd_iov_count;
+        if (desc->bd_enc_iov)
+                md->start = desc->bd_enc_iov;
+        else
+                md->start = desc->bd_iov;
 }
 
 void ptlrpc_add_bulk_page(struct ptlrpc_bulk_desc *desc, cfs_page_t *page,
index d268380..69e618f 100644 (file)
@@ -118,12 +118,13 @@ int sptlrpc_unregister_policy(struct ptlrpc_sec_policy *policy)
 EXPORT_SYMBOL(sptlrpc_unregister_policy);
 
 static
-struct ptlrpc_sec_policy * sptlrpc_rpcflavor2policy(__u16 flavor)
+struct ptlrpc_sec_policy * sptlrpc_wireflavor2policy(__u32 flavor)
 {
         static DECLARE_MUTEX(load_mutex);
         static atomic_t           loaded = ATOMIC_INIT(0);
         struct ptlrpc_sec_policy *policy;
-        __u16                     number = RPC_FLVR_POLICY(flavor), flag = 0;
+        __u16                     number = SPTLRPC_FLVR_POLICY(flavor);
+        __u16                     flag = 0;
 
         if (number >= SPTLRPC_POLICY_MAX)
                 return NULL;
@@ -157,7 +158,7 @@ struct ptlrpc_sec_policy * sptlrpc_rpcflavor2policy(__u16 flavor)
         return policy;
 }
 
-__u16 sptlrpc_name2rpcflavor(const char *name)
+__u32 sptlrpc_name2flavor_base(const char *name)
 {
         if (!strcmp(name, "null"))
                 return SPTLRPC_FLVR_NULL;
@@ -174,51 +175,86 @@ __u16 sptlrpc_name2rpcflavor(const char *name)
 
         return SPTLRPC_FLVR_INVALID;
 }
-EXPORT_SYMBOL(sptlrpc_name2rpcflavor);
+EXPORT_SYMBOL(sptlrpc_name2flavor_base);
 
-const char *sptlrpc_rpcflavor2name(__u16 flavor)
+const char *sptlrpc_flavor2name_base(__u32 flvr)
 {
-        switch (flavor) {
-        case SPTLRPC_FLVR_NULL:
+        __u32   base = SPTLRPC_FLVR_BASE(flvr);
+
+        if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_NULL))
                 return "null";
-        case SPTLRPC_FLVR_PLAIN:
+        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN))
                 return "plain";
-        case SPTLRPC_FLVR_KRB5N:
+        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5N))
                 return "krb5n";
-        case SPTLRPC_FLVR_KRB5A:
+        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5A))
                 return "krb5a";
-        case SPTLRPC_FLVR_KRB5I:
+        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5I))
                 return "krb5i";
-        case SPTLRPC_FLVR_KRB5P:
+        else if (base == SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_KRB5P))
                 return "krb5p";
-        default:
-                CERROR("invalid rpc flavor 0x%x(p%u,s%u,v%u)\n", flavor,
-                       RPC_FLVR_POLICY(flavor), RPC_FLVR_MECH(flavor),
-                       RPC_FLVR_SVC(flavor));
-        }
-        return "unknown";
+
+        CERROR("invalid wire flavor 0x%x\n", flvr);
+        return "invalid";
 }
-EXPORT_SYMBOL(sptlrpc_rpcflavor2name);
+EXPORT_SYMBOL(sptlrpc_flavor2name_base);
 
-int sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
+char *sptlrpc_flavor2name_bulk(struct sptlrpc_flavor *sf,
+                               char *buf, int bufsize)
 {
-        char           *bulk;
-
-        if (sf->sf_bulk_ciph != BULK_CIPH_ALG_NULL)
-                bulk = "bulkp";
-        else if (sf->sf_bulk_hash != BULK_HASH_ALG_NULL)
-                bulk = "bulki";
+        if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN)
+                snprintf(buf, bufsize, "hash:%s",
+                         sptlrpc_get_hash_name(sf->u_bulk.hash.hash_alg));
         else
-                bulk = "bulkn";
+                snprintf(buf, bufsize, "%s",
+                         sptlrpc_flavor2name_base(sf->sf_rpc));
 
-        snprintf(buf, bufsize, "%s-%s:%s/%s",
-                 sptlrpc_rpcflavor2name(sf->sf_rpc), bulk,
-                 sptlrpc_get_hash_name(sf->sf_bulk_hash),
-                 sptlrpc_get_ciph_name(sf->sf_bulk_ciph));
-        return 0;
+        buf[bufsize - 1] = '\0';
+        return buf;
+}
+EXPORT_SYMBOL(sptlrpc_flavor2name_bulk);
+
+char *sptlrpc_flavor2name(struct sptlrpc_flavor *sf, char *buf, int bufsize)
+{
+        snprintf(buf, bufsize, "%s", sptlrpc_flavor2name_base(sf->sf_rpc));
+
+        /*
+         * currently we don't support customized bulk specification for
+         * flavors other than plain
+         */
+        if (SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN) {
+                char bspec[16];
+
+                bspec[0] = '-';
+                sptlrpc_flavor2name_bulk(sf, &bspec[1], sizeof(bspec) - 1);
+                strncat(buf, bspec, bufsize);
+        }
+
+        buf[bufsize - 1] = '\0';
+        return buf;
 }
 EXPORT_SYMBOL(sptlrpc_flavor2name);
 
+char *sptlrpc_secflags2str(__u32 flags, char *buf, int bufsize)
+{
+        buf[0] = '\0';
+
+        if (flags & PTLRPC_SEC_FL_REVERSE)
+                strncat(buf, "reverse,", bufsize);
+        if (flags & PTLRPC_SEC_FL_ROOTONLY)
+                strncat(buf, "rootonly,", bufsize);
+        if (flags & PTLRPC_SEC_FL_UDESC)
+                strncat(buf, "udesc,", bufsize);
+        if (flags & PTLRPC_SEC_FL_BULK)
+                strncat(buf, "bulk,", bufsize);
+        if (buf[0] == '\0')
+                strncat(buf, "-,", bufsize);
+
+        buf[bufsize - 1] = '\0';
+        return buf;
+}
+EXPORT_SYMBOL(sptlrpc_secflags2str);
+
 /**************************************************
  * client context APIs                            *
  **************************************************/
@@ -752,9 +788,11 @@ void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
         /* special security flags accoding to opcode */
         switch (opcode) {
         case OST_READ:
+        case MDS_READPAGE:
                 req->rq_bulk_read = 1;
                 break;
         case OST_WRITE:
+        case MDS_WRITEPAGE:
                 req->rq_bulk_write = 1;
                 break;
         case SEC_CTX_INIT:
@@ -783,9 +821,9 @@ void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
         /* force SVC_NULL for context initiation rpc, SVC_INTG for context
          * destruction rpc */
         if (unlikely(req->rq_ctx_init))
-                rpc_flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
+                flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_NULL);
         else if (unlikely(req->rq_ctx_fini))
-                rpc_flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
+                flvr_set_svc(&req->rq_flvr.sf_rpc, SPTLRPC_SVC_INTG);
 
         /* user descriptor flag, null security can't do it anyway */
         if ((sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC) &&
@@ -794,14 +832,13 @@ void sptlrpc_req_set_flavor(struct ptlrpc_request *req, int opcode)
 
         /* bulk security flag */
         if ((req->rq_bulk_read || req->rq_bulk_write) &&
-            (req->rq_flvr.sf_bulk_ciph != BULK_CIPH_ALG_NULL ||
-             req->rq_flvr.sf_bulk_hash != BULK_HASH_ALG_NULL))
+            sptlrpc_flavor_has_bulk(&req->rq_flvr))
                 req->rq_pack_bulk = 1;
 }
 
 void sptlrpc_request_out_callback(struct ptlrpc_request *req)
 {
-        if (RPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
+        if (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc) != SPTLRPC_SVC_PRIV)
                 return;
 
         LASSERT(req->rq_clrbuf);
@@ -885,7 +922,7 @@ int sptlrpc_cli_wrap_request(struct ptlrpc_request *req)
                         RETURN(rc);
         }
 
-        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
         case SPTLRPC_SVC_AUTH:
         case SPTLRPC_SVC_INTG:
@@ -913,7 +950,7 @@ static int do_cli_unwrap_reply(struct ptlrpc_request *req)
 {
         struct ptlrpc_cli_ctx *ctx = req->rq_cli_ctx;
         int                    rc;
-        __u16                  rpc_flvr;
+        __u32                  flvr;
         ENTRY;
 
         LASSERT(ctx);
@@ -929,26 +966,26 @@ static int do_cli_unwrap_reply(struct ptlrpc_request *req)
         }
 
         /* v2 message, check request/reply policy match */
-        rpc_flvr = WIRE_FLVR_RPC(req->rq_repdata->lm_secflvr);
+        flvr = WIRE_FLVR(req->rq_repdata->lm_secflvr);
 
         if (req->rq_repdata->lm_magic == LUSTRE_MSG_MAGIC_V2_SWABBED)
-                __swab16s(&rpc_flvr);
+                __swab32s(&flvr);
 
-        if (RPC_FLVR_POLICY(rpc_flvr) !=
-            RPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
+        if (SPTLRPC_FLVR_POLICY(flvr) !=
+            SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc)) {
                 CERROR("request policy was %u while reply with %u\n",
-                       RPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
-                       RPC_FLVR_POLICY(rpc_flvr));
+                       SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc),
+                       SPTLRPC_FLVR_POLICY(flvr));
                 RETURN(-EPROTO);
         }
 
         /* do nothing if it's null policy; otherwise unpack the
          * wrapper message */
-        if (RPC_FLVR_POLICY(rpc_flvr) != SPTLRPC_POLICY_NULL &&
+        if (SPTLRPC_FLVR_POLICY(flvr) != SPTLRPC_POLICY_NULL &&
             lustre_unpack_msg(req->rq_repdata, req->rq_repdata_len))
                 RETURN(-EPROTO);
 
-        switch (RPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
+        switch (SPTLRPC_FLVR_SVC(req->rq_flvr.sf_rpc)) {
         case SPTLRPC_SVC_NULL:
         case SPTLRPC_SVC_AUTH:
         case SPTLRPC_SVC_INTG:
@@ -1188,7 +1225,7 @@ void sptlrpc_sec_put(struct ptlrpc_sec *sec)
 EXPORT_SYMBOL(sptlrpc_sec_put);
 
 /*
- * it's policy module responsible for taking refrence of import
+ * policy module is responsible for taking a reference of the import
  */
 static
 struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
@@ -1198,6 +1235,7 @@ struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
 {
         struct ptlrpc_sec_policy *policy;
         struct ptlrpc_sec        *sec;
+        char                      str[32];
         ENTRY;
 
         if (svc_ctx) {
@@ -1206,7 +1244,7 @@ struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
                 CDEBUG(D_SEC, "%s %s: reverse sec using flavor %s\n",
                        imp->imp_obd->obd_type->typ_name,
                        imp->imp_obd->obd_name,
-                       sptlrpc_rpcflavor2name(sf->sf_rpc));
+                       sptlrpc_flavor2name(sf, str, sizeof(str)));
 
                 policy = sptlrpc_policy_get(svc_ctx->sc_policy);
                 sf->sf_flags |= PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
@@ -1216,9 +1254,9 @@ struct ptlrpc_sec * sptlrpc_sec_create(struct obd_import *imp,
                 CDEBUG(D_SEC, "%s %s: select security flavor %s\n",
                        imp->imp_obd->obd_type->typ_name,
                        imp->imp_obd->obd_name,
-                       sptlrpc_rpcflavor2name(sf->sf_rpc));
+                       sptlrpc_flavor2name(sf, str, sizeof(str)));
 
-                policy = sptlrpc_rpcflavor2policy(sf->sf_rpc);
+                policy = sptlrpc_wireflavor2policy(sf->sf_rpc);
                 if (!policy) {
                         CERROR("invalid flavor 0x%x\n", sf->sf_rpc);
                         RETURN(NULL);
@@ -1272,52 +1310,49 @@ static void sptlrpc_import_sec_install(struct obd_import *imp,
         }
 }
 
+static inline
+int flavor_equal(struct sptlrpc_flavor *sf1, struct sptlrpc_flavor *sf2)
+{
+        return (memcmp(sf1, sf2, sizeof(*sf1)) == 0);
+}
+
+static inline
+void flavor_copy(struct sptlrpc_flavor *dst, struct sptlrpc_flavor *src)
+{
+        *dst = *src;
+}
+
 static void sptlrpc_import_sec_adapt_inplace(struct obd_import *imp,
                                              struct ptlrpc_sec *sec,
                                              struct sptlrpc_flavor *sf)
 {
-        if (sf->sf_bulk_ciph != sec->ps_flvr.sf_bulk_ciph ||
-            sf->sf_bulk_hash != sec->ps_flvr.sf_bulk_hash) {
-                CWARN("imp %p (%s->%s): changing bulk flavor %s/%s -> %s/%s\n",
-                      imp, imp->imp_obd->obd_name,
-                      obd_uuid2str(&imp->imp_connection->c_remote_uuid),
-                      sptlrpc_get_ciph_name(sec->ps_flvr.sf_bulk_ciph),
-                      sptlrpc_get_hash_name(sec->ps_flvr.sf_bulk_hash),
-                      sptlrpc_get_ciph_name(sf->sf_bulk_ciph),
-                      sptlrpc_get_hash_name(sf->sf_bulk_hash));
-
-                spin_lock(&sec->ps_lock);
-                sec->ps_flvr.sf_bulk_ciph = sf->sf_bulk_ciph;
-                sec->ps_flvr.sf_bulk_hash = sf->sf_bulk_hash;
-                spin_unlock(&sec->ps_lock);
-        }
+        char    str1[32], str2[32];
 
-        if (!equi(sf->sf_flags & PTLRPC_SEC_FL_UDESC,
-                  sec->ps_flvr.sf_flags & PTLRPC_SEC_FL_UDESC)) {
-                CWARN("imp %p (%s->%s): %s shipping user descriptor\n",
-                      imp, imp->imp_obd->obd_name,
-                      obd_uuid2str(&imp->imp_connection->c_remote_uuid),
-                      (sf->sf_flags & PTLRPC_SEC_FL_UDESC) ? "start" : "stop");
+        if (sec->ps_flvr.sf_flags != sf->sf_flags)
+                CWARN("changing sec flags: %s -> %s\n",
+                      sptlrpc_secflags2str(sec->ps_flvr.sf_flags,
+                                           str1, sizeof(str1)),
+                      sptlrpc_secflags2str(sf->sf_flags,
+                                           str2, sizeof(str2)));
 
-                spin_lock(&sec->ps_lock);
-                sec->ps_flvr.sf_flags &= ~PTLRPC_SEC_FL_UDESC;
-                sec->ps_flvr.sf_flags |= sf->sf_flags & PTLRPC_SEC_FL_UDESC;
-                spin_unlock(&sec->ps_lock);
-        }
+        spin_lock(&sec->ps_lock);
+        flavor_copy(&sec->ps_flvr, sf);
+        spin_unlock(&sec->ps_lock);
 }
 
 /*
- * for normal import, @svc_ctx should be NULL and @rpc_flavor is ignored;
- * for reverse import, @svc_ctx and @rpc_flavor is from incoming request.
+ * for normal import, @svc_ctx should be NULL and @flvr is ignored;
+ * for reverse import, @svc_ctx and @flvr are from the incoming request.
  */
 int sptlrpc_import_sec_adapt(struct obd_import *imp,
                              struct ptlrpc_svc_ctx *svc_ctx,
-                             __u16 rpc_flavor)
+                             struct sptlrpc_flavor *flvr)
 {
         struct ptlrpc_connection   *conn;
         struct sptlrpc_flavor       sf;
         struct ptlrpc_sec          *sec, *newsec;
         enum lustre_sec_part        sp;
+        char                        str[24];
         int                         rc;
 
         might_sleep();
@@ -1344,57 +1379,45 @@ int sptlrpc_import_sec_adapt(struct obd_import *imp,
                 sp = imp->imp_obd->u.cli.cl_sp_me;
         } else {
                 /* reverse import, determine flavor from incoming reqeust */
-                sf.sf_rpc = rpc_flavor;
-                sf.sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-                sf.sf_bulk_hash = BULK_HASH_ALG_NULL;
-                sf.sf_flags = PTLRPC_SEC_FL_REVERSE | PTLRPC_SEC_FL_ROOTONLY;
+                sf = *flvr;
+
+                if (sf.sf_rpc != SPTLRPC_FLVR_NULL)
+                        sf.sf_flags = PTLRPC_SEC_FL_REVERSE |
+                                      PTLRPC_SEC_FL_ROOTONLY;
 
                 sp = sptlrpc_target_sec_part(imp->imp_obd);
         }
 
         sec = sptlrpc_import_sec_ref(imp);
         if (sec) {
-                if (svc_ctx == NULL) {
-                        /* normal import, only check rpc flavor, if just bulk
-                         * flavor or flags changed, we can handle it on the fly
-                         * without switching sec. */
-                        if (sf.sf_rpc == sec->ps_flvr.sf_rpc) {
-                                sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
-
-                                rc = 0;
-                                goto out;
-                        }
-                } else {
-                        /* reverse import, do not compare bulk flavor */
-                        if (sf.sf_rpc == sec->ps_flvr.sf_rpc) {
-                                rc = 0;
-                                goto out;
-                        }
-                }
+                char    str2[24];
+
+                if (flavor_equal(&sf, &sec->ps_flvr))
+                        goto out;
 
                 CWARN("%simport %p (%s%s%s): changing flavor "
-                      "(%s, %s/%s) -> (%s, %s/%s)\n",
-                      svc_ctx ? "reverse " : "",
+                      "%s -> %s\n", svc_ctx ? "reverse " : "",
                       imp, imp->imp_obd->obd_name,
                       svc_ctx == NULL ? "->" : "<-",
                       obd_uuid2str(&conn->c_remote_uuid),
-                      sptlrpc_rpcflavor2name(sec->ps_flvr.sf_rpc),
-                      sptlrpc_get_hash_name(sec->ps_flvr.sf_bulk_hash),
-                      sptlrpc_get_ciph_name(sec->ps_flvr.sf_bulk_ciph),
-                      sptlrpc_rpcflavor2name(sf.sf_rpc),
-                      sptlrpc_get_hash_name(sf.sf_bulk_hash),
-                      sptlrpc_get_ciph_name(sf.sf_bulk_ciph));
+                      sptlrpc_flavor2name(&sec->ps_flvr, str, sizeof(str)),
+                      sptlrpc_flavor2name(&sf, str2, sizeof(str2)));
+
+                if (SPTLRPC_FLVR_POLICY(sf.sf_rpc) ==
+                    SPTLRPC_FLVR_POLICY(sec->ps_flvr.sf_rpc) &&
+                    SPTLRPC_FLVR_MECH(sf.sf_rpc) ==
+                    SPTLRPC_FLVR_MECH(sec->ps_flvr.sf_rpc)) {
+                        sptlrpc_import_sec_adapt_inplace(imp, sec, &sf);
+                        goto out;
+                }
         } else {
-                CWARN("%simport %p (%s%s%s) netid %x: "
-                      "select initial flavor (%s, %s/%s)\n",
+                CWARN("%simport %p (%s%s%s) netid %x: select flavor %s\n",
                       svc_ctx == NULL ? "" : "reverse ",
                       imp, imp->imp_obd->obd_name,
                       svc_ctx == NULL ? "->" : "<-",
                       obd_uuid2str(&conn->c_remote_uuid),
                       LNET_NIDNET(conn->c_self),
-                      sptlrpc_rpcflavor2name(sf.sf_rpc),
-                      sptlrpc_get_hash_name(sf.sf_bulk_hash),
-                      sptlrpc_get_ciph_name(sf.sf_bulk_ciph));
+                      sptlrpc_flavor2name(&sf, str, sizeof(str)));
         }
 
         mutex_down(&imp->imp_sec_mutex);
@@ -1659,8 +1682,9 @@ static int flavor_allowed(struct sptlrpc_flavor *exp,
                 return 1;
 
         if ((req->rq_ctx_init || req->rq_ctx_fini) &&
-            RPC_FLVR_POLICY(exp->sf_rpc) == RPC_FLVR_POLICY(flvr->sf_rpc) &&
-            RPC_FLVR_MECH(exp->sf_rpc) == RPC_FLVR_MECH(flvr->sf_rpc))
+            SPTLRPC_FLVR_POLICY(exp->sf_rpc) ==
+            SPTLRPC_FLVR_POLICY(flvr->sf_rpc) &&
+            SPTLRPC_FLVR_MECH(exp->sf_rpc) == SPTLRPC_FLVR_MECH(flvr->sf_rpc))
                 return 1;
 
         return 0;
@@ -1725,7 +1749,7 @@ int sptlrpc_target_export_check(struct obd_export *exp,
                 spin_unlock(&exp->exp_lock);
 
                 return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
-                                                req->rq_svc_ctx, flavor.sf_rpc);
+                                                req->rq_svc_ctx, &flavor);
         }
 
         /* if it equals to the current flavor, we accept it, but need to
@@ -1759,7 +1783,7 @@ int sptlrpc_target_export_check(struct obd_export *exp,
 
                         return sptlrpc_import_sec_adapt(exp->exp_imp_reverse,
                                                         req->rq_svc_ctx,
-                                                        flavor.sf_rpc);
+                                                        &flavor);
                 } else {
                         CDEBUG(D_SEC, "exp %p (%x|%x|%x): is current flavor, "
                                "install rvs ctx\n", exp, exp->exp_flvr.sf_rpc,
@@ -1866,7 +1890,7 @@ void sptlrpc_target_update_exp_flavor(struct obd_device *obd,
                                              exp->exp_connection->c_peer.nid,
                                              &new_flvr);
                 if (exp->exp_flvr_changed ||
-                    memcmp(&new_flvr, &exp->exp_flvr, sizeof(new_flvr))) {
+                    !flavor_equal(&new_flvr, &exp->exp_flvr)) {
                         exp->exp_flvr_old[1] = new_flvr;
                         exp->exp_flvr_expire[1] = 0;
                         exp->exp_flvr_changed = 1;
@@ -1931,13 +1955,14 @@ static int sptlrpc_svc_check_from(struct ptlrpc_request *req, int svc_rc)
 int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
 {
         struct ptlrpc_sec_policy *policy;
-        struct lustre_msg *msg = req->rq_reqbuf;
-        int rc;
+        struct lustre_msg        *msg = req->rq_reqbuf;
+        int                       rc;
         ENTRY;
 
         LASSERT(msg);
         LASSERT(req->rq_reqmsg == NULL);
         LASSERT(req->rq_repmsg == NULL);
+        LASSERT(req->rq_svc_ctx == NULL);
 
         req->rq_sp_from = LUSTRE_SP_ANY;
         req->rq_auth_uid = INVALID_UID;
@@ -1949,19 +1974,28 @@ int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
         }
 
         /*
-         * v2 message.
+         * only expect v2 message.
          */
-        if (msg->lm_magic == LUSTRE_MSG_MAGIC_V2)
-                req->rq_flvr.sf_rpc = WIRE_FLVR_RPC(msg->lm_secflvr);
-        else
-                req->rq_flvr.sf_rpc = WIRE_FLVR_RPC(__swab32(msg->lm_secflvr));
+        switch (msg->lm_magic) {
+        case LUSTRE_MSG_MAGIC_V2:
+                req->rq_flvr.sf_rpc = WIRE_FLVR(msg->lm_secflvr);
+                break;
+        case LUSTRE_MSG_MAGIC_V2_SWABBED:
+                req->rq_flvr.sf_rpc = WIRE_FLVR(__swab32(msg->lm_secflvr));
+                break;
+        default:
+                CERROR("invalid magic %x\n", msg->lm_magic);
+                RETURN(SECSVC_DROP);
+        }
 
         /* unpack the wrapper message if the policy is not null */
-        if ((RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL) &&
-             lustre_unpack_msg(msg, req->rq_reqdata_len))
+        if (SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) != SPTLRPC_POLICY_NULL &&
+            lustre_unpack_msg(msg, req->rq_reqdata_len)) {
+                CERROR("invalid wrapper msg format\n");
                 RETURN(SECSVC_DROP);
+        }
 
-        policy = sptlrpc_rpcflavor2policy(req->rq_flvr.sf_rpc);
+        policy = sptlrpc_wireflavor2policy(req->rq_flvr.sf_rpc);
         if (!policy) {
                 CERROR("unsupported rpc flavor %x\n", req->rq_flvr.sf_rpc);
                 RETURN(SECSVC_DROP);
@@ -1971,22 +2005,11 @@ int sptlrpc_svc_unwrap_request(struct ptlrpc_request *req)
         rc = policy->sp_sops->accept(req);
 
         LASSERT(req->rq_reqmsg || rc != SECSVC_OK);
+        LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
         sptlrpc_policy_put(policy);
 
         /* sanity check for the request source */
         rc = sptlrpc_svc_check_from(req, rc);
-
-        /* FIXME move to proper place */
-        if (rc == SECSVC_OK) {
-                __u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
-
-                if (opc == OST_WRITE)
-                        req->rq_bulk_write = 1;
-                else if (opc == OST_READ)
-                        req->rq_bulk_read = 1;
-        }
-
-        LASSERT(req->rq_svc_ctx || rc == SECSVC_DROP);
         RETURN(rc);
 }
 
@@ -2111,11 +2134,11 @@ int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
 {
         struct ptlrpc_cli_ctx *ctx;
 
+        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
+
         if (!req->rq_pack_bulk)
                 return 0;
 
-        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
         ctx = req->rq_cli_ctx;
         if (ctx->cc_ops->wrap_bulk)
                 return ctx->cc_ops->wrap_bulk(ctx, req, desc);
@@ -2123,79 +2146,61 @@ int sptlrpc_cli_wrap_bulk(struct ptlrpc_request *req,
 }
 EXPORT_SYMBOL(sptlrpc_cli_wrap_bulk);
 
-static
-void pga_to_bulk_desc(int nob, obd_count pg_count, struct brw_page **pga,
-                      struct ptlrpc_bulk_desc *desc)
-{
-        int i;
-
-        LASSERT(pga);
-        LASSERT(*pga);
-
-        for (i = 0; i < pg_count && nob > 0; i++) {
-#ifdef __KERNEL__
-                desc->bd_iov[i].kiov_page = pga[i]->pg;
-                desc->bd_iov[i].kiov_len = pga[i]->count > nob ?
-                                           nob : pga[i]->count;
-                desc->bd_iov[i].kiov_offset = pga[i]->off & ~CFS_PAGE_MASK;
-#else
-                /* FIXME currently liblustre doesn't support bulk encryption.
-                 * if we do, check again following may not be right. */
-                LASSERTF(0, "Bulk encryption not implemented for liblustre\n");
-                desc->bd_iov[i].iov_base = pga[i]->pg->addr;
-                desc->bd_iov[i].iov_len = pga[i]->count > nob ?
-                                           nob : pga[i]->count;
-#endif
-
-                desc->bd_iov_count++;
-                nob -= pga[i]->count;
-        }
-}
-
+/*
+ * return the actual plain text size received (in bytes), or an error code.
+ */
 int sptlrpc_cli_unwrap_bulk_read(struct ptlrpc_request *req,
-                                 int nob, obd_count pg_count,
-                                 struct brw_page **pga)
+                                 struct ptlrpc_bulk_desc *desc,
+                                 int nob)
 {
-        struct ptlrpc_bulk_desc *desc;
-        struct ptlrpc_cli_ctx *ctx;
-        int rc = 0;
-
-        if (!req->rq_pack_bulk)
-                return 0;
+        struct ptlrpc_cli_ctx  *ctx;
+        int                     rc;
 
         LASSERT(req->rq_bulk_read && !req->rq_bulk_write);
 
-        OBD_ALLOC(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
-        if (desc == NULL) {
-                CERROR("out of memory, can't verify bulk read data\n");
-                return -ENOMEM;
-        }
-
-        pga_to_bulk_desc(nob, pg_count, pga, desc);
+        if (!req->rq_pack_bulk)
+                return desc->bd_nob_transferred;
 
         ctx = req->rq_cli_ctx;
-        if (ctx->cc_ops->unwrap_bulk)
+        if (ctx->cc_ops->unwrap_bulk) {
                 rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
-
-        OBD_FREE(desc, offsetof(struct ptlrpc_bulk_desc, bd_iov[pg_count]));
-
-        return rc;
+                if (rc < 0)
+                        return rc;
+        }
+        return desc->bd_nob_transferred;
 }
 EXPORT_SYMBOL(sptlrpc_cli_unwrap_bulk_read);
 
+/*
+ * return 0 on success, or an error code on failure.
+ */
 int sptlrpc_cli_unwrap_bulk_write(struct ptlrpc_request *req,
                                   struct ptlrpc_bulk_desc *desc)
 {
-        struct ptlrpc_cli_ctx *ctx;
+        struct ptlrpc_cli_ctx  *ctx;
+        int                     rc;
+
+        LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
 
         if (!req->rq_pack_bulk)
                 return 0;
 
-        LASSERT(!req->rq_bulk_read && req->rq_bulk_write);
-
         ctx = req->rq_cli_ctx;
-        if (ctx->cc_ops->unwrap_bulk)
-                return ctx->cc_ops->unwrap_bulk(ctx, req, desc);
+        if (ctx->cc_ops->unwrap_bulk) {
+                rc = ctx->cc_ops->unwrap_bulk(ctx, req, desc);
+                if (rc < 0)
+                        return rc;
+        }
+
+        /*
+         * if everything is going right, nob should equal nob_transferred;
+         * in privacy mode, nob_transferred needs to be adjusted.
+         */
+        if (desc->bd_nob != desc->bd_nob_transferred) {
+                CERROR("nob %d doesn't match transferred nob %d\n",
+                       desc->bd_nob, desc->bd_nob_transferred);
+                return -EPROTO;
+        }
 
         return 0;
 }
@@ -2206,11 +2211,11 @@ int sptlrpc_svc_wrap_bulk(struct ptlrpc_request *req,
 {
         struct ptlrpc_svc_ctx *ctx;
 
+        LASSERT(req->rq_bulk_read);
+
         if (!req->rq_pack_bulk)
                 return 0;
 
-        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
         ctx = req->rq_svc_ctx;
         if (ctx->sc_policy->sp_sops->wrap_bulk)
                 return ctx->sc_policy->sp_sops->wrap_bulk(req, desc);
@@ -2223,20 +2228,50 @@ int sptlrpc_svc_unwrap_bulk(struct ptlrpc_request *req,
                             struct ptlrpc_bulk_desc *desc)
 {
         struct ptlrpc_svc_ctx *ctx;
+        int                    rc;
+
+        LASSERT(req->rq_bulk_write);
+
+        if (desc->bd_nob_transferred != desc->bd_nob &&
+            SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc) !=
+            SPTLRPC_BULK_SVC_PRIV) {
+                DEBUG_REQ(D_ERROR, req, "truncated bulk GET %d(%d)",
+                          desc->bd_nob_transferred, desc->bd_nob);
+                return -ETIMEDOUT;
+        }
 
         if (!req->rq_pack_bulk)
                 return 0;
 
-        LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
         ctx = req->rq_svc_ctx;
-        if (ctx->sc_policy->sp_sops->unwrap_bulk);
-                return ctx->sc_policy->sp_sops->unwrap_bulk(req, desc);
+        if (ctx->sc_policy->sp_sops->unwrap_bulk) {
+                rc = ctx->sc_policy->sp_sops->unwrap_bulk(req, desc);
+                if (rc)
+                        CERROR("error unwrapping bulk: %d\n", rc);
+        }
 
+        /* return 0 to allow the reply to be sent */
         return 0;
 }
 EXPORT_SYMBOL(sptlrpc_svc_unwrap_bulk);
 
+int sptlrpc_svc_prep_bulk(struct ptlrpc_request *req,
+                          struct ptlrpc_bulk_desc *desc)
+{
+        struct ptlrpc_svc_ctx *ctx;
+
+        LASSERT(req->rq_bulk_write);
+
+        if (!req->rq_pack_bulk)
+                return 0;
+
+        ctx = req->rq_svc_ctx;
+        if (ctx->sc_policy->sp_sops->prep_bulk)
+                return ctx->sc_policy->sp_sops->prep_bulk(req, desc);
+
+        return 0;
+}
+EXPORT_SYMBOL(sptlrpc_svc_prep_bulk);
 
 /****************************************
  * user descriptor helpers              *
@@ -2337,6 +2372,21 @@ const char * sec2target_str(struct ptlrpc_sec *sec)
 }
 EXPORT_SYMBOL(sec2target_str);
 
+/*
+ * return true if the bulk data is protected
+ */
+int sptlrpc_flavor_has_bulk(struct sptlrpc_flavor *flvr)
+{
+        switch (SPTLRPC_FLVR_BULK_SVC(flvr->sf_rpc)) {
+        case SPTLRPC_BULK_SVC_INTG:
+        case SPTLRPC_BULK_SVC_PRIV:
+                return 1;
+        default:
+                return 0;
+        }
+}
+EXPORT_SYMBOL(sptlrpc_flavor_has_bulk);
+
 /****************************************
  * crypto API helper/alloc blkciper     *
  ****************************************/
index 12ff171..c09cf0c 100644 (file)
@@ -456,8 +456,10 @@ out:
 
 static inline void enc_pools_wakeup(void)
 {
+        LASSERT_SPIN_LOCKED(&page_pools.epp_lock);
+        LASSERT(page_pools.epp_waitqlen >= 0);
+
         if (unlikely(page_pools.epp_waitqlen)) {
-                LASSERT(page_pools.epp_waitqlen > 0);
                 LASSERT(cfs_waitq_active(&page_pools.epp_waitq));
                 cfs_waitq_broadcast(&page_pools.epp_waitq);
         }
@@ -476,11 +478,15 @@ static int enc_pools_should_grow(int page_needed, long now)
         if (page_pools.epp_total_pages < page_needed)
                 return 1;
 
-        /* if we just did a shrink due to memory tight, we'd better
-         * wait a while to grow again.
+        /*
+         * we wanted to return 0 here if a shrink happened just a moment
+         * ago, but that may cause a deadlock if both client and ost live
+         * on a single node.
          */
+#if 0
         if (now - page_pools.epp_last_shrink < 2)
                 return 0;
+#endif
 
         /*
          * here we perhaps need consider other factors like wait queue
@@ -503,32 +509,32 @@ int sptlrpc_enc_pool_get_pages(struct ptlrpc_bulk_desc *desc)
         int             p_idx, g_idx;
         int             i;
 
-        LASSERT(desc->bd_max_iov > 0);
-        LASSERT(desc->bd_max_iov <= page_pools.epp_max_pages);
+        LASSERT(desc->bd_iov_count > 0);
+        LASSERT(desc->bd_iov_count <= page_pools.epp_max_pages);
 
-        /* resent bulk, enc pages might have been allocated previously */
-        if (desc->bd_enc_pages != NULL)
+        /* resent bulk, enc iov might have been allocated previously */
+        if (desc->bd_enc_iov != NULL)
                 return 0;
 
-        OBD_ALLOC(desc->bd_enc_pages,
-                  desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
-        if (desc->bd_enc_pages == NULL)
+        OBD_ALLOC(desc->bd_enc_iov,
+                  desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
+        if (desc->bd_enc_iov == NULL)
                 return -ENOMEM;
 
         spin_lock(&page_pools.epp_lock);
 
         page_pools.epp_st_access++;
 again:
-        if (unlikely(page_pools.epp_free_pages < desc->bd_max_iov)) {
+        if (unlikely(page_pools.epp_free_pages < desc->bd_iov_count)) {
                 if (tick == 0)
                         tick = cfs_time_current();
 
                 now = cfs_time_current_sec();
 
                 page_pools.epp_st_missings++;
-                page_pools.epp_pages_short += desc->bd_max_iov;
+                page_pools.epp_pages_short += desc->bd_iov_count;
 
-                if (enc_pools_should_grow(desc->bd_max_iov, now)) {
+                if (enc_pools_should_grow(desc->bd_iov_count, now)) {
                         page_pools.epp_growing = 1;
 
                         spin_unlock(&page_pools.epp_lock);
@@ -536,6 +542,8 @@ again:
                         spin_lock(&page_pools.epp_lock);
 
                         page_pools.epp_growing = 0;
+
+                        enc_pools_wakeup();
                 } else {
                         if (++page_pools.epp_waitqlen >
                             page_pools.epp_st_max_wqlen)
@@ -549,14 +557,13 @@ again:
                         spin_unlock(&page_pools.epp_lock);
                         cfs_waitq_wait(&waitlink, CFS_TASK_UNINT);
                         cfs_waitq_del(&page_pools.epp_waitq, &waitlink);
-                        spin_lock(&page_pools.epp_lock);
-
                         LASSERT(page_pools.epp_waitqlen > 0);
+                        spin_lock(&page_pools.epp_lock);
                         page_pools.epp_waitqlen--;
                 }
 
-                LASSERT(page_pools.epp_pages_short >= desc->bd_max_iov);
-                page_pools.epp_pages_short -= desc->bd_max_iov;
+                LASSERT(page_pools.epp_pages_short >= desc->bd_iov_count);
+                page_pools.epp_pages_short -= desc->bd_iov_count;
 
                 this_idle = 0;
                 goto again;
@@ -570,14 +577,15 @@ again:
         }
 
         /* proceed with rest of allocation */
-        page_pools.epp_free_pages -= desc->bd_max_iov;
+        page_pools.epp_free_pages -= desc->bd_iov_count;
 
         p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
         g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-        for (i = 0; i < desc->bd_max_iov; i++) {
+        for (i = 0; i < desc->bd_iov_count; i++) {
                 LASSERT(page_pools.epp_pools[p_idx][g_idx] != NULL);
-                desc->bd_enc_pages[i] = page_pools.epp_pools[p_idx][g_idx];
+                desc->bd_enc_iov[i].kiov_page =
+                                        page_pools.epp_pools[p_idx][g_idx];
                 page_pools.epp_pools[p_idx][g_idx] = NULL;
 
                 if (++g_idx == PAGES_PER_POOL) {
@@ -612,26 +620,27 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
         int     p_idx, g_idx;
         int     i;
 
-        if (desc->bd_enc_pages == NULL)
-                return;
-        if (desc->bd_max_iov == 0)
+        if (desc->bd_enc_iov == NULL)
                 return;
 
+        LASSERT(desc->bd_iov_count > 0);
+
         spin_lock(&page_pools.epp_lock);
 
         p_idx = page_pools.epp_free_pages / PAGES_PER_POOL;
         g_idx = page_pools.epp_free_pages % PAGES_PER_POOL;
 
-        LASSERT(page_pools.epp_free_pages + desc->bd_max_iov <=
+        LASSERT(page_pools.epp_free_pages + desc->bd_iov_count <=
                 page_pools.epp_total_pages);
         LASSERT(page_pools.epp_pools[p_idx]);
 
-        for (i = 0; i < desc->bd_max_iov; i++) {
-                LASSERT(desc->bd_enc_pages[i] != NULL);
+        for (i = 0; i < desc->bd_iov_count; i++) {
+                LASSERT(desc->bd_enc_iov[i].kiov_page != NULL);
                 LASSERT(g_idx != 0 || page_pools.epp_pools[p_idx]);
                 LASSERT(page_pools.epp_pools[p_idx][g_idx] == NULL);
 
-                page_pools.epp_pools[p_idx][g_idx] = desc->bd_enc_pages[i];
+                page_pools.epp_pools[p_idx][g_idx] =
+                                        desc->bd_enc_iov[i].kiov_page;
 
                 if (++g_idx == PAGES_PER_POOL) {
                         p_idx++;
@@ -639,15 +648,15 @@ void sptlrpc_enc_pool_put_pages(struct ptlrpc_bulk_desc *desc)
                 }
         }
 
-        page_pools.epp_free_pages += desc->bd_max_iov;
+        page_pools.epp_free_pages += desc->bd_iov_count;
 
         enc_pools_wakeup();
 
         spin_unlock(&page_pools.epp_lock);
 
-        OBD_FREE(desc->bd_enc_pages,
-                 desc->bd_max_iov * sizeof(*desc->bd_enc_pages));
-        desc->bd_enc_pages = NULL;
+        OBD_FREE(desc->bd_enc_iov,
+                 desc->bd_iov_count * sizeof(*desc->bd_enc_iov));
+        desc->bd_enc_iov = NULL;
 }
 EXPORT_SYMBOL(sptlrpc_enc_pool_put_pages);
 
@@ -668,7 +677,8 @@ int sptlrpc_enc_pool_add_user(void)
         spin_unlock(&page_pools.epp_lock);
 
         if (need_grow) {
-                enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES);
+                enc_pools_add_pages(PTLRPC_MAX_BRW_PAGES +
+                                    PTLRPC_MAX_BRW_PAGES);
 
                 spin_lock(&page_pools.epp_lock);
                 page_pools.epp_growing = 0;
@@ -815,9 +825,6 @@ static struct sptlrpc_hash_type hash_types[] = {
         [BULK_HASH_ALG_SHA256]  = { "sha256",   "sha256",       32 },
         [BULK_HASH_ALG_SHA384]  = { "sha384",   "sha384",       48 },
         [BULK_HASH_ALG_SHA512]  = { "sha512",   "sha512",       64 },
-        [BULK_HASH_ALG_WP256]   = { "wp256",    "wp256",        32 },
-        [BULK_HASH_ALG_WP384]   = { "wp384",    "wp384",        48 },
-        [BULK_HASH_ALG_WP512]   = { "wp512",    "wp512",        64 },
 };
 
 const struct sptlrpc_hash_type *sptlrpc_get_hash_type(__u8 hash_alg)
@@ -845,24 +852,21 @@ const char * sptlrpc_get_hash_name(__u8 hash_alg)
 }
 EXPORT_SYMBOL(sptlrpc_get_hash_name);
 
-int bulk_sec_desc_size(__u8 hash_alg, int request, int read)
+__u8 sptlrpc_get_hash_alg(const char *algname)
 {
-        int size = sizeof(struct ptlrpc_bulk_sec_desc);
-
-        LASSERT(hash_alg < BULK_HASH_ALG_MAX);
-
-        /* read request don't need extra data */
-        if (!(read && request))
-                size += hash_types[hash_alg].sht_size;
+        int     i;
 
-        return size;
+        for (i = 0; i < BULK_HASH_ALG_MAX; i++)
+                if (!strcmp(hash_types[i].sht_name, algname))
+                        break;
+        return i;
 }
-EXPORT_SYMBOL(bulk_sec_desc_size);
+EXPORT_SYMBOL(sptlrpc_get_hash_alg);
 
 int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset)
 {
         struct ptlrpc_bulk_sec_desc *bsd;
-        int    size = msg->lm_buflens[offset];
+        int                          size = msg->lm_buflens[offset];
 
         bsd = lustre_msg_buf(msg, offset, sizeof(*bsd));
         if (bsd == NULL) {
@@ -870,35 +874,27 @@ int bulk_sec_desc_unpack(struct lustre_msg *msg, int offset)
                 return -EINVAL;
         }
 
-        /* nothing to swab */
+        if (lustre_msg_swabbed(msg)) {
+                __swab32s(&bsd->bsd_nob);
+        }
 
         if (unlikely(bsd->bsd_version != 0)) {
                 CERROR("Unexpected version %u\n", bsd->bsd_version);
                 return -EPROTO;
         }
 
-        if (unlikely(bsd->bsd_flags != 0)) {
-                CERROR("Unexpected flags %x\n", bsd->bsd_flags);
+        if (unlikely(bsd->bsd_type >= SPTLRPC_BULK_MAX)) {
+                CERROR("Invalid type %u\n", bsd->bsd_type);
                 return -EPROTO;
         }
 
-        if (unlikely(!sptlrpc_get_hash_type(bsd->bsd_hash_alg))) {
-                CERROR("Unsupported checksum algorithm %u\n",
-                       bsd->bsd_hash_alg);
-                return -EINVAL;
-        }
+        /* FIXME more sanity check here */
 
-        if (unlikely(!sptlrpc_get_ciph_type(bsd->bsd_ciph_alg))) {
-                CERROR("Unsupported cipher algorithm %u\n",
-                       bsd->bsd_ciph_alg);
-                return -EINVAL;
-        }
-
-        if (unlikely(size > sizeof(*bsd)) &&
-            size < sizeof(*bsd) + hash_types[bsd->bsd_hash_alg].sht_size) {
-                CERROR("Mal-formed checksum data: csum alg %u, size %d\n",
-                       bsd->bsd_hash_alg, size);
-                return -EINVAL;
+        if (unlikely(bsd->bsd_svc != SPTLRPC_BULK_SVC_NULL &&
+                     bsd->bsd_svc != SPTLRPC_BULK_SVC_INTG &&
+                     bsd->bsd_svc != SPTLRPC_BULK_SVC_PRIV)) {
+                CERROR("Invalid svc %u\n", bsd->bsd_svc);
+                return -EPROTO;
         }
 
         return 0;
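For reference (not part of the patch), a minimal sketch of how the name/alg lookups added above pair up; sptlrpc_get_hash_alg() returns BULK_HASH_ALG_MAX when the name is unknown, so callers are expected to range-check the result:

        /* illustrative only: round-trip an algorithm name through the helpers */
        __u8 alg = sptlrpc_get_hash_alg("sha1");

        if (alg < BULK_HASH_ALG_MAX)
                CDEBUG(D_SEC, "hash alg %u is %s\n", alg,
                       sptlrpc_get_hash_name(alg));
        else
                CERROR("unknown hash algorithm name\n");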
@@ -957,14 +953,17 @@ static int do_bulk_checksum_crc32(struct ptlrpc_bulk_desc *desc, void *buf)
         return 0;
 }
 
-static int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
+int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
+                              void *buf, int buflen)
 {
         struct hash_desc    hdesc;
-        struct scatterlist *sl;
-        int i, rc = 0, bytes = 0;
+        int                 hashsize;
+        char                hashbuf[64];
+        struct scatterlist  sl;
+        int                 i;
 
-        LASSERT(alg > BULK_HASH_ALG_NULL &&
-                alg < BULK_HASH_ALG_MAX);
+        LASSERT(alg > BULK_HASH_ALG_NULL && alg < BULK_HASH_ALG_MAX);
+        LASSERT(buflen >= 4);
 
         switch (alg) {
         case BULK_HASH_ALG_ADLER32:
@@ -983,35 +982,35 @@ static int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
                 CERROR("Unable to allocate TFM %s\n", hash_types[alg].sht_name);
                 return -ENOMEM;
         }
+
         hdesc.flags = 0;
+        ll_crypto_hash_init(&hdesc);
 
-        OBD_ALLOC(sl, sizeof(*sl) * desc->bd_iov_count);
-        if (sl == NULL) {
-                rc = -ENOMEM;
-                goto out_tfm;
-        }
+        hashsize = ll_crypto_hash_digestsize(hdesc.tfm);
 
         for (i = 0; i < desc->bd_iov_count; i++) {
-                sl[i].page = desc->bd_iov[i].kiov_page;
-                sl[i].offset = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
-                sl[i].length = desc->bd_iov[i].kiov_len;
-                bytes += desc->bd_iov[i].kiov_len;
+                sl.page = desc->bd_iov[i].kiov_page;
+                sl.offset = desc->bd_iov[i].kiov_offset;
+                sl.length = desc->bd_iov[i].kiov_len;
+                ll_crypto_hash_update(&hdesc, &sl, sl.length);
         }
 
-        ll_crypto_hash_init(&hdesc);
-        ll_crypto_hash_update(&hdesc, sl, bytes);
-        ll_crypto_hash_final(&hdesc, buf);
-
-        OBD_FREE(sl, sizeof(*sl) * desc->bd_iov_count);
+        if (hashsize > buflen) {
+                ll_crypto_hash_final(&hdesc, hashbuf);
+                memcpy(buf, hashbuf, buflen);
+        } else {
+                ll_crypto_hash_final(&hdesc, buf);
+        }
 
-out_tfm:
         ll_crypto_free_hash(hdesc.tfm);
-        return rc;
+        return 0;
 }
+EXPORT_SYMBOL(sptlrpc_get_bulk_checksum);
 
 #else /* !__KERNEL__ */
 
-static int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
+int sptlrpc_get_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u8 alg,
+                              void *buf, int buflen)
 {
         __u32   csum32;
         int     i;
@@ -1048,328 +1047,3 @@ static int do_bulk_checksum(struct ptlrpc_bulk_desc *desc, __u32 alg, void *buf)
 }
 
 #endif /* __KERNEL__ */
-
-/*
- * perform algorithm @alg checksum on @desc, store result in @buf.
- * if anything goes wrong, leave 'alg' be BULK_HASH_ALG_NULL.
- */
-static
-int generate_bulk_csum(struct ptlrpc_bulk_desc *desc, __u32 alg,
-                       struct ptlrpc_bulk_sec_desc *bsd, int bsdsize)
-{
-        int rc;
-
-        LASSERT(bsd);
-        LASSERT(alg < BULK_HASH_ALG_MAX);
-
-        bsd->bsd_hash_alg = BULK_HASH_ALG_NULL;
-
-        if (alg == BULK_HASH_ALG_NULL)
-                return 0;
-
-        LASSERT(bsdsize >= sizeof(*bsd) + hash_types[alg].sht_size);
-
-        rc = do_bulk_checksum(desc, alg, bsd->bsd_csum);
-        if (rc == 0)
-                bsd->bsd_hash_alg = alg;
-
-        return rc;
-}
-
-static
-int verify_bulk_csum(struct ptlrpc_bulk_desc *desc, int read,
-                     struct ptlrpc_bulk_sec_desc *bsdv, int bsdvsize,
-                     struct ptlrpc_bulk_sec_desc *bsdr, int bsdrsize)
-{
-        char *csum_p;
-        char *buf = NULL;
-        int   csum_size, rc = 0;
-
-        LASSERT(bsdv);
-        LASSERT(bsdv->bsd_hash_alg < BULK_HASH_ALG_MAX);
-
-        if (bsdr)
-                bsdr->bsd_hash_alg = BULK_HASH_ALG_NULL;
-
-        if (bsdv->bsd_hash_alg == BULK_HASH_ALG_NULL)
-                return 0;
-
-        /* for all supported algorithms */
-        csum_size = hash_types[bsdv->bsd_hash_alg].sht_size;
-
-        if (bsdvsize < sizeof(*bsdv) + csum_size) {
-                CERROR("verifier size %d too small, require %d\n",
-                       bsdvsize, (int) sizeof(*bsdv) + csum_size);
-                return -EINVAL;
-        }
-
-        if (bsdr) {
-                LASSERT(bsdrsize >= sizeof(*bsdr) + csum_size);
-                csum_p = (char *) bsdr->bsd_csum;
-        } else {
-                OBD_ALLOC(buf, csum_size);
-                if (buf == NULL)
-                        return -EINVAL;
-                csum_p = buf;
-        }
-
-        rc = do_bulk_checksum(desc, bsdv->bsd_hash_alg, csum_p);
-
-        if (memcmp(bsdv->bsd_csum, csum_p, csum_size)) {
-                CERROR("BAD %s CHECKSUM (%s), data mutated during "
-                       "transfer!\n", read ? "READ" : "WRITE",
-                       hash_types[bsdv->bsd_hash_alg].sht_name);
-                rc = -EINVAL;
-        } else {
-                CDEBUG(D_SEC, "bulk %s checksum (%s) verified\n",
-                      read ? "read" : "write",
-                      hash_types[bsdv->bsd_hash_alg].sht_name);
-        }
-
-        if (bsdr) {
-                bsdr->bsd_hash_alg = bsdv->bsd_hash_alg;
-                memcpy(bsdr->bsd_csum, csum_p, csum_size);
-        } else {
-                LASSERT(buf);
-                OBD_FREE(buf, csum_size);
-        }
-
-        return rc;
-}
-
-int bulk_csum_cli_request(struct ptlrpc_bulk_desc *desc, int read,
-                          __u32 alg, struct lustre_msg *rmsg, int roff)
-{
-        struct ptlrpc_bulk_sec_desc *bsdr;
-        int    rsize, rc = 0;
-
-        rsize = rmsg->lm_buflens[roff];
-        bsdr = lustre_msg_buf(rmsg, roff, sizeof(*bsdr));
-
-        LASSERT(bsdr);
-        LASSERT(rsize >= sizeof(*bsdr));
-        LASSERT(alg < BULK_HASH_ALG_MAX);
-
-        if (read) {
-                bsdr->bsd_hash_alg = alg;
-        } else {
-                rc = generate_bulk_csum(desc, alg, bsdr, rsize);
-                if (rc)
-                        CERROR("bulk write: client failed to compute "
-                               "checksum: %d\n", rc);
-
-                /* For sending we only compute the wrong checksum instead
-                 * of corrupting the data so it is still correct on a redo */
-                if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND) &&
-                    bsdr->bsd_hash_alg != BULK_HASH_ALG_NULL)
-                        bsdr->bsd_csum[0] ^= 0x1;
-        }
-
-        return rc;
-}
-EXPORT_SYMBOL(bulk_csum_cli_request);
-
-int bulk_csum_cli_reply(struct ptlrpc_bulk_desc *desc, int read,
-                        struct lustre_msg *rmsg, int roff,
-                        struct lustre_msg *vmsg, int voff)
-{
-        struct ptlrpc_bulk_sec_desc *bsdv, *bsdr;
-        int    rsize, vsize;
-
-        rsize = rmsg->lm_buflens[roff];
-        vsize = vmsg->lm_buflens[voff];
-        bsdr = lustre_msg_buf(rmsg, roff, 0);
-        bsdv = lustre_msg_buf(vmsg, voff, 0);
-
-        if (bsdv == NULL || vsize < sizeof(*bsdv)) {
-                CERROR("Invalid checksum verifier from server: size %d\n",
-                       vsize);
-                return -EINVAL;
-        }
-
-        LASSERT(bsdr);
-        LASSERT(rsize >= sizeof(*bsdr));
-        LASSERT(vsize >= sizeof(*bsdv));
-
-        if (bsdr->bsd_hash_alg != bsdv->bsd_hash_alg) {
-                CERROR("bulk %s: checksum algorithm mismatch: client request "
-                       "%s but server reply with %s. try to use the new one "
-                       "for checksum verification\n",
-                       read ? "read" : "write",
-                       hash_types[bsdr->bsd_hash_alg].sht_name,
-                       hash_types[bsdv->bsd_hash_alg].sht_name);
-        }
-
-        if (read)
-                return verify_bulk_csum(desc, 1, bsdv, vsize, NULL, 0);
-        else {
-                char *cli, *srv, *new = NULL;
-                int csum_size = hash_types[bsdr->bsd_hash_alg].sht_size;
-
-                LASSERT(bsdr->bsd_hash_alg < BULK_HASH_ALG_MAX);
-                if (bsdr->bsd_hash_alg == BULK_HASH_ALG_NULL)
-                        return 0;
-
-                if (vsize < sizeof(*bsdv) + csum_size) {
-                        CERROR("verifier size %d too small, require %d\n",
-                               vsize, (int) sizeof(*bsdv) + csum_size);
-                        return -EINVAL;
-                }
-
-                cli = (char *) (bsdr + 1);
-                srv = (char *) (bsdv + 1);
-
-                if (!memcmp(cli, srv, csum_size)) {
-                        /* checksum confirmed */
-                        CDEBUG(D_SEC, "bulk write checksum (%s) confirmed\n",
-                               hash_types[bsdr->bsd_hash_alg].sht_name);
-                        return 0;
-                }
-
-                /* checksum mismatch, re-compute a new one and compare with
-                 * others, give out proper warnings. */
-                OBD_ALLOC(new, csum_size);
-                if (new == NULL)
-                        return -ENOMEM;
-
-                do_bulk_checksum(desc, bsdr->bsd_hash_alg, new);
-
-                if (!memcmp(new, srv, csum_size)) {
-                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
-                               "on the client after we checksummed them\n",
-                               hash_types[bsdr->bsd_hash_alg].sht_name);
-                } else if (!memcmp(new, cli, csum_size)) {
-                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
-                               "in transit\n",
-                               hash_types[bsdr->bsd_hash_alg].sht_name);
-                } else {
-                        CERROR("BAD WRITE CHECKSUM (%s): pages were mutated "
-                               "in transit, and the current page contents "
-                               "don't match the originals and what the server "
-                               "received\n",
-                               hash_types[bsdr->bsd_hash_alg].sht_name);
-                }
-                OBD_FREE(new, csum_size);
-
-                return -EINVAL;
-        }
-}
-EXPORT_SYMBOL(bulk_csum_cli_reply);
-
-#ifdef __KERNEL__
-static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
-{
-        char           *ptr;
-        unsigned int    off, i;
-
-        for (i = 0; i < desc->bd_iov_count; i++) {
-                if (desc->bd_iov[i].kiov_len == 0)
-                        continue;
-
-                ptr = cfs_kmap(desc->bd_iov[i].kiov_page);
-                off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
-                ptr[off] ^= 0x1;
-                cfs_kunmap(desc->bd_iov[i].kiov_page);
-                return;
-        }
-}
-#else
-static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
-{
-}
-#endif /* __KERNEL__ */
-
-int bulk_csum_svc(struct ptlrpc_bulk_desc *desc, int read,
-                  struct ptlrpc_bulk_sec_desc *bsdv, int vsize,
-                  struct ptlrpc_bulk_sec_desc *bsdr, int rsize)
-{
-        int    rc;
-
-        LASSERT(vsize >= sizeof(*bsdv));
-        LASSERT(rsize >= sizeof(*bsdr));
-        LASSERT(bsdv && bsdr);
-
-        if (read) {
-                rc = generate_bulk_csum(desc, bsdv->bsd_hash_alg, bsdr, rsize);
-                if (rc)
-                        CERROR("bulk read: server failed to generate %s "
-                               "checksum: %d\n",
-                               hash_types[bsdv->bsd_hash_alg].sht_name, rc);
-
-                /* corrupt the data after we compute the checksum, to
-                 * simulate an OST->client data error */
-                if (rc == 0 && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
-                        corrupt_bulk_data(desc);
-        } else {
-                rc = verify_bulk_csum(desc, 0, bsdv, vsize, bsdr, rsize);
-        }
-
-        return rc;
-}
-EXPORT_SYMBOL(bulk_csum_svc);
-
-/****************************************
- * Helpers to assist policy modules to  *
- * implement encryption funcationality  *
- ****************************************/
-
-/* FIXME */
-#ifndef __KERNEL__
-#define CRYPTO_TFM_MODE_ECB     (0)
-#define CRYPTO_TFM_MODE_CBC     (1)
-#endif
-
-static struct sptlrpc_ciph_type cipher_types[] = {
-        [BULK_CIPH_ALG_NULL]    = {
-                "null",         "null",       0,                   0,  0
-        },
-        [BULK_CIPH_ALG_ARC4]    = {
-                "arc4",         "ecb(arc4)",       0, 0,  16
-        },
-        [BULK_CIPH_ALG_AES128]  = {
-                "aes128",       "cbc(aes)",        0, 16, 16
-        },
-        [BULK_CIPH_ALG_AES192]  = {
-                "aes192",       "cbc(aes)",        0, 16, 24
-        },
-        [BULK_CIPH_ALG_AES256]  = {
-                "aes256",       "cbc(aes)",        0, 16, 32
-        },
-        [BULK_CIPH_ALG_CAST128] = {
-                "cast128",      "cbc(cast5)",      0, 8,  16
-        },
-        [BULK_CIPH_ALG_CAST256] = {
-                "cast256",      "cbc(cast6)",      0, 16, 32
-        },
-        [BULK_CIPH_ALG_TWOFISH128] = {
-                "twofish128",   "cbc(twofish)",    0, 16, 16
-        },
-        [BULK_CIPH_ALG_TWOFISH256] = {
-                "twofish256",   "cbc(twofish)",    0, 16, 32
-        },
-};
-
-const struct sptlrpc_ciph_type *sptlrpc_get_ciph_type(__u8 ciph_alg)
-{
-        struct sptlrpc_ciph_type *ct;
-
-        if (ciph_alg < BULK_CIPH_ALG_MAX) {
-                ct = &cipher_types[ciph_alg];
-                if (ct->sct_tfm_name)
-                        return ct;
-        }
-        return NULL;
-}
-EXPORT_SYMBOL(sptlrpc_get_ciph_type);
-
-const char *sptlrpc_get_ciph_name(__u8 ciph_alg)
-{
-        const struct sptlrpc_ciph_type *ct;
-
-        ct = sptlrpc_get_ciph_type(ciph_alg);
-        if (ct)
-                return ct->sct_name;
-        else
-                return "unknown";
-}
-EXPORT_SYMBOL(sptlrpc_get_ciph_name);
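A hedged usage sketch (illustrative, not part of the patch) of the new sptlrpc_get_bulk_checksum() helper, assuming desc is an in-scope struct ptlrpc_bulk_desc *: when the caller's buffer is smaller than the digest size the result is truncated, which is how the plain policy below fits a digest into its 8-byte token:

        /* illustrative only: digest a bulk descriptor into an 8-byte token */
        __u8 token[8];
        int  rc;

        rc = sptlrpc_get_bulk_checksum(desc, BULK_HASH_ALG_SHA1,
                                       token, sizeof(token));
        if (rc != 0)
                CERROR("bulk checksum failed: %d\n", rc);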
index b54a3a4..e9fe66f 100644 (file)
@@ -102,222 +102,67 @@ EXPORT_SYMBOL(sptlrpc_target_sec_part);
  * user supplied flavor string parsing  *
  ****************************************/
 
-#ifdef HAVE_ADLER
-#define BULK_HASH_ALG_DEFAULT   BULK_HASH_ALG_ADLER32
-#else
-#define BULK_HASH_ALG_DEFAULT   BULK_HASH_ALG_CRC32
-#endif
-
-typedef enum {
-        BULK_TYPE_N = 0,
-        BULK_TYPE_I = 1,
-        BULK_TYPE_P = 2
-} bulk_type_t;
-
-static void get_default_flavor(struct sptlrpc_flavor *sf)
-{
-        sf->sf_rpc = SPTLRPC_FLVR_NULL;
-        sf->sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-        sf->sf_bulk_hash = BULK_HASH_ALG_NULL;
-        sf->sf_flags = 0;
-}
-
-static void get_flavor_by_rpc(struct sptlrpc_flavor *flvr, __u16 rpc_flavor)
-{
-        get_default_flavor(flvr);
-
-        flvr->sf_rpc = rpc_flavor;
-
-        switch (rpc_flavor) {
-        case SPTLRPC_FLVR_NULL:
-                break;
-        case SPTLRPC_FLVR_PLAIN:
-        case SPTLRPC_FLVR_KRB5N:
-        case SPTLRPC_FLVR_KRB5A:
-                flvr->sf_bulk_hash = BULK_HASH_ALG_DEFAULT;
-                break;
-        case SPTLRPC_FLVR_KRB5P:
-                flvr->sf_bulk_ciph = BULK_CIPH_ALG_AES128;
-                /* fall through */
-        case SPTLRPC_FLVR_KRB5I:
-                flvr->sf_bulk_hash = BULK_HASH_ALG_SHA1;
-                break;
-        default:
-                LBUG();
-        }
-}
-
-static void get_flavor_by_bulk(struct sptlrpc_flavor *flvr,
-                               __u16 rpc_flavor, bulk_type_t bulk_type)
-{
-        switch (bulk_type) {
-        case BULK_TYPE_N:
-                flvr->sf_bulk_hash = BULK_HASH_ALG_NULL;
-                flvr->sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-                break;
-        case BULK_TYPE_I:
-                switch (rpc_flavor) {
-                case SPTLRPC_FLVR_PLAIN:
-                case SPTLRPC_FLVR_KRB5N:
-                case SPTLRPC_FLVR_KRB5A:
-                        flvr->sf_bulk_hash = BULK_HASH_ALG_DEFAULT;
-                        break;
-                case SPTLRPC_FLVR_KRB5I:
-                case SPTLRPC_FLVR_KRB5P:
-                        flvr->sf_bulk_hash = BULK_HASH_ALG_SHA1;
-                        break;
-                default:
-                        LBUG();
-                }
-                flvr->sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-                break;
-        case BULK_TYPE_P:
-                flvr->sf_bulk_hash = BULK_HASH_ALG_SHA1;
-                flvr->sf_bulk_ciph = BULK_CIPH_ALG_AES128;
-                break;
-        default:
-                LBUG();
-        }
-}
-
-static __u16 __flavors[] = {
-        SPTLRPC_FLVR_NULL,
-        SPTLRPC_FLVR_PLAIN,
-        SPTLRPC_FLVR_KRB5N,
-        SPTLRPC_FLVR_KRB5A,
-        SPTLRPC_FLVR_KRB5I,
-        SPTLRPC_FLVR_KRB5P,
-};
-
-#define __nflavors      ARRAY_SIZE(__flavors)
-
 /*
- * flavor string format: rpc[-bulk{n|i|p}[:cksum/enc]]
- * for examples:
- *  null
- *  plain-bulki
- *  krb5p-bulkn
- *  krb5i-bulkp
- *  krb5i-bulkp:sha512/arc4
+ * format: <base_flavor>[-<bulk_type:alg_spec>]
  */
 int sptlrpc_parse_flavor(const char *str, struct sptlrpc_flavor *flvr)
 {
-        const char     *f;
-        char           *bulk, *alg, *enc;
-        char            buf[64];
-        bulk_type_t     bulk_type;
-        __u8            i;
-        ENTRY;
+        char            buf[32];
+        char           *bulk, *alg;
+
+        memset(flvr, 0, sizeof(*flvr));
 
         if (str == NULL || str[0] == '\0') {
                 flvr->sf_rpc = SPTLRPC_FLVR_INVALID;
-                goto out;
+                return 0;
         }
 
-        for (i = 0; i < __nflavors; i++) {
-                f = sptlrpc_rpcflavor2name(__flavors[i]);
-                if (strncmp(str, f, strlen(f)) == 0)
-                        break;
-        }
-
-        if (i >= __nflavors)
-                GOTO(invalid, -EINVAL);
+        strncpy(buf, str, sizeof(buf));
+        buf[sizeof(buf) - 1] = '\0';
 
-        /* prepare local buffer thus we can modify it as we want */
-        strncpy(buf, str, 64);
-        buf[64 - 1] = '\0';
-
-        /* find bulk string */
         bulk = strchr(buf, '-');
         if (bulk)
                 *bulk++ = '\0';
 
-        /* now the first part must equal to rpc flavor name */
-        if (strcmp(buf, f) != 0)
-                GOTO(invalid, -EINVAL);
-
-        get_flavor_by_rpc(flvr, __flavors[i]);
-
-        if (bulk == NULL)
-                goto out;
-
-        /* find bulk algorithm string */
-        alg = strchr(bulk, ':');
-        if (alg)
-                *alg++ = '\0';
-
-        /* verify bulk section */
-        if (strcmp(bulk, "bulkn") == 0) {
-                flvr->sf_bulk_hash = BULK_HASH_ALG_NULL;
-                flvr->sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-                bulk_type = BULK_TYPE_N;
-        } else if (strcmp(bulk, "bulki") == 0)
-                bulk_type = BULK_TYPE_I;
-        else if (strcmp(bulk, "bulkp") == 0)
-                bulk_type = BULK_TYPE_P;
-        else
-                GOTO(invalid, -EINVAL);
-
-        /* null flavor don't support bulk i/p */
-        if (__flavors[i] == SPTLRPC_FLVR_NULL && bulk_type != BULK_TYPE_N)
-                GOTO(invalid, -EINVAL);
-
-        /* plain policy dosen't support bulk p */
-        if (__flavors[i] == SPTLRPC_FLVR_PLAIN && bulk_type == BULK_TYPE_P)
-                GOTO(invalid, -EINVAL);
-
-        get_flavor_by_bulk(flvr, __flavors[i], bulk_type);
-
-        if (alg == NULL)
-                goto out;
-
-        /* find encryption algorithm string */
-        enc = strchr(alg, '/');
-        if (enc)
-                *enc++ = '\0';
-
-        /* checksum algorithm */
-        for (i = 0; i < BULK_HASH_ALG_MAX; i++) {
-                if (strcmp(alg, sptlrpc_get_hash_name(i)) == 0) {
-                        flvr->sf_bulk_hash = i;
-                        break;
-                }
-        }
-        if (i >= BULK_HASH_ALG_MAX)
-                GOTO(invalid, -EINVAL);
-
-        /* privacy algorithm */
-        if (enc) {
-                for (i = 0; i < BULK_CIPH_ALG_MAX; i++) {
-                        if (strcmp(enc, sptlrpc_get_ciph_name(i)) == 0) {
-                                flvr->sf_bulk_ciph = i;
-                                break;
-                        }
-                }
-                if (i >= BULK_CIPH_ALG_MAX)
-                        GOTO(invalid, -EINVAL);
-        }
+        flvr->sf_rpc = sptlrpc_name2flavor_base(buf);
+        if (flvr->sf_rpc == SPTLRPC_FLVR_INVALID)
+                goto err_out;
 
         /*
-         * bulk combination sanity checks
+         * currently only the base flavor "plain" can have a bulk specification.
          */
-        if (bulk_type == BULK_TYPE_P &&
-            flvr->sf_bulk_ciph == BULK_CIPH_ALG_NULL)
-                GOTO(invalid, -EINVAL);
-
-        if (bulk_type == BULK_TYPE_I &&
-            (flvr->sf_bulk_hash == BULK_HASH_ALG_NULL ||
-             flvr->sf_bulk_ciph != BULK_CIPH_ALG_NULL))
-                GOTO(invalid, -EINVAL);
+        if (flvr->sf_rpc == SPTLRPC_FLVR_PLAIN) {
+                flvr->u_bulk.hash.hash_alg = BULK_HASH_ALG_ADLER32;
+                if (bulk) {
+                        /*
+                         * format: plain-hash:<hash_alg>
+                         */
+                        alg = strchr(bulk, ':');
+                        if (alg == NULL)
+                                goto err_out;
+                        *alg++ = '\0';
+
+                        if (strcmp(bulk, "hash"))
+                                goto err_out;
+
+                        flvr->u_bulk.hash.hash_alg = sptlrpc_get_hash_alg(alg);
+                        if (flvr->u_bulk.hash.hash_alg >= BULK_HASH_ALG_MAX)
+                                goto err_out;
+                }
 
-        if (bulk_type == BULK_TYPE_N &&
-            (flvr->sf_bulk_hash != BULK_HASH_ALG_NULL ||
-             flvr->sf_bulk_ciph != BULK_CIPH_ALG_NULL))
-                GOTO(invalid, -EINVAL);
+                if (flvr->u_bulk.hash.hash_alg == BULK_HASH_ALG_NULL)
+                        flvr_set_bulk_svc(&flvr->sf_rpc, SPTLRPC_BULK_SVC_NULL);
+                else
+                        flvr_set_bulk_svc(&flvr->sf_rpc, SPTLRPC_BULK_SVC_INTG);
+        } else {
+                if (bulk)
+                        goto err_out;
+        }
 
-out:
+        flvr->sf_flags = 0;
         return 0;
-invalid:
+
+err_out:
         CERROR("invalid flavor string: %s\n", str);
         return -EINVAL;
 }
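Examples (illustrative, not part of the patch) of strings handled by the rewritten parser, using the base flavor names that appear elsewhere in this change:

        /* illustrative only: exercising the new flavor string format */
        struct sptlrpc_flavor flvr;

        sptlrpc_parse_flavor("plain-hash:sha1", &flvr); /* plain rpc, sha1 bulk integrity */
        sptlrpc_parse_flavor("plain", &flvr);           /* defaults to adler32 bulk integrity */
        sptlrpc_parse_flavor("krb5p", &flvr);           /* non-plain flavors take no bulk suffix */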
@@ -327,6 +172,14 @@ EXPORT_SYMBOL(sptlrpc_parse_flavor);
  * configure rules                      *
  ****************************************/
 
+static void get_default_flavor(struct sptlrpc_flavor *sf)
+{
+        memset(sf, 0, sizeof(*sf));
+
+        sf->sf_rpc = SPTLRPC_FLVR_NULL;
+        sf->sf_flags = 0;
+}
+
 static void sptlrpc_rule_init(struct sptlrpc_rule *rule)
 {
         rule->sr_netid = LNET_NIDNET(LNET_NID_ANY);
@@ -411,19 +264,17 @@ EXPORT_SYMBOL(sptlrpc_rule_set_free);
 
 /*
  * return 0 if the rule set could accommodate one more rule.
- * if @expand != 0, the rule set might be expanded.
  */
-int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset, int expand)
+int sptlrpc_rule_set_expand(struct sptlrpc_rule_set *rset)
 {
         struct sptlrpc_rule *rules;
         int nslot;
 
+        might_sleep();
+
         if (rset->srs_nrule < rset->srs_nslot)
                 return 0; 
 
-        if (expand == 0)
-                return -E2BIG;
-
         nslot = rset->srs_nslot + 8;
 
         /* better use realloc() if available */
@@ -468,16 +319,17 @@ static inline int rule_match_net(struct sptlrpc_rule *r1,
 
 /*
  * merge @rule into @rset.
- * if @expand != 0 then @rset slots might be expanded.
+ * the @rset slots might be expanded.
  */
 int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset, 
-                           struct sptlrpc_rule *rule,
-                           int expand)
+                           struct sptlrpc_rule *rule)
 {
         struct sptlrpc_rule      *p = rset->srs_rules;
         int                       spec_dir, spec_net;
         int                       rc, n, match = 0;
 
+        might_sleep();
+
         spec_net = rule_spec_net(rule);
         spec_dir = rule_spec_dir(rule);
 
@@ -537,7 +389,7 @@ int sptlrpc_rule_set_merge(struct sptlrpc_rule_set *rset,
                 LASSERT(n >= 0 && n <= rset->srs_nrule);
 
                 if (rule->sr_flvr.sf_rpc != SPTLRPC_FLVR_INVALID) {
-                        rc = sptlrpc_rule_set_expand(rset, expand);
+                        rc = sptlrpc_rule_set_expand(rset);
                         if (rc)
                                 return rc;
 
@@ -616,6 +468,8 @@ static int sptlrpc_rule_set_extract(struct sptlrpc_rule_set *gen,
         struct sptlrpc_rule     *rule;
         int                      i, n, rc;
 
+        might_sleep();
+
         /* merge general rules firstly, then target-specific rules */
         for (i = 0; i < 2; i++) {
                 if (src[i] == NULL)
@@ -633,7 +487,7 @@ static int sptlrpc_rule_set_extract(struct sptlrpc_rule_set *gen,
                             rule->sr_to != to)
                                 continue;
 
-                        rc = sptlrpc_rule_set_merge(rset, rule, 1);
+                        rc = sptlrpc_rule_set_merge(rset, rule);
                         if (rc) {
                                 CERROR("can't merge: %d\n", rc);
                                 return rc;
@@ -800,7 +654,7 @@ static int sptlrpc_conf_merge_rule(struct sptlrpc_conf *conf,
                 }
         }
 
-        return sptlrpc_rule_set_merge(rule_set, rule, 1);
+        return sptlrpc_rule_set_merge(rule_set, rule);
 }
 
 /**
@@ -829,7 +683,7 @@ static int __sptlrpc_process_config(struct lustre_cfg *lcfg,
                 RETURN(-EINVAL);
         }
 
-        CDEBUG(D_SEC, "got one rule: %s.%s\n", target, param);
+        CDEBUG(D_SEC, "processing rule: %s.%s\n", target, param);
 
         /* parse rule to make sure the format is correct */
         if (strncmp(param, PARAM_SRPC_FLVR, sizeof(PARAM_SRPC_FLVR) - 1) != 0) {
@@ -974,6 +828,13 @@ static void inline flavor_set_flags(struct sptlrpc_flavor *sf,
                                     enum lustre_sec_part to,
                                     unsigned int fl_udesc)
 {
+        /*
+         * the null flavor doesn't need any flags set, and in fact
+         * we'd better not do that because everybody shares a single sec.
+         */
+        if (sf->sf_rpc == SPTLRPC_FLVR_NULL)
+                return;
+
         if (from == LUSTRE_SP_MDT) {
                 /* MDT->MDT; MDT->OST */
                 sf->sf_flags |= PTLRPC_SEC_FL_ROOTONLY;
index 51bace7..5a6fae9 100644 (file)
@@ -66,7 +66,7 @@
 struct proc_dir_entry *sptlrpc_proc_root = NULL;
 EXPORT_SYMBOL(sptlrpc_proc_root);
 
-void sec_flags2str(unsigned long flags, char *buf, int bufsize)
+char *sec_flags2str(unsigned long flags, char *buf, int bufsize)
 {
         buf[0] = '\0';
 
@@ -82,7 +82,7 @@ void sec_flags2str(unsigned long flags, char *buf, int bufsize)
                 strncat(buf, "-,", bufsize);
 
         buf[strlen(buf) - 1] = '\0';
-
+        return buf;
 }
 
 static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v)
@@ -90,7 +90,7 @@ static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v)
         struct obd_device *dev = seq->private;
         struct client_obd *cli = &dev->u.cli;
         struct ptlrpc_sec *sec = NULL;
-        char               flags_str[32];
+        char               str[32];
 
         LASSERT(strcmp(dev->obd_type->typ_name, LUSTRE_OSC_NAME) == 0 ||
                 strcmp(dev->obd_type->typ_name, LUSTRE_MDC_NAME) == 0 ||
@@ -101,14 +101,14 @@ static int sptlrpc_info_lprocfs_seq_show(struct seq_file *seq, void *v)
         if (sec == NULL)
                 goto out;
 
-        sec_flags2str(sec->ps_flvr.sf_flags, flags_str, sizeof(flags_str));
+        sec_flags2str(sec->ps_flvr.sf_flags, str, sizeof(str));
 
         seq_printf(seq, "rpc flavor:    %s\n",
-                   sptlrpc_rpcflavor2name(sec->ps_flvr.sf_rpc));
-        seq_printf(seq, "bulk flavor:   %s/%s\n",
-                   sptlrpc_get_hash_name(sec->ps_flvr.sf_bulk_hash),
-                   sptlrpc_get_ciph_name(sec->ps_flvr.sf_bulk_ciph));
-        seq_printf(seq, "flags:         %s\n", flags_str);
+                   sptlrpc_flavor2name_base(sec->ps_flvr.sf_rpc));
+        seq_printf(seq, "bulk flavor:   %s\n",
+                   sptlrpc_flavor2name_bulk(&sec->ps_flvr, str, sizeof(str)));
+        seq_printf(seq, "flags:         %s\n",
+                   sec_flags2str(sec->ps_flvr.sf_flags, str, sizeof(str)));
         seq_printf(seq, "id:            %d\n", sec->ps_id);
         seq_printf(seq, "refcount:      %d\n", atomic_read(&sec->ps_refcount));
         seq_printf(seq, "nctx:          %d\n", atomic_read(&sec->ps_nctx));
index 7b4368d..08baf12 100644 (file)
@@ -59,13 +59,13 @@ static struct ptlrpc_cli_ctx    null_cli_ctx;
 static struct ptlrpc_svc_ctx    null_svc_ctx;
 
 /*
- * null sec temporarily use the third byte of lm_secflvr to identify
+ * we can temporarily use the topmost 8-bits of lm_secflvr to identify
  * the source sec part.
  */
 static inline
 void null_encode_sec_part(struct lustre_msg *msg, enum lustre_sec_part sp)
 {
-        msg->lm_secflvr |= (((__u32) sp) & 0xFF) << 16;
+        msg->lm_secflvr |= (((__u32) sp) & 0xFF) << 24;
 }
 
 static inline
@@ -73,9 +73,9 @@ enum lustre_sec_part null_decode_sec_part(struct lustre_msg *msg)
 {
         switch (msg->lm_magic) {
         case LUSTRE_MSG_MAGIC_V2:
-                return (msg->lm_secflvr >> 16) & 0xFF;
+                return (msg->lm_secflvr >> 24) & 0xFF;
         case LUSTRE_MSG_MAGIC_V2_SWABBED:
-                return (msg->lm_secflvr >> 8) & 0xFF;
+                return (msg->lm_secflvr) & 0xFF;
         default:
                 return LUSTRE_SP_ANY;
         }
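A short aside (not part of the patch) on why the swabbed case masks the low byte: the sender stores the sec part in bits 24-31 of lm_secflvr in its native byte order, so a receiver of the opposite endianness sees those bits at 0-7 until the message is swabbed. A minimal check, assuming the kernel's __swab32():

        /* illustrative only: 0xSP000000 viewed from the opposite byte order */
        __u32 flvr = (((__u32) LUSTRE_SP_MDT) & 0xFF) << 24;

        LASSERT((__swab32(flvr) & 0xFF) == LUSTRE_SP_MDT);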
@@ -135,14 +135,7 @@ struct ptlrpc_sec *null_create_sec(struct obd_import *imp,
                                    struct ptlrpc_svc_ctx *svc_ctx,
                                    struct sptlrpc_flavor *sf)
 {
-        LASSERT(RPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_NULL);
-
-        if (sf->sf_bulk_ciph != BULK_CIPH_ALG_NULL ||
-            sf->sf_bulk_hash != BULK_HASH_ALG_NULL) {
-                CERROR("null sec don't support bulk algorithm: %u/%u\n",
-                       sf->sf_bulk_ciph, sf->sf_bulk_hash);
-                return NULL;
-        }
+        LASSERT(SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_NULL);
 
         /* general layer has take a module reference for us, because we never
          * really destroy the sec, simply release the reference here.
@@ -300,7 +293,8 @@ static struct ptlrpc_svc_ctx null_svc_ctx = {
 static
 int null_accept(struct ptlrpc_request *req)
 {
-        LASSERT(RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) == SPTLRPC_POLICY_NULL);
+        LASSERT(SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) ==
+                SPTLRPC_POLICY_NULL);
 
         if (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_NULL) {
                 CERROR("Invalid rpc flavor 0x%x\n", req->rq_flvr.sf_rpc);
@@ -428,8 +422,6 @@ static void null_init_internal(void)
         null_sec.ps_id = -1;
         null_sec.ps_import = NULL;
         null_sec.ps_flvr.sf_rpc = SPTLRPC_FLVR_NULL;
-        null_sec.ps_flvr.sf_bulk_ciph = BULK_CIPH_ALG_NULL;
-        null_sec.ps_flvr.sf_bulk_hash = BULK_HASH_ALG_NULL;
         null_sec.ps_flvr.sf_flags = 0;
         null_sec.ps_part = LUSTRE_SP_ANY;
         null_sec.ps_dying = 0;
index eb9ee82..9b03d77 100644 (file)
@@ -71,44 +71,124 @@ static struct ptlrpc_svc_ctx    plain_svc_ctx;
 static unsigned int plain_at_offset;
 
 /*
- * flavor flags (maximum 8 flags)
+ * for simplicity, plain policy rpc use fixed layout.
  */
-#define PLAIN_WFLVR_FLAGS_OFFSET        (12)
-#define PLAIN_WFLVR_FLAG_BULK           (1 << (0 + PLAIN_WFLVR_FLAGS_OFFSET))
-#define PLAIN_WFLVR_FLAG_USER           (1 << (1 + PLAIN_WFLVR_FLAGS_OFFSET))
+#define PLAIN_PACK_SEGMENTS             (4)
+
+#define PLAIN_PACK_HDR_OFF              (0)
+#define PLAIN_PACK_MSG_OFF              (1)
+#define PLAIN_PACK_USER_OFF             (2)
+#define PLAIN_PACK_BULK_OFF             (3)
+
+#define PLAIN_FL_USER                   (0x01)
+#define PLAIN_FL_BULK                   (0x02)
+
+struct plain_header {
+        __u8            ph_ver;            /* 0 */
+        __u8            ph_flags;
+        __u8            ph_sp;             /* source */
+        __u8            ph_bulk_hash_alg;  /* complete flavor desc */
+        __u8            ph_pad[4];
+};
 
-#define PLAIN_WFLVR_HAS_BULK(wflvr)      \
-        (((wflvr) & PLAIN_WFLVR_FLAG_BULK) != 0)
-#define PLAIN_WFLVR_HAS_USER(wflvr)      \
-        (((wflvr) & PLAIN_WFLVR_FLAG_USER) != 0)
+struct plain_bulk_token {
+        __u8            pbt_hash[8];
+};
 
-#define PLAIN_WFLVR_TO_RPC(wflvr)       \
-        ((wflvr) & ((1 << PLAIN_WFLVR_FLAGS_OFFSET) - 1))
+#define PLAIN_BSD_SIZE \
+        (sizeof(struct ptlrpc_bulk_sec_desc) + sizeof(struct plain_bulk_token))
 
-/*
- * similar to null sec, temporarily use the third byte of lm_secflvr to identify
- * the source sec part.
- */
-static inline
-void plain_encode_sec_part(struct lustre_msg *msg, enum lustre_sec_part sp)
+/****************************************
+ * bulk checksum helpers                *
+ ****************************************/
+
+static int plain_unpack_bsd(struct lustre_msg *msg)
 {
-        msg->lm_secflvr |= (((__u32) sp) & 0xFF) << 16;
+        struct ptlrpc_bulk_sec_desc *bsd;
+
+        if (bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF))
+                return -EPROTO;
+
+        bsd = lustre_msg_buf(msg, PLAIN_PACK_BULK_OFF, PLAIN_BSD_SIZE);
+        if (bsd == NULL) {
+                CERROR("bulk sec desc has short size %d\n",
+                       lustre_msg_buflen(msg, PLAIN_PACK_BULK_OFF));
+                return -EPROTO;
+        }
+
+        if (bsd->bsd_svc != SPTLRPC_BULK_SVC_NULL &&
+            bsd->bsd_svc != SPTLRPC_BULK_SVC_INTG) {
+                CERROR("invalid bulk svc %u\n", bsd->bsd_svc);
+                return -EPROTO;
+        }
+
+        return 0;
 }
 
-static inline
-enum lustre_sec_part plain_decode_sec_part(struct lustre_msg *msg)
+static int plain_generate_bulk_csum(struct ptlrpc_bulk_desc *desc,
+                                    __u8 hash_alg,
+                                    struct plain_bulk_token *token)
 {
-        return (msg->lm_secflvr >> 16) & 0xFF;
+        if (hash_alg == BULK_HASH_ALG_NULL)
+                return 0;
+
+        memset(token->pbt_hash, 0, sizeof(token->pbt_hash));
+        return sptlrpc_get_bulk_checksum(desc, hash_alg, token->pbt_hash,
+                                         sizeof(token->pbt_hash));
 }
 
-/*
- * for simplicity, plain policy rpc use fixed layout.
- */
-#define PLAIN_PACK_SEGMENTS             (3)
+static int plain_verify_bulk_csum(struct ptlrpc_bulk_desc *desc,
+                                  __u8 hash_alg,
+                                  struct plain_bulk_token *tokenr)
+{
+        struct plain_bulk_token tokenv;
+        int                     rc;
+
+        if (hash_alg == BULK_HASH_ALG_NULL)
+                return 0;
 
-#define PLAIN_PACK_MSG_OFF              (0)
-#define PLAIN_PACK_USER_OFF             (1)
-#define PLAIN_PACK_BULK_OFF             (2)
+        memset(&tokenv.pbt_hash, 0, sizeof(tokenv.pbt_hash));
+        rc = sptlrpc_get_bulk_checksum(desc, hash_alg, tokenv.pbt_hash,
+                                       sizeof(tokenv.pbt_hash));
+        if (rc)
+                return rc;
+
+        if (memcmp(tokenr->pbt_hash, tokenv.pbt_hash, sizeof(tokenr->pbt_hash)))
+                return -EACCES;
+        return 0;
+}
+
+#ifdef __KERNEL__
+static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
+{
+        char           *ptr;
+        unsigned int    off, i;
+
+        for (i = 0; i < desc->bd_iov_count; i++) {
+                if (desc->bd_iov[i].kiov_len == 0)
+                        continue;
+
+                ptr = cfs_kmap(desc->bd_iov[i].kiov_page);
+                off = desc->bd_iov[i].kiov_offset & ~CFS_PAGE_MASK;
+                ptr[off] ^= 0x1;
+                cfs_kunmap(desc->bd_iov[i].kiov_page);
+                return;
+        }
+}
+#else
+static void corrupt_bulk_data(struct ptlrpc_bulk_desc *desc)
+{
+        unsigned int    i;
+
+        for (i = 0; i < desc->bd_iov_count; i++) {
+                if (desc->bd_iov[i].iov_len == 0)
+                        continue;
+
+                ((char *)desc->bd_iov[i].iov_base)[i] ^= 0x1;
+                return;
+        }
+}
+#endif /* __KERNEL__ */
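For orientation (not part of the patch), every plain request and reply now carries the four fixed segments defined above. A minimal sizing sketch, mirroring what plain_alloc_reqbuf() does further down and assuming a caller-supplied msgsize:

        /* illustrative only: sizing a plain request that carries a bulk descriptor */
        __u32 buflens[PLAIN_PACK_SEGMENTS] = { 0, };
        int   alloc_len;

        buflens[PLAIN_PACK_HDR_OFF]  = sizeof(struct plain_header);
        buflens[PLAIN_PACK_MSG_OFF]  = msgsize;        /* caller-supplied body size */
        buflens[PLAIN_PACK_BULK_OFF] = PLAIN_BSD_SIZE; /* bsd header + 8-byte token */

        alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);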
 
 /****************************************
  * cli_ctx apis                         *
@@ -131,16 +211,22 @@ int plain_ctx_validate(struct ptlrpc_cli_ctx *ctx)
 static
 int plain_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
-        struct lustre_msg_v2 *msg = req->rq_reqbuf;
+        struct lustre_msg   *msg = req->rq_reqbuf;
+        struct plain_header *phdr;
         ENTRY;
 
         msg->lm_secflvr = req->rq_flvr.sf_rpc;
-        if (req->rq_pack_bulk)
-                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
-        if (req->rq_pack_udesc)
-                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_USER;
 
-        plain_encode_sec_part(msg, ctx->cc_sec->ps_part);
+        phdr = lustre_msg_buf(msg, PLAIN_PACK_HDR_OFF, 0);
+        phdr->ph_ver = 0;
+        phdr->ph_flags = 0;
+        phdr->ph_sp = ctx->cc_sec->ps_part;
+        phdr->ph_bulk_hash_alg = req->rq_flvr.u_bulk.hash.hash_alg;
+
+        if (req->rq_pack_udesc)
+                phdr->ph_flags |= PLAIN_FL_USER;
+        if (req->rq_pack_bulk)
+                phdr->ph_flags |= PLAIN_FL_BULK;
 
         req->rq_reqdata_len = lustre_msg_size_v2(msg->lm_bufcount,
                                                  msg->lm_buflens);
@@ -150,8 +236,9 @@ int plain_ctx_sign(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 static
 int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
 {
-        struct lustre_msg *msg = req->rq_repdata;
-        __u32              cksum;
+        struct lustre_msg   *msg = req->rq_repdata;
+        struct plain_header *phdr;
+        __u32                cksum;
         ENTRY;
 
         if (msg->lm_bufcount != PLAIN_PACK_SEGMENTS) {
@@ -159,12 +246,29 @@ int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
                 RETURN(-EPROTO);
         }
 
+        phdr = lustre_msg_buf(msg, PLAIN_PACK_HDR_OFF, sizeof(*phdr));
+        if (phdr == NULL) {
+                CERROR("missing plain header\n");
+                RETURN(-EPROTO);
+        }
+
+        if (phdr->ph_ver != 0) {
+                CERROR("Invalid header version\n");
+                RETURN(-EPROTO);
+        }
+
         /* expect no user desc in reply */
-        if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
+        if (phdr->ph_flags & PLAIN_FL_USER) {
                 CERROR("Unexpected udesc flag in reply\n");
                 RETURN(-EPROTO);
         }
 
+        if (phdr->ph_bulk_hash_alg != req->rq_flvr.u_bulk.hash.hash_alg) {
+                CERROR("reply bulk flavor %u != %u\n", phdr->ph_bulk_hash_alg,
+                       req->rq_flvr.u_bulk.hash.hash_alg);
+                RETURN(-EPROTO);
+        }
+
         if (unlikely(req->rq_early)) {
                 cksum = crc32_le(!(__u32) 0,
                                  lustre_msg_buf(msg, PLAIN_PACK_MSG_OFF, 0),
@@ -179,16 +283,15 @@ int plain_ctx_verify(struct ptlrpc_cli_ctx *ctx, struct ptlrpc_request *req)
                  * in reply, except for early reply */
                 if (!req->rq_early &&
                     !equi(req->rq_pack_bulk == 1,
-                          PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr))) {
+                          phdr->ph_flags & PLAIN_FL_BULK)) {
                         CERROR("%s bulk checksum in reply\n",
                                req->rq_pack_bulk ? "Missing" : "Unexpected");
                         RETURN(-EPROTO);
                 }
 
-                if (PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr) &&
-                    bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
-                        CERROR("Mal-formed bulk checksum reply\n");
-                        RETURN(-EINVAL);
+                if (phdr->ph_flags & PLAIN_FL_BULK) {
+                        if (plain_unpack_bsd(msg))
+                                RETURN(-EPROTO);
                 }
         }
 
@@ -202,13 +305,42 @@ int plain_cli_wrap_bulk(struct ptlrpc_cli_ctx *ctx,
                         struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
+        struct ptlrpc_bulk_sec_desc *bsd;
+        struct plain_bulk_token     *token;
+        int                          rc;
+
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        return bulk_csum_cli_request(desc, req->rq_bulk_read,
-                                     req->rq_flvr.sf_bulk_hash,
-                                     req->rq_reqbuf,
-                                     PLAIN_PACK_BULK_OFF);
+        bsd = lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_BULK_OFF, 0);
+        token = (struct plain_bulk_token *) bsd->bsd_data;
+
+        bsd->bsd_version = 0;
+        bsd->bsd_flags = 0;
+        bsd->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsd->bsd_svc = SPTLRPC_FLVR_BULK_SVC(req->rq_flvr.sf_rpc);
+
+        if (bsd->bsd_svc == SPTLRPC_BULK_SVC_NULL)
+                RETURN(0);
+
+        if (req->rq_bulk_read)
+                RETURN(0);
+
+        rc = plain_generate_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
+                                      token);
+        if (rc) {
+                CERROR("bulk write: failed to compute checksum: %d\n", rc);
+        } else {
+                /*
+                 * for sending we only compute the wrong checksum instead
+                 * of corrupting the data so it is still correct on a redo
+                 */
+                if (OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND) &&
+                    req->rq_flvr.u_bulk.hash.hash_alg != BULK_HASH_ALG_NULL)
+                        token->pbt_hash[0] ^= 0x1;
+        }
+
+        return rc;
 }
 
 static
@@ -216,13 +348,45 @@ int plain_cli_unwrap_bulk(struct ptlrpc_cli_ctx *ctx,
                           struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
+        struct ptlrpc_bulk_sec_desc *bsdr, *bsdv;
+        struct plain_bulk_token     *tokenr, *tokenv;
+        int                          rc;
+#ifdef __KERNEL__
+        int                          i, nob;
+#endif
+
         LASSERT(req->rq_pack_bulk);
         LASSERT(req->rq_reqbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
         LASSERT(req->rq_repdata->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        return bulk_csum_cli_reply(desc, req->rq_bulk_read,
-                                   req->rq_reqbuf, PLAIN_PACK_BULK_OFF,
-                                   req->rq_repdata, PLAIN_PACK_BULK_OFF);
+        bsdr = lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_BULK_OFF, 0);
+        tokenr = (struct plain_bulk_token *) bsdr->bsd_data;
+        bsdv = lustre_msg_buf(req->rq_repdata, PLAIN_PACK_BULK_OFF, 0);
+        tokenv = (struct plain_bulk_token *) bsdv->bsd_data;
+
+        if (req->rq_bulk_write) {
+                if (bsdv->bsd_flags & BSD_FL_ERR)
+                        return -EIO;
+                return 0;
+        }
+
+#ifdef __KERNEL__
+        /* fix the actual data size */
+        for (i = 0, nob = 0; i < desc->bd_iov_count; i++) {
+                if (desc->bd_iov[i].kiov_len + nob > desc->bd_nob_transferred) {
+                        desc->bd_iov[i].kiov_len =
+                                desc->bd_nob_transferred - nob;
+                }
+                nob += desc->bd_iov[i].kiov_len;
+        }
+#endif
+
+        rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
+                                    tokenv);
+        if (rc)
+                CERROR("bulk read: client verify failed: %d\n", rc);
+
+        return rc;
 }
 
 /****************************************
@@ -303,13 +467,7 @@ struct ptlrpc_sec *plain_create_sec(struct obd_import *imp,
         struct ptlrpc_cli_ctx  *ctx;
         ENTRY;
 
-        LASSERT(RPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN);
-
-        if (sf->sf_bulk_ciph != BULK_CIPH_ALG_NULL) {
-                CERROR("plain policy don't support bulk cipher: %u\n",
-                       sf->sf_bulk_ciph);
-                RETURN(NULL);
-        }
+        LASSERT(SPTLRPC_FLVR_POLICY(sf->sf_rpc) == SPTLRPC_POLICY_PLAIN);
 
         OBD_ALLOC_PTR(plsec);
         if (plsec == NULL)
@@ -410,9 +568,10 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
                        int msgsize)
 {
         __u32 buflens[PLAIN_PACK_SEGMENTS] = { 0, };
-        int alloc_len;
+        int   alloc_len;
         ENTRY;
 
+        buflens[PLAIN_PACK_HDR_OFF] = sizeof(struct plain_header);
         buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
         if (req->rq_pack_udesc)
@@ -420,10 +579,7 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
 
         if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-
-                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 1,
-                                                req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = PLAIN_BSD_SIZE;
         }
 
         alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
@@ -444,7 +600,7 @@ int plain_alloc_reqbuf(struct ptlrpc_sec *sec,
         }
 
         lustre_init_msg_v2(req->rq_reqbuf, PLAIN_PACK_SEGMENTS, buflens, NULL);
-        req->rq_reqmsg = lustre_msg_buf_v2(req->rq_reqbuf, 0, 0);
+        req->rq_reqmsg = lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_MSG_OFF, 0);
 
         if (req->rq_pack_udesc)
                 sptlrpc_pack_user_desc(req->rq_reqbuf, PLAIN_PACK_USER_OFF);
@@ -476,13 +632,12 @@ int plain_alloc_repbuf(struct ptlrpc_sec *sec,
         int alloc_len;
         ENTRY;
 
+        buflens[PLAIN_PACK_HDR_OFF] = sizeof(struct plain_header);
         buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
         if (req->rq_pack_bulk) {
                 LASSERT(req->rq_bulk_read || req->rq_bulk_write);
-                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
-                                                req->rq_flvr.sf_bulk_hash, 0,
-                                                req->rq_bulk_read);
+                buflens[PLAIN_PACK_BULK_OFF] = PLAIN_BSD_SIZE;
         }
 
         alloc_len = lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
@@ -581,24 +736,46 @@ static struct ptlrpc_svc_ctx plain_svc_ctx = {
 static
 int plain_accept(struct ptlrpc_request *req)
 {
-        struct lustre_msg *msg = req->rq_reqbuf;
+        struct lustre_msg   *msg = req->rq_reqbuf;
+        struct plain_header *phdr;
         ENTRY;
 
-        LASSERT(RPC_FLVR_POLICY(req->rq_flvr.sf_rpc) == SPTLRPC_POLICY_PLAIN);
+        LASSERT(SPTLRPC_FLVR_POLICY(req->rq_flvr.sf_rpc) ==
+                SPTLRPC_POLICY_PLAIN);
+
+        if (SPTLRPC_FLVR_BASE(req->rq_flvr.sf_rpc) !=
+            SPTLRPC_FLVR_BASE(SPTLRPC_FLVR_PLAIN) ||
+            SPTLRPC_FLVR_BULK_TYPE(req->rq_flvr.sf_rpc) !=
+            SPTLRPC_FLVR_BULK_TYPE(SPTLRPC_FLVR_PLAIN)) {
+                CERROR("Invalid rpc flavor %x\n", req->rq_flvr.sf_rpc);
+                RETURN(SECSVC_DROP);
+        }
 
         if (msg->lm_bufcount < PLAIN_PACK_SEGMENTS) {
                 CERROR("unexpected request buf count %u\n", msg->lm_bufcount);
                 RETURN(SECSVC_DROP);
         }
 
-        if (req->rq_flvr.sf_rpc != SPTLRPC_FLVR_PLAIN) {
-                CERROR("Invalid rpc flavor %x\n", req->rq_flvr.sf_rpc);
-                RETURN(SECSVC_DROP);
+        phdr = lustre_msg_buf(msg, PLAIN_PACK_HDR_OFF, sizeof(*phdr));
+        if (phdr == NULL) {
+                CERROR("missing plain header\n");
+                RETURN(-EPROTO);
         }
 
-        req->rq_sp_from = plain_decode_sec_part(msg);
+        if (phdr->ph_ver != 0) {
+                CERROR("Invalid header version\n");
+                RETURN(-EPROTO);
+        }
 
-        if (PLAIN_WFLVR_HAS_USER(msg->lm_secflvr)) {
+        if (phdr->ph_bulk_hash_alg >= BULK_HASH_ALG_MAX) {
+                CERROR("invalid hash algorithm: %u\n", phdr->ph_bulk_hash_alg);
+                RETURN(-EPROTO);
+        }
+
+        req->rq_sp_from = phdr->ph_sp;
+        req->rq_flvr.u_bulk.hash.hash_alg = phdr->ph_bulk_hash_alg;
+
+        if (phdr->ph_flags & PLAIN_FL_USER) {
                 if (sptlrpc_unpack_user_desc(msg, PLAIN_PACK_USER_OFF)) {
                         CERROR("Mal-formed user descriptor\n");
                         RETURN(SECSVC_DROP);
@@ -608,11 +785,9 @@ int plain_accept(struct ptlrpc_request *req)
                 req->rq_user_desc = lustre_msg_buf(msg, PLAIN_PACK_USER_OFF, 0);
         }
 
-        if (PLAIN_WFLVR_HAS_BULK(msg->lm_secflvr)) {
-                if (bulk_sec_desc_unpack(msg, PLAIN_PACK_BULK_OFF)) {
-                        CERROR("Mal-formed bulk checksum request\n");
+        if (phdr->ph_flags & PLAIN_FL_BULK) {
+                if (plain_unpack_bsd(msg))
                         RETURN(SECSVC_DROP);
-                }
 
                 req->rq_pack_bulk = 1;
         }
@@ -630,24 +805,18 @@ static
 int plain_alloc_rs(struct ptlrpc_request *req, int msgsize)
 {
         struct ptlrpc_reply_state   *rs;
-        struct ptlrpc_bulk_sec_desc *bsd;
         __u32                        buflens[PLAIN_PACK_SEGMENTS] = { 0, };
         int                          rs_size = sizeof(*rs);
         ENTRY;
 
         LASSERT(msgsize % 8 == 0);
 
+        buflens[PLAIN_PACK_HDR_OFF] = sizeof(struct plain_header);
         buflens[PLAIN_PACK_MSG_OFF] = msgsize;
 
-        if (req->rq_pack_bulk && (req->rq_bulk_read || req->rq_bulk_write)) {
-                bsd = lustre_msg_buf(req->rq_reqbuf,
-                                     PLAIN_PACK_BULK_OFF, sizeof(*bsd));
-                LASSERT(bsd);
+        if (req->rq_pack_bulk && (req->rq_bulk_read || req->rq_bulk_write))
+                buflens[PLAIN_PACK_BULK_OFF] = PLAIN_BSD_SIZE;
 
-                buflens[PLAIN_PACK_BULK_OFF] = bulk_sec_desc_size(
-                                                        bsd->bsd_hash_alg, 0,
-                                                        req->rq_bulk_read);
-        }
         rs_size += lustre_msg_size_v2(PLAIN_PACK_SEGMENTS, buflens);
 
         rs = req->rq_reply_state;
@@ -693,6 +862,7 @@ int plain_authorize(struct ptlrpc_request *req)
 {
         struct ptlrpc_reply_state *rs = req->rq_reply_state;
         struct lustre_msg_v2      *msg = rs->rs_repbuf;
+        struct plain_header       *phdr;
         int                        len;
         ENTRY;
 
@@ -706,8 +876,14 @@ int plain_authorize(struct ptlrpc_request *req)
                 len = lustre_msg_size_v2(msg->lm_bufcount, msg->lm_buflens);
 
         msg->lm_secflvr = req->rq_flvr.sf_rpc;
+
+        phdr = lustre_msg_buf(msg, PLAIN_PACK_HDR_OFF, 0);
+        phdr->ph_ver = 0;
+        phdr->ph_flags = 0;
+        phdr->ph_bulk_hash_alg = req->rq_flvr.u_bulk.hash.hash_alg;
+
         if (req->rq_pack_bulk)
-                msg->lm_secflvr |= PLAIN_WFLVR_FLAG_BULK;
+                phdr->ph_flags |= PLAIN_FL_BULK;
 
         rs->rs_repdata_len = len;
 
@@ -730,44 +906,73 @@ static
 int plain_svc_unwrap_bulk(struct ptlrpc_request *req,
                           struct ptlrpc_bulk_desc *desc)
 {
-        struct ptlrpc_reply_state      *rs = req->rq_reply_state;
+        struct ptlrpc_reply_state   *rs = req->rq_reply_state;
+        struct ptlrpc_bulk_sec_desc *bsdr, *bsdv;
+        struct plain_bulk_token     *tokenr, *tokenv;
+        int                          rc;
 
-        LASSERT(rs);
+        LASSERT(req->rq_bulk_write);
         LASSERT(req->rq_pack_bulk);
-        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
-        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf,
-                                            PLAIN_PACK_BULK_OFF, 0),
-                             lustre_msg_buflen(req->rq_reqbuf,
-                                               PLAIN_PACK_BULK_OFF),
-                             lustre_msg_buf(rs->rs_repbuf,
-                                            PLAIN_PACK_BULK_OFF, 0),
-                             lustre_msg_buflen(rs->rs_repbuf,
-                                               PLAIN_PACK_BULK_OFF));
+        bsdr = lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_BULK_OFF, 0);
+        tokenr = (struct plain_bulk_token *) bsdr->bsd_data;
+        bsdv = lustre_msg_buf(rs->rs_repbuf, PLAIN_PACK_BULK_OFF, 0);
+        tokenv = (struct plain_bulk_token *) bsdv->bsd_data;
+
+        bsdv->bsd_version = 0;
+        bsdv->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsdv->bsd_svc = bsdr->bsd_svc;
+        bsdv->bsd_flags = 0;
+
+        if (bsdr->bsd_svc == SPTLRPC_BULK_SVC_NULL)
+                return 0;
+
+        rc = plain_verify_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
+                                    tokenr);
+        if (rc) {
+                bsdv->bsd_flags |= BSD_FL_ERR;
+                CERROR("bulk write: server verify failed: %d\n", rc);
+        }
+
+        return rc;
 }
 
 static
 int plain_svc_wrap_bulk(struct ptlrpc_request *req,
                         struct ptlrpc_bulk_desc *desc)
 {
-        struct ptlrpc_reply_state      *rs = req->rq_reply_state;
+        struct ptlrpc_reply_state   *rs = req->rq_reply_state;
+        struct ptlrpc_bulk_sec_desc *bsdr, *bsdv;
+        struct plain_bulk_token     *tokenr, *tokenv;
+        int                          rc;
 
-        LASSERT(rs);
+        LASSERT(req->rq_bulk_read);
         LASSERT(req->rq_pack_bulk);
-        LASSERT(req->rq_reqbuf->lm_bufcount >= PLAIN_PACK_SEGMENTS);
-        LASSERT(rs->rs_repbuf->lm_bufcount == PLAIN_PACK_SEGMENTS);
 
-        return bulk_csum_svc(desc, req->rq_bulk_read,
-                             lustre_msg_buf(req->rq_reqbuf,
-                                            PLAIN_PACK_BULK_OFF, 0),
-                             lustre_msg_buflen(req->rq_reqbuf,
-                                               PLAIN_PACK_BULK_OFF),
-                             lustre_msg_buf(rs->rs_repbuf,
-                                            PLAIN_PACK_BULK_OFF, 0),
-                             lustre_msg_buflen(rs->rs_repbuf,
-                                               PLAIN_PACK_BULK_OFF));
+        bsdr = lustre_msg_buf(req->rq_reqbuf, PLAIN_PACK_BULK_OFF, 0);
+        tokenr = (struct plain_bulk_token *) bsdr->bsd_data;
+        bsdv = lustre_msg_buf(rs->rs_repbuf, PLAIN_PACK_BULK_OFF, 0);
+        tokenv = (struct plain_bulk_token *) bsdv->bsd_data;
+
+        bsdv->bsd_version = 0;
+        bsdv->bsd_type = SPTLRPC_BULK_DEFAULT;
+        bsdv->bsd_svc = bsdr->bsd_svc;
+        bsdv->bsd_flags = 0;
+
+        if (bsdr->bsd_svc == SPTLRPC_BULK_SVC_NULL)
+                return 0;
+
+        rc = plain_generate_bulk_csum(desc, req->rq_flvr.u_bulk.hash.hash_alg,
+                                      tokenv);
+        if (rc) {
+                CERROR("bulk read: server failed to compute "
+                       "checksum: %d\n", rc);
+        } else {
+                if (OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE))
+                        corrupt_bulk_data(desc);
+        }
+
+        return rc;
 }
 
 static struct ptlrpc_ctx_ops plain_ctx_ops = {
@@ -787,8 +992,8 @@ static struct ptlrpc_sec_cops plain_sec_cops = {
         .release_ctx            = plain_release_ctx,
         .flush_ctx_cache        = plain_flush_ctx_cache,
         .alloc_reqbuf           = plain_alloc_reqbuf,
-        .alloc_repbuf           = plain_alloc_repbuf,
         .free_reqbuf            = plain_free_reqbuf,
+        .alloc_repbuf           = plain_alloc_repbuf,
         .free_repbuf            = plain_free_repbuf,
         .enlarge_reqbuf         = plain_enlarge_reqbuf,
 };
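
The bulk handlers above are the server side of the plain flavor's bulk i/o protection: on a bulk write the first function (apparently plain_svc_unwrap_bulk(), the counterpart of plain_svc_wrap_bulk() below it) recomputes the checksum over the pages it received via plain_verify_bulk_csum() and compares it with the token the client packed, raising BSD_FL_ERR in the reply descriptor on mismatch; on a bulk read, plain_svc_wrap_bulk() fills the reply token with plain_generate_bulk_csum() so the client can do the comparison. Both paths return early when the negotiated service is SPTLRPC_BULK_SVC_NULL, and the read path corrupts the data (OBD_FAIL_OSC_CHECKSUM_RECEIVE) only after the checksum has been taken, so it is the client-side verify that is exercised. The sketch below is illustrative only, not Lustre code: every toy_* name is invented for this example, and a trivial FNV-1a hash stands in for whatever algorithm req->rq_flvr.u_bulk.hash.hash_alg selects.

        /* illustrative sketch only -- hypothetical names, not Lustre code */
        #include <stdint.h>
        #include <stdio.h>

        struct toy_token { uint32_t hash; };    /* stand-in for the bulk token */

        /* FNV-1a over a flat buffer stands in for the configured hash alg */
        static uint32_t toy_digest(const unsigned char *buf, size_t len)
        {
                uint32_t h = 2166136261u;
                for (size_t i = 0; i < len; i++) {
                        h ^= buf[i];
                        h *= 16777619u;
                }
                return h;
        }

        /* bulk read path: server generates the token the client will verify */
        static void toy_wrap_bulk(const unsigned char *pages, size_t len,
                                  struct toy_token *tokenv)
        {
                tokenv->hash = toy_digest(pages, len);
        }

        /* bulk write path: server verifies the token packed by the client */
        static int toy_unwrap_bulk(const unsigned char *pages, size_t len,
                                   const struct toy_token *tokenr)
        {
                return toy_digest(pages, len) == tokenr->hash ? 0 : -1;
        }

        int main(void)
        {
                unsigned char pages[64] = "stand-in for the bulk desc pages";
                struct toy_token tok;

                /* bulk read: server computes the token the client checks */
                toy_wrap_bulk(pages, sizeof(pages), &tok);
                printf("clean transfer:     %s\n",
                       toy_unwrap_bulk(pages, sizeof(pages), &tok) ?
                       "MISMATCH" : "match");

                /* flip one byte "in flight": the write-side verify must fail */
                pages[0] ^= 0xff;
                printf("corrupted transfer: %s\n",
                       toy_unwrap_bulk(pages, sizeof(pages), &tok) ?
                       "MISMATCH" : "match");
                return 0;
        }
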
index cb101cf..a8d0785 100644 (file)
@@ -1311,6 +1311,17 @@ ptlrpc_server_handle_req_in(struct ptlrpc_service *svc)
                 goto err_req;
         }
 
+        switch (lustre_msg_get_opc(req->rq_reqmsg)) {
+        case MDS_WRITEPAGE:
+        case OST_WRITE:
+                req->rq_bulk_write = 1;
+                break;
+        case MDS_READPAGE:
+        case OST_READ:
+                req->rq_bulk_read = 1;
+                break;
+        }
+
         CDEBUG(D_NET, "got req "LPD64"\n", req->rq_xid);
 
         req->rq_export = class_conn2export(
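
This hunk, in the request intake path (ptlrpc_server_handle_req_in, i.e. the ptlrpc service code), tags each incoming request as a bulk write (MDS_WRITEPAGE, OST_WRITE) or a bulk read (MDS_READPAGE, OST_READ) before any security policy touches it; the LASSERT(req->rq_bulk_write) and LASSERT(req->rq_bulk_read) checks in the plain bulk handlers above rely on exactly this early classification. A minimal sketch, with hypothetical toy_* names, of how later code can branch on the two flags instead of re-decoding opcodes:

        /* illustrative sketch only -- hypothetical names, not Lustre code */
        #include <stdio.h>

        /* request tags mirroring rq_bulk_read / rq_bulk_write */
        struct toy_req { unsigned int bulk_read:1, bulk_write:1; };

        static const char *toy_bulk_dir(const struct toy_req *req)
        {
                if (req->bulk_write)
                        return "server verifies the client token (unwrap path)";
                if (req->bulk_read)
                        return "server generates the reply token (wrap path)";
                return "no bulk transfer attached";
        }

        int main(void)
        {
                struct toy_req wr = { .bulk_write = 1 }, rd = { .bulk_read = 1 };

                printf("OST_WRITE -> %s\n", toy_bulk_dir(&wr));
                printf("OST_READ  -> %s\n", toy_bulk_dir(&rd));
                return 0;
        }
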
index 018c242..478f872 100644 (file)
@@ -83,7 +83,7 @@ check_and_setup_lustre
 
 rm -rf $DIR/[df][0-9]*
 
-check_runas_id $RUNAS_ID $RUNAS
+check_runas_id $RUNAS_ID $RUNAS_ID $RUNAS
 
 build_test_filter
 
@@ -647,27 +647,39 @@ run_test 7 "exercise enlarge_reqbuf()"
 
 test_8()
 {
-    sleep $TIMEOUT
+    local ATHISTORY=$(do_facet mds "find /sys/ -name at_history")
+    local ATOLDBASE=$(do_facet mds "cat $ATHISTORY")
+    do_facet mds "echo 8 >> $ATHISTORY"
+
     $LCTL dk > /dev/null
     debugsave
     sysctl -w lnet.debug="+other"
 
+    mkdir -p $DIR/d8
+    chmod a+w $DIR/d8
+
+    REQ_DELAY=`lctl get_param -n mdc.${FSNAME}-MDT0000-mdc-*.timeouts |
+               awk '/portal 12/ {print $5}' | tail -1`
+    REQ_DELAY=$((${REQ_DELAY} + ${REQ_DELAY} / 4 + 5))
+
     # sleep sometime in ctx handle
-    do_facet mds lctl set_param fail_val=30
+    do_facet mds lctl set_param fail_val=$REQ_DELAY
 #define OBD_FAIL_SEC_CTX_HDL_PAUSE       0x1204
     do_facet mds lctl set_param fail_loc=0x1204
 
     $RUNAS $LFS flushctx || error "can't flush ctx"
 
-    $RUNAS df $DIR &
-    DFPID=$!
-    echo "waiting df (pid $TOUCHPID) to finish..."
-    sleep 2 # give df a chance to really trigger context init rpc
+    $RUNAS touch $DIR/d8/f &
+    TOUCHPID=$!
+    echo "waiting for touch (pid $TOUCHPID) to finish..."
+    sleep 2 # give it a chance to really trigger context init rpc
     do_facet mds sysctl -w lustre.fail_loc=0
-    wait $DFPID || error "df should have succeeded"
+    wait $TOUCHPID || error "touch should have succeeded"
 
     $LCTL dk | grep "Early reply #" || error "No early reply"
+
     debugrestore
+    do_facet mds "echo $ATOLDBASE >> $ATHISTORY" || true
 }
 run_test 8 "Early reply sent for slow gss context negotiation"
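
To make the test_8 arithmetic concrete (the figure is illustrative, not taken from real test output): if the mdc timeouts file reports 20 seconds for portal 12, the script sets REQ_DELAY = 20 + 20/4 + 5 = 30, so the OBD_FAIL_SEC_CTX_HDL_PAUSE sleep comfortably exceeds the client's current adaptive request timeout and the server has to send an early reply for the touch to succeed; shrinking at_history to 8 while the test runs keeps that adaptive estimate based on recent requests, and the value saved in $ATOLDBASE is written back at the end.
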
 
@@ -676,98 +688,6 @@ run_test 8 "Early reply sent for slow gss context negotiation"
 # so each test should not assume any start flavor.
 #
 
-test_50() {
-    local sample=$TMP/sanity-gss-8
-    local tdir=$MOUNT/dir8
-    local iosize="256K"
-    local hash_algs="adler32 crc32 md5 sha1 sha256 sha384 sha512 wp256 wp384 wp512"
-
-    # create sample file with aligned size for direct i/o
-    dd if=/dev/zero of=$sample bs=$iosize count=1 || error
-    dd conv=notrunc if=/etc/termcap of=$sample bs=$iosize count=1 || error
-
-    rm -rf $tdir
-    mkdir $tdir || error "create dir $tdir"
-
-    restore_to_default_flavor
-
-    for alg in $hash_algs; do
-        echo "Testing $alg..."
-        flavor=krb5i-bulki:$alg/null
-        set_rule $FSNAME any cli2ost $flavor
-        wait_flavor cli2ost $flavor $cnt_cli2ost
-
-        dd if=$sample of=$tdir/$alg oflag=direct,dsync bs=$iosize || error "$alg write"
-        diff $sample $tdir/$alg || error "$alg read"
-    done
-
-    rm -rf $tdir
-    rm -f $sample
-}
-run_test 50 "verify bulk hash algorithms works"
-
-test_51() {
-    local s1=$TMP/sanity-gss-9.1
-    local s2=$TMP/sanity-gss-9.2
-    local s3=$TMP/sanity-gss-9.3
-    local s4=$TMP/sanity-gss-9.4
-    local tdir=$MOUNT/dir9
-    local s1_size=4194304   # n * pagesize (4M)
-    local s2_size=512       # n * blksize
-    local s3_size=111       # n * blksize + m
-    local s4_size=5         # m
-    local cipher_algs="arc4 aes128 aes192 aes256 cast128 cast256 twofish128 twofish256"
-
-    # create sample files for each situation
-    rm -f $s1 $s2 $s2 $s4
-    dd if=/dev/urandom of=$s1 bs=1M count=4 || error
-    dd if=/dev/urandom of=$s2 bs=$s2_size count=1 || error
-    dd if=/dev/urandom of=$s3 bs=$s3_size count=1 || error
-    dd if=/dev/urandom of=$s4 bs=$s4_size count=1 || error
-
-    rm -rf $tdir
-    mkdir $tdir || error "create dir $tdir"
-
-    restore_to_default_flavor
-
-    #
-    # different bulk data alignment will lead to different behavior of
-    # the implementation: (n > 0; 0 < m < encryption_block_size)
-    #  - full page i/o
-    #  - partial page, size = n * encryption_block_size
-    #  - partial page, size = n * encryption_block_size + m
-    #  - partial page, size = m
-    #
-    for alg in $cipher_algs; do
-        echo "Testing $alg..."
-        flavor=krb5p-bulkp:sha1/$alg
-        set_rule $FSNAME any cli2ost $flavor
-        wait_flavor cli2ost $flavor $cnt_cli2ost
-
-        # sync write
-        dd if=$s1 of=$tdir/$alg.1 oflag=dsync bs=1M || error "write $alg.1"
-        dd if=$s2 of=$tdir/$alg.2 oflag=dsync || error "write $alg.2"
-        dd if=$s3 of=$tdir/$alg.3 oflag=dsync || error "write $alg.3"
-        dd if=$s4 of=$tdir/$alg.4 oflag=dsync || error "write $alg.4"
-
-        # remount client
-        umount_client $MOUNT
-        umount_client $MOUNT2
-        mount_client $MOUNT
-        mount_client $MOUNT2
-
-        # read & compare
-        diff $tdir/$alg.1 $s1 || error "read $alg.1"
-        diff $tdir/$alg.2 $s2 || error "read $alg.2"
-        diff $tdir/$alg.3 $s3 || error "read $alg.3"
-        diff $tdir/$alg.4 $s4 || error "read $alg.4"
-    done
-
-    rm -rf $tdir
-    rm -f $sample
-}
-run_test 51 "bulk data alignment test under encryption mode"
-
 test_90() {
     if [ "$SLOW" = "no" ]; then
         total=10
index 0fdd328..89f290d 100644 (file)
@@ -3432,6 +3432,7 @@ setup_f77() {
 }
 
 test_77a() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        [ ! -f $F77_TMP ] && setup_f77
        set_checksums 1
        dd if=$F77_TMP of=$DIR/$tfile bs=1M count=$F77SZ || error "dd error"
@@ -3441,6 +3442,7 @@ test_77a() { # bug 10889
 run_test 77a "normal checksum read/write operation ============="
 
 test_77b() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        [ ! -f $F77_TMP ] && setup_f77
        #define OBD_FAIL_OSC_CHECKSUM_SEND       0x409
        lctl set_param fail_loc=0x80000409
@@ -3454,6 +3456,7 @@ test_77b() { # bug 10889
 run_test 77b "checksum error on client write ===================="
 
 test_77c() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        [ ! -f $DIR/f77b ] && skip "requires 77b - skipping" && return
        set_checksums 1
        for algo in $CKSUM_TYPES; do
@@ -3470,6 +3473,7 @@ test_77c() { # bug 10889
 run_test 77c "checksum error on client read ==================="
 
 test_77d() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        #define OBD_FAIL_OSC_CHECKSUM_SEND       0x409
        lctl set_param fail_loc=0x80000409
        set_checksums 1
@@ -3481,6 +3485,7 @@ test_77d() { # bug 10889
 run_test 77d "checksum error on OST direct write ==============="
 
 test_77e() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        [ ! -f $DIR/f77 ] && skip "requires 77d - skipping" && return
        #define OBD_FAIL_OSC_CHECKSUM_RECEIVE    0x408
        lctl set_param fail_loc=0x80000408
@@ -3494,6 +3499,7 @@ test_77e() { # bug 10889
 run_test 77e "checksum error on OST direct read ================"
 
 test_77f() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        set_checksums 1
        for algo in $CKSUM_TYPES; do
                cancel_lru_locks osc
@@ -3510,6 +3516,7 @@ test_77f() { # bug 10889
 run_test 77f "repeat checksum error on write (expect error) ===="
 
 test_77g() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        remote_ost_nodsh && skip "remote OST with nodsh" && return
 
        [ ! -f $F77_TMP ] && setup_f77
@@ -3526,6 +3533,7 @@ test_77g() { # bug 10889
 run_test 77g "checksum error on OST write ======================"
 
 test_77h() { # bug 10889
+       $GSS && skip "could not run with gss" && return
        remote_ost_nodsh && skip "remote OST with nodsh" && return
 
        [ ! -f $DIR/f77g ] && skip "requires 77g - skipping" && return
@@ -3540,6 +3548,7 @@ test_77h() { # bug 10889
 run_test 77h "checksum error on OST read ======================="
 
 test_77i() { # bug 13805
+       $GSS && skip "could not run with gss" && return
        #define OBD_FAIL_OSC_CONNECT_CKSUM       0x40b
        lctl set_param fail_loc=0x40b
        remount_client $MOUNT
@@ -3554,6 +3563,7 @@ test_77i() { # bug 13805
 run_test 77i "client not supporting OSD_CONNECT_CKSUM =========="
 
 test_77j() { # bug 13805
+       $GSS && skip "could not run with gss" && return
        #define OBD_FAIL_OSC_CKSUM_ADLER_ONLY    0x40c
        lctl set_param fail_loc=0x40c
        remount_client $MOUNT
index b7cb0a3..6d7a725 100644 (file)
@@ -427,6 +427,10 @@ stop_gss_daemons() {
 init_gss() {
     if $GSS; then
         start_gss_daemons
+
+        if [ -n "$LGSS_KEYRING_DEBUG" ]; then
+            echo $LGSS_KEYRING_DEBUG > /proc/fs/lustre/sptlrpc/gss/lgss_keyring/debug_level
+        fi
     fi
 }
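
A usage note on the last hunk: init_gss() now honours an optional LGSS_KEYRING_DEBUG environment variable, writing it into the lgss_keyring debug_level procfs entry once the GSS daemons are started. For example, exporting LGSS_KEYRING_DEBUG=4 before running the GSS tests would raise the keyring upcall's verbosity; the value 4 is illustrative only, since the accepted levels depend on the lgss_keyring implementation.
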