【Kafka】(十六)Kafka 生产者(producer)生产 topic 数据常见 API

简介: 【Kafka】(十六)Kafka 生产者(producer)生产 topic 数据常见 API

文章目录


一、将本地数据用java语言(API)导入到topic

二、Scala版本将本地文件以JSON格式打到Kafka中

三、直接在shell中使用kafka的producer


一、将本地数据用java语言(API)导入到topic


1.本次主要是把文本文件所有数据导入到topic中


代码说明:将本地文件所有内容逐行地 通过API 打入kafka 的 topic 中

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer3 {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.16.100:9092");
        props.put("ack","1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //获得文件路径
        String filePath1="D:\\AWork\\4_Spark\\Project\\GZKY\\src\\file\\WordsList.txt";
        //创建buffer
        BufferedReader br = new BufferedReader(new FileReader(filePath1));
        String line ;
        while((line = br.readLine()) != null) {
            //将文本每条数据转换成 ProducerRecord
            final ProducerRecord<String, String> record = new ProducerRecord<String, String>("gong_test", line+",ll");
            //将数据发个topic
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // 如果发送消息成功,返回了 RecordMetadata
                    if(metadata != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("message has been sent successfully! ")
                                .append("send to partition ").append(metadata.partition())
                                .append(", offset = ").append(metadata.offset());
                        System.out.println(sb.toString());
                        //System.out.println(record.toString());
                    }
                    // 如果消息发送失败,抛出异常
                    if(e != null) {
                        e.printStackTrace();
                    }
                }
            });
            //每隔500ms产生以此数据
            Thread.sleep(500);
        }
        producer.close();
    }
}


2.本地文件通过API 以Json格式 打入kafka 的 topic 中

此时可以通过json的形式,选择性地拿取本地文件数据到topic

代码如下:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import org.json.JSONException;
import org.json.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
此版本是java版本
将本地文件 通过API 以Json格式 打入kafka  的  topic 中
 */
