内容简介:学习资料:消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
其他更多 java 基础文章:
java基础学习(目录)学习资料: kafka数据可靠性深度解读
Kafka概述
- Kafka是一个分布式消息队列。Kafka对消息保存时根据Topic进行归类,发送消息者称为Producer,消息接受者称为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。
- 无论是kafka集群,还是producer和consumer都依赖于zookeeper集群保存一些meta信息,来保证系统可用性。
- 在流式计算中,Kafka一般用来缓存数据,Spark通过消费Kafka的数据进行计算。
Kafka架构
- Producer :消息生产者,就是向kafka broker发消息的客户端。
- Consumer :消息消费者,向kafka broker取消息的客户端
- Topic :可以理解为一个队列。
- Consumer Group (CG):这是kafka用来实现一个topic消息的广播(发给所有的consumer)和单播(发给任意一个consumer)的手段。一个topic可以有多个CG。topic的消息会复制(不是真的复制,是概念上的)到所有的CG,但每个partion只会把消息发给该CG中的一个consumer。如果需要实现广播,只要每个consumer有一个独立的CG就可以了。要实现单播只要所有的consumer在同一个CG。用CG还可以将consumer进行自由的分组而不需要多次发送消息到不同的topic。
- Broker :一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。
- Partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体(多个partition间)的顺序。
分区
消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构如下图所示:
我们可以看到,每个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每一个消息都被赋予了一个唯一的offset值。
- 分区的原因
- 方便在集群中扩展,每个Partition可以通过调整以适应它所在的机器,而一个topic又可以有多个Partition组成,因此整个集群就可以适应任意大小的数据了;
- 可以提高并发,因为可以以Partition为单位读写了。
- 分区的原则
- 指定了patition,则直接使用;
- 未指定patition但指定key,通过对key的value进行hash出一个patition
- patition和key都未指定,使用轮询选出一个patition。
副本(Replication)
同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的情况下,一旦broker 宕机,其上所有 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication之后,同一个partition可能会有多个replication,而这时需要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication作为follower从leader 中复制数据。
写入流程
- producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader
- producer将消息发送给该leader
- leader将消息写入本地log
- followers从leader pull消息,写入本地log后向leader发送ACK
- leader收到所有ISR中的replication的ACK后,增加HW(high watermark,最后commit 的offset)并向producer发送ACK
ACK,HW,ISR等可以阅读 kafka数据可靠性深度解读 学习
简单来说:
- HW是HighWatermark的缩写,是指consumer能够看到的此partition的位置
- ISR (In-Sync Replicas),这个是指副本同步队列。所有的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟(包括延迟时间replica.lag.time.max.ms和延迟条数replica.lag.max.messages两个维度),任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。
- ack机制:在kafka发送数据的时候,每次发送消息都会有一个确认反馈机制,确保消息正常的能够被收到。
Kafka消费过程分析
kafka提供了两套consumer API:高级Consumer API和低级API。
高级API
- 高级API优点
- 高级API 写起来简单
- 不需要去自行去管理offset,系统通过zookeeper自行管理
- 不需要管理分区,副本等情况,系统自动管理
- 消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的的offset)
- 可以使用group来区分对同一个topic的不同程序访问分离开来(不同的group记录不同的offset,这样不同程序读取同一个topic才不会因为offset互相影响)
- 高级API缺点
- 不能自行控制offset(对于某些特殊需求来说)
- 不能细化控制如分区、副本、zk等
低级API
- 低级 API 优点
- 能够开发者自己控制offset,想从哪里读取就从哪里读取。
- 自行控制连接分区,对分区自定义进行负载均衡
- 对zookeeper的依赖性降低(如:offset不一定非要靠zk存储,自行存储offset即可,比如存在文件或者内存中)
- 低级API缺点
- 太过复杂,需要自行控制offset,连接哪个分区,找到分区leader 等。
消费者组
消费者是以consumer group消费者组的方式工作,由一个或者多个消费者组成一个组,共同消费一个topic。每个分区在同一时间只能由group中的一个消费者读取,但是多个group可以同时消费这个partition。
在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也可以叫做某个消费者是某个分区的拥有者。 在这种情况下,消费者可以通过水平扩展的方式同时读取大量的消息。另外,如果一个消费者失败了,那么其他的group成员会自动负载均衡读取之前失败的消费者读取的分区。
JAVA中使用Kafka
生产者
import org.apache.kafka.clients.producer.*; import java.util.ArrayList; import java.util.List; import java.util.Properties; public class KafkaProducerDemo { public static void main(String[] args){ //test(); test2(); } public static void test(){ Properties props= new Properties(); props.put("bootstrap.servers", "172.26.40.181:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(props); for(int i = 0; i < 10; i++){ producer.send(new ProducerRecord("first",Integer.toString(i), Integer.toString(i))); } producer.close(); } /** * 带回调函数 */ public static void test2(){ Properties props = new Properties(); // Kafka服务端的主机名和端口号 props.put("bootstrap.servers", "172.26.40.181:9092"); // 等待所有副本节点的应答 props.put("acks", "all"); // 消息发送最大尝试次数 props.put("retries", 0); // 一批消息处理大小 props.put("batch.size", 16384); // 增加服务端请求延时 props.put("linger.ms", 1); // 发送缓存区内存大小 props.put("buffer.memory", 33554432); // key序列化 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value序列化 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //拦截器 List<String> interceptor = new ArrayList<>(); interceptor.add("com.hiway.practice.kafka.TimeInterceptor"); interceptor.add("com.hiway.practice.kafka.CounterInterceptor"); props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,interceptor); KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props); for (int i = 0; i < 50; i++) { kafkaProducer.send(new ProducerRecord<String, String>("first", "hello" + i), new Callback() { @Override public void onCompletion(RecordMetadata metadata, Exception exception) { if (metadata != null) { System.err.println(metadata.partition() + "---" + metadata.offset()); } } }); } kafkaProducer.close(); } } 复制代码
消费者
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import java.util.Arrays; import java.util.Properties; public class CustomNewConsumer { public static void main(String[] args) { Properties props = new Properties(); // 定义kakfa 服务的地址,不需要将所有broker指定上 props.put("bootstrap.servers", "172.26.40.181:9092"); // 制定consumer group props.put("group.id", "test"); // 是否自动确认offset props.put("enable.auto.commit", "true"); // 自动确认offset的时间间隔 props.put("auto.commit.interval.ms", "1000"); // key的序列化类 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的序列化类 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // 定义consumer KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // 消费者订阅的topic, 可同时订阅多个 consumer.subscribe(Arrays.asList("first")); while (true) { // 读取数据,读取超时时间为100ms ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } 复制代码
拦截器
public class TimeInterceptor implements ProducerInterceptor<String,String> { @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return new ProducerRecord<>(record.topic(),record.partition(),record.timestamp(),record.key(),System.currentTimeMillis() + "," + record.value().toString()); } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { } @Override public void close() { } @Override public void configure(Map<String, ?> configs) { } } public class CounterInterceptor implements ProducerInterceptor<String, String> { private int errorCounter = 0; private int successCounter = 0; @Override public void configure(Map<String, ?> configs) { } @Override public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) { return record; } @Override public void onAcknowledgement(RecordMetadata metadata, Exception exception) { // 统计成功和失败的次数 if (exception == null) { successCounter++; } else { errorCounter++; } } @Override public void close() { // 保存结果 System.out.println("Successful sent: " + successCounter); System.out.println("Failed sent: " + errorCounter); } } 复制代码
错误总结
1. Uncaught error in kafka producer I/O thread错误
这个问题主要是服务器上的kafka版本和IDEA中的kafka版本不一致导致的。
2.producer发送数据到集群上无反应
将kafka/config/server.properties文件中advertised.listeners改为如下属性。172.26.40.181是我虚拟机的IP。改完后重启,OK了。Java端的代码终于能通信了 advertised.listeners=PLAINTEXT://172.26.40.181:9092 advertised.listeners上的注释是这样的:
#Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). 复制代码
意思就是说:hostname、port都会广播给producer、consumer。如果你没有配置了这个属性的话,则使用listeners的值,如果listeners的值也没有配置的话,则使用 java.net.InetAddress.getCanonicalHostName()返回值(这里也就是返回localhost了)。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持 码农网
猜你喜欢:- 一文读懂监督学习、无监督学习、半监督学习、强化学习这四种深度学习方式
- 学习:人工智能-机器学习-深度学习概念的区别
- 统计学习,机器学习与深度学习概念的关联与区别
- 混合学习环境下基于学习行为数据的学习预警系统设计与实现
- 学习如何学习
- 深度学习的学习历程
本站部分资源来源于网络,本站转载出于传递更多信息之目的,版权归原作者或者来源机构所有,如转载稿涉及版权问题,请联系我们。
The Hard Thing About Hard Things
Ben Horowitz / HarperBusiness / 2014-3-4 / USD 29.99
Ben Horowitz, cofounder of Andreessen Horowitz and one of Silicon Valley's most respected and experienced entrepreneurs, offers essential advice on building and running a startup—practical wisdom for ......一起来看看 《The Hard Thing About Hard Things》 这本书的介绍吧!