docker-compose实现zookeeper及kafka集群

为了在三台虚拟机上部署ZooKeeper和Kafka集群,每台主机上各有一个ZooKeeper实例和一个Kafka实例,你可以使用docker-compose来简化这一过程。下面是一个基本的示例配置,用于在每台主机上部署ZooKeeper和Kafka。

准备工作

  1. 确保每台虚拟机都安装了Docker和Docker Compose
  2. 确定每台虚拟机的IP地址
    • 192.168.88.130
    • 192.168.88.131
    • 192.168.88.132

1、创建 Docker Compose 文件

在每台虚拟机上创建一个docker-compose.yml文件。这个文件将定义ZooKeeper和Kafka的服务。

docker-compose.yml

节点1

version: '3'

services:
  zk1:
    container_name: zk1
    hostname: zk1
    image: wurstmeister/zookeeper:latest
    restart: always
    environment:
      - ZOO_MY_ID=1
      - ZOO_SERVERS=server.1=0.0.0.0:2888:3888,server.2=192.168.88.131:2888:3888,server.3=192.168.88.132:2888:3888
    volumes:
      - ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
      - ./zk_data:/opt/zookeeper-3.4.13/data
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka1:
    container_name: kafka1
    hostname: kafka1
    image: wurstmeister/kafka:latest
    restart: always
    environment:
      - KAFKA_BROKER_ID=1
      - KAFKA_MIN_INSYNC_REPLICAS=1
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.130:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    ports:
      - 9092:9092

节点2

version: '3'

services:
  zk2:
    container_name: zk2
    hostname: zk2
    image: wurstmeister/zookeeper:latest
    restart: always
    environment:
      - ZOO_MY_ID=2
      - ZOO_SERVERS=server.1=192.168.88.130:2888:3888,server.2=0.0.0.0:2888:3888,server.3=192.168.88.132:2888:3888
    volumes:
      - ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
      - ./zk_data:/opt/zookeeper-3.4.13/data
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka2:
    container_name: kafka2
    hostname: kafka2
    image: wurstmeister/kafka:latest
    restart: always
    environment:
      - KAFKA_BROKER_ID=2
      - KAFKA_MIN_INSYNC_REPLICAS=1
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.131:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    ports:
      - 9092:9092

节点3

version: '3'

services:
  zk3:
    container_name: zk3
    hostname: zk3
    image: wurstmeister/zookeeper:latest
    restart: always
    environment:
      - ZOO_MY_ID=3
      - ZOO_SERVERS=server.1=192.168.88.130:2888:3888,server.2=192.168.88.131:2888:3888,server.3=0.0.0.0:2888:3888
    volumes:
      - ./zk_conf/zoo.cfg:/opt/zookeeper-3.4.13/conf/zoo.cfg
      - ./zk_data:/opt/zookeeper-3.4.13/data
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888

  kafka3:
    container_name: kafka3
    hostname: kafka3
    image: wurstmeister/kafka:latest
    restart: always
    environment:
      - KAFKA_BROKER_ID=3
      - KAFKA_MIN_INSYNC_REPLICAS=1
      - KAFKA_DEFAULT_REPLICATION_FACTOR=3
      - KAFKA_ZOOKEEPER_CONNECT=192.168.88.130:2181,192.168.88.131:2181,192.168.88.132:2181
      - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.88.132:9092
      - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092
    ports:
      - 9092:9092

2、创建配置文件

根据每台主机的情况,你需要修改compose文件中环境变量,以确保每台主机上的ZooKeeper和Kafka实例具有唯一的ID。每台需要修改为本机IP,集群配置不变。

在每台主机上面创建目录

mkdir /data/zk_kafka/zk_conf -p
mkdir /data/zk_kafka/zk_data
cd /data/zk_kafka

vim zk_conf/zoo.cfg       # 三台主机都需要创建

