kafka集群安装

2021-04-08 09:56
260
0

kafka集群安装

1,安装 zookeeper集群

wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.1/apache-zookeeper-3.6.1-bin.tar.gz

tar -xvf apache-zookeeper-3.6.1-bin.tar.gz

cd apache-zookeeper-3.6.1-bin/conf

cp zoo_sample.cfg zoo.cfg

mkdir -p /data/logs/zkdata

mkdir -p /data/logs/zkdatalog

mkdir -p /data/logs/kafka/

 

2 vi zoo.cfg

tickTime=2000

initLimit=10

syncLimit=5

dataDir=/data/logs/zkdata

dataLogDir=/data/logs/zkdatalog

#clientPort=2181

server.1=60.247.145.62:2888:3888

server.2=60.247.145.43:2888:3888

server.3=60.247.145.132:2888:3888

#server.1 这个1是服务器的标识也可以是其他的数字, 表示这个是第几号服务器,用来标识服务器,这个标识要写到快照目录下面myid文件里

#60.247.145.62为集群里的IP地址,第一个端口是master和slave之间的通信端口,默认是2888,

#第二个端口是leader选举的端口,集群刚启动的时候选举或者leader挂掉之后进行新的选举的端口默认是3888

 

3 创建 myid 文件

分别在三台主机的 dataDir 路径下创建一个文件名为 myid 的文件,文件内容为该 zk 节点的编号。

例如,在第一台主机上建立的 myid 文!件内容是 1,第二台是 2,第三台是3

我的 目录是 /data/logs/zkdata

 

cd /data/apache-zookeeper-3.6.1-bin/bin

4  ./zkServer.sh start

 

5 ./zkServer.sh status

 

6,安装kafka

wget https://mirror.bit.edu.cn/apache/kafka/2.6.0/kafka_2.13-2.6.0.tgz

tar zxvf kafka_2.13-2.6.0.tgz

vi kafka/config/server.properties

broker.id=134   

port=9092  

host.name=192.168.138.134

advertised.port=9092

advertised.host.name=192.168.138.134

#ladvertised.listeners=PLAINTEXT://192.168.138.134:9092

num.network.threads=3

num.io.threads=8  

log.dirs=/data/logs/kafka/  

delete.topic.enable=true

socket.send.buffer.bytes=102400  

socket.receive.buffer.bytes=102400  

socket.request.max.bytes=104857600  

num.partitions=3

log.retention.hours=168  

message.max.byte=5242880   

default.replication.factor=2   

replica.fetch.max.bytes=5242880  

log.segment.bytes=1073741824  

log.retention.check.interval.ms=300000  

log.cleaner.enable=false  

zookeeper.connect=192.168.1.132:2181,192.168.1.43:2181,192.168.1.62:2181

192.168.1.132:2181,192.168.1.43:2181,192.168.1.62:2181

##############################################################################################

broker.id   当前机器在集群中的唯一标识,和zookeeper的myid性质一样  建议用自己主机的后三位  每台(主机)broker不一致

port       当前kafka对外提供服务的端口默认是9092  生产者(producer)要以这个端口为准     

host.name  这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。  (填写本机地址即可)

num.network.threads=3 #这个是borker进行网络处理的线程数

num.io.threads=8 #这个是borker进行I/O处理的线程数

log.dirs=/usr/logs/kafka/   #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个

socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能

socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘

socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小

num.partitions=3 #默认的分区数,一个topic默认1个分区数

log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天

message.max.byte=5242880 #消息保存的最大值5M

default.replication.factor=2 #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务

replica.fetch.max.bytes=5242880 #取消息的最大直接数

log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件

log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除

log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能

zookeeper.connect=192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181   #设置zookeeper的连接端口 消费的时候要以这个端口消费

###############################################################################################

 

7 启动kafka

./kafka-server-start.sh -daemon ../config/server.properties

 

8 创建topic名为 test,拥有两个分区,两个副本的Topic

./kafka-topics.sh --create --zookeeper 60.247.145.132:2181,60.247.145.43:2181,60.247.145.62:2181 --replication-factor 2 --partitions 2 --topic test

 

./kafka-topics.sh --create --zookeeper 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181 --replication-factor 2 --partitions 3 --topic mylog

 

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1 --topic mylog

 

 

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 1 --partitions 1  --topic eth_yx

 

./kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2  --topic mylog

 

9查看zk集群上的 topic列表

./kafka-topics.sh --list --zookeeper 60.247.145.132:2181,60.247.145.43:2181,60.247.145.62:2181

 

./kafka-topics.sh --list --zookeeper 127.0.0.1:2181

 

10 查看topic描述

./kafka-topics.sh --describe --zookeeper 60.247.145.132:2181,60.247.145.43:2181,60.247.145.62:2181 --topic test

 

./kafka-topics.sh --describe --zookeeper 127.0.0.1:2181

 

11创建生产者producer

./kafka-console-producer.sh --broker-list 60.247.145.132:9092,60.247.145.43:9092,60.247.145.62:9092 --topic test

 

12创建消费者consumer 在134主机执行

./kafka-console-consumer.sh  --bootstrap-server 60.247.145.132:9092,60.247.145.43:9092,60.247.145.62:9092  --from-beginning  --topic  test

 

13其他命令:

kafka启动后查看zk broker情况

进入zk安装目录

 

bin/zkCli.sh -server 192.168.138.132:2181,192.168.138.133:2181,192.168.138.134:2181

 

ls /

 

ls /brokers/ids

 

可以查看到这里有我们设置的 132  133  134主机  

 

get /brokers/ids/132  可以看到132主机的详细信息

14关闭kafka

 

bin/kafka-server-stop.sh -daemon config/server.properties

1

如果要修改 kafka的broker.id

 

1.先修改

    config/server.properties

2.再修改config/server.properties配置的 log.dirs  下面的 meta.properties  

全部评论