[关闭]
@xtccc 2016-01-10T08:46:28.000000Z 字数 17831 阅读 9054

Consumer API

给我写信
GitHub

此处输入图片的描述


Kafka


参考:



目录


1. 关于 High Level Consumer


使用High Level Consumer API,我们可以将精力集中在取到数据上,而不用关心取回数据的offset。High Level Consumer在从某个给定partition中取回一条消息时,会将该消息的offet存储在ZooKeeper中,对该offset的存储是基于该consumer的Consumer Group Name。

当一个consumer以某个Consumer Group Name启动时(并指定一个topic),Kafka会将该consumer线程加入到可以消费该topic的线程队列中,并且会触发“re-balance”。在“re-balance”的过程中,Kafka会对partitions与threads的对应关系进行调整,也许会将一个partition对应到一个新的thread。




2. 设计High Level Consumer


在创建一个High Level Consumer时,它应该是一个多线程的模型,且多线程模型须围绕目标Topic的partitions数量来规划。对于给定的Topic而言:




3. High Level Consumer API 使用实例


  1. /**
  2. * Created by tao on 6/24/15.
  3. * 这是一个简单的High Level Consumer Client
  4. */
  5. public class ConsumerA implements Runnable {
  6. public String title;
  7. public KafkaStream<byte[], byte[]> stream;
  8. public ConsumerA(String title, KafkaStream<byte[], byte[]> stream) {
  9. this.title = title;
  10. this.stream = stream;
  11. }
  12. @Override
  13. public void run() {
  14. System.out.println("开始运行 " + title);
  15. ConsumerIterator<byte[], byte[]> it = stream.iterator();
  16. /**
  17. * 不停地从stream读取新到来的消息,在等待新的消息时,hasNext()会阻塞
  18. * 如果调用 `ConsumerConnector#shutdown`,那么`hasNext`会返回false
  19. * */
  20. while (it.hasNext()) {
  21. MessageAndMetadata<byte[], byte[]> data = it.next();
  22. String topic = data.topic();
  23. int partition = data.partition();
  24. long offset = data.offset();
  25. int key = bytes2Int(data.key()); /** key和value都是bytes */
  26. String msg = new String(data.message()); /** 需要转换 */
  27. System.out.println(String.format(
  28. "Consumer: [%s], Topic: [%s], PartitionId: [%d],
  29. Offset: [%d], Key: [%d], msg: [%s]" ,
  30. title, topic, partition, offset, key, msg));
  31. }
  32. System.err.println(String.format("Consumer: [%s] exiting ...", title));
  33. }
  34. /** 从bytes 转换到int */
  35. public int bytes2Int(byte[] bytes) {
  36. if (null == bytes || 4 != bytes.length)
  37. throw new IllegalArgumentException("invalid byte array");
  38. return java.nio.ByteBuffer.wrap(bytes).getInt();
  39. }
  40. }
  1. // main方法
  2. Properties props = new Properties();
  3. props.put("group.id", "gid-1");
  4. props.put("zookeeper.connect", "ecs2:2181,ecs3:2181,ecs4:2181/kafka");
  5. props.put("auto.offset.reset", "largest");
  6. props.put("auto.commit.interval.ms", "1000");
  7. props.put("partition.assignment.strategy", "roundrobin");
  8. ConsumerConfig config = new ConsumerConfig(props);
  9. String topic = "topic-C";
  10. /** 创建 `kafka.javaapi.consumer.ConsumerConnector` */
  11. ConsumerConnector consumerConn =
  12. kafka.consumer.Consumer.createJavaConsumerConnector(config);
  13. /** 设置 map of (topic -> #streams ) */
  14. Map<String, Integer> topicCountMap = new HashMap<>();
  15. topicCountMap.put(topic, 3);
  16. /** 创建 map of (topic -> list of streams) */
  17. Map<String, List<KafkaStream<byte[], byte[]>>> topicStreamsMap =
  18. consumerConn.createMessageStreams(topicCountMap);
  19. /** 取出 `topic-B` 对应的 streams */
  20. List<KafkaStream<byte[], byte[]>> streams = topicStreamsMap.get(topic);
  21. /** 创建一个容量为3的线程池 */
  22. ExecutorService executor = Executors.newFixedThreadPool(3);
  23. /** 创建3个consumer threads */
  24. for (int i = 0; i < streams.size(); i++)
  25. executor.execute(new ConsumerA("消费者" + (i + 1), streams.get(i)));
  26. /** 给3个consumer threads 60秒的时间读取消息,然后令它们停止读取消息
  27. * 要先断开ConsumerConnector,然后再销毁consumer client threads
  28. *
  29. * 如果不调用`consumerConn.shutdown()`,那么这3个消费者线程永远不会结束,
  30. * 因为只要ConsumerConnector还在的话,consumer会一直等待新消息,不会自己退出
  31. * 如果ConsumerConnector关闭了,那么consumer中的`hasNext`就会返回false
  32. * */
  33. Thread.sleep(60*1000);
  34. consumerConn.shutdown();
  35. /** 给3个consumer threads 5秒的时间,让它们将最后读取的消息的offset保存进ZK
  36. * */
  37. Thread.sleep(5*1000);
  38. executor.shutdown();

