FLINK Producer数据写入到kafka 方法一

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FLINK Producer数据写入到kafka

package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.awt.*;
import java.text.DecimalFormat;
import java.util.Properties;
import java.util.Random;
public class Producer {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //读取文件的方式写入kafka
//        DataStreamSource<String> lines = env.readTextFile("file:///d:/test.txt");
        DataStreamSource<String> lines = env.addSource(new SourceFunction<String>() {
            private static final long serialVersionUID = 1L;
            private volatile boolean isRunning = true;
            int count = 1;
            DecimalFormat userDecimal = new DecimalFormat("000");
            DecimalFormat typeDecimal = new DecimalFormat("0");
            String[] typeList = {"pv", "pu", "cart"};
            String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市"};
            @Override
            public void run(SourceContext<String> out) throws Exception {
//                无限循环
//                while (isRunning){
                //这里修改需要的调数,方便进行数据统计
                while (count <= 100) {
                    int r_user = (int) (Math.round(Math.random() * 9 + 1));
                    int r_activity = (int) (Math.round(Math.random() * 4 + 1));
                    int p_type = (int) (Math.random() * typeList.length);
                    int t_city = (int) (Math.random() * cityList.length);
                    String user = "U" + userDecimal.format(r_user);
                    String activity = "A" + typeDecimal.format(r_activity);
                    long timeStramp = System.currentTimeMillis();
                    int pageview = (int) (Math.round(Math.random() * 4 + 1));
                    String typeP = typeList[p_type];
                    String city = cityList[t_city];
                    out.collect(user + " " + activity + " " + timeStramp + " " + pageview + " " + typeP + " " + city);
                    count++;
                }
            }
            @Override
            public void cancel() {
                isRunning = false;
            }
        });
        String groupID = "test";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        lines.addSink(new FlinkKafkaProducer<>(
                groupID,
                new SimpleStringSchema(),
                prop
        ));
        env.execute("Producer");
    }
}
相关文章
|
1月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
129 0
|
29天前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
30天前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
55 1
|
1月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
126 0
|
1月前
|
消息中间件 资源调度 大数据
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
大数据-112 Flink DataStreamAPI 程序输入源 DataSource 基于文件、集合、Kafka连接器
38 0
|
1月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
45 0
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
44 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
262 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
66 3