Kafka JAVA客户端代码示例--高级应用

简介:

什么时间使用高级应用?

  1. 针对一个消息读取多次
  2. 在一个process中,仅仅处理一个topic中的一组partitions
  3. 使用事务,确保每个消息只被处理一次

使用高级应用(调用较底层函数)的缺点?

    SimpleConsumer需要做很多额外的工作(在以groups方式进行消息处理时不需要)

  1. 在应用程序中跟踪上次消息处理的offset
  2. 确定一个topic partition的lead broker
  3. 手工处理broker leander的改变

使用底层函数(SimpleConsumer)开发的步骤

  1.  通过active broker,确定topic partition的lead broker
  2. 确定topic partition的replicat brokers
  3. 根据需要,创建数据请求
  4. 抓取数据
  5. 识别lead brokder改变并进行恢复

代码示例

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author Fung
 *
 */
public class ConsumerSimpleExample {
	public static void main(String arg[]) {
		String[] args={"20","page_visits","2","172.168.63.233","9092"};
		ConsumerSimpleExample example = new ConsumerSimpleExample();
		long maxReads = Long.parseLong(args[0]);
		String topic = args[1];
		int partition = Integer.parseInt(args[2]);
		List<String> seeds = new ArrayList<String>();
		seeds.add(args[3]);
		int port = Integer.parseInt(args[4]);
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			System.out.println("Oops:" + e);
			e.printStackTrace();
		}
	}

	private List<String> m_replicaBrokers = new ArrayList<String>();

	public ConsumerSimpleExample() {
		m_replicaBrokers = new ArrayList<String>();
	}

	public void run(long a_maxReads, String a_topic, int a_partition,
			List<String> a_seedBrokers, int a_port) throws Exception {
		// find the meta data about the topic and partition we are interested in
		//
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
				a_partition);
		if (metadata == null) {
			System.out
					.println("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			System.out
					.println("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + a_topic + "_" + a_partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
				100000, 64 * 1024, clientName);
		long readOffset = getLastOffset(consumer, a_topic, a_partition,
				kafka.api.OffsetRequest.LatestTime(), clientName);

		int numErrors = 0;
		while (a_maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, a_port, 100000,
						64 * 1024, clientName);
			}
			// Note: this fetchSize of 100000 might need to be increased if
			// large batches are written to Kafka
			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
					.addFetch(a_topic, a_partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				// Something went wrong!
				short code = fetchResponse.errorCode(a_topic, a_partition);
				System.out.println("Error fetching data from the Broker:"
						+ leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					// We asked for an invalid offset. For simple case ask for
					// the last element to reset
					readOffset = getLastOffset(consumer, a_topic, a_partition,
							kafka.api.OffsetRequest.LatestTime(), clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
						a_port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
					a_topic, a_partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					System.out.println("Found an old offset: " + currentOffset
							+ " Expecting: " + readOffset);
					continue;
				}
				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();
				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				System.out.println(String.valueOf(messageAndOffset.offset())
						+ ": " + new String(bytes, "UTF-8"));
				numRead++;
				a_maxReads--;
			}

			if (numRead == 0) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}

	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching data Offset Data the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	private String findNewLeader(String a_oldLeader, String a_topic,
			int a_partition, int a_port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
					a_topic, a_partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
					&& i == 0) {
				// first time through if the leader hasn't changed give
				// ZooKeeper a second to recover
				// second time, assume the broker did recover before failover,
				// or it was a non-Broker issue
				//
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		System.out
				.println("Unable to find new leader after Broker failure. Exiting");
		throw new Exception(
				"Unable to find new leader after Broker failure. Exiting");
	}

	private PartitionMetadata findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic, int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}
}

参考

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example

