博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Kafka
阅读量:4093 次
发布时间:2019-05-25

本文共 15788 字,大约阅读时间需要 52 分钟。

一、基础概念

1.1 Kafka是什么?

Apache Kafka® 是 一个分布式流处理平台. 这到底意味着什么呢?

我们知道流处理平台有以下三种特性:

  1. 可以让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统类似。
  2. 可以储存流式的记录,并且有较好的容错性。
  3. 可以在流式记录产生时就进行处理。

注意:kafak是用scala语言编写的。

1.2 Kafka适合什么样的场景?

它可以用于两大类别的应用:

  1. 构造实时流数据管道,它可以在系统或应用之间可靠地获取数据。 (相当于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,通过kafka stream topic和topic之间内部进行变化)

场景实例:

  • 日志收集:一个公司可以用Kafka收集各种服务的log,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 用户活动跟踪:Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,然后订阅者通过订阅这些topic来做实时的监控分析,或者装载到hadoop、数据仓库中做离线分析和挖掘。
  • 实时计算:如在线教育公司的看课,做题等数据计算和消费。

1.3 Kafka特性

 

1.3.1 基础特性

  • Kafka作为一个集群,运行在一台或者多台服务器上.
  • Kafka 通过 topic 对存储的流数据进行分类。
  • 每条记录中包含一个key,一个value和一个timestamp(时间戳)。

 

应用架构:

 

 

Kafka有四个核心的API:

  • The  允许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
  • The  允许一个应用程序订阅一个或多个 topic ,并且对发布给他们的流式数据进行处理。
  • The  允许一个应用程序作为一个流处理器,消费一个或者多个topic产生的输入流,然后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • The  允许构建并运行可重用的生产者或者消费者,将Kafka topics连接到已存在的应用程序或者数据系统。比如,连接到一个关系型数据库,捕捉表(table)的所有变更内容。

 

关键术语

1.主题(topic):一个 topic 里保存的是同一类消息,相当于对消息的分类,每个 producer 将消息发送到 kafka 中,都需要指明要存的 topic 是哪个,也就是指明这个消息属于哪一类。

2.分区(partition):每个 topic 都可以分成多个 partition,每个 partition 在存储层面是 append log 文件。任何发布到此 partition 的消息都会被直接追加到 log 文件的尾部。

为什么要进行分区呢?最根本的原因就是:kafka基于文件进行存储,当文件内容大到一定程度时,很容易达到单个磁盘的上限,因此,采用分区的办法,一个分区对应一个文件,这样就可以将数据分别存储到不同的server上去,另外这样做也可以负载均衡,容纳更多的消费者。

3.偏移量(Offset):一个分区对应一个磁盘上的文件,而消息在文件中的位置就称为 offset(偏移量),offset 为一个 long 型数字,它可以唯一标记一条消息。

由于kafka 并没有提供其他额外的索引机制来存储 offset,文件只能顺序的读写,所以在kafka中几乎不允许对消息进行“随机读写”。

4.生产者

生产者可以将数据发布到所选择的topic(主题)中并且分配到topic的哪一个 partition(分区)中。

可以使用循环的方式来简单地实现负载均衡,也可以根据某些语义分区函数(例如:记录中的key)来完成。

5.消费者

消费者使用一个 消费组 名称来进行标识,发布到topic中的每条记录被分配给订阅消费组中的一个消费者实例.消费者实例可以分布在多个进程中或者多个机器上。

如果所有的消费者实例在同一消费组中,消息记录会负载平衡到每一个消费者实例.

如果所有的消费者实例在不同的消费组中,每条消息记录会广播到所有的消费者进程.

 

 

上图说明:由2个broker组成的kafka集群,总共有4个partition(P0-P3)。这个集群由2个Consumer Group, A有2个 consumer instances ,B有四个。

