@muyanfeixiang
2016-12-26T07:18:11.000000Z
字数 6989
阅读 2512
kafka 使用文档
简单来说kafka就是一个消息队列,同rabbitmq、zeromq等功能类似。更广义的来讲Kafka是一个分布式流平台。这意味着它能够:
因此可以使用两种广义的应用:
我们目前主要使用前两个。
一个topic可以认为就是一个队列。一个topic可以有0个,1个或多个订阅者。对于一个topic,kafka通过如下分区log来维护(partitioned log)
每一个partition都是有序的、不可变动的record序列,每个新的record只能append到partition(可以认为partition是structured commit log)。partition中每个record都分配的有一个offset(偏移量),标识在partition中是唯一的。
kafka保存所有的record,不管有没有被消费。可以通过配置滞留期(retention period)来设置消息过期,到期后record会被删除。kafka的性能与数据大小是常量相关的,所以存储大量数据是没关系的。

事实上,consumer只需要保存offset数据。offset是由consumer控制的,每次消费数据后可以递增offset,但是consumer也可以以任意顺序和从任意位置来访问partition。这就使得consumer非常轻量级,可以来回访问partition而不影响其他consumer。
将partition放在log中一方面可以在单个服务器上方便调整大小,每个partition要符合宿主服务器大小。但是一个topic可以有多个partition,因此可以方大量数据。另一方面可以并行。
PS. topic在创建的时候会根据kafka server中 num.partitons来创建若干的partition。在之后如果改变了partition的数量,之前的topic的partition数量不会自动改变。
partition是分布到kafka集群中不同的服务器上的,每个partition会根据replication配置,会有若干备份来进行容错。partition之间也是分leader和follower的,只有leader是可读可写的。在leader挂了之后,follower会选出新的leader。一般leader和follower是在不同的机器上。
Producer将消息发布到topic。Producer负责将哪个消息发布到topic上的哪个partition。可以使用round-robbin来做的负载均衡,也可以使用其他方法。在producer的config中可以配置。
Consumer可以使用Consumer_Group来进行分组。topic中的每一个消息只会送达给每个分组中的一个Consumer。但是每个消息会送达各个分组。Consumer可是在单独的进程或者机器。
如果所有的Consumer都在同一个分组,那么就可以实现负载均衡;
如果每个Consumer都在不同的分组,那么就可以实现广播的效果。

