Preface
Hi everyone, I'm ChinaManor, which literally means "China coder". I hope to be a paver on the road to national rejuvenation and a cultivator in the big data field: an ordinary person who refuses to settle for mediocrity.
This post is a sequel to the previous article; the latest Flink releases greatly simplify what used to be rather verbose code~
First, prepare some mock data:
```java
// 1. Prepare the producer configuration
Properties props = new Properties();
props.put("bootstrap.servers", "node1.itcast.cn:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
// custom partitioner (the property key is partitioner.class)
props.put("partitioner.class", "test.KafkaCustomPartitioner");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
```
These Kafka configuration properties can be copied straight from the official documentation.
Now produce the mock data:
```java
// 2. Create the KafkaProducer
KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(props);
String[] categorys = {"女装", "男装", "图书", "家电", "洗护", "美妆", "运动", "游戏", "户外", "家具", "乐器", "办公"};
Random random = new Random();
while (true) {
    // Randomly pick a category and an amount
    int index = random.nextInt(categorys.length); // [0, length) ==> [0, length-1]
    String category = categorys[index];           // the randomly chosen category
    double price = random.nextDouble() * 100;     // nextDouble returns [0, 1), so *100 gives [0, 100)
    CategoryPojo categoryPojo = new CategoryPojo(category, price, System.currentTimeMillis());
    String data = JSON.toJSONString(categoryPojo);
    // 3. Send the data
    kafkaProducer.send(new ProducerRecord<String, String>("topicDemo", data));
    System.out.println("数据是" + data);
    try {
        Thread.sleep(200);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}
```
The entity class uses Lombok and is quite simple:
```java
@Data
@AllArgsConstructor
@NoArgsConstructor
public static class CategoryPojo {
    private String category; // category name
    private double price;    // total sales amount for this category
    private long time;       // timestamp up to now; ideally this would be an event time, but we simplify and just use the current system time
}
```
With data flowing into Kafka, let's start consuming it:
Set up the Flink execution environment:
```java
// TODO 1. Set up the environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Parallelism of 1, i.e. no partitioning
env.setParallelism(1);
```
Configure the Kafka consumer and where to start reading offsets:
```java
// TODO 2. Set the Kafka-related parameters
Properties props = new Properties();
// Kafka address and consumer group name
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.88.161:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "category");
// Tell Flink where to start reading the Kafka offsets: from the latest
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
        "topicDemo",
        new SimpleStringSchema(),
        props
);
consumer.setStartFromLatest();
consumer.setCommitOffsetsOnCheckpoints(true);
```
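Note that `setCommitOffsetsOnCheckpoints(true)` only takes effect when checkpointing is enabled on the environment; otherwise offset committing falls back to the Kafka client's auto-commit settings. A minimal sketch (the 5-second interval is just an example value, not from the original article):

```java
// Enable checkpointing so the consumer commits its offsets back to Kafka
// on each completed checkpoint. The 5000 ms interval is only an example.
env.enableCheckpointing(5000);
```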
Step 3: parse the source data and test it:
```java
DataStreamSource<String> source = env.addSource(consumer);
SingleOutputStreamOperator<Order> mapDS = source.map(new MapFunction<String, Order>() {
    @Override
    public Order map(String s) throws Exception {
        JSONObject jsonObject = JSON.parseObject(s);
        Order order = JSON.toJavaObject(jsonObject, Order.class);
        return order;
    }
});
// Print to verify
mapDS.print();
```
success!
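One thing the article never shows is the `Order` class that the map function deserializes into. A minimal sketch that is consistent with how it is used later (public `category`, `price`, `time` fields plus the setters used in the join code) could look like this; it mirrors `CategoryPojo` and is my assumption, not the original code:

```java
// Hypothetical sketch of the Order POJO (not shown in the article).
// Public fields keep the later order.category / order.price accesses compiling,
// and the setters match the ones used in the join example below.
public static class Order {
    public String category; // category name
    public double price;    // sales amount
    public long time;       // timestamp in milliseconds

    public Order() {}

    public void setCategory(String category) { this.category = category; }
    public void setPrice(double price) { this.price = price; }
    public void setTime(long time) { this.time = time; }
}
```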
Finally, sink the data into MySQL:
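The sink below writes out a stream called `result`, which the main flow above never defines (the name only reappears in the bonus join code further down). In the simplest case you could sink `mapDS` directly; if the goal is a running total per category, a sketch along these lines might work (the 5-second processing-time window and the reuse of the `Order` type are my assumptions, not the article's):

```java
// Hypothetical aggregation: sum the prices per category
// over 5-second processing-time tumbling windows.
SingleOutputStreamOperator<Order> result = mapDS
        .keyBy(order -> order.category)
        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
        .reduce((a, b) -> {
            Order sum = new Order();
            sum.setCategory(a.category);
            sum.setPrice(a.price + b.price);
            sum.setTime(Math.max(a.time, b.time));
            return sum;
        });
```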
```java
// Sink the result to MySQL
result.addSink(JdbcSink.sink(
        "INSERT INTO t_order(category, price, time) values (?, ?, ?)",
        (ps, order) -> {
            ps.setString(1, order.category);
            ps.setDouble(2, order.price);
            ps.setLong(3, order.time);
        },
        // Batch execution options
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)
                .withBatchIntervalMs(200)
                .withMaxRetries(5)
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://192.168.88.163:3306/bigdata?characterEncoding=utf-8") // JDBC URL
                .withUsername("root")                    // username
                .withPassword("123456")                  // password
                .withDriverName("com.mysql.jdbc.Driver") // driver class
                .build()
));
env.execute();
```
That's all of the main content, thanks for reading!
As a bonus, here is some still-rough code for a dual-stream join:
```java
// Dual-stream join
SingleOutputStreamOperator<Order> order1watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem1WaterMark());
SingleOutputStreamOperator<Order> order2watermark = mapDS.assignTimestampsAndWatermarks(new OrderItem2WaterMark());
// Join key: item ID = order ID (here both sides are simply keyed by category)
final DataStream<Order> result = order1watermark.join(order2watermark)
        .where(o1 -> o1.category)
        .equalTo(o2 -> o2.category)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply((o1, o2) -> {
            Order order = new Order();
            order.setCategory(o1.category);
            order.setPrice(o2.price);
            order.setTime(o2.time);
            return order;
        });
// result.print();
```
The watermark strategy is simplified to use the system time directly:
```java
// Watermark strategies
public static class OrderItem2WaterMark implements WatermarkStrategy<Order> {
    @Override
    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Order>() {
            @Override
            public void onEvent(Order order, long l, WatermarkOutput watermarkOutput) {
                watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput watermarkOutput) {
                watermarkOutput.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }

    @Override
    public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }
}

public static class OrderItem1WaterMark implements WatermarkStrategy<Order> {
    @Override
    public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return (element, recordTimestamp) -> System.currentTimeMillis();
    }

    @Override
    public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
        return new WatermarkGenerator<Order>() {
            @Override
            public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }

            @Override
            public void onPeriodicEmit(WatermarkOutput output) {
                output.emitWatermark(new Watermark(System.currentTimeMillis()));
            }
        };
    }
}
```
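For reference, the newer `WatermarkStrategy` API already ships with built-in generators, so hand-written classes like the ones above are usually unnecessary. A minimal sketch using the bounded-out-of-orderness strategy and reading the timestamp from the `Order.time` field (the 3-second bound is an arbitrary example, not from the article):

```java
// Built-in strategy: tolerate events up to 3 seconds out of order and
// take the event time from the Order.time field instead of the system clock.
WatermarkStrategy<Order> strategy = WatermarkStrategy
        .<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
        .withTimestampAssigner((order, recordTimestamp) -> order.time);

SingleOutputStreamOperator<Order> withWatermarks = mapDS.assignTimestampsAndWatermarks(strategy);
```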
Done at last ✅
I'm not yet comfortable writing dual-stream joins, so I'll take it slowly; after all, for someone who scored 60 on the last exam, an 80 next time is already great progress~~
Summary
That wraps up Flink writing data to Kafka + from Kafka into MySQL (Part 2)~
If you enjoyed this post, please like, favorite, and share! I'm manor, a coder who believes technology can change the world. See you next time~