关系型数据库(RDS)
阿里云关系型数据库(Relational Database Service)简称RDS是一种稳定可靠、可弹性伸缩的在线数据库服务。基于阿里云分布式文件系统和高性能存储,RDS支持MySQL、SQL Server、PostgreSQL和PPAS(Postgre Plus Advanced Server,一种高度兼容 Oracle 的数据库)引擎,并且提供了容灾、备份、恢复、监控、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。
注意:
关系型数据库(RDS/DRDS)插件中的WITH参数一致,可以通用。
在使用关系型数据库(RDS/DRDS)作为结果表时,RDS或DRDS中必须要有真实的表存在。
DDL定义
实时计算支持使用RDS/DRDS作为结果输出(目前仅支持MySql数据存储类型)。示例代码如下。
create table rds_output(
id int,
len int,
content VARCHAR,
primary key(id,len)
) with (
type='rds',
url='jdbc:mysql:XXXXXXXXXX',
tableName='test4',
userName='test',
password='XXXXXX'
);
注意:
- 实时计算写入RDS/DRDS数据库结果表原理:针对实时计算每行结果数据,拼接成一行SQL向目标端数据库进行执行。如果使用批量写,需要在url后面加上参数
?rewriteBatchedStatements=true
,否则性能较差。
- RDS/MySQL支持自增主键。如果需要让实时计算写入数据支持自增主键,在DDL中不声明该自增字段即可。
例如,ID是自增字段,实时计算DDL不写出该自增字段,则数据库在一行数据写入过程中会自动填补相关的自增字段。
- 如果DRDS有分区表,拆分键必须在实时计算DDL里
primary key()
中声明,否则拆分的表无法写入。关于DRDS分库分表的概念可参见DRDS分库分表。
- 建议使用数据存储,参见数据存储云数据库(RDS)。
WITH参数
参数 |
注释说明 |
备注 |
url |
地址 |
RDS的URL地址, DRDS的URL地址
|
tableName |
表名 |
无 |
userName |
用户名 |
无 |
password |
密码 |
无 |
maxRetryTimes |
最大尝试插入次数 |
可选,默认为3 |
batchSize |
每次写的批次大小 |
可选,默认值50,表示每次写多少条。 |
bufferSize |
去重的buffer大小,需要指定主键才生效 |
可选,默认值1(1.4.1默认值改为1000),表示输入的数据达到1条就开始输出。 |
flushIntervalMs |
写超时时间 |
可选,单位毫秒,默认值5000,表示数据超过了5秒,还没有写过,就会将缓存的数据都写一次。 |
excludeUpdateColumns |
对相同key的值更新时排除掉相应的column |
可选,默认为空(primary keys 字段默认会排除) |
ignoreDelete |
是否忽略delete操作 |
默认为false |
partitionBy |
写入Sink节点前,会根据该值做hash。数据会流向相应的hash节点。 |
可选,默认为空。 |
类型映射
RDS字段类型 |
实时计算字段类型 |
text |
varchar |
byte |
varchar |
integer |
int |
long |
bigint |
double |
double |
date |
varchar |
datetime |
varchar |
timestamp |
varchar |
time |
varchar |
year |
varchar |
float |
float |
decimal |
decimal |
char |
varchar |
JDBC 连接参数
参数名称 |
参数说明 |
缺省值 |
最低版本要求 |
useUnicode |
是否使用Unicode字符集,如果参数characterEncoding设置为gb2312或gbk,本参数值必须设置为true。 |
false |
1.1g |
characterEncoding |
当useUnicode设置为true时,指定字符编码。比如可设置为gb2312或gbk。 |
false |
1.1g |
autoReconnect |
当数据库连接异常中断时,是否自动重新连接。 |
false |
1.1 |
autoReconnectForPools |
是否使用针对数据库连接池的重连策略。 |
false |
3.1.3 |
failOverReadOnly |
自动重连成功后,连接是否设置为只读。 |
true |
3.0.12 |
maxReconnects |
autoReconnect设置为true时,重试连接的次数。 |
3 |
1.1 |
initialTimeout |
autoReconnect设置为true时,两次重连之间的时间间隔,单位:秒。 |
2 |
1.1 |
connectTimeout |
和数据库服务器建立socket连接时的超时,单位:毫秒。 0表示永不超时,适用于JDK 1.4及更高版本。 |
0 |
3.0.1 |
socketTimeout |
socket操作(读写)超时,单位:毫秒。 0表示永不超时。 |
0 |
3.0.1 |
FAQ
Q:Flink的结果数据写入RDS表,是按主键更新的,还是新生成一条记录?
A:如果在DDL中定义了主键,会采用insert into on duplicate key update
的方式更新记录,也就意味着对于不存在的主键字段会直接插入,存在的主键字段则更新相应的值。
如果DDL中没有声明primary key,则会用insert into
方式插入记录,追加数据。
Q:使用RDS表中的唯一索引做GROUP BY需要注意什么?
A:RDS中只有一个自增主键,实时计算作业中不能声明为Primary Key;如果需要使用RDS表中的唯一索引做GROUP BY,需要在作业中的Primary Key中声明这些唯一索引。