如上,两个kafka server,4个partition,P0-P3,A组有两个consumer,B组有四个consumer。
一般情况,一个topic会有几个订阅组,用来进行不同的逻辑处理。每个组有若干的consumer来进行扩展和容错。
kafka中通过把partition来均分给consumer组的每个consumer来进行消费的。没个consumer实例在每一时刻都会独占partitions的一部分份额。kafka协议动态的调整partition在consumer中间的分配。新增和删除组内的consumer实例,kafka都会重新分配partition。
对于同一个topic,kafka提供partition内消息顺序而不是在所有partition之间。大部分情况,partition内的排序和partition中消息的key是能满足需求的。如果需要整个topic的全序,可以只使用一个partition。
传统的消息队列有queque和publish-subscribe模式。前者是多个consumer从一个server来读,每个record只会到一个consumer;后者消息是发送给所有的订阅者。各有优缺点。queque的优点是可以分发消息做到负载均衡,但是它不是多个订阅者的,一个消息一旦被消费就小时了。publish-subcribe模式允许多个订阅者,但是无法做到负载分配。
kafka通过组的概念,可以既可以做到订阅也可以负载分配。
传统消息队列在服务端是能保证消息的有序,但是一旦消息分发给多个consumer,因为消息分发过程是异步的,不能保证客户端的消息有序。这就意味着,消息系统需要通过使用“独占”consumer来保证消息的有序,当然这就无法做到并行处理。
kafka通过使用partition来提供顺序保证和负载均衡。此时群组内的消费者实例不能多于partition的数量。
使用的是https://github.com/ah-/rdkafka-dotnet这个c# client
string brokerList = args[0]; //可以多个brokerstring topicName = args[1];using (Producer producer = new Producer(brokerList))using (Topic topic = producer.Topic(topicName)){Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");string text;while ((text = Console.ReadLine()) != "q"){byte[] data = Encoding.UTF8.GetBytes(text);Task<DeliveryReport> deliveryReport = topic.Produce(data);var unused = deliveryReport.ContinueWith(task =>{Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");}); //publish confirm callback}}
string brokerList = args[0];var topics = args.Skip(1).ToList();var config = new Config() { GroupId = "simple-csharp-consumer" };using (var consumer = new EventConsumer(config, brokerList)){consumer.OnMessage += (obj, msg) =>{string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");};consumer.Assign(new List<TopicPartitionOffset>{new TopicPartitionOffset(topics.First(), 1, 0),new TopicPartitionOffset(topics.First(), 2, 0),new TopicPartitionOffset(topics.First(), 0, 0)});//这里是重置topic在partition 0的offset到5的位置。可以指定多个partition的偏移量consumer.Subscribe(topics);consumer.Start();Console.WriteLine("Started consumer, press enter to stop consuming");Console.ReadLine();
//string brokerList = args[0];//string topicName = args[1];var brokerList = "127.0.0.1:9092";var topics = new string[] { "test1", "test2" }.ToList();var topicConfig = new TopicConfig{CustomPartitioner = (top, key, cnt) =>{var kt = (key != null) ? Encoding.UTF8.GetString(key, 0, key.Length) : "(null)";int partition = (key?.Length ?? 0) % cnt;bool available = top.PartitionAvailable(partition);Console.WriteLine($"Partitioner topic: {top.Name} key: {kt} partition count: {cnt} -> {partition} {available}");return partition;}};//配置根据record的key来讲消息分布到不同的partitionusing (Producer producer = new Producer(brokerList))using (Topic topic = producer.Topic("havePartition", topicConfig)){Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");string text;while ((text = Console.ReadLine()) != "q"){byte[] data = Encoding.UTF8.GetBytes(text);byte[] key = null;// Use the first word as the keyint index = text.IndexOf(" ");if (index != -1){key = Encoding.UTF8.GetBytes(text.Substring(0, index));}Task<DeliveryReport> deliveryReport = topic.Produce(data, key);var unused = deliveryReport.ContinueWith(task =>{Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");});}}
public static void Run(string brokerList, List<string> topics){bool enableAutoCommit = false;var config = new Config(){GroupId = "advanced-csharp-consumer",EnableAutoCommit = enableAutoCommit,StatisticsInterval = TimeSpan.FromSeconds(60) //Statistics emit interval for OnStatistics.};using (var consumer = new EventConsumer(config, brokerList)){consumer.OnMessage += (obj, msg) =>{string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");if (!enableAutoCommit && msg.Offset % 10 == 0){Console.WriteLine($"Committing offset");consumer.Commit(msg).Wait();Console.WriteLine($"Committed offset");}};consumer.OnConsumerError += (obj, errorCode) =>{Console.WriteLine($"Consumer Error: {errorCode}");};consumer.OnEndReached += (obj, end) =>{Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");};consumer.OnError += (obj, error) =>{Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");};if (enableAutoCommit){consumer.OnOffsetCommit += (obj, commit) =>{if (commit.Error != ErrorCode.NO_ERROR){Console.WriteLine($"Failed to commit offsets: {commit.Error}");}Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");};}consumer.OnPartitionsAssigned += (obj, partitions) =>{Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");consumer.Assign(partitions);};consumer.OnPartitionsRevoked += (obj, partitions) =>{Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");consumer.Unassign();};consumer.OnStatistics += (obj, json) =>{Console.WriteLine($"Statistics: {json}");};consumer.Subscribe(topics);consumer.Start();Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");Console.WriteLine($"Started consumer, press enter to stop consuming");Console.ReadLine();}}