flink 维表join(一):广播流的使用

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: flink 维表join(一):广播流的使用

flink 维表join 的方式很多,本文主要讲述的是广播维表的方式进行join.

实现方式:

利用broadcast State将维度数据流广播到下游做join
将维度数据发送到kakfa作为广播原始流S1
定义状态描述符MapStateDescriptor 调用S1.broadcast()获得broadCastStream S2
调用非广播流S3.connect(S2),得到BroadcastConnectedStream S4
应用混合流的S4.process(),并在KeyedBroadcastProcessFunction/BroadcastProcessFunction实现关联处理逻辑

优点:

维度数据实时更新

数据保存在内存中,支持维表数据量比较小。

下面为大家贴上完整版demo:

逻辑如下:定时读取mysql 维度表数据,关联kafka 数据,分别写入到pulsar 和dorisdb中。

1: pom.xml

<repositories>
        <repository>
            <id>dorisdb-maven-releases</id>
            <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-releases/</url>
        </repository>
        <repository>
            <id>dorisdb-maven-snapshots</id>
            <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-snapshots/</url>
        </repository>
    </repositories>
         <dependency>
            <groupId>com.dorisdb.connector</groupId>
            <artifactId>flink-connector-doris</artifactId>
            <version>1.0.30-SNAPSHOT</version>  <!-- for flink-1.11 ~ flink-1.12 -->
            <scope>provided</scope>
        </dependency>
          <!-- flink-pulsar依赖-->
        <dependency>
            <groupId>io.streamnative.connectors</groupId>
            <artifactId>pulsar-flink-connector_2.11</artifactId>
            <version>1.12.3.0</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.pulsar</groupId>
            <artifactId>pulsar-client</artifactId>
            <version>2.7.1</version>
            <scope>provided</scope>
        </dependency>

2: DorisSource

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
public class DorisSource extends RichParallelSourceFunction<EventDim>{
    private PreparedStatement ps;
    private Connection connection;
    private volatile int lastUpdateMin = -1;
    private volatile boolean isRunning;
    /**
     * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接
     **/
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        isRunning = true;
        Class.forName("com.mysql.cj.jdbc.Driver");
        connection = DriverManager.getConnection("jdbc:mysql://sd.dorisdb-fe001:9030/fengmi?characterEncoding=UTF-8", "xxxx", "xxxx");
        String sql="select * from test_dim";
        ps = connection.prepareStatement(sql);
    }
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }
    /**
     * DataStream 调用一次 run() 方法用来获取数据
     **/
    @Override
    public void run(SourceContext<EventDim> sourceContext) throws Exception {
       try {
            while (isRunning) {
                LocalDateTime date = LocalDateTime.now();
                int min = date.getMinute();
                ResultSet rs = ps.executeQuery();
                if(min != lastUpdateMin) {
                    lastUpdateMin = min;            
                    while (resultSet.next()) {
                      EventDim eventDim = new EventDim();
                      eventDim.setXXXX1(resultSet.getString("xxxxx1"));
                      eventDim.setXXXX2(resultSet.getString("xxxx2"));
                      sourceContext.collect(eventDim);
                  }
                }
                //每隔一小时执行一次查询
                Thread.sleep(1000 * 3600);
                }
        } catch (Exception e){
            System.out.println("Mysql data update error.."+e.getMessage());
        }
    }
    @Override
    public void cancel() {
         isRunning=false;
    }
}

3: BroadCastJoinDemo

