FLINK Producer数据写入到kafka 方法二

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: FLINK Producer数据写入到kafka

package kafkaproducer;
/**
 * Plain data holder for one simulated user-activity event; serialized to
 * JSON (via fastjson) and published to Kafka by {@code DataProducer}.
 *
 * NOTE(review): "timeStramp" is a typo for "timestamp", but the field,
 * its accessors and the resulting JSON key are public interface, so the
 * spelling is preserved for backward compatibility.
 */
public class Data {
    public String user;      // user id, e.g. "U003"
    public String activity;  // activity id, e.g. "A2"
    public long timeStramp;  // event time in epoch milliseconds
    public int pageViews;    // simulated page-view count
    public String typeP;     // event type: one of "pv", "pu", "cart"
    public String city;      // city name of the event

    /** No-arg constructor required by JSON (de)serialization frameworks. */
    public Data() {
    }

    /** Convenience constructor initializing every field at once. */
    public Data(String user, String activity, long timeStramp, int pageViews, String typeP, String city) {
        this.user = user;
        this.activity = activity;
        this.timeStramp = timeStramp;
        this.pageViews = pageViews;
        this.typeP = typeP;
        this.city = city;
    }

    @Override
    public String toString() {
        // Output kept byte-identical to the original concatenation form.
        StringBuilder text = new StringBuilder("data{");
        text.append("user='").append(user).append('\'');
        text.append(", activity='").append(activity).append('\'');
        text.append(", timeStramp=").append(timeStramp);
        text.append(", pageViews=").append(pageViews);
        text.append(", typeP='").append(typeP).append('\'');
        text.append(", city='").append(city).append('\'');
        text.append('}');
        return text.toString();
    }

    public String getUser() {
        return user;
    }

    public void setUser(String user) {
        this.user = user;
    }

    public String getActivity() {
        return activity;
    }

    public void setActivity(String activity) {
        this.activity = activity;
    }

    public long getTimeStramp() {
        return timeStramp;
    }

    public void setTimeStramp(long timeStramp) {
        this.timeStramp = timeStramp;
    }

    public int getPageViews() {
        return pageViews;
    }

    public void setPageViews(int pageViews) {
        this.pageViews = pageViews;
    }

    public String getTypeP() {
        return typeP;
    }

    public void setTypeP(String typeP) {
        this.typeP = typeP;
    }

    public String getCity() {
        return city;
    }

    public void setCity(String city) {
        this.city = city;
    }
}
package kafkaproducer;
import com.alibaba.fastjson.JSON;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.DecimalFormat;
import java.util.Properties;
/**
 * Generates a random user-activity event every 300 ms and publishes it to
 * Kafka as a JSON string (fastjson serialization of {@link Data}).
 *
 * Fixes over the original version:
 * - a single shared {@code KafkaProducer} is created lazily and closed on
 *   JVM shutdown; the original built a brand-new producer on every call
 *   (every 300 ms) and never closed it, leaking sockets and threads;
 * - the producer is no longer declared with a raw type;
 * - the record uses the (topic, value) constructor instead of passing
 *   explicit nulls for partition and key.
 */
public class DataProducer {
    // Broker list; must match the running Kafka cluster.
    public static final String broker_list = "master:9092,slave1:9092,slave2:9092";
    // Target topic; must match the topic Kafka is actually using.
    public static final String topic = "test";

    // Shared producer instance; created once on first use (see getProducer()).
    private static KafkaProducer<String, String> producer;

    /** Lazily creates the shared producer and registers a shutdown-hook close. */
    private static synchronized KafkaProducer<String, String> getProducer() {
        if (producer == null) {
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", broker_list);
            prop.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // key serializer
            prop.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // value serializer
            producer = new KafkaProducer<>(prop);
            // Close the producer (flushing buffered records) when the JVM exits.
            Runtime.getRuntime().addShutdownHook(new Thread(() -> producer.close()));
        }
        return producer;
    }

