@muyanfeixiang
2016-12-26T15:18:11.000000Z
字数 6989
阅读 2153
kafka
使用文档
简单来说kafka就是一个消息队列,同rabbitmq、zeromq等功能类似。更广义的来讲Kafka是一个分布式流平台。这意味着它能够:
因此可以使用两种广义的应用:
我们目前主要使用前两个。
一个topic可以认为就是一个队列。一个topic可以有0个,1个或多个订阅者。对于一个topic,kafka通过如下分区log来维护(partitioned log)
每一个partition都是有序的、不可变动的record序列,每个新的record只能append到partition(可以认为partition是structured commit log)。partition中每个record都分配的有一个offset(偏移量),标识在partition中是唯一的。
kafka保存所有的record,不管有没有被消费。可以通过配置滞留期(retention period)来设置消息过期,到期后record会被删除。kafka的性能与数据大小是常量相关的,所以存储大量数据是没关系的。
事实上,consumer只需要保存offset数据。offset是由consumer控制的,每次消费数据后可以递增offset,但是consumer也可以以任意顺序和从任意位置来访问partition。这就使得consumer非常轻量级,可以来回访问partition而不影响其他consumer。
将partition放在log中一方面可以在单个服务器上方便调整大小,每个partition要符合宿主服务器大小。但是一个topic可以有多个partition,因此可以方大量数据。另一方面可以并行。
PS. topic在创建的时候会根据kafka server中 num.partitons来创建若干的partition。在之后如果改变了partition的数量,之前的topic的partition数量不会自动改变。
partition是分布到kafka集群中不同的服务器上的,每个partition会根据replication配置,会有若干备份来进行容错。partition之间也是分leader和follower的,只有leader是可读可写的。在leader挂了之后,follower会选出新的leader。一般leader和follower是在不同的机器上。
Producer将消息发布到topic。Producer负责将哪个消息发布到topic上的哪个partition。可以使用round-robbin来做的负载均衡,也可以使用其他方法。在producer的config中可以配置。
Consumer可以使用Consumer_Group来进行分组。topic中的每一个消息只会送达给每个分组中的一个Consumer。但是每个消息会送达各个分组。Consumer可是在单独的进程或者机器。
如果所有的Consumer都在同一个分组,那么就可以实现负载均衡;
如果每个Consumer都在不同的分组,那么就可以实现广播的效果。
如上,两个kafka server,4个partition,P0-P3,A组有两个consumer,B组有四个consumer。
一般情况,一个topic会有几个订阅组,用来进行不同的逻辑处理。每个组有若干的consumer来进行扩展和容错。
kafka中通过把partition来均分给consumer组的每个consumer来进行消费的。没个consumer实例在每一时刻都会独占partitions的一部分份额。kafka协议动态的调整partition在consumer中间的分配。新增和删除组内的consumer实例,kafka都会重新分配partition。
对于同一个topic,kafka提供partition内消息顺序而不是在所有partition之间。大部分情况,partition内的排序和partition中消息的key是能满足需求的。如果需要整个topic的全序,可以只使用一个partition。
传统的消息队列有queque和publish-subscribe模式。前者是多个consumer从一个server来读,每个record只会到一个consumer;后者消息是发送给所有的订阅者。各有优缺点。queque的优点是可以分发消息做到负载均衡,但是它不是多个订阅者的,一个消息一旦被消费就小时了。publish-subcribe模式允许多个订阅者,但是无法做到负载分配。
kafka通过组的概念,可以既可以做到订阅也可以负载分配。
传统消息队列在服务端是能保证消息的有序,但是一旦消息分发给多个consumer,因为消息分发过程是异步的,不能保证客户端的消息有序。这就意味着,消息系统需要通过使用“独占”consumer来保证消息的有序,当然这就无法做到并行处理。
kafka通过使用partition来提供顺序保证和负载均衡。此时群组内的消费者实例不能多于partition的数量。
使用的是https://github.com/ah-/rdkafka-dotnet这个c# client
string brokerList = args[0]; //可以多个broker
string topicName = args[1];
using (Producer producer = new Producer(brokerList))
using (Topic topic = producer.Topic(topicName))
{
Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
string text;
while ((text = Console.ReadLine()) != "q")
{
byte[] data = Encoding.UTF8.GetBytes(text);
Task<DeliveryReport> deliveryReport = topic.Produce(data);
var unused = deliveryReport.ContinueWith(task =>
{
Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
}); //publish confirm callback
}
}
string brokerList = args[0];
var topics = args.Skip(1).ToList();
var config = new Config() { GroupId = "simple-csharp-consumer" };
using (var consumer = new EventConsumer(config, brokerList))
{
consumer.OnMessage += (obj, msg) =>
{
string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
};
consumer.Assign(new List<TopicPartitionOffset>
{
new TopicPartitionOffset(topics.First(), 1, 0),
new TopicPartitionOffset(topics.First(), 2, 0),
new TopicPartitionOffset(topics.First(), 0, 0)
});//这里是重置topic在partition 0的offset到5的位置。可以指定多个partition的偏移量
consumer.Subscribe(topics);
consumer.Start();
Console.WriteLine("Started consumer, press enter to stop consuming");
Console.ReadLine();
//string brokerList = args[0];
//string topicName = args[1];
var brokerList = "127.0.0.1:9092";
var topics = new string[] { "test1", "test2" }.ToList();
var topicConfig = new TopicConfig
{
CustomPartitioner = (top, key, cnt) =>
{
var kt = (key != null) ? Encoding.UTF8.GetString(key, 0, key.Length) : "(null)";
int partition = (key?.Length ?? 0) % cnt;
bool available = top.PartitionAvailable(partition);
Console.WriteLine($"Partitioner topic: {top.Name} key: {kt} partition count: {cnt} -> {partition} {available}");
return partition;
}
};//配置根据record的key来讲消息分布到不同的partition
using (Producer producer = new Producer(brokerList))
using (Topic topic = producer.Topic("havePartition", topicConfig))
{
Console.WriteLine($"{producer.Name} producing on {topic.Name}. q to exit.");
string text;
while ((text = Console.ReadLine()) != "q")
{
byte[] data = Encoding.UTF8.GetBytes(text);
byte[] key = null;
// Use the first word as the key
int index = text.IndexOf(" ");
if (index != -1)
{
key = Encoding.UTF8.GetBytes(text.Substring(0, index));
}
Task<DeliveryReport> deliveryReport = topic.Produce(data, key);
var unused = deliveryReport.ContinueWith(task =>
{
Console.WriteLine($"Partition: {task.Result.Partition}, Offset: {task.Result.Offset}");
});
}
}
public static void Run(string brokerList, List<string> topics)
{
bool enableAutoCommit = false;
var config = new Config()
{
GroupId = "advanced-csharp-consumer",
EnableAutoCommit = enableAutoCommit,
StatisticsInterval = TimeSpan.FromSeconds(60) //Statistics emit interval for OnStatistics.
};
using (var consumer = new EventConsumer(config, brokerList))
{
consumer.OnMessage += (obj, msg) =>
{
string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");
if (!enableAutoCommit && msg.Offset % 10 == 0)
{
Console.WriteLine($"Committing offset");
consumer.Commit(msg).Wait();
Console.WriteLine($"Committed offset");
}
};
consumer.OnConsumerError += (obj, errorCode) =>
{
Console.WriteLine($"Consumer Error: {errorCode}");
};
consumer.OnEndReached += (obj, end) =>
{
Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
};
consumer.OnError += (obj, error) =>
{
Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
};
if (enableAutoCommit)
{
consumer.OnOffsetCommit += (obj, commit) =>
{
if (commit.Error != ErrorCode.NO_ERROR)
{
Console.WriteLine($"Failed to commit offsets: {commit.Error}");
}
Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
};
}
consumer.OnPartitionsAssigned += (obj, partitions) =>
{
Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
consumer.Assign(partitions);
};
consumer.OnPartitionsRevoked += (obj, partitions) =>
{
Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
consumer.Unassign();
};
consumer.OnStatistics += (obj, json) =>
{
Console.WriteLine($"Statistics: {json}");
};
consumer.Subscribe(topics);
consumer.Start();
Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");
Console.WriteLine($"Started consumer, press enter to stop consuming");
Console.ReadLine();
}
}