如果启动一个Producer,让它向“topic-C”发送16条消息,每条消息的key是整数,value是key的字符串形式,那么上面的Consumer代码的运行结果为:
QQ20160107-5@2x.png-289.4kB


在Consumer收到的消息中,key和value都是byte array,因此需要对这些字节重新解码。

我们也可以在创建message streams时为key和value提供decoder,这样让consumer直接得到解码后的数据。

  1. /** 在这里可以为key和value直接提供各自的 decoder,
  2. * 这样consumer thread 收到的解码后的数据
  3. * */
  4. Map<String, List<KafkaStream<Integer, String>>> topicStreamsMap =
  5. consumerConn.createMessageStreams(topicCountMap, new IntDecoder(), new StringDecoder());
  6. // 下面是两个解码器
  7. public static class IntDecoder implements Decoder<Integer> {
  8. @Override
  9. public Integer fromBytes(byte[] bytes) {
  10. if (null == bytes || bytes.length != 4)
  11. throw new IllegalArgumentException("Invalid bytes");
  12. return java.nio.ByteBuffer.wrap(bytes).getInt();
  13. }
  14. }
  15. public static class StringDecoder implements Decoder<String> {
  16. @Override
  17. public String fromBytes(byte[] bytes) {
  18. if (null == bytes )
  19. throw new IllegalArgumentException("Null bytes");
  20. return new String(bytes);
  21. }
  22. }


3.1 High Level Consumer配置



还有其他的很多配置,请见 Consumer Configs


3.2 获取给定Topic的KafkaStream

每个KafkaStream代表一个消息流,里面的消息来自一个或者多个partitions(但是1个partition的消息只会流向1个KafkaStream),每1个KafkaStream应该由1个线程处理。

对于每一个KafkaStream,它将一直等待接收新的消息,每收到一条消息,it.hasNext()将会返回true。


3.3 正确地结束Consumer

Kafka并不是每读取一条消息就将该消息的offset写入到ZK中,而是每隔一段时间(由“auto.commit.interval.ms”控制)才将最后读取的消息的offset写入到ZK中。因此,如果简单地让Consumer直接退出,那么已被读取的消息的offset可能不会被更新到ZK中,这会造成下一次启动Consumer时读取到已被读过的消息。

造成Consumer读取到已被读过的消息,还有两种可能的原因:

  1. 丢失了一个broker
  2. 某个partition的leader发生变化

因此,需要一种优雅的方法结束Consumer。

我们采用的方法是:首先调用ConsumerConnector.shutdown(),这样,所有的KafkaStreams都不能继续消费数据;然后等待几秒钟,待Kafka将这些streams消费的最后一条消息的offset写入到ZK中之后,再调用ExecutorService.shutdown()命令线程池销毁所有已经运行完毕的线程。


3.4 处理Consumer Thread中的异常

当Consumer thread中发生异常时,我们需要捕获该异常并进行处理。怎样捕获该异常取决与Consumer Thread是怎样被提交的。




4. 相关异常


4.1 ConsumerRebalanceFailedException

在启动一个Consumer时,报错:

