Kafka的安装和部署相对简单,一般情况下是不需要对Kafka进行mock测试,由于官方对此没有做过多介绍,所以和大家分享一下。
Maven依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.11</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
<classifier>test</classifier>
<scope>test</scope>
</dependency>
Mock测试示例
KafkaMockTest.java
public class KafkaMockTest {
private int brokerId = 0;
private String topic = "QUEUE.TEST";
private String zkConnect;
private EmbeddedZookeeper zkServer;
private ZkClient zkClient;
private KafkaServer kafkaServer;
private int port = 9999;
private Properties kafkaProps;
@Before
public void before() {
zkServer = new EmbeddedZookeeper();
zkConnect = String.format("localhost:%d", zkServer.port());
ZkUtils zkUtils = ZkUtils.apply(zkConnect, 30000, 30000,
JaasUtils.isZkSecurityEnabled());
zkClient = zkUtils.zkClient();
Time mock = new SystemTime();
final Option<File> noFile = scala.Option.apply(null);
final Option<SecurityProtocol> noInterBrokerSecurityProtocol = scala.Option.apply(null);
final Option<Properties> noPropertiesOption = scala.Option.apply(null);
final Option<String> noStringOption = scala.Option.apply(null);
kafkaProps = TestUtils.createBrokerConfig(brokerId, zkConnect, false,
false, port, noInterBrokerSecurityProtocol, noFile, noPropertiesOption, true,
false, TestUtils.RandomPort(), false, TestUtils.RandomPort(),
false, TestUtils.RandomPort(), noStringOption, TestUtils.RandomPort());
kafkaProps.setProperty("auto.create.topics.enable", "true");
kafkaProps.setProperty("num.partitions", "1");
// We *must* override this to use the port we allocated (Kafka currently
// allocates one port
// that it always uses for ZK
kafkaProps.setProperty("zookeeper.connect", this.zkConnect);
kafkaProps.setProperty("host.name", "localhost");
kafkaProps.setProperty("port", port + "");
KafkaConfig config = new KafkaConfig(kafkaProps);
kafkaServer = TestUtils.createServer(config, mock);
// create topic
TopicCommand.TopicCommandOptions options = new TopicCommand.TopicCommandOptions(
new String[]{"--create", "--topic", topic,
"--replication-factor", "1", "--partitions", "1"});
TopicCommand.createTopic(zkUtils, options);
List<KafkaServer> servers = new ArrayList<KafkaServer>();
servers.add(kafkaServer);
TestUtils.waitUntilMetadataIsPropagated(
scala.collection.JavaConversions.asScalaBuffer(servers), topic,
0, 5000);
}
@After
public void after() {
try {
kafkaServer.shutdown();
zkClient.close();
zkServer.shutdown();
} catch (Exception e) {
}
}
@Test
public void test() throws Exception {
// bootstrap.servers=localhost:9999
}
}