day08_Flink高级特性和新特性
BroadcastState 状态管理
- broadcast state 广播变量状态
- 应用场景
关联动态更新的规则, 根据指定的键获取数据(例如: 给定 IP 得到经纬度, 再通过地图 API 获取省、市、区、街道位置)
- 需求
使用 Flink DataStream 实时过滤出配置(数据库)中存在的用户, 并在事件流中补全这批用户的基础信息。
- 需求流程
- 开发步骤
package cn.itcast.flink.broadcast;

import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

/**
 * Broadcast-state demo: enriches an event stream with user details taken from a
 * periodically refreshed configuration stream.
 *
 * <p>Pipeline: (1) event stream, (2) user-config stream, (3) connect the two via
 * broadcast state, (4) print, (5) execute.
 *
 * <ul>
 *   <li>Event record: {@code Tuple4<userID, eventTime, eventType, productID>}.</li>
 *   <li>Config record: {@code Map<userID, Tuple2<userName, userAge>>}.</li>
 *   <li>Output record: {@code Tuple6<userID, eventTime, eventType, productID, userName, userAge>},
 *       for example {@code (user_2,2019-08-17 12:19:48,click,1,李四,20)}.</li>
 * </ul>
 *
 * <p>Events whose user is not present in the current configuration are dropped.
 *
 * @author itcast
 */