    /** Builds one random event: user U001-U010, activity A1-A5, a type and a city. */
    private static Data buildRandomEvent() {
        // Pad with leading zeros when the number has fewer digits.
        DecimalFormat userDecimal = new DecimalFormat("000");
        DecimalFormat typeDecimal = new DecimalFormat("0");
        String[] typeList = {"pv", "pu", "cart"};
        String[] cityList = {"北京市", "天津市", "上海市", "深圳市", "重庆市", "河北省", "湖北省", "河南省", "山东省"};
        // Random indices: users 1-10, activities 1-5, uniform picks for type/city.
        int r_user = (int) (Math.round(Math.random() * 9 + 1));
        int r_activity = (int) (Math.round(Math.random() * 4 + 1));
        int p_type = (int) (Math.random() * typeList.length);
        int t_city = (int) (Math.random() * cityList.length);
        Data data = new Data();
        data.setUser("U" + userDecimal.format(r_user));
        data.setActivity("A" + typeDecimal.format(r_activity));
        data.setPageViews((int) (Math.round(Math.random() * 4 + 1)));
        data.setCity(cityList[t_city]);
        data.setTypeP(typeList[p_type]);
        // Current wall-clock time as the event timestamp.
        data.setTimeStramp(System.currentTimeMillis());
        return data;
    }

    /**
     * Serializes one random event to JSON and sends it to the topic.
     *
     * @throws Exception propagated from the Kafka client on send failure
     */
    public static void writerTest() throws Exception {
        Data data = buildRandomEvent();
        String json = JSON.toJSONString(data);
        // No partition/key: Kafka assigns the partition (round-robin for null keys).
        ProducerRecord<String, String> record = new ProducerRecord<>(topic, json);
        getProducer().send(record);
        System.out.println("发送数据: " + json);
        // Flush per record so the demo stream is visible immediately; for real
        // workloads rely on batching instead.
        getProducer().flush();
    }

    public static void main(String[] args) throws Exception {
        // Infinite loop keeps the data source flowing, one event every 300 ms.
        while (true) {
            Thread.sleep(300);
            writerTest();
        }
    }
}

POM.XML添加

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-java</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-connector-kafka_2.11</artifactId>
      <version>1.9.3</version>
    </dependency>
    <!-- NOTE: fastjson 1.2.68 has known deserialization vulnerabilities
         (e.g. CVE-2022-25845); upgrade to 1.2.83 or later in production. -->
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>1.2.68</version>
    </dependency>
    <!-- NOTE: junit 3.8.1 is obsolete; prefer JUnit 4.13.2 or JUnit 5. -->
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.2.0</version>
    </dependency>


相关文章
|
6月前
|
消息中间件 存储 Java
【Kafka】Kafka 组件分析
【4月更文挑战第5天】【Kafka】Kafka 组件分析
|
2月前
|
消息中间件 存储 算法
kafka(二)
kafka(二)
|
3月前
|
消息中间件 Kafka
kafka里的acks是什么
【8月更文挑战第3天】kafka里的acks是什么
179 0
|
6月前
|
消息中间件 Java Kafka
Kafka
Kafka
49 1
|
6月前
|
消息中间件 Kafka 流计算
Flink消费kafka数据
Flink消费kafka数据
70 0
|
消息中间件 开发框架 Java
113 Kafka介绍
113 Kafka介绍
76 0
|
消息中间件 缓存 负载均衡
Kafka实战(一) : 认识Kafka
Kafka实战(一) : 认识Kafka
|
消息中间件 存储 缓存
kafka
kafka
363 0
|
消息中间件 存储 Kafka
kafka-初识kafka
- kafka是一个具有高吞吐,可水平扩展,可持久化的流式数据处理平台。 - kafka主要包括:消息系统、日志系统、流式处理平台、zookeeper 四大重要组件。 消息系统的重要概念:生产者(producer),消费者(customer),服务节点(broker)。消息系统中一个重要的原理:通过连通器原理实现了保持数据的一致性。
108 0
kafka-初识kafka
|
消息中间件 Java Kafka
Flink消费kafka消息实战
本次实战的内容是开发Flink应用,消费来自kafka的消息,进行实时计算
1097 0
Flink消费kafka消息实战