1. Environment
1. CentOS 7.6
2. zookeeper-3.4.5
3. kafka_2.11-0.10.2.1
4. jdk1.8.0_261
2. Standalone ECS - ZooKeeper
tar -zxvf zookeeper-3.4.5.tar.gz -C /root/apps/
cp zoo_sample.cfg zoo.cfg
Create the data directory and the log directory in advance:
dataDir=/root/data/zookeeper
dataLogDir=/root/data/zookeeperlog
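For reference, a minimal zoo.cfg sketch for this standalone setup (the timing values are the ZooKeeper defaults; only the two directories above are site-specific), followed by starting and checking the server:

tickTime=2000
initLimit=10
syncLimit=5
clientPort=2181
dataDir=/root/data/zookeeper
dataLogDir=/root/data/zookeeperlog

# start and check
bin/zkServer.sh start
bin/zkServer.sh status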
3. Standalone ECS - Kafka
3.1 Installation
tar -zxvf kafka_2.11-0.10.2.1.tgz -C /root/apps/
3.2 Modify server.properties
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://localhost:9092
advertised.listeners=PLAINTEXT://localhost:9092
# Alibaba Cloud internal (private) IP
host.name=172.17.81.232
# Alibaba Cloud public IP
advertised.host.name=47.94.39.202
# ZooKeeper address
zookeeper.connect=localhost:2181
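If a client outside the ECS host (for example a Flink job running on another machine) needs to reach this broker through the public IP, the broker usually has to bind the internal address and advertise the public one. A sketch of the equivalent new-style settings, assuming the same internal/public IPs as above:

listeners=PLAINTEXT://172.17.81.232:9092
advertised.listeners=PLAINTEXT://47.94.39.202:9092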
3.3 Start and test locally
# Start the broker
bin/kafka-server-start.sh -daemon config/server.properties
# Create a topic
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic hanyaoyao
# Start a console producer
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic hanyaoyao
# Start a console consumer
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic hanyaoyao --from-beginning
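To verify that the topic was created, the topics known to the broker can also be listed from ZooKeeper with the standard Kafka CLI:

bin/kafka-topics.sh --list --zookeeper localhost:2181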
4. VMware standalone
4.1 ZooKeeper standalone
(Omitted)
5. VMware cluster
hadoop1 192.168.52.200
hadoop2 192.168.52.201
hadoop3 192.168.52.202
5.1 ZooKeeper cluster
1. Unpack and install
tar -zxvf zookeeper-3.4.5.tar.gz -C /root/apps/
2. Enter the conf directory and copy the sample config
cp zoo_sample.cfg zoo.cfg
3. Data directory (create this directory on all three nodes in advance)
dataDir=/root/zkdata
4. Cluster configuration (each server.N entry must be matched by a myid file on that node; see the sketch after step 7)
server.1=192.168.52.201:2888:3888
server.2=192.168.52.202:2888:3888
server.3=192.168.52.200:2888:3888
5. Distribute to the other nodes
scp -r zookeeper/ hadoop2:$PWD
scp -r zookeeper/ hadoop3:$PWD
6. Start each node in turn
bin/zkServer.sh start
7. Check the status
bin/zkServer.sh status
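As mentioned in step 4, each node also needs a myid file under dataDir whose content is that node's server.N number, otherwise the quorum will not form. A minimal sketch for the mapping above:

# on 192.168.52.201 (server.1)
echo 1 > /root/zkdata/myid
# on 192.168.52.202 (server.2)
echo 2 > /root/zkdata/myid
# on 192.168.52.200 (server.3)
echo 3 > /root/zkdata/myid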
5.2 Kafka cluster
1. tar -zxvf kafka_2.11-0.10.2.1.tgz -C /root/apps/
2. Modify the configuration file as follows:
broker.id=0
delete.topic.enable=true
listeners=PLAINTEXT://hadoop1:9092
host.name=hadoop1
log.dirs=/root/data/kafka
zookeeper.connect=hadoop1:2181,hadoop2:2181,hadoop3:2181
3. Distribute to the other nodes
scp -r kafka_2.11-0.10.2.1/ hadoop2:$PWD
scp -r kafka_2.11-0.10.2.1/ hadoop3:$PWD
4. Change the broker id on hadoop2 and hadoop3 (also change listeners and host.name to the local hostname on each node)
vi server.properties
# hadoop2
broker.id=1
# hadoop3
broker.id=2
5. Start each node and test
1. Start (run on every broker)
bin/kafka-server-start.sh -daemon config/server.properties
2. Create a topic
bin/kafka-topics.sh --create --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --replication-factor 2 --partitions 2 --topic superman
# Created topic "superman".
3. Start a console producer
bin/kafka-console-producer.sh --broker-list hadoop1:9092,hadoop2:9092,hadoop3:9092 --topic superman
4. Start a console consumer
bin/kafka-console-consumer.sh --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181 --topic superman --from-beginning
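To confirm that the partitions and replicas were spread across the brokers, the topic can be inspected with the standard kafka-topics.sh --describe command (the output lists the leader and replica broker ids for each partition):

bin/kafka-topics.sh --describe --zookeeper hadoop1:2181 --topic superman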
6. Flink-Kafka
6.1 pom
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.9.1</version>
    <!-- provided means the dependency is only used at compile time, not at runtime or when packaging -->
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.9.1</version>
    <!--<scope>provided</scope>-->
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.9.1</version>
</dependency>
6.2 Flink-KafkaSource
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Kafka consumer configuration
        String topic = "superman";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.52.200:9092"); // multiple brokers can be listed, comma-separated
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "earliest");
        prop.setProperty("group.id", "consumer3");

        // Kafka source: read the topic as plain strings and print it
        FlinkKafkaConsumer<String> kafkaSource = new FlinkKafkaConsumer<>(topic, new SimpleStringSchema(), prop);
        DataStreamSource<String> lines = env.addSource(kafkaSource);
        lines.print();
        env.execute();
    }
}
7. Troubleshooting
When hooking up the Kafka source, the Flink job could not consume any data from Kafka. The following causes were suspected at first:
1. The ZooKeeper cluster and the Kafka cluster could not reach each other
2. No network connectivity between the host machine and the VMs
3. A jar conflict between the Flink version and the Kafka version
4. The Windows firewall
5. The hostnames were not configured in the hosts file
After reading the documentation and ruling the other causes out, it turned out that the broker addresses handed out via ZooKeeper are hostnames, so the client machine has to be able to resolve those hostnames: cause 5 was the real problem.
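For reference, a sketch of the hosts entries that fix cause 5 on the Windows machine running the Flink job (the file is C:\Windows\System32\drivers\etc\hosts; IPs and hostnames as in section 5):

192.168.52.200 hadoop1
192.168.52.201 hadoop2
192.168.52.202 hadoop3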