【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

简介: 【采坑-Flink消费kafka中的数据】阿里云ECS/VMware之zookeeper和kafka单机/集群环境

一、环境


1.centos7.6

2.zookeeper-3.4.5

3.kafka_2.11-0.10.2.1

4.jdk1.8_261


二、单机-ECS-zookeeper


tar -zxvf zookeeper-3.4.5 -C /root/apps/

cp zoo_simple.cfg zoo.cfg

提前创建好数据和目录日志文件夹

dataDir=/root/data/zookeeper

dataLogDir=/root/data/zookeeperlog


三、单机-ECS-kafka


3.1 安装


tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/


3.2 修改server.properties

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
host.name=172.17.81.232 # 阿里云内网地址
advertised.host.name=47.94.39.202 # 阿里云外网地址
zookeeper.connect=localhost:2181 #zookeeper地址

3.3 启动本机测试


# 启动
bin/kafka-server-start.sh -daemon config/server.properties 
# 创建topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# 开启生产者
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# 开启消费者
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning

四、VMware单机


4.1 zookeeper单机



五、VMware集群

hadoop1 192.168.52.200  
hadoop2 192.168.52.201  
hadoop3 192.168.52.202

5.1 zookeeper集群


1.解压安装
tar -zxvf zookeeper-3.4.5 -C /root/apps/
2.进入conf目录
cp zoo_simple.cfg  zoo.cfg  
3.数据目录【提前创建好集群的三个目录】
dataDir=/root/zkdata


4.集群配置


server.1=192.168.52.201:2888:3888  
server.2=192.168.52.202:2888:3888  
server.3=192.168.52.200:2888:3888


5.集群分发


scp -r zookeeper/ hadoop2:$PWD  
scp -r zookeeper/ hadoop3:$PWD


6.逐台启动

bin/zkStart.sh start

7.查看状态

bin/zkStart status

2020092209264179.png


5.2 kafka集群


1.tar -zxvf kafka_2.11-0.10.2.1 -C /root/apps/

2.修改配置文件如下:

broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181

3.分发集群


scp  kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp  kafka_2.11-0.10.2.1/ hadoop3:$PWD


4.修改hadoop2,hadoop3的集群编号

vi server.properties


broker.id=1
broker.id=2


5.逐台启动测试

1.启动
bin/kafka-server-start.sh -daemon config/server.properties  bin/kafka-server-start.sh -daemon config/server.properties
2..创建topic
bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3.创建生产者
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4.创建消费者
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman  --from-beginning

20200922093553660.png


六、Flink-Kafka


6.1 pom

    <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.1</version>
            <!-- provided在这表示此依赖只在代码编译的时候使用,运行和打包的时候不使用 --
            <!--<scope>provided</scope>-->
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.9.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
                <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.9.1</version>
        </dependency>


6.2 Flink-KafkaSource

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //kafka配置
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers","192.168.52.200:9092");//多个的话可以指定
        prop.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset","earliest");
        prop.setProperty("group.id","consumer3");
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}

七、错误解决


在连接kafkasource时候,总是消费不到kafka中的数据,开始怀疑以下问题:

1.zookeeper集群,kafka集群消息不通

2.宿主机与虚拟机网络不通

3.flink版本与kafka版本jar冲突

4.windows防火墙问题

5.hosts文件的主机名没有配置


最后经过查文档和排除问题,终于得知了zookeeper在集群中的消息是以主机名发送的,所以需要配置主机名。

20200922142213864.png

目录
相关文章
|
消息中间件 运维 算法
Kafka 为什么要抛弃 Zookeeper?
本文探讨了Kafka为何逐步淘汰ZooKeeper。长久以来,ZooKeeper作为Kafka的核心组件,负责集群管理和协调任务。然而,随着Kafka的发展,ZooKeeper带来的复杂性增加、性能瓶颈及一致性问题日益凸显。为解决这些问题,Kafka引入了KRaft,这是一种基于Raft算法的内置元数据管理方案,不仅简化了部署流程,还提升了系统的一致性和扩展性。本文详细分析了这一转变背后的原因及其带来的优势,并展望了Kafka未来的发展方向。
1084 1
|
消息中间件 运维 Java
搭建Zookeeper、Kafka集群
本文详细介绍了Zookeeper和Kafka集群的搭建过程,涵盖系统环境配置、IP设置、主机名设定、防火墙与Selinux关闭、JDK安装等基础步骤。随后深入讲解了Zookeeper集群的安装与配置,包括数据目录创建、节点信息设置、SASL认证配置及服务启动管理。接着描述了Kafka集群的安装,涉及配置文件修改、安全认证设置、生产消费认证以及服务启停操作。最后通过创建Topic、发送与查看消息等测试验证集群功能。全网可搜《小陈运维》获取更多信息。
1027 1
|
消息中间件 Java Kafka
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
本文介绍了Kafka集群的搭建过程,涵盖从虚拟机安装到集群测试的详细步骤。首先规划了集群架构,包括三台Kafka Broker节点,并说明了分布式环境下的服务进程配置。接着,通过VMware导入模板机并克隆出三台虚拟机(kafka-broker1、kafka-broker2、kafka-broker3),分别设置IP地址和主机名。随后,依次安装JDK、ZooKeeper和Kafka,并配置相应的环境变量与启动脚本,确保各组件能正常运行。最后,通过编写启停脚本简化集群的操作流程,并对集群进行测试,验证其功能完整性。整个过程强调了自动化脚本的应用,提高了部署效率。
3453 1
【手把手教你Linux环境下快速搭建Kafka集群】内含脚本分发教程,实现一键部署多个Kafka节点
|
消息中间件 监控 Ubuntu
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
516 3
大数据-54 Kafka 安装配置 环境变量配置 启动服务 Ubuntu配置 ZooKeeper
|
消息中间件 分布式计算 监控
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
大数据-78 Kafka 集群模式 集群的应用场景与Kafka集群的搭建 三台云服务器
378 6
|
消息中间件 Java Kafka
windows服务器重装系统之后,Kafka服务如何恢复?
windows服务器重装系统之后,Kafka服务如何恢复?
296 8
|
消息中间件 Java Kafka
ELFK对接zookeeper&kafka
ELFK对接zookeeper&kafka
|
资源调度 Java Scala
实时计算 Flink版产品使用问题之如何实现ZooKeeper抖动导致任务失败时,能从最近的检查点重新启动任务
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
消息中间件 存储 缓存
kafka 的数据是放在磁盘上还是内存上,为什么速度会快?
Kafka的数据存储机制通过将数据同时写入磁盘和内存,确保高吞吐量与持久性。其日志文件按主题和分区组织,使用预写日志(WAL)保证数据持久性,并借助操作系统的页缓存加速读取。Kafka采用顺序I/O、零拷贝技术和批量处理优化性能,支持分区分段以实现并行处理。示例代码展示了如何使用KafkaProducer发送消息。
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
632 1