数据实时同步平台搭建

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 数据实时同步平台搭建
  1. 执行安装命令,安装openjdk8
yum install java-1.8.0-openjdk
  1. 查看路径,找到Java的具体安装路径
which java
ls -lrt /usr/bin/java
  1. 进入上一步找到的Java目录,查看java是否安装成功
java -version
  1. 安装Javac
yum install java-1.8.0-openjdk-devel.x86_64
  1. 配置Java环境变量,编辑/etc/profile文件,在末尾追加如下内容
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-xxxx 这就是刚刚说的那个路径的名称
JRE_HOME=$JAVA_HOME/jre
CLASS_PATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar:$JRE_HOME/lib
PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME/bin
export JAVA_HOME JRE_HOME CLASS_PATH PATH

二、MySQL部署

  1. 在/usr/local目录下创建mysql文件夹。
mkdir mysql
  1. 下载mysql8.0安装包,下载地址:https://dev.mysql.com/downloads/mysql/
  2. 上传mysql-8.0.30-linux-glibc2.12-x86_64.tar.xz到/usr/local/mysql目录下面。
  3. 解压文件并重命名。
tar xJf mysql-8.0.30-linux-glibc2.12-x86_64.tar.xz
mv mysql-8.0.30-linux-glibc2.12-x86_64 mysql-8.0
  1. 在/usr/local/mysql目录下创建data文件夹,用于存放mysql数据。
mkdir data
  1. 分别创建用户组以及用户和密码
groupadd mysql
useradd -g mysql mysql
  1. 授权刚刚新建的用户
chown -R mysql.mysql /usr/local/mysql/mysql-8.0
chmod 750 /usr/local/mysql/data -R
  1. 配置环境变量。修改/etc/profile文件,在末尾追加。
export PATH=$PATH:/usr/local/mysql/mysql-8.0/bin:/usr/local/mysql/mysql-8.0/lib
  1. 编辑数据库配置文件 /etc/my.cnf
[mysql]
default-character-set=utf8mb4
[client]
socket=/var/lib/mysql/mysql.sock
[mysqld]
user=mysql
general_log = 1
general_log_file= /var/log/mysql/mysql.log
socket=/var/lib/mysql/mysql.sock
basedir=/usr/local/mysql/mysql-8.0
datadir=/usr/local/mysql/data
log-bin=/usr/local/mysql/data/mysql-bin
innodb_data_home_dir=/usr/local/mysql/data
innodb_log_group_home_dir=/usr/local/mysql/data/
character-set-server=utf8mb4
lower_case_table_names=1
autocommit=1
default_authentication_plugin=mysql_native_password
symbolic-links=0
[mysqld_safe]
log-error=/usr/local/mysql/data/mysql.log
pid-file=/usr/local/mysql/data/mysql.pid
  1. 初始化MySQL信息,得到数据库的初始密码(在/usr/local/mysql/mysql-8.0/bin目录下执行)
./mysqld --user=mysql --basedir=/usr/local/mysql/mysql-8.0 --datadir=/usr/local/mysql/data/ --initialize
  1. 复制 mysql.server 文件,在/usr/local/mysql/mysql-8.0目录下执行。
cp -a ./support-files/mysql.server /etc/init.d/mysql
cp -a ./support-files/mysql.server /etc/init.d/mysqld
  1. 赋予权限
chown 777 /etc/my.cnf
chmod +x /etc/init.d/mysql
chmod +x /etc/init.d/mysqld
  1. 检查一下/var/lib/mysql是否存在,否则进行创建并授权。
mkdir /var/lib/mysql
chown -R mysql:mysql /var/lib/mysql/
  1. 启动数据库。
service mysql start
  1. 修改root账号密码。
mysql -h127.0.0.1 -uroot -p
ALTER USER 'root'@'localhost' IDENTIFIED WITH mysql_native_password BY '新密码';
flush privileges;

三、zookeeper部署

zookeeper集群部署

0.下载zookeeper3.8.0,下载地址:https://www.apache.org/dyn/closer.lua/zookeeper/zookeeper-3.8.0/apache-zookeeper-3.8.0-bin.tar.gz

  1. 将zookeeper解压到/usr/local目录下面(zookeeper要下载名称中带-bin的包)。
tar -zxf apache-zookeeper-3.8.0-bin.tar.gz
  1. 在zookeeper下面创建zkData目录。
mkdir zkData
  1. 重命名conf目录下面的配置文件zoo_simple.cfg
mv zoo_simple.cfg zoo.cfg
  1. 修改zoo.cfg文件。
