【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
注册配置 MSE Nacos/ZooKeeper,118元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

一、环境


1.centos7.6

2.zookeeper-3.4.5

3.kafka_2.11-0.10.2.1

4.jdk1.8_261


二、单机-ECS-zookeeper


tar -zxvf zookeeper-3.4.5 -C /root/apps/

cp zoo_simple.cfg zoo.cfg

提前创建好数据和目录日志文件夹

dataDir=/root/data/zookeeper

dataLogDir=/root/data/zookeeperlog


三、单机-ECS-kafka


3.1 安装


tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/


3.2 修改server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址

3.3 启动本机测试


# 启动
bin/kafka-server-start.sh -daemon config/server.properties 
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning

四、VMware单机


4.1 zookeeper单机



五、VMware集群

hadoop1 192.168.52.200  
hadoop2 192.168.52.201  
hadoop3 192.168.52.202

5.1 zookeeper集群


1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg  zoo.cfg  
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata


4.集群配置


server.1=192.168.52.201:2888:3888  
server.2=192.168.52.202:2888:3888  
server.3=192.168.52.200:2888:3888


5.集群分发


scp -r zookeeper/ hadoop2:$PWD  
scp -r zookeeper/ hadoop3:$PWD


6.逐台启动

bin/zkStart.sh start

7.查看状态

bin/zkStart status

2020092209264179.png


5.2 kafka集群


1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/

2.修改配置文件如下:

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.分发集群


scp  kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp  kafka_2.11-0.10.2.1/ hadoop3:$PWD


4.修改hadoop2,hadoop3的集群编号

vi server.properties


broker.id=1
broker.id=2


5.逐台启动测试

1.启动
bin/kafka-server-start.sh -daemon config/server.properties  bin/kafka-server-start.sh -daemon config/server.properties
2..创建topic
bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman  --from-beginning

20200922093553660.png


六、Flink-Kafka


6.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>


6.2 Flink-KafkaSource

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}

七、错误解决


在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:

1.zookeeper集群,kafka集群消息不通

2.宿主机与虚拟机网络不通

3.flink版本与kafka版本jar冲突

4.windows防火墙问题

5.hosts文件的主机名没有配置


最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。

20200922142213864.png

目录
相关文章
|
2月前
|
SQL 机器学习/深度学习 分布式计算
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
大数据-81 Spark 安装配置环境 集群环境配置 超详细 三台云服务器
89 1
|
2天前
|
存储 SpringCloudAlibaba Java
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论。
【SpringCloud Alibaba系列】一文全面解析Zookeeper安装、常用命令、JavaAPI操作、Watch事件监听、分布式锁、集群搭建、核心理论
|
17天前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 11 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
1月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 10 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
2月前
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
104 6
|
2月前
|
运维 Cloud Native 应用服务中间件
阿里云微服务引擎 MSE 及 云原生 API 网关 2024 年 09 月产品动态
阿里云微服务引擎 MSE 面向业界主流开源微服务项目, 提供注册配置中心和分布式协调(原生支持 Nacos/ZooKeeper/Eureka )、云原生网关(原生支持Higress/Nginx/Envoy,遵循Ingress标准)、微服务治理(原生支持 Spring Cloud/Dubbo/Sentinel,遵循 OpenSergo 服务治理规范)能力。API 网关 (API Gateway),提供 APl 托管服务,覆盖设计、开发、测试、发布、售卖、运维监测、安全管控、下线等 API 生命周期阶段。帮助您快速构建以 API 为核心的系统架构.满足新技术引入、系统集成、业务中台等诸多场景需要
|
2月前
|
SQL 分布式计算 NoSQL
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
大数据-170 Elasticsearch 云服务器三节点集群搭建 测试运行
58 4
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
Hadoop-35 HBase 集群配置和启动 3节点云服务器 集群效果测试 Shell测试
85 4
|
2月前
|
SQL 分布式计算 Hadoop
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
Hadoop-37 HBase集群 JavaAPI 操作3台云服务器 POM 实现增删改查调用操作 列族信息 扫描全表
39 3
|
2月前
|
分布式计算 Hadoop Shell
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
Hadoop-36 HBase 3节点云服务器集群 HBase Shell 增删改查 全程多图详细 列族 row key value filter
62 3