[关闭]
@sasaki 2016-02-18T17:27:11.000000Z 字数 8987 阅读 7899

Kafka——分布式发布订阅消息系统

BigData


版本控制

  1. @Title Kafka——分布式发布订阅消息系统
  2. @Version v1.0
  3. @Timestamp 2016-01-14 18:37
  4. @Author Nicholas
  5. @Mail redskirt@outlook.com

一、Kafka概述

  1. 消息:是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串;也可以更复杂,可能包含嵌入对象。

  2. 消息队列:消息队列是在消息的传输过程中保存消息的容器,消息传递中间人。

    • 提供路由并保证消息的传递;
    • 如果发送消息时接收者丌可用,消息队列会保留消息,直到可以成功地传递它;
    • 分布式应用中,消息队列是互相交换信息的一种技术。可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走;
    • 通过消息队列,应用程序可独立地执行——松耦合。
  3. Kafka是由Linkedin公司开发,使用Scala语言编写的一种分布式、分区的、多副本的、多订阅者的日志系统(分布式MQ系统),可以用亍web/nginx日志,搜索日志,监控日志,访问日志等等。
    Kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并能够支撑较大的数据量,且具备良好的容错能力。

二、Kafka架构

Kafka生产者、消费者和代理环境

Kafka的整体架构

因为Kafka内在就是分布式的,一个Kafka集群通常包括多个代理。为了均衡负载,将话题分成多个分区,每个代理存储一或多个分区。多个生产者和消费者能够同时生产和获取消息。

二、Kafka原理

QQ截图20160114185313.jpg-214.7kB

在Kafka中,发送消息者称为Producer,而消息接收者称为Consumer。
Producer和Consumer都依赖亍Zookeeper来保证系统可用性,为集群保存一些meta信息。

QQ截图20160114192839.jpg-319.4kB

Kafka集群有多个实例组成,每个节点称为broker。对消息保存时根据Topic进行归类(是一个逻辑概念,相当于一个分布式队列),为了加快读取速度,多个comsumer可以划分为一个组,并行消费一个topic。

一个Topic的多个partitions,被分布在kafka集群中的多个server上;每个server(kafka实例)负责partitions中消息的读写操作;此外kafka还可以配置partitions需要备份的个数(replicas),每个partition将会被备份到多台机器上,以提高可用性。
基于replicated方案,那么就意味着需要对多个备份进行调度;每个partition都有一个server为"leader";leader负责所有的读写操作,如果leader失效,那么将会有其他follower来接管(成为新的leader);follower只是单调的和leader跟进,同步消息即可。。由此可见作为leader的server承载了全部的请求压力,因此从集群的整体考虑,有多少个partitions就意味着有多少个"leader",kafka会将"leader"均衡的分散在每个实例上,来确保整体的性能稳定。

Producers
Producer将消息发布到指定的Topic中,同时Producer也能决定将此消息归属于哪个partition;比如基于"round-robin"方式或者通过其他的一些算法等。

Consumers
本质上kafka只支持Topic。每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer。发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费。
如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡。
如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者。

在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个“订阅”者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息。kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的。事实上,从Topic角度来说,消息仍不是有序的。

kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息。

三、构建Kafka单机测试环境

