二、Centos7安装部署Kafka集群

bridge
2022-01-27 / 0 评论 / 0 点赞 / 1,605 阅读 / 4,874 字 / 正在检测是否收录...
温馨提示:
本文最后更新于 2022-01-27,若内容或图片失效,请留言反馈。部分素材来自网络,若不小心影响到您的利益,请联系我们删除。

准备软件

软件版本下载地址
jdkjdk_8u261下载地址;提取码:500w
zookeeeperzookeeper 3.6.1下载地址
kafkakafka_2.13-2.5.0下载地址
kafka-managerkafka-manager-1.3.3.4下载地址;提取码:nse2

安装Jdk

  • 解压并将文件移动到/opt/java目录下
sudo tar -vxzf jdk-8u201-linux-x64.tar.gz
sudo mv jdk1.8.0_201 /opt/java
  • 配置java环境变量
vim /etc/profile
  • 在profile文件的最后面添加
export JAVA_HOME=/opt/java
export PATH=$JAVA_HOME/bin:$PATH
  • 环境变量立即生效
source /etc/profile
  • 添加成功后通过java -version来查看jdk是否安装成功
java -version

安装zookeeper集群

  • 解压并将文件移动到/opt/zookeeper目录下
sudo tar -zxvf apache-zookeeper-3.6.1-bin.tar.gz
sudo mv apache-zookeeper-3.6.1-bin /opt/zookeeper
  • 编辑zookeeper的配置(本例子中是3个节点)
# 复制默认配置文件
cp /opt/zookeeper/conf/zoo_sample.cfg /opt/zookeeper/conf/zoo.cfg
# 修改默认配置
vi /opt/zookeeper/conf/zoo.cfg
  • 配置说明
# 心跳和超时时间,单位毫秒
tickTime=2000
# 指定follower节点初始时连接leader节点的最大tick次数(5*tickTime=10秒),否则被视为超时
initLimit=10
# follwer节点与leader节点进行同步的最大时间(tick次数)
syncLimit=5

# 内存中保存系统快照的位置,生产环境中注意该文件夹的磁盘占用情况
dataDir=/opt/zookeeper/data
# 日志目录
dataLogDir=/opt/zookeeper/logs
# zookeeper监听客户端连接的端口(同一台机器用不同端口,不同机器可以用相同或者不同的端口),默认是2181
clientPort=2181

# X必须是一个全局唯一的数字,且需要与myid文件中的数字相对应,host可以是域名/机器名/IP,port1用于follower节点连接leader,port2用于leader选举
server.1=node01:2888:3888
server.2=node02:2888:3888
server.3=node03:2888:3888
  • 创建刚才配置的data,logs目录
mkdir -p /opt/zookeeper/{data,logs}
  • 设置myid

群集中每台机器的myid文件内容不能重复,并和配置文件里的server相对应。1-255即可

echo "1" > /opt/zookeeper/data/myid
  • 复制到其他主机
scp -r /opt/zookeeper/ root@node02:/opt/zookeeper
scp -r /opt/zookeeper/ root@node03:/opt/zookeeper
  • 修改其他节点的myid
vi /opt/zookeeper/data/myid
  • 启动zookeeper(所有节点都要启动)
/opt/zookeeper/bin/zkServer.sh start
  • 查看zookeeper状态
[root@localhost ~]# /opt/zookeeper/bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader

安装配置 kafka 集群

  • 解压并移动到/opt/kafka
sudo tar -zxvf kafka_2.13-2.5.0.tgz
sudo mv kafka_2.13-2.5.0 /opt/kafka
  • 复制配置文件
cp /opt/kafka/config/server.properties /opt/kafka/config/server-default.properties
  • 设置配置文件
vi /opt/kafka/config/server.properties
  • 配置说明
############################# Server Basics #############################

# 每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers
broker.id=0

############################# Socket Server Settings #############################

# 提供给客户端响应的地址和端口
#listeners=PLAINTEXT://:9092

# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# broker 处理消息的最大线程数,一般情况下不需要去修改
num.network.threads=3

# broker处理磁盘IO的线程数 ,数值应该大于你的硬盘数
num.io.threads=8

# 发送套接字的缓冲区大小
socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小
socket.receive.buffer.bytes=102400

# 请求套接字的缓冲区大小
socket.request.max.bytes=104857600

############################# Log Basics #############################

# kafka数据的存放地址,多个地址的话用逗号分割/opt/kafka/logs,/opt/kafka/logs-2
log.dirs=/tmp/kafka-logs

# 每个topic的分区个数,若是在topic创建时候没有指定的话会被topic创建时的指定参数覆盖
num.partitions=1

# 我们知道segment文件默认会被保留7天的时间,超时的话就会被清理,那么清理这件事情就需要有一些线程来做。这里就是用来设置恢复和清理data下数据的线程数量
num.recovery.threads.per.data.dir=1

############################# Internal Topic Settings  #############################
# For anything other than development testing, a value greater than 1 is recommended to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Flush Policy #############################

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# segment文件保留的最长时间,默认保留7天(168小时),超时将被删除,也就是说7天之前的数据将被清理掉。
log.retention.hours=168

# segments drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# 日志文件中每个segment的大小,默认为1G
log.segment.bytes=1073741824

#上面的参数设置了每一个segment文件的大小是1G,那么就需要有一个东西去定期检查segment文件有没有达到1G,多长时间去检查一次,就需要设置一个周期性检查文件大小的时间(单位是毫秒)。
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# 消费者集群通过连接Zookeeper来找到broker。zookeeper连接服务器地址
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=18000

############################# Group Coordinator Settings #############################

# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup.
group.initial.rebalance.delay.ms=0
  • 把kafka复制到其他节点
scp -r /opt/kafka root@node02:/opt/kafka
scp -r /opt/kafka root@node03:/opt/kafka
  • 修改其他节点的broker.id
vi /opt/kafka/config/server.properties
  • 已守护进程方式启动kafka(所有节点都要启动)
/opt/kafka/bin/kafka-server-start.sh -daemon /opt/kafka/config/server.properties
  • 使用zk客户端连接zk查看是否启动成功
# 连接
zkCli.sh -server node01:2181
# 查看brokers.id信息
ls /brokers/ids
# 标识成功
[1, 2, 3]

集群安装成功~

0

评论区