通常一个topic会有几个consumer group,每个consumer group都是一个逻辑上的订阅者(
logical
subscriber
)。每个consumer group由多个consumer instance组成,从而达到可扩展和容灾的功能。

 

在Kafka中实现消费的方式是将日志中的分区划分到每一个消费者实例上,以便在任何时间,每个实例都是分区唯一的消费者。

维护消费组中的消费关系由Kafka协议动态处理。如果新的实例加入组,他们将从组中其他成员处接管一些 partition 分区;

如果一个实例消失,拥有的分区将被分发到剩余的实例。

1.3.2 Topics和日志

 

Topic 就是数据主题,是数据记录发布的地方,可以用来区分业务系统。Kafka中的Topics总是多订阅者模式,一个topic可以拥有一个或者多个消费者来订阅它的数据。

对于每一个topic, Kafka集群都会维持一个分区日志,如下所示:

 

 

 

每个分区都是有序且顺序不可变的记录集,并且不断地追加到结构化的commit log文件。

分区中的每一个记录都会分配一个id号来表示顺序,我们称之为offset,offset用来唯一的标识分区中每一条记录。

 

 

在每一个消费者中唯一保存的元数据是offset(偏移量)即消费在log中的位置.

偏移量由消费者所控制:通常在读取记录后,消费者会以线性的方式增加偏移量,但是实际上,由于这个位置由消费者控制,所以消费者可以采用任何顺序来消费记录。

例如,一个消费者可以重置到一个旧的偏移量,从而重新处理过去的数据;也可以跳过最近的记录,从"现在"最新发送过来的消息开始消费。

 

这些细节说明Kafka 消费者是非常廉价的—消费者的增加和减少,对集群或者其他消费者没有多大的影响。

比如,你可以使用命令行工具,对一些topic内容执行 tail操作,并不会影响已存在的消费者消费数据。

 

日志中的 partition(分区)有以下几个用途。

第一,当日志大小超过了单台服务器的限制,允许日志进行扩展。每个单独的分区都必须受限于主机的文件限制,不过一个主题可能有多个分区,因此可以处理无限量的数据。

第二,可以作为并行的单元集,日志的分区partition (分布)在Kafka集群的服务器上。每个服务器在处理数据和请求时,共享这些分区。

每一个分区都会在已配置的服务器上进行备份,确保容错性.每个分区都有一台 server 作为 “leader”,零台或者多台server作为 follwers 。

leader server 处理一切对 partition (分区)的读写请求,而follwers只需被动的同步leader上的数据。当leader宕机了,followers 中的一台服务器会自动成为新的 leader。

每台 server 都会成为某些分区的 leader 和某些分区的 follower,因此集群的负载是平衡的。注意服务器的leader是以分区为维度,即可能topic1-1是leader,topic1-2却是follower。

 

思考

1.Kafka约定一个分区只能被一个消费者消费,为什么要这么设计?

在我看来,如果有多个consumer(消费者)可以消费一个分区的数据,那么在操作这个消费进度的时候就需要加锁,可能会对性能有一定的影响。

如果不加锁如果读到分区的同一个消息,可能会造成重复消费的问题。

 

 

二、核心原理

 

Kafka架构设计

发布 - 订阅消息的工作流程

以下是 Pub-Sub 消息的逐步工作流程 -

  • 生产者定期向主题发送消息。
  • Kafka 代理存储为该特定主题配置的分区中的所有消息。 它确保消息在分区之间平等共享。 如果生产者发送两个消息并且有两个分区,Kafka 将在第一分区中存储一个消息,在第二分区中存储第二消息。
  • 消费者订阅特定主题。
  • 一旦消费者订阅主题,Kafka 将向消费者提供主题的当前偏移,并且还将偏移保存在 Zookeeper 系统中。
  • 消费者将定期请求 Kafka (如100 Ms)新消息。
  • 一旦 Kafka 收到来自生产者的消息,它将这些消息转发给消费者。
  • 消费者将收到消息并进行处理。
  • 一旦消息被处理,消费者将向 Kafka 代理发送确认。
  • 一旦 Kafka 收到确认,它将偏移更改为新值,并在 Zookeeper 中更新它。 由于偏移在 Zookeeper 中维护,消费者可以正确地读取下一封邮件,即使在服务器暴力期间。
  • 以上流程将重复,直到消费者停止请求。
  • 消费者可以随时回退/跳到所需的主题偏移量,并阅读所有后续消息。

