Kafka Java Client Code Example -- Advanced Usage

Introduction:

When should you use the advanced API?

  1. You need to read a message more than once
  2. You want a single process to consume only a subset of the partitions of a topic
  3. You want to use transactions to ensure that each message is processed exactly once

What are the drawbacks of the advanced API (calling the lower-level functions)?

    SimpleConsumer has to do a lot of extra work that is not needed when messages are consumed through consumer groups:

  1. Keep track of the offset of the last processed message in your application (a sketch follows this list)
  2. Determine which broker is the lead broker for a topic partition
  3. Handle lead broker changes by hand
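
For point 1, here is a minimal sketch of application-side offset tracking, assuming a hypothetical local checkpoint file (a real deployment would more likely store offsets in ZooKeeper or a database):

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/**
 * Hypothetical helper: persists the offset of the next message to read for
 * one topic partition in a local file, so the consumer can resume where it
 * left off after a restart.
 */
public class OffsetCheckpoint {
	private final Path file;

	public OffsetCheckpoint(String topic, int partition) {
		// One checkpoint file per topic partition, e.g. "page_visits-2.offset"
		this.file = Paths.get(topic + "-" + partition + ".offset");
	}

	public long read(long defaultOffset) throws IOException {
		if (!Files.exists(file)) {
			return defaultOffset; // first run: no checkpoint yet
		}
		String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
		return Long.parseLong(text.trim());
	}

	public void write(long nextOffset) throws IOException {
		// Store the offset of the next message to read, not the last one read
		Files.write(file, String.valueOf(nextOffset).getBytes(StandardCharsets.UTF_8));
	}
}

In the run() loop of the example below, readOffset would be seeded from checkpoint.read(...) instead of always asking for the latest offset, and checkpoint.write(readOffset) would be called once each message has been handled.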

Steps for developing against the low-level API (SimpleConsumer)

  1. Determine the lead broker for the topic partition by querying an active broker
  2. Determine the replica brokers for the topic partition
  3. Build the fetch request as needed
  4. Fetch the data
  5. Detect lead broker changes and recover from them

Code Example

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author Fung
 *
 */
public class ConsumerSimpleExample {
	public static void main(String arg[]) {
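		// Demo arguments hard-coded in place of the command line:
		// max messages to read, topic, partition, seed broker host, broker port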
		String[] args={"20","page_visits","2","172.168.63.233","9092"};
		ConsumerSimpleExample example = new ConsumerSimpleExample();
		long maxReads = Long.parseLong(args[0]);
		String topic = args[1];
		int partition = Integer.parseInt(args[2]);
		List<String> seeds = new ArrayList<String>();
		seeds.add(args[3]);
		int port = Integer.parseInt(args[4]);
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			System.out.println("Oops:" + e);
			e.printStackTrace();
		}
	}

	private List<String> m_replicaBrokers = new ArrayList<String>();

	public ConsumerSimpleExample() {
		m_replicaBrokers = new ArrayList<String>();
	}

	public void run(long a_maxReads, String a_topic, int a_partition,
			List<String> a_seedBrokers, int a_port) throws Exception {
		// find the meta data about the topic and partition we are interested in
		//
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
				a_partition);
		if (metadata == null) {
			System.out
					.println("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			System.out
					.println("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + a_topic + "_" + a_partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
				100000, 64 * 1024, clientName);
		long readOffset = getLastOffset(consumer, a_topic, a_partition,
				kafka.api.OffsetRequest.LatestTime(), clientName);

		int numErrors = 0;
		while (a_maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, a_port, 100000,
						64 * 1024, clientName);
			}
			// Note: this fetchSize of 100000 might need to be increased if
			// large batches are written to Kafka
			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
					.addFetch(a_topic, a_partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				// Something went wrong!
				short code = fetchResponse.errorCode(a_topic, a_partition);
				System.out.println("Error fetching data from the Broker:"
						+ leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					// We asked for an invalid offset. For simple case ask for
					// the last element to reset
					readOffset = getLastOffset(consumer, a_topic, a_partition,
							kafka.api.OffsetRequest.LatestTime(), clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
						a_port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
					a_topic, a_partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					System.out.println("Found an old offset: " + currentOffset
							+ " Expecting: " + readOffset);
					continue;
				}
				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();
				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				System.out.println(String.valueOf(messageAndOffset.offset())
						+ ": " + new String(bytes, "UTF-8"));
				numRead++;
				a_maxReads--;
			}

			if (numRead == 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}

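	/**
	 * Ask the broker for the last offset before the given time for this
	 * topic partition: kafka.api.OffsetRequest.LatestTime() returns the
	 * offset of the next message to be written, EarliestTime() the oldest
	 * offset still retained on the broker.
	 */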
	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

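	/**
	 * After a fetch error, poll the known replica brokers (up to three
	 * attempts, sleeping one second between tries) until a new leader
	 * has been elected for the partition.
	 */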
	private String findNewLeader(String a_oldLeader, String a_topic,
			int a_partition, int a_port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
					a_topic, a_partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
					&& i == 0) {
				// first time through if the leader hasn't changed give
				// ZooKeeper a second to recover
				// second time, assume the broker did recover before failover,
				// or it was a non-Broker issue
				//
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		System.out
				.println("Unable to find new leader after Broker failure. Exiting");
		throw new Exception(
				"Unable to find new leader after Broker failure. Exiting");
	}

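	/**
	 * Query each seed broker in turn for topic metadata until the requested
	 * partition is found, and remember its replica brokers for later leader
	 * re-discovery.
	 */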
	private PartitionMetadata findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic, int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}
}
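
The example always starts reading at the newest offset. As a minimal variation, to replay a partition from the beginning of the retained log instead, the call to getLastOffset() in run() can use EarliestTime() rather than LatestTime():

		// Variation: start from the oldest offset still retained on the
		// broker instead of the newest one; the rest of run() is unchanged.
		long readOffset = getLastOffset(consumer, a_topic, a_partition,
				kafka.api.OffsetRequest.EarliestTime(), clientName);

Note that with EarliestTime() the consumer may re-read messages it has already processed, which is exactly why the application-level offset tracking sketched earlier matters.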

References

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