tickTime=2000
initLimit=10
syncLimit=5
dataDir=/opt/zookeeper-3.4.13/data
clientPort=2181
autopurge.snapRetainCount=3                                                        
autopurge.purgeInterval=1
server.1=0.0.0.0:2888:3888         # 节点1配置
server.2=192.168.88.131:2888:3888  # 节点2到时配置0.0.0.0,节点3一致这样配置。
server.3=192.168.88.132:2888:3888

# 第一台
echo 1 > zk_data/myid
# 第二台
echo 2 > zk_data/myid
# 第三台
echo 3 > zk_data/myid

3、启动 Docker Compose

在每台虚拟机上,使用以下命令启动Docker Compose:

docker-compose ps
NAME                COMMAND                  SERVICE             STATUS              PORTS
kafka1              "start-kafka.sh"         kafka1              running             0.0.0.0:9092->9092/tcp
zk1                 "/bin/sh -c '/usr/sb…"   zk1                 running             0.0.0.0:2181->2181/tcp, 0.0.0.0:2888->2888/tcp, 0.0.0.0:3888->3888/tcp

这将启动ZooKeeper和Kafka服务,并将它们作为守护进程在后台运行。

4、验证ZooKeeper

  • 使用ZooKeeper客户端工具(如zkCli.sh)连接到ZooKeeper。

要验证ZooKeeper是否正常运行,你可以使用ZooKeeper自带的命令行工具zkCli.sh来进行一些基本的操作,比如查看集群状态、检查节点数据等。以下是如何验证ZooKeeper集群是否正常运行的步骤:

验证zookeeper角色

# 节点1
docker exec -it zk1 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower

# 节点2
docker exec -it zk2 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: follower

# 节点3
docker exec -it zk3 bash
./bin/zkServer.sh status
# 输出结果
ZooKeeper JMX enabled by default
Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
Mode: leader

可以看到zookeeper三台集群各自的角色。

验证集群同步

./bin/zkCli.sh
create /test "hello zookeeper"
# 输出结果
Created /test

查看临时节点

get /test
# 输出结果
hello zookeeper
cZxid = 0x200000002
ctime = Fri Sep 06 08:06:31 UTC 2024
mZxid = 0x200000002
mtime = Fri Sep 06 08:06:31 UTC 2024
pZxid = 0x200000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 15
numChildren = 0

连接到其他ZooKeeper实例,并检查刚才创建的临时节点是否存在:

docker exec -it zk2 bash
./bin/zkCli.sh
get /test
# 输出结果
hello zookeeper
cZxid = 0x200000002
ctime = Fri Sep 06 08:06:31 UTC 2024
mZxid = 0x200000002
mtime = Fri Sep 06 08:06:31 UTC 2024
pZxid = 0x200000002
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 15
numChildren = 0

如果其他ZooKeeper实例也显示了这个临时节点,则说明集群同步正常。

总结
通过以上步骤,你可以验证ZooKeeper是否正常运行,并检查集群的状态。如果一切正常,你应该能看到集群成员的角色以及数据同步情况。如果遇到任何问题,可以通过查看ZooKeeper的日志文件或使用更详细的命令来诊断问题。

5、验证Kafka

  • 使用Kafka命令行工具(如kafka-topics.sh)创建主题并验证集群状态。
5.1 创建主题
docker exec -it kafka1 bash
cd /opt/kafka/
./bin/kafka-topics.sh --create --zookeeper 192.168.88.130:2181 --topic test --partitions 3 --replication-factor 3
# 输出结果
Created topic test.

这里创建了一个名为test-topic的主题,它有3个分区和3个副本,以确保高可用性。

验证主题是否成功创建:

# 到另外两台查看新创建的test
# 查看topics
# 192.168.88.131
./bin/kafka-topics.sh --list --zookeeper 192.168.88.131:2181
test
# 192.168.88.132
./bin/kafka-topics.sh --list --zookeeper 192.168.88.132:2181
test

你应该能看到刚刚创建的主题test

