The Latest and Most Complete 2021 Flink Tutorial Series — Flink Advanced and New Features (Part 8)


day08 - Flink Advanced and New Features

State Management with BroadcastState

  • broadcast state: state backed by a broadcast variable

  • Application scenario
    Joining a stream against rules or reference data that change over time, e.g., resolving an IP address to latitude/longitude, or resolving province/city/district/street through a map API
  • Requirement
    In a real-time Flink DataStream, keep only the events of the users configured in a database table, and enrich those events with the users' basic information.
  • Processing flow: periodically read the user config from MySQL, broadcast it as state, connect it with the event stream, and enrich each event

  • Implementation
package cn.itcast.flink.broadcast;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
 * Author itcast
 * Date 2021/6/24 8:29
 * Two streams: 1. event stream, 2. user-config stream; 3. connect them, 4. print, 5. execute.
 * Event stream <userID, eventTime, eventType, productID>:
 * {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1}
 * {"userID": "user_2", "eventTime": "2019-08-17 12:19:48", "eventType": "click", "productID": 1}
 * Config stream <userID, userName, userAge>:
 * 'user_2', '李四', 20
 * Resulting stream, Tuple6<userID, eventTime, eventType, productID, userName, userAge>:
 * (user_3,2019-08-17 12:19:47,browse,1,王五,33)
 * (user_2,2019-08-17 12:19:48,click,1,李四,20)
 */
public class BroadcastStateDemo {
    public static void main(String[] args) throws Exception {
        //1.env
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //parallelism 1 keeps the console output easy to follow
        env.setParallelism(1);
        //2.source
        //-1.build the real-time event stream - randomly generated
        //<userID, eventTime, eventType, productID>
        DataStreamSource<Tuple4<String, String, String, Integer>> clickSource = env.addSource(new MySource());
        //-2.build the config stream - read from MySQL
        //<userID, <userName, userAge>>
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource = env.addSource(new MySQLSource());
        //3.transformation
        //-1.define the broadcast state descriptor
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> broadcastDesc = new MapStateDescriptor<>("config", Types.VOID, Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));
        //-2.broadcast the config stream
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS = configSource.broadcast(broadcastDesc);
        //-3.connect the event stream with the broadcast stream
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result = clickSource.connect(broadcastDS)
                //-4.process the connected stream - enrich events with the user info from the config
                .process(new BroadcastProcessFunction<Tuple4<String, String, String, Integer>, Map<String, Tuple2<String, Integer>>, Tuple6<String, String, String, Integer, String, Integer>>() {
                    @Override
                    public void processElement(Tuple4<String, String, String, Integer> value, ReadOnlyContext ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //f0 of the event is the userId
                        String userId = value.f0;
                        //read the broadcast state from the context via the descriptor
                        ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        //if the broadcast state is present, get(null) returns the latest config map
                        if (broadcastState != null) {
                            Map<String, Tuple2<String, Integer>> map = broadcastState.get(null);
                            if (map != null) {
                                //look up name and age for this user
                                Tuple2<String, Integer> userInfo = map.get(userId);
                                //only emit events whose user exists in the config - this is the filter
                                if (userInfo != null) {
                                    //merge the Tuple4 event and the Tuple2 user info into a Tuple6
                                    out.collect(Tuple6.of(
                                            userId,
                                            value.f1,
                                            value.f2,
                                            value.f3,
                                            userInfo.f0,
                                            userInfo.f1
                                    ));
                                }
                            }
                        }
                    }
                    //value is the latest map fetched by MySQLSource on each poll
                    @Override
                    public void processBroadcastElement(Map<String, Tuple2<String, Integer>> value, Context ctx, Collector<Tuple6<String, String, String, Integer, String, Integer>> out) throws Exception {
                        //fetch the broadcast state via the descriptor
                        BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState = ctx.getBroadcastState(broadcastDesc);
                        //clear the stale state, then store the latest config map under the single null key
                        broadcastState.clear();
                        broadcastState.put(null, value);
                    }
                });
        //4.sink
        result.print();
        //5.execute
        env.execute();
    }
    /**
     * <userID, eventTime, eventType, productID>
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        private boolean isRunning = true;
        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning){
                int id = random.nextInt(4) + 1;
                String user_id = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(user_id,eventTime,eventType,productId));
                Thread.sleep(500);
            }
        }
        @Override
        public void cancel() {
            isRunning = false;
        }
    }
    /**
     * <userID, <userName, userAge>>
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        private boolean flag = true;
        private Connection conn = null;
        private PreparedStatement ps = null;
        private ResultSet rs = null;
        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection("jdbc:mysql://node3:3306/bigdata?useSSL=false",
                    "root",
                    "123456");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }
        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag){
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                rs = ps.executeQuery(); //assign to the field so close() can release the last ResultSet
                while (rs.next()){
                    String userID = rs.getString("userID");
                    String userName = rs.getString("userName");
                    int userAge = rs.getInt("userAge");
                    map.put(userID, Tuple2.of(userName, userAge));
                }
                ctx.collect(map);
                Thread.sleep(5000);//refresh the user config every 5 seconds
            }
        }
        @Override
        public void cancel() {
            flag = false;
        }
        @Override
        public void close() throws Exception {
            //release resources in reverse order of creation
            if (rs != null) rs.close();
            if (ps != null) ps.close();
            if (conn != null) conn.close();
        }
    }
}
  • Connect the real-time event stream with the dynamically changing config stream from the database, and print the enriched output
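
The demo expects a user_info table in the bigdata database on node3. Below is a minimal, hypothetical setup snippet using plain JDBC: the table name and columns come from the query in MySQLSource, the user_2/user_3 rows mirror the sample output in the class comment, and user_1/user_4 are made up so that every ID MySource generates has a config row. The class name UserInfoSetup is also hypothetical.

package cn.itcast.flink.broadcast;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;
//one-off helper to create and seed the config table the demo reads (hypothetical)
public class UserInfoSetup {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:mysql://node3:3306/bigdata?useSSL=false", "root", "123456");
             Statement st = conn.createStatement()) {
            st.execute("CREATE TABLE IF NOT EXISTS `user_info` (" +
                    "`userID` VARCHAR(20) PRIMARY KEY, `userName` VARCHAR(50), `userAge` INT)");
            //REPLACE INTO makes the seed idempotent; rerunning just overwrites the rows
            st.execute("REPLACE INTO `user_info` VALUES " +
                    "('user_1', '张三', 18), ('user_2', '李四', 20), " +
                    "('user_3', '王五', 33), ('user_4', '赵六', 30)");
        }
    }
}
Editing these rows while the job runs is an easy way to watch the broadcast state pick up the new config every 5 seconds.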

Dual-Stream JOIN

  • JOIN operations between two DataStreams
  • Dual-stream joins fall into two broad categories: window joins and interval joins
  • Window joins come in tumbling, sliding, and session variants
  • An interval join is defined by a lower bound and an upper bound relative to each element's timestamp

  • Requirement: join the order-detail stream and the goods stream in a 5-second tumbling window, write out the result, and print it
  • Implementation
package cn.itcast.flink.broadcast;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Date 2021/6/24 9:40
 * Desc window join of the order-detail stream and the goods stream
 */
public class JoinDemo {
    public static void main(String[] args) throws Exception {
        // create the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // build the goods stream
        SingleOutputStreamOperator<Goods> goodsSource = env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());
        // build the order-detail stream
        SingleOutputStreamOperator<OrderItem> orderItemSource = env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());
        // join order details with goods on orderItem.goodsId == goods.goodsId
        DataStream<FactOrderItem> result = orderItemSource.join(goodsSource)
                .where(o -> o.goodsId)
                .equalTo(g -> g.goodsId)
                // 5-second tumbling event-time window
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                // apply: (OrderItem first, Goods second) -> FactOrderItem
                .apply((OrderItem first, Goods second) -> {
                    FactOrderItem factOrderItem = new FactOrderItem();
                    factOrderItem.setGoodsId(first.goodsId);
                    factOrderItem.setGoodsName(second.goodsName);
                    factOrderItem.setCount(new BigDecimal(first.count));
                    factOrderItem.setTotalMoney(new BigDecimal(first.count).multiply(second.goodsPrice));
                    return factOrderItem;
                });
        //print the result
        result.print();
        //execute
        env.execute();
    }
    //goods entity (the dimension side)
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;
        public static List<Goods> GOODS_LIST;
        public static Random r;
        static  {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }
        public Goods() {
        }
        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //order-detail entity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //join result: entity for the output (fact) table
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //goods stream source (acts like a dimension table)
    public static class GoodsSource extends RichSourceFunction<Goods> {
        private Boolean isCancel;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }
        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while(!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //order-detail stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem>
    {
        private Boolean isCancel;
        private Random r;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }
        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while(!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                //also emit a copy with a goodsId that matches no Goods record,
                //so the stream contains order items the inner join will drop
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //watermark assigner (kept simple: it just uses the system clock, i.e. processing time)
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
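
Note that join is an inner join: the extra order items emitted with goodsId "111" never find a matching Goods record, so they are silently dropped. If unmatched order items must be kept (left-outer-join behavior), coGroup can be used with the same keys and window. A minimal sketch follows; it is a drop-in replacement for the join call in main and additionally needs org.apache.flink.api.common.functions.CoGroupFunction and org.apache.flink.util.Collector on the import list:

        // hedged sketch: left-outer-join semantics via coGroup
        DataStream<FactOrderItem> outer = orderItemSource.coGroup(goodsSource)
                .where(o -> o.goodsId)
                .equalTo(g -> g.goodsId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new CoGroupFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void coGroup(Iterable<OrderItem> orders, Iterable<Goods> goods, Collector<FactOrderItem> out) {
                        for (OrderItem o : orders) {
                            boolean matched = false;
                            for (Goods g : goods) {
                                FactOrderItem f = new FactOrderItem();
                                f.setGoodsId(o.goodsId);
                                f.setGoodsName(g.goodsName);
                                f.setCount(new BigDecimal(o.count));
                                f.setTotalMoney(new BigDecimal(o.count).multiply(g.goodsPrice));
                                out.collect(f);
                                matched = true;
                            }
                            if (!matched) {
                                //keep the unmatched order item; the goods fields stay null
                                FactOrderItem f = new FactOrderItem();
                                f.setGoodsId(o.goodsId);
                                f.setCount(new BigDecimal(o.count));
                                out.collect(f);
                            }
                        }
                    }
                });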
  • Requirement
    Join the order-detail stream with the goods stream via an interval join with upper bound 0 (exclusive) and lower bound -1 second (inclusive): each order item at time t matches goods records with timestamps in [t-1s, t). Compute and write out the result.
  • Implementation
package cn.itcast.flink.broadcast;
import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
/**
 * Author itcast
 * Desc interval join of the order-detail stream and the goods stream
 */
public class JoinDemo02 {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // build the goods stream
        DataStream<Goods> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods.class)).assignTimestampsAndWatermarks(new GoodsWatermark());
        // build the order-detail stream
        DataStream<OrderItem> orderItemDS = env.addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class)).assignTimestampsAndWatermarks(new OrderItemWatermark());
        // interval join: streams keyed by goodsId on both sides
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                //make the upper bound exclusive: the interval becomes [-1s, 0s)
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out) throws Exception {
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(new BigDecimal(left.getCount()));
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
                        out.collect(factOrderItem);
                    }
                });
        factOrderItemDS.print();
        env.execute("Interval JOIN");
    }
    //goods entity
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;
        public static List<Goods> GOODS_LIST;
        public static Random r;
        static  {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }
        public Goods() {
        }
        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //order-detail entity
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //join result
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;
        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }
    //goods stream source (acts like a dimension table)
    public static class GoodsSource11 extends RichSourceFunction<Goods> {
        private Boolean isCancel;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
        }
        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while(!isCancel) {
                Goods.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //order-detail stream source
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        private Boolean isCancel;
        private Random r;
        @Override
        public void open(Configuration parameters) throws Exception {
            isCancel = false;
            r = new Random();
        }
        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while(!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);
                //also emit a copy with a goodsId that matches no Goods record
                orderItem.setGoodsId("111");
                sourceContext.collect(orderItem);
                TimeUnit.SECONDS.sleep(1);
            }
        }
        @Override
        public void cancel() {
            isCancel = true;
        }
    }
    //watermark assigner (kept simple: it just uses the system clock, i.e. processing time)
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }
        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
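
A note on the watermarks: both generators above stamp every record with System.currentTimeMillis(), so the "event time" in these demos is effectively processing time, which keeps the examples self-contained. With real event timestamps, the built-in strategy is usually all that is needed. A minimal sketch, assuming OrderItem carried an epoch-millis event-time field with a getter (getEventTime() is hypothetical here) and that java.time.Duration is imported:

        // hedged sketch: standard bounded-out-of-orderness watermarks instead of the hand-rolled generators
        WatermarkStrategy<OrderItem> strategy = WatermarkStrategy
                .<OrderItem>forBoundedOutOfOrderness(Duration.ofSeconds(2))    // tolerate 2s of out-of-orderness
                .withTimestampAssigner((item, recordTs) -> item.getEventTime()); // hypothetical event-time getter
        DataStream<OrderItem> orderItemWithWm = env
                .addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(strategy);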

Streaming File Sink

  • Sink: persisting the results of a job
  • Common sink targets:
  1. MySQL
  2. Kafka (a hedged sketch follows the HDFS example below)
  3. Redis
  4. console
  • Here: sinking to a distributed file system (HDFS)
  • Typical use cases for the Streaming File Sink:
  1. real-time data warehouses
  2. hour-level data analysis
  3. data extraction
  • Requirement
    Read a socket stream and roll the data into files on HDFS every 2 seconds.
  • Implementation
package cn.itcast.flink.broadcast;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
/**
 * Author itcast
 * Date 2021/6/24 10:52
 * Desc roll socket data into files on HDFS with StreamingFileSink
 */
public class StreamingFileSinkDemo {
    public static void main(String[] args) throws Exception {
        //1.initialize the stream execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //2.enable periodic checkpointing (10s) and set the state backend path
        // exactly-once sink semantics rely on checkpointing plus two-phase commit
        env.enableCheckpointing(10000);
        env.setStateBackend(new FsStateBackend("file:///d:/chk/"));
        //3.read from the socket source
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        //4.configure the part-file names (prefix/suffix)
        OutputFileConfig config = OutputFileConfig
                .builder()
                .withPartPrefix("crm")
                .withPartSuffix(".txt")
                .build();
        //5.build the StreamingFileSink: row format, output path /FlinkStreamFileSink/parquet
        StreamingFileSink<String> sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"),
                new SimpleStringEncoder<String>("UTF-8"))
                //bucket assigner: one bucket (directory) per date-time
                .withBucketAssigner(new DateTimeBucketAssigner<>())
                //rolling policy: roll when any of the size / interval / inactivity limits is hit
                .withRollingPolicy(DefaultRollingPolicy.builder()
                        .withMaxPartSize(128 * 1024 * 1024)
                        .withRolloverInterval(Time.seconds(2).toMilliseconds())
                        .withInactivityInterval(Time.seconds(2).toMilliseconds())
                        .build())
                //part-file name configuration
                .withOutputFileConfig(config)
                .build();
        //6.attach the sink and also print the stream
        source.print();
        source.addSink(sink).setParallelism(1);
        //7.execute
        env.execute();
    }
}
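
For comparison, the Kafka sink from the classification above follows the same addSink pattern. A minimal hedged sketch, assuming the flink-connector-kafka dependency is on the classpath; the broker address, topic name, and class name are placeholders:

package cn.itcast.flink.broadcast;
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
//hedged sketch: the same socket source as above, but sinking to Kafka instead of HDFS
public class KafkaSinkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> source = env.socketTextStream("node1", 9999);
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node1:9092"); //placeholder broker
        source.addSink(new FlinkKafkaProducer<>(
                "flink-demo-topic",        //placeholder topic
                new SimpleStringSchema(),  //serialize each line as UTF-8 bytes
                props));
        env.execute("Kafka sink sketch");
    }
}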