[关闭]
@1234567890 2017-05-22T13:27:51.000000Z 字数 10457 阅读 1441

Kafka

消息队列


server.property

  1. # The id of the broker. This must be set to a unique integer for each broker.
  2. # 非常重要的一个属性,在Kafka集群中每一个brocker的id一定要不一样,否则启动时会报错
  3. broker.id=2
  4. # The port the socket server listens on
  5. port=9092
  6. # Hostname the broker will bind to. If not set, the server will bind to all interfaces
  7. #host.name=localhost
  8. # The number of threads handling network requests
  9. num.network.threads=2
  10. # The number of threads doing disk I/O
  11. # 故名思议,就是有多少个线程同时进行磁盘IO操作。
  12. # 这个值实际上并不是设置得越大性能越好。
  13. # 在我后续的“存储”专题会讲到,如果您提供给Kafka使用的文件系统物理层只有一个磁头在工作
  14. # 那么这个值就变得没有任何意义了
  15. num.io.threads=8
  16. # The send buffer (SO_SNDBUF) used by the socket server
  17. socket.send.buffer.bytes=1048576
  18. # The receive buffer (SO_RCVBUF) used by the socket server
  19. socket.receive.buffer.bytes=1048576
  20. # The maximum size of a request that the socket server will accept (protection against OOM)
  21. socket.request.max.bytes=104857600
  22. # A comma seperated list of directories under which to store log files
  23. # 很多开发人员在使用Kafka时,不重视这个属性。
  24. # 实际上Kafka的工作性能绝大部分就取决于您提供什么样的文件系统
  25. log.dirs=/tmp/kafka-logs
  26. # The default number of log partitions per topic. More partitions allow greater
  27. # parallelism for consumption, but this will also result in more files across the brokers.
  28. num.partitions=2
  29. # The number of messages to accept before forcing a flush of data to disk
  30. # 从Page Cache中将消息正式写入磁盘上的阀值:以待转储消息数量为依据
  31. #log.flush.interval.messages=10000
  32. # The maximum amount of time a message can sit in a log before we force a flush
  33. # 从Page Cache中将消息正式写入磁盘上的阀值:以转储间隔时间为依据
  34. #log.flush.interval.ms=1000
  35. # The minimum age of a log file to be eligible for deletion
  36. # log消息信息保存时长,默认为168个小时
  37. log.retention.hours=168
  38. # A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
  39. # segments don't drop below log.retention.bytes.
  40. # 默认为1GB,在此之前log文件不会执行删除策略
  41. # 实际环境中,由于磁盘空间根本不是问题,并且内存空间足够大。所以笔者会将这个值设置的较大,例如100GB。
  42. #log.retention.bytes=1073741824
  43. # The maximum size of a log segment file.
  44. # When this size is reached a new log segment will be created.
  45. # 默认为512MB,当达到这个大小,Kafka将为这个Partition创建一个新的分段文件
  46. log.segment.bytes=536870912
  47. # The interval at which log segments are checked to see if they can be deleted according
  48. # to the retention policies
  49. # 文件删除的保留策略,多久被检查一次(单位毫秒)
  50. # 实际生产环境中,6-12小时检查一次就够了
  51. log.retention.check.interval.ms=60000
  52. # By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
  53. # If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
  54. log.cleaner.enable=false
  55. ############################# Zookeeper #############################
  56. # Zookeeper connection string (see zookeeper docs for details).
  57. # root directory for all kafka znodes.
  58. # 到zookeeper的连接信息,如果有多个zookeeper服务节点,则使用“,”进行分割
  59. # 例如:127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002
  60. zookeeper.connect=192.168.61.140:2181
  61. # Timeout in ms for connecting to zookeeper
  62. # zookeeper连接超时时间
  63. zookeeper.connection.timeout.ms=1000000

Kafka常用命令

启动kafka

  1. //1.进入zookeeper目录,启动zk
  2. bin/zkServer.sh start
  3. //2.进入kafka目录,启动kafka
  4. bin/kafka-server-start.sh config/server.properties

创建topic

  1. kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

生产者

  1. kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费者

  1. kafka-console-consumer.sh --zookeeper localhost:2181 --topic test

查看topic状态

  1. kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2

Kafka结构

kafka.bmp-2062.7kB

kafka复制

同步和异步发送