kafka.common.ConsumerRebalanceFailedException: gid-1_ecs1.njzd.com-1452137308525-b23962d3 can't rebalance after 4 retries

这个原因往往是因为:在启动Consumer(不妨成为c1)时,Kafka试图进行“re-balance”,实际上已经有另外一个Consumer(不妨称其为c2)在运行了,,而且c1与c2有相同的consumer group name。

触发“re-balance”的一个原因为:···

这时可以查看ZK中的数据:

QQ20160107-1@2x.png-48kB

只要将c2关掉,即可正常运行c1.
关掉c2之后,可以看到ZK中的数据如下:

QQ20160107-2@2x.png-30.7kB




5. 关于Simple Consumer API


使用High Level Consumer API,我们可以不关心offset等细节,直接从topic中读取数据。但在有些场景中,我们要更精确地控制读取数据的细节:

  1. 对同一条消息读取多次
  2. 只读取部分partitions中的消息
  3. 确保对每条消息处理1次且仅1次


使用Simple Consumer API要比High Level Consumer API更加复杂,这表现在:

  1. 用户需要自己保存offset
  2. 用户必须知道给定(topic, partition)的leader是哪一个broker
  3. 用户需要自己处理leader broker的切换




6. 使用Simple Consumer API


使用Simple Consumer API的步骤如下:

  1. 向一个active broker发起查询,找出哪一个broker是目标(topic, partition)的leader和replica brokers
  2. 确定从哪个offset开始读取消息
  3. 取回数据
  4. 检测leader的变动,并做出相应的调整


6.1 为(topic, partition)找到leader broker与replica brokers

  1. public class SimpleConsumerUtil {
  2. /**
  3. * 为给定的(topic, partition)找到它的leader broker
  4. * seedBrokers并不一定要是全部的brokers,只要包含一个live broker,
  5. * 通过这个live broker去查询Leader broker的元数据即可
  6. * */
  7. public PartitionMetadata
  8. findLeader(List<String> seedBrokers, int port, String topic, int partition) {
  9. PartitionMetadata retMeta = null;
  10. /** 循环地查询 */
  11. loop:
  12. for (String broker : seedBrokers) { /** 按照每一个 broker 循环*/
  13. SimpleConsumer consumer = null;
  14. try {
  15. /** 连接到不同的broker时,需要对该broker创建新的`SimpleConsumer`实例 */
  16. consumer = new SimpleConsumer(broker, port, 100*1000, 64*1024,
  17. "leaderLookupClient");
  18. List<String> topics = Collections.singletonList(topic);
  19. TopicMetadataRequest req = new TopicMetadataRequest(topics);
  20. /** Fetch metadata for a sequence of topics, which
  21. * returns metadata for each topic in the request */
  22. TopicMetadataResponse resp = consumer.send(req);
  23. /** 在某个broker上按照每一个topic循环,并检查一个topic中的每一个partition */
  24. for (TopicMetadata topicMeta : resp.topicsMetadata()) {
  25. for (PartitionMetadata parMeta : topicMeta.partitionsMetadata()) {
  26. if (parMeta.partitionId() == partition) {
  27. retMeta = parMeta; /** 根据 partition id 进行匹配,找到了就退出 */
  28. break loop;
  29. }
  30. }
  31. }
  32. } catch (Exception ex) {
  33. System.err.println("Error: 在向Broker “" + broker +
  34. "” 询问 (Topic=“" + topic + "”, Partition=“" + partition
  35. + "”) 的Leader Broker 时发生异常,原因为: \n" + ex.toString());
  36. } finally {
  37. if (null != consumer)
  38. consumer.close(); /** 关闭连接到该broker的`SimpleConsumer`*/
  39. }
  40. } // end => for (String broker : seedBrokers)
  41. return retMeta;
  42. }
  43. }



