package com.ocean.kafka;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
public class ManualCommitConsumer {
private Properties properties = new Properties();
private KafkaConsumer<String, String> consumer;
public ManualCommitConsumer() {
properties.setProperty("bootstrap.servers", "master:9092");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "java_group");
// properties.setProperty("auto.offset.reset", "null");
properties.setProperty("enable.auto.commit", "false");
consumer = new KafkaConsumer<String, String>(properties);
}
public void subscribeTopic() {
List<String> topics = new ArrayList<String>();
topics.add("b");
topics.add("from-java");
consumer.subscribe(topics);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "key:"
+ record.key() + "value:" + record.value());
}
// consumer.commitSync();
// commitSync() persists the consumed offsets; without it the same records are re-delivered on the next start
}
}
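// A minimal sketch of the manual-commit pattern this class is named for,
// assuming the same "from-java" topic: poll, process, then commitSync() once
// per batch, so a crash re-delivers at most the records of the current batch.
public void pollAndCommit() {
consumer.subscribe(java.util.Collections.singletonList("from-java"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("offset:" + record.offset() + ", value:" + record.value());
}
// commit the offsets of everything returned by the last poll()
consumer.commitSync();
}
}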
public void getOffset() {
// committed() returns null if this group has never committed an offset for the partition
OffsetAndMetadata offsets = consumer.committed(new TopicPartition("b", 0));
System.out.println("offsets:" + (offsets == null ? "none" : offsets.offset()));
}
// Consume from a specific partition, starting at a given offset.
// A consumer can be pointed at a topic in two ways:
// 1. consumer.subscribe(topics);       (group-managed partition assignment)
// 2. consumer.assign(topicPartitions); (manual partition assignment)
public void consumerAssigned() {
// List<String>topics= new ArrayList<String>();
// topics.add("b");
// consumer.subscribe(topics);
// 指定分区
List<TopicPartition> topicPartitions = new ArrayList<TopicPartition>();
topicPartitions.add(new TopicPartition("from-java", 0));
consumer.assign(topicPartitions);
// seek to the desired starting offset within the assigned partition
consumer.seek(new TopicPartition("from-java", 0), 21);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println(
"partition:" + record.partition() + ", offset:" + record.offset() + ", value:" + record.value());
}
}
}
public void setCommitOffset() {
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<TopicPartition, OffsetAndMetadata>();
offsets.put(new TopicPartition("from_java", 0), new OffsetAndMetadata(0));
List<String> topics = new ArrayList<String>();
topics.add("from_java");
consumer.subscribe(topics);
// commit the given offsets for the partition; this takes effect before the next poll
consumer.commitSync(offsets);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
if (record.partition() == 0) {
System.out.println("partition:" + record.partition() + "offset:" + record.offset() + "value:"
+ record.value());
}
}
}
}
public void exactlyOnceConsumer() {
// 1. configuration: auto commit must be off so offsets are managed by hand
// (note: this only takes effect if set before the KafkaConsumer is created, as in the constructor)
properties.setProperty("enable.auto.commit", "false");
// 2. subscribe to topics or assign partitions
// consumer.subscribe(topics);
// the starting offset has to be restored from mysql
// 3. read the last saved offset from mysql
// 4.1 consumer.commitSync(offsets);
// commits that offset back to the kafka brokers
// or alternatively
// 4.2 consumer.seek(new TopicPartition("from-java", 0), 0);
// positions the consumer at the offset to start consuming from
// then subscribe to the topics or assign the partitions
// consumer.subscribe(topics);
// 5. poll for data
// records = consumer.poll(1000);
// 6. iterate over the records, run the analysis/computation
// 7. when the computation is done, use consumer.committed(new TopicPartition("from-java", 1))
// to fetch the offset consumed so far
// 8. save the result and the offset to mysql as one atomic operation (a transaction)
// 9. jump back to step 5 and loop: next poll, next computation
}
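// A minimal sketch of the exactly-once loop described above, assuming manual
// assignment of partition 0 of "from-java". OffsetDao and its two methods are
// hypothetical stand-ins for the mysql layer; saveResultAndOffset is assumed to
// write the result and the offset in a single database transaction.
public void exactlyOnceSketch(OffsetDao dao) {
TopicPartition tp = new TopicPartition("from-java", 0);
consumer.assign(java.util.Collections.singletonList(tp));
// restore the last processed position from mysql and seek to it
consumer.seek(tp, dao.loadOffset(tp));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
String result = record.value().toUpperCase(); // placeholder computation
// persist result and next offset atomically; a restart resumes exactly here
dao.saveResultAndOffset(tp, record.offset() + 1, result);
}
}
}
// hypothetical mysql-backed offset store, declared only so the sketch compiles
interface OffsetDao {
long loadOffset(TopicPartition tp);
void saveResultAndOffset(TopicPartition tp, long nextOffset, String result);
}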
public static void main(String[] args) {
ManualCommitConsumer manualCommitConsumer = new ManualCommitConsumer();
// manualCommitConsumer.subscribeTopic();
// manualCommitConsumer.getOffset();
// note: consumerAssigned() polls forever, so setCommitOffset() below is never reached
manualCommitConsumer.consumerAssigned();
manualCommitConsumer.setCommitOffset();
}
}
package com.ocean.kafka;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
public class ProducerConsumer {
private Properties properties = new Properties();
private KafkaConsumer<String, String> consumer;
public ProducerConsumer() {
properties.put("bootstrap.servers", "master:9092");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("group.id", "java-group");
consumer = new KafkaConsumer<String, String>(properties);
}
public void subscribeTopic() {
List<String> topics = new ArrayList<String>();
topics.add("home-work_pic");
consumer.subscribe(topics);
// poll kafka in a loop
while (true) {
// pull a batch of records from kafka
ConsumerRecords<String, String> records = consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
System.out.println("received: partition:" + record.partition() + ", offset:" + record.offset()
+ ", key:" + record.key() + ", value:" + record.value());
}
}
}
public static void main(String[] args) {
ProducerConsumer producerConsumer = new ProducerConsumer();
producerConsumer.subscribeTopic();
}
}
package com.ocean.kafka;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProducerKafka {
private KafkaProducer<String, String> producer;
private Properties properties;
public ProducerKafka() {
properties = new Properties();
properties.put("bootstrap.servers", "master:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
// properties.put("acks", "all");
// properties.put("retries", 0);
producer = new KafkaProducer<String, String>(properties);
}
public void assignPartitionSend(String key, String value) {
// send to an explicit partition (0) of the topic
ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", 0, key, value);
producer.send(record);
}
public void sendRecorder(String key, String value) {
// no partition given: the partitioner picks one based on the key
ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", key, value);
producer.send(record);
}
public void getTopicPartitions(String topic) {
// list the partitions (leader, replicas, ISR) for the topic
List<PartitionInfo> partitionInfos = producer.partitionsFor(topic);
for (PartitionInfo partitionInfo : partitionInfos) {
System.out.println(partitionInfo);
}
}
public void getMetrics() {
// dump the producer's internal metrics
Map<MetricName, ? extends Metric> metrics = producer.metrics();
for (MetricName name : metrics.keySet()) {
System.out.println(name.name() + ":" + metrics.get(name).value());
}
}
public void sendRecorderWithCallback(String key, String value) {
final Logger logger = LoggerFactory.getLogger(ProducerKafka.class);
ProducerRecord<String, String> record = new ProducerRecord<String, String>("from-java", key, value);
Callback callback = new Callback() {
// callback method, invoked once the broker has acknowledged (or rejected) the record
public void onCompletion(RecordMetadata metadata, Exception exception) {
if (exception == null) {
logger.info("stored at: partition:" + metadata.partition() + ", offset:" + metadata.offset() + ", ts:" + metadata.timestamp());
} else {
logger.warn("broker-side exception");
exception.printStackTrace();
}
}
};
producer.send(record, callback);
}
public void close() {
// flush() blocks until every buffered record has been sent; close() then releases resources
producer.flush();
producer.close();
}
public static void main(String[] args) {
ProducerKafka client = new ProducerKafka();
for (int i = 0; i < 100; i++) {
client.sendRecorderWithCallback("Ckey" + i, "Cvalue" + i);
}
// client.getMetrics();
client.close();
}
}