如何为Kafka设置OAuth2安全机制?

栏目: 后端 · 发布时间: 6年前

内容简介:随着Kafka版本2.0.0 的先决条件

随着Kafka版本2.0.0 的 KIP-255 (Kafka改进提案)的提交,现在我们可以使用SASL(简单认证和安全层)OAUTHBEARER来验证客户端到代理或中间人身份验证。

先决条件

  • Docker
  • Docker-compose
  • Git

实现 Java 类支持OAuth机制

有了KIP-255的文档,我们需要实现2个类来使用外部OAuth2服务器来验证我们的客户端或代理。

第一个类实现AuthenticateCallbackHandler,并将为需要进行身份验证的客户端或代理服务。

<b>public</b> <b>class</b> OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.<b>class</b>);
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerTokenCallback)
                <b>try</b> {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerTokenCallback callback){
        <b>if</b> (callback.token() != <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback had a token already"</font><font>);

        log.info(</font><font>"Try to acquire token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.login(<b>null</b>);
        log.info(</font><font>"Retrieved token.."</font><font>);
        <b>if</b>(token == <b>null</b>){
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Null token returned from server"</font><font>);
        }
        callback.token(token);
    }

}
</font>

第二个类实现相同的类,能让Kafka使用OAuth令牌自查来验证发送令牌。

<b>public</b> <b>class</b> OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.<b>class</b>);
    <b>private</b> List<AppConfigurationEntry> jaasConfigEntries;
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;
    <b>private</b> Time time = Time.SYSTEM;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerValidatorCallback)
                <b>try</b> {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerValidatorCallback callback){
        String accessToken = callback.tokenValue();
        <b>if</b> (accessToken == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback missing required token value"</font><font>);

        log.info(</font><font>"Trying to introspect Token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info(</font><font>"Trying to introspected"</font><font>);

        </font><font><i>// Implement Check Expire Token..</i></font><font>
        <b>long</b> now = time.milliseconds();
        <b>if</b>(now > token.expirationTime()){
            OAuthBearerValidationResult.newFailure(</font><font>"Expired Token, needs refresh!"</font><font>);
        }

        log.info(</font><font>"Validated! token.."</font><font>);
        callback.token(token);
    }

}
</font>

server.properties文件

在这个文件中,我们将设置将用于在OAuth2服务器中进行登录和验证的类。完整的server.properties文件位于GitHub存储库中。

# ############################的OAuth类################### ##########
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler

启动OAuth2服务器和我们的Kafka

此项目需要OAuth2服务器来提供客户端或代理的令牌和验证。一个简单而开源的替代方案是使用ORY Hydra,这是一个用 Go 编写的认证OAuth2服务器。对于此示例,我们使用docker-compose文件来设置服务器并创建3个帐户:

  • consumer-kafka:用于消费者容器
  • producer-kafka:用于生产者容器
  • broker-kafka:用于interbroker身份验证

为了启动我们的OAuth2服务器和我们的Kafka代理,我们需要克隆 kafka-playground GitHub存储库并在根文件夹中运行docker-compose文件。在运行docker-compose之前,我们需要设置一个名为HOST_IP的环境变量。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

配置我们的客户端(生产者/消费者/流)

在Git的存储库中,我们有一个名为kafka-using-java的文件夹,它包含一个生成器示例,使用我们的.jar文件。 要运行此示例,您需要设置HOST_IP环境变量,其中包含正在运行的计算机的IP地址。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

源码: Github


以上所述就是小编给大家介绍的《如何为Kafka设置OAuth2安全机制?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

重构(影印版)

重构(影印版)

Martin Fowler / 中国电力出版社 / 2003-7-1 / 49.00元

随着对象技术应用越来越普及,软件开发社区出现了一个新的问题。缺乏经验的开发者编写出了大批设计较差的程序,导致这些应用程序非常低效,且难于维护和扩展。本书除了讨论重构的各种技巧之外,还提供了超过70个可行重构的详细编目,对如何应用它们给出了有用的提示;并以step by step的形式给出了应用每一种重构的指南;而且用实例展示了重构的工作原理。这些示例都是用Java语言写成的,但其中的思想却可以运用......一起来看看 《重构(影印版)》 这本书的介绍吧!

URL 编码/解码
URL 编码/解码

URL 编码/解码

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试

RGB HSV 转换
RGB HSV 转换

RGB HSV 互转工具