【Kafka】(十六)Kafka 生产者(producer)生产 topic 数据常见 API

简介: 【Kafka】(十六)Kafka 生产者(producer)生产 topic 数据常见 API

文章目录


一、将本地数据用java语言(API)导入到topic

二、Scala版本将本地文件以JSON格式打到Kafka中

三、直接在shell中使用kafka的producer


一、将本地数据用java语言(API)导入到topic


1.本次主要是把文本文件所有数据导入到topic中


代码说明:将本地文件所有内容逐行地 通过API 打入kafka 的 topic 中

import java.io.BufferedReader;
import java.io.FileReader;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class Producer3 {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.16.100:9092");
        props.put("ack","1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //获得文件路径
        String filePath1="D:\\AWork\\4_Spark\\Project\\GZKY\\src\\file\\WordsList.txt";
        //创建buffer
        BufferedReader br = new BufferedReader(new FileReader(filePath1));
        String line ;
        while((line = br.readLine()) != null) {
            //将文本每条数据转换成 ProducerRecord
            final ProducerRecord<String, String> record = new ProducerRecord<String, String>("gong_test", line+",ll");
            //将数据发个topic
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    // 如果发送消息成功,返回了 RecordMetadata
                    if(metadata != null) {
                        StringBuilder sb = new StringBuilder();
                        sb.append("message has been sent successfully! ")
                                .append("send to partition ").append(metadata.partition())
                                .append(", offset = ").append(metadata.offset());
                        System.out.println(sb.toString());
                        //System.out.println(record.toString());
                    }
                    // 如果消息发送失败,抛出异常
                    if(e != null) {
                        e.printStackTrace();
                    }
                }
            });
            //每隔500ms产生以此数据
            Thread.sleep(500);
        }
        producer.close();
    }
}


2.本地文件通过API 以Json格式 打入kafka 的 topic 中

此时可以通过json的形式,选择性地拿取本地文件数据到topic

代码如下:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.Properties;
import org.json.JSONException;
import org.json.JSONObject;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
/*
此版本是java版本
将本地文件 通过API 以Json格式 打入kafka  的  topic 中
 */
public class Producer4 {
    public static void main(String[] args) throws IOException, JSONException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.16.100:9092");
        props.put("ack","1");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        //获得文件路径
        String filePath1="D:\\AWork\\4_Spark\\Project\\GZKY\\src\\file\\WordsList.txt";
        //
        BufferedReader bf=new BufferedReader(new FileReader(filePath1));
        String line;
        while ((line=bf.readLine())!=null){
            JSONObject jo=new JSONObject();
            String[] lines=line.split(",");
            jo.put("1",lines[0]);
            jo.put("2",lines[1]);
            jo.put("3",lines[2]);
            jo.put("4",lines[3]);
            ProducerRecord<String,String> record=new ProducerRecord<String,String> ("gong_test",jo.toString());
            producer.send(record, new Callback() {
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if(recordMetadata!=null){
                        StringBuffer sb=new StringBuffer();
                        sb.append("success  ").append("partition:").append(recordMetadata.partition())
                                .append(" offset:").append(recordMetadata.offset());
                        System.out.println(sb.toString());
                    }
                    if(e!=null){
                        e.printStackTrace();
                    }
                }
            });
            Thread.sleep(500);
        }
        producer.close();
    }
}


二、Scala版本将本地文件以JSON格式打到Kafka中


import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.sql.SparkSession
import org.json.JSONObject
/*
此版本是spark版本
把本地文本数据数据导入到Kafka的topic中  此方法可以挑选文本中有用的字段->json格式
*/
object ProducerJson {
  def main(args: Array[String]): Unit = {
    //往topic中写数据
    val topic = "gong_test"
    //指定broker的ip和端口号
    val brokers="192.168.16.100:9092"
    //建配置文件
    val props=new Properties()
    props.put("metadata.broker.list",brokers)
    //指定Kafka的编译器 放入
    props.put("serializer.class","kafka.serializer.StringEncoder")
    //配置kafka的config
    //val kafkaconfig=new ProducerConfig(props)、
    val kafkaconfig=new ProducerConfig(props)
    val producer= new Producer[String,String](kafkaconfig)
    //配置SPark的congfig
    val ss = SparkSession.builder().appName("LocalToKafka").master("local[2]").getOrCreate()
    val sc =ss.sparkContext
    //定义path
    val filePath="D:\\AWork\\gzky\\WordsList.txt"
    val records=sc.textFile(filePath).map(_.split(",")).collect()
    //把数据预处理变成json
    for (record<-records){
      val event = new JSONObject() // import org.json.JSONObject
      event
        .put("camera_id", record(0))
        .put("car_id", record(1))
        .put("event_time", record(2))
        .put("speed", record(3))
        .put("road_id", record(4))
      // 生产event 消息
      producer.send(new KeyedMessage[String,String](topic,event.toString()))
      println(""+event)
      Thread.sleep(200)
    }
     sc.stop()
  }
}


