@1234567890
2017-05-22T13:27:51.000000Z
字数 10457
阅读 1441
消息队列
# The id of the broker. This must be set to a unique integer for each broker.
# 非常重要的一个属性,在Kafka集群中每一个brocker的id一定要不一样,否则启动时会报错
broker.id=2
# The port the socket server listens on
port=9092
# Hostname the broker will bind to. If not set, the server will bind to all interfaces
#host.name=localhost
# The number of threads handling network requests
num.network.threads=2
# The number of threads doing disk I/O
# 故名思议,就是有多少个线程同时进行磁盘IO操作。
# 这个值实际上并不是设置得越大性能越好。
# 在我后续的“存储”专题会讲到,如果您提供给Kafka使用的文件系统物理层只有一个磁头在工作
# 那么这个值就变得没有任何意义了
num.io.threads=8
# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=1048576
# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=1048576
# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600
# A comma seperated list of directories under which to store log files
# 很多开发人员在使用Kafka时,不重视这个属性。
# 实际上Kafka的工作性能绝大部分就取决于您提供什么样的文件系统
log.dirs=/tmp/kafka-logs
# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across the brokers.
num.partitions=2
# The number of messages to accept before forcing a flush of data to disk
# 从Page Cache中将消息正式写入磁盘上的阀值:以待转储消息数量为依据
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
# 从Page Cache中将消息正式写入磁盘上的阀值:以转储间隔时间为依据
#log.flush.interval.ms=1000
# The minimum age of a log file to be eligible for deletion
# log消息信息保存时长,默认为168个小时
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
# 默认为1GB,在此之前log文件不会执行删除策略
# 实际环境中,由于磁盘空间根本不是问题,并且内存空间足够大。所以笔者会将这个值设置的较大,例如100GB。
#log.retention.bytes=1073741824
# The maximum size of a log segment file.
# When this size is reached a new log segment will be created.
# 默认为512MB,当达到这个大小,Kafka将为这个Partition创建一个新的分段文件
log.segment.bytes=536870912
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
# 文件删除的保留策略,多久被检查一次(单位毫秒)
# 实际生产环境中,6-12小时检查一次就够了
log.retention.check.interval.ms=60000
# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false
############################# Zookeeper #############################
# Zookeeper connection string (see zookeeper docs for details).
# root directory for all kafka znodes.
# 到zookeeper的连接信息,如果有多个zookeeper服务节点,则使用“,”进行分割
# 例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
zookeeper.connect=192.168.61.140:2181
# Timeout in ms for connecting to zookeeper
# zookeeper连接超时时间
zookeeper.connection.timeout.ms=1000000
启动kafka
//1.进入zookeeper目录,启动zk
bin/zkServer.sh start
//2.进入kafka目录,启动kafka
bin/kafka-server-start.sh config/server.properties
创建topic
kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test
消费者
kafka-console-consumer.sh --zookeeper localhost:2181 --topic test
查看topic状态
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2
整个Kafka集群中,可以有多个消息生产者。这些消息生产者可能在同一个物理节点上,也可能在不同的物理节点。它们都必须知道哪些Kafka Broker List是将要发送的目标:消息生产者会决定发送的消息将会送入Topic的哪一个分区(Partition)。
消费者都是按照“组”的单位进行消息隔离:在同一个Topic下,消息生产者发送一条消息后,同一个Topic下不同组的消费者都会收到这条信息。
同一组下的消息消费者可以消费Topic下一个分区或者多个分区中的消息,但是一个分区中的消息只能被同一组下的某一个消息消费者所处理。
由于存在以上的操作规则,所以Kafka集群中Consumer(消费者)需要和Kafka集群中的Server Broker进行协调工作:这个协调工作者交给了Zookeeper集群。zookeeper集群需要记录/协调的工作包括:当前整个Kafka集群中有哪些Broker节点以及每一个节点处于什么状态(活动/离线/状态)、当前集群中所有已创建的Topic以及分区情况、当前集群中所有活动的消费者组/消费者、每一个消费者组针对每个topic的索引位置等。
如果当前消费者连接时,发现整个Kafka集群中存在一个消费者(记为消费者A)关联Topic下多个分区的情况,且消费者A处于繁忙无法处理这些分区下新的消息(即消费者A的上一批Pull的消息还没有处理完成)。这时新的消费者将接替原消费者A所关联的一个(或者多个)分区,并且一直保持和这个分区的关联。
由于Kafka集群中只保证同一个分区(Partition)下消息队列中消息的顺序。所以当一个或者多个消费者分别Pull一个Topic下的多个消息分区时,您在消费者端观察的现象可能就是消息顺序是混乱的。这里我们一直在说消费者端的Pull行为,是指的Topic下分区中的消息并不是由Broker主动推送到(Push)到消费者端,而是由消费者端主动拉取(Pull)。
Kafka将分区的多个副本分为两种角色:Leader和Follower,Leader Broker是主要服务节点,消息只会从消息生产者发送给Leader Broker,消息消费者也只会从Leader Broker中Pull消息。Follower Broker为副本服务节点,正常情况下不会公布给生产者或者消费者直接进行操作。Follower Broker服务节点将会主动从Leader Broker上Pull消息。
在这种工作机制下,Follower和Leader的消息复制过程由于Follower服务节点的性能、压力、网络等原因,它们和Leader服务节点会有一个消息差异性。当这个差异性扩大到一定的范围,Leader节点就会认为这个Follower节点再也跟不上自己的节奏,导致的结果就是Leader节点会将这个Follower节点移出“待同步副本集”ISR(in-sync replicas),不再关注这个Follower节点的同步问题。
只有当ISR中所有分区副本全部完成了某一条消息的同步过程,这条消息才算真正完成了“记录”操作。只有这样的消息才会发送给消息消费者。至于这个真正完成“记录”操作的通知是否能返回给消息生产者,完全取决于消息生产者采用的acks模式。----->强一致性复制和弱一致性复制
消息生产这还可以决定是以同步方式向Broker发送消息还是以异步方式向Broker发送消息。只需要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。
在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会立即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(一定注意是客户端本地)。当缓冲区的状态满足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。
在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在所有Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker自己完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);
// 可以通过这个属性控制复制过程的一致性规则 //props.put("request.required.acks", "1");
当acks设置为0时,生产者端不会等待Server Broker回执任何的ACK确认信息。只是将要发送的消息交给网络层。这种情况下,消息是否真的到达了Server Broker,实际上生产者端并不知道。由于生产者端并不等待Server Broker回执任何的ACK确认信息,那么消息一旦传输失败(例如,等待超时的情况)“重试”过程就无从谈起了。由于生产者端在这种情况下发送的消息,很可能Server Broker还没来得及处理,甚至更有可能Server Broker都没有接收到,所以Server Broker也无法告知生产者这条消息在分区中的偏移位置。
当acks设置为1时,生产者发送消息将等待这个分区的Leader Server Broker 完成它本地的消息记录操作,但不会等待这个分区下其它Follower Server Brokers的操作。在这种情况下,虽然Leader Server Broker对消息的处理成功了,也返回了ACK信息给生产者端,但是在进行副本复制时,还是可能失败。
当acks设置为“all”时,消息生产者发送消息时将会等待目标分区的Leader Server Broker以及所有的Follower Server Brokers全部处理完,才会得到ACK确认信息。这样的处理逻辑下牺牲了一部分性能,但是消息存储可靠性是最高的。
# 脚本命令范例
kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2
# 显示的结果
Topic:my_topic2 PartitionCount:4 ReplicationFactor:2 Configs:
Topic: my_topic2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my_topic2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: my_topic2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: my_topic2 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2
package cn.wht.kafka;
import java.util.Date;
import java.util.Properties;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
* Created by haoting.wang on 2016/11/29.
*/
public class KafkaProducer {
public static void main(String[] args) {
Properties props = new Properties();
// 指定kafka节点列表,不需要由zookeeper进行协调
// 并且连接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区状况
props.put("metadata.broker.list", "127.0.0.1:9092");
// 使用这个属性可以指定“将消息送到topic的哪一个partition中”,如果业务规则比较复杂的话可以指定分区控制器
// 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送
//props.put("partitioner.class", "kafkaTQ.PartitionerController");
// 可以通过这个参数控制是异步发送还是同步发送(默认为“同步”)
//props.put("producer.type", "async");
// 可以通过这个属性控制复制过程的一致性规则
//props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
// 创建消费者
Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
// 由于我们为topic创建了四个partition,所以将数据分别发往这四个分区
for (Integer partitionIndex = 0; ; partitionIndex++) {
Date time = new Date();
// 创建和发送消息,可以指定这条消息的key,producer根据这个key来决定这条消息发送到哪个parition中
// 另外一个可以决定parition的方式是实现kafka.producer.Partitioner接口
String messageContext_Value = new Date().toString()+"this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
System.out.println(messageContext_Value);
byte[] messageContext = messageContext_Value.getBytes();
byte[] key = partitionIndex.toString().getBytes();
// 这是消息对象,请注意第二个参数和第三个参数,如果第三个参数没有被赋值,则使用第二个参数作为分区依据。所以在使用KeyedMessage类的构造函数时,您只需要指定其中的一个就完全够了。详情看下面KeyedMessage类(scala语言)
KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 , messageContext);
producer.send(message);
// 休息0.5秒钟,循环发
synchronized (KafkaProducer.class) {
try {
KafkaProducer.class.wait(500);
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
}
}
package kafka.producer
/**
* A topic, key, and value.
* If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
*/
case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
if(topic == null)
throw new IllegalArgumentException("Topic cannot be null.")
def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
def this(topic: String, key: K, message: V) = this(topic, key, key, message)
def partitionKey = {
if(partKey != null)
partKey
else if(hasKey)
key
else
null
}
def hasKey = key != null
}
package cn.wht.kafka;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
* 这是Kafka的topic消费者
* @author yinwenjie
*/
public class KafkaConsumer_GroupOne {
public static void main(String[] args) throws RuntimeException {
// ==============首先各种连接属性
// Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs
// 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)
// 这里我们设置几个关键属性
Properties props = new Properties();
// zookeeper相关的,如果有多个zk节点,这里以“,”进行分割
props.put("zookeeper.connect", "127.0.0.1:2181");
props.put("zookeeper.connection.timeout.ms", "10000");
// 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。
// 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样
String groupname = "group2";
props.put("group.id", groupname);
//==============
ConsumerConfig consumerConfig = new ConsumerConfig(props);
ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
// 我们只创建一个消费者
HashMap<String, Integer> map = new HashMap<String, Integer>();
map.put("my_topic2", 1);
Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
// 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition
// 但是一个partition只能分配到一个消费线程去
KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
new Thread(new ConsumerThread(stream)).start();
// 接着锁住主线程,让其不退出
synchronized (KafkaConsumer_GroupOne.class) {
try {
KafkaConsumer_GroupOne.class.wait();
} catch (InterruptedException e) {
e.printStackTrace(System.out);
}
}
}
/**
* @author yinwenjie
*/
private static class ConsumerThread implements Runnable {
private KafkaStream<byte[], byte[]> stream;
/**
* @param stream
*/
public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
this.stream = stream;
}
public void run() {
ConsumerIterator<byte[], byte[]> iterator = this.stream.iterator();
//============这个消费者获取的数据在这里
while(iterator.hasNext()){
MessageAndMetadata<byte[], byte[]> message = iterator.next();
int partition = message.partition();
String topic = message.topic();
String messageT = new String(message.message());
System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]");
}
}
}
}