QQ截图20160114192731.jpg-507kB

  1. 下载Kafka,当前最高版本2.11

    1. [root@master usr]# wget http://apache.fayea.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
  2. 开启服务

    1. # 启动Kafka自带的Zookeeper
    2. [root@master kafka_2.10-0.8.2.0]# bin/zookeeper-server-start.sh config/zookeeper.properties &
    3. # 启动Kafka服务
    4. [root@master kafka_2.10-0.8.2.0]# bin/kafka-server-start.sh config/server.properties &
  3. 创建topic

    1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testTopic
  4. 查看topic列表

    1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
    2. [2016-01-16 16:04:47,352] INFO Accepted socket connection from /127.0.0.1:49148 (org.apache.zookeeper.server.NIOServerCnxnFactory)
    3. [2016-01-16 16:04:47,356] INFO Client attempting to establish new session at /127.0.0.1:49148 (org.apache.zookeeper.server.ZooKeeperServer)
    4. [2016-01-16 16:04:47,358] INFO Established session 0x152496f34a3000c with negotiated timeout 30000 for client /127.0.0.1:49148 (org.apache.zookeeper.server.ZooKeeperServer)
    5. my-replicated-topic1
    6. my-replicatedtopic1
    7. my-replicatedtopic2
    8. testTopic
    9. [2016-01-16 16:04:47,422] INFO Processed session termination for sessionid: 0x152496f34a3000c (org.apache.zookeeper.server.PrepRequestProcessor)
    10. [2016-01-16 16:04:47,424] INFO Closed socket connection for client /127.0.0.1:49148 which had sessionid 0x152496f34a3000c (org.apache.zookeeper.server.NIOServerCnxn)
  5. 模拟生产者(producer)

    1. bin/kafka-console-producer.sh --broker-list localhost:9092 -- topic testTopic
    2. This is a message
  6. 模拟消费者(consumer)

    1. # 另开一个终端,正常情况下,会输出:producer 的输入信息
    2. bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic testTopic --from-beginning

四、构建Kafka集群环境

在slave01配置中更改zookeeper指向master节点。

  1. [root@master kafka_2.10-0.8.2.0]# vim config/server.properties
  2. ...
  3. zookeeper.connect=master:2181

创建新的topic

  1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 2 --partitions 3 --topic my-replicated-topic1

查看topic信息

  1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic1
  2. [2016-01-16 16:01:51,896] INFO Accepted socket connection from /127.0.0.1:48967 (org.apache.zookeeper.server.NIOServerCnxnFactory)
  3. [2016-01-16 16:01:51,901] INFO Client attempting to establish new session at /127.0.0.1:48967 (org.apache.zookeeper.server.ZooKeeperServer)
  4. [2016-01-16 16:01:51,903] INFO Established session 0x152496f34a3000b with negotiated timeout 30000 for client /127.0.0.1:48967 (org.apache.zookeeper.server.ZooKeeperServer)
  5. Topic:my-replicated-topic1 PartitionCount:3 ReplicationFactor:2 Configs:
  6. Topic: my-replicated-topic1 Partition: 0 Leader: 0 Replicas: 0,1 Isr: 0,1
  7. Topic: my-replicated-topic1 Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0
  8. Topic: my-replicated-topic1 Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
  9. [2016-01-16 16:01:52,384] INFO Processed session termination for sessionid: 0x152496f34a3000b (org.apache.zookeeper.server.PrepRequestProcessor)
  10. [2016-01-16 16:01:52,388] INFO Closed socket connection for client /127.0.0.1:48967 which had sessionid 0x152496f34a3000b (org.apache.zookeeper.server.NIOServerCnxn) bin/kafka-topics.sh --describe --zookeeper master:2181 --topic my-replicated-topic1

模拟生产者(producer)

  1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-console-producer.sh --broker-list master:9092 --topic my-replicated-topic1
  2. [2016-01-16 16:13:49,399] WARN Property topic is not valid (kafka.utils.VerifiableProperties)
  3. hello
  4. [2016-01-16 16:13:53,595] INFO Closing socket connection to /192.168.53.230. (kafka.network.Processor)

模拟消费者(consumer),接收消息

  1. [root@slave01 kafka_2.10-0.8.2.0]# bin/kafka-console-consumer.sh --zookeeper master:2181 --from-beginning --topic my-replicated-topic1
  2. [2016-01-16 16:15:01,518] INFO Closing socket connection to /192.168.53.231. (kafka.network.Processor)
  3. hello

QQ截图20160116161828.jpg-1503.9kB

五、Kafka基本操作

  1. server.properties 中设置delete.topic.enable=true则可以使用删除topic;

    1. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-replicatedtopic1
    2. Topic my-replicatedtopic1 is marked for deletion.
    3. Note: This will have no impact if delete.topic.enable is not set to true.
    4. # 需要设置 delete.topic.enable=true 才可进行删除操作
    5. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic my-replicatedtopic1
    6. Topic my-replicatedtopic1 is already marked for deletion.
    7. [root@master kafka_2.10-0.8.2.0]# bin/kafka-topics.sh --list --zookeeper localhost:2181
    8. my-replicated-topic1
    9. my-replicatedtopic1 - marked for deletion
    10. my-replicatedtopic2
    11. testTopic
  2. 查看consumer组内消费的offset
    每个consumer的创建都会自动创建一个group,在zookeeper中

    1. [root@slave01 kafka_2.10-0.8.2.0]# bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker --zookeeper localhost:2181 --group test --topic my-topic