具体配置 
dataDir=/usr/local/zookeeper/zkData 
增加如下配置 
#######################cluster########################## 
server.1=test-bigdata1:2888:3888
server.2=test-bigdata2:2888:3888
server.3=test-bigdata3:2888:3888
  1. 在zkData目录下面创建myid文件
touch myid
在myid里面写入1,表示zoo.cfg里面server.1中的1。
  1. 将配置好的zookeeper拷贝到另外两台服务器,修改myid文件中的值为对象的server的值。
  2. 分别启动zookeeper。
bin/zkServer.sh start
  1. 查看启动状态,有一台leader,两天follower,表示启动成功。
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Client port found: 2181. Client address: localhost.
Mode: follower
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Client port found: 2181. Client address: localhost.
Mode: leader
bin/zkServer.sh status
ZooKeeper JMX enabled by default
Client port found: 2181. Client address: localhost.
Mode: follower
  1. ** 如果启动过程中出现org.apache.zookeeper.server.quorum.QuorumPeerConfig$ConfigException: Address unresolved: bigdata03:3888 异常,是因为bigdata03:3888后面有空格所致。**

部署zkui可视化页面

  1. 在GITHUB下载源码打包编译,https://github.com/DeemOpen/zkui
mvn clean package
  1. 打包完成会在target目录下面生成zkui-2.0-SNAPSHOT-jar-with-dependencies.jar文件。
  2. 将zkui-2.0-SNAPSHOT-jar-with-dependencies.jar和config.cfg文件拷贝到服务器。
  3. 创建需要的数据库和用户,并初始化。
mysql -h127.0.0.1 -uroot -p 
# 创建账号并修改密码
CREATE user 'zkui'@'%';
alter user 'zkui'@'%' identified with mysql_native_password by '密码';
#创建数据库
create database zkui;
# 授权数据库
grant all privileges on zkui.* to "zkui"@"%";
# 刷新权限
flush privileges;
  1. 修改config.cfg文件,配置相应的数据库信息。
zkServer=test-bigdata1:2181,test-bigdata2:2181,test-bigdata3:2181
jdbcClass=com.mysql.jdbc.Driver
jdbcUrl=jdbc:mysql://test-bigdata4:3306/zkui?characterEncoding=utf8&useSSL=false
jdbcUser=zkui
jdbcPwd=密码
  1. 创建启动脚本,新建start.sh内容如下。
nohup java -jar zkui-2.0-SNAPSHOT-jar-with-dependencies.jar >2&1 >logs/zkui-out.log &
  1. 创建停止脚本,新建stop.sh内容如下。
#! /bin/bash
PROGRAM_PID=`ps -ef | grep zkui-2.0-SNAPSHOT-jar-with-dependencies.jar| grep -v grep | awk '{print $2}'` 
if [[ ${PROGRAM_PID}x != ""x ]]; then
        echo "stop service at PID[${PROGRAM_PID}] ..."
        kill -9 ${PROGRAM_PID}
        if [[ $? -eq 0 ]]; then
                echo "stop service at PID[${PROGRAM_PID}] success"
        else
                echo "stop service at PID[${PROGRAM_PID}] error"
        fi
else
        echo "service is not running."
fi
  1. 启动
./start.sh
  1. 查看页面
http://192.168.20.167:9090/home
账号密码:admin/manager

四、Pulsar部署

  1. 下载pulsar,下载地址:http://archive.apache.org/dist/pulsar/pulsar-2.10.1/apache-pulsar-2.10.1-bin.tar.gz
  2. 将其上传到/usr/local目录下面
  3. 解压
tar -zxf apache-pulsar-2.10.1-bin.tar.gz
  1. 配置环境变量,修改/etc/profile文件,在其末尾追加。
export PULSAR_HOME=/opt/local/apache-pulsar-2.10.1
export PATH=$PATH:$PULSAR_HOME/bin
  1. 初始化集群元数据。
pulsar initialize-cluster-metadata \
  --cluster pulsar-cluster-1 \
  --zookeeper 192.168.20.164:2181 \
  --configuration-store 192.168.20.164:2181 \
  --web-service-url http://192.168.20.164:8070,192.168.20.165:8070,192.168.20.166:8070 \
  --broker-service-url pulsar://192.168.20.164:6650,192.168.20.165:6650,192.168.20.166:6650

参数说明如下:

参数 说明
—cluster pulsar 集群名字
–zookeeper zookeeper 地址,只需要包含 zookeeer 集群中的任意一台机器即可
–configuration-store 配置存储地址,只需要包含 zookeeer 集群中的任意一台机器即可
–web-service-url pulsar 集群 web 服务的 URL 以及端口,默认的端口是8080,自定义为8070
–broker-service-url broker 服务的URL,用于与 pulsar 集群中的 brokers 进行交互,默认端口是 6650

