今天主要简单写一下kafka的读写,我写了java,scala两个版本的,写法比较老,但都能用,已经测试过了,直接上代码吧;
java版本:
package com.cn.kafka; import java.util.Arrays; import java.util.HashMap; import java.util.Map; import java.util.Properties; import net.sf.json.JSONObject; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; import kafka.producer.ProducerConfig; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; /** * 从kafka里读取数据 * @projectName:jason_kafka * @author:JasonLee * @ClassName:KafkaConsumerCS * @createdTime:2017-12-20 */ public class KafkaUtils { public void consumer(){ Properties props = new Properties(); props.put("bootstrap.servers", ""); props.put("group.id", "jason_"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); org.apache.kafka.clients.consumer.KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("jason_20180519")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } public void producer(){ Properties props = new Properties(); props.put("serializer.class", "kafka.serializer.StringEncoder"); props.put("metadata.broker.list", ""); //声明broker; props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); ProducerConfig config = new ProducerConfig(props); Producer<String, String> producer = new Producer<String, String>(config); Map map = new HashMap(); map.put("name","jason"); map.put("addr","100"); JSONObject jsonObject = JSONObject.fromObject(map); KeyedMessage<String, String> message = new KeyedMessage<String, String>("jason_0627", jsonObject.toString()); producer.send(message); } }
scala版本:
package kafka import java.util.{Collections, Properties} import net.sf.json.JSONObject import org.apache.kafka.clients.consumer.KafkaConsumer import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord} import org.apache.kafka.common.serialization.StringSerializer import scala.collection.JavaConversions._ object KafkaConsumer { def cunsumer(): Unit ={ val props = new Properties() props.put("bootstrap.servers", PropertiesScalaUtils.loadProperties("broker")) props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer") props.put("group.id", "something") props.put("auto.offset.reset","earliest") props.put("enable.auto.commit", "true") props.put("auto.commit.interval.ms", "1000") val consumer = new KafkaConsumer[String, String](props) consumer.subscribe(Collections.singletonList("test_v1")) while (true){ val records = consumer.poll(100) for (record <- records){ println(record.offset() +"--" +record.key() +"--" +record.value()) } } consumer.close() } def producer(): Unit ={ val brokers_list = "" val topic = "jason_20180511" val properties = new Properties() properties.put("group.id", "jaosn_") properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,brokers_list) properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName) //key的序列化; properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)//value的序列化; val producer = new KafkaProducer[String, String](properties) var num = 0 for(i<- 1 to 1000){ val json = new JSONObject() json.put("name","jason"+i) json.put("addr","25"+i) producer.send(new ProducerRecord(topic,json.toString())) } producer.close() } }