Apache Flink 漫谈系列(06) - 流表对偶(duality)性

简介: 实际问题 很多大数据计算产品,都对用户提供了SQL API,比如Hive, Spark, Flink等,那么SQL作为传统关系数据库的查询语言,是应用在批查询场景的。Hive和Spark本质上都是Batch的计算模式(在《Apache Flink 漫谈系列 - 概述》我们介绍过Spark是Micr.

实际问题

很多大数据计算产品,都对用户提供了SQL API,比如Hive, Spark, Flink等,那么SQL作为传统关系数据库的查询语言,是应用在批查询场景的。Hive和Spark本质上都是Batch的计算模式(在《Apache Flink 漫谈系列 - 概述》我们介绍过Spark是Micro Batching模式),提供SQL API很容易被人理解,但是Flink是纯流(Native Streaming)的计算模式, 流与批在数据集和计算过程上有很大的区别,如下:
image

  • 批查询场景的特点 - 有限数据集,一次查询返回一个计算结果就结束查询

  • 流查询场景的特点 - 无限数据集,一次查询不断修正计算结果,查询永远不结束

我们发现批与流的查询场景在数据集合和计算过程上都有很大的不同,那么基于Native Streaming模式的Apache Flink为啥也能为用户提供SQL API呢?

流与批的语义关系

我们知道SQL都是作用于关系表的,在传统数据库中进行查询时候,SQL所要查询的表在触发查询时候数据是不会变化的,也就是说在查询那一刻,表是一张静态表,相当于是一个有限的批数据,这样也说明SQL是源于对批计算的查询的,那么要回答Apache Flink为啥也能为用户提供SQL API,我们首先要理解流与批在语义层面的关系。我们以一个具体示例说明,如下图:
image

上图展现的是一个携带时间戳和用户名的点击事件流,我们先对这些事件流进行流式统计,同时在最后的流事件上触发批计算。流计算中每接收一个数据都会触发一次计算,我们以2018/4/30 22:37:45 Mary到来那一时间切片看,无论是在流还是批上计算结果都是6。也就是说在相同的数据源,相同的查询逻辑下,流和批的计算结果是相同的。相同的SQL在流和批这两种模式下,最终结果是一致的,那么流与批在语义上是完全相同的。

流与表的关系

流与批在语义上是一致的,SQL是作用于表的,那么要回答Apache Flink为啥也能为用户提供SQL API的问题,就变成了流与表是否具有等价性,也就是本篇要重点介绍的为什么流表具有对偶(duality)性?如下图所示,一张表可以看做为流吗?同样流可以看做是一张表吗?如果可以需要怎样的条件和变通?
image

MySQL主备复制

在介绍流与表的关系之前我们先聊聊MySQL的主备复制,binlog是MySQL实现主备复制的核心手段,简单来说MySQL主备复制实现分成三个步骤:

  • Master将改变(change logs)以二进制日志事件(binary log events)形式记录到二进制日志(binlog)中;
  • Slave将Master的binary log events拷贝到它的中继日志(relay log);
  • Slave重做中继日志中的事件,将改变反映到数据;

具体如下图所示:
image

binlog

接下来我们从binlog模式,binlog格式以及通过查看binlog的具体内容来详尽介绍binlog与表的关系。

binlog模式

上面介绍的MySQL主备复制的核心手段是利用binlog实现的,那边binlog会记录那些内容呢?binlog记录了数据库所有的增、删、更新等操作。MySQL支持三种方式记录binlog:

  • statement-based logging - Events contain SQL statements that produce data changes (inserts, updates, deletes);
  • row-based logging - Events describe changes to individual rows;
  • mixed-base logging - 该模式默认是statement-based,当遇到如下情况会自动切换到row-based:
    • NDB存储引擎,DML操作以row格式记录;
    • 使用UUID()、USER()、CURRENT_USER()、FOUND_ROWS()等不确定函数;
    • 使用Insert Delay语句;
    • 使用用户自定义函数(UDF);
    • 使用临时表;

binlog格式

我们以row-based 模式为例介绍一下binlog的存储格式 ,所有的 binary log events都是字节序列,由两部分组成:

  • event header
  • event data

关于event header和event data 的格式在数据库的不同版本略有不同,但共同的地方如下:

+=====================================+
| event  | timestamp         0 : 4    |
| header +----------------------------+
|        | type_code         4 : 1    |
|        +----------------------------+
|        | server_id         5 : 4    |
|        +----------------------------+
|        | event_length      9 : 4    |
|        +----------------------------+
|        |不同版本不一样(省略)          |
+=====================================+
| event  | fixed part                 |
| data   +----------------------------+
|        | variable part              |
+=====================================+

这里有个值得我们注意的地方就是在binlog的header中有一个属性是timestamp,这个属性是标识了change发生的先后顺序,在备库进行复制时候会严格按照时间顺序进行log的重放。

binlog的生成

我们以对MySQL进行实际操作的方式,直观的介绍一下binlog的生成,binlog是二进制存储的,下面我们会利用工具查看binlog的文本内容。

  • 查看一下binlog是否打开:

    show variables like 'log_bin'-> ;
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.00 sec)
    
  • 查看一下binlog的模式(我需要row-base模式):