部署BookKeeper集群

  1. 修改BookKeeper配置文件
vim bookkeeper.conf 
zkServers=192.168.20.164:2181,192.168.20.165:2181,192.168.20.166:2181
  1. 创建bookie所需要的目录
mkdir -p /usr/local/apache-pulsar-2.10.1/data/bookkeeper
  1. 初始化BookKeeper元数据,(只需在一个bookie节点执行一次)
bookkeeper shell metaformat
  1. 启动BookKeeper集群。三台分别启动
pulsar-daemon start bookie
  1. 验证集群状态,在任意一台 Bookeeper 节点上使用 Bookeeper shell 的 simpletest 命令,去校验集群内所有的 bookie 是否都已经启动,3 为 Bookeeper 节点数量。
bookkeeper shell simpletest --ensemble 3 --writeQuorum 3 --ackQuorum 3 --numEntries 3

参数含义如下: -a,–ackQuorum Ack quorum size (default 2) 当指定数量的 bookie ack 响应时,认为消息写入成功 -e,–ensemble Ensemble size (default 3) 写入数据的 bookie 节点数量 -n,–numEntries Entries to write (default 1000) 一批消息的消息数量 -w,–writeQuorum Write quorum size (default 2) 每条消息副本数量 这个命令会在集群上创建和 bookie 同等数量的 ledger,并往里面写一些条目,然后读取它,最后删除这个 ledger。

部署pulsar集群

  1. 修改pulsar配置文件。三台均需要
vim broker.conf
# 配置pulsar broker连接的zookeeper集群地址
zookeeperServers=192.168.20.164:2181,192.168.20.165:2181,192.168.20.166:2181
configurationStoreServers=192.168.20.164:2181,192.168.20.165:2181,192.168.20.166:2181
clusterName=pulsar-cluster-1
  1. 启动Pulsar集群,三台均需要。
pulsar-daemon start broker
  1. 配置客户端连接pulsar集群。
vim client.conf
webServiceUrl=http://192.168.20.164:8070,192.168.20.165:8070,192.168.20.166:8070
brokerServiceUrl=pulsar://192.168.20.164:6650,192.168.20.165:6650,192.168.20.166:6650
  1. 命令行验证生产消费消息。

启动消费者

pulsar-client consume \
  persistent://public/default/pulsar-test \
  -n 100 \
  -s "consumer-test" \
  -t "Exclusive"

启动生产者

pulsar-client produce \
  persistent://public/default/pulsar-test \
  -n 1 \
  -m "Hello Pulsar"

观察消费端控制台输出,如打输出内容content:Hello Pulsar,则流程全部完成;

部署PulsarManager

  1. 下载pulsar-manager的源码,https://github.com/apache/pulsar-manager
  2. 修改源代码的build.gradle文件,新增MySQL依赖。
compile group: 'mysql', name: 'mysql-connector-java', version: mysqlVersion
  1. 修改源码的gradle.properties文件,新增MySQL版本。
mysqlVersion=8.0.25
  1. 执行打包命令,打包文件。
gradlew.bat distTar -x test
  1. 打包成功后,在build\distributions目录下面会生成pulsar-manager.tar文件。
  2. 下载apache-pulsar-manager-0.3.0-bin.tar.gz。下载地址:https://www.apache.org/dyn/mirrors/mirrors.cgi?action=download&filename=pulsar/pulsar-manager/pulsar-manager-0.3.0/apache-pulsar-manager-0.3.0-bin.tar.gz
  3. 将apache-pulsar-manager-0.3.0-bin.tar.gz和pulsar-manager.tar全都上传到/usr/local目录下面
  4. 解压文件。
tar -zxf apache-pulsar-manager-0.3.0-bin.tar.gz -c /usr/local/test
tar -zxf pulsar-manager.tar
  1. 将 apache-pulsar-manager-0.3.0-bin.tar.gz解压包里面的dist目录。拷贝到pulsar-manager下面。
cp -r /usr/local/test/pulsar-manager/dist /usr/local/pulsar-manager/ui
  1. 在MySQL中创建数据库和账号。
mysql -h127.0.0.1 -uroot -p 
# 创建账号并修改密码
CREATE user 'pulsar'@'%';
alter user 'pulsar'@'%' identified with mysql_native_password by '密码';
#创建数据库
create database pulsar_manager;
# 授权数据库
grant all privileges on pulsar_manager.* to "pulsar"@"%";
# 刷新权限
flush privileges;
  1. 修改配置文件application.properties。
# mysql configuration
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
spring.datasource.url=jdbc:mysql://test-bigdata4:3306/pulsar_manager?characterEncoding=utf8&useSSL=false
spring.datasource.username=pulsar
spring.datasource.password=密码
  1. 创建一个start.sh并授权,作为启动