public class Producer4 {
    public static void main(String[] args) throws IOException, JSONException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.16.100:9092");
        props.put("ack","1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //获得文件路径
        String filePath1="D:\\AWork\\4_Spark\\Project\\GZKY\\src\\file\\WordsList.txt";
        //
        BufferedReader bf=new BufferedReader(new FileReader(filePath1));
        String line;
        while ((line=bf.readLine())!=null){
            JSONObject jo=new JSONObject();
            String[] lines=line.split(",");
            jo.put("1",lines[0]);
            jo.put("2",lines[1]);
            jo.put("3",lines[2]);
            jo.put("4",lines[3]);
            ProducerRecord<String,String> record=new ProducerRecord<String,String> ("gong_test",jo.toString());
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(recordMetadata!=null){
                        StringBuffer sb=new StringBuffer();
                        sb.append("success  ").append("partition:").append(recordMetadata.partition())
                                .append(" offset:").append(recordMetadata.offset());
                        System.out.println(sb.toString());
                    }
                    if(e!=null){
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(500);
        }
        producer.close();
    }
}


二、Scala版本将本地文件以JSON格式打到Kafka中


import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.sql.SparkSession
import org.json.JSONObject
/*
此版本是spark版本
把本地文本数据数据导入到Kafka的topic中  此方法可以挑选文本中有用的字段->json格式
*/
object ProducerJson {
  def main(args: Array[String]): Unit = {
    //往topic中写数据
    val topic = "gong_test"
    //指定broker的ip和端口号
    val brokers="192.168.16.100:9092"
    //建配置文件
    val props=new Properties()
    props.put("metadata.broker.list",brokers)
    //指定Kafka的编译器 放入
    props.put("serializer.class","kafka.serializer.StringEncoder")
    //配置kafka的config
    //val kafkaconfig=new ProducerConfig(props)、
    val kafkaconfig=new ProducerConfig(props)
    val producer= new Producer[String,String](kafkaconfig)
    //配置SPark的congfig
    val ss = SparkSession.builder().appName("LocalToKafka").master("local[2]").getOrCreate()
    val sc =ss.sparkContext
    //定义path
    val filePath="D:\\AWork\\gzky\\WordsList.txt"
    val records=sc.textFile(filePath).map(_.split(",")).collect()
    //把数据预处理变成json
    for (record<-records){
      val event = new JSONObject() // import org.json.JSONObject
      event
        .put("camera_id", record(0))
        .put("car_id", record(1))
        .put("event_time", record(2))
        .put("speed", record(3))
        .put("road_id", record(4))
      // 生产event 消息
      producer.send(new KeyedMessage[String,String](topic,event.toString()))
      println(""+event)
      Thread.sleep(200)
    }
     sc.stop()
  }
}


三、直接在shell中使用kafka的producer


目的将本地文件一次性打入到topic中

./kafka-console-producer.sh --broker-list 192.168.16.100:9092 --topic gonst </root/WordsList.txt


总结:


当然kafka的topic数据来源有很多,比如:从一个端口直接生产数据,或者从flume端接收数据等,上面只是写了从本地数据到topic。

目录
相关文章
|
1月前
|
消息中间件 NoSQL Kafka
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
大数据-52 Kafka 基础概念和基本架构 核心API介绍 应用场景等
63 5
|
1月前
|
消息中间件 存储 分布式计算
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
大数据-53 Kafka 基本架构核心概念 Producer Consumer Broker Topic Partition Offset 基础概念了解
67 4
|
3月前
|
消息中间件 Kafka API
【Kafka消费新风潮】告别复杂,迎接简洁之美——深度解析Kafka新旧消费者API大比拼!
【8月更文挑战第24天】Apache Kafka作为一个领先的分布式流处理平台,广泛用于实时数据管道和流式应用的构建。随着其发展,消费者API经历了重大更新。旧消费者API(包括“低级”和“高级”API)虽提供灵活性但在消息顺序处理上存在挑战。2017年引入的新消费者API简化了接口,自动管理偏移量,支持更强大的消费组功能,显著降低了开发复杂度。通过对比新旧消费者API的代码示例可以看出,新API极大提高了开发效率和系统可维护性。
133 58
|
2月前
|
消息中间件 Kafka Apache
kafka: invalid configuration (That topic/partition is already being consumed)
kafka: invalid configuration (That topic/partition is already being consumed)
|
5月前
|
消息中间件 负载均衡 Kafka
一文读懂Kafka API:Producer、Consumer和Streams全解析
大家好,今天我们将深入探讨Kafka的三大核心API。通过这篇文章,你将了解如何使用Producer API发布记录流,利用Consumer API订阅和处理数据,以及通过Streams API实现复杂的流处理。一起开启Kafka的探索之旅吧!
198 2
|
4月前
|
消息中间件 监控 Kafka
查询Kafka集群中消费组(group)信息和对应topic的消费情况
查询Kafka集群中消费组(group)信息和对应topic的消费情况
2501 0
|
1月前
|
消息中间件 存储 运维
为什么说Kafka还不是完美的实时数据通道
【10月更文挑战第19天】Kafka 虽然作为数据通道被广泛应用,但在实时性、数据一致性、性能及管理方面存在局限。数据延迟受消息堆积和分区再平衡影响;数据一致性难以达到恰好一次;性能瓶颈在于网络和磁盘I/O;管理复杂性涉及集群配置与版本升级。
|
1月前
|
消息中间件 Java Kafka
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
Flink-04 Flink Java 3分钟上手 FlinkKafkaConsumer消费Kafka数据 进行计算SingleOutputStreamOperatorDataStreamSource
49 1
|
3月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
300 9
|
3月前
|
消息中间件 负载均衡 Java
"Kafka核心机制揭秘:深入探索Producer的高效数据发布策略与Java实战应用"
【8月更文挑战第10天】Apache Kafka作为顶级分布式流处理平台,其Producer组件是数据高效发布的引擎。Producer遵循高吞吐、低延迟等设计原则,采用分批发送、异步处理及数据压缩等技术提升性能。它支持按消息键值分区,确保数据有序并实现负载均衡;提供多种确认机制保证可靠性;具备失败重试功能确保消息最终送达。Java示例展示了基本配置与消息发送流程,体现了Producer的强大与灵活性。
68 3
下一篇
无影云桌面