FLINK Producer数据写入到kafka 方法三

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FLINK Producer数据写入到kafka

package kafkaproducer;
public class Data {
    public String user;
    public String activity;
    public long timeStramp;
    public int pageViews;
    public String typeP;
    public String city;
    public Data() {
    }
    public Data(String user, String activity, long timeStramp, int pageViews, String typeP, String city) {
        this.user = user;
        this.activity = activity;
        this.timeStramp = timeStramp;
        this.pageViews = pageViews;
        this.typeP = typeP;
        this.city = city;
    }
    @Override
    public String toString() {
        return "data{" +
                "user='" + user + '\'' +
                ", activity='" + activity + '\'' +
                ", timeStramp=" + timeStramp +
                ", pageViews=" + pageViews +
                ", typeP='" + typeP + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
    public String getUser() {
        return user;
    }
    public void setUser(String user) {
        this.user = user;
    }
    public String getActivity() {
        return activity;
    }
    public void setActivity(String activity) {
        this.activity = activity;
    }
    public long getTimeStramp() {
        return timeStramp;
    }
    public void setTimeStramp(long timeStramp) {
        this.timeStramp = timeStramp;
    }
    public int getPageViews() {
        return pageViews;
    }
    public void setPageViews(int pageViews) {
        this.pageViews = pageViews;
    }
    public String getTypeP() {
        return typeP;
    }
    public void setTypeP(String typeP) {
        this.typeP = typeP;
    }
    public String getCity() {
        return city;
    }
    public void setCity(String city) {
        this.city = city;
    }
}
package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.DecimalFormat;
import java.util.Properties;
public class DataProducer {
    //定义broker
    public static final String broker_list = "master:9092,slave1:9092,slave2:9092";
    //定义topic 和kafka正在用的一致
    public static final String topic = "test";
    public static void writerTest() throws Exception {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", broker_list);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        KafkaProducer producer = new KafkaProducer<String, String>(prop);
        //位数不够,自动填充补0
        DecimalFormat userDecimal = new DecimalFormat("000");
        DecimalFormat typeDecimal = new DecimalFormat("0");
        String[] typeList = {"pv", "pu", "cart"};
        String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市", "河北省", "湖北省", "河南省", "山东省"};
        //获取1-10数
        int r_user = (int)(Math.round(Math.random() * 9 + 1));
        int r_activity = (int)(Math.round(Math.random() * 4 + 1));
        int p_type = (int)(Math.random() * typeList.length);
        int t_city = (int)(Math.random() * cityList.length);
        //对用户进行组合
        String user = "U" + userDecimal.format(r_user);
        String activity = "A" + typeDecimal.format(r_activity);
        //获取当前时间戳
        long timeStramp = System.currentTimeMillis();
        int pageview = (int) (Math.round(Math.random() * 4 + 1));
        String typeP = typeList[p_type];
        String city = cityList[t_city];
        Data data = new Data();
        data.setUser(user);
        data.setActivity(activity);
        data.setPageViews(pageview);
        data.setCity(city);
        data.setTypeP(typeP);
        data.setTimeStramp(timeStramp);
        //直接用数据写入到kafka
 ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp));
        producer.send(record);
 System.out.println("发送数据: " + (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp));
        producer.flush();
    }
    public static void main(String[] args) throws Exception {
        //无限循环,保证数据源不断流
        while (true) {
            Thread.sleep(300);
            writerTest();
        }
    }
}

POM.XML添加

        <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.9.3</version>
    </dependency>
        <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
    </dependency>
相关文章
|
3月前
|
消息中间件 关系型数据库 MySQL
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
大数据-117 - Flink DataStream Sink 案例:写出到MySQL、写出到Kafka
227 0
|
2月前
|
存储 监控 数据处理
flink 向doris 数据库写入数据时出现背压如何排查?
本文介绍了如何确定和解决Flink任务向Doris数据库写入数据时遇到的背压问题。首先通过Flink Web UI和性能指标监控识别背压,然后从Doris数据库性能、网络连接稳定性、Flink任务数据处理逻辑及资源配置等方面排查原因,并通过分析相关日志进一步定位问题。
189 61
|
3月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
89 1
|
3月前
|
消息中间件 NoSQL Kafka
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
大数据-116 - Flink DataStream Sink 原理、概念、常见Sink类型 配置与使用 附带案例1:消费Kafka写到Redis
206 0
|
3月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
111 1
|
3月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
63 1
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
364 9
|
5月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
87 3
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
188 0