三、直接在shell中使用kafka的producer


目的将本地文件一次性打入到topic中

./kafka-console-producer.sh --broker-list 192.168.16.100:9092 --topic gonst </root/WordsList.txt


总结:


当然kafka的topic数据来源有很多,比如:从一个端口直接生产数据,或者从flume端接收数据等,上面只是写了从本地数据到topic。

目录
打赏
0
0
0
0
89
分享
相关文章
1688商品数据实战:API搜索接口开发与供应链分析应用
本文详细介绍了如何通过1688开放API实现商品数据的获取与应用,涵盖接入准备、签名流程、数据解析存储及商业化场景。开发者可完成智能选品、价格监控和供应商评级等功能,同时提供代码示例与问题解决方案,确保法律合规与数据安全。适合企业开发者快速构建供应链管理系统。
不写一行代码,用MCP+魔搭API-Inference 搭建一个本地数据助手! 附所有工具和清单
还在为大模型开发的复杂技术栈、框架不兼容和工具调用问题头疼吗?MCP(Model Context Protocol servers)来拯救你了!它用统一的技术栈、兼容主流框架和简化工具调用的方式,让大模型开发变得简单高效。
177 1
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
博客不应该只有代码和解决方案,重点应该在于给出解决方案的同时分享思维模式,只有思维才能可持续地解决问题,只有思维才是真正值得学习和分享的核心要素。如果这篇博客能给您带来一点帮助,麻烦您点个赞支持一下,还可以收藏起来以备不时之需,有疑问和错误欢迎在评论区指出~
Java||Springboot读取本地目录的文件和文件结构,读取服务器文档目录数据供前端渲染的API实现
微店API开发全攻略:解锁电商数据与业务自动化的核心能力
微店开放平台提供覆盖商品、订单、用户、营销、物流五大核心模块的API接口,支持企业快速构建电商中台系统。其API体系具备模块化设计、双重认证机制、高并发支持和数据隔离等特性。文档详细解析了商品管理、订单处理、营销工具等核心接口功能,并提供实战代码示例。同时,介绍了企业级整合方案设计,如订单全链路自动化和商品数据中台架构,以及性能优化与稳定性保障措施。最后,针对高频问题提供了排查指南,帮助开发者高效利用API实现电商数智化转型。适合中高级开发者阅读。
淘宝拍立淘按图搜索API接口系列的应用与数据解析
淘宝拍立淘按图搜索API接口是阿里巴巴旗下淘宝平台提供的一项基于图像识别技术的创新服务。以下是对该接口系列的应用与数据解析的详细分析
深挖京东商品详情 API:一键获取全维度商品数据
京东商品详情API是京东开放平台为开发者提供的关键接口,支持通过编程方式获取商品详细信息,包括基本信息、描述、规格和用户评价等。该API数据全面、实时性强、稳定性高且灵活可定制,满足多场景需求。示例代码展示了如何用Python调用此API,帮助开发者快速集成京东商品数据到自身系统中,实现高效的商品数据分析与应用开发。体验链接:c0b.cc/R4rbK2 。
淘宝商品详情API接口概述与JSON数据示例
淘宝商品详情API是淘宝开放平台提供的核心接口之一,为开发者提供了获取商品深度信息的能力。以下是技术细节和示例:
【实战解析】smallredbook.item_get_video API:小红书视频数据获取与电商应用指南
本文介绍小红书官方API——`smallredbook.item_get_video`的功能与使用方法。该接口可获取笔记视频详情,包括无水印直链、封面图、时长、文本描述、标签及互动数据等,并支持电商场景分析。调用需提供`key`、`secret`和`num_iid`参数,返回字段涵盖视频链接、标题、标签及用户信息等。同时,文章提供了电商实战技巧,如竞品监控与个性化推荐,并列出合规注意事项及替代方案对比。最后解答了常见问题,如笔记ID获取与视频链接时效性等。
如何在自己的网站接入API接口获取数据?分步指南与实战示例
将第三方API(如微店API)接入网站是扩展功能和获取实时数据的关键。流程包括注册开发者账号、申请API权限、设置认证机制(OAuth 2.0或AppKey签名)、调用API实现前后端协作、处理数据与错误、优化安全性能,并解决常见问题。确保遵循最佳实践,保障系统稳定与安全。通过这些步骤,开发者可高效整合数据,提升应用功能。
如何高效爬取天猫商品数据?官方API与非官方接口全解析
本文介绍两种天猫商品数据爬取方案:官方API和非官方接口。官方API合法合规,适合企业长期使用,需申请企业资质;非官方接口适合快速验证需求,但需应对反爬机制。详细内容涵盖开发步骤、Python实现示例、反爬策略、数据解析与存储、注意事项及扩展应用场景。推荐工具链包括Playwright、aiohttp、lxml等。如需进一步帮助,请联系作者。