kafka_java

简介:

使用kafka在eclipe上可以设计生产着 和消费者
例子一
将本地文件上传到kafka上然后通过设计kafka的消费者取回到本地


上传到kafka上需要
KafkaProducerproducer;
Properties;//kafka的链接需要初始化数据这里需要properties将所需的东西以字符串的形式写在properties文件中所需东西不多且不会修改的情况下可以直接写在类里面.
FileInputStream 以字节流的方式传入到kafka


package com.ocean.kafka;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class HomeWork {

    private KafkaProducer<String, byte[]> producer;
    private Properties properties;

    // 初始化数据
    public HomeWork() {
        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092,slave1:9092,slave2:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

        producer = new KafkaProducer<String, byte[]>(properties);
    }

    // 指定发送到的topic
    public void assignPartitionSend(String key, byte[] value) {

        ProducerRecord<String, byte[]> record = new ProducerRecord<String, byte[]>("home-work_pic",0, key, value);
        producer.send(record);
    }

    // 准备数据
    public void preparLocalData() throws IOException {
        File file = new File("C:\\Users\\Administrator\\Desktop\\psb.jpg");
        FileInputStream fis =new FileInputStream(file);
        
        byte[] context = new byte[1024];
        int a = 0;
        int length=0;
        while ((length=fis.read(context))!=-1) {
//这里要注意得判断读取内容的实际长度 如果不这样设置 回到多出来//很多空格如果是图片的话则取回时无法还原
            byte[] newbyte =new byte[length];
            System.arraycopy(context, 0,   newbyte, 0, length);
            assignPartitionSend("TIMES" + a, newbyte);
            
            a++;
        }
        
        fis.close();
    }

    

    public void close() {
        producer.flush();
        producer.close();
    }

    public static void main(String[] args) throws IOException {
        HomeWork homeWork = new HomeWork();
         try {
         homeWork.preparLocalData();
         } catch (IOException e) {
        
         e.printStackTrace();
         }
    
        homeWork.close();
    }
}

将数据传到Kafka上之后要想查看 可以通过zookeeper的客户端但是什么都看不懂因为是字节流(需要注意的一点是文件传到kafka上分区为0不然的话就会出现文件对应不上 )
最后想看的话就是取回来 下面的累就是将kafka上的文件数据传回到本地


package com.ocean.kafka;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class HomeWork2 {
    private Properties properties = new Properties();
    private KafkaConsumer<String, byte[]> consumer;

    public HomeWork2() {

        properties = new Properties();
        properties.put("bootstrap.servers", "master:9092");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        properties.setProperty("group.id", "home-work_pic");
        consumer = new KafkaConsumer<String, byte[]>(properties);

    }
    
    public void getfile() throws IOException {
        File file =new File("C:\\Users\\Administrator\\Desktop\\output.jpg");
        FileOutputStream fileOutputStream =new FileOutputStream(file,true);
        List<String>topics =new ArrayList<String>();
        topics.add("home-work_pic");
        consumer.subscribe(topics);
        while(true){
            ConsumerRecords<String, byte[]> records =consumer.poll(1000);
            for (ConsumerRecord<String, byte[]> record : records) {
                if(record.value()!=null){
                System.out.println(record.value());
                byte[] b =record.value();
            fileOutputStream.write(b);
                fileOutputStream.flush();
                }
            }
        }
        
    }
        
    public static void main(String[] args)  {
        HomeWork2 homeWork2 =new HomeWork2();
        try {
            homeWork2.getfile();
        } catch (IOException e) {
        
            e.printStackTrace();
        }
    }
}

这就是一个简单的设置将文件以字节流的方式上传和下载从kafka上

相关文章
|
4月前
|
消息中间件 Java Kafka
Java 客户端访问kafka
Java 客户端访问kafka
40 9
|
4月前
|
消息中间件 Kafka 数据处理
实时计算 Flink版操作报错合集之使用kafka connector时,报错:java.lang.ClassNotFoundException,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
4月前
|
消息中间件 负载均衡 Java
如何在Java中使用Kafka
如何在Java中使用Kafka
|
5月前
|
Java
使用kafka-clients操作数据(java)
使用kafka-clients操作数据(java)
|
12月前
|
消息中间件 存储 负载均衡
Java面试题 -Kafka
Java面试题 -Kafka
75 0
|
6月前
|
消息中间件 Java Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
Apache Kafka-初体验Kafka(04)-Java客户端操作Kafka
54 0
|
消息中间件 Java Kafka
Java 最常见的面试题:使用 kafka 集群需要注意什么?
Java 最常见的面试题:使用 kafka 集群需要注意什么?
|
消息中间件 存储 负载均衡
JAVA面试——Kafka
JAVA面试——Kafka
125 0
JAVA面试——Kafka
|
消息中间件 Java Kafka
【Java客户端访问Kafka】
【Java客户端访问Kafka】
103 0
【Java客户端访问Kafka】
|
消息中间件 监控 Kafka
Java--Apache kafka安装和配置
Apache kafka的下载安装和配置。
250 0
Java--Apache kafka安装和配置