2.1 Kafka核心总控制器Controller

在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。

 

  • 当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。
  • 当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。
  • 当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
     

2.1.1 Controller选举机制

 
在kafka集群启动的时候,会自动选举一台broker作为controller来管理整个集群,选举的过程是集群中每个broker都会 尝试在zookeeper上创建一个 /controller 临时节点,zookeeper会保证有且仅有一个broker能创建成功,这个broker 就会成为集群的总控器controller。
 
当这个controller角色的broker宕机了,此时zookeeper临时节点会消失,集群里其他broker会一直监听这个临时节 点,发现临时节点消失了,就竞争再次创建临时节点,就是我们上面说的选举机制,zookeeper又会保证有一个broker 成为新的controller。(其实质是使用到了zk的watch机制)。
 
 
具备控制器身份的broker需要比其他普通的broker多一份职责,具体细节如下:
1. 监听broker相关的变化。为Zookeeper中的/brokers/ids/节点添加BrokerChangeListener,用来处理broker 增减的变化。
2. 监听topic相关的变化。为Zookeeper中的/brokers/topics节点添加TopicChangeListener,用来处理topic增减 的变化;
为Zookeeper中的/admin/delete_topics节点添加TopicDeletionListener,用来处理删除topic的动作。
3. 从Zookeeper中读取获取当前所有与topic、partition以及broker有关的信息并进行相应的管理。对于所有topic 所对应的Zookeeper中的/brokers/topics/[topic]节点添加PartitionModificationsListener,用来监听topic中的 分区分配变化。
4. 更新集群的元数据信息,同步到其他普通的broker节点中。
 
 

2.1.2 Partition副本选举Leader机制

controller感知到分区leader所在的broker挂了(controller监听了很多zk节点可以感知到broker存活),controller会从每 个parititon的 replicas 副本列表中取出第一个broker作为leader,

当然这个broker需要也同时在ISR列表里。

 

2.2 消费者Rebalance机制

消费者rebalance就是说如果consumer group中某个消费者挂了,此时会自动把分配给他的分区交给其他的消费者,如 果他又重启了,那么又会把一些分区重新交还给他。

如下情况可能会触发消费者rebalance

1. consumer所在服务重启或宕机了

2. 动态给topic增加了分区

3. 消费组订阅了更多的topic

2.2.1 Rebalance流程

当有消费者加入消费组时,消费者、消费组及组协调器之间会经历以下几个阶段。

第一阶段:选择组协调器

组协调器GroupCoordinator:每个consumer group都会选择一个broker作为自己的组协调器coordinator,负责监控 这个消费组里的所有消费者的心跳,以及判断是否宕机,然后开启消费者rebalance。

consumer group中的每个consumer启动时会向kafka集群中的某个节点发送 FindCoordinatorRequest 请求来查找对 应的组协调器GroupCoordinator,并跟其建立网络连接。

组协调器选择方式:

通过如下公式可以选出consumer消费的offset要提交到__consumer_offsets的哪个分区,这个分区leader对应的broker 就是这个consumer group的coordinator 公式:

hash(consumer group id) % __consumer_offsets主题的分区数

第二阶段:加入消费组JOIN GROUP

在成功找到消费组所对应的 GroupCoordinator 之后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。

然后GroupCoordinator 从一个consumer group中 选择第一个加入group的consumer作为leader(消费组协调器),把consumer group情况发送给这个leader,接着这个 leader会负责制定分区方案。