nohup bin/pulsar-manager-0.3.0 >> logs/pulsar-manager.log 2>&1 &
chmod +x start.sh
  1. 启动
./start.sh
  1. 初始化密码.
# 执行获取csrf-token
curl http://192.168.20.167:7750/pulsar-manager/csrf-token
# 使用上一步获取的token替换CSRF_TOKEN值,然后执行
curl -H 'X-XSRF-TOKEN: $CSRF_TOKEN' -H 'Cookie: XSRF-TOKEN=$CSRF_TOKEN;' -H "Content-Type: application/json" -X PUT http://192.168.20.167:7750/pulsar-manager/users/superuser -d '{"name": "pulsar", "password": "pulsar", "description": "superuser", "email": "surperuser@pulsar.org"}'
  1. 查看页面
http://192.168.20.167:7750/ui/index.html
账号密码:pulsar/pulsar
  1. 新增环境

image.png

五、HADOOP部署

修改配置文件

  1. 安装hadoop,选择安装hadoop3.2.1。下载地址:https://archive.apache.org/dist/hadoop/common/hadoop-3.1.0/hadoop-3.1.0.tar.gz
  2. 解压文件到/usr/local目录下面
tar -zxf hadoop-3.2.1.tar.gz -C /usr/local
  1. 修改hadoop的etc/hadoop目录下面的hadoop-env.sh文件。
# 在其中加入jdk路,替换为自己真实的地址
JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-xxxx
# 加入各个组件操作的用户,防止放弃操作出现用户无法操作问题。
export HDFS_NAMENODE_USER=root
export HDFS_DATANODE_USER=root
export HDFS_SECONDARYNAMENODE_USER=root
export YARN_RESOURCEMANAGER_USER=root
export YARN_NODEMANAGER_USER=root
export HDFS_JOURNALNODE_USER=root
export HDFS_ZKFC_USER=root
  1. 修改的etc/hadoop目录下面的core-site.xml文件。
<configuration>
   <!--把两个NameNode的地址组装成一个集群mycluster -->
   <property>
      <name>fs.defaultFS</name>
      <value>hdfs://mycluster</value>
   </property>   
   <!--指定hadoop运行时产生文件的存储目录 -->
   <property>
      <name>hadoop.tmp.dir</name>
      <value>/usr/local/hadoop-3.2.1/data/tmp</value>
   </property>
   <!--配置zookeeper集群的地址,进行namenode发生故障时自动转移 -->
   <property>
      <name>ha.zookeeper.quorum</name>
      <value>test-bigdata1:2181,test-bigdata2:2181,test-bigdata3:2181</value>
   </property>
</configuration>
  1. 修改hadoop中etc/hadoop目录下面的hdfs-site.xml文件。
<configuration>
  <!--完全分布式集群名称 -->
  <property>
    <name>dfs.nameservices</name>
    <value>mycluster</value>
  </property>
  <!--集群中NameNode节点名称 -->
  <property>
    <name>dfs.ha.namenodes.mycluster</name>
    <value>nn1,nn2</value>
  </property>
  <!--nn1的RPC通信地址 -->
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn1</name>
    <value>test-bigdata1:9000</value>
  </property>
  <!-- nn2的RPC通信地址 -->
  <property>
    <name>dfs.namenode.rpc-address.mycluster.nn2</name>
    <value>test-bigdata2:9000</value>
  </property>
  <!--nn1的http通信地址 -->
  <property>
    <name>dfs.namenode.http-address.mycluster.nn1</name>
    <value>test-bigdata1:50070</value>
  </property>
  <!-- nn2的http通信地址 -->
  <property>
    <name>dfs.namenode.http-address.mycluster.nn2</name>
    <value>test-bigdata2:50070</value>
  </property>
  <!--指定Namenode元数据在JournalNode上的存放位置 -->
  <property>
    <name>dfs.namenode.shared.edits.dir</name>
    <value>qjournal://test-bigdata1:8485;test-bigdata2:8485;test-bigdata3:8485/mycluster</value>
  </property>
  <!--配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
  <property>
    <name>dfs.ha.fencing.methods</name>
    <value>shell(/bin/true)</value>
  </property>
  <!--使用隔离机制时需要ssh无秘钥登录-->
  <property>
    <name>dfs.ha.fencing.ssh.private-key-files</name>
    <value>/root/.ssh/id_rsa</value>
  </property>
  <!--声明journalnode服务器存储目录-->
  <property>
    <name>dfs.journalnode.edits.dir</name>
    <value>/usr/local/hadoop-3.2.1/data/jn</value>
  </property>
  <!--关闭检查权限 -->
  <property>
    <name>dfs.permissions.enables</name>
    <value>false</value>
  </property>
  <!--访问代理类client,mycluster,active 配置失败自动切换实现方式-->
  <property>
    <name>dfs.client.failover.proxy.provider.mycluster</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <!-- 开启故障自动转移 -->
  <property>
    <name>dfs.ha.automatic-failover.enabled</name>
    <value>true</value>
  </property>
