【Flink-需求】RichMapFunction实现活动数据实时计算关联维度信息

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-需求】RichMapFunction实现活动数据实时计算关联维度信息

一、需求分析


1.维度信息,关联mysql数据库查询

image.png

2.数据为以下格式:

user001,A1,2020-09-23 10:10:10,2,北京市

user002,A3,2020-09-23 10:10:10,1,上海市

user003,A2,2020-09-23 10:10:10,2,苏州市

user002,A3,2020-09-23 10:10:10,1,辽宁市

user001,A2,2020-09-23 10:10:10,2,北京市

user002,A2,2020-09-23 10:10:10,1,上海市

user003,A1,2020-09-23 10:10:10,1,北京市

经过实时计算Flink处理后变成了如下格式

user001,新人礼物,2020-09-23 10:10:10,2,北京市

user002,年终礼物,2020-09-23 10:10:10,1,上海市

user003,月末礼物,2020-09-23 10:10:10,2,苏州市


二、环境要求


1.zookeeper

2.kafka

20200923111838359.png

3.创建topic


bin/kafka-topics.sh --create --zookeeper  hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-f


三、flink程序


3.1 Activity实体类

public class ActivityBean {
    public String uid;
    public String aid;
    public String activityName;
    public String time;
    public int eventType;
    public String province;
    public ActivityBean() {
    }
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType, String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.province = province;
    }
    @Override
    public String toString() {
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", activityName='" + activityName + '\'' +
                ", time='" + time + '\'' +
                ", eventType=" + eventType +
                ", province='" + province + '\'' +
                '}';
    }
    public static ActivityBean of(String uid,String aid,String activityName,String time,int eventType,String province){
        return new ActivityBean(uid,aid,activityName,time,eventType,province);
    }
}

3.2 flink实时计算程序


public class ActivityCount {
        public static void main(String[] args) throws Exception {
            //1.获取环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.kafka配置
            String topic = "activity";
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", "192.168.52.200:9092");//多个的话可以指定
            prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("auto.offset.reset", "earliest");
            prop.setProperty("group.id", "consumer1");
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);
            //3.获取数据
            DataStream<String> lines = env.addSource(myConsumer);
            SingleOutputStreamOperator<ActivityBean> beans = lines.map(new RichMapFunction<String, ActivityBean>() {
                private Connection connection = null;
                // 4.连接数据库
                @Override
                public void open(Configuration parameters) throws Exception {
                    super.open(parameters);
                    connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/flink?characterEncoding=UTF-8","root","123456");
                }
                @Override
                public ActivityBean map(String line) throws Exception {
                    String[] fields = line.split(",");
                    String uid = fields[0];
                    String aid = fields[1];
                    // 5.查询条件为aid 活动标号,查出活动的名称
                    PreparedStatement preparedStatement = connection.prepareStatement("select name from activities where id = ?");
                    preparedStatement.setString(1, aid);
                    ResultSet resultSet = preparedStatement.executeQuery();
                    String name = null;
                    while (resultSet.next()) {
                        name = resultSet.getString(1);
                    }
                    preparedStatement.close();
                    String time = fields[2];
                    int eventType = Integer.parseInt(fields[3]);
                    String province = fields[4];
                    return ActivityBean.of(uid, aid, name, time, eventType, province);
                }
                // 6.关闭数据库连接
                @Override
                public void close() throws Exception {
                    super.close();
                    connection.close();
                }
            });
            beans.print();
            //7.执行
            env.execute("StreamingActivity");
        }
}


四、测试


1.开启kafka生产者


bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,19


2.运行flink程序

3.运行结果:

20200923113044500.png

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1天前
|
Oracle Java 关系型数据库
实时计算 Flink版操作报错合集之本地打成jar包,运行报错,idea运行不报错,是什么导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之在连接Oracle 19c时报错如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
16 0
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 0
|
1天前
|
SQL 关系型数据库 数据库
实时计算 Flink版操作报错合集之遇到报错:"An OperatorEvent from an OperatorCoordinator to a task was lost. Triggering task failover to ensure consistency." ,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
消息中间件 Oracle 关系型数据库
实时计算 Flink版操作报错合集之报错io.debezium.DebeziumException: The db history topic or its content is fully or partially missing. Please check database history topic configuration and re-execute the snapshot. 是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 0
|
1天前
|
SQL 存储 关系型数据库
实时计算 Flink版操作报错合集之向Hudi写入数据时遇到错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
14 0
|
1天前
|
Oracle 关系型数据库 数据库
实时计算 Flink版操作报错合集之执行Flink job,报错“Could not execute SQL statement. Reason:org.apache.flink.table.api.ValidationException: One or more required options are missing”,该怎么办
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
2天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 5
|
2天前
|
SQL 关系型数据库 MySQL
实时计算 Flink版操作报错合集之报错:org.apache.flink.table.api.validationexception如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 1
|
2天前
|
存储 SQL 关系型数据库
实时计算 Flink版操作报错合集之报错:WARN (org.apache.kafka.clients.consumer.ConsumerConfig:logUnused)这个错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
13 3