内容简介:随着Kafka版本2.0.0 的先决条件
随着Kafka版本2.0.0 的 KIP-255 (Kafka改进提案)的提交,现在我们可以使用SASL(简单认证和安全层)OAUTHBEARER来验证客户端到代理或中间人身份验证。
先决条件
- Docker
- Docker-compose
- Git
实现 Java 类支持OAuth机制
有了KIP-255的文档,我们需要实现2个类来使用外部OAuth2服务器来验证我们的客户端或代理。
第一个类实现AuthenticateCallbackHandler,并将为需要进行身份验证的客户端或代理服务。
<b>public</b> <b>class</b> OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler { <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.<b>class</b>); <b>private</b> Map<String, String> moduleOptions = <b>null</b>; <b>private</b> <b>boolean</b> configured = false; @Override <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism)); <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>) <b>throw</b> <b>new</b> IllegalArgumentException( String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>, jaasConfigEntries.size())); <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions()); configured = <b>true</b>; } <b>public</b> <b>boolean</b> isConfigured(){ <b>return</b> <b>this</b>.configured; } @Override <b>public</b> <b>void</b> close() { } @Override <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { <b>if</b> (!isConfigured()) <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>); <b>for</b> (Callback callback : callbacks) { <b>if</b> (callback instanceof OAuthBearerTokenCallback) <b>try</b> { handleCallback((OAuthBearerTokenCallback) callback); } <b>catch</b> (KafkaException e) { <b>throw</b> <b>new</b> IOException(e.getMessage(), e); } <b>else</b> <b>throw</b> <b>new</b> UnsupportedCallbackException(callback); } } <b>private</b> <b>void</b> handleCallback(OAuthBearerTokenCallback callback){ <b>if</b> (callback.token() != <b>null</b>) <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback had a token already"</font><font>); log.info(</font><font>"Try to acquire token!"</font><font>); OauthBearerTokenJwt token = OauthHttpCalls.login(<b>null</b>); log.info(</font><font>"Retrieved token.."</font><font>); <b>if</b>(token == <b>null</b>){ <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Null token returned from server"</font><font>); } callback.token(token); } } </font>
第二个类实现相同的类,能让Kafka使用OAuth令牌自查来验证发送令牌。
<b>public</b> <b>class</b> OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler { <b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.<b>class</b>); <b>private</b> List<AppConfigurationEntry> jaasConfigEntries; <b>private</b> Map<String, String> moduleOptions = <b>null</b>; <b>private</b> <b>boolean</b> configured = false; <b>private</b> Time time = Time.SYSTEM; @Override <b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) { <b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism)) <b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism)); <b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>) <b>throw</b> <b>new</b> IllegalArgumentException( String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>, jaasConfigEntries.size())); <b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions()); configured = <b>true</b>; } <b>public</b> <b>boolean</b> isConfigured(){ <b>return</b> <b>this</b>.configured; } @Override <b>public</b> <b>void</b> close() { } @Override <b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException { <b>if</b> (!isConfigured()) <b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>); <b>for</b> (Callback callback : callbacks) { <b>if</b> (callback instanceof OAuthBearerValidatorCallback) <b>try</b> { OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback; handleCallback(validationCallback); } <b>catch</b> (KafkaException e) { <b>throw</b> <b>new</b> IOException(e.getMessage(), e); } <b>else</b> <b>throw</b> <b>new</b> UnsupportedCallbackException(callback); } } <b>private</b> <b>void</b> handleCallback(OAuthBearerValidatorCallback callback){ String accessToken = callback.tokenValue(); <b>if</b> (accessToken == <b>null</b>) <b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback missing required token value"</font><font>); log.info(</font><font>"Trying to introspect Token!"</font><font>); OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken); log.info(</font><font>"Trying to introspected"</font><font>); </font><font><i>// Implement Check Expire Token..</i></font><font> <b>long</b> now = time.milliseconds(); <b>if</b>(now > token.expirationTime()){ OAuthBearerValidationResult.newFailure(</font><font>"Expired Token, needs refresh!"</font><font>); } log.info(</font><font>"Validated! token.."</font><font>); callback.token(token); } } </font>
server.properties文件
在这个文件中,我们将设置将用于在OAuth2服务器中进行登录和验证的类。完整的server.properties文件位于GitHub存储库中。
# ############################的OAuth类################### ########## listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
启动OAuth2服务器和我们的Kafka
此项目需要OAuth2服务器来提供客户端或代理的令牌和验证。一个简单而开源的替代方案是使用ORY Hydra,这是一个用 Go 编写的认证OAuth2服务器。对于此示例,我们使用docker-compose文件来设置服务器并创建3个帐户:
- consumer-kafka:用于消费者容器
- producer-kafka:用于生产者容器
- broker-kafka:用于interbroker身份验证
为了启动我们的OAuth2服务器和我们的Kafka代理,我们需要克隆 kafka-playground GitHub存储库并在根文件夹中运行docker-compose文件。在运行docker-compose之前,我们需要设置一个名为HOST_IP的环境变量。
HOST_IP=XXX.XXX.XXX.XXX docker-compose up
配置我们的客户端(生产者/消费者/流)
在Git的存储库中,我们有一个名为kafka-using-java的文件夹,它包含一个生成器示例,使用我们的.jar文件。 要运行此示例,您需要设置HOST_IP环境变量,其中包含正在运行的计算机的IP地址。
HOST_IP=XXX.XXX.XXX.XXX docker-compose up
源码: Github
以上所述就是小编给大家介绍的《如何为Kafka设置OAuth2安全机制?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- spring事务机制及一种简单的主从设置
- [CentOS7]redis设置开机启动,设置密码
- hadoop地址配置、内存配置、守护进程设置、环境设置
- OpenMediaVault 设置
- scrapy代理的设置
- jvm的参数设置
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。