
Kafka Java Client Code Example -- Advanced Usage


When should you use this advanced approach?

  1. You need to read a message multiple times (see the replay sketch after this list)
  2. You want a single process to consume only a subset of the partitions in a topic
  3. You need transactional control so that each message is processed exactly once
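
Because a FetchRequest names an explicit partition and offset, SimpleConsumer can fetch the same message again at any time, which the high-level (group) consumer does not allow. A minimal replay sketch, using the same kafka 0.8 API as the full example below; the broker address, topic, partition, and savedOffset are hypothetical placeholder values:

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

public class ReplayExample {
	public static void main(String[] args) {
		// hypothetical broker, port, and client id
		SimpleConsumer consumer = new SimpleConsumer("172.168.63.233", 9092,
				100000, 64 * 1024, "replayClient");
		long savedOffset = 42L; // an offset the application recorded earlier
		FetchRequest req = new FetchRequestBuilder().clientId("replayClient")
				.addFetch("page_visits", 2, savedOffset, 100000).build();
		FetchResponse resp = consumer.fetch(req);
		// re-running this block with the same savedOffset re-reads the
		// same messages
		for (MessageAndOffset mao : resp.messageSet("page_visits", 2)) {
			System.out.println("re-read offset " + mao.offset());
		}
		consumer.close();
	}
}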

What are the drawbacks of this approach (calling the lower-level API)?

    SimpleConsumer requires a lot of extra work that is unnecessary when consuming with consumer groups:

  1. Track the offset of the last processed message in your application (see the sketch after this list)
  2. Determine the lead broker for a topic partition
  3. Handle broker leader changes by hand
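
A minimal sketch of point 1, with a hypothetical file-per-partition store (a real deployment would more likely keep offsets in ZooKeeper or a database): the application must persist the offset of the last processed message itself and restore it on restart, because the broker does not track it on its behalf.

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Writer;

public class OffsetStore {
	private final File file;

	public OffsetStore(String topic, int partition) {
		// one file per topic partition, e.g. offset_page_visits_2.txt
		this.file = new File("offset_" + topic + "_" + partition + ".txt");
	}

	// restore the last saved offset, or a default on first start
	public long load(long defaultOffset) throws IOException {
		if (!file.exists())
			return defaultOffset;
		BufferedReader in = new BufferedReader(new FileReader(file));
		try {
			return Long.parseLong(in.readLine().trim()); // file holds one offset
		} finally {
			in.close();
		}
	}

	// overwrite the file with the newest processed offset
	public void save(long offset) throws IOException {
		Writer out = new FileWriter(file);
		try {
			out.write(Long.toString(offset));
		} finally {
			out.close();
		}
	}
}

In the consumer below, readOffset would be loaded from such a store at startup and saved back after each processed batch.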

Development steps with the low-level API (SimpleConsumer)

  1. Query an active broker to find the lead broker for the topic partition
  2. Determine the replica brokers for the topic partition
  3. Build the fetch request as needed
  4. Fetch the data
  5. Detect lead broker changes and recover from them

Code example

The consumer below is adapted from the official 0.8.0 SimpleConsumer example: findLeader() covers steps 1 and 2, the fetch loop in run() covers steps 3 and 4, and findNewLeader() covers step 5.

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import kafka.api.FetchRequest;
import kafka.api.FetchRequestBuilder;
import kafka.api.PartitionOffsetRequestInfo;
import kafka.cluster.Broker;
import kafka.common.ErrorMapping;
import kafka.common.TopicAndPartition;
import kafka.javaapi.FetchResponse;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.PartitionMetadata;
import kafka.javaapi.TopicMetadata;
import kafka.javaapi.TopicMetadataRequest;
import kafka.javaapi.TopicMetadataResponse;
import kafka.javaapi.consumer.SimpleConsumer;
import kafka.message.MessageAndOffset;

/**
 * https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
 * @author Fung
 *
 */
public class ConsumerSimpleExample {
	public static void main(String[] arg) {
		// Demo values (the real command-line arguments are ignored):
		// maxReads, topic, partition, seed broker host, port
		String[] args = { "20", "page_visits", "2", "172.168.63.233", "9092" };
		ConsumerSimpleExample example = new ConsumerSimpleExample();
		long maxReads = Long.parseLong(args[0]);
		String topic = args[1];
		int partition = Integer.parseInt(args[2]);
		List<String> seeds = new ArrayList<String>();
		seeds.add(args[3]);
		int port = Integer.parseInt(args[4]);
		try {
			example.run(maxReads, topic, partition, seeds, port);
		} catch (Exception e) {
			System.out.println("Oops:" + e);
			e.printStackTrace();
		}
	}

	private List<String> m_replicaBrokers = new ArrayList<String>();

	public void run(long a_maxReads, String a_topic, int a_partition,
			List<String> a_seedBrokers, int a_port) throws Exception {
		// find the meta data about the topic and partition we are interested in
		//
		PartitionMetadata metadata = findLeader(a_seedBrokers, a_port, a_topic,
				a_partition);
		if (metadata == null) {
			System.out
					.println("Can't find metadata for Topic and Partition. Exiting");
			return;
		}
		if (metadata.leader() == null) {
			System.out
					.println("Can't find Leader for Topic and Partition. Exiting");
			return;
		}
		String leadBroker = metadata.leader().host();
		String clientName = "Client_" + a_topic + "_" + a_partition;

		SimpleConsumer consumer = new SimpleConsumer(leadBroker, a_port,
				100000, 64 * 1024, clientName);
		// Start from the newest message; use kafka.api.OffsetRequest.EarliestTime()
		// to replay the partition from the beginning instead
		long readOffset = getLastOffset(consumer, a_topic, a_partition,
				kafka.api.OffsetRequest.LatestTime(), clientName);

		int numErrors = 0;
		while (a_maxReads > 0) {
			if (consumer == null) {
				consumer = new SimpleConsumer(leadBroker, a_port, 100000,
						64 * 1024, clientName);
			}
			// Note: this fetchSize of 100000 might need to be increased if
			// large batches are written to Kafka
			FetchRequest req = new FetchRequestBuilder().clientId(clientName)
					.addFetch(a_topic, a_partition, readOffset, 100000).build();
			FetchResponse fetchResponse = consumer.fetch(req);

			if (fetchResponse.hasError()) {
				numErrors++;
				// Something went wrong!
				short code = fetchResponse.errorCode(a_topic, a_partition);
				System.out.println("Error fetching data from the Broker:"
						+ leadBroker + " Reason: " + code);
				if (numErrors > 5)
					break;
				if (code == ErrorMapping.OffsetOutOfRangeCode()) {
					// We asked for an invalid offset. For simple case ask for
					// the last element to reset
					readOffset = getLastOffset(consumer, a_topic, a_partition,
							kafka.api.OffsetRequest.LatestTime(), clientName);
					continue;
				}
				consumer.close();
				consumer = null;
				leadBroker = findNewLeader(leadBroker, a_topic, a_partition,
						a_port);
				continue;
			}
			numErrors = 0;

			long numRead = 0;
			for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(
					a_topic, a_partition)) {
				long currentOffset = messageAndOffset.offset();
				if (currentOffset < readOffset) {
					System.out.println("Found an old offset: " + currentOffset
							+ " Expecting: " + readOffset);
					continue;
				}
				readOffset = messageAndOffset.nextOffset();
				ByteBuffer payload = messageAndOffset.message().payload();
				byte[] bytes = new byte[payload.limit()];
				payload.get(bytes);
				System.out.println(String.valueOf(messageAndOffset.offset())
						+ ": " + new String(bytes, "UTF-8"));
				numRead++;
				a_maxReads--;
			}

			if (numRead == 0) {
				// no new messages yet; back off briefly before fetching again
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		if (consumer != null)
			consumer.close();
	}

	// Ask the broker for the partition's offset at the given time
	// (LatestTime() or EarliestTime()); used to initialize and to reset readOffset
	public static long getLastOffset(SimpleConsumer consumer, String topic,
			int partition, long whichTime, String clientName) {
		TopicAndPartition topicAndPartition = new TopicAndPartition(topic,
				partition);
		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(
				whichTime, 1));
		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(
				requestInfo, kafka.api.OffsetRequest.CurrentVersion(),
				clientName);
		OffsetResponse response = consumer.getOffsetsBefore(request);

		if (response.hasError()) {
			System.out
					.println("Error fetching offset data from the Broker. Reason: "
							+ response.errorCode(topic, partition));
			return 0;
		}
		long[] offsets = response.offsets(topic, partition);
		return offsets[0];
	}

	// When the leader fails, poll the known replica brokers until a new leader
	// is elected, retrying up to three times with a one-second pause
	private String findNewLeader(String a_oldLeader, String a_topic,
			int a_partition, int a_port) throws Exception {
		for (int i = 0; i < 3; i++) {
			boolean goToSleep = false;
			PartitionMetadata metadata = findLeader(m_replicaBrokers, a_port,
					a_topic, a_partition);
			if (metadata == null) {
				goToSleep = true;
			} else if (metadata.leader() == null) {
				goToSleep = true;
			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host())
					&& i == 0) {
				// first time through if the leader hasn't changed give
				// ZooKeeper a second to recover
				// second time, assume the broker did recover before failover,
				// or it was a non-Broker issue
				//
				goToSleep = true;
			} else {
				return metadata.leader().host();
			}
			if (goToSleep) {
				try {
					Thread.sleep(1000);
				} catch (InterruptedException ie) {
				}
			}
		}
		System.out
				.println("Unable to find new leader after Broker failure. Exiting");
		throw new Exception(
				"Unable to find new leader after Broker failure. Exiting");
	}

	// Query each seed broker for topic metadata until the target partition is
	// found; also records the partition's replica brokers for failover
	private PartitionMetadata findLeader(List<String> a_seedBrokers,
			int a_port, String a_topic, int a_partition) {
		PartitionMetadata returnMetaData = null;
		loop: for (String seed : a_seedBrokers) {
			SimpleConsumer consumer = null;
			try {
				consumer = new SimpleConsumer(seed, a_port, 100000, 64 * 1024,
						"leaderLookup");
				List<String> topics = Collections.singletonList(a_topic);
				TopicMetadataRequest req = new TopicMetadataRequest(topics);
				TopicMetadataResponse resp = consumer.send(req);

				List<TopicMetadata> metaData = resp.topicsMetadata();
				for (TopicMetadata item : metaData) {
					for (PartitionMetadata part : item.partitionsMetadata()) {
						if (part.partitionId() == a_partition) {
							returnMetaData = part;
							break loop;
						}
					}
				}
			} catch (Exception e) {
				System.out.println("Error communicating with Broker [" + seed
						+ "] to find Leader for [" + a_topic + ", "
						+ a_partition + "] Reason: " + e);
			} finally {
				if (consumer != null)
					consumer.close();
			}
		}
		if (returnMetaData != null) {
			m_replicaBrokers.clear();
			for (Broker replica : returnMetaData.replicas()) {
				m_replicaBrokers.add(replica.host());
			}
		}
		return returnMetaData;
	}
}
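
To try the consumer you first need messages in the topic. A minimal producer sketch using the old 0.8 producer API, assuming the same broker address and topic as the consumer above:

import java.util.Properties;

import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;

public class ProducerSimpleExample {
	public static void main(String[] args) {
		Properties props = new Properties();
		// assumption: same seed broker as the consumer example
		props.put("metadata.broker.list", "172.168.63.233:9092");
		props.put("serializer.class", "kafka.serializer.StringEncoder");

		Producer<String, String> producer = new Producer<String, String>(
				new ProducerConfig(props));
		for (int i = 0; i < 20; i++) {
			// without a key, messages are spread across partitions
			producer.send(new KeyedMessage<String, String>("page_visits",
					"visit-" + i));
		}
		producer.close();
	}
}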

References

https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+SimpleConsumer+Example