目录
相关文章
|
15天前
|
移动开发 Java Android开发
构建高效Android应用:探究Kotlin与Java的性能差异
【4月更文挑战第3天】在移动开发领域,性能优化一直是开发者关注的焦点。随着Kotlin的兴起,其在Android开发中的地位逐渐上升,但关于其与Java在性能方面的对比,尚无明确共识。本文通过深入分析并结合实际测试数据,探讨了Kotlin与Java在Android平台上的性能表现,揭示了在不同场景下两者的差异及其对应用性能的潜在影响,为开发者在选择编程语言时提供参考依据。
|
16天前
|
缓存 算法 Java
Java内存管理与调优:释放应用潜能的关键
【4月更文挑战第2天】Java内存管理关乎性能与稳定性。理解JVM内存结构,如堆和栈,是优化基础。内存泄漏是常见问题,需谨慎管理对象生命周期,并使用工具如VisualVM检测。有效字符串处理、选择合适数据结构和算法能提升效率。垃圾回收自动回收内存,但策略调整影响性能,如选择不同类型的垃圾回收器。其他优化包括调整堆大小、使用对象池和缓存。掌握这些技巧,开发者能优化应用,提升系统性能。
|
15天前
|
Java
深入理解Java并发编程:线程池的应用与优化
【4月更文挑战第3天】 在Java并发编程中,线程池是一种重要的资源管理工具,它能有效地控制和管理线程的数量,提高系统性能。本文将深入探讨Java线程池的工作原理、应用场景以及优化策略,帮助读者更好地理解和应用线程池。
|
22天前
|
Java 编译器 Android开发
构建高效Android应用:探究Kotlin与Java的性能差异
在开发高性能的Android应用时,选择合适的编程语言至关重要。近年来,Kotlin因其简洁性和功能性受到开发者的青睐,但其性能是否与传统的Java相比有所不足?本文通过对比分析Kotlin与Java在Android平台上的运行效率,揭示二者在编译速度、运行时性能及资源消耗方面的具体差异,并探讨在实际项目中如何做出最佳选择。
17 4
|
24天前
|
数据采集 分布式计算 大数据
Java语言在大数据处理中的应用
传统的大数据处理往往依赖于庞大的数据中心和高性能的服务器,然而随着大数据时代的到来,Java作为一种强大的编程语言正在被广泛应用于大数据处理领域。本文将探讨Java语言在大数据处理中的优势和应用,以及其在分布式计算、数据处理和系统集成等方面的重要作用。
|
2天前
|
消息中间件 存储 Java
深度探索:使用Apache Kafka构建高效Java消息队列处理系统
【4月更文挑战第17天】本文介绍了在Java环境下使用Apache Kafka进行消息队列处理的方法。Kafka是一个分布式流处理平台,采用发布/订阅模型,支持高效的消息生产和消费。文章详细讲解了Kafka的核心概念,包括主题、生产者和消费者,以及消息的存储和消费流程。此外,还展示了Java代码示例,说明如何创建生产者和消费者。最后,讨论了在高并发场景下的优化策略,如分区、消息压缩和批处理。通过理解和应用这些策略,可以构建高性能的消息系统。
|
6天前
|
Java
探秘jstack:解决Java应用线程问题的利器
探秘jstack:解决Java应用线程问题的利器
14 1
探秘jstack:解决Java应用线程问题的利器
|
10天前
|
XML JSON JavaScript
Java中XML和JSON的比较与应用指南
本文对比了Java中XML和JSON的使用,XML以自我描述性和可扩展性著称,适合结构复杂、需验证的场景,但语法冗长。JSON结构简洁,适用于轻量级数据交换,但不支持命名空间。在Java中,处理XML可使用DOM、SAX解析器或XPath,而JSON可借助GSON、Jackson库。根据需求选择合适格式,注意安全、性能和可读性。
23 0
|
15天前
|
XML JSON JavaScript
使用JSON和XML:数据交换格式在Java Web开发中的应用
【4月更文挑战第3天】本文比较了JSON和XML在Java Web开发中的应用。JSON是一种轻量级、易读的数据交换格式,适合快速解析和节省空间,常用于API和Web服务。XML则提供更强的灵活性和数据描述能力,适合复杂数据结构。Java有Jackson和Gson等库处理JSON,JAXB和DOM/SAX处理XML。选择格式需根据应用场景和需求。
|
16天前
|
安全 Java 容器
Java并发编程:实现高效、线程安全的多线程应用
综上所述,Java并发编程需要注意线程安全、可见性、性能等方面的问题。合理使用线程池、同步机制、并发容器等工具,可以实现高效且线程安全的多线程应用。
14 1