下面我们测试这个方法是否能够找出(Topic = “topic-C”, Partition=0)的partition metadata。

  1. /**
  2. * 测试`SimpleConsumerUtil#findLeader`方法的使用
  3. * 找到(Topic=“topic-C”, Partition=0)当前的leader partition
  4. * */
  5. public static void TestFindLeader() {
  6. SimpleConsumerUtil consumer = new SimpleConsumerUtil();
  7. List<String> seedBrokers = new ArrayList<>();
  8. seedBrokers.add("ecs1.njzd.com");
  9. seedBrokers.add("ecs3.njzd.com");
  10. PartitionMetadata parMeta = consumer.findLeader(seedBrokers, 9092, "topic-C", 0);
  11. System.out.println("Partition metadata for (Topic=“topic-C”, Partition=“0”) is : \n"
  12. + parMeta + "\n");
  13. }



运行结果为:
QQ20160108-2@2x.png-74.8kB



我们可以查询(Topic=“topic-C”, Partition=“0”)的真实metadata是什么样的,看看上面的代码查询出来的结果对不对。
QQ20160108-1@2x.png-97.1kB


6.2 确定从哪个offset开始读取消息

使用Simple Consuer API,读取消息时要根据(Topic, Partition, Offset)这个坐标来获取消息,因此,我们需要确定给定(Topic, Partition)的offset,才能进行消息的读取。

在查询时,只能向目标(topic, partition)的leader partition(leader broker)进行查询,不能向replica partitions(follower brokers)进行查询。

  1. public class SimpleConsumerUtil {
  2. /**
  3. * 确定从(topic, partition)的什么地方开始读取消息,即确定offset
  4. * Kafka提供了2个常量:
  5. * `kafka.api.OffsetRequest.EarliestTime()`找到log data的开始时间
  6. * `kafka.api.OffsetRequest.LatestTime()`是最新的时间
  7. *
  8. * 最旧的offset并不一定是0,因为随着时间的推移,部分数据将被从log中移除
  9. *
  10. *
  11. * 这里传入的参数`consumer`的host必须是(topic, partition)的leader,
  12. * 否则会报错,错误代码为6
  13. *
  14. * 各种失败代码的含义可以查询这里:kafka.common.ErrorMapping
  15. * */
  16. public long
  17. getLastOffset(String leaderHost, int port, String clientId,
  18. String topic, int partition, long time) {
  19. /** 用leader host 创建一个SimpleConsumer */
  20. SimpleConsumer consumer =
  21. new SimpleConsumer(leaderHost, port, 100*1000, 64*1024, clientId);
  22. TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
  23. Map<TopicAndPartition, PartitionOffsetRequestInfo> reqInfo = new HashMap<>();
  24. reqInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, 1));
  25. OffsetRequest req =
  26. new OffsetRequest(reqInfo, kafka.api.OffsetRequest.CurrentVersion(), clientId);
  27. OffsetResponse resp = consumer.getOffsetsBefore(req);
  28. consumer.close();
  29. /** 处理失败 */
  30. if (resp.hasError()) {
  31. short errorCode = resp.errorCode(topic, partition);
  32. System.err.println("为 (Topic=“" + topic + "”, Partition=“" + partition +
  33. "”, Time=“" + time + "”) 查询 offset 失败,失败代码为:" + errorCode +
  34. ", 失败原因为: " + ErrorMapping.exceptionFor(errorCode));
  35. return -1;
  36. }
  37. /** 为什么这里返回的是数组?数组中会有几个数据?
  38. * 经过实际的测试,里面只有1个数据 */
  39. long[] offsets = resp.offsets(topic, partition);
  40. return offsets[0];
  41. }
  42. }



