实时即未来,车联网项目之原始终端数据实时ETL【二】

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实时即未来,车联网项目之原始终端数据实时ETL【二】

Flink 将报文解析后的数据推送到 kafka 中


  • 步骤


1.开启 kafka 集群


# 三台节点都要开启 kafka 
[root@node01 kafka]# bin/kafka-server-start.sh -daemon config/server.properties


2.使用 kafka tool 连接 kafka 集群,创建 topic


# 第1种方式通过命令
bin/kafka-topics.sh --create --zookeeper node01:2181,node02:2181,node03:2181 --topic vehicledata --replication-factor 2 --partitions 3
# 查看 kafka topic 的列表
bin/kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181 --list
# 第2种 kafka tool 工具

51e925cc287c7ccb447e56d87081435e.png


3.通过 flink 将解析后的报文 json 字符串推送到 kafka 中


package cn.maynor.flink.source;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.util.Properties;
/**
 * Author maynor
 * Date 2021/9/20 9:11
 * 实现flink将数据写入到kafka集群中
 * 开发步骤:
 * 1.开启流处理环境
 * 2.设置并行度、chk、重启策略等参数
 * 3.创建FlinkKafkaProducer类
 * 3.1.配置属性
 * 4.设置数据源
 * 5.执行流处理环境
 */
public class FlinkKafkaWriter {
    public static void main(String[] args) {
        //1.开启流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2.设置并行度、chk、重启策略等参数
        env.setParallelism(1);
        //2.1.读取车辆 json 数据
        DataStreamSource<String> source = env
                .readTextFile("F:\\1.授课视频\\4-车联网项目\\05_深圳24期\\全部讲义\\2-星途车联网系统第二章-原始终端数据实时ETL\\原始数据\\sourcedata.txt");
        //3.创建FlinkKafkaProducer类
        //3.1.配置属性
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node01:9092,node02:9092,node03:9092");
        props.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "5");
        props.setProperty(ProducerConfig.ACKS_CONFIG, "0");
        //props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.flink.api.common.serialization.SimpleStringSchema");
        //3.2.实例化FlinkKafkaProducer
        FlinkKafkaProducer<String> producer = new FlinkKafkaProducer<>(
                "vehicledata",
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        return new ProducerRecord(
                                "vehicledata",
                                element.getBytes()
                        );
                    }
                },
                props,
                FlinkKafkaProducer.Semantic.NONE
        );
        //4.设置数据源
        source.addSink(producer);
        //5.执行流处理环境
        try {
            env.execute();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}


实时ETL开发


  • 创建模块 —— StreamingAnalysis
  • 导入项目的 pom 依赖
  • 常见包的含义 task , source ,sink ,entity
  • 配置文件的导入 conf.properties 和 logback.xml
  • 工具类的走读


。日期处理

。读取配置文件 静态代码块

。字符串常用工具 - 字符串翻转

。JSON 字符串转对象


原始数据的实时ETL设置


开发的流程


9e90b282ed55c72fc4817f86f304d7fc.png


开发的类名 —— KafkaSourceDataTask


    //todo 1.创建流执行环境
        //todo 2.设置并行度 ①配置文件并行度设置 ②客户端设置 flink run -p 2 ③在程序中 env.setParallel(2) ④算子上并行度(级别最高)
        //todo 3.开启checkpoint及相应的配置,最大容忍次数,最大并行checkpoint个数,checkpoint间最短间隔时间,checkpoint的最大
        //todo 容忍的超时时间,checkpoint如果取消是否删除checkpoint 等
        //todo 4.开启重启策略
        //todo 5. 读取kafka中的数据
        //todo 5.1 设置 FlinkKafkaConsumer
        //todo 5.2 配置参数
        //todo 5.3 消费 kafka 的offset 提交给 flink 来管理
        //todo 6 env.addSource
        //todo 7 打印输出
        //todo 8 将读取出来的 json 字符串转换成 maynorDataObj
        //todo 9 将数据拆分成正确的数据和异常的数据
        //todo 10 将正确的数据保存到 hdfs
        //todo 11 将错误的数据保存到 hdfs 上
        //todo 12 将正确的数据写入到 hbase 中
        //todo 8 执行流环境


设置 checkpoint 中 statebackend


  • 配置的地方有两种


