flink 维表join 的方式很多,本文主要讲述的是广播维表的方式进行join.
实现方式:
利用 broadcast state 将维度数据流广播到下游做 join,步骤如下:
1. 将维度数据发送到 kafka,作为广播原始流 S1;
2. 定义状态描述符 MapStateDescriptor;
3. 调用 S1.broadcast() 获得广播流 S2;
4. 调用非广播流 S3.connect(S2),得到 BroadcastConnectedStream S4;
5. 应用混合流的 S4.process(),并在 KeyedBroadcastProcessFunction/BroadcastProcessFunction 中实现关联处理逻辑。
优点:
维度数据实时更新
注意:广播的维度数据保存在内存中,因此该方式仅适用于维表数据量比较小的场景。
下面为大家贴上完整版demo:
逻辑如下:定时读取mysql 维度表数据,关联kafka 数据,分别写入到pulsar 和dorisdb中。
1: pom.xml
<repositories> <repository> <id>dorisdb-maven-releases</id> <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-releases/</url> </repository> <repository> <id>dorisdb-maven-snapshots</id> <url>http://dorisdbvisitor:dorisdbvisitor134@nexus.dorisdb.com/repository/maven-snapshots/</url> </repository> </repositories> <dependency> <groupId>com.dorisdb.connector</groupId> <artifactId>flink-connector-doris</artifactId> <version>1.0.30-SNAPSHOT</version> <!-- for flink-1.11 ~ flink-1.12 --> <scope>provided</scope> </dependency> <!-- flink-pulsar依赖--> <dependency> <groupId>io.streamnative.connectors</groupId> <artifactId>pulsar-flink-connector_2.11</artifactId> <version>1.12.3.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.pulsar</groupId> <artifactId>pulsar-client</artifactId> <version>2.7.1</version> <scope>provided</scope> </dependency>
2: DorisSource
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; public class DorisSource extends RichParallelSourceFunction<EventDim>{ private PreparedStatement ps; private Connection connection; private volatile int lastUpdateMin = -1; private volatile boolean isRunning; /** * open() 方法中建立连接,这样不用每次 invoke 的时候都要建立连接和释放连接 **/ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); isRunning = true; Class.forName("com.mysql.cj.jdbc.Driver"); connection = DriverManager.getConnection("jdbc:mysql://sd.dorisdb-fe001:9030/fengmi?characterEncoding=UTF-8", "xxxx", "xxxx"); String sql="select * from test_dim"; ps = connection.prepareStatement(sql); } @Override public void close() throws Exception { super.close(); if (connection != null) { connection.close(); } if (ps != null) { ps.close(); } } /** * DataStream 调用一次 run() 方法用来获取数据 **/ @Override public void run(SourceContext<EventDim> sourceContext) throws Exception { try { while (isRunning) { LocalDateTime date = LocalDateTime.now(); int min = date.getMinute(); ResultSet rs = ps.executeQuery(); if(min != lastUpdateMin) { lastUpdateMin = min; while (resultSet.next()) { EventDim eventDim = new EventDim(); eventDim.setXXXX1(resultSet.getString("xxxxx1")); eventDim.setXXXX2(resultSet.getString("xxxx2")); sourceContext.collect(eventDim); } } //每隔一小时执行一次查询 Thread.sleep(1000 * 3600); } } catch (Exception e){ System.out.println("Mysql data update error.."+e.getMessage()); } } @Override public void cancel() { isRunning=false; } }
3: BroadCastJoinDemo
import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple5; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream; import org.apache.flink.streaming.api.datastream.BroadcastStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.pulsar.FlinkPulsarSink; import org.apache.flink.streaming.connectors.pulsar.config.RecordSchemaType; import org.apache.flink.streaming.connectors.pulsar.internal.JsonSer; import org.apache.flink.streaming.connectors.pulsar.internal.PulsarOptions; import org.apache.flink.streaming.connectors.pulsar.table.PulsarSinkSemantic; import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema; import org.apache.flink.streaming.util.serialization.PulsarSerializationSchema; import org.apache.flink.streaming.util.serialization.PulsarSerializationSchemaWrapper; import org.apache.flink.util.Collector; import 
org.apache.pulsar.client.impl.conf.ClientConfigurationData; import java.text.SimpleDateFormat; import java.util.Optional; import java.util.Properties; public class SmartNewsSentimentTagSink2Dorisdb { public static void main(String[] args) throws Exception { //创建基本环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.disableOperatorChaining(); env.enableCheckpointing(3 * 1000 * 60, CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointTimeout(1 * 1000 * 60); env.setStateBackend(new MemoryStateBackend()); env.setRestartStrategy(RestartStrategies.fixedDelayRestart(0, 3 * 1000 * 60)); // 设置kafka相关配置 Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "sd-kafka001:9092,sd-kafka002:9092,sd-kafka003:9092"); properties.setProperty("group.id","gp_sdbd-etl-test01"); properties.setProperty("request.timeout.ms", "600000"); properties.setProperty("enable.auto.commit","false"); //从kafka获取数据 FlinkKafkaConsumer<ObjectNode> kafkaConsumer = new FlinkKafkaConsumer<>("smartnews-sentiment", new JSONKeyValueDeserializationSchema(true), properties ); kafkaConsumer.setStartFromEarliest(); // 从最开始处消费 DataStreamSource<ObjectNode> dataStreamSource = env.addSource(kafkaConsumer); // 添加kafkaSource // 读取dorisdb 数据 SingleOutputStreamOperator<Tuple2<String, String>> dorisDataStreamSource = env.addSource(new DorisSource()).map(new MapFunction<EventDim, Tuple2<String, String>>() { @Override public Tuple2<String, String> map(EventDim value) throws Exception { return Tuple2.of(value.xxxx1,value.xxxx2); } }); // 解析数据 SingleOutputStreamOperator<Tuple5<String, String, String, String, String>> sentimentSource = ... 
//广播数据的状态描述器 MapStateDescriptor<String, String> mapStateDescriptor = new MapStateDescriptor<String, String>( "broadcasr-state", String.class, String.class ); // 广播dorisdb数据进行广播 BroadcastStream<Tuple2<String, String>> broadcastStream = dorisDataStreamSource.broadcast(mapStateDescriptor); // 将非广播的流和广播的流connect连接到一起 BroadcastConnectedStream<Tuple5<String, String, String, String, String>, Tuple2<String, String>> connected = sentimentSource.connect(broadcastStream); // new BroadcastProcessFuction传入的参数分别为:非广播的流的数据类型,广播的流的数据类型,经过处理之后返回的流的数据类型 SingleOutputStreamOperator<SentimentBean> broadCastSource = connected.process(new BroadcastProcessFunction<Tuple5<String, String, String, String, String>, Tuple2<String, String>, SentimentBean>() { // 处理每一条数据 @Override public void processElement(Tuple5<String, String, String, String, String> input, ReadOnlyContext ctx, Collector<SentimentBean> out) throws Exception { ReadOnlyBroadcastState<String, String> broadcastState = ctx.getBroadcastState(mapStateDescriptor); ..... 
//通过event关联broadcastState得到eventcode String eventCode = broadcastState.get(event); out.collect(new SentimentBean(keyId, id, event, eventCode, publish_time, stock_code)); } // 获取广播数据 @Override public void processBroadcastElement(Tuple2<String, String> tp, Context ctx, Collector<SentimentBean> out) throws Exception { ctx.getBroadcastState(mapStateDescriptor).put(tp.f0, tp.f1); } }); // sink 到dorisdb jdbc方式 broadCastSource.addSink(new DorisSink()); // 将pojo 转化为json形式的字符串,pojo方式写入pulsar会报序列化相关错误 SingleOutputStreamOperator<String> dataStream = broadCastSource.map(new MapFunction<SentimentBean, String>() { @Override public String map(SentimentBean value) throws Exception { return JSON.toJSONString(value); } }); // 定义pulsar sink String topic = "persistent://fycmb/fengyan-namespace/mytopic"; Properties props = new Properties(); props.setProperty("topic", topic); props.setProperty("partition.discovery.interval-millis", "5000"); props.setProperty(PulsarOptions.FLUSH_ON_CHECKPOINT_OPTION_KEY, "false"); ClientConfigurationData conf = new ClientConfigurationData(); conf.setServiceUrl("pulsar://ip:6650"); FlinkPulsarSink pulsarSink = new FlinkPulsarSink( "http://ip:8080", Optional.of(topic), conf, props, new PulsarSerializationSchemaWrapper.Builder<>(new SimpleStringSchema()).useAtomicMode(DataTypes.STRING()).build() , PulsarSinkSemantic.AT_LEAST_ONCE ); // sink 到pulsar dataStream.addSink(pulsarSink); env.execute("SmartNewsSentimentTag job"); } } }
4: DorisSink
import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; public class Sentiment2Doris extends RichSinkFunction<SentimentBean> { PreparedStatement ps; Connection connection; @Override public void open(Configuration parameters) throws Exception { super.open(parameters); .... ps = this.connection.prepareStatement(sql); } @Override public void invoke(SentimentBean value, Context context) throws Exception { .... ps.executeUpdate(); } @Override public void close() throws Exception { //关闭连接和释放资源 if (ps != null) { ps.close(); } if (connection != null) { connection.close(); } super.close(); } }