【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

本文涉及的产品
实时计算 Flink 版,1000CU*H 3个月
任务调度 XXL-JOB 版免费试用,400 元额度,开发版规格
云原生网关 MSE Higress,422元/月
简介: 【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

一、环境


1.centos7.6

2.zookeeper-3.4.5

3.kafka_2.11-0.10.2.1

4.jdk1.8_261


二、单机-ECS-zookeeper


tar -zxvf zookeeper-3.4.5 -C /root/apps/

cp zoo_simple.cfg zoo.cfg

提前创建好数据和目录日志文件夹

dataDir=/root/data/zookeeper

dataLogDir=/root/data/zookeeperlog


三、单机-ECS-kafka


3.1 安装


tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/


3.2 修改server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址

3.3 启动本机测试


# 启动
bin/kafka-server-start.sh -daemon config/server.properties 
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning

四、VMware单机


4.1 zookeeper单机



五、VMware集群

hadoop1 192.168.52.200  
hadoop2 192.168.52.201  
hadoop3 192.168.52.202

5.1 zookeeper集群


1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg  zoo.cfg  
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata


4.集群配置


server.1=192.168.52.201:2888:3888  
server.2=192.168.52.202:2888:3888  
server.3=192.168.52.200:2888:3888


5.集群分发


scp -r zookeeper/ hadoop2:$PWD  
scp -r zookeeper/ hadoop3:$PWD


6.逐台启动

bin/zkStart.sh start

7.查看状态

bin/zkStart status

2020092209264179.png


5.2 kafka集群


1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/

2.修改配置文件如下:

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.分发集群


scp  kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp  kafka_2.11-0.10.2.1/ hadoop3:$PWD


4.修改hadoop2,hadoop3的集群编号

vi server.properties


broker.id=1
broker.id=2


5.逐台启动测试

1.启动
bin/kafka-server-start.sh -daemon config/server.properties  bin/kafka-server-start.sh -daemon config/server.properties
2..创建topic
bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman  --from-beginning

20200922093553660.png


六、Flink-Kafka


6.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>


6.2 Flink-KafkaSource

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}

七、错误解决


在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:

1.zookeeper集群,kafka集群消息不通

2.宿主机与虚拟机网络不通

3.flink版本与kafka版本jar冲突

4.windows防火墙问题

5.hosts文件的主机名没有配置


最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。

20200922142213864.png

目录
相关文章
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
486 43
|
1月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
166 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
629 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
1月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
986 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
2月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
222 1
京东零售基于Flink的推荐系统智能数据体系
|
安全 应用服务中间件 API
微服务分布式系统架构之zookeeper与dubbo-2
微服务分布式系统架构之zookeeper与dubbo-2
|
负载均衡 Java 应用服务中间件
微服务分布式系统架构之zookeeper与dubbor-1
微服务分布式系统架构之zookeeper与dubbor-1
|
存储 负载均衡 Dubbo
分布式-Zookeeper(一)
分布式-Zookeeper(一)
|
9月前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
10月前
|
存储 运维 NoSQL
分布式读写锁的奥义:上古世代 ZooKeeper 的进击
本文作者将介绍女娲对社区 ZooKeeper 在分布式读写锁实践细节上的思考,希望帮助大家理解分布式读写锁背后的原理。
266 11

热门文章

最新文章