内容简介:随着Kafka版本2.0.0 的先决条件
随着Kafka版本2.0.0 的 KIP-255 (Kafka改进提案)的提交,现在我们可以使用SASL(简单认证和安全层)OAUTHBEARER来验证客户端到代理或中间人身份验证。
先决条件
- Docker
- Docker-compose
- Git
实现 Java 类支持OAuth机制
有了KIP-255的文档,我们需要实现2个类来使用外部OAuth2服务器来验证我们的客户端或代理。
第一个类实现AuthenticateCallbackHandler,并将为需要进行身份验证的客户端或代理服务。
<b>public</b> <b>class</b> OauthAuthenticateLoginCallbackHandler implements AuthenticateCallbackHandler {
<b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateLoginCallbackHandler.<b>class</b>);
<b>private</b> Map<String, String> moduleOptions = <b>null</b>;
<b>private</b> <b>boolean</b> configured = false;
@Override
<b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
<b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
<b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
<b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
<b>throw</b> <b>new</b> IllegalArgumentException(
String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
jaasConfigEntries.size()));
<b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
configured = <b>true</b>;
}
<b>public</b> <b>boolean</b> isConfigured(){
<b>return</b> <b>this</b>.configured;
}
@Override
<b>public</b> <b>void</b> close() {
}
@Override
<b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
<b>if</b> (!isConfigured())
<b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
<b>for</b> (Callback callback : callbacks) {
<b>if</b> (callback instanceof OAuthBearerTokenCallback)
<b>try</b> {
handleCallback((OAuthBearerTokenCallback) callback);
} <b>catch</b> (KafkaException e) {
<b>throw</b> <b>new</b> IOException(e.getMessage(), e);
}
<b>else</b>
<b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
}
}
<b>private</b> <b>void</b> handleCallback(OAuthBearerTokenCallback callback){
<b>if</b> (callback.token() != <b>null</b>)
<b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback had a token already"</font><font>);
log.info(</font><font>"Try to acquire token!"</font><font>);
OauthBearerTokenJwt token = OauthHttpCalls.login(<b>null</b>);
log.info(</font><font>"Retrieved token.."</font><font>);
<b>if</b>(token == <b>null</b>){
<b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Null token returned from server"</font><font>);
}
callback.token(token);
}
}
</font>
第二个类实现相同的类,能让Kafka使用OAuth令牌自查来验证发送令牌。
<b>public</b> <b>class</b> OauthAuthenticateValidatorCallbackHandler implements AuthenticateCallbackHandler {
<b>private</b> <b>final</b> Logger log = LoggerFactory.getLogger(OauthAuthenticateValidatorCallbackHandler.<b>class</b>);
<b>private</b> List<AppConfigurationEntry> jaasConfigEntries;
<b>private</b> Map<String, String> moduleOptions = <b>null</b>;
<b>private</b> <b>boolean</b> configured = false;
<b>private</b> Time time = Time.SYSTEM;
@Override
<b>public</b> <b>void</b> configure(Map<String, ?> map, String saslMechanism, List<AppConfigurationEntry> jaasConfigEntries) {
<b>if</b> (!OAuthBearerLoginModule.OAUTHBEARER_MECHANISM.equals(saslMechanism))
<b>throw</b> <b>new</b> IllegalArgumentException(String.format(<font>"Unexpected SASL mechanism: %s"</font><font>, saslMechanism));
<b>if</b> (Objects.requireNonNull(jaasConfigEntries).size() != 1 || jaasConfigEntries.get(0) == <b>null</b>)
<b>throw</b> <b>new</b> IllegalArgumentException(
String.format(</font><font>"Must supply exactly 1 non-null JAAS mechanism configuration (size was %d)"</font><font>,
jaasConfigEntries.size()));
<b>this</b>.moduleOptions = Collections.unmodifiableMap((Map<String, String>) jaasConfigEntries.get(0).getOptions());
configured = <b>true</b>;
}
<b>public</b> <b>boolean</b> isConfigured(){
<b>return</b> <b>this</b>.configured;
}
@Override
<b>public</b> <b>void</b> close() {
}
@Override
<b>public</b> <b>void</b> handle(Callback[] callbacks) throws IOException, UnsupportedCallbackException {
<b>if</b> (!isConfigured())
<b>throw</b> <b>new</b> IllegalStateException(</font><font>"Callback handler not configured"</font><font>);
<b>for</b> (Callback callback : callbacks) {
<b>if</b> (callback instanceof OAuthBearerValidatorCallback)
<b>try</b> {
OAuthBearerValidatorCallback validationCallback = (OAuthBearerValidatorCallback) callback;
handleCallback(validationCallback);
} <b>catch</b> (KafkaException e) {
<b>throw</b> <b>new</b> IOException(e.getMessage(), e);
}
<b>else</b>
<b>throw</b> <b>new</b> UnsupportedCallbackException(callback);
}
}
<b>private</b> <b>void</b> handleCallback(OAuthBearerValidatorCallback callback){
String accessToken = callback.tokenValue();
<b>if</b> (accessToken == <b>null</b>)
<b>throw</b> <b>new</b> IllegalArgumentException(</font><font>"Callback missing required token value"</font><font>);
log.info(</font><font>"Trying to introspect Token!"</font><font>);
OauthBearerTokenJwt token = OauthHttpCalls.introspectBearer(accessToken);
log.info(</font><font>"Trying to introspected"</font><font>);
</font><font><i>// Implement Check Expire Token..</i></font><font>
<b>long</b> now = time.milliseconds();
<b>if</b>(now > token.expirationTime()){
OAuthBearerValidationResult.newFailure(</font><font>"Expired Token, needs refresh!"</font><font>);
}
log.info(</font><font>"Validated! token.."</font><font>);
callback.token(token);
}
}
</font>
server.properties文件
在这个文件中,我们将设置将用于在OAuth2服务器中进行登录和验证的类。完整的server.properties文件位于GitHub存储库中。
# ############################的OAuth类################### ########## listener.name.sasl_plaintext.oauthbearer.sasl.login.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateLoginCallbackHandler listener.name.sasl_plaintext.oauthbearer.sasl.server.callback.handler.<b>class</b> = br.com.jairsjunior.security.oauthbearer.OauthAuthenticateValidatorCallbackHandler
启动OAuth2服务器和我们的Kafka
此项目需要OAuth2服务器来提供客户端或代理的令牌和验证。一个简单而开源的替代方案是使用ORY Hydra,这是一个用 Go 编写的认证OAuth2服务器。对于此示例,我们使用docker-compose文件来设置服务器并创建3个帐户:
- consumer-kafka:用于消费者容器
- producer-kafka:用于生产者容器
- broker-kafka:用于interbroker身份验证
为了启动我们的OAuth2服务器和我们的Kafka代理,我们需要克隆 kafka-playground GitHub存储库并在根文件夹中运行docker-compose文件。在运行docker-compose之前,我们需要设置一个名为HOST_IP的环境变量。
HOST_IP=XXX.XXX.XXX.XXX docker-compose up
配置我们的客户端(生产者/消费者/流)
在Git的存储库中,我们有一个名为kafka-using-java的文件夹,它包含一个生成器示例,使用我们的.jar文件。 要运行此示例,您需要设置HOST_IP环境变量,其中包含正在运行的计算机的IP地址。
HOST_IP=XXX.XXX.XXX.XXX docker-compose up
源码: Github
以上所述就是小编给大家介绍的《如何为Kafka设置OAuth2安全机制?》,希望对大家有所帮助,如果大家有任何疑问请给我留言,小编会及时回复大家的。在此也非常感谢大家对 码农网 的支持!
猜你喜欢:- spring事务机制及一种简单的主从设置
- [CentOS7]redis设置开机启动,设置密码
- hadoop地址配置、内存配置、守护进程设置、环境设置
- OpenMediaVault 设置
- scrapy代理的设置
- jvm的参数设置
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
Numerical Methods and Methods of Approximation in Science and En
Karan Surana / CRC Press / 2018-10-31
ABOUT THIS BOOK Numerical Methods and Methods of Approximation in Science and Engineering prepares students and other readers for advanced studies involving applied numerical and computational anal......一起来看看 《Numerical Methods and Methods of Approximation in Science and En》 这本书的介绍吧!