import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink;
import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType;
import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer;
import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions;
import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema;
import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
import java.text.SimpleDateFormat;
import java.util.Optional;
import java.util.Properties;
public class SmartNewsSentimentTagSink2Dorisdb {
    public static void main(String[] args) throws Exception {
        //创建基本环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.disableOperatorChaining();
        env.enableCheckpointing(3 * 1000 * 60, CheckpointingMode.EXACTLY_ONCE);
        env.getCheckpointConfig().setCheckpointTimeout(1 * 1000 * 60);
        env.setStateBackend(new MemoryStateBackend());
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000 * 60));
        // 设置kafka相关配置
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "sd-kafka001:9092,sd-kafka002:9092,sd-kafka003:9092");
        properties.setProperty("group.id","gp_sdbd-etl-test01");
        properties.setProperty("request.timeout.ms", "600000");
        properties.setProperty("enable.auto.commit","false");
        //从kafka获取数据
        FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>("smartnews-sentiment",
                new JSONKeyValueDeserializationSchema(true),
                properties
        );
        kafkaConsumer.setStartFromEarliest();   // 从最开始处消费
        DataStreamSource<ObjectNode> dataStreamSource = env.addSource(kafkaConsumer);    // 添加kafkaSource
        // 读取dorisdb 数据
        SingleOutputStreamOperator<Tuple2<String, String>> dorisDataStreamSource = env.addSource(new DorisSource()).map(new MapFunction<EventDim, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(EventDim value) throws Exception {
                return Tuple2.of(value.xxxx1,value.xxxx2);
            }
        });
        // 解析数据
        SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> sentimentSource = ...
        //广播数据的状态描述器
        MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<String, String>(
                "broadcasr-state",
                String.class,
                String.class
        );
        // 广播dorisdb数据进行广播
        BroadcastStream<Tuple2<String, String>> broadcastStream = dorisDataStreamSource.broadcast(mapStateDescriptor);
        // 将非广播的流和广播的流connect连接到一起
        BroadcastConnectedStream<Tuple5<String, String, String, String, String>, Tuple2<String, String>> connected = sentimentSource.connect(broadcastStream);
        // new BroadcastProcessFuction传入的参数分别为:非广播的流的数据类型,广播的流的数据类型,经过处理之后返回的流的数据类型
       SingleOutputStreamOperator<SentimentBean> broadCastSource = connected.process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, Tuple2<String, String>, SentimentBean>() {
           // 处理每一条数据
            @Override
            public void processElement(Tuple5<String, String, String, String, String> input, ReadOnlyContext ctx, Collector<SentimentBean> out) throws Exception {
                ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor);
                .....
                //通过event关联broadcastState得到eventcode
                String eventCode = broadcastState.get(event);
                out.collect(new SentimentBean(keyId, id, event, eventCode, publish_time, stock_code));
            }
            // 获取广播数据
            @Override
            public void processBroadcastElement(Tuple2<String, String> tp, Context ctx, Collector<SentimentBean> out) throws Exception {
                ctx.getBroadcastState(mapStateDescriptor).put(tp.f0, tp.f1);
            }
        });
        // sink 到dorisdb jdbc方式
        broadCastSource.addSink(new DorisSink());
        // 将pojo 转化为json形式的字符串,pojo方式写入pulsar会报序列化相关错误
       SingleOutputStreamOperator<String> dataStream = broadCastSource.map(new MapFunction<SentimentBean, String>() {
            @Override
            public String map(SentimentBean value) throws Exception {
                return JSON.toJSONString(value);
            }
        });
        // 定义pulsar sink
        String topic = "persistent://fycmb/fengyan-namespace/mytopic";
        Properties props = new Properties();
        props.setProperty("topic", topic);
        props.setProperty("partition.discovery.interval-millis", "5000");
        props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false");
        ClientConfigurationData conf = new ClientConfigurationData();
        conf.setServiceUrl("pulsar://ip:6650");
        FlinkPulsarSink pulsarSink = new FlinkPulsarSink(
                "http://ip:8080",
                Optional.of(topic),
                conf,
                props,
                new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema()).useAtomicMode(DataTypes.STRING()).build()
                , PulsarSinkSemantic.AT_LEAST_ONCE
        );
        // sink 到pulsar
       dataStream.addSink(pulsarSink);
       env.execute("SmartNewsSentimentTag job");
     }
  }
}

4: DorisSink

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
public class Sentiment2Doris extends RichSinkFunction<SentimentBean> {
    PreparedStatement ps;
    Connection connection;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ....
        ps = this.connection.prepareStatement(sql);
    }
    @Override
    public void invoke(SentimentBean value, Context context) throws Exception {
            ....
            ps.executeUpdate();
    }
    @Override
    public void close() throws Exception {
        //关闭连接和释放资源
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
        super.close();
    }
}
    相关实践学习
    基于Hologres轻松玩转一站式实时仓库
    本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
    Linux入门到精通
    本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
    相关文章
    |
    4月前
    |
    消息中间件 SQL Kafka
    实时计算 Flink版产品使用问题之使用StarRocks作为Lookup Join的表是否合适
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    6月前
    |
    消息中间件 存储 Kafka
    实时计算 Flink版产品使用问题之 从Kafka读取数据,并与两个仅在任务启动时读取一次的维度表进行内连接(inner join)时,如果没有匹配到的数据会被直接丢弃还是会被存储在内存中
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    4月前
    |
    资源调度 监控 关系型数据库
    实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
    |
    4月前
    |
    存储 监控 Oracle
    实时计算 Flink版产品使用问题之如何解决双流Join导致的状态膨胀和资源压力问题
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    6月前
    |
    Java 数据处理 Apache
    实时计算 Flink版产品使用问题之lookup Join hologres的维表,是否可以指定查bitmap
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    7月前
    |
    SQL 消息中间件 数据库
    实时计算 Flink版操作报错合集之监听表和维表join的时,维表的字段超过两个时就会报错如何解决
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    67 1
    |
    7月前
    |
    SQL 数据处理 Apache
    实时计算 Flink版产品使用合集之如何给join设置parallelism
    实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStreamAPI、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
    |
    7月前
    |
    消息中间件 关系型数据库 MySQL
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    151 0
    |
    7月前
    |
    SQL 消息中间件 Kafka
    实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
    在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
    105 0
    |
    3月前
    |
    运维 数据处理 数据安全/隐私保护
    阿里云实时计算Flink版测评报告
    该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。