开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

入库是怎么处理的 没有flink基础 不知道怎么入手?

入库是怎么处理的 没有flink基础 不知道怎么入手?

展开
收起
爱喝咖啡嘿 2022-12-12 10:11:02 360 0
2 条回答
写回答
取消 提交回答
  • 把kafka那一部分去掉 换成flinkjdbc,可以参照这个https://blog.csdn.net/haoheiao/article/details/126519588——此答案整理自钉群“Flink CDC 社区”

    2022-12-12 11:37:01
    赞同 展开评论 打赏
  • 天下风云出我辈,一入江湖岁月催,皇图霸业谈笑中,不胜人生一场醉。

    利用flink自带的JDBCOutputFormat进行入库

    public class DoveBootStrap {

    public static void main(String[] args) throws Exception {
        TimeInterval timer = DateUtil.timer();
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
                .setDrivername("com.mysql.cj.jdbc.Driver")
                .setDBUrl("jdbc:mysql://localhost:3306/sms?user=root&password=123456&serverTimezone=UTC")
                .setQuery("insert into sea_dove1 (id,send_time,phone,msg,business,source) values (?,?,?,?,?,?)")
                //设置为每有1000条数据就提交一次,这里可以不要也行
                .setBatchInterval(1000)
                .finish();
        //初始化批处理执行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //文件地址
        String filePath = "D:\\log\\seadove\\10yue.json";
        //读取文件内容
        DataSource<String> dataSource = env.readTextFile(filePath);
        //进行了数据的筛选
        FlatMapOperator<String, SeaDove> mapOperator = dataSource.flatMap(new SeaDoveFlatMapFunction());
        //筛选内容赋值
        MapOperator<SeaDove, Row> map = mapOperator.map(new MapFunction<SeaDove, Row>() {
    
            @Override
            public Row  map(SeaDove value) throws Exception {
                Row row = new Row(6);
                row.setField(0, SnowFlakeFactory.getSnowFlakeFromCache().nextId());
                row.setField(1, value.getSend_time());
                row.setField(2, value.getPhone());
                row.setField(3, value.getMsg());
                row.setField(4, value.getBusiness());
                row.setField(5, value.getSource());
                return row;
            }
        });
        //输出
        map.output(jdbcOutput);
        env.execute();
        System.out.println("JDBCOutputFormat 耗时:"+timer.interval());
    }
    

    }

    2022-12-12 10:36:33
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载