下面测试:查询(Topic=“topic-C”, Partition=“1”)的最老offset、最新offset和当前offset:

  1. /***
  2. * 测试`ConsumerB#getLastOffset`方法的使用
  3. * 找出(Topic=“topic-C”, Partition=1)的最老、最新、当前的offset
  4. * */
  5. public static void TestGetLastOffset() {
  6. List<String> brokers = new ArrayList<>();
  7. brokers.add("ecs1");
  8. brokers.add("ecs4");
  9. int port = 9092;
  10. String topic = "topic-C";
  11. int partition = 1;
  12. /**
  13. * 首先查询出(Topic=“topic-C”, Partition=“1”)的leader broker's hostname
  14. * */
  15. SimpleConsumerUtil util = new SimpleConsumerUtil();
  16. PartitionMetadata parMeta = util.findLeader(brokers, port, topic, partition);
  17. String leaderHost = parMeta.leader().host();
  18. /** 最老的offset(依然在log中的msg)*/
  19. long earliestOffset = util.getLastOffset(leaderHost, port, "Client - 查询offset",
  20. topic, partition, OffsetRequest.EarliestTime());
  21. /** 最新的offset */
  22. long latestOffset = util.getLastOffset(leaderHost, port, "Client - 查询offset",
  23. topic, partition, OffsetRequest.LatestTime());
  24. /** 当前的offset */
  25. long currentOffset = util.getLastOffset(leaderHost, port, "Client - 查询offset",
  26. topic, partition, System.currentTimeMillis());
  27. System.out.println("(Topic=“" + topic + "”, Partition=“" + partition +
  28. "”) 的leader host是" + leaderHost);
  29. System.out.println("(Topic=“" + topic + "”, Partition=“" + partition +
  30. "”) 中的offsets: " +
  31. "\n\tEarliest Offset: " + earliestOffset +
  32. "\n\tLatest Offset: " + latestOffset +
  33. "\n\tCurrent Offset: " + currentOffset);
  34. }



运行的结果为:
QQ20160108-3@2x.png-60.5kB


6.3 寻找新的leader broker

如果某个(topic, partition)的leader broker崩溃了,那么ZooKeeper会检测到这种情况的发生,并重新为它分配一个new leader。

我们在使用Simple Consumer API时,需要能够处理这种问题的发生,并查询出新的leader broker。

  1. public class SimpleConsumerUtil {
  2. /**
  3. * 当原来的leader崩溃后,找出新的leader
  4. * */
  5. public Broker
  6. findNewLeader(String oldLeader, List<String> replicaBrokers, int port,
  7. String topic, int partition) {
  8. /**
  9. * 最多尝试5次,如果还找不到则查询new leader失败
  10. * */
  11. for (int loop = 0; loop < 5; loop++) {
  12. boolean willSleep = false;
  13. PartitionMetadata parMeta = findLeader(replicaBrokers, port, topic, partition);
  14. if (null == parMeta || null == parMeta.leader())
  15. willSleep = true;
  16. /**
  17. * 如果leader broker 崩溃,ZooKeeper会探测到这个情况的发生,并重新分配一个new leader broker
  18. * 这个过程需要很短的时间
  19. * 如果在第一次循环中,发现 new leader broker 等于 old leader broker,
  20. * 则睡眠1秒钟给ZK进行调整,并重新尝试查询new leader
  21. * **/
  22. else if (oldLeader.equalsIgnoreCase(parMeta.leader().host()) && 0 == loop) {
  23. willSleep = true;
  24. } else {
  25. return parMeta.leader();
  26. }
  27. /**
  28. * 睡眠1秒钟,给Kafka和ZooKeeper进行调整(failover)等
  29. * */
  30. if (willSleep)
  31. try {
  32. Thread.sleep(1000);
  33. } catch (InterruptedException e) {
  34. e.printStackTrace();
  35. }
  36. }
  37. // 查询失败
  38. return null;
  39. }
  40. }



