java,scala读写kafka操作

简介: 今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;java版本:package com.cn.kafka;import java.util.Arrays;import java.util.HashMap;

今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;


java版本:


package com.cn.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.sf.json.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
 * 从kafka里读取数据
 * @projectName:jason_kafka
 * @author:JasonLee
 * @ClassName:KafkaConsumerCS
 * @createdTime:2017-12-20
 */
public class KafkaUtils {
  public void consumer(){
  Properties props = new Properties();
  props.put("bootstrap.servers", "");
  props.put("group.id", "jason_");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("jason_20180519"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
  }
  }
  public void producer(){
  Properties props = new Properties();
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("metadata.broker.list", "");  //声明broker;
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  Map map = new HashMap();
  map.put("name","jason");
  map.put("addr","100");
  JSONObject jsonObject = JSONObject.fromObject(map);
  KeyedMessage<String, String> message =  new KeyedMessage<String, String>("jason_0627", jsonObject.toString());
  producer.send(message);
  }
}


scala版本:


package kafka
import java.util.{Collections, Properties}
import net.sf.json.JSONObject
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.JavaConversions._
object KafkaConsumer {
  def cunsumer(): Unit ={
    val props = new Properties()
    props.put("bootstrap.servers", PropertiesScalaUtils.loadProperties("broker"))
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")
    props.put("auto.offset.reset","earliest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test_v1"))
    while (true){
      val records = consumer.poll(100)
      for (record <- records){
        println(record.offset() +"--" +record.key() +"--" +record.value())
      }
    }
    consumer.close()
  }
  def producer(): Unit ={
    val brokers_list = ""
    val topic = "jason_20180511"
    val properties = new Properties()
    properties.put("group.id", "jaosn_")
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers_list)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)//value的序列化;
    val producer = new KafkaProducer[String, String](properties)
    var num = 0
    for(i<- 1 to 1000){
      val json = new JSONObject()
      json.put("name","jason"+i)
      json.put("addr","25"+i)
      producer.send(new ProducerRecord(topic,json.toString()))
    }
    producer.close()
  }
}



相关文章
|
4天前
|
安全 Java 开发者
Java中的读写锁ReentrantReadWriteLock详解,存在一个小缺陷
Java中的读写锁ReentrantReadWriteLock详解,存在一个小缺陷
13 2
|
1天前
|
消息中间件 Kubernetes Java
实时计算 Flink版操作报错合集之写入 Kafka 报错 "Failed to send data to Kafka: Failed to allocate memory within the configured max blocking time 60000 ms",该怎么解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
10 0
|
1天前
|
SQL 消息中间件 Kafka
实时计算 Flink版操作报错合集之使用 Event Time Temporal Join 关联多个 HBase 后,Kafka 数据的某个字段变为 null 是什么原因导致的
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
9 0
|
1天前
|
网络安全 流计算 Python
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 1
|
1天前
|
消息中间件 关系型数据库 MySQL
实时计算 Flink版操作报错合集之遇到报错:Apache Kafka Connect错误如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
12 5
|
1天前
|
消息中间件 Kafka 数据库连接
实时计算 Flink版操作报错合集之无法将消费到的偏移量提交到Kafka如何解决
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
11 3
|
2天前
|
消息中间件 关系型数据库 网络安全
实时计算 Flink版操作报错合集之Flink sql-client 针对kafka的protobuf格式数据建表,报错:java.lang.ClassNotFoundException 如何解决
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
13 1
|
3天前
|
消息中间件 Java Kafka
Java大文件排序(有手就能学会),kafka面试题2024
Java大文件排序(有手就能学会),kafka面试题2024
|
3天前
|
消息中间件 前端开发 Java
java面试刷题软件kafka和mq的区别面试
java面试刷题软件kafka和mq的区别面试
|
4天前
|
监控 Java
Java一分钟之-NIO:非阻塞IO操作
【5月更文挑战第14天】Java的NIO(New IO)解决了传统BIO在高并发下的低效问题,通过非阻塞方式提高性能。NIO涉及复杂的选择器和缓冲区管理,易出现线程、内存和中断处理的误区。要避免这些问题,可以使用如Netty的NIO库,谨慎设计并发策略,并建立标准异常处理。示例展示了简单NIO服务器,接收连接并发送欢迎消息。理解NIO工作原理和最佳实践,有助于构建高效网络应用。
8 2