FLINK Producer数据写入到kafka 方法一

简介: FLINK Producer数据写入到kafka

package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.awt.*;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
public class Producer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //读取文件的方式写入kafka
//        DataStreamSource<String> lines = env.readTextFile("file:///d:/test.txt");
        DataStreamSource<String> lines = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
            int count = 1;
            DecimalFormat userDecimal = new DecimalFormat("000");
            DecimalFormat typeDecimal = new DecimalFormat("0");
            String[] typeList = {"pv", "pu", "cart"};
            String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市"};
            @Override
            public void run(SourceContext<String> out) throws Exception {
//                无限循环
//                while (isRunning){
                //这里修改需要的调数,方便进行数据统计
                while (count <= 100) {
                    int r_user = (int) (Math.round(Math.random() * 9 + 1));
                    int r_activity = (int) (Math.round(Math.random() * 4 + 1));
                    int p_type = (int) (Math.random() * typeList.length);
                    int t_city = (int) (Math.random() * cityList.length);
                    String user = "U" + userDecimal.format(r_user);
                    String activity = "A" + typeDecimal.format(r_activity);
                    long timeStramp = System.currentTimeMillis();
                    int pageview = (int) (Math.round(Math.random() * 4 + 1));
                    String typeP = typeList[p_type];
                    String city = cityList[t_city];
                    out.collect(user + " " + activity + " " + timeStramp + " " + pageview + " " + typeP + " " + city);
                    count++;
                }
            }
            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        String groupID = "test";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        lines.addSink(new FlinkKafkaProducer<>(
                groupID,
                new SimpleStringSchema(),
                prop
        ));
        env.execute("Producer");
    }
}
相关文章
消息中间件 存储 传感器
269 0
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
本文整理自阿里云的高级技术专家、Apache Flink PMC 成员李麟老师在 Flink Forward Asia 2025 新加坡[1]站 —— 实时 AI 专场中的分享。将带来关于 Flink 2.1 版本中 SQL 在实时数据处理和 AI 方面进展的话题。
358 0
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
|
5月前
|
SQL 人工智能 JSON
Flink 2.1 SQL:解锁实时数据与AI集成,实现可扩展流处理
简介:本文整理自阿里云高级技术专家李麟在Flink Forward Asia 2025新加坡站的分享,介绍了Flink 2.1 SQL在实时数据处理与AI融合方面的关键进展,包括AI函数集成、Join优化及未来发展方向,助力构建高效实时AI管道。
875 43
|
5月前
|
SQL 关系型数据库 Apache
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
本文将深入解析 Flink-Doris-Connector 三大典型场景中的设计与实现,并结合 Flink CDC 详细介绍了整库同步的解决方案,助力构建更加高效、稳定的实时数据处理体系。
2202 0
从 Flink 到 Doris 的实时数据写入实践 —— 基于 Flink CDC 构建更实时高效的数据集成链路
|
6月前
|
存储 消息中间件 搜索推荐
京东零售基于Flink的推荐系统智能数据体系
摘要:本文整理自京东零售技术专家张颖老师,在 Flink Forward Asia 2024 生产实践(二)专场中的分享,介绍了基于Flink构建的推荐系统数据,以及Flink智能体系带来的智能服务功能。内容分为以下六个部分: 推荐系统架构 索引 样本 特征 可解释 指标 Tips:关注「公众号」回复 FFA 2024 查看会后资料~
433 1
京东零售基于Flink的推荐系统智能数据体系
|
7月前
|
消息中间件 SQL 关系型数据库
Flink CDC + Kafka 加速业务实时化
Flink CDC 是一种支持流批一体的分布式数据集成工具,通过 YAML 配置实现数据传输过程中的路由与转换操作。它已从单一数据源的 CDC 数据流发展为完整的数据同步解决方案,支持 MySQL、Kafka 等多种数据源和目标端(如 Delta Lake、Iceberg)。其核心功能包括多样化数据输入链路、Schema Evolution、Transform 和 Routing 模块,以及丰富的监控指标。相比传统 SQL 和 DataStream 作业,Flink CDC 提供更灵活的 Schema 变更控制和原始 binlog 同步能力。
|
8月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
直播预告|Kafka+Flink 双引擎实战:手把手带你搭建分布式实时分析平台!
251 12
|
8月前
|
消息中间件 运维 Kafka
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
在数字化转型中,企业亟需从海量数据中快速提取价值并转化为业务增长动力。5月15日19:00-21:00,阿里云三位技术专家将讲解Kafka与Flink的强强联合方案,帮助企业零门槛构建分布式实时分析平台。此组合广泛应用于实时风控、用户行为追踪等场景,具备高吞吐、弹性扩缩容及亚秒级响应优势。直播适合初学者、开发者和数据工程师,参与还有机会领取定制好礼!扫描海报二维码或点击链接预约直播:[https://developer.aliyun.com/live/255088](https://developer.aliyun.com/live/255088)
582 35
直播预告|Kafka+Flink双引擎实战:手把手带你搭建分布式实时分析平台!
|
9月前
|
存储 消息中间件 Kafka
基于 Flink 的中国电信星海时空数据多引擎实时改造
本文整理自中国电信集团大数据架构师李新虎老师在Flink Forward Asia 2024的分享,围绕星海时空智能系统展开,涵盖四个核心部分:时空数据现状、实时场景多引擎化、典型应用及未来展望。系统日处理8000亿条数据,具备亚米级定位能力,通过Flink多引擎架构解决数据膨胀与响应时效等问题,优化资源利用并提升计算效率。应用场景包括运动状态识别、个体行为分析和群智感知,未来将推进湖仓一体改造与三维时空服务体系建设,助力数字化转型与智慧城市建设。
896 3
基于 Flink 的中国电信星海时空数据多引擎实时改造
|
10月前
|
Oracle 关系型数据库 Java
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山
本文介绍通过Flink CDC实现Oracle数据实时同步至崖山数据库(YashanDB)的方法,支持全量与增量同步,并涵盖新增、修改和删除的DML操作。内容包括环境准备(如JDK、Flink版本等)、Oracle日志归档启用、用户权限配置、增量日志记录设置、元数据迁移、Flink安装与配置、生成Flink SQL文件、Streampark部署,以及创建和启动实时同步任务的具体步骤。适合需要跨数据库实时同步方案的技术人员参考。
【YashanDB知识库】Flink CDC实时同步Oracle数据到崖山