【建议收藏】Mysql+Flink CDC+Doris 数据同步实战(中)

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 RDS MySQL Serverless,0.5-2RCU 50GB
简介: 【建议收藏】Mysql+Flink CDC+Doris 数据同步实战

4、准备工作:

安装部署

Mysql:

①:wget http://dev.mysql.com/get/mysql57-community-release-el7-8.noarch.rpm

②:yum -y localinstall mysql57-community-release-el7-8.noarch.rpm

③:yum -y install mysql-community-server --nogpgcheck

Doris:

安装部署doris;

①:上传tar包

②:修改fe.conf的ip和端口

③:修改be.conf的ip和端口

④:启动fe,添加be节点

前置要求

Flink on yarn

①:安装部署Flink

②:配置环境变量HADOOP_CLASSPATH, 在/etc/profile.d/my.sh中配置

③:配置FLINK_HOME到/etc/profile

export HADOOP_CLASSPATH=`hadoop classpath

Flink doris connector

上传Flink doris connector到flink的lib目录下

Flink CDC

上传FlinkCDC到Flink的lib目录下

Mysql ODBC

①:解压tar包

②:复制文件到linux的/use/lib64目录下

cp lib/* /usr/lib64

③:运行bin目录下的文件

./myodbc-installer -d -a -n "MySQL ODBC 5.3 Driver" -t "DRIVER=/usr/lib64/libmyodbc5w.so;SETUP=/usr/lib64/libmyodbc5w.so"

④:修改be.odbcinst.ini配置文件并且分发所有be节点

5、功能演示

mysql外表同步:

  • mysql中创建表:
    mysql -uroot -proot;
create database t_demo;
use t_demo;
CREATE TABLE `t_cickp_charge_connector` (
  `ID` varchar(32) NOT NULL COMMENT '主键',
  `E_ID` varchar(32) NOT NULL COMMENT '关联充电设备表ID',
  `CONNECTOR_ID` varchar(26) NOT NULL,
  `CONNECTOR_NAME` varchar(30) DEFAULT NULL,
  `CONNECTOR_TYPE` smallint DEFAULT NULL,
  `VOLTAGE_UPPER_LIMIT` int DEFAULT NULL,
  `VOLTAGE_LOWER_LIMIT` int DEFAULT NULL,
  `CONNECTOR_CURRENT` int DEFAULT NULL,
  `CONNECTOR_POWER` decimal(19,10) DEFAULT NULL,
  `PARK_NO` varchar(10) DEFAULT NULL,
  `VOLTAGE` int DEFAULT NULL,
  `BMS_POWER_TYPE` smallint DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `UPDATE_TIME` datetime DEFAULT NULL,
  PRIMARY KEY (`ID`),
  KEY `idx_equipment_id` (`E_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='充电枪表';
