java,scala读写kafka操作

简介: 今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;java版本:package com.cn.kafka;import java.util.Arrays;import java.util.HashMap;

今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;


java版本:


package com.cn.kafka;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import net.sf.json.JSONObject;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
/**
 * 从kafka里读取数据
 * @projectName:jason_kafka
 * @author:JasonLee
 * @ClassName:KafkaConsumerCS
 * @createdTime:2017-12-20
 */
public class KafkaUtils {
  public void consumer(){
  Properties props = new Properties();
  props.put("bootstrap.servers", "");
  props.put("group.id", "jason_");
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "1000");
  props.put("session.timeout.ms", "30000");
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props);
  consumer.subscribe(Arrays.asList("jason_20180519"));
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
    System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
    }
  }
  }
  public void producer(){
  Properties props = new Properties();
  props.put("serializer.class", "kafka.serializer.StringEncoder");
  props.put("metadata.broker.list", "");  //声明broker;
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  ProducerConfig config = new ProducerConfig(props);
  Producer<String, String> producer = new Producer<String, String>(config);
  Map map = new HashMap();
  map.put("name","jason");
  map.put("addr","100");
  JSONObject jsonObject = JSONObject.fromObject(map);
  KeyedMessage<String, String> message =  new KeyedMessage<String, String>("jason_0627", jsonObject.toString());
  producer.send(message);
  }
}


scala版本:


package kafka
import java.util.{Collections, Properties}
import net.sf.json.JSONObject
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
import scala.collection.JavaConversions._
object KafkaConsumer {
  def cunsumer(): Unit ={
    val props = new Properties()
    props.put("bootstrap.servers", PropertiesScalaUtils.loadProperties("broker"))
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("group.id", "something")
    props.put("auto.offset.reset","earliest")
    props.put("enable.auto.commit", "true")
    props.put("auto.commit.interval.ms", "1000")
    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Collections.singletonList("test_v1"))
    while (true){
      val records = consumer.poll(100)
      for (record <- records){
        println(record.offset() +"--" +record.key() +"--" +record.value())
      }
    }
    consumer.close()
  }
  def producer(): Unit ={
    val brokers_list = ""
    val topic = "jason_20180511"
    val properties = new Properties()
    properties.put("group.id", "jaosn_")
    properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers_list)
    properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化;
    properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)//value的序列化;
    val producer = new KafkaProducer[String, String](properties)
    var num = 0
    for(i<- 1 to 1000){
      val json = new JSONObject()
      json.put("name","jason"+i)
      json.put("addr","25"+i)
      producer.send(new ProducerRecord(topic,json.toString()))
    }
    producer.close()
  }
}



相关文章
|
3月前
|
消息中间件 分布式计算 NoSQL
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
大数据-104 Spark Streaming Kafka Offset Scala实现Redis管理Offset并更新
54 0
|
3月前
|
消息中间件 存储 分布式计算
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
大数据-103 Spark Streaming Kafka Offset管理详解 Scala自定义Offset
110 0
|
2月前
|
消息中间件 缓存 Java
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
零拷贝技术 Zero-Copy 是指计算机执行操作时,可以直接从源(如文件或网络套接字)将数据传输到目标缓冲区, 而不需要 CPU 先将数据从某处内存复制到另一个特定区域,从而减少上下文切换以及 CPU 的拷贝时间。
java nio,netty,kafka 中经常提到“零拷贝”到底是什么?
|
3月前
|
分布式计算 大数据 Java
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
50 1
大数据-86 Spark 集群 WordCount 用 Scala & Java 调用Spark 编译并打包上传运行 梦开始的地方
|
3月前
|
消息中间件 Java Kafka
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
Flink-07 Flink Java 3分钟上手 滚动窗口 事件驱动 Kafka TumblingWindow GlobalWindow CountWindow
48 7
|
3月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
57 3
|
3月前
|
消息中间件 分布式计算 Java
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
大数据-73 Kafka 高级特性 稳定性-事务 相关配置 事务操作Java 幂等性 仅一次发送
43 2
|
3月前
|
消息中间件 存储 Java
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
大数据-58 Kafka 高级特性 消息发送02-自定义序列化器、自定义分区器 Java代码实现
67 3
|
3月前
|
消息中间件 NoSQL Kafka
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
Flink-10 Flink Java 3分钟上手 Docker容器化部署 JobManager TaskManager Kafka Redis Dockerfile docker-compose
78 4
|
3月前
|
消息中间件 Java 大数据
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
大数据-56 Kafka SpringBoot与Kafka 基础简单配置和使用 Java代码 POM文件
82 2