消息生产这还可以决定是以同步方式向Broker发送消息还是以异步方式向Broker发送消息。只需要使用生产者配置中的“producer.type”属性进行指定。当该属性值为“sync”时,表示使用同步发送的方式;当该属性值为“async”时,表示使用异步发送方式。
在异步发送方式下,开发人员调用send方法发送消息时,这个消息并不会立即被发送到topic指定的Leader partition所在的Broker,而是会存储在本地的一个缓冲区域(一定注意是客户端本地)。当缓冲区的状态满足最长等待时间或者最大数据量条数时,消息会以一个设置值批量发送给Broker。

20160504152347893-17.9kB

强一致性复制和弱一致性复制

在Kafka的实现中,强一致性复制是指当Leader Partition收到消息后,将在所有Follower partition完成这条消息的复制后才认为消息处理成功,并向消息生产者返回ack信息;弱一致性复制是指当Leader partition收到消息后,只要Leader Broker自己完成了消息的存储就认为消息处理成立,并向消息生产者返回ack信息(复制过程随后由Broker节点自行完成);

  1. // 可以通过这个属性控制复制过程的一致性规则 //props.put("request.required.acks", "1");
  1. # 脚本命令范例
  2. kafka-topics.sh --describe --zookeeper 192.168.61.139:2181 --topic my_topic2
  3. # 显示的结果
  4. Topic:my_topic2 PartitionCount:4 ReplicationFactor:2 Configs:
  5. Topic: my_topic2 Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
  6. Topic: my_topic2 Partition: 1 Leader: 1 Replicas: 1,2 Isr: 1,2
  7. Topic: my_topic2 Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1
  8. Topic: my_topic2 Partition: 3 Leader: 1 Replicas: 1,2 Isr: 1,2

生产者代码

  1. package cn.wht.kafka;
  2. import java.util.Date;
  3. import java.util.Properties;
  4. import kafka.javaapi.producer.Producer;
  5. import kafka.producer.KeyedMessage;
  6. import kafka.producer.ProducerConfig;
  7. /**
  8. * Created by haoting.wang on 2016/11/29.
  9. */
  10. public class KafkaProducer {
  11. public static void main(String[] args) {
  12. Properties props = new Properties();
  13. // 指定kafka节点列表,不需要由zookeeper进行协调
  14. // 并且连接的目的也不是为了发送消息,而是为了在这些节点列表中选取一个,来获取topic的分区状况
  15. props.put("metadata.broker.list", "127.0.0.1:9092");
  16. // 使用这个属性可以指定“将消息送到topic的哪一个partition中”,如果业务规则比较复杂的话可以指定分区控制器
  17. // 不过开发者最好要清楚topic有多少个分区,这样才好进行多线程(负载均衡)发送
  18. //props.put("partitioner.class", "kafkaTQ.PartitionerController");
  19. // 可以通过这个参数控制是异步发送还是同步发送(默认为“同步”)
  20. //props.put("producer.type", "async");
  21. // 可以通过这个属性控制复制过程的一致性规则
  22. //props.put("request.required.acks", "1");
  23. ProducerConfig config = new ProducerConfig(props);
  24. // 创建消费者
  25. Producer<byte[], byte[]> producer = new Producer<byte[], byte[]>(config);
  26. // 由于我们为topic创建了四个partition,所以将数据分别发往这四个分区
  27. for (Integer partitionIndex = 0; ; partitionIndex++) {
  28. Date time = new Date();
  29. // 创建和发送消息,可以指定这条消息的key,producer根据这个key来决定这条消息发送到哪个parition中
  30. // 另外一个可以决定parition的方式是实现kafka.producer.Partitioner接口
  31. String messageContext_Value = new Date().toString()+"this message from producer 由producer指的partitionIndex:[" + partitionIndex % 4 + "]" + time.getTime();
  32. System.out.println(messageContext_Value);
  33. byte[] messageContext = messageContext_Value.getBytes();
  34. byte[] key = partitionIndex.toString().getBytes();
  35. // 这是消息对象,请注意第二个参数和第三个参数,如果第三个参数没有被赋值,则使用第二个参数作为分区依据。所以在使用KeyedMessage类的构造函数时,您只需要指定其中的一个就完全够了。详情看下面KeyedMessage类(scala语言)
  36. KeyedMessage<byte[], byte[]> message = new KeyedMessage<byte[], byte[]>("my_topic2", key , partitionIndex % 4 , messageContext);
  37. producer.send(message);
  38. // 休息0.5秒钟,循环发
  39. synchronized (KafkaProducer.class) {
  40. try {
  41. KafkaProducer.class.wait(500);
  42. } catch (InterruptedException e) {
  43. e.printStackTrace(System.out);
  44. }
  45. }
  46. }
  47. }
  48. }
  1. package kafka.producer
  2. /**
  3. * A topic, key, and value.
  4. * If a partition key is provided it will override the key for the purpose of partitioning but will not be stored.
  5. */
  6. case class KeyedMessage[K, V](val topic: String, val key: K, val partKey: Any, val message: V) {
  7. if(topic == null)
  8. throw new IllegalArgumentException("Topic cannot be null.")
  9. def this(topic: String, message: V) = this(topic, null.asInstanceOf[K], null, message)
  10. def this(topic: String, key: K, message: V) = this(topic, key, key, message)
  11. def partitionKey = {
  12. if(partKey != null)
  13. partKey
  14. else if(hasKey)
  15. key
  16. else
  17. null
  18. }
  19. def hasKey = key != null
  20. }