第三阶段( SYNC GROUP)

consumer leader通过给GroupCoordinator发送SyncGroupRequest,接着GroupCoordinator就把分区方案下发给各 个consumer,他们会根据指定分区的leader broker进行网络连接以及消息消费。

2.2.2 消费者Rebalance分区分配策略

主要有三种rebalance的策略:range、round-robin、sticky。

Kafka 提供了消费者客户端参数partition.assignment.strategy 来设置消费者与订阅主题之间的分区分配策略。默认情 况为range分配策略。

假设一个主题有10个分区(0-9),现在有三个consumer消费:

  • range策略

range策略就是按照分区序号排序,假设 n=分区数/消费者数量 = 3, m=分区数%消费者数量 = 1,那么前 m 个消 费者每个分配 n+1 个分区,后面的(消费者数量-m )个消费者每个分配 n 个分区。

比如分区0~3给一个consumer,分区4~6给一个consumer,分区7~9给一个consumer。

  • round-robin策略

round-robin策略就是轮询分配,比如分区0、3、6、9给一个consumer,分区1、4、7给一个consumer,分区2、5、 8给一个consumer

  • sticky策略

sticky策略就是在rebalance的时候,需要保证如下两个原则。

1)分区的分配要尽可能均匀 。

2)分区的分配尽可能与上次分配的保持相同。

当两者发生冲突时,第一个目标优先于第二个目标 。这样可以最大程度维持原来的分区分配的策略。

比如对于第一种range情况的分配,如果第三个consumer挂了,那么重新用sticky策略分配的结果如下:

consumer1除了原有的0~3,会再分配一个7

consumer2除了原有的4~6,会再分配8和9

 

2.3 producer发布消息机制

2.3.1 写入机制

1、写入方式

producer采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘 效率比随机写内存要高,保障 kafka 吞吐率)。

2、消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪一个 partition。其路由机制为:

  1. 指定了 patition,则直接使用;
  2. 未指定 patition 但指定 key,通过对 key 的 value 进行hash 选出一个 patition
  3. patition 和 key 都未指定,使用轮询选出一个 patition。

 

3、写入broker流程

 
1. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
2. producer 将消息发送给该 leader
3. leader 将消息写入本地 log
4. followers 从 leader pull 消息,写入本地 log 后 向leader 发送 ACK
5. leader 收到所有 ISR 中的 replica 的 ACK 后,增加 HW(high watermark,最后 commit 的 offset) 并向
producer 发送 ACK

2.3.2 HW与LEO详解

 
HW俗称高水位,HighWatermark的缩写,取一个partition对应的ISR中最小的LEO(log-end-offset)作为HW, consumer最多只能消费到HW所在的位置。
 
另外每个replica都有HW,leader和follower各自负责更新自己的HW的状 态。
 
对于leader新写入的消息,consumer不能立刻消费,leader会等待该消息被所有ISR中的replicas同步后更新HW, 此时消息才能被consumer消费。
 
这样就保证了如果leader所在的broker失效,该消息仍然可以从新选举的leader中获 取。对于来自内部broker的读取请求,没有HW的限制。
 
下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:
 
https://note.youdao.com/yws/public/resource/d9fed88c81ff75e6c0e6364012d19fef/xmlnote/94D2D8CC6AF34C40ABE4AF815F9DE93C/83193
 
由此可见,Kafka的复制机制既不是完全的同步复制,也不是单纯的异步复制。事实上,同步复制要求所有能工作的follower都复制完,这
条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写
入log就被认为已经commit,这种情况下如果follower都还没有复制完,落后于leader时,突然leader宕机,则会丢失数据。而Kafka的这
种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。
 
再回顾下消息发送端对发出消息持久化机制参数
acks
的设置,我们结合HW 和LEO来看下acks=1的情况
 
