如何为Kafka设置OAuth2安全机制?

栏目: 后端 · 发布时间: 7年前

内容简介:随着Kafka版本2.0.0 的先决条件

随着Kafka版本2.0.0 的 KIP-255 (Kafka改进提案)的提交,现在我们可以使用SASL(简单认证和安全层)OAUTHBEARER来验证客户端到代理或中间人身份验证。

先决条件

  • Docker
  • Docker-compose
  • Git

实现 Java 类支持OAuth机制

有了KIP-255的文档,我们需要实现2个类来使用外部OAuth2服务器来验证我们的客户端或代理。

第一个类实现AuthenticateCallbackHandler,并将为需要进行身份验证的客户端或代理服务。

<b>public</b> <b>class</b> OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.<b>class</b>);
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerTokenCallback)
                <b>try</b> {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerTokenCallback callback){
        <b>if</b> (callback.token() != <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback had a token already"</font><font>);

        log.info(</font><font>"Try to acquire token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.login(<b>null</b>);
        log.info(</font><font>"Retrieved token.."</font><font>);
        <b>if</b>(token == <b>null</b>){
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Null token returned from server"</font><font>);
        }
        callback.token(token);
    }

}
</font>

第二个类实现相同的类,能让Kafka使用OAuth令牌自查来验证发送令牌。

<b>public</b> <b>class</b> OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.<b>class</b>);
    <b>private</b> List<AppConfigurationEntry> jaasConfigEntries;
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;
    <b>private</b> Time time = Time.SYSTEM;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerValidatorCallback)
                <b>try</b> {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerValidatorCallback callback){
        String accessToken = callback.tokenValue();
        <b>if</b> (accessToken == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback missing required token value"</font><font>);

        log.info(</font><font>"Trying to introspect Token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info(</font><font>"Trying to introspected"</font><font>);

        </font><font><i>// Implement Check Expire Token..</i></font><font>
        <b>long</b> now = time.milliseconds();
        <b>if</b>(now > token.expirationTime()){
            OAuthBearerValidationResult.newFailure(</font><font>"Expired Token, needs refresh!"</font><font>);
        }

        log.info(</font><font>"Validated! token.."</font><font>);
        callback.token(token);
    }

}
</font>

server.properties文件

在这个文件中,我们将设置将用于在OAuth2服务器中进行登录和验证的类。完整的server.properties文件位于GitHub存储库中。

# ############################的OAuth类################### ##########
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler

启动OAuth2服务器和我们的Kafka

此项目需要OAuth2服务器来提供客户端或代理的令牌和验证。一个简单而开源的替代方案是使用ORY Hydra,这是一个用 Go 编写的认证OAuth2服务器。对于此示例,我们使用docker-compose文件来设置服务器并创建3个帐户:

  • consumer-kafka:用于消费者容器
  • producer-kafka:用于生产者容器
  • broker-kafka:用于interbroker身份验证

为了启动我们的OAuth2服务器和我们的Kafka代理,我们需要克隆 kafka-playground GitHub存储库并在根文件夹中运行docker-compose文件。在运行docker-compose之前,我们需要设置一个名为HOST_IP的环境变量。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

配置我们的客户端(生产者/消费者/流)

在Git的存储库中,我们有一个名为kafka-using-java的文件夹,它包含一个生成器示例,使用我们的.jar文件。 要运行此示例,您需要设置HOST_IP环境变量,其中包含正在运行的计算机的IP地址。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

源码: Github


以上所述就是小编给大家介绍的《如何为Kafka设置OAuth2安全机制?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Servlet&JSP学习笔记

Servlet&JSP学习笔记

林信良 / 清华大学出版社 / 2010-4 / 48.00元

《Servlet&JSP学习笔记》以“在线书签”项目贯穿全书,随着每一章的讲述都在适当的时候将 Servlet & JSP技术应用于“在线书签”程序之中,并作适当修改,以了解完整的应用程序构建方法。《Servlet&JSP学习笔记》内容包括简单的Web应用程序,开发简单的Servlet & JSP合理管理,JSP的使用,整合数据库等相关内容,《Servlet&JSP学习笔记》适合Servlet ......一起来看看 《Servlet&JSP学习笔记》 这本书的介绍吧!

HTML 压缩/解压工具
HTML 压缩/解压工具

在线压缩/解压 HTML 代码

XML、JSON 在线转换
XML、JSON 在线转换

在线XML、JSON转换工具

正则表达式在线测试
正则表达式在线测试

正则表达式在线测试