安装部署 集群规划
node1
node1
node1
zk
zk
zk
kafka
kafka
kafka
集群部署 官方下载地址 http://kafka.apache.org/downloads.html
解压安装包 1 [zhangan@node1 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/bigdata/kafka/
创建软连接 1 [zhangan@node1 kafka]$ ln kafka_2.12-3.0.0/ default
修改配置文件 1 2 [zhangan@node1 default]$ cd config/ [zhangan@node1 config]$ vim server.properties
输入以下内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 broker.id=0 num.network.threads=3 num.io.threads=8 socket.send.buffer.bytes=102400 socket.receive.buffer.bytes=102400 socket.request.max.bytes=104857600 log.dirs=/opt/bigdata/kafka/default/datas num.partitions=1 num.recovery.threads.per.data.dir=1 offsets.topic.replication.factor=1 log.retention.hours=168 log.segment.bytes=1073741824 log.retention.check.interval.ms=300000 zookeeper.connect=node1:2181,node2:2181,node3:2181/kafka
分发安装包 1 [zhangan@node1 default]$ xsync kafka/
5)分别在node2和node3上修改配置文件/opt/bigdata/kafka/default/config/server.properties中的broker.id=2、broker.id=3
注:broker.id不得重复,整个集群中唯一。
1 2 3 4 5 6 7 8 9 [zhangan@node2 default]$ vim config/server.properties 修改: broker.id=2 [zhangan@node3 default]$ vim config/server.properties 修改: broker.id=3
配置环境变量
启动集群
先启动Zookeeper集群,然后启动Kafka。
1 [zhangan@node1 kafka]$ zk.sh start
依次在node1、node2、node3节点上启动Kafka。
1 2 3 [zhangan@node1 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [zhangan@node2 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties [zhangan@node3 kafka]$ bin/kafka-server-start.sh -daemon config/server.properties
注意:配置文件的路径要能够到server.properties。
关闭集群 1 2 3 [zhangan@node1 kafka]$ bin/kafka-server-stop.sh [zhangan@node2 kafka]$ bin/kafka-server-stop.sh [zhangan@node3 kafka]$ bin/kafka-server-stop.sh
集群启停脚本 启停脚本 在/home/zhangan/bin目录下创建文件kf.sh脚本文件
1 [zhangan@node1 bin]$ vim kf.sh
脚本如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 #! /bin/bash case $1 in "start" ){ for i in node1 node2 node3 do echo " --------启动 $i Kafka-------" ssh $i "kafka-server-start.sh -daemon /opt/bigdata/kafka/default/config/server.properties" done };; "stop" ){ for i in node1 node2 node3 do echo " --------停止 $i Kafka-------" ssh $i "kafka-server-stop.sh stop" done };; esac
添加执行权限 略。
启动集群命令 1 [zhangan@node1 ~]$ kf.sh start
停止集群命令 1 [zhangan@node1 ~]$ kf.sh stop
注意:停止Kafka集群时,一定要等Kafka所有节点进程全部停止后再停止Zookeeper集群。因为Zookeeper集群当中记录着Kafka集群相关信息,Zookeeper集群一旦先停止,Kafka集群就没有办法再获取停止进程的信息,只能手动杀死Kafka进程了。
Kafka命令行操作 主题命令行操作 查看操作主题命令参数 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh
参数
描述
–bootstrap-server <String: server toconnect to>
连接的Kafka Broker主机名称和端口号。
–topic <String: topic>
操作的topic名称。
–create
创建主题。
–delete
删除主题。
–alter
修改主题。
–list
查看所有主题。
–describe
查看主题详细描述。
–partitions <Integer: # of partitions>
设置分区数。
–replication-factor<Integer: replication factor>
设置分区副本。
–config <String: name=value>
更新系统默认的配置。
查看当前服务器中的所有topic 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --list
创建first topic 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --create --partitions 1 --replication-factor 3 --topic first
选项说明: –topic 定义topic名 –replication-factor 定义副本数 –partitions 定义分区数
查看first主题的详情 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic first
修改分区数(注意:分区数只能增加,不能减少) 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --alter --topic first --partitions 3
再次查看first主题的详情 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --describe --topic first
删除topic 1 [zhangan@node1 kafka]$ bin/kafka-topics.sh --bootstrap-server node1:9092 --delete --topic first
生产者命令行操作 查看操作生产者命令参数 1 [zhangan@node1 kafka]$ bin/kafka-console-producer.sh
参数
描述
–bootstrap-server <String: server toconnect to>
连接的Kafka Broker主机名称和端口号。
–topic <String: topic>
操作的topic名称。
发送消息 1 2 3 [zhangan@node1 kafka]$ bin/kafka-console-producer.sh --bootstrap-server node1:9092 --topic first >hello world >zhangan zhangan
消费者命令行操作 查看操作消费者命令参数 1 [zhangan@node1 kafka]$ bin/kafka-console-consumer.sh
参数
描述
–bootstrap-server <String: server toconnect to>
连接的Kafka Broker主机名称和端口号。
–topic <String: topic>
操作的topic名称。
–from-beginning
从头开始消费。
–group <String: consumer group id>
指定消费者组名称。
消费消息 (1)消费first主题中的数据。
1 [zhangan@node1 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --topic first
(2)把主题中所有的数据都读取出来(包括历史数据)。
1 [zhangan@node1 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server node1:9092 --from-beginning --topic first
Kafka-Eagle监控 Kafka-Eagle框架可以监控Kafka集群的整体运行情况,在生产环境中经常使用。
MySQL环境准备 Kafka-Eagle的安装依赖于MySQL,MySQL主要用来存储可视化展示的数据。如果集群中之前安装过MySQL可以跨过该步。
Kafka环境准备 1)关闭Kafka集群
1 [zhangan@node1 kafka]$ kf.sh stop
2)修改/opt/bigdata/kafka/default/bin/kafka-server-start.sh命令中
1 [zhangan@node1 kafka]$ vim bin/kafka-server-start.sh
修改如下参数值: 原内容:
1 2 3 if [ "x$KAFKA_HEAP_OPTS " = "x" ]; then export KAFKA_HEAP_OPTS="-Xmx1G -Xms1G" fi
修改为:
1 2 3 4 5 if [ "x$KAFKA_HEAP_OPTS " = "x" ]; then export KAFKA_HEAP_OPTS="-server -Xms2G -Xmx2G -XX:PermSize=128m -XX:+UseG1GC -XX:MaxGCPauseMillis=200 -XX:ParallelGCThreads=8 -XX:ConcGCThreads=5 -XX:InitiatingHeapOccupancyPercent=70" export JMX_PORT="9999" fi
注意:修改之后在启动Kafka之前要分发之其他节点
1 [zhangan@node1 bin]$ xsync kafka-server-start.sh
Kafka-Eagle安装 0)官网:https://www.kafka-eagle.org/
1)上传压缩包kafka-eagle-bin-2.0.8.tar.gz到集群/opt/software目录
2)解压到本地
1 [zhangan@node1 software]$ tar -zxvf kafka-eagle-bin-2.0.8.tar.gz
3)进入刚才解压的目录
1 [zhangan@node1 kafka-eagle-bin-2.0.8]$ ll
总用量 79164 -rw-rw-r–. 1 zhangan zhangan 81062577 10月 13 00:00 efak-web-2.0.8-bin.tar.gz
4)将efak-web-2.0.8-bin.tar.gz解压至/opt/module
1 [zhangan@node1 kafka-eagle-bin-2.0.8]$ tar -zxvf efak-web-2.0.8-bin.tar.gz -C /opt/module/
5)修改名称
1 [zhangan@node1 module]$ mv efak-web-2.0.8/ efak
6)修改配置文件 /opt/module/efak/conf/system-config.properties
1 [zhangan@node1 conf]$ vim system-config.properties
配置文件内容:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 ###################################### # multi zookeeper & kafka cluster list # Settings prefixed with 'kafka.eagle.' will be deprecated, use 'efak.' instead ###################################### efak.zk.cluster.alias=cluster1 cluster1.zk.list=node1:2181,node2:2181,node3:2181/kafka ###################################### # zookeeper enable acl ###################################### cluster1.zk.acl.enable=false cluster1.zk.acl.schema=digest cluster1.zk.acl.username=test cluster1.zk.acl.password=test123 ###################################### # broker size online list ###################################### cluster1.efak.broker.size=20 ###################################### # zk client thread limit ###################################### kafka.zk.limit.size=32 ###################################### # EFAK webui port ###################################### efak.webui.port=8048 ###################################### # kafka jmx acl and ssl authenticate ###################################### cluster1.efak.jmx.acl=false cluster1.efak.jmx.user=keadmin cluster1.efak.jmx.password=keadmin123 cluster1.efak.jmx.ssl=false cluster1.efak.jmx.truststore.location=/data/ssl/certificates/kafka.truststore cluster1.efak.jmx.truststore.password=ke123456 ###################################### # kafka offset storage ###################################### # offset保存在kafka cluster1.efak.offset.storage=kafka ###################################### # kafka jmx uri ###################################### cluster1.efak.jmx.uri=service:jmx:rmi:///jndi/rmi://%s/jmxrmi ###################################### # kafka metrics, 15 days by default ###################################### efak.metrics.charts=true efak.metrics.retain=15 ###################################### # kafka sql topic records max ###################################### efak.sql.topic.records.max=5000 efak.sql.topic.preview.records.max=10 ###################################### # delete kafka topic token ###################################### efak.topic.token=keadmin ###################################### # kafka sasl authenticate ###################################### cluster1.efak.sasl.enable=false cluster1.efak.sasl.protocol=SASL_PLAINTEXT cluster1.efak.sasl.mechanism=SCRAM-SHA-256 cluster1.efak.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="kafka" password="kafka-eagle"; cluster1.efak.sasl.client.id= cluster1.efak.blacklist.topics= cluster1.efak.sasl.cgroup.enable=false cluster1.efak.sasl.cgroup.topics= cluster2.efak.sasl.enable=false cluster2.efak.sasl.protocol=SASL_PLAINTEXT cluster2.efak.sasl.mechanism=PLAIN cluster2.efak.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="kafka" password="kafka-eagle"; cluster2.efak.sasl.client.id= cluster2.efak.blacklist.topics= cluster2.efak.sasl.cgroup.enable=false cluster2.efak.sasl.cgroup.topics= ###################################### # kafka ssl authenticate ###################################### cluster3.efak.ssl.enable=false cluster3.efak.ssl.protocol=SSL cluster3.efak.ssl.truststore.location= cluster3.efak.ssl.truststore.password= cluster3.efak.ssl.keystore.location= cluster3.efak.ssl.keystore.password= cluster3.efak.ssl.key.password= cluster3.efak.ssl.endpoint.identification.algorithm=https cluster3.efak.blacklist.topics= cluster3.efak.ssl.cgroup.enable=false cluster3.efak.ssl.cgroup.topics= ###################################### # kafka sqlite jdbc driver address ###################################### # 配置mysql连接 efak.driver=com.mysql.jdbc.Driver efak.url=jdbc:mysql://node1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull efak.username=root efak.password=000000 ###################################### # kafka mysql jdbc driver address ###################################### #efak.driver=com.mysql.cj.jdbc.Driver #efak.url=jdbc:mysql://127.0.0.1:3306/ke?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull #efak.username=root #efak.password=123456
7)添加环境变量
1 [zhangan@node1 conf]$ sudo vim /etc/profile.d/my_env.sh
添加内容:
1 2 3 # kafkaEFAK export KE_HOME=/opt/module/efak export PATH=$PATH:$KE_HOME/bin
注意:source /etc/profile
1 [zhangan@node1 conf]$ source /etc/profile
8)启动 (1)注意:启动之前需要先启动ZK以及KAFKA。
1 [zhangan@node1 kafka]$ kf.sh start
(2)启动efak
1 [zhangan@node1 efak]$ bin/ke.sh start
启动成功提示:
1 2 3 4 5 6 7 8 9 Version 2.0.8 -- Copyright 2016-2021 ***************************************************************** * EFAK Service has started success. * Welcome, Now you can visit 'http://192.168.10.102:8048' * Account:admin ,Password:123456 ***************************************************************** * <Usage> ke.sh [start|status|stop|restart|stats] </Usage> * <Usage> https://www.kafka-eagle.org/ </Usage> *****************************************************************
说明:如果停止efak,执行命令。
1 [zhangan@node1 efak]$ bin/ke.sh stop
Kafka-Eagle页面操作 1)登录页面查看监控数据http://192.168.10.102:8048/