https://note.youdao.com/yws/public/resource/d9fed88c81ff75e6c0e6364012d19fef/xmlnote/040FAAC6C2264B90BD5A9FBBD138073A/83205
 
 

2.4 kafka之消息存储

Kafka具有存储功能,默认保存数据时间为7天或者大小1G,也就是说kafka broker上的数据超7天或者1G,就会被清理掉。

故消息消费完后,Kafka并不会删除对应的消息,而是由内部过期删除机制进行消息的删除。当然开发者也可以自己去配置符合自己业务的消息保存时间。

这些数据存放在broker服务器上,以log文件的形式存在。

kafka的安装目录下面的/conf/server.propertites文件中中设置:

2.4.1日志分段存储

Kafka 一个分区的消息数据对应存储在一个文件夹下,以topic名称+分区号命名,消息在分区内是分段(segment)存储,每个段的消息都存储在不一样的log文件里.

这种特性方便old segment file快速被删除,kafka规定了一个段位的 log 文件最大为 1G,做这个限制目的是为了方便把 log 文件加载到内存去操作:

# 部分消息的offset索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的offset到index文件,# 如果要定位消息的offset会先在这个文件里快速定位,再去log文件里找具体消息00000000000000000000.index# 消息存储文件,主要存offset和消息体00000000000000000000.log# 消息的发送时间索引文件,kafka每次往分区发4K(可配置)消息就会记录一条当前消息的发送时间戳与对应的offset到timeindex文件,# 如果需要按照时间来定位消息的offset,会先在这个文件里查找00000000000000000000.timeindex00000000000005367851.index00000000000005367851.log00000000000005367851.timeindex00000000000009936472.index00000000000009936472.log00000000000009936472.timeindex

这个 9936472 之类的数字,就是代表了这个日志段文件里包含的起始 Offset,也就说明这个分区里至少都写入了接近 1000 万条数据了。

Kafka Broker 有一个参数,log.segment.bytes,限定了每个日志段文件的大小,最大就是 1GB。

一个日志段文件满了,就自动开一个新的日志段文件来写入,避免单个文件过大,影响文件的读写性能,这个过程叫做 log rolling,正在被写入的那个日志段文件,叫做 active log segment。

2.4.2 consumer的offset的存储

consumer有两种消息方式,一种是存放在broker的日志目录中,另一种方式是存放在zookeeper中。

两种存放方式和你使用kafka-console-consumer命令使用的选项有关。

如果使用的是bootstrap-server,那么就存放在broker;如果使用的是–zookeeper那么就存放在zookeeper。

2.5 Kafka消息提交机制

offset:指的是kafka的topic中的每个消费组消费的下标。

一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。

例子:比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

2.5.1消费端提交机制

每个consumer会定期将自己消费分区的offset提交给kafka内部topic:__consumer_offsets,提交过去的时候,key是 consumerGroupId+topic+分区号,value就是当前offset的值,

kafka会定期清理topic里的消息,最后就保留最新的那条数据。

因为__consumer_offsets可能会接收高并发的请求,kafka默认给其分配50个分区(可以通过 offsets.topic.num.partitions设置),这样可以通过加机器的方式抗大并发。

消费端自动提交和手动提交

offset提交的方式有两种,自动提交和手动提交。

conf/server.properties配置文件的参数设置:
enable.auto.commit = true (那么这个是自动提交,false为手动)

默认为自动提交,可参考源码:org.springframework.kafka.core.DefaultKafkaConsumerFactory#isAutoCommit

自动提交偏移量:

Kafka中偏移量的自动提交是由参数enable_auto_commit和auto_commit_interval_ms控制的,

当enable_auto_commit=True时,Kafka在消费的过程中会以频率为auto_commit_interval_ms
向Kafka自带的topic(__consumer_offsets)进行偏移量提交,具体提交到哪个Partation是以算法:
partation=hash(group_id)%50来计算的。

如:group_id=test_group_1,则partation=hash(“test_group_1”)%50=28

