@sasaki
2016-02-18T17:27:11.000000Z
字数 8987
阅读 7899
BigData
@Title Kafka——分布式发布订阅消息系统
@Version v1.0
@Timestamp 2016-01-14 18:37
@Author Nicholas
@Mail redskirt@outlook.com
消息:是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。
消息队列:消息队列是在消息的传输过程中保存消息的容器,消息传递中间人。
Kafka是由Linkedin公司开发,使用Scala语言编写的一种分布式、分区的、多副本的、多订阅者的日志系统(分布式MQ系统),可以用亍web/nginx日志,搜索日志,监控日志,访问日志等等。
Kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并能够支撑较大的数据量,且具备良好的容错能力。
因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。
在Kafka中,发送消息者称为Producer,而消息接收者称为Consumer。
Producer和Consumer都依赖亍Zookeeper来保证系统可用性,为集群保存一些meta信息。
Kafka集群有多个实例组成,每个节点称为broker。对消息保存时根据Topic进行归类(是一个逻辑概念,相当于一个分布式队列),为了加快读取速度,多个comsumer可以划分为一个组,并行消费一个topic。
一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可。。由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定。
Producers
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等。
Consumers
本质上kafka只支持Topic。每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡。
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者。
在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个“订阅”者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。
kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。
下载Kafka,当前最高版本2.11
[root@master usr]# wget http://apache.fayea.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
开启服务
# 启动Kafka自带的Zookeeper
[root@master kafka_2.10-0.8.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
# 启动Kafka服务
[root@master kafka_2.10-0.8.2.0]# bin/kafka-server-start.sh config/server.properties &
创建topic
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
查看topic列表
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
[2016-01-16 16:04:47,352] INFO Accepted socket connection from /127.0.0.1:49148 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-16 16:04:47,356] INFO Client attempting to establish new session at /127.0.0.1:49148 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-16 16:04:47,358] INFO Established session 0x152496f34a3000c with negotiated timeout 30000 for client /127.0.0.1:49148 (org.apache.zookeeper.server.ZooKeeperServer)
my-replicated-topic1
my-replicatedtopic1
my-replicatedtopic2
testTopic
[2016-01-16 16:04:47,422] INFO Processed session termination for sessionid: 0x152496f34a3000c (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-16 16:04:47,424] INFO Closed socket connection for client /127.0.0.1:49148 which had sessionid 0x152496f34a3000c (org.apache.zookeeper.server.NIOServerCnxn)
模拟生产者(producer)
bin/kafka-console-producer.sh --broker-list localhost:9092 -- topic testTopic
This is a message
模拟消费者(consumer)
# 另开一个终端,正常情况下,会输出:producer 的输入信息
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning
在slave01配置中更改zookeeper指向master节点。
[root@master kafka_2.10-0.8.2.0]# vim config/server.properties
...
zookeeper.connect=master:2181
创建新的topic
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic1
查看topic信息
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic1
[2016-01-16 16:01:51,896] INFO Accepted socket connection from /127.0.0.1:48967 (org.apache.zookeeper.server.NIOServerCnxnFactory)
[2016-01-16 16:01:51,901] INFO Client attempting to establish new session at /127.0.0.1:48967 (org.apache.zookeeper.server.ZooKeeperServer)
[2016-01-16 16:01:51,903] INFO Established session 0x152496f34a3000b with negotiated timeout 30000 for client /127.0.0.1:48967 (org.apache.zookeeper.server.ZooKeeperServer)
Topic:my-replicated-topic1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: my-replicated-topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
Topic: my-replicated-topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
Topic: my-replicated-topic1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
[2016-01-16 16:01:52,384] INFO Processed session termination for sessionid: 0x152496f34a3000b (org.apache.zookeeper.server.PrepRequestProcessor)
[2016-01-16 16:01:52,388] INFO Closed socket connection for client /127.0.0.1:48967 which had sessionid 0x152496f34a3000b (org.apache.zookeeper.server.NIOServerCnxn) bin/kafka-topics.sh --describe --zookeeper master:2181 --topic my-replicated-topic1
模拟生产者(producer)
[root@master kafka_2.10-0.8.2.0]# bin/kafka-console-producer.sh --broker-list master:9092 --topic my-replicated-topic1
[2016-01-16 16:13:49,399] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
hello
[2016-01-16 16:13:53,595] INFO Closing socket connection to /192.168.53.230. (kafka.network.Processor)
模拟消费者(consumer),接收消息
[root@slave01 kafka_2.10-0.8.2.0]# bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic my-replicated-topic1
[2016-01-16 16:15:01,518] INFO Closing socket connection to /192.168.53.231. (kafka.network.Processor)
hello
server.properties 中设置delete.topic.enable=true
则可以使用删除topic;
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-replicatedtopic1
Topic my-replicatedtopic1 is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
# 需要设置 delete.topic.enable=true 才可进行删除操作
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-replicatedtopic1
Topic my-replicatedtopic1 is already marked for deletion.
[root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
my-replicated-topic1
my-replicatedtopic1 - marked for deletion
my-replicatedtopic2
testTopic
查看consumer组内消费的offset
每个consumer的创建都会自动创建一个group,在zookeeper中
[root@slave01 kafka_2.10-0.8.2.0]# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic my-topic
每个replica一个目录
[root@master kafka_2.10-0.8.2.0]# cd /tmp/kafka-logs/
[root@master kafka-logs]# ls
__consumer_offsets-0 __consumer_offsets-20 __consumer_offsets-32 __consumer_offsets-44 my-replicatedtopic1-0
__consumer_offsets-1 __consumer_offsets-21 __consumer_offsets-33 __consumer_offsets-45 my-replicated-topic1-1
[root@master kafka-logs]# cd my-replicated-topic1-0/
[root@master my-replicated-topic1-0]# ls
00000000000000000000.index 00000000000000000000.log
Replication
多replica写
在Zookeeper中查看Kafka角色
[root@master my-replicated-topic1-0]# /usr/bin/zookeeper-client
[zk: localhost:2181(CONNECTED) 1] ls /
[consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
# 查看Controller
[zk: localhost:2181(CONNECTED) 2] get /controller
{"version":1,"brokerid":0,"timestamp":"1453122955298"}
cZxid = 0x171
ctime = Mon Jan 18 21:15:55 CST 2016
mZxid = 0x171
mtime = Mon Jan 18 21:15:55 CST 2016
pZxid = 0x171
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x152496f34a30016
dataLength = 54
numChildren = 0
# 查看删除的Topic
[zk: localhost:2181(CONNECTED) 3] ls /admin/delete_topics
[my-replicatedtopic1]
# 查看当前Brokers
[zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
[0]
Kafka的工具都以脚本形式存在
[root@master my-replicated-topic1-0]# cd /usr/kafka_2.10-0.8.2.0/bin/
[root@master bin]# ls
kafka-console-consumer.sh kafka-mirror-maker.sh kafka-replay-log-producer.sh kafka-server-stop.sh zookeeper-server-start.sh
kafka-console-producer.sh kafka-preferred-replica-election.sh kafka-replica-verification.sh kafka-simple-consumer-shell.sh zookeeper-server-stop.sh
kafka-consumer-offset-checker.sh kafka-producer-perf-test.sh kafka-run-class.sh kafka-topics.sh zookeeper-shell.sh
kafka-consumer-perf-test.sh kafka-reassign-partitions.sh kafka-server-start.sh windows