【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(中)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

4、准备工作:

安装部署

Mysql:

①:wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm

②:yum -y localinstall mysql57-community-release-el7-8.noarch.rpm

③:yum -y install mysql-community-server --nogpgcheck

Doris:

安装部署doris;

①:上传tar包

②:修改fe.conf的ip和端口

③:修改be.conf的ip和端口

④:启动fe,添加be节点

前置要求

Flink on yarn

①:安装部署Flink

②:配置环境变量HADOOP_CLASSPATH, 在/etc/profile.d/my.sh中配置

③:配置FLINK_HOME到/etc/profile

export HADOOP_CLASSPATH=`hadoop classpath

Flink doris connector

上传Flink doris connector到flink的lib目录下

Flink CDC

上传FlinkCDC到Flink的lib目录下

Mysql ODBC

①:解压tar包

②:复制文件到linux的/use/lib64目录下

cp lib/* /usr/lib64

③:运行bin目录下的文件

./myodbc-installer -d -a -n "MySQL ODBC 5.3 Driver" -t "DRIVER=/usr/lib64/libmyodbc5w.so;SETUP=/usr/lib64/libmyodbc5w.so"

④:修改be.odbcinst.ini配置文件并且分发所有be节点

5、功能演示

mysql外表同步:

  • mysql中创建表:
    mysql -uroot -proot;
create database t_demo;
use t_demo;
CREATE TABLE `t_cickp_charge_connector` (
  `ID` varchar(32) NOT NULL COMMENT '主键',
  `E_ID` varchar(32) NOT NULL COMMENT '关联充电设备表ID',
  `CONNECTOR_ID` varchar(26) NOT NULL,
  `CONNECTOR_NAME` varchar(30) DEFAULT NULL,
  `CONNECTOR_TYPE` smallint DEFAULT NULL,
  `VOLTAGE_UPPER_LIMIT` int DEFAULT NULL,
  `VOLTAGE_LOWER_LIMIT` int DEFAULT NULL,
  `CONNECTOR_CURRENT` int DEFAULT NULL,
  `CONNECTOR_POWER` decimal(19,10) DEFAULT NULL,
  `PARK_NO` varchar(10) DEFAULT NULL,
  `VOLTAGE` int DEFAULT NULL,
  `BMS_POWER_TYPE` smallint DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `UPDATE_TIME` datetime DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `idx_equipment_id` (`E_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='充电枪表';