自动提交使用场景

1.自动提交能较大提升消费速度,所以对不是很重要的消息,如只是纪录一次当天是否有没有学习的消息,如果已经消费了这条消息后面不再消费,使用自动提交可以提供吞吐量。

2.自动提交可能造成消息丢失,所以尤其是在需要事物控制的时候,最好不使用自动提交,因为一旦消费端消费失败,这条消息就不会再进行消费了。如拉取数据之后进行写入mysql这种,所以这时我们就需要进行手动提交kafka的offset下标。

手动提交

对于手动提交offset主要有3种方式:

1.同步提交 2.异步提交 3.异步+同步 组合的方式提交

参考:kafka原理系列之(一)消息存储和offset提交机制 https://blog.csdn.net/sheep8521/article/details/89491372

2.5.2 生产端提交机制

发出消息持久化机制参数

(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。

(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一 条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。

(3)acks=-1或all: 需要等待 min.insync.replicas(默认为1,推荐配置大于等于2) 这个参数配置的副本个数都成功写入日志,这种策略会保证 只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。

props.put(ProducerConfig.ACKS_CONFIG, "1");

 

2.6 kafka与zookeeper注册和发现

 

当然使用Kafka的管理控制页面将更加清晰可见。

 

批量消费

关于分区和消费者关系,后面会补充,先摘录如下:

If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

三、高可用探究

 

high-level Kafka给予以下保证:

  • 生产者发送到特定topic partition 的消息将按照发送的顺序处理。 也就是说,如果记录M1和记录M2由相同的生产者发送,并先发送M1记录,那么M1的偏移比M2小,并在日志中较早出现
  • 一个消费者实例按照日志中的顺序查看记录.
  • 对于具有N个副本的主题,我们最多容忍N-1个服务器故障,从而保证不会丢失任何提交到日志中的记录.

 

1.分布式集群保证高可用

log的partitions分布在kafka集群中不同的broker上,每个broker可以请求备份其他broker上partition上的数据。

kafka集群支持配置一个partition备份的数量。针对每个partition,都有一个broker起到“leader”的作用,0个或多个其他的broker作为“follwers”的作用。

leader处理所有的针对这个partition的读写请求,而followers被动复制leader的结果,followers是不承接发送端和消费端的读写的,只是负责备份和leader失效后的选举。如果这个leader失效了,其中的一个follower将会自动的变成新的leader。

思考

1.为何Kafka设置多分区方案?

我个人的理解有如下原因:

1.文件大小的限制,如果一个broker存储了所有的消息,那么文件的大小将非常大,且读取的性能很差,分摊到不同的broker,可以减少文件存储的压力和提升读取的性能。

2.一个broker挂了,影响将减少。因为那个broker的分区的leader挂了,只会影响该broker的分区leader,其他broker的leader不会有影响。

如果不分区,并且使用zk那用的主从架构,那么一个broker挂了,只有等leader选举完成之后才能正常使用。

2.分区数越多吞吐量越高吗?

可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量。

网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个 临界值,当超过 这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。 当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
 

 

2.顺序消费

一个partition同一个时刻在一个consumer group中只有对应的一个consumer instance在消费,从而保证顺序。但一个消费者实例可以订阅多个分区。

Kafka只在partition的范围内保证消息消费的局部顺序性,不能在同一个topic中的多个partition中保证总的消费顺序性。

如果有在总体上保证消费顺序的需求,那么我们可以通过将topic的partition数量设置为1,将consumer group中的 consumer instance数量也设置为1。当然这是不推荐的,最好就是跟进key值设置分区,如在线教育公司,做题的分发到一个partition中,看课的分发到另一个partition中,这样就能保证某一个业务场景,消费的业务如做题是有序的。

 

3.延时队列

实现思路

发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,...topic_2h,这个一 般不能支持任意时间段的延时),

然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处 理的topic中,然后对应的topic去处理延时消息。

队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对 应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。

 

 

 

四、kafka应用实战

 

4.1 kafka典型应用架构

日志收集应用

 

4.2 线上部署环境配置

服务器部署选型

 

JVM参数设置
 

kafka是scala语言开发,运行在JVM上,需要对JVM参数合理设置, 修改bin/kafka-start-server.sh中的jvm设置

 export KAFKA_HEAP_OPTS="‐Xmx16G ‐Xms16G ‐Xmn12G ‐XX:MetaspaceSize=256M ‐XX:+UseG1GC ‐XX:MaxGCPauseMillis=50"

这种大内存的情况一般都要用G1垃圾收集器,因为年轻代内存比较大,用G1可以设置GC最大停顿时间,不至于一次minor gc就花费太长 时间

 

 

4.3 Kafak常见配置

Kafak消费端配置

max.poll.interval.ms
默认为5分钟
如果两次poll操作间隔超过了这个时间,broker就会认为这个consumer处理能力太弱,会将其踢出消费组,将分区分配给别的consumer消费 ,触发rebalance 。
如果你的消费者节点总是在重启完不久就不消费了,可以考虑检查改配置项或者优化你的消费者的消费速度等等。

max.poll.records

参考源码解释

 

常见配置参考:

/**     * kafka配置 default     *     * @return     */    private Map
consumerProperties() { Map
props = new HashMap<>(16); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollIntervalMS); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "50"); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); return props; } /** * 自动提交消费者工厂 * * @return */ @Bean("autocommitConsumerFactory") @Primary public DefaultKafkaConsumerFactory autocommitConsumerFactory() { Map
props = consumerProperties(); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, Boolean.TRUE); return new DefaultKafkaConsumerFactory(props); }