</configuration>
  1. 修改hadoop中etc/hadoop目录下面的yarn-site.xml文件。
<configuration>
   <property> 
       <name>yarn.nodemanager.aux-services</name>
       <value>mapreduce_shuffle</value>
   </property>
   <!--启用 resourcemanager ha--> 
   <property>
       <name>yarn.resourcemanager.ha.enabled</name>
       <value>true</value>
   </property>
    <!--声明两台 resourcemanager 的地址--> 
   <property>
       <name>yarn.resourcemanager.cluster-id</name>
       <value>cluster-yarn1</value>
   </property>
   <property>
       <name>yarn.resourcemanager.ha.rm-ids</name>
       <value>rm1,rm2</value>
   </property>
   <property>
       <name>yarn.resourcemanager.hostname.rm1</name>
       <value>test-bigdata1</value>
   </property>
   <property>
       <name>yarn.resourcemanager.hostname.rm2</name>
      <value>test-bigdata2</value>
   </property>
    <!--指定 zookeeper 集群的地址--> 
   <property>
       <name>yarn.resourcemanager.zk-address</name>
       <value>test-bigdata1:2181,test-bigdata2:2181,test-bigdata3:2181</value>
   </property>
    <!--启用自动恢复--> 
   <property>
       <name>yarn.resourcemanager.recovery.enabled</name>
       <value>true</value>
   </property>
    <!--指定 resourcemanager 的状态信息存储在 zookeeper 集群-->  
   <property>
       <name>yarn.resourcemanager.store.class</name>
       <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
   </property>
</configuration>
  1. 修改/etc/profile文件