insert into t_demo.t_cickp_charge_connector (ID, E_ID, CONNECTOR_ID, CONNECTOR_NAME, CONNECTOR_TYPE, VOLTAGE_UPPER_LIMIT, VOLTAGE_LOWER_LIMIT, CONNECTOR_CURRENT, CONNECTOR_POWER, PARK_NO, VOLTAGE, BMS_POWER_TYPE, CREATE_TIME, UPDATE_TIME)
values  ('000256eb359d470082b20ad4a4edf88e', '207f80bec5e14c27b38042d46930282d', '000000001001132__0', '充电枪01', 3, 750, 220, 112, 84.0000000000, '', 0, 3, '2022-08-22 10:50:12', null),
        ('00081df6bbeb4463bc945bde3076b1b0', 'd619e677456f4bf8998e241dae28692b', 'TS1704140074001', 'TS1704140074', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:04', null),
        ('0010e36ba5b6432b812901512d2767b2', 'd7cd94fa66944c80951997ac81b9f53f', 'HE121118010071001', 'HE121118010071', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:43', null),
        ('001a672ca25f45deab52deae778a98e1', '3bc1663886a942febf04cb39ea8ea803', '500085001', '500085A', 4, 0, 0, 200, 120.0000000000, '', 750, 1, '2021-10-12 16:05:53', null),
        ('001cca79ea2f4ac7a73e830165ceb0cd', '02823a48b93047dd955839f34948084b', 'HE121118062736_1001', 'HE121118062736_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:56', null),
        ('002f30773c674d32960ae5ec1230ee16', 'bc296f5e53c24594b839ebea34ff565f', 'HE121118040817_1001', 'HE121118040817_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:52', null),
        ('003b630113fb45f6ad52337ad4e21619', '0313985f1b274a50a861a29287776551', 'HE121122010801001', 'HE121122010801', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:19', null),
        ('003dc6e54c60427cb5a1003de4504877', '178de38f88454d85bc3fd36fb2c63c28', 'GP1804280341001', 'GP1804280341', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:25', null),
        ('003e884f521442b4ab9bf050395f0b7f', 'fc9b29532bfe4cc9a10647f4ed2ea7e2', 'HE121118063280_1001', 'HE121118063280_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null),
        ('004e481d713242c1b49026b45042809a', '3949d30b22e7414b953bca61ee973476', 'HE121121050028001', 'HE121121050028', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null);
CREATE TABLE `t_cickp_charge_equipment` (
  `ID` varchar(32) NOT NULL COMMENT '主键',
  `S_ID` varchar(32) NOT NULL COMMENT '关联充电站表ID',
  `EQUIPMENT_ID` varchar(24) NOT NULL,
  `MANUFACTURER_ID` varchar(10) DEFAULT NULL,
  `EQUIPMENT_MODEL` varchar(20) DEFAULT NULL,
  `PRODUCTION_DATE` varchar(10) DEFAULT NULL,
  `EQUIPMENT_TYPE` smallint DEFAULT NULL,
  `EQUIPMENT_LNG` decimal(19,6) DEFAULT NULL,
  `EQUIPMENT_LAT` decimal(19,6) DEFAULT NULL,
  `EQUIPMENT_NAME` varchar(30) DEFAULT NULL,
  `EQUIPMENT_TOTAL_POWER` decimal(19,1) DEFAULT NULL,
  `MANUFACTURER_NAME` varchar(30) DEFAULT NULL,
  `EQUIPMENT_ORDER` varchar(255) DEFAULT NULL,
  `EQUIPMENT_STATUS` smallint DEFAULT NULL,
  `EQUIPMENT_POWER` decimal(19,1) DEFAULT NULL,
  `NEW_NATIONAL_STANDARD` smallint DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `UPDATE_TIME` datetime DEFAULT NULL,
  `ACCURACY_LEVEL` smallint DEFAULT NULL,
  `CHECK_TIME_LAST` varchar(30) DEFAULT NULL,
  `CHECK_TIME_NEXT` varchar(30) DEFAULT NULL,
  `CERTIFICATE` varchar(255) DEFAULT NULL,
  `start_using_date` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '投入使用时间',
  `put_into_use` datetime(3) DEFAULT NULL COMMENT '投入使用时间',
  PRIMARY KEY (`ID`),
  KEY `idx_station_id` (`S_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='充电桩表';
insert into t_demo.t_cickp_charge_equipment (ID, S_ID, EQUIPMENT_ID, MANUFACTURER_ID, EQUIPMENT_MODEL, PRODUCTION_DATE, EQUIPMENT_TYPE, EQUIPMENT_LNG, EQUIPMENT_LAT, EQUIPMENT_NAME, EQUIPMENT_TOTAL_POWER, MANUFACTURER_NAME, EQUIPMENT_ORDER, EQUIPMENT_STATUS, EQUIPMENT_POWER, NEW_NATIONAL_STANDARD, CREATE_TIME, UPDATE_TIME, ACCURACY_LEVEL, CHECK_TIME_LAST, CHECK_TIME_NEXT, CERTIFICATE, start_using_date, put_into_use)
values  ('0014d314ec694faead804302efabfeee', '4392ed2f04154473b9f8b91c2a938741', 'HE121121050250', '360051856', 'ZL30-A6', '2021-10-08', 2, 113.905457, 22.771673, 'HE121121050250', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:24', null, null, null, null, null, null, null),
        ('001e0b7c5dc7436fb5479179080983f5', 'bf036a0583334522acab7b2f33444608', 'HE121119050138', '360051856', 'ZL30-A6', '2020-05-18', 2, 114.322876, 22.693771, 'HE121119050138', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:29', null, null, null, null, null, null, null),
        ('002c8597b9b241978b0a2d700e32cfae', '3465e6c1098a41d9893aa8b52727798d', '500020', 'MA5DRRDX1', 'ZDDC120BG', null, 1, 0.000000, 0.000000, '17', 120.0, '深圳智电新能源科技有限公司', null, 50, 120.0, 2, '2021-10-12 16:01:34', '2022-06-20 12:12:43', null, null, null, null, null, null),
        ('008b368751c643219da292fd76d569f1', '65ce25d50c1642799bd97759da568373', '000000001046001', 'MA5DA0053', 'CL5823', '2017-06-02', 1, 114.352464, 22.711021, '1号桩', 120.0, '深圳车电网', null, 50, 120.0, 2, '2022-08-22 10:50:06', null, null, null, null, null, null, null),
        ('009cb3f213384199b9d1816fb6900dcc', '1713051024a74b15874ffabd0411890c', '000000001063015', 'MA5DA0053', 'CL5899', '2021-11-27', 2, 113.920228, 22.535621, '交流充电桩', 7.0, '深圳市车电网络有限公司', null, 50, 7.0, 2, '2022-05-30 11:19:17', '2022-06-14 16:32:47', null, null, null, null, null, null),
        ('00ba38325d0e4804960bbb591f0f69e1', '7a69943d322a411e92132ca6989c55f7', 'HE121121052645', '360051856', 'ZL30-A6', '2022-01-10', 2, 113.909195, 22.601933, 'HE121121052645', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:04', null, null, null, null, null, null, null),
        ('00c25458804446d8b2fad1d007c1c812', 'cd14b93718ee468cbeccf1e84a2583cb', 'HE121119052764', '360051856', 'ZL30-A6', '2019-07-26', 2, 114.049232, 22.701670, 'HE121119052764', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:55', null, null, null, null, null, null, null),
        ('00d113034d9b4067bd12afbea03a1b52', '1c0a1b912d974a828ba82c620e0bba26', 'HE121120041074', '360051856', 'ZL30-A6', '2021-05-12', 2, 113.946404, 22.498718, 'HE121120041074', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:15', null, null, null, null, null, null, null),
        ('00d61d48e8004df095eb74985258d493', '8b7214a83fd74cd1bce14ace5e129107', 'HE121119030713_1', '360051856', 'ZL30-A6', '2019-05-15', 2, 113.883667, 22.788374, 'HE121119030713_1', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:57', null, null, null, null, null, null, null),
        ('00d70d75e8ce44ba9eed370f67432bff', '4f52fcd099f547db8b44421d22d53dec', 'TS1704250011', '360051856', 'JL7-A2-TS', '2017-07-17', 2, 113.930763, 22.523027, 'TS1704250011', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:11', null, null, null, null, null, null, null);
create database p_demo;
use p_demo;
CREATE TABLE `p_inspection_task` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '检定任务id',
  `name` varchar(63) DEFAULT NULL COMMENT '任务名称',
  `type` int DEFAULT NULL COMMENT '任务类型(0:指定检定,1:桩随机抽检,2:双随机抽检)',
  `task_issue_date` datetime DEFAULT NULL COMMENT '任务下达日期',
  `status` int DEFAULT '0' COMMENT '状态(0:新建,1:已下达,2:进行中,3:已完成)',
  `task_achieve_time` datetime DEFAULT NULL COMMENT '实际完成时间',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  `deadline` datetime DEFAULT NULL COMMENT '计划完成时间',
  `charger_platform_task` bigint DEFAULT NULL COMMENT '充电桩平台下发的任务id',
  `creator` int DEFAULT NULL COMMENT '创建者',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=178 DEFAULT CHARSET=utf8mb3 COMMENT='检定任务表';
insert into p_demo.p_inspection_task (id, name, type, task_issue_date, status, task_achieve_time, create_time, update_time, deadline, charger_platform_task, creator)
values  (1, '任务1', 0, '2022-06-22 10:44:14', 0, '2022-06-23 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (2, '任务2', 0, '2022-06-22 11:07:24', 0, '2022-06-09 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (3, '任务3', 1, '2022-06-22 11:19:23', 0, '2022-06-01 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (4, '预警检定任务-1', 0, '2022-06-22 14:07:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (5, '预警检定-2', 0, '2022-06-22 14:09:47', 0, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (6, '预警检定任务-1', 0, '2022-06-22 14:22:03', 0, '2022-06-10 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (7, '预警检定任务-2', 0, '2022-06-22 14:22:28', 2, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (8, '预警检定任务XX', 0, '2022-06-23 10:43:59', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (9, 'A任务1', 0, '2022-06-23 10:56:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (10, 'A任务2', 1, '2022-06-23 11:06:10', 0, '2022-06-28 00:00:00', '2022-07-13 16:34:32', null, null, null, null);
CREATE TABLE `p_inspection_result_record` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `inspect_task_detail_id` varchar(255) NOT NULL COMMENT '检定任务详情id',
  `looks_check` int DEFAULT NULL COMMENT '外观检查(0:不合格,1:合格)',
  `insulation_resistance_check` int DEFAULT NULL COMMENT '绝缘电阻(0:不合格,1:合格)',
  `work_temp_amend_value` varchar(16) DEFAULT NULL COMMENT '工作误差温度修正值',
  `accuracy_level` varchar(16) DEFAULT NULL COMMENT '工作误差精确度等级',
  `value_temp_amend_value` varchar(16) DEFAULT NULL COMMENT '示值误差温度修正值',
  `value_errors_check` int DEFAULT NULL COMMENT '示值误差检查(0:不合格,1:合格)',
  `pay_errors_check` int DEFAULT NULL COMMENT '付费金额误差检查(0:不合格,1:合格)',
  `clock_errors` int DEFAULT NULL COMMENT '时钟求值误差检查(0:不合格,1:合格)',
  `conclusion` int DEFAULT NULL COMMENT '结论(0:不合格,1:合格)',
  `check_time` datetime DEFAULT NULL COMMENT '检定时间',
  `report_state` int DEFAULT NULL COMMENT '报告审批状态(0审批驳回,1审批通过,2进行中)',
  `inspect_report` varchar(255) DEFAULT NULL COMMENT '鉴定报告',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=utf8mb3 COMMENT='检定结果记录';
insert into p_demo.p_inspection_result_record (id, inspect_task_detail_id, looks_check, insulation_resistance_check, work_temp_amend_value, accuracy_level, value_temp_amend_value, value_errors_check, pay_errors_check, clock_errors, conclusion, check_time, report_state, inspect_report)
values  (1, '10000', 1, 1, '0', '1', '0', 0, 1, 1, null, '2022-07-16 14:20:59', 2, null),
        (2, '44444', 1, null, null, null, null, 1, 1, 1, null, '2022-09-17 15:37:24', null, null),
        (3, '3713841537fc462d8304e3ec60cf803a', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null),
        (4, '537b3970e78441f097a8c6b42dbebdef', null, null, null, null, null, null, null, null, null, '2022-08-03 08:00:00', null, null),
        (5, '319d093d14c44c90810ed90e179d8ec6', 0, 1, '121', '1', '133', 1, 0, 1, null, '2022-11-09 08:00:00', null, null),
        (6, '7271e97cd6cd4472b30f881170ae0d4d', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (7, 'f2522ac4834546e7aaf31630631ab53b', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (8, 'fc52a61adc174d7b95de600f7033a336', null, null, null, null, null, null, null, null, null, '2022-08-04 08:00:00', null, null),
        (9, '4a2d79a855374c098a7a4704077e7b73', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (10, 'e95419eb804f438aa43ef31138cc282e', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null);
  • 配置文件
doris.conf
doris_tables
mysql.conf
mysql_tables
  • mysql_tables
t_demo.t_cickp_charge_connector
t_demo.t_cickp_charge_equipment
p_demo.p_inspection_task
p_demo.p_inspection_result_record

doris_tables

demo.demo1
demo.demo2
demo.demo3
demo2.demo

从idea演示多库多表的情况;

删除字段,修改数据类型;

删除字段:
alter table t_demo.t_cickp_charge_connector drop column CONNECTOR_NAME;
修改数据类型:
alter table p_inspection_result_record modify column looks_check decimal(4,2);

演示监控元数据监控及其原理;

#!/bin/bash
source ../conf/e_mysql/doris.conf
echo "source ../result/e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
while (( 1 == 1 ))
do
sleep 60
sh ./e_mysql_to_doris.sh ../result/new_e_mysql_to_doris.sql
old=`md5sum ../result/e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
new=`md5sum ../result/new_e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
        if [[ $old != $new ]];then
        x=0
                for table in $(cat ../conf/e_mysql/doris_tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                       echo "drop table if exists ${table};" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                done
                echo "source ../result/new_e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                rm -rf ../result/e_mysql_to_doris.sql
                mv ../result/new_e_mysql_to_doris.sql ../result/e_mysql_to_doris.sql
                fi
        rm -f ../result/new_e_mysql_to_doris.sql
done

FlinkCDC实时同步:

mysql中建表:

CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');
CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');
CREATE DATABASE emp_2;
USE emp_2;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');
CREATE TABLE employees_2(
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');

doris中建表:

create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
相关实践学习
如何在云端创建MySQL数据库
开始实验后,系统会自动创建一台自建MySQL的 源数据库 ECS 实例和一台 目标数据库 RDS。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
99 1
|
2月前
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
534 4
|
17天前
|
消息中间件 NoSQL 关系型数据库
一文彻底搞定Redis与MySQL的数据同步
【10月更文挑战第21天】本文介绍了 Redis 与 MySQL 数据同步的原因及实现方式。同步的主要目的是为了优化性能和保持数据一致性。实现方式包括基于数据库触发器、应用层双写和使用消息队列。每种方式都有其优缺点,需根据具体场景选择合适的方法。此外,文章还强调了数据同步时需要注意的数据一致性、性能优化和异常处理等问题。
179 0
|
3月前
|
SQL 关系型数据库 MySQL
实时计算 Flink版产品使用问题之两个数据表是否可以同时进行双向的数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
SQL Oracle 关系型数据库
实时计算 Flink版产品使用问题之Oracle数据库是集群部署的,怎么进行数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
3月前
|
Oracle 关系型数据库 分布式数据库
实时计算 Flink版产品使用问题之怎么实现跨多个DRDS的数据同步
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
2月前
|
运维 数据处理 数据安全/隐私保护
阿里云实时计算Flink版测评报告
该测评报告详细介绍了阿里云实时计算Flink版在用户行为分析与标签画像中的应用实践,展示了其毫秒级的数据处理能力和高效的开发流程。报告还全面评测了该服务在稳定性、性能、开发运维及安全性方面的卓越表现,并对比自建Flink集群的优势。最后,报告评估了其成本效益,强调了其灵活扩展性和高投资回报率,适合各类实时数据处理需求。
|
4月前
|
存储 监控 大数据
阿里云实时计算Flink在多行业的应用和实践
本文整理自 Flink Forward Asia 2023 中闭门会的分享。主要分享实时计算在各行业的应用实践,对回归实时计算的重点场景进行介绍以及企业如何使用实时计算技术,并且提供一些在技术架构上的参考建议。
811 7
阿里云实时计算Flink在多行业的应用和实践
|
3天前
|
存储 分布式计算 流计算
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
本文介绍了阿里云开源大数据团队在实时计算领域的最新成果——向量化流计算引擎Flash。文章主要内容包括:Apache Flink 成为业界流计算标准、Flash 核心技术解读、性能测试数据以及在阿里巴巴集团的落地效果。Flash 是一款完全兼容 Apache Flink 的新一代流计算引擎,通过向量化技术和 C++ 实现,大幅提升了性能和成本效益。
468 8
实时计算 Flash – 兼容 Flink 的新一代向量化流计算引擎
|
3月前
|
SQL 消息中间件 Kafka
实时计算 Flink版产品使用问题之如何在EMR-Flink的Flink SOL中针对source表单独设置并行度
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。