public class BroadcastStateDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // 1. Environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 2. Sources.
        // 2-1. Randomly generated event stream: <userID, eventTime, eventType, productID>.
        DataStreamSource<Tuple4<String, String, String, Integer>> clickSource =
                env.addSource(new MySource());
        // 2-2. Configuration stream read from MySQL: <userID, <userName, userAge>>.
        DataStreamSource<Map<String, Tuple2<String, Integer>>> configSource =
                env.addSource(new MySQLSource());

        // 3. Transformation.
        // 3-1. State descriptor. The whole config map is stored under the single key null.
        MapStateDescriptor<Void, Map<String, Tuple2<String, Integer>>> broadcastDesc =
                new MapStateDescriptor<>(
                        "config",
                        Types.VOID,
                        Types.MAP(Types.STRING, Types.TUPLE(Types.STRING, Types.INT)));

        // 3-2. Broadcast the configuration stream.
        BroadcastStream<Map<String, Tuple2<String, Integer>>> broadcastDS =
                configSource.broadcast(broadcastDesc);

        // 3-3 / 3-4. Connect the event stream with the broadcast stream and enrich each event.
        SingleOutputStreamOperator<Tuple6<String, String, String, Integer, String, Integer>> result =
                clickSource
                        .connect(broadcastDS)
                        .process(new BroadcastProcessFunction<
                                Tuple4<String, String, String, Integer>,
                                Map<String, Tuple2<String, Integer>>,
                                Tuple6<String, String, String, Integer, String, Integer>>() {

                            /**
                             * Enriches one event with the user's name and age from broadcast state.
                             * Emits nothing when no configuration has arrived yet or when the
                             * event's user is not part of the configuration.
                             */
                            @Override
                            public void processElement(
                                    Tuple4<String, String, String, Integer> value,
                                    ReadOnlyContext ctx,
                                    Collector<Tuple6<String, String, String, Integer, String, Integer>> out)
                                    throws Exception {
                                String userId = value.f0;
                                ReadOnlyBroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState =
                                        ctx.getBroadcastState(broadcastDesc);
                                if (broadcastState == null) {
                                    return;
                                }
                                Map<String, Tuple2<String, Integer>> config = broadcastState.get(null);
                                if (config == null) {
                                    // No configuration snapshot has been broadcast yet.
                                    return;
                                }
                                Tuple2<String, Integer> userInfo = config.get(userId);
                                if (userInfo == null) {
                                    // User is not configured: filter the event out instead of
                                    // failing with a NullPointerException.
                                    return;
                                }
                                out.collect(Tuple6.of(
                                        userId, value.f1, value.f2, value.f3, userInfo.f0, userInfo.f1));
                            }

                            /**
                             * Replaces the stored configuration with the latest snapshot emitted by
                             * {@link MySQLSource}.
                             */
                            @Override
                            public void processBroadcastElement(
                                    Map<String, Tuple2<String, Integer>> value,
                                    Context ctx,
                                    Collector<Tuple6<String, String, String, Integer, String, Integer>> out)
                                    throws Exception {
                                BroadcastState<Void, Map<String, Tuple2<String, Integer>>> broadcastState =
                                        ctx.getBroadcastState(broadcastDesc);
                                // Drop the previous snapshot, then store the new one.
                                broadcastState.clear();
                                broadcastState.put(null, value);
                            }
                        });

        // 4. Sink.
        result.print();

        // 5. Execute.
        env.execute();
    }

    /**
     * Generates a random event {@code <userID, eventTime, eventType, productID>} every 500 ms.
     * User ids range over {@code user_1..user_4}.
     */
    public static class MySource implements SourceFunction<Tuple4<String, String, String, Integer>> {
        // Written by cancel() from a different thread than run(), hence volatile.
        private volatile boolean isRunning = true;

        @Override
        public void run(SourceContext<Tuple4<String, String, String, Integer>> ctx) throws Exception {
            Random random = new Random();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            while (isRunning) {
                int id = random.nextInt(4) + 1;
                String userId = "user_" + id;
                String eventTime = df.format(new Date());
                String eventType = "type_" + random.nextInt(3);
                int productId = random.nextInt(4);
                ctx.collect(Tuple4.of(userId, eventTime, eventType, productId));
                Thread.sleep(500);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Reads the full {@code user_info} table every 5 seconds and emits it as one
     * {@code Map<userID, <userName, userAge>>} snapshot.
     */
    public static class MySQLSource extends RichSourceFunction<Map<String, Tuple2<String, Integer>>> {
        // Written by cancel() from a different thread than run(), hence volatile.
        private volatile boolean flag = true;
        // JDBC handles are created in open(); they must not be part of the serialized function.
        private transient Connection conn = null;
        private transient PreparedStatement ps = null;

        @Override
        public void open(Configuration parameters) throws Exception {
            conn = DriverManager.getConnection(
                    "jdbc:mysql://node3:3306/bigdata?useSSL=false", "root", "123456");
            String sql = "select `userID`, `userName`, `userAge` from `user_info`";
            ps = conn.prepareStatement(sql);
        }

        @Override
        public void run(SourceContext<Map<String, Tuple2<String, Integer>>> ctx) throws Exception {
            while (flag) {
                Map<String, Tuple2<String, Integer>> map = new HashMap<>();
                // Close the result set after every poll so cursors do not accumulate.
                try (ResultSet rs = ps.executeQuery()) {
                    while (rs.next()) {
                        String userID = rs.getString("userID");
                        String userName = rs.getString("userName");
                        int userAge = rs.getInt("userAge");
                        map.put(userID, Tuple2.of(userName, userAge));
                    }
                }
                ctx.collect(map);
                // Refresh the user configuration every 5 seconds.
                Thread.sleep(5000);
            }
        }

        @Override
        public void cancel() {
            flag = false;
        }

        @Override
        public void close() throws Exception {
            // Release in reverse order of acquisition; always attempt to close the connection.
            try {
                if (ps != null) {
                    ps.close();
                }
            } finally {
                if (conn != null) {
                    conn.close();
                }
            }
        }
    }
}
- 实时的数据流和 动态变化的数据库中的配置流 进行 connect 操作, 打印输出
双流 JOIN
- 多个数据流 DataStream 之间进行 JOIN 操作
- 双流 JOIN 分为两大类: Window 窗口的join, Interval 的 join
- Window窗口 分为 tumbling 窗口, sliding 窗口, session 窗口
- Interval join 需要指定时间区间的下界(lower bound)和上界(upper bound)
- 需求: 订单明细表和商品表每 5 秒钟进行一次窗口 JOIN, 将结果落地并打印输出
- 开发步骤
package cn.itcast.flink.broadcast;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Window-join demo: joins an order-item stream with a goods stream on {@code goodsId}
 * inside 5-second tumbling event-time windows and prints the joined records.
 *
 * @author itcast
 */
public class JoinDemo {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        // Execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Goods stream.
        SingleOutputStreamOperator<Goods> goodsSource =
                env.addSource(new GoodsSource()).assignTimestampsAndWatermarks(new GoodsWatermark());
        // Order-item stream.
        SingleOutputStreamOperator<OrderItem> orderItemSource =
                env.addSource(new OrderItemSource()).assignTimestampsAndWatermarks(new OrderItemWatermark());

        // orderItem JOIN goods ON orderItem.goodsId = goods.goodsId
        DataStream<FactOrderItem> result = orderItemSource
                .join(goodsSource)
                .where(o -> o.getGoodsId())
                .equalTo(g -> g.getGoodsId())
                // 5-second tumbling window.
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply((OrderItem first, Goods second) -> {
                    BigDecimal count = new BigDecimal(first.getCount());
                    FactOrderItem factOrderItem = new FactOrderItem();
                    factOrderItem.setGoodsId(first.getGoodsId());
                    factOrderItem.setGoodsName(second.getGoodsName());
                    factOrderItem.setCount(count);
                    factOrderItem.setTotalMoney(count.multiply(second.getGoodsPrice()));
                    return factOrderItem;
                });

        result.print();
        env.execute();
    }

    /** Goods entity; also holds the fixed demo catalogue in {@link #GOODS_LIST}. */
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        /** Returns a uniformly random entry of {@link #GOODS_LIST}. */
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Order-item entity. */
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Join result (the record that would be written to the fact table). */
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Emits the complete goods catalogue once per second (plays the role of a dimension table). */
    public static class GoodsSource extends RichSourceFunction<Goods> {
        // Written by cancel() from another thread; initialised eagerly so an early
        // cancel() is never overwritten by open().
        private volatile boolean isCancel = false;

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                for (Goods goods : Goods.GOODS_LIST) {
                    sourceContext.collect(goods);
                }
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    /**
     * Emits two order items per second: one referencing a random catalogue entry and one
     * with the same item id and count but the goods id {@code "111"}, which matches no
     * catalogue entry and therefore never joins.
     */
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        // Written by cancel() from another thread; initialised eagerly so an early
        // cancel() is never overwritten by open().
        private volatile boolean isCancel = false;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);

                // Emit the non-matching record as a separate instance: mutating an object
                // that was already handed downstream is unsafe when object reuse is enabled.
                OrderItem unmatched = new OrderItem();
                unmatched.setItemId(orderItem.getItemId());
                unmatched.setCount(orderItem.getCount());
                unmatched.setGoodsId("111");
                sourceContext.collect(unmatched);

                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    /**
     * Watermark strategy for {@link Goods}. For simplicity both the element timestamp and
     * every emitted watermark are the current system time.
     */
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    /**
     * Watermark strategy for {@link OrderItem}. For simplicity both the element timestamp
     * and every emitted watermark are the current system time.
     */
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
- 需求
将商品数据和订单明细数据进行 Interval JOIN: 时间区间的上界为 0(不包含), 下界为 -1 秒(包含), 即 [-1s, 0), 统计数据并落地
- 开发步骤
package cn.itcast.flink.broadcast;

import com.alibaba.fastjson.JSON;
import lombok.Data;
import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
 * Interval-join demo: joins each order item with the goods records of the same
 * {@code goodsId} whose timestamp lies in {@code [orderTime - 1s, orderTime)} and prints
 * the joined records.
 *
 * @author itcast
 */
public class JoinDemo02 {

    /**
     * Builds and runs the job.
     *
     * @param args unused
     * @throws Exception if the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Goods stream.
        DataStream<Goods> goodsDS = env
                .addSource(new GoodsSource11(), TypeInformation.of(Goods.class))
                .assignTimestampsAndWatermarks(new GoodsWatermark());
        // Order-item stream.
        DataStream<OrderItem> orderItemDS = env
                .addSource(new OrderItemSource(), TypeInformation.of(OrderItem.class))
                .assignTimestampsAndWatermarks(new OrderItemWatermark());

        // Interval join keyed by goodsId.
        SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS
                .keyBy(item -> item.getGoodsId())
                .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
                .between(Time.seconds(-1), Time.seconds(0))
                // Make the upper bound exclusive: the interval becomes [-1s, 0).
                .upperBoundExclusive()
                .process(new ProcessJoinFunction<OrderItem, Goods, FactOrderItem>() {
                    @Override
                    public void processElement(
                            OrderItem left, Goods right, Context ctx, Collector<FactOrderItem> out)
                            throws Exception {
                        BigDecimal count = new BigDecimal(left.getCount());
                        FactOrderItem factOrderItem = new FactOrderItem();
                        factOrderItem.setGoodsId(right.getGoodsId());
                        factOrderItem.setGoodsName(right.getGoodsName());
                        factOrderItem.setCount(count);
                        factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(count));
                        out.collect(factOrderItem);
                    }
                });

        factOrderItemDS.print();
        env.execute("Interval JOIN");
    }

    /** Goods entity; also holds the fixed demo catalogue in {@link #GOODS_LIST}. */
    @Data
    public static class Goods {
        private String goodsId;
        private String goodsName;
        private BigDecimal goodsPrice;

        public static List<Goods> GOODS_LIST;
        public static Random r;

        static {
            r = new Random();
            GOODS_LIST = new ArrayList<>();
            GOODS_LIST.add(new Goods("1", "小米12", new BigDecimal(4890)));
            GOODS_LIST.add(new Goods("2", "iphone12", new BigDecimal(12000)));
            GOODS_LIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
            GOODS_LIST.add(new Goods("4", "Thinkpad X1", new BigDecimal(9800)));
            GOODS_LIST.add(new Goods("5", "MeiZu One", new BigDecimal(3200)));
            GOODS_LIST.add(new Goods("6", "Mate 40", new BigDecimal(6500)));
        }

        /** Returns a uniformly random entry of {@link #GOODS_LIST}. */
        public static Goods randomGoods() {
            int rIndex = r.nextInt(GOODS_LIST.size());
            return GOODS_LIST.get(rIndex);
        }

        public Goods() {
        }

        public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
            this.goodsId = goodsId;
            this.goodsName = goodsName;
            this.goodsPrice = goodsPrice;
        }

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Order-item entity. */
    @Data
    public static class OrderItem {
        private String itemId;
        private String goodsId;
        private Integer count;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Join result. */
    @Data
    public static class FactOrderItem {
        private String goodsId;
        private String goodsName;
        private BigDecimal count;
        private BigDecimal totalMoney;

        @Override
        public String toString() {
            return JSON.toJSONString(this);
        }
    }

    /** Emits the complete goods catalogue once per second (plays the role of a dimension table). */
    public static class GoodsSource11 extends RichSourceFunction<Goods> {
        // Written by cancel() from another thread; initialised eagerly so an early
        // cancel() is never overwritten by open().
        private volatile boolean isCancel = false;

        @Override
        public void run(SourceContext<Goods> sourceContext) throws Exception {
            while (!isCancel) {
                for (Goods goods : Goods.GOODS_LIST) {
                    sourceContext.collect(goods);
                }
                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    /**
     * Emits two order items per second: one referencing a random catalogue entry and one
     * with the same item id and count but the goods id {@code "111"}, which matches no
     * catalogue entry and therefore never joins.
     */
    public static class OrderItemSource extends RichSourceFunction<OrderItem> {
        // Written by cancel() from another thread; initialised eagerly so an early
        // cancel() is never overwritten by open().
        private volatile boolean isCancel = false;
        private Random r;

        @Override
        public void open(Configuration parameters) throws Exception {
            r = new Random();
        }

        @Override
        public void run(SourceContext<OrderItem> sourceContext) throws Exception {
            while (!isCancel) {
                Goods goods = Goods.randomGoods();
                OrderItem orderItem = new OrderItem();
                orderItem.setGoodsId(goods.getGoodsId());
                orderItem.setCount(r.nextInt(10) + 1);
                orderItem.setItemId(UUID.randomUUID().toString());
                sourceContext.collect(orderItem);

                // Emit the non-matching record as a separate instance: mutating an object
                // that was already handed downstream is unsafe when object reuse is enabled.
                OrderItem unmatched = new OrderItem();
                unmatched.setItemId(orderItem.getItemId());
                unmatched.setCount(orderItem.getCount());
                unmatched.setGoodsId("111");
                sourceContext.collect(unmatched);

                TimeUnit.SECONDS.sleep(1);
            }
        }

        @Override
        public void cancel() {
            isCancel = true;
        }
    }

    /**
     * Watermark strategy for {@link Goods}. For simplicity both the element timestamp and
     * every emitted watermark are the current system time.
     */
    public static class GoodsWatermark implements WatermarkStrategy<Goods> {
        @Override
        public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<Goods>() {
                @Override
                public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }

    /**
     * Watermark strategy for {@link OrderItem}. For simplicity both the element timestamp
     * and every emitted watermark are the current system time.
     */
    public static class OrderItemWatermark implements WatermarkStrategy<OrderItem> {
        @Override
        public TimestampAssigner<OrderItem> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
            return (element, recordTimestamp) -> System.currentTimeMillis();
        }

        @Override
        public WatermarkGenerator<OrderItem> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
            return new WatermarkGenerator<OrderItem>() {
                @Override
                public void onEvent(OrderItem event, long eventTimestamp, WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }

                @Override
                public void onPeriodicEmit(WatermarkOutput output) {
                    output.emitWatermark(new Watermark(System.currentTimeMillis()));
                }
            };
        }
    }
}
Streaming File Sink
- Sink 落地
- Sink 分类
- sink MySQL
- sink Kafka
- sink Redis
- sink 控制台
- Sink 落地到分布式文件系统上 HDFS 上
- Sink 到文件系统 Streaming File Sink 落地使用应用场景
- 实时数据仓库
- 小时级的数据分析 等
- 抽取数据
- 需求
从 socket 数据流中读取数据, 按 2 秒的滚动间隔将数据写入到 HDFS 上。
- 开发步骤
package cn.itcast.flink.broadcast; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; /** * Author itcast * Date 2021/6/24 10:52 * Desc TODO */ public class StreamingFileSinkDemo { public static void main(String[] args) throws Exception { //1.初始化流计算运行环境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //2.设置Checkpoint(10s)周期性启动 和 stateBackend 存储路径 // Sink保证仅一次语义使用 checkpoint 和 二段提交 env.enableCheckpointing(10000); env.setStateBackend(new FsStateBackend("file:///d:/chk/")); //4.接入socket数据源,获取数据 DataStreamSource<String> source = env.socketTextStream("node1", 9999); //5.创建Streamingfilesink对象 OutputFileConfig config = OutputFileConfig .builder() .withPartPrefix("crm") .withPartSuffix(".txt") .build(); //5-1. 
创建输出文件配置,指定输出路径 /FlinkStreamFileSink/parquet StreamingFileSink sink = StreamingFileSink.forRowFormat(new Path("hdfs://node1:8020/FlinkStreamFileSink/parquet"), new SimpleStringEncoder<String>("UTF-8")) // sink-kafka new FlinkKafkaProducer //5-2.StreamingFileSink 行格式化 , withBucketAssigner->DateTimeBucketAssigner .withBucketAssigner(new DateTimeBucketAssigner()) //withRollingPolicy -> 默认滚筒策略 .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(128 * 1024 * 1024) .withRolloverInterval(Time.seconds(2).toMilliseconds()) .withInactivityInterval(Time.seconds(2).toMilliseconds()) .build()) //withOutputFileConfig -> 输出文件的配置 .withOutputFileConfig(config) .build(); //6.设置输出 sink source.print(); source.addSink(sink).setParallelism(1); //7.执行任务 env.execute(); } }
Sink 行格式化 , withBucketAssigner->DateTimeBucketAssigner .withBucketAssigner(new DateTimeBucketAssigner()) //withRollingPolicy -> 默认滚筒策略 .withRollingPolicy(DefaultRollingPolicy.builder() .withMaxPartSize(128 * 1024 * 1024) .withRolloverInterval(Time.seconds(2).toMilliseconds()) .withInactivityInterval(Time.seconds(2).toMilliseconds()) .build()) //withOutputFileConfig -> 输出文件的配置 .withOutputFileConfig(config) .build(); //6.设置输出 sink source.print(); source.addSink(sink).setParallelism(1); //7.执行任务 env.execute(); } }