1. Reference: writing from Flink to MySQL
https://blog.csdn.net/weixin_43291055/article/details/100972694
2. Add a batch-insert method
Mapper XML:
<insert id="insertBatch" parameterType="java.util.List">
    insert into sku_info (sku, num) values
    <foreach collection="list" item="info" index="index" separator=",">
        (#{info.skuId}, #{info.num})
    </foreach>
</insert>
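At runtime the foreach expands the list into one multi-row statement of the form insert into sku_info (sku, num) values (?, ?), (?, ?), ..., so the whole batch costs a single round trip to MySQL.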
DAO method:
void insertBatch(List<SkuInfo> infos);
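The SkuInfo model is not shown in the original post; here is a minimal sketch with the fields implied by the mapper XML (#{info.skuId}, #{info.num}) and the setters used later in LineMapper:

package com.jd.xq.model;

// Minimal POJO sketch: field names inferred from the mapper XML;
// the original post does not show this class.
public class SkuInfo {

    private String skuId;
    private int num;

    public String getSkuId() {
        return skuId;
    }

    public void setSkuId(String skuId) {
        this.skuId = skuId;
    }

    public int getNum() {
        return num;
    }

    public void setNum(int num) {
        this.num = num;
    }
}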
3. Batch-write processing using Flink's window functions (countWindow)
package com.jd.xq;

import com.google.common.collect.Lists;
import com.jd.xq.mapper.LineMapper;
import com.jd.xq.model.SkuInfo;
import com.jd.xq.sink.SinkSku;
import com.jd.xq.source.TestSource;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;

/**
 * @Date 2019-08-16 15:45
 **/
public class StartJob {

    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.enableCheckpointing(5000); // critical: checkpointing must be enabled
            env.getConfig().setGlobalJobParameters(loadConfig("config.properties"));

            DataStream<String> text = env.addSource(new TestSource());
            DataStream<SkuInfo> stream = text.flatMap(new LineMapper());

            // Collect every 3 elements into one list and hand the list to the sink
            stream.countWindowAll(3).apply(new AllWindowFunction<SkuInfo, List<SkuInfo>, GlobalWindow>() {
                @Override
                public void apply(GlobalWindow window, Iterable<SkuInfo> iterable, Collector<List<SkuInfo>> collector) throws Exception {
                    ArrayList<SkuInfo> skuInfos = Lists.newArrayList(iterable);
                    if (skuInfos.size() > 0) {
                        collector.collect(skuInfos);
                    }
                }
            }).addSink(new SinkSku());

            env.execute("batch sink to MySQL");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public static ParameterTool loadConfig(String configFileName) throws Exception {
        try (InputStream is = StartJob.class.getClassLoader().getResourceAsStream(configFileName)) {
            return ParameterTool.fromPropertiesFile(is);
        }
    }
}
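Checkpointing makes the job restart from the last checkpoint after a failure, which means batches that were already written can be replayed; this sink is therefore at-least-once, and the target table needs an idempotent key or deduplication if duplicate rows matter.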
TestSource: the source (emits randomly generated data)
package com.jd.xq.source;

import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.util.Random;

public class TestSource extends RichSourceFunction<String> {

    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> sourceContext) throws Exception {
        Random random = new Random();
        while (isRunning) {
            // Emit roughly one record per second (scaled by the subtask index)
            Thread.sleep((getRuntimeContext().getIndexOfThisSubtask() + 1) * 1000);
            int sku = random.nextInt(10);
            int value = random.nextInt(10) + 1;
            sourceContext.collect(sku + "," + value);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
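Each record is a comma-separated pair skuId,num (e.g. 3,7); since the sleep interval scales with the subtask index, parallel source instances emit at different rates.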
LineMapper: splits and preprocesses each input line
package com.jd.xq.mapper;

import com.alibaba.fastjson.JSONObject;
import com.jd.xq.model.SkuInfo;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

/**
 * @author duanxiaoqiu
 * @Date 2019-09-10 20:50
 **/
public class LineMapper extends RichFlatMapFunction<String, SkuInfo> {

    @Override
    public void open(Configuration parameters) {
        // The global job parameters registered in StartJob are accessible here
        ExecutionConfig executionConfig = getRuntimeContext().getExecutionConfig();
        ParameterTool params = (ParameterTool) executionConfig.getGlobalJobParameters();
    }

    @Override
    public void flatMap(String s, Collector<SkuInfo> collector) {
        try {
            // Input format: "<skuId>,<num>"
            String[] arr = s.split(",");
            SkuInfo model = new SkuInfo();
            model.setSkuId(arr[0]);
            model.setNum(Integer.parseInt(arr[1]));
            System.out.println(JSONObject.toJSONString(model));
            collector.collect(model);
        } catch (Exception e) {
            System.out.println("LineMapper parse error: " + e);
        }
    }
}
Sink: batch write to MySQL
package com.jd.xq.sink;

import com.alibaba.fastjson.JSONObject;
import com.jd.xq.dao.SkuDao;        // package assumed, not shown in the original
import com.jd.xq.mapper.SkuMapper;  // package assumed, not shown in the original
import com.jd.xq.model.SkuInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.util.List;

public class SinkSku extends RichSinkFunction<List<SkuInfo>> {

    private SkuMapper skuMapper;

    @Override
    public void open(Configuration parameters) {
        try {
            skuMapper = SkuDao.getInstance();
        } catch (Exception e) {
            System.out.println("SinkSku open error: " + e);
        }
    }

    @Override
    public void invoke(List<SkuInfo> value, Context context) {
        try {
            System.out.println(JSONObject.toJSONString(value));
            // One multi-row insert for the whole window batch
            skuMapper.insertBatch(value);
        } catch (Exception e) {
            System.out.println("SinkSku invoke error: " + e);
        }
    }
}
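SkuDao.getInstance() is not shown in the original post. Below is a minimal sketch, assuming MyBatis is bootstrapped from a mybatis-config.xml on the classpath; the class layout and names here are guesses, not the author's code:

package com.jd.xq.dao;

import com.jd.xq.mapper.SkuMapper;
import org.apache.ibatis.io.Resources;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.apache.ibatis.session.SqlSessionFactoryBuilder;

import java.io.InputStream;

// Hypothetical singleton bootstrap; the original post does not show SkuDao.
public final class SkuDao {

    private static volatile SkuMapper instance;

    private SkuDao() {
    }

    public static SkuMapper getInstance() throws Exception {
        if (instance == null) {
            synchronized (SkuDao.class) {
                if (instance == null) {
                    try (InputStream is = Resources.getResourceAsStream("mybatis-config.xml")) {
                        SqlSessionFactory factory = new SqlSessionFactoryBuilder().build(is);
                        // autoCommit = true: each insertBatch commits immediately
                        SqlSession session = factory.openSession(true);
                        instance = session.getMapper(SkuMapper.class);
                    }
                }
            }
        }
        return instance;
    }
}

Note that a shared SqlSession is not thread-safe; with sink parallelism above 1 it would be safer to open a session per invoke, or to use SqlSessionManager.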
4. Core code walkthrough
stream.countWindowAll(3).apply(new AllWindowFunction<SkuInfo, List<SkuInfo>, GlobalWindow>() {
    @Override
    public void apply(GlobalWindow window, Iterable<SkuInfo> iterable, Collector<List<SkuInfo>> collector) throws Exception {
        ArrayList<SkuInfo> skuInfos = Lists.newArrayList(iterable);
        if (skuInfos.size() > 0) {
            collector.collect(skuInfos);
        }
    }
}).addSink(new SinkSku());
This relies on Flink's window functions: every three elements form one group, and each batch inserts three rows at once, i.e. the windows are divided by element count.
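Note that countWindowAll, like all non-keyed *WindowAll operations, runs with parallelism 1: every element passes through a single window operator, so the batching happens on one task.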
Of course, other batching schemes also work, e.g. one batch per fixed time interval (the example below uses 2 seconds), i.e. windows divided by time:
stream.timeWindowAll(Time.seconds(2)).apply(new AllWindowFunction<SkuInfo, List<SkuInfo>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<SkuInfo> iterable, Collector<List<SkuInfo>> collector) {
        try {
            ArrayList<SkuInfo> skuInfos = Lists.newArrayList(iterable);
            if (skuInfos.size() > 0) {
                collector.collect(skuInfos);
            }
        } catch (Exception e) {
            System.out.println("==window e:" + e);
        }
    }
}).addSink(new SinkSku());
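The tradeoff between the two: a count window guarantees the batch size but can hold records indefinitely if the stream goes quiet, while a time window bounds write latency (at most 2 seconds here) but yields variable batch sizes. Empty windows never fire, so the size check is only defensive.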
References
https://blog.csdn.net/aA518189/article/details/85250713