【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云原生网关 MSE Higress,422元/月
服务治理 MSE Sentinel/OpenSergo,Agent数量 不受限
简介: 【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

一、环境


1.centos7.6

2.zookeeper-3.4.5

3.kafka_2.11-0.10.2.1

4.jdk1.8_261


二、单机-ECS-zookeeper


tar -zxvf zookeeper-3.4.5 -C /root/apps/

cp zoo_simple.cfg zoo.cfg

提前创建好数据和目录日志文件夹

dataDir=/root/data/zookeeper

dataLogDir=/root/data/zookeeperlog


三、单机-ECS-kafka


3.1 安装


tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/


3.2 修改server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址

3.3 启动本机测试


# 启动
bin/kafka-server-start.sh -daemon config/server.properties 
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning

四、VMware单机


4.1 zookeeper单机



五、VMware集群

hadoop1 192.168.52.200  
hadoop2 192.168.52.201  
hadoop3 192.168.52.202

5.1 zookeeper集群


1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg  zoo.cfg  
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata


4.集群配置


server.1=192.168.52.201:2888:3888  
server.2=192.168.52.202:2888:3888  
server.3=192.168.52.200:2888:3888


5.集群分发


scp -r zookeeper/ hadoop2:$PWD  
scp -r zookeeper/ hadoop3:$PWD


6.逐台启动

bin/zkStart.sh start

7.查看状态

bin/zkStart status

2020092209264179.png


5.2 kafka集群


1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/

2.修改配置文件如下:

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.分发集群


scp  kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp  kafka_2.11-0.10.2.1/ hadoop3:$PWD


4.修改hadoop2,hadoop3的集群编号

vi server.properties


broker.id=1
broker.id=2


5.逐台启动测试

1.启动
bin/kafka-server-start.sh -daemon config/server.properties  bin/kafka-server-start.sh -daemon config/server.properties
2..创建topic
bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman  --from-beginning

20200922093553660.png


六、Flink-Kafka


6.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>


6.2 Flink-KafkaSource

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}

七、错误解决


在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:

1.zookeeper集群,kafka集群消息不通

2.宿主机与虚拟机网络不通

3.flink版本与kafka版本jar冲突

4.windows防火墙问题

5.hosts文件的主机名没有配置


最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。

20200922142213864.png