测试这个方法的使用:

  1. /**
  2. * 测试`ConsumerB#findNewLeader`方法的使用
  3. *
  4. * 测试方法:先找出一个leader broker,然后将该broker关掉,
  5. * 再使用`ConsumerB#findNewLeader`方法找出新的leader,
  6. * 检查能否正确地找出新leader
  7. *
  8. * */
  9. public static void TestFindNewLeader() {
  10. /**
  11. * 首先找到(Topic=“topic-C”, Partition=2)的leader broker与replica brokers
  12. * */
  13. SimpleConsumerUtil consumer = new SimpleConsumerUtil();
  14. List<String> seedBrokers = new ArrayList<>();
  15. seedBrokers.add("ecs1.njzd.com");
  16. seedBrokers.add("ecs3.njzd.com");
  17. int port = 9092;
  18. String topic = "topic-C";
  19. int partition = 2;
  20. PartitionMetadata parMeta = consumer.findLeader(seedBrokers, port, topic, partition);
  21. String leaderHost = parMeta.leader().host();
  22. List<Broker> replicas = parMeta.replicas();
  23. List<String> replicaHosts = new ArrayList<String>();
  24. for (Broker broker : replicas)
  25. replicaHosts.add(broker.host());
  26. // 打印出当前的leader broker
  27. System.out.println("leader broker is " + leaderHost +
  28. "\nreplica brokers are : ");
  29. for (Broker replica : replicas)
  30. System.out.println(replica.host() + " ");
  31. // 开始120秒倒计时,请杀掉当前的leader broker
  32. System.out.println("\n将在60秒后查找新的leader broker,在此期间请杀掉当前leader broker\n");
  33. for (int i = 1; i <= 60; i++) {
  34. System.out.print("\r");
  35. for (int j = 1; j <= i; j++)
  36. System.out.print("=");
  37. System.out.print("> " + i);
  38. try {
  39. Thread.sleep(1000);
  40. } catch (InterruptedException e) {
  41. e.printStackTrace();
  42. }
  43. }
  44. System.out.println("\n我已醒来,正在查找新的leader broker");
  45. /**
  46. * 下面开始调用 `findNewLeader` 找出新的leader broker
  47. * */
  48. Broker newLeader =
  49. consumer.findNewLeader(leaderHost, replicaHosts, port, topic, partition);
  50. System.out.println("新的leader broker 为 " + newLeader.host());
  51. }

运行结果:
QQ20160108-4@2x.png-116.6kB


6.4 读取数据

6.4.1 读取方法

读取数据时,要指定(topic, partition, offset, size)这4个参数。

  1. public class SimpleConsumerUtil {
  2. /**
  3. * 从Kafka中读取消息
  4. * 这个函数中,我们假定读取的消息的key是Int, value是String
  5. * 每收到一条消息,会将它的内容在控制台上显示出来
  6. * */
  7. public void
  8. readData(List<String> seedBrokers, int port, String clientId,
  9. String topic, int partition, long reqOffset, int fetchSize) {
  10. System.out.println("[Info]: 开始读取数据\n");
  11. /** 首先查询(topic, partition)的leader broker */
  12. PartitionMetadata parMeta = findLeader(seedBrokers, port, topic, partition);
  13. String leaderHost = parMeta.leader().host();
  14. List<Broker> replicaBrokers = parMeta.replicas();
  15. /** 为这个leader partition 创建一个SimpleConsumer */
  16. SimpleConsumer consumer =
  17. new SimpleConsumer(leaderHost, port, 100*1000, 64*1024, clientId);
  18. /**
  19. * 处理错误响应,最多重试5次
  20. * **/
  21. FetchResponse resp;
  22. int numErrors = 0;
  23. while (true) {
  24. /** 创建FetchRequest,并利用SimpleConsumer获取FetchResponse */
  25. resp = consumer.fetch(
  26. new FetchRequestBuilder()
  27. .clientId(clientId)
  28. .addFetch(topic, partition, reqOffset, fetchSize)
  29. .build());
  30. if (resp.hasError()) {
  31. numErrors++;
  32. short errorCode = resp.errorCode(topic, partition);
  33. String errorMsg = ErrorMapping.exceptionFor(errorCode).toString();
  34. System.err.println("[Error]: 请求获取消息时出现错误: " +
  35. "\n\t目标lead broker为 " + consumer.host() +
  36. ", 错误代码为 " + errorCode +
  37. ", 错误原因为 " + errorMsg);
  38. if (5 == numErrors) {
  39. System.err.println("错误次数达到5,不再重试,退出!");
  40. consumer.close();
  41. return;
  42. }
  43. /** 这种错误不需要重新寻找leader partition */
  44. if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
  45. reqOffset = getLastOffset(leaderHost, port, clientId,
  46. topic, partition, kafka.api.OffsetRequest.LatestTime());
  47. System.err.println(
  48. "[Error]: request offset 超出合法范围,自动调整为" + reqOffset);
  49. continue; // 用新的offset重试
  50. }
  51. /** 用新的leader broker来创建SimpleConsumer */
  52. else {
  53. List<String> replicaHosts = new ArrayList<String>();
  54. for (Broker broker : replicaBrokers)
  55. replicaHosts.add(broker.host());
  56. leaderHost = findNewLeader(leaderHost, replicaHosts,
  57. port, topic, partition).host();
  58. /**
  59. * 用新的leader来创建一个SimpleConsumer
  60. * */
  61. consumer.close();
  62. consumer =
  63. new SimpleConsumer(leaderHost, port, 100*1000, 64*1024, clientId);
  64. }
  65. } else {
  66. break; /** 没有发生错误则直接跳出循环 */
  67. }
  68. } // End => while (true)
  69. consumer.close();
  70. /**
  71. * 开始真正地读取消息
  72. */
  73. long numRead = 0;
  74. for (MessageAndOffset data : resp.messageSet(topic, partition)) {
  75. /** 要确保:实际读出的offset 不小于 要求读取的offset。
  76. * 因为如果Kafka对消息进行压缩,那么fetch request将会返回whole compressed block,
  77. * 即使我们要求的offset不是该 whole compressed block的起始offset。
  78. * 这可能会造成读取到之前已经读取过的消息。
  79. * */
  80. long currentOffset = data.offset();
  81. if (currentOffset < reqOffset) {
  82. System.err.println("[Error]: Current offset = " + currentOffset +
  83. ", Requested offset = " + reqOffset + ", Skip. ");
  84. continue;
  85. }
  86. /** `nextOffset` 向最后被读取的消息发起询问:“下一个offset的值是什么?” */
  87. long nextOffset = data.nextOffset();
  88. /** Message结构中包含: bytes, key, codec, payloadOffset, payloadSize */
  89. Message msg = data.message();
  90. ByteBuffer keyBuf = msg.key();
  91. byte[] key = new byte[keyBuf.limit()];
  92. keyBuf.get(key);
  93. ByteBuffer payload = msg.payload();
  94. byte[] value = new byte[payload.limit()];
  95. payload.get(value);
  96. System.out.printf("消息$%-2d , offset=%-2d , nextOffset=%-2d" +
  97. " , key=%-2d , value=%s \n",
  98. numRead+1, currentOffset, nextOffset, bytes2Int(key), bytes2Str(value));
  99. numRead++;
  100. }
  101. System.out.println("[INFO]: 读取完毕,共读取了 " + numRead + " 条消息");
  102. }
  103. }


