@Catyee
2021-08-23T13:43:03.000000Z
字数 14465
阅读 492
面试
kafka是一个发布订阅模式的消息中间件,是一个分布式的流数据处理引擎。
它具有三方面的功能,首先它是一个消息系统,所以具有和其它消息系统一样的解耦、流量削峰、缓冲等功能,同时它还是一个消息存储系统,kafka会把消息持久化到磁盘,降低了消息丢失的风险,最后kafka是一个流式数据的处理引擎,可以进行窗口、变换、聚合等基础操作。也是其它流式数据处理框架的可靠数据源。
消息中间件就是提供高效可靠的消息传递机制进行异步的数据传输或者通信的系统。
消息中间件有两种模式:
两种模式的主要区别在于消息能不能重复消费。
要往kafka中吸入消息,需要先创建一个Producer,这个Producer用于连接kafka并发送消息。创建Producer的时候可以设置多个属性,但是只有三个属性是必须的:
1、acks:指定必须要有多少个partition副本收到消息,生产者才会认为消息的写入是成功的。
acks=0:生产者不需要等待服务器的响应,以网络能支持的最大速度发送消息,吞吐量高,但是broker有没有收到消息,生产者是不知道的,也就是有可能丢失消息
acks=1:leader partition收到消息,生产者就会收到一个来自服务器的成功响应,这种情况依然可能丢失数据,比如
acks=all:所有的partition都收到消息,生产者才会收到一个服务器的成功响应,这种情况不会丢失消息,但是同样这种配置也是吞吐量最低的
2、buffer.memory,设置生产者内缓存区域的大小,生产者用它缓冲要发送到服务器的消息。
创建了Producer之后就可以用Producer向kafka发送消息了,每个消息都会被封装为一个ProducerRecord对象,构建这个Producer对象的时候必须指定消息所属的Topic和消息值Value,此外还可以指定消息所属的Partition以及消息的Key,Producer对象可以只包含目标主题和值,键可以设置为null,如果key为null,依次将消息发送至该topic下的所有分区。不过大多数应用程序会用到键。键有两个用途 :可以作为消息的附加信息,也可以用来决定消息该被写到主题的哪个分区,拥有相同键的悄息将被写到同一个分区。
构建ProducerRecord之后就可以发送了,发送大致可以分为以下四个步骤:
它只管往 Kafka 里面发送消息,但是不关心消息是否正确到达。
这种方式的效率最高,但是可靠性也最差,比如当发生某些不可充实异常的时候会造成消息的丢失。
2、同步(sync)
producer.send()返回一个Future对象,调用get()方法进行同步等待就知道消息是否发送成功。
这种方式意味着发送一条消息需要等上个消息发送成功后才可以继续发送。
3、 异步(async)
Kafka支持producer.send()传入一个回调函数,消息不管成功或者失败都会调用这个回调函数,这样就算是异步发送,至于消息成功或失败后的处理逻辑就看用户的回调函数是如何实现的了。
消息队列有两种模式,点对点模式和发布订阅模式,对于点对点模式,一条消息只能被一个消费者消费,发布订阅模式模式则消息可以被多个消费者消费。
Kafka为这两种模型提供了单一的消费者抽象模型: 消费者组 (consumer group),一个消费者组中可以有多个消费者,一条消息可以在组间重复消费,但是在组内,只会有一个消费者消费到,假如所有的消费者都在一个组中,那么这就变成了队列模型,假如所有的消费者都在不同的组中,那么就完全变成了发布-订阅模型。
一个消费者组中消费者订阅同一个Topic,每个消费者接受Topic的一部分分区的消息,从而实现对消费者的横向扩展,提高了吞吐量。
一个消费者对应一个线程,一个消费者可以消费多个分区中的数据,但是如果消费者的数量大于topic中分区的数量,那么就会有消费者线程空跑。
使partition的所有权在消费者之间转移,这样的行为叫作再均衡:
第一步是创建消费者:也就是一个KafkaConsumer对象,同样只有三个参数是必要的,也就是bootstrap.servers、key.deserializer、value.deserializer。
消费者创建好之后下一步是订阅主题,可以通过正则来匹配多个主题,再下一步就可以通过轮询来向kafka请求数据了。轮询不只是获取数据那么简单。在第一次调用新消费者的poll()方法时,它会负责查找GroupCoordinator,然后加入群组,接受分配的分区。如果发生了再均衡,整个过程也是在轮询期间进行的。当然,心跳也是在轮询里发送出去的。所以我们要确保在轮询期间所做的任何逻辑都可以尽快完成。
消费者为什么要提交偏移量
当消费者崩溃或者有新的消费者加入,那么就会触发再均衡(rebalance),完成再均衡后,每个消费者可能会分配到新的分区,而不是之前处理那个,为了能够继续之前的工作,消费者需要读取每个partition最后一次提交的偏移量,然后从偏移量指定的地方继续处理。那么消费者是如何提交偏移量的呢?消费者往一个叫作_consumer_offset的特殊主题发送消息,消息里包含每个分区的偏移量。
提交偏移量的方式
在Kafka集群中会有一个或者多个broker,其中有一个broker会被选举为控制器(Kafka Controller),它负责管理整个集群中所有分区和副本的状态。当某个分区的leader副本出现故障时,由控制器负责为该分区选举新的leader副本。当检测到某个分区的ISR集合发生变化时,由控制器负责通知所有broker更新其元数据信息。当使用kafka-topics.sh脚本为某个topic增加分区数量时,同样还是由控制器负责分区的重新分配。
Kafka中的控制器选举的工作依赖于Zookeeper,成功竞选为控制器的broker会在Zookeeper中创建/controller这个临时(EPHEMERAL)节点,此临时节点的内容参考如下:
{"version":1,"brokerid":0,"timestamp":"1529210278988"}
其中version在目前版本中固定为1,brokerid表示称为控制器的broker的id编号,timestamp表示竞选成为控制器时的时间戳。
在任意时刻,集群中有且仅有一个控制器。每个broker启动的时候会去尝试去读取/controller节点的brokerid的值,如果读取到brokerid的值不为-1,则表示已经有其它broker节点成功竞选为控制器,所以当前broker就会放弃竞选;如果Zookeeper中不存在/controller这个节点,或者这个节点中的数据异常,那么就会尝试去创建/controller这个节点,当前broker去创建节点的时候,也有可能其他broker同时去尝试创建这个节点,只有创建成功的那个broker才会成为控制器,而创建失败的broker则表示竞选失败。每个broker都会在内存中保存当前控制器的brokerid值,这个值可以标识为activeControllerId。
Zookeeper中还有一个与控制器有关的/controller_epoch节点,这个节点是持久(PERSISTENT)节点,节点中存放的是一个整型的controller_epoch值。controller_epoch用于记录控制器发生变更的次数,即记录当前的控制器是第几代控制器,我们也可以称之为“控制器的纪元”。controller_epoch的初始值为1,即集群中第一个控制器的纪元为1,当控制器发生变更时,每选出一个新的控制器就将该字段值加1。每个和控制器交互的请求都会携带上controller_epoch这个字段,如果请求的controller_epoch值小于内存中的controller_epoch值,则认为这个请求是向已经过期的控制器所发送的请求,那么这个请求会被认定为无效的请求。如果请求的controller_epoch值大于内存中的controller_epoch值,那么则说明已经有新的控制器当选了。由此可见,Kafka通过controller_epoch来保证控制器的唯一性,进而保证相关操作的一致性。
按照常识,要想保证高可用保证不丢失,最直观的就是制造冗余,多做备份,数据互备嘛,Kafka 也是这么去做的。
在Kafka 中备份日志文件被称为replica,也就是副本,副本又分为leader replica和follower replica,follower replica不对外提供读写服务,存在的唯一目的就是防止消息丢失,只有leader才提供读写服务,follower的作用就是充当leader的候补,平时的操作只有信息同步。
为了保证性能,Kafka不会采用强一致性的方式来同步主从的数据,也就是说并不要求每个follower副本都和leader副本保持一致。Kafka采用的策略是维护了一个in-sync Replica的列表,也就是ISR,每个Partition都会有一个ISR,并且由leader动态维护。Leader 不需要等待所有 Follower 都完成同步,只要在 ISR 中的 Follower 完成数据同步就可以发送 ack 给生产者即可认为消息同步完成。同时如果发现 ISR 里面某一个 follower 落后太多的话,就会把它剔除。
ISR(in sync replica)的含义是同步的replica,相对的就有out of sync replica,也就是跟不上同步节奏的replica,现在面临的有两个问题,当replica 跟不上进度时该怎么处理(或原本跟不上节奏的现在又跟上节奏了该如何处理)、如何去判定跟不跟得上节奏。
第一个问题很简单,跟上节奏就加入ISR,跟不上节奏就踢出ISR。关键是如何判定一个followor是否跟上了同步节奏
在0.9.0.0之前,Kafka提供了replica lag.max.messages 来控制follower副本最多落后leader副本的消息数量,follower 相对于leader 落后当超过这个数量的时候就判定该follower是失效的,就会踢出ISR,这种方式存在一个严重的缺陷,当qps持续上升,请求打满之后,很容易造成同步速率下降或者长时间无响应,进而导致很多follower被踢出ISR,尤其是在在流量高峰时期会很常见,但是我们无法准确的判断流量高峰的时间。
之后的版本,Kafka采用落后于消费进度的时间长度来判断是否踢出ISR,这样有效的避免了在突发流量偶然落后于leader被不合理的踢出ISR的情况,如果长时间落后于leader这种情况实际故障是需要去踢的也没问题,也就有效的避免了ISR的反复移进移出所带来的代价
broker offset大致分为:base offset、high watemark(HW)、log end offset(LEO)
base offset:起始位移,replica中第一次消息的offset
HW:副本高水位线,也就是副本中最新一条已提交消息的位移。高水位线及之前的数据都是已经提交了的,从理论上来说不会丢失的数据,也是消费者可以消费的数据,但高水位线并不是最新的一条消息,高水位线之后还有消息,这些消息就是暂时还未提交的,也就是说并不是每个ISR中的副本都已经写入了,这些消息是消费者暂时无法消费的。每个replica都有高水位线值,但仅仅leader中的HW才能作为标示信息。什么意思呢,就是说当按照参数标准成功完成消息备份(成功同步给follower replica后)才会更新HW的值,代表消息理论上已经不会丢失,可以认为“已提交”。
LEO:日志末端位移,也就是副本中下一条待写入消息的offset,注意并不是最后一条,高水位线到末端位移之间的消息就是暂时还未提交的消息。
LEO和HW的更新
在kafka中每个分区都一个自己的LEO,叫做local LEO,另外leader还有一个remote LEO,实际上就是所有副本的LEO列表。
注意kafka的消息同步是pull的模式,也就是说follower向leader发同步请求,leader收到请求之后发送同步数据,follower再接收同步数据。
如果是一个follower节点,当它接收到leader的数据,并且成功写入消息之后,自己的本地leo就会加1,然后接着发送同步请求给leader,这个请求上就带有自己最新的leo。
leader节点如果收到生产者的消息,自己成功写入之后,本地leo就会更新。那什么时候会更新remote leo呢?实际上就是leader节点收到follower的同步请求之后,根据请求中的leo更新remote leo列表,然后从remote leo中找出最小的leo,根据这个值,尝试更新自己的hw
follower如何更新hw呢?当follower接收到leader的同步数据,这个同步数据中包含了leader的hw,follower先写数据,写完之后就会根据同步数据中的hw和自己本地的leo比较,取较小的一个作为自己的hw值。
分区leader的选举是由kafka controller来控制的,Kafka会在Zookeeper上针对每个Topic维护一个ISR(in-sync replica,已同步的副本)的集合,如果某个分区的Leader不可用,Kafka就会根据配置的分区选择策略从ISR集合中选择一个副本作为新的Leader。通过ISR,kafka需要的冗余度较低,可以容忍的失败数比较高。假设某个topic有n+1个副本,kafka可以容忍n个服务器不可用。
在消息队列中有三种交付语义,分别是最多一次、至少一次、精确一次
最多一次(at most once)
最多一次是指保证消息最多只处理一次,但是消息存在丢失的可能性。一般存在于宁愿消息丢失也不愿意重复处理的场景。kafka不提供这种语义的保证
至少一次(at leaset once)
至少一次是指保证消息不会丢失,但是有可能会重复处理,这种语义适用于可以容许重复处理但不容许丢失数据的场景。kafka默认提供这种语义的服务。
精确一次(exactly once)
精确一次指消息不会丢失也不会重复处理,精确一次是比较难保障的一种语义,kafka在0.11版本之后支持了事务和幂等性,也就支持了精确一次的语义。
既然kafka能够提供至少一次和精确一次的交付语义,那kafka的消息可能丢失吗?当然可能,但并不是kafka自身设计的问题,而是配置和使用姿势的问题。
kafka为了得到更高的性能和吞吐量,不是立马将数据落盘,而是将数据异步批量的刷入磁盘中,这样可以减少刷盘的次数,从而提高效率。
但是如果数据还在页缓存中,没有刷盘整个机器就挂掉了,那么数据就丢失了。kafka没有提供同步刷盘的机制,所以理论上来说单个broker始终是存在丢失数据的风险的。
kafka通过producer和broker协同处理单个broker丢失消息的情况,实际上就是正确设置producer的acks参数,如果参数设置为0,producer不知道kafka是否真的接收到了数据就认为已经成功了,这个时候丢失数据的风险是最高的。如果参数设置为1,那么只需要leader副本写入数据就响应,producer就会认为消息写入成功了,但如果leader所在的broker机器挂掉了,就可能丢失数据,所以要将acks参数设置大于1,这样可以保证消息会同步到多个副本,从而来避免单个broker丢失数据的可能性。
另外还要正确设置min.insync.replicas参数,也就是最少的同步副本数量,如果没设置,那么一个分区的ISR列表可能为空,此时acks设为几都没有用。
还有一个unclean.leader.election.enable参数,这个参数是在分区的主副本挂掉,然后在ISR集合中没有副本可以成为leader的时候,控制要不要让进度比较慢的副本成为leader的。不用多说,让进度比较慢的副本成为leader,肯定是要丢数据的。虽然可能会提高一些可用性,但如果你的业务场景不能忍受丢失数据,那还是将unclean.leader.election.enable设置为false吧。
为了提升效率,减少IO,producer在发送数据时可以将多个请求进行合并后发送。被合并的请求都被缓存在本地buffer中。producer可以将请求打包成“块”或者按照时间间隔从buffer中的取出数据来发出。
但是如果消息在buffer中还没发出去,producer所在的进程就挂掉了,那么这些没发出去的消息就丢失了。这个时候要看消息是否可以重新生成,能够重启生成就没问题,但如果消息不能重新生成那是会丢失数据的。
另外producer使用同步模式或者带有回调的异步模式,这样是比较安全的,可以处理kafka返回的异常信息。如果仅仅使用异步模式也有可能丢失数据。
consumer出现消息丢失主要原因是没有正确提交offset,Consumer可以自动提交offset,也可以手动提交offset,但是不正确使用两种方式都可能会出现消息丢失,主要原因在于消息处理失败了,但是offset已经提交了,所以编程的时候一定要特别注意,一定要等到消息处理成功之后再提交offset。
exactly once即精确一次,要求一条消息既不能丢也不能重,这个要求实际上是很难的,kafka也只是针对生产者实现了exactly once的语义。那kafka是通过哪些机制来实现的呢?主要是幂等性的生产者和事务两种机制来实现的。
所谓就是指不管执行多少次,结果都是一致的,对于kafka来说就是不管重复生产了多少条消息,kafka中都只存储一条。在0.11.0后,kafka提供了让producer支持幂等的配置操作,也就是在配置producer的时候加上一条配置开启生产者的幂等功能。
props.put("enable.idempotence", ture)
要注意开启幂等性之后,生产者的acks就自动变成all了。如果这时候手动将ackss设置为0,那么会报错。
当开启幂等性功能之后,producer在初始化的时候会分配到一个唯一的producerId,也就是所谓的pid(通过zookeeper来分配)这个pid不会暴露给用户,是完全透明的,这个pid就是用来标识一个producer,除了pid,每条消息还会生成一个单调递增的sequenceNumber,也就是序列号,用来标识消息的顺序,这个序列号也会在borker中进行持久化,每个分区都会维护自己独立的序列号,分区可以通过这个序列号来保证消息没有丢失也没有重复,具体来说:
要注意的是生产者幂等性只能保证同一个生产者在同一个分区里的消息是幂等的,处理不了跨topic或者跨分区的情况,也处理不了跨会话的情况。
所以在幂等性的基础上,kafka又通过事务来保证跨分区、跨会话时的exactly once的语义
如果一个事务跨越了会话(一个producer挂掉又重启了,producer的pid会改变),正常情况下kafka服务端是没办法感知宕机前后两个不同会话的producer是否是在处理同一个事务的,所以kafka引入了一个transactionId,用来标识一个事务,这个transactionId是用户提供的,即使不同的会话,只要transactionId一样,kafka server就认为是在处理同一个事务。
为了应对两个使用相同TransactionalId的producer同时存在,kafka还引入了epoch的概念,也就是任期,保证对应一个TransactionalId只有一个活跃的producer epoch。
具体的事务过程:
事务总是从生产者提起的,生产者通过调用initTransactions方法初始化事务上下文,这个方法的第一件事就是去kafka集群找到负责当前事务的事务协调者(Transaction coordinator),kafka内部有一个特殊的topic,叫做__transaction_state,一个事务的事务协调者实际就是这个特殊主题的某个分区的分区首领,这个事务相关的消息都记录在TransactionId的hash值取模分区数的分区上,它的事务协调者也就是这个分区的分区首领。
找到事务协调者之后第二部是获取pid和对应的epoch,如果只是开启producer的幂等性,也需要获取pid,但是这个pid由任意一个broker生成,但是如果是事务中的pid,则由第一步找到的事务协调者生成,并且还会生成对应的epoch。如果事务协调者发现之前这个TransactionID的信息已经存在了,还要处理之前的事务状态,如果之前事务没有处理完,都会抛出异常。在这之后就可以真正开启一个新的事务了。
如果上面initTransactions方法没有抛出异常,那么就可以调用beginTransaction方法开启一个事务,调用这个方法之后,Producer本地会记录已经开启的事务,但是事务协调者只有在Producer发送第一条消息后才认为事务已经开启。
生产者可能向多个topic的多个分区发送数据,在给一个新的分区发送数据之前,它需要先给事务协调者发消息,让事务协调者记录下对应的分区信息到transaction log中,并且状态设置为begin,如果这是事务的第一个消息,还会启动事务的计时。
然后生产者就可以正常发送消息给分区了,这和不使用事务发送消息是一样的,只是消息中多增加了事务信息,也就是pid,epoch和sequence number。
在事务相关的所有消息都发送完毕之后,生产者就可以调用commitTransaction方法来提交事务了,如果中间发生了异常,也可以调用abortTransaction来丢弃整个事务。
无论是commitTransaction还是abortTransaction实际上都是发送一个EndTxnRequest,这个请求中会有一个字段来标识是commit还是abort,事务协调者收到这个请求之后,会将prepareCommit或者prepareAbort写入到__transaction_state的分区中,然后事务协调者向所有事务涉及到的分区的分区首领发送事务标记(TransactionMarker),等待事务标记都写完了之后(并不需要所有replica都回复,因为事务标记也是幂等的,可以重发),事务协调者就会在事务日志中写入最终的complete_commit或者complete_abort消息,这样一个事务就真正结束了。
Kafka引入了一个很重要概念,叫做LSO,即last stable offset。对于同一个TopicPartition,其offset小于LSO的所有事务消息的状态都已确定,要不就是committed,要不就是aborted。而broker对于提交读隔离级别的consumer,只提供offset小于LSO的消息,也就是说处于提交读隔离级别的consumer,只会拉取到已经提交的或者已经丢弃的消息。
那么consumer如何区分出哪些消息是被丢弃的呢?kafka集群中有一个aborted transaction index文件,里面记录了被丢弃的事务相关的信息(pid, first offset, last stable offset),consumer也可以拉取到事务丢弃信息,consumer会在内部维护一个获取的事务丢弃列表,经过比对就可以知道哪些消息是commit了,哪些消息是丢弃了的。
参考:kafka事务原理
Kafka的消息是保存或缓存在磁盘上的,一般认为在磁盘上读写数据是会降低性能的,因为寻址会比较消耗时间,但是实际上,Kafka的特性之一就是高吞吐率。
kafka,每个Topic被分成多个Partition,Partition又分为多个segment(段),Partition从物理上可以理解成一个文件目录,这个目录中有多个数据文件和索引文件,数据文件和索引文件是一一对应的,可以认为一个数据文件加对应的索引文件就是一个segment。
知道了kafka的实际物理存储结构之后我们再来看kafka是如何写入消息和读取消息的。
为了提高写入的效率,kafka采用了两种策略,一个是顺序写,一个是MMAP
我们知道磁盘的随机读写之所以慢是因为磁盘寻址是一个机械操作(寻道需要移动指针,还要旋转磁盘)但是顺序写减少了磁盘寻址的消耗,所以性能是非常好的,所谓顺序写就是始终在一个文件最后追加内容,不会修改和删除之前已经写入的内容。
mmap即Memory Mapped Files,又被称为内存映射文件,实际上就是将一段用户空间的内存映射到内核空间,当映射成功后,用户对这段内存区域的修改可以直接反映到内核空间,这种方式减少了用户态和内核态的数据拷贝得开销,同时这是一种直接操作机器内存的方式,不会影响到jvm的内存,减少了jvm full gc的情况。
这一部分的实现是使用了java nio中的MappedByteBuffer,可以直接操作对外内存。
读取消息首先需要找到消息,然后再将消息发送给消费者,那提升读的效率也要从这两方面入手。提高寻找消息的效率最直接的思路就是减少数据搜索的范围,所以需要用到索引。而提升将消息发送给消费者的效率就要提升网络io效率,所以采用零拷贝的技术。另外一个提升效率的措施就是减少网络传输的数据量,这就可以对数据进行压缩。
kafka中分段日志文件以日志文件中存储的消息的最小offset命名,那么从分区指定offset读取消息的时候就可以根据二分查找定位到这个offset在哪个分段文件,分段文件对应的索引文件中保存了offset和这个offset在分段文件中实际存储的位置,所以可以再次使用二分查找找到这个offset实际存储的位置。
当然为了控制索引文件的大小,并没有为数据文件中的每条消息都建立索引,而是采用了稀疏存储的方式,每隔一定字节的数据建立一条索引。这样避免了索引文件占用过多的空间,从而可以将索引文件保留在内存中。但缺点是没有建立索引的Message也不能一次定位到其在数据文件的位置,从而需要做一次顺序扫描,但是这次顺序扫描的范围就很小了。
所谓零拷贝并不是真的不进行任何数据复制,而是指IO操作的时候尽量避免在用户态与内核态之间来回拷贝数据。
在NIO中FileChannel的transferTo()和TransferFrom()方法实现了零拷贝的功能,实际上这两个方法的底层都是调用linux中独有的指令sendfile来完成拷贝工作,sendfile会直接将数据从内核缓冲区拷贝到socket缓冲区,这个拷贝的过程不需要经过用户态。
为了提升网络开销就要尽量减少网络传输的数据量,可以考虑对数据进行压缩,Kafka 对数据提供了:Gzip和Snappy压缩协议等压缩协议,对消息结构体进行了压缩,减少了数据传输的消耗。