@zhangyy
2021-06-01T10:22:21.000000Z
字数 6338
阅读 213
kafka系列
- 一:资源规划
- 二:安装jdk1.8
- 三:安装zookeeper
- 四:安装kafka
- 五:安装kafka Eagle
组件 | node01.vpc.flyfish.cn | node02.vpc.flyfish.cn | node03.vpc.flyfish.cn |
---|---|---|---|
os | centos7.8 | centos7.8 | centos7.8 |
JDK | jdk1.8 | jdk1.8 | jdk1.8 |
zk | QuorumPeerMain | QuorumPeerMain | QuorumPeerMain |
kafka | kafka | kafka | kafka |
所有节点全部安装
rpm -ivh jdk-8u191-linux-x64.rpm
vim /etc/profile
---
export JAVA_HOME=/usr/java/jdk1.8.0_191-amd64
export CLASSPATH=.:$JAVA_HOME/jre/lib:$JAVA_HOME/lib:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$HOME/bin:$JAVA_HOME/bin
---
source /etc/profile
java -version
tar -zxvf zookeeper-3.4.14.tar.gz
mv zookeeper-3.4.14 /usr/local/zookeeper
mkdir -p /usr/local/zookeeper/data
mkdir -p /usr/local/zookeeper/log
cd /usr/local/zookeeper/data/
echo 1 > myid
----
cd /usr/local/zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vim zoo.cfg
----
# 心跳时间
tickTime=2000
# follower初始连接并同步leader的最长时间,以tickTime的倍数表示
initLimit=10
# syncLimit表示leader与follower之间发送消息、请求和应答的最长时间。如果follower在该时间内不能与leader通信,此follower将被丢弃;以tickTime的倍数表示
syncLimit=5
# 客户端连接端口
clientPort=2181
# 节点数据存储目录,需要提前创建,注意myid添加,用于标识服务器节点
dataDir=/usr/local/zookeeper/data
dataLogDir=/usr/local/zookeeper/log
server.1=192.168.100.11:2888:3888
server.2=192.168.100.12:2888:3888
server.3=192.168.100.13:2888:3888
---
-----
scp -r /usr/local/zookeeper root@192.168.100.12:/usr/local/
scp -r /usr/local/zookeeper root@192.168.100.13:/usr/local/
修改192.168.100.12 节点 myid
cd /usr/local/zookeeper/data/
echo 2 > myid
修改192.168.100.13 节点 myid
cd /usr/local/zookeeper/data/
echo 3 > myid
启动zookeeper(所有节点都需要执行)
cd /usr/local/zookeeper/bin/
./zkServer.sh start
wget https://mirrors.bfsu.edu.cn/apache/kafka/2.3.1/kafka_2.12-2.3.1.tgz
tar -zxvf kafka_2.12-2.3.1.tgz
mv kafka_2.12-2.3.1 /usr/local/kafka
# kafka数据目录
mkdir -p /usr/local/kafka/tmp/kafka-logs
chmod -R a+w /usr/local/kafka/tmp/kafka-logs
cat server.properties| egrep -v '^#|^*#|^$'
----
# 修改 broker.id(每个节点必须唯一);properties 文件不支持行尾注释,注释需单独成行
broker.id=101
# 侦听地址和端口
listeners = PLAINTEXT://192.168.100.11:9092
# 对外发布给客户端的侦听地址和端口
advertised.listeners=PLAINTEXT://192.168.100.11:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# kafka数据(日志)目录,需与前面 mkdir 创建的目录保持一致
log.dirs=/usr/local/kafka/tmp/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# zookeeper 地址(端口与 zoo.cfg 中的 clientPort=2181 一致)
zookeeper.connect=192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181
zookeeper.connection.timeout.ms=6000
group.initial.rebalance.delay.ms=0
# 允许删除topic
delete.topic.enable=true
----
vim zookeeper.properties
---
#Zookeeper的数据存储路径与Zookeeper集群配置保持一致
dataDir=/usr/local/zookeeper/data
---
vim consumer.properties
---
bootstrap.servers=192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092
#配置Zookeeper地址
zookeeper.connect=192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181
# consumer group id
group.id=test-consumer-group
---
vim producer.properties
---
bootstrap.servers=192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092
#配置Zookeeper地址
zookeeper.connect=192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181
# specify the compression codec for all data generated: none, gzip, snappy, lz4, zstd
compression.type=none
---
cd /usr/local/kafka/bin
vim kafka-run-class.sh
---
# 行首新增JAVA_HOME配置
export JAVA_HOME=/usr/java/jdk1.8.0_191-amd64
---
scp -r /usr/local/kafka root@192.168.100.12:/usr/local/
scp -r /usr/local/kafka root@192.168.100.13:/usr/local/
修改:192.168.100.12 的 server.properties
vim server.properties
---
broker.id=102
listeners = PLAINTEXT://192.168.100.12:9092
advertised.listeners=PLAINTEXT://192.168.100.12:9092
----
修改:192.168.100.13 的 server.properties
vim server.properties
---
broker.id=103
listeners = PLAINTEXT://192.168.100.13:9092
advertised.listeners=PLAINTEXT://192.168.100.13:9092
----
cd /usr/local/kafka/
# 启动(每个节点;以下两条命令任选其一:第一条为shell后台运行,第二条为 -daemon 守护进程方式)
bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &
bin/kafka-server-start.sh -daemon config/server.properties
# 停止(每个节点)
bin/kafka-server-stop.sh config/server.properties
创建topic。(任意Kafka集群节点)
bin/kafka-topics.sh --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --create --topic test --replication-factor 1 --partitions 3
查看topic
bin/kafka-topics.sh --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --list
生产者生成数据
bin/kafka-console-producer.sh --broker-list 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092 --topic test
消费者数据:
# --from-beginning 从头开始消费,不加该参数则从最新的消息开始消费,之前的丢弃
# --bootstrap-server 将在kafka集群上创建一个名称为“__consumer_offsets”的topic,50个分区,1个副本,用于存放消费者偏移量
bin/kafka-console-consumer.sh --bootstrap-server 192.168.100.11:9092,192.168.100.12:9092,192.168.100.13:9092 --topic test --from-beginning
bin/kafka-topics.sh --list --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181
bin/kafka-topics.sh --describe --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --topic test
删除topic
bin/kafka-topics.sh --delete --zookeeper 192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181 --topic test
wget https://codeload.github.com/smartloli/kafka-eagle-bin/tar.gz/v2.0.2
tar -zxvf v2.0.2
cd kafka-eagle-bin-2.0.2
tar -zxvf kafka-eagle-web-2.0.2-bin.tar.gz
mv kafka-eagle-web-2.0.2 /usr/local/eagle
环境变量配置
vim /etc/profile
---
export KE_HOME=/usr/local/eagle
export PATH=$KE_HOME/bin:$PATH
----
source /etc/profile
进入kafka-eagle的conf目录(/usr/local/eagle/conf),修改配置文件 system-config.properties,完成以下基础配置
# kafka集群配置
kafka.eagle.zk.cluster.alias=cluster1
cluster1.zk.list=192.168.100.11:2181,192.168.100.12:2181,192.168.100.13:2181
# offset存储配置
cluster1.kafka.eagle.offset.storage=kafka
# 注释或删除以下一行配置
cluster2.kafka.eagle.offset.storage=zk
# 删除topic时需要使用的Token
kafka.eagle.topic.token=polaris
kafka.eagle.metrics.charts=true
kafka.eagle.sql.fix.error=false
kafka.eagle.webui.port=8048
# jdbc配置
kafka.eagle.driver=com.mysql.jdbc.Driver
kafka.eagle.url=jdbc:mysql://192.168.100.11:3306/eagle?createDatabaseIfNotExist=true
kafka.eagle.username=root
kafka.eagle.password=123456
开启Kafka JMX监控
修改Kafka各节点JMX启动配置,开启监控功能:
cd /usr/local/kafka/bin
vim kafka-server-start.sh
---
if [ "x$KAFKA_HEAP_OPTS" = "x" ]; then
export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70"
export JMX_PORT="9999"
fi
---
scp kafka-server-start.sh root@192.168.100.12:/usr/local/kafka/bin
scp kafka-server-start.sh root@192.168.100.13:/usr/local/kafka/bin
验证Kafka Eagle
cd /usr/local/eagle/bin
./eg.sh start