6.4.2 测试读取方法

调用上面的方法进行测试:

  1. /**
  2. * 测试方法 : readData
  3. *
  4. * */
  5. public static void TestReadData(String topic, int partition,
  6. long reqOffset, int fetchSize) {
  7. List<String> brokers = new ArrayList<>();
  8. brokers.add("ecs1");
  9. brokers.add("ecs4");
  10. int port = 9092;
  11. SimpleConsumerUtil util = new SimpleConsumerUtil();
  12. util.readData(brokers, port, "ClientReadData",
  13. topic, partition, reqOffset, fetchSize);
  14. }

在测试前,我们创建了“topic-D”,它有3个partition。 随后,我们向“topic-D”写入了17条消息,每条消息的key是int,value是String,每条消息的key,value,partitionId如下(可以看出,key所在的partition = key%numPartitions):



下面从几个方面来对“topic-D”测试读取过程。

6.4.2.1 测试:读取各个partition中的全部消息

指定reqOffset=0, fetchSize=10000,变幻partition:

总结:

  1. 对一个consumer而言,它要处理一个topic中每个partition的offset

  2. 在一个partition中,1条消息占用的offset是1,不论消息的字节数有多少,每读取一条消息,offset就应该往后加1,即offset指的是一条消息在partitin中的偏移量,与这条消息的大小无关


6.4.2.2 测试:读取时指定各种fetchSize

指定 partition=0, reqOffset=0, 变幻fetchSize



这说明在我们的这个例子中,一条消息的((0, "msg-0")或者(3, "msg-3"))所占的大小为37个字节。

如果要读取key为{0, 3, 6, 9}的这4条消息,fetchSize应该为37 * 4 = 148。下面验证:



如果要再读取下一条消息(12, "msg-12"),则fetchSize应该为 148 + 37 + 1 = 186,因为key为12的消息,它的value比前面几条消息的value多了一个字节。下面验证。




添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注