生产端配置

 

/**     * producerFactory     *      * @return     */    @Bean    public ProducerFactory
producerFactory() { Map
props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, servers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, LongSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(props); } /** * kafkaTemplate * * @return */ @Bean public KafkaTemplate
kafkaTemplate(ProducerFactory
producerFactory) { return new KafkaTemplate<>(producerFactory); }

 

4.4 消息回溯

如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消 费的消息重新消费,可以指定从多久之前的消息回溯消费。

这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移 的消息开始消费。

 

五、Kafka可视化管理工具

Kafka可视化管理工具kafka-manager

 

安装及基本使用可参考:https://www.cnblogs.com/dadonggg/p/8205302.html

 

 

参考文献

1.Kafka中文官方文档:https://kafka.apachecn.org/

你可能感兴趣的文章
深入了解php底层机制
查看>>
PHP中的stdClass 【转】
查看>>
XHProf-php轻量级的性能分析工具
查看>>
PHP7新特性 What will be in PHP 7/PHPNG
查看>>
比较strtr, str_replace和preg_replace三个函数的效率
查看>>
ubuntu 下编译PHP5.5.7问题:configure: error: freetype.h not found.
查看>>
PHP编译configure时常见错误 debian centos
查看>>
configure: error: Please reinstall the BZip2 distribution
查看>>
OpenCV gpu模块样例注释:video_reader.cpp
查看>>
【增强学习在无人驾驶中的应用】
查看>>
《python+opencv实践》四、图像特征提取与描述——29理解图像特征
查看>>
《python+opencv实践》四、图像特征提取与描述——30Harris 角点检测
查看>>
《python+opencv实践》四、图像特征提取与描述——31 Shi-Tomasi 角点检测& 适合于跟踪的图像特征
查看>>
OpenCV meanshift目标跟踪总结
查看>>
人工神经网络——神经元模型介绍
查看>>
人工神经网络——感知器介绍
查看>>
人工神经网络——反向传播算法(BackPropagation)
查看>>
进程的地址空间概述
查看>>
Windows 窗口底层原理
查看>>
一种函数指针的运用
查看>>