show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
  • 清除现有的binlog
MySQL> reset master;
Query OK, 0 rows affected (0.00 sec)创建一张我们做实验的表MySQL> create table tab(
    ->    id INT NOT NULL AUTO_INCREMENT,
    ->    user VARCHAR(100) NOT NULL,
    ->    clicks INT NOT NULL,
    ->    PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.10 sec)

MySQL> show tables;
+-------------------+
| Tables_in_Apache Flinkdb |
+-------------------+
| tab         |
+-------------------+
1 row in set (0.00 sec)
  • 进行DML操作
MySQL> insert into tab(user, clicks) values ('Mary', 1);
Query OK, 1 row affected (0.03 sec)

MySQL> insert into tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Mary'
    -> ;
Query OK, 1 row affected (0.06 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> insert into tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> select * from tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      3 |
|  2 | Bob  |      2 |
|  3 | Llz  |      1 |
+----+------+--------+
3 rows in set (0.00 sec)
  • 查看正在操作的binlog

    MySQL> show master status\G
    *************************** 1. row ***************************
               File: binlog.000001
           Position: 2547
       Binlog_Do_DB: 
    Binlog_Ignore_DB: 
    Executed_Gtid_Set: 
    1 row in set (0.00 sec)
    

    上面 binlog.000001 文件是我们正在操作的binlog。

  • 查看binlog.000001文件的操作记录

    MySQL> show binlog events in 'binlog.000001';
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Log_name      | Pos  | Event_type     | Server_id | End_log_pos | Info                                                                                                                                                                |
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | binlog.000001 |    4 | Format_desc    |         1 |         124 | Server ver: 8.0.11, Binlog ver: 4                                                                                                                                   |
    | binlog.000001 |  124 | Previous_gtids |         1 |         155 |                                                                                                                                                                     |
    | binlog.000001 |  155 | Anonymous_Gtid |         1 |         228 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  228 | Query          |         1 |         368 | use `Apache Flinkdb`; DROP TABLE `tab` /* generated by server */ /* xid=22 */                                                                                        |
    | binlog.000001 |  368 | Anonymous_Gtid |         1 |         443 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  443 | Query          |         1 |         670 | use `Apache Flinkdb`; create table tab(
     id INT NOT NULL AUTO_INCREMENT,
     user VARCHAR(100) NOT NULL,
     clicks INT NOT NULL,
     PRIMARY KEY (id)
    ) /* xid=23 */ |
    | binlog.000001 |  670 | Anonymous_Gtid |         1 |         745 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  745 | Query          |         1 |         823 | BEGIN                                                                                                                                                               |
    | binlog.000001 |  823 | Table_map      |         1 |         890 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 |  890 | Write_rows     |         1 |         940 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 |  940 | Xid            |         1 |         971 | COMMIT /* xid=25 */                                                                                                                                                 |
    | binlog.000001 |  971 | Anonymous_Gtid |         1 |        1046 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1046 | Query          |         1 |        1124 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1124 | Table_map      |         1 |        1191 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1191 | Write_rows     |         1 |        1240 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1240 | Xid            |         1 |        1271 | COMMIT /* xid=26 */                                                                                                                                                 |
    | binlog.000001 | 1271 | Anonymous_Gtid |         1 |        1346 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1346 | Query          |         1 |        1433 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1433 | Table_map      |         1 |        1500 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1500 | Update_rows    |         1 |        1566 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1566 | Xid            |         1 |        1597 | COMMIT /* xid=27 */                                                                                                                                                 |
    | binlog.000001 | 1597 | Anonymous_Gtid |         1 |        1672 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1672 | Query          |         1 |        1750 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1750 | Table_map      |         1 |        1817 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1817 | Write_rows     |         1 |        1866 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1866 | Xid            |         1 |        1897 | COMMIT /* xid=28 */                                                                                                                                                 |
    | binlog.000001 | 1897 | Anonymous_Gtid |         1 |        1972 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1972 | Query          |         1 |        2059 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 2059 | Table_map      |         1 |        2126 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 2126 | Update_rows    |         1 |        2190 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 2190 | Xid            |         1 |        2221 | COMMIT /* xid=29 */                                                                                                                                                 |
    | binlog.000001 | 2221 | Anonymous_Gtid |         1 |        2296 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 2296 | Query          |         1 |        2383 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 2383 | Table_map      |         1 |        2450 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 2450 | Update_rows    |         1 |        2516 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 2516 | Xid            |         1 |        2547 | COMMIT /* xid=30 */                                                                                                                                                 |
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    36 rows in set (0.00 sec)
    

    上面我们进行了3次insert和3次update,那么在binlog中我们看到了三条Write_rows和三条Update_rows,并且在记录顺序和操作顺序保持一致,接下来我们看看Write_rows和Update_rows的具体timestamp和data明文。

  • 导出明文

    sudo MySQLbinlog --start-datetime='2018-04-29 00:00:03' --stop-datetime='2018-05-02 00:30:00' --base64-output=decode-rows -v /usr/local/MySQL/data/binlog.000001 > ~/binlog.txt
    

打开binlog.txt 内容如下:

/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180430 22:29:33 server id 1  end_log_pos 124 CRC32 0xff61797c  Start: binlog v 4, server v 8.0.11 created 180430 22:29:33 at startup
# Warning: this binlog is either in use or was not closed properly.
ROLLBACK/*!*/;
# at 124
#180430 22:29:33 server id 1  end_log_pos 155 CRC32 0x629ae755  Previous-GTIDs
# [empty]
# at 155
#180430 22:32:11 server id 1  end_log_pos 228 CRC32 0xbde49fca  Anonymous_GTID  last_committed=0        sequence_number=1       rbr_only=no     original_committed_timestamp=1525098731207902   immediate_commit_timestamp=1525098731207902     transaction_length=213
# original_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
# immediate_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098731207902*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 228
#180430 22:32:11 server id 1  end_log_pos 368 CRC32 0xe5f330e7  Query   thread_id=9     exec_time=0     error_code=0    Xid = 22
use `Apache Flinkdb`/*!*/;
SET TIMESTAMP=1525098731/*!*/;
SET @@session.pseudo_thread_id=9/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1168113696/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=255,@@session.collation_connection=255,@@session.collation_server=255/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/*!80005 SET @@session.default_collation_for_utf8mb4=255*//*!*/;
DROP TABLE `tab` /* generated by server */
/*!*/;
# at 368
#180430 22:32:21 server id 1  end_log_pos 443 CRC32 0x50e5acb7  Anonymous_GTID  last_committed=1        sequence_number=2       rbr_only=no     original_committed_timestamp=1525098741628960   immediate_commit_timestamp=1525098741628960     transaction_length=302
# original_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
# immediate_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098741628960*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 443
#180430 22:32:21 server id 1  end_log_pos 670 CRC32 0xe1353dd6  Query   thread_id=9     exec_time=0     error_code=0    Xid = 23
SET TIMESTAMP=1525098741/*!*/;
create table tab(
   id INT NOT NULL AUTO_INCREMENT,
   user VARCHAR(100) NOT NULL,
   clicks INT NOT NULL,
   PRIMARY KEY (id)
)
/*!*/;
# at 670
#180430 22:36:53 server id 1  end_log_pos 745 CRC32 0xcf436fbb  Anonymous_GTID  last_committed=2        sequence_number=3       rbr_only=yes    original_committed_timestamp=1525099013988373   immediate_commit_timestamp=1525099013988373     transaction_length=301
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
# immediate_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099013988373*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 745
#180430 22:36:53 server id 1  end_log_pos 823 CRC32 0x71c64dd2  Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099013/*!*/;
BEGIN
/*!*/;
# at 823
#180430 22:36:53 server id 1  end_log_pos 890 CRC32 0x63792f6b  Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 890
#180430 22:36:53 server id 1  end_log_pos 940 CRC32 0xf2dade22  Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=1
###   @2='Mary'
###   @3=1
# at 940
#180430 22:36:53 server id 1  end_log_pos 971 CRC32 0x7db3e61e  Xid = 25
COMMIT/*!*/;
# at 971
#180430 22:37:06 server id 1  end_log_pos 1046 CRC32 0xd05dd12c         Anonymous_GTID  last_committed=3        sequence_number=4       rbr_only=yes    original_committed_timestamp=1525099026328547   immediate_commit_timestamp=1525099026328547     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
# immediate_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099026328547*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1046
#180430 22:37:06 server id 1  end_log_pos 1124 CRC32 0x80f259e0         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099026/*!*/;
BEGIN
/*!*/;
# at 1124
#180430 22:37:06 server id 1  end_log_pos 1191 CRC32 0x255903ba         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1191
#180430 22:37:06 server id 1  end_log_pos 1240 CRC32 0xe76bfc79         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=2
###   @2='Bob'
###   @3=1
# at 1240
#180430 22:37:06 server id 1  end_log_pos 1271 CRC32 0x83cddfef         Xid = 26
COMMIT/*!*/;
# at 1271
#180430 22:37:15 server id 1  end_log_pos 1346 CRC32 0x7095baee         Anonymous_GTID  last_committed=4        sequence_number=5       rbr_only=yes    original_committed_timestamp=1525099035811597   immediate_commit_timestamp=1525099035811597     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
# immediate_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099035811597*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1346
#180430 22:37:15 server id 1  end_log_pos 1433 CRC32 0x70ef97e2         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099035/*!*/;
BEGIN
/*!*/;
# at 1433
#180430 22:37:15 server id 1  end_log_pos 1500 CRC32 0x75f1f399         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1500
#180430 22:37:15 server id 1  end_log_pos 1566 CRC32 0x256bd4b8         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=1
### SET
###   @1=1
###   @2='Mary'
###   @3=2
# at 1566
#180430 22:37:15 server id 1  end_log_pos 1597 CRC32 0x93c86579         Xid = 27
COMMIT/*!*/;
# at 1597
#180430 22:37:27 server id 1  end_log_pos 1672 CRC32 0xe8bd63e7         Anonymous_GTID  last_committed=5        sequence_number=6       rbr_only=yes    original_committed_timestamp=1525099047219517   immediate_commit_timestamp=1525099047219517     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
# immediate_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099047219517*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1672
#180430 22:37:27 server id 1  end_log_pos 1750 CRC32 0x5356c3c7         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099047/*!*/;
BEGIN
/*!*/;
# at 1750
#180430 22:37:27 server id 1  end_log_pos 1817 CRC32 0x37e6b1ce         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1817
#180430 22:37:27 server id 1  end_log_pos 1866 CRC32 0x6ab1bbe6         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=3
###   @2='Llz'
###   @3=1
# at 1866
#180430 22:37:27 server id 1  end_log_pos 1897 CRC32 0x3b62b153         Xid = 28
COMMIT/*!*/;
# at 1897
#180430 22:37:36 server id 1  end_log_pos 1972 CRC32 0x603134c1         Anonymous_GTID  last_committed=6        sequence_number=7       rbr_only=yes    original_committed_timestamp=1525099056866022   immediate_commit_timestamp=1525099056866022     transaction_length=324
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
# immediate_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099056866022*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1972
#180430 22:37:36 server id 1  end_log_pos 2059 CRC32 0xe17df4e4         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099056/*!*/;
BEGIN
/*!*/;
# at 2059
#180430 22:37:36 server id 1  end_log_pos 2126 CRC32 0x53888b05         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2126
#180430 22:37:36 server id 1  end_log_pos 2190 CRC32 0x85f34996         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=2
###   @2='Bob'
###   @3=1
### SET
###   @1=2
###   @2='Bob'
###   @3=2
# at 2190
#180430 22:37:36 server id 1  end_log_pos 2221 CRC32 0x877f1e23         Xid = 29
COMMIT/*!*/;
# at 2221
#180430 22:37:45 server id 1  end_log_pos 2296 CRC32 0xfbc7e868         Anonymous_GTID  last_committed=7        sequence_number=8       rbr_only=yes    original_committed_timestamp=1525099065089940   immediate_commit_timestamp=1525099065089940     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
# immediate_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099065089940*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2296
#180430 22:37:45 server id 1  end_log_pos 2383 CRC32 0x8a514364         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099065/*!*/;
BEGIN
/*!*/;
# at 2383
#180430 22:37:45 server id 1  end_log_pos 2450 CRC32 0xdf18ca60         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2450
#180430 22:37:45 server id 1  end_log_pos 2516 CRC32 0xd50de69f         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=2
### SET
###   @1=1
###   @2='Mary'
###   @3=3
# at 2516
#180430 22:37:45 server id 1  end_log_pos 2547 CRC32 0x94f89393         Xid = 30
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by MySQLbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
  • 梳理操作和binlog的记录关系

    DML binlog-header(timestamp) data
    insert into blink_tab(user, clicks) values ('Mary', 1);

    1525099013

    (2018/4/30 22:36:53)

    ## INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=1
    insert into blink_tab(user, clicks) values ('Bob', 1);

    1525099026

    (2018/4/30 22:37:06)

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=1 
     update blink_tab set clicks=2 where user='Mary';

     1525099035

    (2018/4/30 22:37:15)

     ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=1
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=2
     insert into blink_tab(user, clicks) values ('Llz', 1);

    1525099047

    (2018/4/30 22:37:27) 

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=3
    ### @2='Llz'
    ### @3=1 
     update blink_tab set clicks=2 where user='Bob';

     1525099056

    (2018/4/30 22:37:36)

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=2
    ### @2='Bob'
    ### @3=1
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=2 
     update blink_tab set clicks=3 where user='Mary';

    1525099065

    (2018/4/30 22:37:45) 

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=2
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=3 
  • 简化一下binlog

timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 LIz 1
1525099056 Bob 2
1525099065 Mary 3
  • replay binlog会得到如下表数据(按timestamp顺序)
user clicks
Mary 3
Bob 2
LIz 1
  • 表与binlog的关系简单示意如下
    image

流表对偶(duality)性

前面我花费了一些时间介绍了MySQL主备复制机制和binlog的数据格式,binlog中携带时间戳,我们将所有表的操作都按时间进行记录下来形成binlog,而对binlog的event进行重放的过程就是流数据处理的过程,重放的结果恰恰又形成了一张表。也就是表的操作会形成携带时间的事件流,对流的处理又会形成一张不断变化的表,表和流具有等价性,可以互转。随着时间推移,DML操作不断进行,那么表的内容也不断变化,具体如下:

image

如上图所示内容,流和表具备相同的特征:

  • 表 - Schema,Data,DML操作时间
  • 流 - Schema,Data, Data处理时间

我们发现,虽然大多数表上面没有明确的显示出DML操作时间,但本质上数据库系统里面是有数据操作时间信息的,这个和流上数据的处理时间(processing time)/产生时间(event-time)相对应。流与表具备相同的特征,可以信息无损的相互转换,我称之为流表对偶(duality)性。

上面我们描述的表,在流上称之为动态表(Dynamic Table),原因是在流上面任何一个事件的到来都是对表上数据的一次更新(包括插入和删除),表的内容是不断的变化的,任何一个时刻流的状态和表的快照一一对应。流与动态表(Dynamic Table)在时间维度上面具有等价性,这种等价性我们称之为流和动态表(Dynamic Table)的对偶(duality)性。

小结

本篇主要介绍Apache Flink作为一个流计算平台为什么可以为用户提供SQL API。其根本原因是如果将流上的数据看做是结构化的数据,流任务的核心是将一个具有时间属性的结构化数据变成同样具有时间属性的另一个结构化数据,而表的数据变化过程binlog恰恰就是一份具有时间属性的流数据,流与表具有信息无损的相互转换的特性,这种流表对偶性也决定了Apache Flink可以采用SQL作为流任务的开发语言。

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

相关实践学习
基于Hologres+Flink搭建GitHub实时数据大屏
通过使用Flink、Hologres构建实时数仓,并通过Hologres对接BI分析工具(以DataV为例),实现海量数据实时分析.
实时计算 Flink 实战课程
如何使用实时计算 Flink 搞定数据处理难题?实时计算 Flink 极客训练营产品、技术专家齐上阵,从开源 Flink功能介绍到实时计算 Flink 优势详解,现场实操,5天即可上手! 欢迎开通实时计算 Flink 版: https://cn.aliyun.com/product/bigdata/sc Flink Forward Asia 介绍: Flink Forward 是由 Apache 官方授权,Apache Flink Community China 支持的会议,通过参会不仅可以了解到 Flink 社区的最新动态和发展计划,还可以了解到国内外一线大厂围绕 Flink 生态的生产实践经验,是 Flink 开发者和使用者不可错过的盛会。 去年经过品牌升级后的 Flink Forward Asia 吸引了超过2000人线下参与,一举成为国内最大的 Apache 顶级项目会议。结合2020年的特殊情况,Flink Forward Asia 2020 将在12月26日以线上峰会的形式与大家见面。
目录
相关文章
|
3月前
|
人工智能 数据处理 API
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
Apache Flink Agents 是由阿里云、Ververica、Confluent 与 LinkedIn 联合推出的开源子项目,旨在基于 Flink 构建可扩展、事件驱动的生产级 AI 智能体框架,实现数据与智能的实时融合。
688 6
阿里云、Ververica、Confluent 与 LinkedIn 携手推进流式创新,共筑基于 Apache Flink Agents 的智能体 AI 未来
|
存储 Cloud Native 数据处理
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
本文整理自阿里云资深技术专家、Apache Flink PMC 成员梅源在 Flink Forward Asia 新加坡 2025上的分享,深入解析 Flink 状态管理系统的发展历程,从核心设计到 Flink 2.0 存算分离架构,并展望未来基于流批一体的通用增量计算方向。
425 0
从嵌入式状态管理到云原生架构:Apache Flink 的演进与下一代增量计算范式
|
5月前
|
SQL 人工智能 数据挖掘
Apache Flink:从实时数据分析到实时AI
Apache Flink 是实时数据处理领域的核心技术,历经十年发展,已从学术项目成长为实时计算的事实标准。它在现代数据架构中发挥着关键作用,支持实时数据分析、湖仓集成及实时 AI 应用。随着 Flink 2.0 的发布,其在流式湖仓、AI 驱动决策等方面展现出强大潜力,正推动企业迈向智能化、实时化的新阶段。
735 9
Apache Flink:从实时数据分析到实时AI
|
5月前
|
SQL 人工智能 API
Apache Flink 2.1.0: 面向实时 Data + AI 全面升级,开启智能流处理新纪元
Apache Flink 2.1.0 正式发布,标志着实时数据处理引擎向统一 Data + AI 平台迈进。新版本强化了实时 AI 能力,支持通过 Flink SQL 和 Table API 创建及调用 AI 模型,新增 Model DDL、ML_PREDICT 表值函数等功能,实现端到端的实时 AI 工作流。同时增强了 Flink SQL 的流处理能力,引入 Process Table Functions(PTFs)、Variant 数据类型,优化流式 Join 及状态管理,显著提升作业稳定性与资源利用率。
663 0
|
4月前
|
人工智能 运维 Java
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
本文基于Apache Flink PMC成员宋辛童在Community Over Code Asia 2025的演讲,深入解析Flink Agents项目的技术背景、架构设计与应用场景。该项目聚焦事件驱动型AI智能体,结合Flink的实时处理能力,推动AI在工业场景中的工程化落地,涵盖智能运维、直播分析等典型应用,展现其在AI发展第四层次——智能体AI中的重要意义。
1699 27
Flink Agents:基于Apache Flink的事件驱动AI智能体框架
|
5月前
|
存储 人工智能 数据处理
对话王峰:Apache Flink 在 AI 时代的“剑锋”所向
Flink 2.0 架构升级实现存算分离,迈向彻底云原生化,支持更大规模状态管理、提升资源效率、增强容灾能力。通过流批一体与 AI 场景融合,推动实时计算向智能化演进。生态项目如 Paimon、Fluss 和 Flink CDC 构建湖流一体架构,实现分钟级时效性与低成本平衡。未来,Flink 将深化 AI Agents 框架,引领事件驱动的智能数据处理新方向。
643 6
|
5月前
|
消息中间件 存储 Kafka
Apache Flink错误处理实战手册:2年生产环境调试经验总结
本文由 Ververica 客户成功经理 Naci Simsek 撰写,基于其在多个行业 Flink 项目中的实战经验,总结了 Apache Flink 生产环境中常见的三大典型问题及其解决方案。内容涵盖 Kafka 连接器迁移导致的状态管理问题、任务槽负载不均问题以及 Kryo 序列化引发的性能陷阱,旨在帮助企业开发者避免常见误区,提升实时流处理系统的稳定性与性能。
558 0
Apache Flink错误处理实战手册:2年生产环境调试经验总结
|
SQL 大数据 Apache
Apache Flink 2021 最新入门课程 | 图谱精选课程
轻松收获 Flink 生产环境开发技能
Apache Flink 2021 最新入门课程 | 图谱精选课程
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
858 33
The Past, Present and Future of Apache Flink
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
1650 13
Apache Flink 2.0-preview released

热门文章

最新文章

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多