1.配置文件中 flink-conf.yaml

2.在 job 中配置 env.setStateBackend()


  • 配置的方式三种


1.memorystatebackend

2.fsStatebackend

3.rocksdbStatebackend(状态特别大的使用)


  • 配置读取kafka的数据的设置


数据积压和反压机制


  • 就是生产的数据大于消费的数据的速度,造成数据的积压


  • 解决反压机制的方法


185c5bd26b5e05aa43fb39d5d4ae175d.png


通过 credit 和 反压策略解决数据堆积问题


d5825aa24ab8b66f0aa6f08e3ee1cb50.png


抽象 BaseTask 用于处理数据流和读取kafka数据


  • 将公共的固定的代码抽象出来 BaseTask 抽象类
  • 使用 Flink 的自带的 ParameterTool 来接收 client 或 配置文件中的配置
目录
相关文章
|
5月前
|
消息中间件 存储 NoSQL
离线与实时数据开发方案
离线与实时数据开发方案
104 0
|
数据采集 存储 监控
大数据的数据来源 - 数据采集的方式(数据接入的方式)
大数据处理关键技术一般包括:大数据采集、大数据预处理、大数据存储及管理、大数据分析及挖掘、大数据展现和应用(大数据检索、大数据可视化、大数据应用、大数据安全等)。下面主要介绍下大数据采集
4091 0
|
消息中间件 数据采集 SQL
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(一)
|
2月前
|
存储 JavaScript 前端开发
ShareDB:构建实时应用从未如此简单
ShareDB:构建实时应用从未如此简单
60 0
EMQ
|
5月前
|
数据采集 消息中间件 并行计算
NeuronEX 3.2.0 发布:增强数据采集、分析计算和管理功能
工业边缘网关软件 NeuronEX 3.2.0 版本现已正式发布,本次发布带来了一系列的增强功能和新特性,旨在为用户提供更多数据采集、分析计算以及管理的能力。
EMQ
103 2
NeuronEX 3.2.0 发布:增强数据采集、分析计算和管理功能
|
数据采集 小程序 前端开发
IoT小程序在展示中央空调采集数据和实时运行状态上的应用
IoT小程序框架在跨系统平台(AliOS Things、Ubuntu、Linux、MacOS、Window等)方面提供了非常优秀的基础能力,应用的更新升级提供了多种方式,在实际业务开发过程中可以灵活选择。IoT小程序框架通过JSAPI提供了调用系统底层应用的能力,同时提供了自定义JSAPI扩展封装的方法,这样就足够业务开发通过自定义的方式满足特殊的业务需求。 IoT小程序在前端框架能力、应用框架能力、图形框架能力都进行了适配和优化。那么接下来,我们按照其官方步骤搭建开发环境,然后结合中央空调数据采集和状态显示的实际应用场景开发物联网小程序应用。
23887 63
IoT小程序在展示中央空调采集数据和实时运行状态上的应用
|
消息中间件 数据采集 JSON
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)
1、电商数仓(用户行为采集平台)数据仓库概念、用户行为日志、业务数据、模拟数据、用户行为数据采集模块、日志采集Flume(二)
|
数据采集 消息中间件 分布式计算
大数据数据采集的数据采集(收集/聚合)的Logstash之数据采集流程的output
在大数据领域,数据采集是非常重要的一环。而Logstash作为一个开源的数据收集引擎,可以帮助我们轻松地实现数据的采集、聚合和传输等功能。本文将会对Logstash之数据采集流程的Output进行详细介绍。
112 1
|
数据采集 消息中间件 监控
大数据数据采集的数据采集(收集/聚合)的Logstash之数据采集流程的input
在大数据领域,数据采集是非常重要的一环。而Logstash作为一个开源的数据收集引擎,可以帮助我们轻松地实现数据的采集、聚合和传输等功能。本文将会对Logstash之数据采集流程的Input进行详细介绍。
118 1
|
数据采集 Java 大数据
大数据数据采集的数据采集(收集/聚合)的Logstash之强大的插件功能
在大数据领域中,Logstash是一款非常流行的数据采集工具。它具有丰富的插件功能,可以完成各种不同数据来源的数据采集任务。本文将介绍Logstash的插件功能,并为大家介绍几款强大的插件。
173 1