查看主题的详细信息:

./bin/kafka-topics.sh --describe --topic test --bootstrap-server 192.168.88.130:9092
# 输出结果
Topic: test	TopicId: jKMCnaYeSvayrZatlgZp8A	PartitionCount: 3	ReplicationFactor: 3	Configs: min.insync.replicas=2,segment.bytes=1073741824
    Topic: test	Partition: 0	Leader: 1	Replicas: 1,2,3	Isr: 1,2,3
    Topic: test	Partition: 1	Leader: 2	Replicas: 2,3,1	Isr: 2,3,1
    Topic: test	Partition: 2	Leader: 3	Replicas: 3,1,2	Isr: 3,1,2

这段信息描述了一个名为 test 的 Kafka 主题的配置和分区状态。以下是对这些信息的详细解释:

主题配置

  • Topic: test
  • TopicId: jKMCnaYeSvayrZatlgZp8A (这是主题的唯一标识符)
  • PartitionCount: 3 (主题被分为3个分区)
  • ReplicationFactor: 3 (每个分区的副本因子为3,意味着每个分区的数据会被复制到3个不同的Kafka broker上)
  • Configs:
    • min.insync.replicas=2: 表示生产者在发送消息时,至少需要有2个副本同步确认后才算成功。
    • segment.bytes=1073741824: 每个分区的segment文件的最大大小为1GB。
      分区状态
  • Partition 0:
    • Leader: 1 (Broker 1是该分区的领导者,负责处理所有对该分区的读写请求)
    • Replicas: 1,2,3 (该分区的副本存储在Broker 1, 2, 3上)
    • Isr: 1,2,3 (In-Sync Replicas,即与领导者同步的副本列表,这里所有副本都是同步的)
  • Partition 1:
    • Leader: 2 (Broker 2是该分区的领导者)
    • Replicas: 2,3,1
    • Isr: 2,3,1
  • Partition 2:
    • Leader: 3 (Broker 3是该分区的领导者)
    • Replicas: 3,1,2
    • Isr: 3,1,2

这个主题的配置和状态显示了一个典型的高可用性和高可靠性的Kafka主题设置。如果你需要进行任何操作,如调整配置或监控状态,这些信息都是非常重要的基础数据。

这些信息对于监控和维护Kafka集群的健康状态非常重要。例如,如果某个分区的ISR列表中缺少某些副本,可能意味着这些副本落后于领导者,或者存在其他同步问题。在这种情况下,可能需要进一步的调查和干预。

5.2 发布消息

向主题发布一条消息:

./bin/kafka-console-producer.sh --broker-list 192.168.88.130:9092 --topic test

然后输入一些消息,例如:

>hello kafka

按回车键发送每条消息。

消费消息

同时在另一个终端窗口中,启动一个消费者来消费消息:

# 节点2
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.88.131:9092 --topic test --from-beginning
# 输出结果
hello kafka

验证Kafka集群的状态,确保所有的Broker都在集群中并且正常工作:

./bin/kafka-topics.sh --describe --bootstrap-server 192.168.88.131:9092
......

检查输出结果中的LeaderReplicas部分,确保每个分区都有一个Leader,并且所有的副本都在同步中。

总结
通过以上步骤,你可以验证Kafka集群是否正常运行,并检查集群的状态。如果一切正常,你应该能看到主题列表、主题描述、发布的消息以及消费的消息。如果遇到任何问题,可以通过查看Kafka Broker的日志文件或使用更详细的命令来诊断问题。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.mfbz.cn/a/875388.html

如若内容造成侵权/违法违规/事实不符,请联系我们进行投诉反馈qq邮箱809451989@qq.com,一经查实,立即删除!

相关文章

3分钟带你了解什么是数据目录

什么是数据目录? 数据目录,顾名思义就是“数据的目录”。这里的“数据”指的是元数据。数据目录通过管理这些元数据,形成一个可用的数据清单,使数据开发者、数据分析师等人员能够通过查阅和搜索等操作,快速找到所需的数…

