本文共 15788 字,大约阅读时间需要 52 分钟。
Apache Kafka® 是 一个分布式流处理平台. 这到底意味着什么呢?
我们知道流处理平台有以下三种特性:
注意:kafak是用scala语言编写的。
它可以用于两大类别的应用:
场景实例:
应用架构:
Kafka有四个核心的API:
关键术语
1.主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。2.分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。
为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。
3.偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。
4.生产者
生产者可以将数据发布到所选择的topic(主题)中并且分配到topic的哪一个 partition(分区)中。
可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。
5.消费者
消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。
如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例.
如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.
上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个 consumer instances ,B有四个。
在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。
维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;
如果一个实例消失,拥有的分区将被分发到剩余的实例。
Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。
对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:
每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。
分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。
在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.
偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。
例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"最新发送过来的消息开始消费。
这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。
比如,你可以使用命令行工具,对一些topic内容执行 tail操作,并不会影响已存在的消费者消费数据。
日志中的 partition(分区)有以下几个用途。
第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。
第二,可以作为并行的单元集,日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。
每一个分区都会在已配置的服务器上进行备份,确保容错性.每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。
leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。
每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。注意服务器的leader是以分区为维度,即可能topic1-1是leader,topic1-2却是follower。
在我看来,如果有多个consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。
如果不加锁如果读到分区的同一个消息,可能会造成重复消费的问题。
以下是 Pub-Sub 消息的逐步工作流程 -
controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活),controller会从每 个parititon的 replicas 副本列表中取出第一个broker作为leader,
当然这个broker需要也同时在ISR列表里。
消费者rebalance就是说如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如 果他又重启了,那么又会把一些分区重新交还给他。
如下情况可能会触发消费者rebalance
1. consumer所在服务重启或宕机了
2. 动态给topic增加了分区
3. 消费组订阅了更多的topic
当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段。
第一阶段:选择组协调器
组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控 这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。
consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对 应的组协调器GroupCoordinator,并跟其建立网络连接。
组协调器选择方式:
通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker 就是这个consumer group的coordinator 公式:
hash(consumer group id) % __consumer_offsets主题的分区数
第二阶段:加入消费组JOIN GROUP
在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
然后GroupCoordinator 从一个consumer group中 选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个 leader会负责制定分区方案。
第三阶段( SYNC GROUP)consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各 个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。
主要有三种rebalance的策略:range、round-robin、sticky。
Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情 况为range分配策略。
假设一个主题有10个分区(0-9),现在有三个consumer消费:
range策略就是按照分区序号排序,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。
比如分区0~3给一个consumer,分区4~6给一个consumer,分区7~9给一个consumer。
round-robin策略就是轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer
sticky策略就是在rebalance的时候,需要保证如下两个原则。
1)分区的分配要尽可能均匀 。
2)分区的分配尽可能与上次分配的保持相同。
当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。
比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下:
consumer1除了原有的0~3,会再分配一个7
consumer2除了原有的4~6,会再分配8和9
producer采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘 效率比随机写内存要高,保障 kafka 吞吐率)。
2、消息路由
producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:
3、写入broker流程
Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说kafka broker上的数据超7天或者1G,就会被清理掉。
故消息消费完后,Kafka并不会删除对应的消息,而是由内部过期删除机制进行消息的删除。当然开发者也可以自己去配置符合自己业务的消息保存时间。
这些数据存放在broker服务器上,以log文件的形式存在。
kafka的安装目录下面的/conf/server.propertites文件中中设置:Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里.
这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作:
# 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件,# 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息00000000000000000000.index# 消息存储文件,主要存offset和消息体00000000000000000000.log# 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,# 如果需要按照时间来定位消息的offset,会先在这个文件里查找00000000000000000000.timeindex00000000000005367851.index00000000000005367851.log00000000000005367851.timeindex00000000000009936472.index00000000000009936472.log00000000000009936472.timeindex
这个 9936472 之类的数字,就是代表了这个日志段文件里包含的起始 Offset,也就说明这个分区里至少都写入了接近 1000 万条数据了。
Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。
一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的那个日志段文件,叫做 active log segment。
consumer有两种消息方式,一种是存放在broker的日志目录中,另一种方式是存放在zookeeper中。
两种存放方式和你使用kafka-console-consumer命令使用的选项有关。
如果使用的是bootstrap-server,那么就存放在broker;如果使用的是–zookeeper那么就存放在zookeeper。offset:指的是kafka的topic中的每个消费组消费的下标。
一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
例子:比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是 consumerGroupId+topic+分区号,value就是当前offset的值,
kafka会定期清理topic里的消息,最后就保留最新的那条数据。
因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过 offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。
消费端自动提交和手动提交
offset提交的方式有两种,自动提交和手动提交。
conf/server.properties配置文件的参数设置: enable.auto.commit = true (那么这个是自动提交,false为手动) 默认为自动提交,可参考源码:org.springframework.kafka.core.DefaultKafkaConsumerFactory#isAutoCommit自动提交偏移量:
Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,
当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms 向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法: partation=hash(group_id)%50来计算的。 如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28自动提交使用场景
1.自动提交能较大提升消费速度,所以对不是很重要的消息,如只是纪录一次当天是否有没有学习的消息,如果已经消费了这条消息后面不再消费,使用自动提交可以提供吞吐量。
2.自动提交可能造成消息丢失,所以尤其是在需要事物控制的时候,最好不使用自动提交,因为一旦消费端消费失败,这条消息就不会再进行消费了。如拉取数据之后进行写入mysql这种,所以这时我们就需要进行手动提交kafka的offset下标。
手动提交
对于手动提交offset主要有3种方式:
1.同步提交 2.异步提交 3.异步+同步 组合的方式提交参考:kafka原理系列之(一)消息存储和offset提交机制 https://blog.csdn.net/sheep8521/article/details/89491372
发出消息持久化机制参数
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一 条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证 只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。
props.put(ProducerConfig.ACKS_CONFIG, "1");
当然使用Kafka的管理控制页面将更加清晰可见。
关于分区和消费者关系,后面会补充,先摘录如下:
If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
high-level Kafka给予以下保证:
log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。
kafka集群支持配置一个partition备份的数量。针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。
leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,followers是不承接发送端和消费端的读写的,只是负责备份和leader失效后的选举。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。我个人的理解有如下原因:
1.文件大小的限制,如果一个broker存储了所有的消息,那么文件的大小将非常大,且读取的性能很差,分摊到不同的broker,可以减少文件存储的压力和提升读取的性能。
2.一个broker挂了,影响将减少。因为那个broker的分区的leader挂了,只会影响该broker的分区leader,其他broker的leader不会有影响。
如果不分区,并且使用zk那用的主从架构,那么一个broker挂了,只有等leader选举完成之后才能正常使用。
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量。
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个 临界值,当超过 这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。 当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
一个partition同一个时刻在一个consumer group中只有对应的一个consumer instance在消费,从而保证顺序。但一个消费者实例可以订阅多个分区。
Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。
如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的 consumer instance数量也设置为1。当然这是不推荐的,最好就是跟进key值设置分区,如在线教育公司,做题的分发到一个partition中,看课的分发到另一个partition中,这样就能保证某一个业务场景,消费的业务如做题是有序的。
实现思路
发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一 般不能支持任意时间段的延时),
然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处 理的topic中,然后对应的topic去处理延时消息。
队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对 应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置, 修改bin/kafka-start-server.sh中的jvm设置
export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn12G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50"
这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长 时间
max.poll.records
参考源码解释
常见配置参考:
/** * kafka配置 default * * @return */ private MapconsumerProperties() { Map props = new HashMap<>(16); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return props; } /** * 自动提交消费者工厂 * * @return */ @Bean("autocommitConsumerFactory") @Primary public DefaultKafkaConsumerFactory autocommitConsumerFactory() { Map props = consumerProperties(); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); return new DefaultKafkaConsumerFactory(props); }
/** * producerFactory * * @return */ @Bean public ProducerFactoryproducerFactory() { Map props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } /** * kafkaTemplate * * @return */ @Bean public KafkaTemplate kafkaTemplate(ProducerFactory producerFactory) { return new KafkaTemplate<>(producerFactory); }
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消 费的消息重新消费,可以指定从多久之前的消息回溯消费。
这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移 的消息开始消费。
安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html
1.Kafka中文官方文档:https://kafka.apachecn.org/