Step By Step
创建 DataHub Project 和 Topic
pom.xml
<!-- Kafka Java client; provides the org.apache.kafka.clients.producer classes used by the code sample. -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.10.2.2</version>
</dependency>
Code Sample
import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
/**
 * Minimal producer example: sends the same string record ten times, one at a
 * time, to partition 0 of the topic {@code kafka_project.kafka_topic} through
 * the Kafka producer client.
 *
 * <p>The connection uses SASL/PLAIN over SSL. The username and password in the
 * JAAS configuration below are masked placeholders and must be replaced with
 * real credentials before running.
 */
public class KafkaProducerDemo {

    /**
     * Configures a {@link KafkaProducer}, sends ten records synchronously and
     * prints the loop index after each acknowledged send.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        Properties properties = new Properties();

        // Authentication: PLAIN login module with inline username/password.
        properties.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required\n" +
                "username=\"LTAIOZZg********\"\n" +
                "password=\"v7CjUJCMk7j9aK****************\";");
        properties.put("bootstrap.servers", "dh-cn-hangzhou.aliyuncs.com:9092");
        properties.put("security.protocol", "SASL_SSL");
        properties.put("sasl.mechanism", "PLAIN");

        // Both key and value are plain strings.
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("compression.type", "lz4");

        String topicName = "kafka_project.kafka_topic";

        // try-with-resources closes the producer before any catch block runs,
        // so close() is never invoked on a thread whose interrupt flag has
        // just been restored below.
        try (Producer<String, String> producer = new KafkaProducer<>(properties)) {
            ProducerRecord<String, String> record =
                    new ProducerRecord<>(topicName, 0, "key_demo", "Hello DataHub!");

            // Synchronous send: get() blocks until the returned future completes.
            for (int i = 0; i < 10; i++) {
                producer.send(record).get();
                System.out.println(i);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt status so code further up the stack can see it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (ExecutionException e) {
            // The send itself failed; the cause is included in the printed trace.
            e.printStackTrace();
        }
    }
}
分区抽样查看数据