4052A/4052B/4052C/4052D/4052E/4052F/4052G /4052H信号/频谱分析仪

4052A/4052B/4052C/4052D/4052E/4052F/4052G /4052H信号/频谱分析仪 苏州新利通 Ceyear 4052具备出色的测试动态范围、相位噪声、幅度精度和测试速度,具备频谱分析、I/Q分析、实时频谱分析、瞬态分析、矢量信号分析、脉冲分析、音频分析等丰富的测试功能。 Ceyear…

长沙自闭症寄宿学校推荐,为孩子开启光明未来

在长沙这座历史悠久而又充满活力的城市中,自闭症儿童的成长与教育问题牵动着无数家庭的心。家长们渴望为孩子找到一所能够提供专业康复、温馨关怀与全面教育的学校,为他们的未来铺设一条光明之路。虽然本文起始于长沙的期盼,但我们的目光已跨…

SpringSecurity原理解析(二):认证流程

1、SpringSecurity认证流程包含哪几个子流程? 1)账号验证 2)密码验证 3)记住我—>Cookie记录 4)登录成功—>页面跳转 2、UsernamePasswordAuthenticationFilter 在SpringSecurity中处理认证逻辑是在UsernamePas…

Windows10 如何配置python IDE

Windows10 如何配置python IDE 前言Python直接安装(快速上手)Step1.找到网址Step2.选择版本(非常重要)Step3. 安装过程Step4. python测试 Anaconda安装(推荐,集成了Spyder和Pycharm的安装)Step1…

使用功率分析仪测量和分析电抗器(电感器)的方法

高频电抗器用于电动汽车 (EV) 和混合动力汽车 (HEV) 的各种位置。例如,电池和逆变器之间的升压 DC/DC 转换器以及电池充电电路中的 AC/DC 转换器。为了提高整个系统的效率,必须提高每个组成电路的效率,而电抗器是造成这些电路大量损耗的元件之…

Unity 之 【Android Unity FBO渲染】之 [Unity 渲染 Android 端播放的视频] 的一种方法简单整理

Unity 之 【Android Unity FBO渲染】之 [Unity 渲染 Android 端播放的视频] 的一种方法简单整理 目录 Unity 之 【Android Unity FBO渲染】之 [Unity 渲染 Android 端播放的视频] 的一种方法简单整理 一、简单介绍 二、FBO 简单介绍 三、案例实现原理 四、注意事项 五、简…

03 Flask-添加配置信息

回顾之前学习的内容 02 Flask-快速上手 Flask 中最简单的web应用组成 1. 导入核心库 Flask from flask import Flask2. 实例化 web应用 注意:不要漏了 app Flask(__name__) 中的 __name__ 表示:是从当前的py文件实例化 app Flask(__name__)3. 创…

力扣每日一题:1372.二叉树中的最长交错路径

题目 给你一棵以 root 为根的二叉树,二叉树中的交错路径定义如下: 选择二叉树中 任意 节点和一个方向(左或者右)。如果前进方向为右,那么移动到当前节点的的右子节点,否则移动到它的左子节点。改变前进方…

力扣213-打家劫舍 II(Java详细题解)

题目链接:213. 打家劫舍 II - 力扣(LeetCode) 前情提要: 本体是打家劫舍的一个变形题,希望大家能先做198. 打家劫舍 - 力扣(LeetCode),并看一下我上题的讲解力扣198-打家劫舍&…

制证书、制电子印章、签章 -- 演示程序说明

ofd签章系统涉及证书的制作、电子印章制作、签章、验章等环节。关于ofd签章原理,本人写过多篇文章进行了阐述; 见文章《ofd板式文件 电子签章实现方法》、《一款简单易用的印章设计工具》、《签章那些事 -- 让你全面了解签章的流程》。 为了进一步加深对签章过程的理…

