[关闭]
@xtccc 2016-01-07T10:39:51.000000Z 字数 4175 阅读 5452

Producer API

给我写信
GitHub

此处输入图片的描述

Kafka


参考链接:

本文适用于Kafka 0.8.2.0



目录


1. 基本要素


Kafka producer产生消息时,需要确定以下几点:




2. 产生不含key的消息


下面将产生不含key、且不指定partition number的消息,消息的类型为字符串。

例1
QQ20160104-11@2x.png-259.5kB


其中,“bootstrap.servers”并不需要全部的Kafka brokers,请看如下对该参数的解释

A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).




3. Key and Partitioner


对于一个Topic,它的parition数量是固定的,除非人为地去调整。Partition id从0开始计算。
下面将产生含key的消息,且根据Key计算partition number 。

QQ20160105-8@2x.png-196.7kB

运行结果为
QQ20160105-7@2x.png-169.7kB



4. send的Callback


4.1 Callback Interface

通过send方法向topic发送一个ProducerRecord时,可以为该消息指定一个callback。当发送的消息被响应后(这里的响应指的是Kafka brokers对收到该消息的确认响应,而不是指存在某个consumer消费了该消息),这个回调方法会被立即调用一次。

这个回调方法必须实现接口 org.apache.kafka.clients.producer.Callback

在下面的例子中,我们将不间断地产生10条消息发送,并为每一条消息设置一个callback。在每个callback中,会打印出与该消息相关的元数据,并阻塞3秒钟。

在运行这个Producer程序时,我们并没有运行针对“topic-B”的任何的consumer。
QQ20160105-11@2x.png-287.6kB

运行结果如下:
QQ20160105-2@2x.png-283.4kB

问题:这里的offset指的是什么?


4.2 Callback方法的执行


4.3 send callback中的异常处理

如果Callback中发生了异常,会怎么样?该怎样处理?

下面,我们在Callback方法中制造 “divide by zero” 异常:
QQ20160105-12@2x.png-195.9kB


运行结果:
QQ20160105-13@2x.png-185.8kB


可见:


那么问题来了

1. 为什么callback线程中抛出异常时,Producer所在的主线程没有打印出异常信息?

查看org.apache.kafka.common.utils.KafkaThread的源码,就可以知道,callback中抛出的异常都被打印到日志中去了,不会向上传递到主线程中。

QQ20160106-15@2x.png-121.3kB



2. 如果希望自定义对callback异常的处理,而不是采用默认的行为,该怎么办?

在定义callback时,添加处理异常的 try{···} catch{···} 代码块即可
QQ20160106-16@2x.png-250.1kB

运行结果为
QQ20160106-17@2x.png-139.7kB


4.4 Callback Thread 与 Producer Thread的关系

Callback thread与producer thread的关系是:



下面验证
QQ20160106-0@2x.png-312.6kB

运行结果
QQ20160106-1@2x.png-321.8kB




5. send的返回结果 - Future


5.1 Asyc send

除了依靠send的callback方法外,我们还可以通过send返回的结果Future[RecordMetadata]来异步地对被发送消息的响应结果进行处理。

QQ20160105-3@2x.png-217.7kB

运行结果
QQ20160105-4@2x.png-284.2kB


5.2 Sync send

send也可以以blocking的方式来调用,如下
QQ20160106-6@2x.png-174.7kB

运行结果
QQ20160106-7@2x.png-297.6kB





6. Acknowledgement


在传递给KafkaProducer的配置参数中,有一个参数acks,它可以取以下三种值:





7. Serializer





8. 获取关于Topic的信息


通过KafkaProducer实例,可以取得关于某个Topic的元数据。

QQ20160105-5@2x.png-141.3kB

运行结果
QQ20160105-6@2x.png-66.9kB




9. 配置项的变动


有些在过去的版本中存在配置,现在已经改变了。

旧配置 新配置 变动版本
metadata.broker.list bootstrap.servers
request.required.acks acks




添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注