FLINK Producer数据写入到kafka 方法三

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FLINK Producer数据写入到kafka

package kafkaproducer;
public class Data {
    public String user;
    public String activity;
    public long timeStramp;
    public int pageViews;
    public String typeP;
    public String city;
    public Data() {
    }
    public Data(String user, String activity, long timeStramp, int pageViews, String typeP, String city) {
        this.user = user;
        this.activity = activity;
        this.timeStramp = timeStramp;
        this.pageViews = pageViews;
        this.typeP = typeP;
        this.city = city;
    }
    @Override
    public String toString() {
        return "data{" +
                "user='" + user + '\'' +
                ", activity='" + activity + '\'' +
                ", timeStramp=" + timeStramp +
                ", pageViews=" + pageViews +
                ", typeP='" + typeP + '\'' +
                ", city='" + city + '\'' +
                '}';
    }
    public String getUser() {
        return user;
    }
    public void setUser(String user) {
        this.user = user;
    }
    public String getActivity() {
        return activity;
    }
    public void setActivity(String activity) {
        this.activity = activity;
    }
    public long getTimeStramp() {
        return timeStramp;
    }
    public void setTimeStramp(long timeStramp) {
        this.timeStramp = timeStramp;
    }
    public int getPageViews() {
        return pageViews;
    }
    public void setPageViews(int pageViews) {
        this.pageViews = pageViews;
    }
    public String getTypeP() {
        return typeP;
    }
    public void setTypeP(String typeP) {
        this.typeP = typeP;
    }
    public String getCity() {
        return city;
    }
    public void setCity(String city) {
        this.city = city;
    }
}
package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.DecimalFormat;
import java.util.Properties;
public class DataProducer {
    //定义broker
    public static final String broker_list = "master:9092,slave1:9092,slave2:9092";
    //定义topic 和kafka正在用的一致
    public static final String topic = "test";
    public static void writerTest() throws Exception {
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", broker_list);
        prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        KafkaProducer producer = new KafkaProducer<String, String>(prop);
        //位数不够,自动填充补0
        DecimalFormat userDecimal = new DecimalFormat("000");
        DecimalFormat typeDecimal = new DecimalFormat("0");
        String[] typeList = {"pv", "pu", "cart"};
        String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市", "河北省", "湖北省", "河南省", "山东省"};
        //获取1-10数
        int r_user = (int)(Math.round(Math.random() * 9 + 1));
        int r_activity = (int)(Math.round(Math.random() * 4 + 1));
        int p_type = (int)(Math.random() * typeList.length);
        int t_city = (int)(Math.random() * cityList.length);
        //对用户进行组合
        String user = "U" + userDecimal.format(r_user);
        String activity = "A" + typeDecimal.format(r_activity);
        //获取当前时间戳
        long timeStramp = System.currentTimeMillis();
        int pageview = (int) (Math.round(Math.random() * 4 + 1));
        String typeP = typeList[p_type];
        String city = cityList[t_city];
        Data data = new Data();
        data.setUser(user);
        data.setActivity(activity);
        data.setPageViews(pageview);
        data.setCity(city);
        data.setTypeP(typeP);
        data.setTimeStramp(timeStramp);
        //直接用数据写入到kafka
 ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp));
        producer.send(record);
 System.out.println("发送数据: " + (data.user + " " + data.activity + " " + data.pageViews + " " + data.city + " " + data.typeP + " " + data.timeStramp));
        producer.flush();
    }
    public static void main(String[] args) throws Exception {
        //无限循环,保证数据源不断流
        while (true) {
            Thread.sleep(300);
            writerTest();
        }
    }
}

POM.XML添加

        <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.9.3</version>
    </dependency>
        <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
    </dependency>
相关文章
|
1月前
|
运维 数据处理 Apache
数据实时计算产品对比测评报告:阿里云实时计算Flink版
数据实时计算产品对比测评报告:阿里云实时计算Flink版
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
66 4
|
1月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
57 1
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
47 1
|
1月前
|
SQL 消息中间件 分布式计算
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
大数据-115 - Flink DataStream Transformation 多个函数方法 FlatMap Window Aggregations Reduce
36 0
|
1月前
|
SQL 分布式计算 大数据
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(一)
46 0
|
1月前
|
大数据 流计算
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
大数据-108 Flink 快速应用案例 重回Hello WordCount!方案1批数据 方案2流数据(二)
46 0
|
1月前
|
消息中间件 分布式计算 Kafka
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
大数据-102 Spark Streaming Kafka ReceiveApproach DirectApproach 附带Producer、DStream代码案例
56 0
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
280 9
下一篇
无影云桌面