准备软件
软件 | 版本 | 下载地址 |
---|---|---|
jdk | jdk_8u261 | 下载地址;提取码:500w |
zookeeeper | zookeeper 3.6.1 | 下载地址 |
kafka | kafka_2.13-2.5.0 | 下载地址 |
kafka-manager | kafka-manager-1.3.3.4 | 下载地址;提取码:nse2 |
安装Jdk
- 解压并将文件移动到/opt/java目录下
sudo tar -vxzf jdk-8u201-linux-x64.tar.gz
sudo mv jdk1.8.0_201 /opt/java
- 配置java环境变量
vim /etc/profile
- 在profile文件的最后面添加
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
- 环境变量立即生效
source /etc/profile
- 添加成功后通过java -version来查看jdk是否安装成功
java -version
安装zookeeper集群
- 解压并将文件移动到/opt/zookeeper目录下
sudo tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
sudo mv apache-zookeeper-3.6.1-bin /opt/zookeeper
- 编辑zookeeper的配置(本例子中是3个节点)
# 复制默认配置文件
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
# 修改默认配置
vi /opt/zookeeper/conf/zoo.cfg
- 配置说明
# 心跳和超时时间,单位毫秒
tickTime=2000
# 指定follower节点初始时连接leader节点的最大tick次数(5*tickTime=10秒),否则被视为超时
initLimit=10
# follwer节点与leader节点进行同步的最大时间(tick次数)
syncLimit=5
# 内存中保存系统快照的位置,生产环境中注意该文件夹的磁盘占用情况
dataDir=/opt/zookeeper/data
# 日志目录
dataLogDir=/opt/zookeeper/logs
# zookeeper监听客户端连接的端口(同一台机器用不同端口,不同机器可以用相同或者不同的端口),默认是2181
clientPort=2181
# X必须是一个全局唯一的数字,且需要与myid文件中的数字相对应,host可以是域名/机器名/IP,port1用于follower节点连接leader,port2用于leader选举
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
- 创建刚才配置的data,logs目录
mkdir -p /opt/zookeeper/{data,logs}
- 设置myid
群集中每台机器的myid文件内容不能重复,并和配置文件里的server相对应。1-255即可
echo "1" > /opt/zookeeper/data/myid
- 复制到其他主机
scp -r /opt/zookeeper/ root@node02:/opt/zookeeper
scp -r /opt/zookeeper/ root@node03:/opt/zookeeper
- 修改其他节点的myid
vi /opt/zookeeper/data/myid
- 启动zookeeper(所有节点都要启动)
/opt/zookeeper/bin/zkServer.sh start
- 查看zookeeper状态
[root@localhost ~]# /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
安装配置 kafka 集群
- 解压并移动到/opt/kafka
sudo tar -zxvf kafka_2.13-2.5.0.tgz
sudo mv kafka_2.13-2.5.0 /opt/kafka
- 复制配置文件
cp /opt/kafka/config/server.properties /opt/kafka/config/server-default.properties
- 设置配置文件
vi /opt/kafka/config/server.properties
- 配置说明
############################# Server Basics #############################
# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0
############################# Socket Server Settings #############################
# 提供给客户端响应的地址和端口
#listeners=PLAINTEXT://:9092
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3
# broker处理磁盘IO的线程数 ,数值应该大于你的硬盘数
num.io.threads=8
# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400
#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400
# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600
############################# Log Basics #############################
# kafka数据的存放地址,多个地址的话用逗号分割/opt/kafka/logs,/opt/kafka/logs-2
log.dirs=/tmp/kafka-logs
# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1
# 我们知道segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1
############################# Internal Topic Settings #############################
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
############################# Log Flush Policy #############################
# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000
# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000
############################# Log Retention Policy #############################
# segment文件保留的最长时间,默认保留7天(168小时),超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168
# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824
# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824
#上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次,就需要设置一个周期性检查文件大小的时间(单位是毫秒)。
log.retention.check.interval.ms=300000
############################# Zookeeper #############################
# 消费者集群通过连接Zookeeper来找到broker。zookeeper连接服务器地址
zookeeper.connect=localhost:2181
# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000
############################# Group Coordinator Settings #############################
# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
- 把kafka复制到其他节点
scp -r /opt/kafka root@node02:/opt/kafka
scp -r /opt/kafka root@node03:/opt/kafka
- 修改其他节点的broker.id
vi /opt/kafka/config/server.properties
- 已守护进程方式启动kafka(所有节点都要启动)
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
- 使用zk客户端连接zk查看是否启动成功
# 连接
zkCli.sh -server node01:2181
# 查看brokers.id信息
ls /brokers/ids
# 标识成功
[1, 2, 3]
评论区