@xtccc
2016-01-07T10:39:51.000000Z
字数 4175
阅读 5439
Kafka
参考链接:
本文适用于Kafka 0.8.2.0
目录
Kafka producer产生消息时,需要确定以下几点:
Kafka cluster
首要的事,就是找到target Kafka cluster
topic
产生的消息将发送给哪个topic
key
消息的key,当然,一条消息也可以不含key
key serializer
用于序列化key的serialier
value
消息的value,一条消息必须有value
value serializer
用于序列化消息value的serializer
partition number
消息将被发往指定topic的哪个partition。如果存在key但是没有指定parition number,则将使用 hash(key) 作为消息的partition number;如果消息既不含key也未指定partition number,则将按照round-robin的方式确定partition number。
acknowledgement
Kafka的brokers将是否已经确认了被发送的消息,是全部的brokers都确认收到该消息了,还是只有部分brokers确认收到该消息。
下面将产生不含key、且不指定partition number的消息,消息的类型为字符串。
例1
其中,“bootstrap.servers”并不需要全部的Kafka brokers,请看如下对该参数的解释
A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).
对于一个Topic,它的parition数量是固定的,除非人为地去调整。Partition id从0开始计算。
下面将产生含key的消息,且根据Key计算partition number 。
运行结果为
通过send
方法向topic发送一个ProducerRecord
时,可以为该消息指定一个callback。当发送的消息被响应后(这里的响应指的是Kafka brokers对收到该消息的确认响应,而不是指存在某个consumer消费了该消息),这个回调方法会被立即调用一次。
这个回调方法必须实现接口 org.apache.kafka.clients.producer.Callback
。
在下面的例子中,我们将不间断地产生10条消息发送,并为每一条消息设置一个callback。在每个callback中,会打印出与该消息相关的元数据,并阻塞3秒钟。
在运行这个Producer程序时,我们并没有运行针对“topic-B”的任何的consumer。
运行结果如下:
问题:这里的offset
指的是什么?
callback
与send
是异步执行的
10条消息的`send`是连续地被执行的,且没有任何租塞。`callback`是在`send`之后(消息被响应后)才执行。
虽然`send`不会因为等待`callback`而阻塞,但是如果Kafka集群出现问题(例如关闭了全部的brokers),那么`send`就无法顺利地进行下去。
同一个partition内消息间的callback
是有序调用的
如果两个`ProducerRecord`被发送给了topic的同一个partition,那么这两条消息的callback将按照消息的发送顺序被调用。例如,在上面的结果中,任何一个partition内,多条消息的callback都是间隔3秒被依次调用的。
不同partition间消息的callback
调用顺序无法保证
上面的运行结果中,所有的消息的`callback`都是间隔3秒执行的,为什么?
Callback在 Producer I/O 线程中运行
Callback的运行必须足够地快,否则会拖累其他线程发送消息的速度。如果要在Callback中运行较为耗时的操作,用户应该在Callback开启新的线程 (e.g., `java.util.concurrent.Executor`)。
消息发送队列
在执行`send`时,Producer会将消息放入一个队列中等待发送,这个队列的大小由“total.memory.size”决定,如果`send`调用的速度大于I/O线程将消息传输给brokers的速度,那么这个队列的可用空间最终将被耗尽。此时,默认的处理规则是阻塞对`send`的调用,直到队列获得了足够的可用空间;如果“block.on.buffer.full”被设置为“false”,那么producer会直接抛出异常。
如果Callback中发生了异常,会怎么样?该怎样处理?
下面,我们在Callback方法中制造 “divide by zero” 异常:
运行结果:
可见:
那么问题来了
1. 为什么callback线程中抛出异常时,Producer所在的主线程没有打印出异常信息?
查看
org.apache.kafka.common.utils.KafkaThread
的源码,就可以知道,callback中抛出的异常都被打印到日志中去了,不会向上传递到主线程中。
2. 如果希望自定义对callback异常的处理,而不是采用默认的行为,该怎么办?
在定义callback时,添加处理异常的 try{···} catch{···} 代码块即可
运行结果为
Callback thread与producer thread的关系是:
下面验证
运行结果
除了依靠send
的callback方法外,我们还可以通过send
返回的结果Future[RecordMetadata]
来异步地对被发送消息的响应结果进行处理。
运行结果
send也可以以blocking的方式来调用,如下
运行结果
在传递给KafkaProducer
的配置参数中,有一个参数acks
,它可以取以下三种值:
acks = 0
如果“acks”为“0”,那么producer不会等待任何broker的响应 —— 消息会被放入buffer,并被producer认为已经发送成功。
这种情况下:
1)无法保证消息真的已经被brokers收到了;
2)“retries”配置不起作用(因为客户端无法知道消息发送失败的情况);
3)每一个record的“offset”都将是“-1”
acks = 1
如果“acks”为“1”,那么leader将会把消息数据写入自己的本地日志,然后就向producer发送响应,告诉producer这条消息已经被Kafka cluster成功接收,而不会等待来自其他全部followers的响应。
在这种情况下:
1) record返回的“offset”是正常的数据
2) 如果在leader响应了producer之后、followers复制该消息数据之前的这段时间内leader崩溃了,那么该消息就丢失了。
acks = all
如果“acks”为“all”,那么leader将等待所有的in-sync replicas的确认之后,才会向producer回送消息已发送成功的响应。
在这种情况下:
1)record返回的“offset”是正常的数据
2)只要存在一个in-sync replica正常工作,那么这条消息就不会丢失。
通过KafkaProducer
实例,可以取得关于某个Topic的元数据。
运行结果
有些在过去的版本中存在配置,现在已经改变了。
旧配置 | 新配置 | 变动版本 |
---|---|---|
metadata.broker.list | bootstrap.servers | |
request.required.acks | acks |