消费者代码

  1. package cn.wht.kafka;
  2. import java.util.HashMap;
  3. import java.util.List;
  4. import java.util.Map;
  5. import java.util.Properties;
  6. import kafka.consumer.Consumer;
  7. import kafka.consumer.ConsumerConfig;
  8. import kafka.consumer.ConsumerIterator;
  9. import kafka.consumer.KafkaStream;
  10. import kafka.javaapi.consumer.ConsumerConnector;
  11. import kafka.message.MessageAndMetadata;
  12. /**
  13. * 这是Kafka的topic消费者
  14. * @author yinwenjie
  15. */
  16. public class KafkaConsumer_GroupOne {
  17. public static void main(String[] args) throws RuntimeException {
  18. // ==============首先各种连接属性
  19. // Kafka消费者的完整连接属性在Apache Kafka官网http://kafka.apache.org/documentation.html#consumerconfigs
  20. // 有详细介绍(请参看Old Consumer Configs。New Consumer Configs是给Kafka V0.9.0.0+使用的)
  21. // 这里我们设置几个关键属性
  22. Properties props = new Properties();
  23. // zookeeper相关的,如果有多个zk节点,这里以“,”进行分割
  24. props.put("zookeeper.connect", "127.0.0.1:2181");
  25. props.put("zookeeper.connection.timeout.ms", "10000");
  26. // 还记得上文的说明吗:对于一个topic而言,同一用户组内的所有用户只被允许访问一个分区。
  27. // 所以要让多个Consumer实现对一个topic的负载均衡,每个groupid的名称都要一样
  28. String groupname = "group2";
  29. props.put("group.id", groupname);
  30. //==============
  31. ConsumerConfig consumerConfig = new ConsumerConfig(props);
  32. ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
  33. // 我们只创建一个消费者
  34. HashMap<String, Integer> map = new HashMap<String, Integer>();
  35. map.put("my_topic2", 1);
  36. Map<String, List<KafkaStream<byte[], byte[]>>> topicMessageStreams = consumerConnector.createMessageStreams(map);
  37. // 获取并启动消费线程,注意看关键就在这里,一个消费线程可以负责消费一个topic中的多个partition
  38. // 但是一个partition只能分配到一个消费线程去
  39. KafkaStream<byte[], byte[]> stream = topicMessageStreams.get("my_topic2").get(0);
  40. new Thread(new ConsumerThread(stream)).start();
  41. // 接着锁住主线程,让其不退出
  42. synchronized (KafkaConsumer_GroupOne.class) {
  43. try {
  44. KafkaConsumer_GroupOne.class.wait();
  45. } catch (InterruptedException e) {
  46. e.printStackTrace(System.out);
  47. }
  48. }
  49. }
  50. /**
  51. * @author yinwenjie
  52. */
  53. private static class ConsumerThread implements Runnable {
  54. private KafkaStream<byte[], byte[]> stream;
  55. /**
  56. * @param stream
  57. */
  58. public ConsumerThread(KafkaStream<byte[], byte[]> stream) {
  59. this.stream = stream;
  60. }
  61. public void run() {
  62. ConsumerIterator<byte[], byte[]> iterator = this.stream.iterator();
  63. //============这个消费者获取的数据在这里
  64. while(iterator.hasNext()){
  65. MessageAndMetadata<byte[], byte[]> message = iterator.next();
  66. int partition = message.partition();
  67. String topic = message.topic();
  68. String messageT = new String(message.message());
  69. System.out.println("接收到: " + messageT + "来自于topic:[" + topic + "] + 第partition[" + partition + "]");
  70. }
  71. }
  72. }
  73. }
添加新批注
在作者公开此批注前,只有你和作者可见。
回复批注