目录
相关文章
|
4天前
|
机器学习/深度学习 编解码 人工智能
阿里云gpu云服务器租用价格:最新收费标准与活动价格及热门实例解析
随着人工智能、大数据和深度学习等领域的快速发展,GPU服务器的需求日益增长。阿里云的GPU服务器凭借强大的计算能力和灵活的资源配置,成为众多用户的首选。很多用户比较关心gpu云服务器的收费标准与活动价格情况,目前计算型gn6v实例云服务器一周价格为2138.27元/1周起,月付价格为3830.00元/1个月起;计算型gn7i实例云服务器一周价格为1793.30元/1周起,月付价格为3213.99元/1个月起;计算型 gn6i实例云服务器一周价格为942.11元/1周起,月付价格为1694.00元/1个月起。本文为大家整理汇总了gpu云服务器的最新收费标准与活动价格情况,以供参考。
阿里云gpu云服务器租用价格:最新收费标准与活动价格及热门实例解析
|
7天前
|
云安全 弹性计算 安全
阿里云服务器基础安全防护简介,云服务器基础安全防护及常见安全产品简介
在使用云服务器的过程中,云服务器的安全问题是很多用户非常关心的问题,阿里云服务器不仅提供了一些基础防护,我们也可以选择其他的云安全类产品来确保我们云服务器的安全。本文为大家介绍一下阿里云服务器的基础安全防护有哪些,以及阿里云的一些安全防护类云产品。
阿里云服务器基础安全防护简介,云服务器基础安全防护及常见安全产品简介
|
6天前
|
机器学习/深度学习 弹性计算 人工智能
阿里云第八代云服务器ECSg8i实例深度解析:性能及适用场景参考
目前企业对云服务器的性能、安全性和AI能力的要求日益提高。阿里云推出的第八代云服务器ECS g8i实例,以其卓越的性能、增强的AI能力和全面的安全防护,除了适用于通用互联网应用和在线音视频应用等场景之外,也广泛应用于AI相关应用。本文将深入解析ECS g8i实例的技术特性、产品优势、适用场景及与同类产品的对比,以供参考。
阿里云第八代云服务器ECSg8i实例深度解析:性能及适用场景参考
|
19天前
|
弹性计算 运维 搜索推荐
阿里云建站方案参考:云服务器、速成美站、企业官网区别及选择参考
随着数字化转型的浪潮不断推进,越来越多的企业和公司开始将业务迁移到云端,而搭建一个专业、高效的企业官网成为了上云的第一步。企业官网不仅是展示公司形象、产品和服务的重要窗口,更是与客户沟通、传递价值的关键渠道。随着阿里云服务器和建站产品的知名度越来越高,越来越多的用户选择阿里云的产品来搭建自己的官网。本文将深入探讨在阿里云平台上,如何选择最适合自己的建站方案:云服务器建站、云·速成美站还是云·企业官网。
105 13
阿里云建站方案参考:云服务器、速成美站、企业官网区别及选择参考
|
13天前
|
编解码 分布式计算 Linux
最新阿里云服务器、轻量应用服务器、GPU云服务器活动价格参考
阿里云服务器产品包含云服务器、轻量应用服务器、GPU云服务器等,本文汇总了这些云服务器当下最新的实时活动价格情况,包含经济型e实例云服务器价格、通用算力型u1实例云服务器价格、第七代云服务器价格、轻量应用服务器最新价格、GPU云服务器价格,以供大家参考。
最新阿里云服务器、轻量应用服务器、GPU云服务器活动价格参考
|
20天前
|
域名解析 弹性计算 数据挖掘
阿里云特惠云服务器82元、298元、99元和199元性能及选择参考
目前在阿里云的活动中有几款价格非常实惠的云服务器,现在购买2核2G轻量应用服务器最低只要82元1年,云服务器最低只要99元1年。购买2核4G轻量应用服务器最低只要298元1年,云服务器最低只要199元1年。本文为大家解析2024年阿里云这几款特惠云服务器的性能及选择参考。
阿里云特惠云服务器82元、298元、99元和199元性能及选择参考
|
5天前
|
开发框架 运维 应用服务中间件
阿里云轻量应用服务器82元和298元与云服务器99元和199元区别及选择参考
目前阿里云推出了几款价格比较实惠的轻量应用服务器和云服务器,轻量应用服务器有2核2G3M 50GB高效云盘,价格为82元1年;2核4G4M 60GB高效云盘,价格为298元1年;经济型e实例2核2G,40G ESSD Entry盘,3M带宽,价格为99元1年;通用算力型u1实例2核4G,80G ESSD Entry盘,5M带宽,价格为199元1年。本文将对这几款轻量应用服务器和云服务器进行对比和测评,分析其性能和适用场景,以供大家选择参考。
阿里云轻量应用服务器82元和298元与云服务器99元和199元区别及选择参考
|
8天前
|
运维 安全 网络安全
运维笔记:基于阿里云跨地域服务器通信
运维笔记:基于阿里云跨地域服务器通信
37 1
|
17天前
|
弹性计算 固态存储 ice
阿里云ECS服务器2核16G、4核32G和8核64G不同配置租赁价格表
2024年阿里云服务器提供多种配置与实例规格,如2核16G、4核32G及8核64G等,用户可根据需求选择内存型r8i、通用算力型u1等不同架构。以2核16G为例,r8i每月334.19元起,u1则为286.2元起。公网带宽与系统盘亦有多档价位。实际价格与折扣请参照官网。
|
14天前
|
存储 弹性计算 大数据
阿里云服务器详细介绍_ECS云服务器优势_云服务器问题解答FAQ
阿里云服务器ECS是一种安全可靠的云计算服务,具备弹性伸缩、高性能及易用性等特点。提供多样化的实例规格,如经济型e、通用算力型u1、计算型c7等,满足不同业务需求。用户可根据业务规模选择合适的计算架构、存储类型及付费模式(包年包月、按量付费等),同时享受专有网络VPC、快照备份及丰富的镜像类型支持。此外,ECS支持免费试用,帮助企业与个人快速上手。

热门文章

最新文章