[关闭]
@muyanfeixiang 2016-12-26T15:18:11.000000Z 字数 6989 阅读 2153

快速了解kafka

kafka 使用文档


什么是kafka?

简单来说kafka就是一个消息队列,同rabbitmq、zeromq等功能类似。更广义的来讲Kafka是一个分布式流平台。这意味着它能够:

  1. stream of records的订阅和发布,这一点类似于传统的消息队列。
  2. 可以以容错的方式存储records.
  3. 可以是充当流处理器。

因此可以使用两种广义的应用:

  1. 在系统或应用之间,充当可靠的流数据管道。
  2. 构建实时的流应用,来对数据进行处理。

一些概念

  1. Kafka集群可以单服务器或者多服务器
  2. Kafka以类别来存储stream of records,称为topic
  3. 每个topic有一个value,一个key和一个timestamp

核心API

我们目前主要使用前两个。

作为消息系统

Topic和Logs

一个topic可以认为就是一个队列。一个topic可以有0个,1个或多个订阅者。对于一个topic,kafka通过如下分区log来维护(partitioned log)
image_1b40qtkmkc9h138uamqb3u1p2r9.png-37.2kB
每一个partition都是有序的、不可变动的record序列,每个新的record只能append到partition(可以认为partition是structured commit log)。partition中每个record都分配的有一个offset(偏移量),标识在partition中是唯一的。

kafka保存所有的record,不管有没有被消费。可以通过配置滞留期(retention period)来设置消息过期,到期后record会被删除。kafka的性能与数据大小是常量相关的,所以存储大量数据是没关系的。

image_1b40r8bjnfe5133dv2215nttqjm.png-34.4kB

事实上,consumer只需要保存offset数据。offset是由consumer控制的,每次消费数据后可以递增offset,但是consumer也可以以任意顺序和从任意位置来访问partition。这就使得consumer非常轻量级,可以来回访问partition而不影响其他consumer。

将partition放在log中一方面可以在单个服务器上方便调整大小,每个partition要符合宿主服务器大小。但是一个topic可以有多个partition,因此可以方大量数据。另一方面可以并行。

PS. topic在创建的时候会根据kafka server中 num.partitons来创建若干的partition。在之后如果改变了partition的数量,之前的topic的partition数量不会自动改变。

Distribution

partition是分布到kafka集群中不同的服务器上的,每个partition会根据replication配置,会有若干备份来进行容错。partition之间也是分leader和follower的,只有leader是可读可写的。在leader挂了之后,follower会选出新的leader。一般leader和follower是在不同的机器上。

Producer

Producer将消息发布到topic。Producer负责将哪个消息发布到topic上的哪个partition。可以使用round-robbin来做的负载均衡,也可以使用其他方法。在producer的config中可以配置。

Consumer

Consumer可以使用Consumer_Group来进行分组。topic中的每一个消息只会送达给每个分组中的一个Consumer。但是每个消息会送达各个分组。Consumer可是在单独的进程或者机器。

如果所有的Consumer都在同一个分组,那么就可以实现负载均衡;
如果每个Consumer都在不同的分组,那么就可以实现广播的效果。

image_1b40shn40io21v3cmbo11o119ro13.png-53.7kB

如上,两个kafka server,4个partition,P0-P3,A组有两个consumer,B组有四个consumer。
一般情况,一个topic会有几个订阅组,用来进行不同的逻辑处理。每个组有若干的consumer来进行扩展和容错。
kafka中通过把partition来均分给consumer组的每个consumer来进行消费的。没个consumer实例在每一时刻都会独占partitions的一部分份额。kafka协议动态的调整partition在consumer中间的分配。新增和删除组内的consumer实例,kafka都会重新分配partition。

对于同一个topic,kafka提供partition内消息顺序而不是在所有partition之间。大部分情况,partition内的排序和partition中消息的key是能满足需求的。如果需要整个topic的全序,可以只使用一个partition。

保证

  1. 可以保证一个producer发布到一个partition的消息有有序的,先发的消息offset比后发的offset更小。
  2. consumer看到存储的消息是有序的。
  3. 对一个有N个replica的topic,最多能允许N-1个失效。

Kafka作为消息系统优势

传统的消息队列有queque和publish-subscribe模式。前者是多个consumer从一个server来读,每个record只会到一个consumer;后者消息是发送给所有的订阅者。各有优缺点。queque的优点是可以分发消息做到负载均衡,但是它不是多个订阅者的,一个消息一旦被消费就小时了。publish-subcribe模式允许多个订阅者,但是无法做到负载分配。

kafka通过组的概念,可以既可以做到订阅也可以负载分配。

传统消息队列在服务端是能保证消息的有序,但是一旦消息分发给多个consumer,因为消息分发过程是异步的,不能保证客户端的消息有序。这就意味着,消息系统需要通过使用“独占”consumer来保证消息的有序,当然这就无法做到并行处理。

kafka通过使用partition来提供顺序保证和负载均衡。此时群组内的消费者实例不能多于partition的数量。

样例代码

使用的是https://github.com/ah-/rdkafka-dotnet这个c# client