RK3229 ADNROID9 hdmi与耳机口同出声音

声卡0怎么配置才能跟HDMI同时输出一样的声音,下面是具体描述: 1、硬件连接 声卡0的连接是芯片的ADC音频输出脚直接接到DA芯片输出 2、cat /proc/asound/cards 0 [rockchiprk3229 ]: rockchip_rk3229 - rockchip,rk3229 rockchip,rk3229 1 [rockchiphdmi …

MFC工控项目实例之十一板卡测试信号输入界面

承接专栏《MFC工控项目实例之十添加系统测试对话框》 相关代码 1、在BoardTest.h文件中添加代码 class CBoardTest : public CDialog { // Construction public:CBoardTest(CWnd* pParent NULL); // standard constructorCButtonST m_btnStart[16];CWinThread* pThread…

FAT32文件系统详细分析 (格式化SD nandSD卡)

FAT32 文件系统详细分析 (格式化 SD nand/SD 卡) 目录 FAT32 文件系统详细分析 (格式化 SD nand/SD 卡)1. 前言2.格式化 SD nand/SD 卡3.FAT32 文件系统分析3.1 保留区分析3.1.1 BPB(BIOS Parameter Block) 及 BS 区分析3.1.2 FSInfo 结构扇区分析3.1.3 引导扇区剩余扇区3.1.4 …

RocketMQ 基础入门

文章内容是学习过程中的知识总结,如有纰漏,欢迎指正 文章目录 前言 RocketMQ 特点 RocketMQ 优势 1. RocketMQ 基本概念 1.1 NameServer 1.1.1 NameServer作用 1.1.2 和zk的区别 1.1.3 高可用保障 1.2 Broker 1.2.1 部署方式 1.2.1.1 单 Master 1.2.1.2 …

C语言 | Leetcode C语言题解之第396题旋转函数

题目&#xff1a; 题解&#xff1a; #define MAX(a, b) ((a) > (b) ? (a) : (b))int maxRotateFunction(int* nums, int numsSize){int f 0, numSum 0;for (int i 0; i < numsSize; i) {f i * nums[i];numSum nums[i];}int res f;for (int i numsSize - 1; i &g…

多维时序 | Matlab基于SSA-SVR麻雀算法优化支持向量机的数据多变量时间序列预测

多维时序 | Matlab基于SSA-SVR麻雀算法优化支持向量机的数据多变量时间序列预测 目录 多维时序 | Matlab基于SSA-SVR麻雀算法优化支持向量机的数据多变量时间序列预测效果一览基本介绍程序设计参考资料 效果一览 基本介绍 1.Matlab基于SSA-SVR麻雀算法优化支持向量机的数据多变…

Docker部署tenine实现后端应用的高可用与负载均衡

采用Docker方式的Tengine 和 keepalived 组合模式可以实现小应用场景的高可用负载均衡需求 目录 网络架构一、环境准备二、软件安装1. 下载Tenine镜像2. 下载Keepalived镜像3. 制作SpringBoot镜像 三、软件配置1. 创建应用容器2. 代理访问应用3. 创建Keepalived4. 测试高可用 网…

CSP-J算法基础 树状结构与二叉树

文章目录 前言树状结构树状结构的基本概念&#xff1a;为什么需要树状结构&#xff1f;优点树状结构的示例 二叉树什么是二叉树&#xff1f;二叉树的类型什么样的树不是二叉树&#xff1f;二叉树的五种形态 完全二叉树相关概念完全二叉树的定义&#xff1a; 相关概念1. **高度&…

Xcode报错:No exact matches in reference to static method ‘buildExpression‘

Xcode报错1&#xff1a;No exact matches in reference to static method buildExpression Xcode报错2&#xff1a;Type () cannot conform to View 这两个报错都是因为在SwiftUI的View的Body里面使用了ForEach循环,却没有在ForEach循环闭包的内部返回视图&#xff0c;而是做了…