insert into t_demo.t_cickp_charge_connector (ID, E_ID, CONNECTOR_ID, CONNECTOR_NAME, CONNECTOR_TYPE, VOLTAGE_UPPER_LIMIT, VOLTAGE_LOWER_LIMIT, CONNECTOR_CURRENT, CONNECTOR_POWER, PARK_NO, VOLTAGE, BMS_POWER_TYPE, CREATE_TIME, UPDATE_TIME)
values  ('000256eb359d470082b20ad4a4edf88e', '207f80bec5e14c27b38042d46930282d', '000000001001132__0', '充电枪01', 3, 750, 220, 112, 84.0000000000, '', 0, 3, '2022-08-22 10:50:12', null),
        ('00081df6bbeb4463bc945bde3076b1b0', 'd619e677456f4bf8998e241dae28692b', 'TS1704140074001', 'TS1704140074', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:04', null),
        ('0010e36ba5b6432b812901512d2767b2', 'd7cd94fa66944c80951997ac81b9f53f', 'HE121118010071001', 'HE121118010071', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:43', null),
        ('001a672ca25f45deab52deae778a98e1', '3bc1663886a942febf04cb39ea8ea803', '500085001', '500085A', 4, 0, 0, 200, 120.0000000000, '', 750, 1, '2021-10-12 16:05:53', null),
        ('001cca79ea2f4ac7a73e830165ceb0cd', '02823a48b93047dd955839f34948084b', 'HE121118062736_1001', 'HE121118062736_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:56', null),
        ('002f30773c674d32960ae5ec1230ee16', 'bc296f5e53c24594b839ebea34ff565f', 'HE121118040817_1001', 'HE121118040817_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:29:52', null),
        ('003b630113fb45f6ad52337ad4e21619', '0313985f1b274a50a861a29287776551', 'HE121122010801001', 'HE121122010801', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:19', null),
        ('003dc6e54c60427cb5a1003de4504877', '178de38f88454d85bc3fd36fb2c63c28', 'GP1804280341001', 'GP1804280341', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 12:30:25', null),
        ('003e884f521442b4ab9bf050395f0b7f', 'fc9b29532bfe4cc9a10647f4ed2ea7e2', 'HE121118063280_1001', 'HE121118063280_1', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null),
        ('004e481d713242c1b49026b45042809a', '3949d30b22e7414b953bca61ee973476', 'HE121121050028001', 'HE121121050028', 3, 220, 220, 32, 7.0000000000, null, 220, 255, '2022-07-23 15:40:44', null);
CREATE TABLE `t_cickp_charge_equipment` (
  `ID` varchar(32) NOT NULL COMMENT '主键',
  `S_ID` varchar(32) NOT NULL COMMENT '关联充电站表ID',
  `EQUIPMENT_ID` varchar(24) NOT NULL,
  `MANUFACTURER_ID` varchar(10) DEFAULT NULL,
  `EQUIPMENT_MODEL` varchar(20) DEFAULT NULL,
  `PRODUCTION_DATE` varchar(10) DEFAULT NULL,
  `EQUIPMENT_TYPE` smallint DEFAULT NULL,
  `EQUIPMENT_LNG` decimal(19,6) DEFAULT NULL,
  `EQUIPMENT_LAT` decimal(19,6) DEFAULT NULL,
  `EQUIPMENT_NAME` varchar(30) DEFAULT NULL,
  `EQUIPMENT_TOTAL_POWER` decimal(19,1) DEFAULT NULL,
  `MANUFACTURER_NAME` varchar(30) DEFAULT NULL,
  `EQUIPMENT_ORDER` varchar(255) DEFAULT NULL,
  `EQUIPMENT_STATUS` smallint DEFAULT NULL,
  `EQUIPMENT_POWER` decimal(19,1) DEFAULT NULL,
  `NEW_NATIONAL_STANDARD` smallint DEFAULT NULL,
  `CREATE_TIME` datetime DEFAULT NULL,
  `UPDATE_TIME` datetime DEFAULT NULL,
  `ACCURACY_LEVEL` smallint DEFAULT NULL,
  `CHECK_TIME_LAST` varchar(30) DEFAULT NULL,
  `CHECK_TIME_NEXT` varchar(30) DEFAULT NULL,
  `CERTIFICATE` varchar(255) DEFAULT NULL,
  `start_using_date` varchar(10) CHARACTER SET utf8 COLLATE utf8_general_ci DEFAULT NULL COMMENT '投入使用时间',
  `put_into_use` datetime(3) DEFAULT NULL COMMENT '投入使用时间',
  PRIMARY KEY (`ID`),
  KEY `idx_station_id` (`S_ID`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb3 COMMENT='充电桩表';
insert into t_demo.t_cickp_charge_equipment (ID, S_ID, EQUIPMENT_ID, MANUFACTURER_ID, EQUIPMENT_MODEL, PRODUCTION_DATE, EQUIPMENT_TYPE, EQUIPMENT_LNG, EQUIPMENT_LAT, EQUIPMENT_NAME, EQUIPMENT_TOTAL_POWER, MANUFACTURER_NAME, EQUIPMENT_ORDER, EQUIPMENT_STATUS, EQUIPMENT_POWER, NEW_NATIONAL_STANDARD, CREATE_TIME, UPDATE_TIME, ACCURACY_LEVEL, CHECK_TIME_LAST, CHECK_TIME_NEXT, CERTIFICATE, start_using_date, put_into_use)
values  ('0014d314ec694faead804302efabfeee', '4392ed2f04154473b9f8b91c2a938741', 'HE121121050250', '360051856', 'ZL30-A6', '2021-10-08', 2, 113.905457, 22.771673, 'HE121121050250', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:24', null, null, null, null, null, null, null),
        ('001e0b7c5dc7436fb5479179080983f5', 'bf036a0583334522acab7b2f33444608', 'HE121119050138', '360051856', 'ZL30-A6', '2020-05-18', 2, 114.322876, 22.693771, 'HE121119050138', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:29', null, null, null, null, null, null, null),
        ('002c8597b9b241978b0a2d700e32cfae', '3465e6c1098a41d9893aa8b52727798d', '500020', 'MA5DRRDX1', 'ZDDC120BG', null, 1, 0.000000, 0.000000, '17', 120.0, '深圳智电新能源科技有限公司', null, 50, 120.0, 2, '2021-10-12 16:01:34', '2022-06-20 12:12:43', null, null, null, null, null, null),
        ('008b368751c643219da292fd76d569f1', '65ce25d50c1642799bd97759da568373', '000000001046001', 'MA5DA0053', 'CL5823', '2017-06-02', 1, 114.352464, 22.711021, '1号桩', 120.0, '深圳车电网', null, 50, 120.0, 2, '2022-08-22 10:50:06', null, null, null, null, null, null, null),
        ('009cb3f213384199b9d1816fb6900dcc', '1713051024a74b15874ffabd0411890c', '000000001063015', 'MA5DA0053', 'CL5899', '2021-11-27', 2, 113.920228, 22.535621, '交流充电桩', 7.0, '深圳市车电网络有限公司', null, 50, 7.0, 2, '2022-05-30 11:19:17', '2022-06-14 16:32:47', null, null, null, null, null, null),
        ('00ba38325d0e4804960bbb591f0f69e1', '7a69943d322a411e92132ca6989c55f7', 'HE121121052645', '360051856', 'ZL30-A6', '2022-01-10', 2, 113.909195, 22.601933, 'HE121121052645', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:04', null, null, null, null, null, null, null),
        ('00c25458804446d8b2fad1d007c1c812', 'cd14b93718ee468cbeccf1e84a2583cb', 'HE121119052764', '360051856', 'ZL30-A6', '2019-07-26', 2, 114.049232, 22.701670, 'HE121119052764', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:55', null, null, null, null, null, null, null),
        ('00d113034d9b4067bd12afbea03a1b52', '1c0a1b912d974a828ba82c620e0bba26', 'HE121120041074', '360051856', 'ZL30-A6', '2021-05-12', 2, 113.946404, 22.498718, 'HE121120041074', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:15', null, null, null, null, null, null, null),
        ('00d61d48e8004df095eb74985258d493', '8b7214a83fd74cd1bce14ace5e129107', 'HE121119030713_1', '360051856', 'ZL30-A6', '2019-05-15', 2, 113.883667, 22.788374, 'HE121119030713_1', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:29:57', null, null, null, null, null, null, null),
        ('00d70d75e8ce44ba9eed370f67432bff', '4f52fcd099f547db8b44421d22d53dec', 'TS1704250011', '360051856', 'JL7-A2-TS', '2017-07-17', 2, 113.930763, 22.523027, 'TS1704250011', 7.0, null, null, 50, 7.0, 2, '2022-07-23 12:30:11', null, null, null, null, null, null, null);
create database p_demo;
use p_demo;
CREATE TABLE `p_inspection_task` (
  `id` bigint NOT NULL AUTO_INCREMENT COMMENT '检定任务id',
  `name` varchar(63) DEFAULT NULL COMMENT '任务名称',
  `type` int DEFAULT NULL COMMENT '任务类型(0:指定检定,1:桩随机抽检,2:双随机抽检)',
  `task_issue_date` datetime DEFAULT NULL COMMENT '任务下达日期',
  `status` int DEFAULT '0' COMMENT '状态(0:新建,1:已下达,2:进行中,3:已完成)',
  `task_achieve_time` datetime DEFAULT NULL COMMENT '实际完成时间',
  `create_time` datetime DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
  `update_time` datetime DEFAULT NULL COMMENT '更新时间',
  `deadline` datetime DEFAULT NULL COMMENT '计划完成时间',
  `charger_platform_task` bigint DEFAULT NULL COMMENT '充电桩平台下发的任务id',
  `creator` int DEFAULT NULL COMMENT '创建者',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=178 DEFAULT CHARSET=utf8mb3 COMMENT='检定任务表';
insert into p_demo.p_inspection_task (id, name, type, task_issue_date, status, task_achieve_time, create_time, update_time, deadline, charger_platform_task, creator)
values  (1, '任务1', 0, '2022-06-22 10:44:14', 0, '2022-06-23 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (2, '任务2', 0, '2022-06-22 11:07:24', 0, '2022-06-09 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (3, '任务3', 1, '2022-06-22 11:19:23', 0, '2022-06-01 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (4, '预警检定任务-1', 0, '2022-06-22 14:07:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (5, '预警检定-2', 0, '2022-06-22 14:09:47', 0, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (6, '预警检定任务-1', 0, '2022-06-22 14:22:03', 0, '2022-06-10 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (7, '预警检定任务-2', 0, '2022-06-22 14:22:28', 2, '2022-06-29 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (8, '预警检定任务XX', 0, '2022-06-23 10:43:59', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (9, 'A任务1', 0, '2022-06-23 10:56:23', 0, '2022-06-30 00:00:00', '2022-07-13 16:34:32', null, null, null, null),
        (10, 'A任务2', 1, '2022-06-23 11:06:10', 0, '2022-06-28 00:00:00', '2022-07-13 16:34:32', null, null, null, null);
CREATE TABLE `p_inspection_result_record` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `inspect_task_detail_id` varchar(255) NOT NULL COMMENT '检定任务详情id',
  `looks_check` int DEFAULT NULL COMMENT '外观检查(0:不合格,1:合格)',
  `insulation_resistance_check` int DEFAULT NULL COMMENT '绝缘电阻(0:不合格,1:合格)',
  `work_temp_amend_value` varchar(16) DEFAULT NULL COMMENT '工作误差温度修正值',
  `accuracy_level` varchar(16) DEFAULT NULL COMMENT '工作误差精确度等级',
  `value_temp_amend_value` varchar(16) DEFAULT NULL COMMENT '示值误差温度修正值',
  `value_errors_check` int DEFAULT NULL COMMENT '示值误差检查(0:不合格,1:合格)',
  `pay_errors_check` int DEFAULT NULL COMMENT '付费金额误差检查(0:不合格,1:合格)',
  `clock_errors` int DEFAULT NULL COMMENT '时钟求值误差检查(0:不合格,1:合格)',
  `conclusion` int DEFAULT NULL COMMENT '结论(0:不合格,1:合格)',
  `check_time` datetime DEFAULT NULL COMMENT '检定时间',
  `report_state` int DEFAULT NULL COMMENT '报告审批状态(0审批驳回,1审批通过,2进行中)',
  `inspect_report` varchar(255) DEFAULT NULL COMMENT '鉴定报告',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=57 DEFAULT CHARSET=utf8mb3 COMMENT='检定结果记录';
insert into p_demo.p_inspection_result_record (id, inspect_task_detail_id, looks_check, insulation_resistance_check, work_temp_amend_value, accuracy_level, value_temp_amend_value, value_errors_check, pay_errors_check, clock_errors, conclusion, check_time, report_state, inspect_report)
values  (1, '10000', 1, 1, '0', '1', '0', 0, 1, 1, null, '2022-07-16 14:20:59', 2, null),
        (2, '44444', 1, null, null, null, null, 1, 1, 1, null, '2022-09-17 15:37:24', null, null),
        (3, '3713841537fc462d8304e3ec60cf803a', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null),
        (4, '537b3970e78441f097a8c6b42dbebdef', null, null, null, null, null, null, null, null, null, '2022-08-03 08:00:00', null, null),
        (5, '319d093d14c44c90810ed90e179d8ec6', 0, 1, '121', '1', '133', 1, 0, 1, null, '2022-11-09 08:00:00', null, null),
        (6, '7271e97cd6cd4472b30f881170ae0d4d', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (7, 'f2522ac4834546e7aaf31630631ab53b', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (8, 'fc52a61adc174d7b95de600f7033a336', null, null, null, null, null, null, null, null, null, '2022-08-04 08:00:00', null, null),
        (9, '4a2d79a855374c098a7a4704077e7b73', null, null, null, null, null, null, null, null, null, '2022-08-09 08:00:00', null, null),
        (10, 'e95419eb804f438aa43ef31138cc282e', null, null, null, null, null, null, null, null, null, '2022-08-02 08:00:00', null, null);
  • 配置文件
doris.conf
doris_tables
mysql.conf
mysql_tables
  • mysql_tables
t_demo.t_cickp_charge_connector
t_demo.t_cickp_charge_equipment
p_demo.p_inspection_task
p_demo.p_inspection_result_record

doris_tables

demo.demo1
demo.demo2
demo.demo3
demo2.demo

从idea演示多库多表的情况;

删除字段,修改数据类型;

删除字段:
alter table t_demo.t_cickp_charge_connector drop column CONNECTOR_NAME;
修改数据类型:
alter table p_inspection_result_record modify column looks_check decimal(4,2);

演示监控元数据监控及其原理;

#!/bin/bash
source ../conf/e_mysql/doris.conf
echo "source ../result/e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
while (( 1 == 1 ))
do
sleep 60
sh ./e_mysql_to_doris.sh ../result/new_e_mysql_to_doris.sql
old=`md5sum ../result/e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
new=`md5sum ../result/new_e_mysql_to_doris.sql |awk -F ' ' '{print $1}'`
        if [[ $old != $new ]];then
        x=0
                for table in $(cat ../conf/e_mysql/doris_tables |grep -v '#' | awk -F '\n' '{print $1}')
                do
                       echo "drop table if exists ${table};" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                done
                echo "source ../result/new_e_mysql_to_doris.sql;" |mysql -h$fe_host -P$fe_master_port -uroot -p$fe_password
                rm -rf ../result/e_mysql_to_doris.sql
                mv ../result/new_e_mysql_to_doris.sql ../result/e_mysql_to_doris.sql
                fi
        rm -f ../result/new_e_mysql_to_doris.sql
done

FlinkCDC实时同步:

mysql中建表:

CREATE DATABASE emp_1;
 USE emp_1;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26'),
(10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21'),
(10003,'1959-12-03','Parto','Bamford','M','1986-08-28'),
(10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01'),
(10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12'),
(10006,'1953-04-20','Anneke','Preusig','F','1989-06-02'),
(10007,'1957-05-23','Tzvetan','Zielinski','F','1989-02-10'),
(10008,'1958-02-19','Saniya','Kalloufi','M','1994-09-15'),
(10009,'1952-04-19','Sumant','Peac','F','1985-02-18'),
(10010,'1963-06-01','Duangkaew','Piveteau','F','1989-08-24'),
(10011,'1953-11-07','Mary','Sluis','F','1990-01-22'),
(10012,'1960-10-04','Patricio','Bridgland','M','1992-12-18'),
(10013,'1963-06-07','Eberhardt','Terkki','M','1985-10-20'),
(10014,'1956-02-12','Berni','Genin','M','1987-03-11'),
(10015,'1959-08-19','Guoxiang','Nooteboom','M','1987-07-02'),
(10016,'1961-05-02','Kazuhito','Cappelletti','M','1995-01-27'),
(10017,'1958-07-06','Cristinel','Bouloucos','F','1993-08-03'),
(10018,'1954-06-19','Kazuhide','Peha','F','1987-04-03'),
(10019,'1953-01-23','Lillian','Haddadi','M','1999-04-30'),
(10020,'1952-12-24','Mayuko','Warwick','M','1991-01-26'),
(10021,'1960-02-20','Ramzi','Erde','M','1988-02-10'),
(10022,'1952-07-08','Shahaf','Famili','M','1995-08-22'),
(10023,'1953-09-29','Bojan','Montemayor','F','1989-12-17'),
(10024,'1958-09-05','Suzette','Pettey','F','1997-05-19'),
(10025,'1958-10-31','Prasadram','Heyers','M','1987-08-17'),
(10026,'1953-04-03','Yongqiao','Berztiss','M','1995-03-20'),
(10027,'1962-07-10','Divier','Reistad','F','1989-07-07'),
(10028,'1963-11-26','Domenick','Tempesti','M','1991-10-22'),
(10029,'1956-12-13','Otmar','Herbst','M','1985-11-20'),
(10030,'1958-07-14','Elvis','Demeyer','M','1994-02-17'),
(10031,'1959-01-27','Karsten','Joslin','M','1991-09-01'),
(10032,'1960-08-09','Jeong','Reistad','F','1990-06-20'),
(10033,'1956-11-14','Arif','Merlo','M','1987-03-18'),
(10034,'1962-12-29','Bader','Swan','M','1988-09-21'),
(10035,'1953-02-08','Alain','Chappelet','M','1988-09-05'),
(10036,'1959-08-10','Adamantios','Portugali','M','1992-01-03');
CREATE TABLE employees_2 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10037,'1963-07-22','Pradeep','Makrucki','M','1990-12-05'),
(10038,'1960-07-20','Huan','Lortz','M','1989-09-20'),
(10039,'1959-10-01','Alejandro','Brender','M','1988-01-19'),
(10040,'1959-09-13','Weiyi','Meriste','F','1993-02-14'),
(10041,'1959-08-27','Uri','Lenart','F','1989-11-12'),
(10042,'1956-02-26','Magy','Stamatiou','F','1993-03-21'),
(10043,'1960-09-19','Yishay','Tzvieli','M','1990-10-20'),
(10044,'1961-09-21','Mingsen','Casley','F','1994-05-21'),
(10045,'1957-08-14','Moss','Shanbhogue','M','1989-09-02'),
(10046,'1960-07-23','Lucien','Rosenbaum','M','1992-06-20'),
(10047,'1952-06-29','Zvonko','Nyanchama','M','1989-03-31'),
(10048,'1963-07-11','Florian','Syrotiuk','M','1985-02-24'),
(10049,'1961-04-24','Basil','Tramer','F','1992-05-04'),
(10050,'1958-05-21','Yinghua','Dredge','M','1990-12-25'),
(10051,'1953-07-28','Hidefumi','Caine','M','1992-10-15'),
(10052,'1961-02-26','Heping','Nitsch','M','1988-05-21'),
(10053,'1954-09-13','Sanjiv','Zschoche','F','1986-02-04'),
(10054,'1957-04-04','Mayumi','Schueller','M','1995-03-13');
CREATE DATABASE emp_2;
USE emp_2;
CREATE TABLE employees_1 (
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_1` VALUES  (10055,'1956-06-06','Georgy','Dredge','M','1992-04-27'),
(10056,'1961-09-01','Brendon','Bernini','F','1990-02-01'),
(10057,'1954-05-30','Ebbe','Callaway','F','1992-01-15'),
(10058,'1954-10-01','Berhard','McFarlin','M','1987-04-13'),
(10059,'1953-09-19','Alejandro','McAlpine','F','1991-06-26'),
(10060,'1961-10-15','Breannda','Billingsley','M','1987-11-02'),
(10061,'1962-10-19','Tse','Herber','M','1985-09-17'),
(10062,'1961-11-02','Anoosh','Peyn','M','1991-08-30'),
(10063,'1952-08-06','Gino','Leonhardt','F','1989-04-08'),
(10064,'1959-04-07','Udi','Jansch','M','1985-11-20'),
(10065,'1963-04-14','Satosi','Awdeh','M','1988-05-18'),
(10066,'1952-11-13','Kwee','Schusler','M','1986-02-26'),
(10067,'1953-01-07','Claudi','Stavenow','M','1987-03-04'),
(10068,'1962-11-26','Charlene','Brattka','M','1987-08-07'),
(10069,'1960-09-06','Margareta','Bierman','F','1989-11-05'),
(10070,'1955-08-20','Reuven','Garigliano','M','1985-10-14'),
(10071,'1958-01-21','Hisao','Lipner','M','1987-10-01'),
(10072,'1952-05-15','Hironoby','Sidou','F','1988-07-21'),
(10073,'1954-02-23','Shir','McClurg','M','1991-12-01'),
(10074,'1955-08-28','Mokhtar','Bernatsky','F','1990-08-13'),
(10075,'1960-03-09','Gao','Dolinsky','F','1987-03-19'),
(10076,'1952-06-13','Erez','Ritzmann','F','1985-07-09'),
(10077,'1964-04-18','Mona','Azuma','M','1990-03-02'),
(10078,'1959-12-25','Danel','Mondadori','F','1987-05-26'),
(10079,'1961-10-05','Kshitij','Gils','F','1986-03-27'),
(10080,'1957-12-03','Premal','Baek','M','1985-11-19'),
(10081,'1960-12-17','Zhongwei','Rosen','M','1986-10-30'),
(10082,'1963-09-09','Parviz','Lortz','M','1990-01-03'),
(10083,'1959-07-23','Vishv','Zockler','M','1987-03-31'),
(10084,'1960-05-25','Tuval','Kalloufi','M','1995-12-15');
CREATE TABLE employees_2(
    emp_no      INT             NOT NULL,
    birth_date  DATE            NOT NULL,
    first_name  VARCHAR(14)     NOT NULL,
    last_name   VARCHAR(16)     NOT NULL,
    gender      ENUM ('M','F')  NOT NULL,    
    hire_date   DATE            NOT NULL,
    PRIMARY KEY (emp_no)
);
INSERT INTO `employees_2` VALUES (10085,'1962-11-07','Kenroku','Malabarba','M','1994-04-09'),
(10086,'1962-11-19','Somnath','Foote','M','1990-02-16'),
(10087,'1959-07-23','Xinglin','Eugenio','F','1986-09-08'),
(10088,'1954-02-25','Jungsoon','Syrzycki','F','1988-09-02'),
(10089,'1963-03-21','Sudharsan','Flasterstein','F','1986-08-12'),
(10090,'1961-05-30','Kendra','Hofting','M','1986-03-14'),
(10091,'1955-10-04','Amabile','Gomatam','M','1992-11-18'),
(10092,'1964-10-18','Valdiodio','Niizuma','F','1989-09-22'),
(10093,'1964-06-11','Sailaja','Desikan','M','1996-11-05'),
(10094,'1957-05-25','Arumugam','Ossenbruggen','F','1987-04-18'),
(10095,'1965-01-03','Hilari','Morton','M','1986-07-15'),
(10096,'1954-09-16','Jayson','Mandell','M','1990-01-14'),
(10097,'1952-02-27','Remzi','Waschkowski','M','1990-09-15'),
(10098,'1961-09-23','Sreekrishna','Servieres','F','1985-05-13'),
(10099,'1956-05-25','Valter','Sullins','F','1988-10-18'),
(10100,'1953-04-21','Hironobu','Haraldson','F','1987-09-21'),
(10101,'1952-04-15','Perla','Heyers','F','1992-12-28'),
(10102,'1959-11-04','Paraskevi','Luby','F','1994-01-26'),
(10103,'1953-11-26','Akemi','Birch','M','1986-12-02'),
(10104,'1961-11-19','Xinyu','Warwick','M','1987-04-16'),
(10105,'1962-02-05','Hironoby','Piveteau','M','1999-03-23'),
(10106,'1952-08-29','Eben','Aingworth','M','1990-12-19'),
(10107,'1956-06-13','Dung','Baca','F','1994-03-22'),
(10108,'1952-04-07','Lunjin','Giveon','M','1986-10-02'),
(10109,'1958-11-25','Mariusz','Prampolini','F','1993-06-16'),
(10110,'1957-03-07','Xuejia','Ullian','F','1986-08-22'),
(10111,'1963-08-29','Hugo','Rosis','F','1988-06-19'),
(10112,'1963-08-13','Yuichiro','Swick','F','1985-10-08'),
(10113,'1963-11-13','Jaewon','Syrzycki','M','1989-12-24'),
(10114,'1957-02-16','Munir','Demeyer','F','1992-07-17'),
(10115,'1964-12-25','Chikara','Rissland','M','1986-01-23'),
(10116,'1955-08-26','Dayanand','Czap','F','1985-05-28');

doris中建表:

create database demo;
use demo;
CREATE TABLE all_employees_info (
    emp_no       int NOT NULL,
    birth_date   date,
    first_name   varchar(20),
    last_name    varchar(20),
    gender       char(2),
    hire_date    date
)
UNIQUE KEY(`emp_no`, `birth_date`)
DISTRIBUTED BY HASH(`birth_date`) BUCKETS 1
PROPERTIES (
"replication_allocation" = "tag.location.default: 1"
);
相关实践学习
基于CentOS快速搭建LAMP环境
本教程介绍如何搭建LAMP环境,其中LAMP分别代表Linux、Apache、MySQL和PHP。
全面了解阿里云能为你做什么
阿里云在全球各地部署高效节能的绿色数据中心,利用清洁计算为万物互联的新世界提供源源不断的能源动力,目前开服的区域包括中国(华北、华东、华南、香港)、新加坡、美国(美东、美西)、欧洲、中东、澳大利亚、日本。目前阿里云的产品涵盖弹性计算、数据库、存储与CDN、分析与搜索、云通信、网络、管理与监控、应用服务、互联网中间件、移动服务、视频服务等。通过本课程,来了解阿里云能够为你的业务带来哪些帮助     相关的阿里云产品:云服务器ECS 云服务器 ECS(Elastic Compute Service)是一种弹性可伸缩的计算服务,助您降低 IT 成本,提升运维效率,使您更专注于核心业务创新。产品详情: https://www.aliyun.com/product/ecs
相关文章
|
1月前
|
消息中间件 Java Kafka
Flink CDC 在外部查询某个 job 中的表数据
【2月更文挑战第27天】Flink CDC 在外部查询某个 job 中的表数据
40 5
|
1月前
|
API 数据库 流计算
有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
【2月更文挑战第27天】有大佬知道在使用flink cdc实现数据同步,如何实现如果服务停止了对数据源表的某个数据进行删除操作,重启服务之后目标表能进行对源表删除的数据进行删除吗?
50 3
|
20天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(8.0版本升级篇)
94 0
|
1月前
|
自然语言处理 Java Scala
Flink CDC产品常见问题之大文件整库同步怎么解决
Flink CDC产品常见问题之大文件整库同步怎么解决
|
8天前
|
存储 SQL 关系型数据库
【MySQL实战笔记】03.事务隔离:为什么你改了我还看不见?-02
【4月更文挑战第7天】数据库通过视图实现事务隔离,不同隔离级别如读未提交、读已提交、可重复读和串行化采用不同策略。以可重复读为例,MySQL使用多版本并发控制(MVCC),每个事务有其独立的视图。回滚日志在无更早视图时被删除。长事务可能导致大量存储占用,应避免。事务启动可显式用`begin`或设置`autocommit=0`,但后者可能意外开启长事务。建议使用`autocommit=1`并显式管理事务,若需减少交互,可使用`commit work and chain`。
28 5
|
10天前
|
SQL 存储 关系型数据库
【MySQL实战笔记】02.一条SQL更新语句是如何执行的-2
【4月更文挑战第5天】两阶段提交是为确保`redo log`和`binlog`逻辑一致,避免数据不一致。若先写`redo log`, crash后数据可能丢失,导致恢复后状态错误;若先写`binlog`,crash则可能导致重复事务,影响数据库一致性。一天一备相较于一周一备,能缩短“最长恢复时间”,但需权衡额外的存储成本。
15 1
|
20天前
|
canal 消息中间件 关系型数据库
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
【分布式技术专题】「分布式技术架构」MySQL数据同步到Elasticsearch之N种方案解析,实现高效数据同步
66 0
|
20天前
|
SQL 关系型数据库 MySQL
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)(一)
【MySQL技术专题】「问题实战系列」深入探索和分析MySQL数据库的数据备份和恢复实战开发指南(数据恢复补充篇)
29 0
|
26天前
|
Java 关系型数据库 MySQL
Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
【2月更文挑战第33天】Flink1.18.1和CDC2.4.1 本地没问题 提交任务到服务器 报错java.lang.NoClassDefFoundError: Could not initialize class io.debezium.connector.mysql.MySqlConnectorConfig
46 2
|
29天前
|
消息中间件 Kafka 流计算
如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
【2月更文挑战第30天】如果有多个版本的Flink CDC在同一环境中运行,可能会导致Debezium版本冲突
19 2