四、Kafka存储

  1. 每个replica一个目录

    1. [root@master kafka_2.10-0.8.2.0]# cd /tmp/kafka-logs/
    2. [root@master kafka-logs]# ls
    3. __consumer_offsets-0 __consumer_offsets-20 __consumer_offsets-32 __consumer_offsets-44 my-replicatedtopic1-0
    4. __consumer_offsets-1 __consumer_offsets-21 __consumer_offsets-33 __consumer_offsets-45 my-replicated-topic1-1
    1. 二级结构
    1. [root@master kafka-logs]# cd my-replicated-topic1-0/
    2. [root@master my-replicated-topic1-0]# ls
    3. 00000000000000000000.index 00000000000000000000.log

    图片1.png-259kB

五、Kafka HA

  1. Replication

    • 0.8以前没有Replication,一旦某台brocker宕机,其上partition数据便丢失。
    • 引入Replication
    • 多Replication引入之后,需要Leader Election
    • Leader Election
      |- 多个Replica之间选个老大
      |- Leader负责数据读写,follower只向Leader顺序获取数据
    • 分布
      |- 同一个partition的不同Replica分布到不同机器
      |- 第i个Partition分配到第(i mod n)个Broker上
      |- 第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
    • 消息扩散
      |- Producer通过zookeeper找到Leader并向其发送消息
      |- Leader将数据写入本地log
      |- Follower从Leader获取数据之后(还未写本地log),就像Leader发送ACK
      |- Leader收到所有Follower ACK之后,恢复发送端写入成功

    • |- Consumer从Leader读
  2. 多replica写
    图片2.png-95kB

在Zookeeper中查看Kafka角色

  1. [root@master my-replicated-topic1-0]# /usr/bin/zookeeper-client
  2. [zk: localhost:2181(CONNECTED) 1] ls /
  3. [consumers, config, controller, isr_change_notification, admin, brokers, zookeeper, controller_epoch]
  4. # 查看Controller
  5. [zk: localhost:2181(CONNECTED) 2] get /controller
  6. {"version":1,"brokerid":0,"timestamp":"1453122955298"}
  7. cZxid = 0x171
  8. ctime = Mon Jan 18 21:15:55 CST 2016
  9. mZxid = 0x171
  10. mtime = Mon Jan 18 21:15:55 CST 2016
  11. pZxid = 0x171
  12. cversion = 0
  13. dataVersion = 0
  14. aclVersion = 0
  15. ephemeralOwner = 0x152496f34a30016
  16. dataLength = 54
  17. numChildren = 0
  18. # 查看删除的Topic
  19. [zk: localhost:2181(CONNECTED) 3] ls /admin/delete_topics
  20. [my-replicatedtopic1]
  21. # 查看当前Brokers
  22. [zk: localhost:2181(CONNECTED) 4] ls /brokers/ids
  23. [0]

六、Kafka工具

Kafka的工具都以脚本形式存在

  1. [root@master my-replicated-topic1-0]# cd /usr/kafka_2.10-0.8.2.0/bin/
  2. [root@master bin]# ls
  3. kafka-console-consumer.sh kafka-mirror-maker.sh kafka-replay-log-producer.sh kafka-server-stop.sh zookeeper-server-start.sh
  4. kafka-console-producer.sh kafka-preferred-replica-election.sh kafka-replica-verification.sh kafka-simple-consumer-shell.sh zookeeper-server-stop.sh
  5. kafka-consumer-offset-checker.sh kafka-producer-perf-test.sh kafka-run-class.sh kafka-topics.sh zookeeper-shell.sh
  6. kafka-consumer-perf-test.sh kafka-reassign-partitions.sh kafka-server-start.sh windows
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注