simple producer

  1. string brokerList = args[0]; //可以多个broker
  2. string topicName = args[1];
  3. using (Producer producer = new Producer(brokerList))
  4. using (Topic topic = producer.Topic(topicName))
  5. {
  6. Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
  7. string text;
  8. while ((text = Console.ReadLine()) != "q")
  9. {
  10. byte[] data = Encoding.UTF8.GetBytes(text);
  11. Task<DeliveryReport> deliveryReport = topic.Produce(data);
  12. var unused = deliveryReport.ContinueWith(task =>
  13. {
  14. Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
  15. }); //publish confirm callback
  16. }
  17. }

simple consumer

  1. string brokerList = args[0];
  2. var topics = args.Skip(1).ToList();
  3. var config = new Config() { GroupId = "simple-csharp-consumer" };
  4. using (var consumer = new EventConsumer(config, brokerList))
  5. {
  6. consumer.OnMessage += (obj, msg) =>
  7. {
  8. string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
  9. Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
  10. };
  11. consumer.Assign(new List<TopicPartitionOffset>
  12. {
  13. new TopicPartitionOffset(topics.First(), 1, 0),
  14. new TopicPartitionOffset(topics.First(), 2, 0),
  15. new TopicPartitionOffset(topics.First(), 0, 0)
  16. });//这里是重置topic在partition 0的offset到5的位置。可以指定多个partition的偏移量
  17. consumer.Subscribe(topics);
  18. consumer.Start();
  19. Console.WriteLine("Started consumer, press enter to stop consuming");
  20. Console.ReadLine();

AdvancedProducer

  1. //string brokerList = args[0];
  2. //string topicName = args[1];
  3. var brokerList = "127.0.0.1:9092";
  4. var topics = new string[] { "test1", "test2" }.ToList();
  5. var topicConfig = new TopicConfig
  6. {
  7. CustomPartitioner = (top, key, cnt) =>
  8. {
  9. var kt = (key != null) ? Encoding.UTF8.GetString(key, 0, key.Length) : "(null)";
  10. int partition = (key?.Length ?? 0) % cnt;
  11. bool available = top.PartitionAvailable(partition);
  12. Console.WriteLine($"Partitioner topic: {top.Name} key: {kt} partition count: {cnt} -> {partition} {available}");
  13. return partition;
  14. }
  15. };//配置根据record的key来讲消息分布到不同的partition
  16. using (Producer producer = new Producer(brokerList))
  17. using (Topic topic = producer.Topic("havePartition", topicConfig))
  18. {
  19. Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
  20. string text;
  21. while ((text = Console.ReadLine()) != "q")
  22. {
  23. byte[] data = Encoding.UTF8.GetBytes(text);
  24. byte[] key = null;
  25. // Use the first word as the key
  26. int index = text.IndexOf(" ");
  27. if (index != -1)
  28. {
  29. key = Encoding.UTF8.GetBytes(text.Substring(0, index));
  30. }
  31. Task<DeliveryReport> deliveryReport = topic.Produce(data, key);
  32. var unused = deliveryReport.ContinueWith(task =>
  33. {
  34. Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
  35. });
  36. }
  37. }

AdvancedConsumer

  1. public static void Run(string brokerList, List<string> topics)
  2. {
  3. bool enableAutoCommit = false;
  4. var config = new Config()
  5. {
  6. GroupId = "advanced-csharp-consumer",
  7. EnableAutoCommit = enableAutoCommit,
  8. StatisticsInterval = TimeSpan.FromSeconds(60) //Statistics emit interval for OnStatistics.
  9. };
  10. using (var consumer = new EventConsumer(config, brokerList))
  11. {
  12. consumer.OnMessage += (obj, msg) =>
  13. {
  14. string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
  15. Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
  16. if (!enableAutoCommit && msg.Offset % 10 == 0)
  17. {
  18. Console.WriteLine($"Committing offset");
  19. consumer.Commit(msg).Wait();
  20. Console.WriteLine($"Committed offset");
  21. }
  22. };
  23. consumer.OnConsumerError += (obj, errorCode) =>
  24. {
  25. Console.WriteLine($"Consumer Error: {errorCode}");
  26. };
  27. consumer.OnEndReached += (obj, end) =>
  28. {
  29. Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
  30. };
  31. consumer.OnError += (obj, error) =>
  32. {
  33. Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
  34. };
  35. if (enableAutoCommit)
  36. {
  37. consumer.OnOffsetCommit += (obj, commit) =>
  38. {
  39. if (commit.Error != ErrorCode.NO_ERROR)
  40. {
  41. Console.WriteLine($"Failed to commit offsets: {commit.Error}");
  42. }
  43. Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
  44. };
  45. }
  46. consumer.OnPartitionsAssigned += (obj, partitions) =>
  47. {
  48. Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
  49. consumer.Assign(partitions);
  50. };
  51. consumer.OnPartitionsRevoked += (obj, partitions) =>
  52. {
  53. Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
  54. consumer.Unassign();
  55. };
  56. consumer.OnStatistics += (obj, json) =>
  57. {
  58. Console.WriteLine($"Statistics: {json}");
  59. };
  60. consumer.Subscribe(topics);
  61. consumer.Start();
  62. Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
  63. Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
  64. Console.WriteLine($"Started consumer, press enter to stop consuming");
  65. Console.ReadLine();
  66. }
  67. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注