HADOOP_HOME=/usr/local/hadoop-3.2.1
export HADOOP_CONF_DIR=${HADOOP_HOME}/etc/hadoop
export HADOOP_CLASSPATH=$HADOOP_HOME/share/hadoop/yarn/*:$HADOOP_HOME/share/hadoop/common/*:$HADOOP_HOME/share/hadoop/mapreduce/*:$HADOOP_HOME/share/hadoop/hdfs/*:$HADOOP_HOME/share/tools/*:$HADOOP_HOME/share/hadoop/httpfs/*:$HADOOP_HOME/share/hadoop/kms/*:$HADOOP_HOME/share/hadoop/common/lib/*

集群启动

  1. 启动高可用hadoop集群
  • 启动hdfs集群。

a. 在三台服务器上分别启动journalnode。

在hadoop的bin目录下面执行下面语句
./hdfs --daemon start journalnode

b. 在配置了NameNode的任一服务器上面格式化NameNode。

# 在bigdata01服务器的hadoop的bin目录下面执行命令
./hdfs namenode -format

c. 在执行了上面命令的那台服务器上面启动NameNode。

# 在bigdata01服务器的hadoop的bin目录下面执行命令
./hdfs --daemon start namenode

d. 在另外一台配置了NameNode的服务器上面同步NameNode信息。

# 在bigdata02服务器的hadoop的bin目录下面执行命令
./hdfs namenode -bootstrapStandby

e. 格式化zk

# 在bigdata01服务器的hadoop的bin目录下面执行命令
./hdfs zkfc -formatZK

f. 启动hadoop集群

# 在bigdata01服务器的hadoop的sbin目录下面执行命令
./start-dfs.sh

g. 启动其他服务器的DataNode。

# 在bigdata02和bigdata03服务器的bin目录下面执行命令
./hdfs --daemon start datanode
  • 启动yarn集群

a.在配置了resourcemanager的任一台服务器上面启动resourcemanager。

# 在bigdata01服务器的hadoop的sbin目录下执行命令
./start-yarn.sh

b. 在其余服务器上面启动nodemanager。

# 在bigdata02和bigdata03服务器的hadoop的bin目录下执行
./yarn --daemon start nodemanager

高可用测试

  1. 高可用测试。
  • hdfs高可用测试。

a.查看NameNode的状态。

# 在bigdata01和bigdata02服务器的hadoop的bin目录下面执行命令
./hdfs haadmin -getServiceState nn1
./hdfs haadmin -getServiceState nn2

b.手动切换NameNode状态

## 假如nn1的状态为standby,nn2的状态为active
# 在状态为standby的NameNode所在的机器上的hadoop的bin目录下执行命令
./hdfs haadmin -transitionToActive nn1
# 可将NameNode的状态切换为active。
# 在状态为active的NameNode所在的机器上的hadoop的bin目录下执行命令
./hdfs haadmin -transitionToStandby nn2
# 可将NameNode的状态切换为standby。

c. 将NameNode状态为active的服务器的active停掉。

# 在状态为active的NameNode所在的机器上的hadoop的bin目录下执行命令
./hdfs --daemon stop namenode
# 然后查看状态为standby的那个NameNode的状态,如果已经变为active,表示成功。
  • 测试yarn高可用

a. 查询yarn的状态

# 在bigdata01和bigdata02服务器的hadoop的bin目录下面执行命令
./hdfs rmadmin -getServiceState rm1
./hdfs rmadmin -getServiceState rm2

b. 手动切换yarn状态

## 假如rm1的状态为standby,rm2的状态为active
# 在状态为standby的resourcemanager所在的机器上的hadoop的bin目录下执行命令
./yarn rmadmin -transitionToActive rm1
# 可将resourcemanager的状态切换为active。
# 在状态为active的Nresourcemanager所在的机器上的hadoop的bin目录下执行命令
./hdfs rmadmin -transitionToStandby rm2
# 可将resourcemanager的状态切换为standby。

c. 将resourcemanager状态为active的服务器的resourcemanager停掉。

# 在状态为active的resourcemanager所在的机器上的hadoop的bin目录下执行命令
./yarn --daemon stop resourcemanager
# 然后查看状态为standby的那个resourcemanager的状态,如果已经变为active,表示成功。

六、flink部署(YARN模式)

  1. 下载flink-1.13.5-bin-scala_2.12.tgz 。下载地址:https://archive.apache.org/dist/flink/flink-1.13.5/flink-1.13.5-bin-scala_2.12.tgz
  2. 将flink-1.13.5-bin-scala_2.12.tgz 上传服务器。
  3. 解压文件。
tar -zxf flink-1.13.5-bin-scala_2.12.tgz -C /usr/local
  1. 配置环境变量, 在/etc/profile文件中追加如下内容
export FLINK_HOME=/usr/local/flink-1.13.5/
export PATH=$PATH:$FLINK_HOME/bin:
  1. 配置集群信息,修改flink-conf.yaml,三台均修改
#修改几个地方:
jobmanager.rpc.address: test-bigdata1
rest.port: 8081
rest.address: test-bigdata1
env.hadoop.conf.dir: /usr/local/3.2.1/etc/hadoop
high-availability: zookeeper
high-availability.storageDir: hdfs://mycluster/flink/ha/
high-availability.zookeeper.quorum: test-bigdata1:2181,test-bigdata2:2181,test-bigdata3:2181
  1. 修改msaters文件
test-bigdata1:8081
  1. 修改slaves文件
test-bigdata1
test-bigdata2
test-bigdata3
  1. 添加对接的jar包到flink安装目录的lib下。
flink-connector-jdbc_2.12-1.13.1.jar
flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar 
flink-shaded-hadoop2-uber-2.8.3-1.8.3.jar
mysql-connector-java-8.0.29.jar
  1. 启动集群,在一台上面启动即可。
start-cluster.sh
  1. 查看页面
http://192.168.20.164:8081/

七、安装Dinky

修改配置

  1. 下载dlink-release-0.6.7.tar.gz,下载地址为:https://github.com/DataLinkDC/dlink/releases/download/v0.6.7/dlink-release-0.6.7.tar.gz
  2. 解压到/usr/local目录下
tar -zxf dlink-release-0.6.7.tar.gz -C /usr/local
mv dlink-release-0.6.7 dlink
  1. 初始化数据库。
mysql -h127.0.0.1 -uroot -p 
# 创建账号并修改密码
CREATE user 'dlink'@'%';
alter user 'dlink'@'%' identified with mysql_native_password by '密码';
#创建数据库
create database dlink;
# 授权数据库
grant all privileges on dlink.* to "dlink"@"%";
# 刷新权限
flush privileges;
# 下面使用新创建的dlink账号登录mysql
mysql -h127.0.0.1 -udlink -p 
# 切换数据库
use dlink;
# 导入数据库
source /usr/local/dlink/sql/dlink.sql;
source /usr/local/dlink/sql/dlinkmysqlcatalog.sql;
  1. 修改配置文件。修改application.yml文件的数据库连接信息。
spring:
  datasource:
    url: jdbc:mysql://127.0.0.1:3306/dlink?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull&serverTimezone=Asia/Shanghai&allowPublicKeyRetrieval=true
    username: dlink
    password: dlink
    driver-class-name: com.mysql.cj.jdbc.Driver
  1. 新增相关jar包。
#创建目录plugins
cd /usr/local/dlink/
mkdir plugins
# 将flink的lib下面的jar全都放入plugins目录下面 包含如下jar包
flink-connector-jdbc_2.12-1.13.1.jar
flink-connector-starrocks-1.2.2_flink-1.13_2.11.jar
flink-csv-1.13.5.jar
flink-dist_2.12-1.13.5.jar
flink-json-1.13.5.jar
flink-shaded-hadoop-2-uber-2.8.3-10.0.jar
flink-shaded-zookeeper-3.4.14.jar
flink-table_2.12-1.13.5.jar
flink-table-blink_2.12-1.13.5.jar
mysql-connector-java-8.0.29.jar
# 在其下面在新增两个jar包
javax.ws.rs-api-2.0.1.jar
log4j-1.2.17.jar
  1. 进行FLink版本适配
# 查看lib目录下面的jar包,主要关注一下几个jar包
dlink-catalog-mysql-1.13-0.6.7.jar
dlink-client-1.13-0.6.7.jar
dlink-connector-jdbc-1.13-0.6.7.jar
# 查看其版本是否为1.13,如果不是,将其删除,在extends目录下找到1.13版本,将其复制到lib目录下。
  1. 新增hadoop和flink的配置文件。
# 在安装根目录下面新建hadoop_conf文件夹,将hadoop目录中的etc/hadoop目录下面的文件全都
复制到里面。
# 在安装根目录下面新建flink_conf文件夹,将flink目录中的conf目录下面的文件全都
复制到里面。

新增HDFS文件

  1. 在HDFS上面新增/dlink/jar目录。将dlink安装目录下的dlink-app-1.13-0.6.7-jar-with-dependencies.jar重命名之后传入其中。
hdfs dfs -mkdir -p /dlink/jar
cd /usr/local/dlink/jar
mv dlink-app-1.13-0.6.7-jar-with-dependencies.jar dlink-app.jar
hdfs dfs -put dlink-app.jar /dlink/jar/
  1. 在HDFS上面新增/flink/lib目录,将flink下面相关的lib和plugins全都上传到该目录下面。
hdfs dfs -mkdir -p /flink/lib
cd /usr/local/flink-1.13.5/lib
hdfs dfs -put -f flink-shaded-zookeeper-3.4.14.jar /flink/lib
/usr/local/flink-1.13.5
hdfs dfs -put -f lib /flink/lib
hdfs dfs -put -f plugins /flink/lib

启动项目

  1. 启动项目
#启动
$sh auto.sh start
#停止
$sh auto.sh stop
#重启
$sh auto.sh restart
#查看状态
$sh auto.sh status
  1. 查看
http://192.168.20.167:8888/
默认用户名/密码: admin/admin

配置相关环境

  1. 新增集群实例

在 注册中心->集群管理->集群实例管理 下面新增集群实例的配置信息。

  1. 配置集群配置

在 注册中心->集群管理->集群配置管理 下面新增集群配置的配置信息。

  1. 测试mysql2starrocks

在 数据开发 -> 目录 下面新建目录flink-sql。然后新建作业。选择flink-sql,然后写入如下flinksql。

-- 表都需要提前建好
-- 创建mysql连接
create table mysql_test1(
  first_classify STRING,
  second_classify STRING,
  first_variety STRING,
  second_variety STRING
)  WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.20.167:3306/inlong_test?useSSL=false',
  'table-name' = 'dim_agriculture_products_classify',
  'username' = 'test',
  'password' = 'test123456',
  'driver' = 'com.mysql.cj.jdbc.Driver'
);
-- 创建satrrocks连接
create table starrocks_test1(
  first_classify STRING,
  second_classify STRING,
  first_variety STRING,
  second_variety STRING
)  WITH (
  'connector' = 'starrocks',
  'jdbc-url' = 'jdbc:mysql://192.168.20.213:9030/dlink_test',
  'load-url' = '192.168.20.211:8050;192.168.20.212:8050;192.168.20.213:8050',
  'database-name' = 'dlink_test',
  'table-name' = 'dim_agriculture_products_classify',
  'username' = 'root',
  'password' = 'root'
);
-- 将mysql数据同步到starrocks
insert into starrocks_test1
select * from mysql_test1;
  1. 测试starrocks2mysql

在 数据开发 -> 目录 下面新建目录flink-sql。然后新建作业。选择flink-sql,然后写入如下flinksql

-- 表都需要提前建好
-- 创建mysql连接
create table mysql_test2(
    first_classify STRING,
    second_classify STRING,
    first_variety STRING,
    second_variety STRING
)  WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.20.167:3306/inlong_test?useSSL=false',
   'table-name' = 'dim_agriculture_products_classify_1',
   'username' = 'test',
   'password' = 'test123456',
   'driver' = 'com.mysql.cj.jdbc.Driver'
);
-- 创建satrrocks连接
create table starrocks_test2(
    first_classify STRING,
    second_classify STRING,
    first_variety STRING,
    second_variety STRING
)  WITH (
   'connector' = 'starrocks',
   'jdbc-url' = 'jdbc:mysql://192.168.20.213:9030/dlink_test',
   'scan-url' = '192.168.20.211:8050,192.168.20.212:8050,192.168.20.213:8050',
   'database-name' = 'dlink_test',
   'table-name' = 'dim_agriculture_products_classify',
   'username' = 'root',
   'password' = 'root'
);
-- 将starrocks数据导入mysql
insert into mysql_test2
select * from starrocks_test2;
  1. 点击保存 -> 检查 ->运行。

  1. 运行之后可以到flink webui查看信息。也可以到yarn上面查看任务。
  2. 测试结果
mysql表结构
CREATE TABLE `dim_agriculture_products_classify_1` (
  `first_classify` varchar(50) NULL COMMENT "一级分类",
  `second_classify` varchar(50) NULL COMMENT "二级分类",
  `first_variety` varchar(50) NULL COMMENT "一级品种",
  `second_variety` varchar(1000) NULL COMMENT "二级品种"
) 
satrrocks表结构
CREATE TABLE `dim_agriculture_products_classify` (
  `first_classify` varchar(50) NULL COMMENT "一级分类",
  `second_classify` varchar(50) NULL COMMENT "二级分类",
  `first_variety` varchar(50) NULL COMMENT "一级品种",
  `second_variety` varchar(1000) NULL COMMENT "二级品种"
) ENGINE=OLAP 
UNIQUE KEY(`first_classify`, `second_classify`, `first_variety`, `second_variety`)
COMMENT "农产品分类维度数据"
DISTRIBUTED BY HASH(`second_variety`) BUCKETS 8 
PROPERTIES (
"replication_num" = "1",
"in_memory" = "false",
"storage_format" = "DEFAULT",
"enable_persistent_index" = "false"
);
数据量1099条,
mysql -> starrocks  2s
starrocks -> mysql 5s
相关实践学习
基于MSE实现微服务的全链路灰度
通过本场景的实验操作,您将了解并实现在线业务的微服务全链路灰度能力。
相关文章
|
SQL 数据采集 数据可视化
需求:公司需要做数据迁移同步,以下是几种常见的ETL工具选型对比
需求:公司需要做数据迁移同步,以下是几种常见的ETL工具选型对比
|
6月前
|
消息中间件 关系型数据库 Kafka
深入理解数仓开发(二)数据技术篇之数据同步
深入理解数仓开发(二)数据技术篇之数据同步
|
5月前
|
分布式计算 关系型数据库 数据处理
美柚与MaxCompute的数据同步架构设计与实践
数据处理与分析 一旦数据同步到MaxCompute后,就可以使用MaxCompute SQL或者MapReduce进行复杂的数据处理和分析。
|
6月前
|
消息中间件 前端开发 Kafka
离线数仓(二)【用户行为日志采集平台搭建】(1)
离线数仓(二)【用户行为日志采集平台搭建】
|
6月前
|
消息中间件 数据采集 关系型数据库
离线数仓(三)【业务日志采集平台搭建】(2)
离线数仓(三)【业务日志采集平台搭建】
|
6月前
|
消息中间件 存储 JSON
离线数仓(二)【用户行为日志采集平台搭建】(2)
离线数仓(二)【用户行为日志采集平台搭建】
|
6月前
|
存储 消息中间件 Kafka
离线数仓(三)【业务日志采集平台搭建】(1)
离线数仓(三)【业务日志采集平台搭建】
|
canal SQL 弹性计算
实时数据及离线数据上云方案
本实验通过使用CANAL、DataHub、DataWorks、MaxCompute服务,实现数据上云,解决了数据孤岛问题,同时把数据迁移到云计算平台,对后续数据的计算和应用提供了第一步开山之路。
|
SQL 消息中间件 Oracle
带你读《全链路数据治理-全域数据集成》之6:3. 实时同步场景与配置
带你读《全链路数据治理-全域数据集成》之6:3. 实时同步场景与配置
294 0
|
弹性计算 DataWorks 数据库
带你读《全链路数据治理-全域数据集成》之4:1. 实时同步功能概述
带你读《全链路数据治理-全域数据集成》之4:1. 实时同步功能概述
242 0