如何为Kafka设置OAuth2安全机制?

栏目: 后端 · 发布时间: 7年前

内容简介:随着Kafka版本2.0.0 的先决条件

随着Kafka版本2.0.0 的 KIP-255 (Kafka改进提案)的提交,现在我们可以使用SASL(简单认证和安全层)OAUTHBEARER来验证客户端到代理或中间人身份验证。

先决条件

  • Docker
  • Docker-compose
  • Git

实现 Java 类支持OAuth机制

有了KIP-255的文档,我们需要实现2个类来使用外部OAuth2服务器来验证我们的客户端或代理。

第一个类实现AuthenticateCallbackHandler,并将为需要进行身份验证的客户端或代理服务。

<b>public</b> <b>class</b> OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.<b>class</b>);
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerTokenCallback)
                <b>try</b> {
                    handleCallback((OAuthBearerTokenCallback) callback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerTokenCallback callback){
        <b>if</b> (callback.token() != <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback had a token already"</font><font>);

        log.info(</font><font>"Try to acquire token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.login(<b>null</b>);
        log.info(</font><font>"Retrieved token.."</font><font>);
        <b>if</b>(token == <b>null</b>){
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Null token returned from server"</font><font>);
        }
        callback.token(token);
    }

}
</font>

第二个类实现相同的类,能让Kafka使用OAuth令牌自查来验证发送令牌。

<b>public</b> <b>class</b> OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
    <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.<b>class</b>);
    <b>private</b> List<AppConfigurationEntry> jaasConfigEntries;
    <b>private</b> Map<String, String> moduleOptions = <b>null</b>;
    <b>private</b> <b>boolean</b> configured = false;
    <b>private</b> Time time = Time.SYSTEM;

    @Override
    <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
        <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
            <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
        <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(
                    String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
                            jaasConfigEntries.size()));
        <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
        configured = <b>true</b>;
    }

    <b>public</b> <b>boolean</b> isConfigured(){
        <b>return</b> <b>this</b>.configured;
    }

    @Override
    <b>public</b> <b>void</b> close() {
    }

    @Override
    <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
        <b>if</b> (!isConfigured())
            <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
        <b>for</b> (Callback callback : callbacks) {
            <b>if</b> (callback instanceof OAuthBearerValidatorCallback)
                <b>try</b> {
                    OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
                    handleCallback(validationCallback);
                } <b>catch</b> (KafkaException e) {
                    <b>throw</b> <b>new</b> IOException(e.getMessage(), e);
                }
            <b>else</b>
                <b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
        }
    }

    <b>private</b> <b>void</b> handleCallback(OAuthBearerValidatorCallback callback){
        String accessToken = callback.tokenValue();
        <b>if</b> (accessToken == <b>null</b>)
            <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback missing required token value"</font><font>);

        log.info(</font><font>"Trying to introspect Token!"</font><font>);
        OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
        log.info(</font><font>"Trying to introspected"</font><font>);

        </font><font><i>// Implement Check Expire Token..</i></font><font>
        <b>long</b> now = time.milliseconds();
        <b>if</b>(now > token.expirationTime()){
            OAuthBearerValidationResult.newFailure(</font><font>"Expired Token, needs refresh!"</font><font>);
        }

        log.info(</font><font>"Validated! token.."</font><font>);
        callback.token(token);
    }

}
</font>

server.properties文件

在这个文件中,我们将设置将用于在OAuth2服务器中进行登录和验证的类。完整的server.properties文件位于GitHub存储库中。

# ############################的OAuth类################### ##########
listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler
listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler

启动OAuth2服务器和我们的Kafka

此项目需要OAuth2服务器来提供客户端或代理的令牌和验证。一个简单而开源的替代方案是使用ORY Hydra,这是一个用 Go 编写的认证OAuth2服务器。对于此示例,我们使用docker-compose文件来设置服务器并创建3个帐户:

  • consumer-kafka:用于消费者容器
  • producer-kafka:用于生产者容器
  • broker-kafka:用于interbroker身份验证

为了启动我们的OAuth2服务器和我们的Kafka代理,我们需要克隆 kafka-playground GitHub存储库并在根文件夹中运行docker-compose文件。在运行docker-compose之前,我们需要设置一个名为HOST_IP的环境变量。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

配置我们的客户端(生产者/消费者/流)

在Git的存储库中,我们有一个名为kafka-using-java的文件夹,它包含一个生成器示例,使用我们的.jar文件。 要运行此示例,您需要设置HOST_IP环境变量,其中包含正在运行的计算机的IP地址。

HOST_IP=XXX.XXX.XXX.XXX docker-compose up

源码: Github


以上所述就是小编给大家介绍的《如何为Kafka设置OAuth2安全机制?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!

查看所有标签

猜你喜欢:

本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们

Numerical Methods and Methods of Approximation in Science and En

Numerical Methods and Methods of Approximation in Science and En

Karan Surana / CRC Press / 2018-10-31

ABOUT THIS BOOK Numerical Methods and Methods of Approximation in Science and Engineering prepares students and other readers for advanced studies involving applied numerical and computational anal......一起来看看 《Numerical Methods and Methods of Approximation in Science and En》 这本书的介绍吧!

CSS 压缩/解压工具
CSS 压缩/解压工具

在线压缩/解压 CSS 代码

图片转BASE64编码
图片转BASE64编码

在线图片转Base64编码工具

SHA 加密
SHA 加密

SHA 加密工具