Flink Producer 数据写入到 Kafka(方法二)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FLINK Producer数据写入到kafka

package kafkaproducer;
/**
 * Plain POJO describing one simulated user click-stream event.
 *
 * NOTE(review): the misspelled field name "timeStramp" is kept on purpose —
 * it is part of the public getter/setter interface and of the serialized
 * JSON key produced by fastjson, so renaming it would break consumers.
 */
public class Data {
    public String user;       // user id, e.g. "U003"
    public String activity;   // activity id, e.g. "A2"
    public long timeStramp;   // event time in epoch milliseconds
    public int pageViews;     // number of page views for this event
    public String typeP;      // event type label
    public String city;       // city / province name

    /** No-arg constructor, required by JSON (de)serializers. */
    public Data() {
    }

    /** All-args convenience constructor. */
    public Data(String user, String activity, long timeStramp, int pageViews, String typeP, String city) {
        this.user = user;
        this.activity = activity;
        this.timeStramp = timeStramp;
        this.pageViews = pageViews;
        this.typeP = typeP;
        this.city = city;
    }

    /** Debug representation; format is identical to the original concatenation. */
    @Override
    public String toString() {
        return String.format(
                "data{user='%s', activity='%s', timeStramp=%d, pageViews=%d, typeP='%s', city='%s'}",
                user, activity, timeStramp, pageViews, typeP, city);
    }

    public String getUser() {
        return user;
    }

    public void setUser(String value) {
        this.user = value;
    }

    public String getActivity() {
        return activity;
    }

    public void setActivity(String value) {
        this.activity = value;
    }

    public long getTimeStramp() {
        return timeStramp;
    }

    public void setTimeStramp(long value) {
        this.timeStramp = value;
    }

    public int getPageViews() {
        return pageViews;
    }

    public void setPageViews(int value) {
        this.pageViews = value;
    }

    public String getTypeP() {
        return typeP;
    }

    public void setTypeP(String value) {
        this.typeP = value;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String value) {
        this.city = value;
    }
}
package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.DecimalFormat;
import java.util.Properties;
public class DataProducer {
    // Kafka broker list; must match the running cluster.
    public static final String broker_list = "master:9092,slave1:9092,slave2:9092";
    // Target topic; must match the topic Kafka is actually using.
    public static final String topic = "test";

    // One shared producer for the whole process. The original code built a
    // brand-new KafkaProducer on EVERY writerTest() call (every 300 ms from
    // main) and never closed it, leaking sockets and background I/O threads.
    // KafkaProducer is thread-safe and designed to be reused.
    private static KafkaProducer<String, String> producer;

    /** Lazily creates the shared producer and registers a clean shutdown. */
    private static synchronized KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", broker_list);
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");   // key serializer
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
            producer = new KafkaProducer<>(prop);
            // Close the producer (flushes pending records) when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread() {
                @Override
                public void run() {
                    producer.close();
                }
            });
        }
        return producer;
    }

    /**
     * Builds one random click-stream record and sends it to Kafka as JSON.
     *
     * @throws Exception propagated from the Kafka client
     */
    public static void writerTest() throws Exception {
        // Pad numbers with leading zeros to a fixed width.
        DecimalFormat userDecimal = new DecimalFormat("000");
        DecimalFormat typeDecimal = new DecimalFormat("0");
        String[] typeList = {"pv", "pu", "cart"};
        String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市", "河北省", "湖北省", "河南省", "山东省"};
        // Random user id in 1..10 and activity id in 1..5.
        int r_user = (int) Math.round(Math.random() * 9 + 1);
        int r_activity = (int) Math.round(Math.random() * 4 + 1);
        // Random indexes into the type / city tables.
        int p_type = (int) (Math.random() * typeList.length);
        int t_city = (int) (Math.random() * cityList.length);
        // Compose the user / activity labels.
        String user = "U" + userDecimal.format(r_user);
        String activity = "A" + typeDecimal.format(r_activity);
        // Current event timestamp in epoch millis.
        long timeStramp = System.currentTimeMillis();
        int pageview = (int) Math.round(Math.random() * 4 + 1);

        Data data = new Data(user, activity, timeStramp, pageview, typeList[p_type], cityList[t_city]);

        // Serialize to JSON and send. No key or explicit partition is needed,
        // so the two-argument ProducerRecord constructor is sufficient
        // (equivalent to the original (topic, null, null, value) call).
        String json = JSON.toJSONString(data);
        getProducer().send(new ProducerRecord<>(topic, json));
        System.out.println("发送数据: " + json);
        getProducer().flush();
    }

    public static void main(String[] args) throws Exception {
        // Endless loop so the downstream Flink job has a continuous source.
        while (true) {
            Thread.sleep(300);
            writerTest();
        }
    }
}

POM.XML添加

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <!-- NOTE(review): versions are hard-coded here while the sibling Flink
           dependencies use ${scala.binary.version}/${flink.version} properties;
           consider aligning them to avoid Scala/Flink version mismatches. -->
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.9.3</version>
    </dependency>
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
    </dependency>


相关文章
|
7月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
265 2
|
4月前
|
消息中间件 存储 关系型数据库
实时计算 Flink版产品使用问题之如何使用Kafka Connector将数据写入到Kafka
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
消息中间件 NoSQL Java
Kafka性能篇:为何Kafka这么"快"?
Kafka性能篇:为何Kafka这么"快"?
730 0
|
消息中间件 存储 Kafka
Apache Flink在处理Kafka数据时遇到的问题
Apache Flink在处理Kafka数据时遇到的问题
107 5
|
7月前
|
消息中间件 Kafka 流计算
Flink消费kafka数据
Flink消费kafka数据
75 0
|
7月前
|
消息中间件 Java Kafka
Flink工作中常用__Kafka SourceAPI
Flink工作中常用__Kafka SourceAPI
48 0
|
消息中间件 Kafka Serverless
FC 的 Kafka 触发器和 Kafka 的 Connector 的主要区别
FC 的 Kafka 触发器和 Kafka 的 Connector 的主要区别
71 1
|
消息中间件 存储 Kafka
Apache Kafka - 流式处理
Apache Kafka - 流式处理
91 0
|
消息中间件 存储 缓存
聊聊 Kafka: Kafka 为啥这么快?
聊聊 Kafka: Kafka 为啥这么快?
|
消息中间件 存储 Java
「Kafka技术」Apache Kafka中的事务
「Kafka技术」Apache Kafka中的事务