Apache Flink 漫谈系列(06) - 流表对偶(duality)性

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 实际问题 很多大数据计算产品,都对用户提供了SQL API,比如Hive, Spark, Flink等,那么SQL作为传统关系数据库的查询语言,是应用在批查询场景的。Hive和Spark本质上都是Batch的计算模式(在《Apache Flink 漫谈系列 - 概述》我们介绍过Spark是Micr.

实际问题

很多大数据计算产品,都对用户提供了SQL API,比如Hive, Spark, Flink等,那么SQL作为传统关系数据库的查询语言,是应用在批查询场景的。Hive和Spark本质上都是Batch的计算模式(在《Apache Flink 漫谈系列 - 概述》我们介绍过Spark是Micro Batching模式),提供SQL API很容易被人理解,但是Flink是纯流(Native Streaming)的计算模式, 流与批在数据集和计算过程上有很大的区别,如下:
image

  • 批查询场景的特点 - 有限数据集,一次查询返回一个计算结果就结束查询

  • 流查询场景的特点 - 无限数据集,一次查询不断修正计算结果,查询永远不结束

我们发现批与流的查询场景在数据集合和计算过程上都有很大的不同,那么基于Native Streaming模式的Apache Flink为啥也能为用户提供SQL API呢?

流与批的语义关系

我们知道SQL都是作用于关系表的,在传统数据库中进行查询时候,SQL所要查询的表在触发查询时候数据是不会变化的,也就是说在查询那一刻,表是一张静态表,相当于是一个有限的批数据,这样也说明SQL是源于对批计算的查询的,那么要回答Apache Flink为啥也能为用户提供SQL API,我们首先要理解流与批在语义层面的关系。我们以一个具体示例说明,如下图:
image

上图展现的是一个携带时间戳和用户名的点击事件流,我们先对这些事件流进行流式统计,同时在最后的流事件上触发批计算。流计算中每接收一个数据都会触发一次计算,我们以2018/4/30 22:37:45 Mary到来那一时间切片看,无论是在流还是批上计算结果都是6。也就是说在相同的数据源,相同的查询逻辑下,流和批的计算结果是相同的。相同的SQL在流和批这两种模式下,最终结果是一致的,那么流与批在语义上是完全相同的。

流与表的关系

流与批在语义上是一致的,SQL是作用于表的,那么要回答Apache Flink为啥也能为用户提供SQL API的问题,就变成了流与表是否具有等价性,也就是本篇要重点介绍的为什么流表具有对偶(duality)性?如下图所示,一张表可以看做为流吗?同样流可以看做是一张表吗?如果可以需要怎样的条件和变通?
image

MySQL主备复制

在介绍流与表的关系之前我们先聊聊MySQL的主备复制,binlog是MySQL实现主备复制的核心手段,简单来说MySQL主备复制实现分成三个步骤:

  • Master将改变(change logs)以二进制日志事件(binary log events)形式记录到二进制日志(binlog)中;
  • Slave将Master的binary log events拷贝到它的中继日志(relay log);
  • Slave重做中继日志中的事件,将改变反映到数据;

具体如下图所示:
image

binlog

接下来我们从binlog模式,binlog格式以及通过查看binlog的具体内容来详尽介绍binlog与表的关系。

binlog模式

上面介绍的MySQL主备复制的核心手段是利用binlog实现的,那边binlog会记录那些内容呢?binlog记录了数据库所有的增、删、更新等操作。MySQL支持三种方式记录binlog:

  • statement-based logging - Events contain SQL statements that produce data changes (inserts, updates, deletes);
  • row-based logging - Events describe changes to individual rows;
  • mixed-base logging - 该模式默认是statement-based,当遇到如下情况会自动切换到row-based:
    • NDB存储引擎,DML操作以row格式记录;
    • 使用UUID()、USER()、CURRENT_USER()、FOUND_ROWS()等不确定函数;
    • 使用Insert Delay语句;
    • 使用用户自定义函数(UDF);
    • 使用临时表;

binlog格式

我们以row-based 模式为例介绍一下binlog的存储格式 ,所有的 binary log events都是字节序列,由两部分组成:

  • event header
  • event data

关于event header和event data 的格式在数据库的不同版本略有不同,但共同的地方如下:

+=====================================+
| event  | timestamp         0 : 4    |
| header +----------------------------+
|        | type_code         4 : 1    |
|        +----------------------------+
|        | server_id         5 : 4    |
|        +----------------------------+
|        | event_length      9 : 4    |
|        +----------------------------+
|        |不同版本不一样(省略)          |
+=====================================+
| event  | fixed part                 |
| data   +----------------------------+
|        | variable part              |
+=====================================+

这里有个值得我们注意的地方就是在binlog的header中有一个属性是timestamp,这个属性是标识了change发生的先后顺序,在备库进行复制时候会严格按照时间顺序进行log的重放。

binlog的生成

我们以对MySQL进行实际操作的方式,直观的介绍一下binlog的生成,binlog是二进制存储的,下面我们会利用工具查看binlog的文本内容。

  • 查看一下binlog是否打开:

    show variables like 'log_bin'-> ;
    +---------------+-------+
    | Variable_name | Value |
    +---------------+-------+
    | log_bin       | ON    |
    +---------------+-------+
    1 row in set (0.00 sec)
    
  • 查看一下binlog的模式(我需要row-base模式):

show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)
  • 清除现有的binlog
MySQL> reset master;
Query OK, 0 rows affected (0.00 sec)创建一张我们做实验的表MySQL> create table tab(
    ->    id INT NOT NULL AUTO_INCREMENT,
    ->    user VARCHAR(100) NOT NULL,
    ->    clicks INT NOT NULL,
    ->    PRIMARY KEY (id)
    -> );
Query OK, 0 rows affected (0.10 sec)

MySQL> show tables;
+-------------------+
| Tables_in_Apache Flinkdb |
+-------------------+
| tab         |
+-------------------+
1 row in set (0.00 sec)
  • 进行DML操作
MySQL> insert into tab(user, clicks) values ('Mary', 1);
Query OK, 1 row affected (0.03 sec)

MySQL> insert into tab(user, clicks) values ('Bob', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Mary'
    -> ;
Query OK, 1 row affected (0.06 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> insert into tab(user, clicks) values ('Llz', 1);
Query OK, 1 row affected (0.08 sec)

MySQL> update tab set clicks=2 where user='Bob';
Query OK, 1 row affected (0.01 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> update tab set clicks=3 where user='Mary';
Query OK, 1 row affected (0.05 sec)
Rows matched: 1  Changed: 1  Warnings: 0

MySQL> select * from tab;
+----+------+--------+
| id | user | clicks |
+----+------+--------+
|  1 | Mary |      3 |
|  2 | Bob  |      2 |
|  3 | Llz  |      1 |
+----+------+--------+
3 rows in set (0.00 sec)
  • 查看正在操作的binlog

    MySQL> show master status\G
    *************************** 1. row ***************************
               File: binlog.000001
           Position: 2547
       Binlog_Do_DB: 
    Binlog_Ignore_DB: 
    Executed_Gtid_Set: 
    1 row in set (0.00 sec)
    

    上面 binlog.000001 文件是我们正在操作的binlog。

  • 查看binlog.000001文件的操作记录

    MySQL> show binlog events in 'binlog.000001';
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | Log_name      | Pos  | Event_type     | Server_id | End_log_pos | Info                                                                                                                                                                |
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    | binlog.000001 |    4 | Format_desc    |         1 |         124 | Server ver: 8.0.11, Binlog ver: 4                                                                                                                                   |
    | binlog.000001 |  124 | Previous_gtids |         1 |         155 |                                                                                                                                                                     |
    | binlog.000001 |  155 | Anonymous_Gtid |         1 |         228 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  228 | Query          |         1 |         368 | use `Apache Flinkdb`; DROP TABLE `tab` /* generated by server */ /* xid=22 */                                                                                        |
    | binlog.000001 |  368 | Anonymous_Gtid |         1 |         443 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  443 | Query          |         1 |         670 | use `Apache Flinkdb`; create table tab(
     id INT NOT NULL AUTO_INCREMENT,
     user VARCHAR(100) NOT NULL,
     clicks INT NOT NULL,
     PRIMARY KEY (id)
    ) /* xid=23 */ |
    | binlog.000001 |  670 | Anonymous_Gtid |         1 |         745 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 |  745 | Query          |         1 |         823 | BEGIN                                                                                                                                                               |
    | binlog.000001 |  823 | Table_map      |         1 |         890 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 |  890 | Write_rows     |         1 |         940 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 |  940 | Xid            |         1 |         971 | COMMIT /* xid=25 */                                                                                                                                                 |
    | binlog.000001 |  971 | Anonymous_Gtid |         1 |        1046 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1046 | Query          |         1 |        1124 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1124 | Table_map      |         1 |        1191 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1191 | Write_rows     |         1 |        1240 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1240 | Xid            |         1 |        1271 | COMMIT /* xid=26 */                                                                                                                                                 |
    | binlog.000001 | 1271 | Anonymous_Gtid |         1 |        1346 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1346 | Query          |         1 |        1433 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1433 | Table_map      |         1 |        1500 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1500 | Update_rows    |         1 |        1566 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1566 | Xid            |         1 |        1597 | COMMIT /* xid=27 */                                                                                                                                                 |
    | binlog.000001 | 1597 | Anonymous_Gtid |         1 |        1672 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1672 | Query          |         1 |        1750 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 1750 | Table_map      |         1 |        1817 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 1817 | Write_rows     |         1 |        1866 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 1866 | Xid            |         1 |        1897 | COMMIT /* xid=28 */                                                                                                                                                 |
    | binlog.000001 | 1897 | Anonymous_Gtid |         1 |        1972 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 1972 | Query          |         1 |        2059 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 2059 | Table_map      |         1 |        2126 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 2126 | Update_rows    |         1 |        2190 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 2190 | Xid            |         1 |        2221 | COMMIT /* xid=29 */                                                                                                                                                 |
    | binlog.000001 | 2221 | Anonymous_Gtid |         1 |        2296 | SET @@SESSION.GTID_NEXT= 'ANONYMOUS'                                                                                                                                |
    | binlog.000001 | 2296 | Query          |         1 |        2383 | BEGIN                                                                                                                                                               |
    | binlog.000001 | 2383 | Table_map      |         1 |        2450 | table_id: 96 (Apache Flinkdb.tab)                                                                                                                                    |
    | binlog.000001 | 2450 | Update_rows    |         1 |        2516 | table_id: 96 flags: STMT_END_F                                                                                                                                      |
    | binlog.000001 | 2516 | Xid            |         1 |        2547 | COMMIT /* xid=30 */                                                                                                                                                 |
    +---------------+------+----------------+-----------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
    36 rows in set (0.00 sec)
    

    上面我们进行了3次insert和3次update,那么在binlog中我们看到了三条Write_rows和三条Update_rows,并且在记录顺序和操作顺序保持一致,接下来我们看看Write_rows和Update_rows的具体timestamp和data明文。

  • 导出明文

    sudo MySQLbinlog --start-datetime='2018-04-29 00:00:03' --stop-datetime='2018-05-02 00:30:00' --base64-output=decode-rows -v /usr/local/MySQL/data/binlog.000001 > ~/binlog.txt
    

打开binlog.txt 内容如下:

/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=1*/;
/*!50003 SET @OLD_COMPLETION_TYPE=@@COMPLETION_TYPE,COMPLETION_TYPE=0*/;
DELIMITER /*!*/;
# at 4
#180430 22:29:33 server id 1  end_log_pos 124 CRC32 0xff61797c  Start: binlog v 4, server v 8.0.11 created 180430 22:29:33 at startup
# Warning: this binlog is either in use or was not closed properly.
ROLLBACK/*!*/;
# at 124
#180430 22:29:33 server id 1  end_log_pos 155 CRC32 0x629ae755  Previous-GTIDs
# [empty]
# at 155
#180430 22:32:11 server id 1  end_log_pos 228 CRC32 0xbde49fca  Anonymous_GTID  last_committed=0        sequence_number=1       rbr_only=no     original_committed_timestamp=1525098731207902   immediate_commit_timestamp=1525098731207902     transaction_length=213
# original_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
# immediate_commit_timestamp=1525098731207902 (2018-04-30 22:32:11.207902 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098731207902*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 228
#180430 22:32:11 server id 1  end_log_pos 368 CRC32 0xe5f330e7  Query   thread_id=9     exec_time=0     error_code=0    Xid = 22
use `Apache Flinkdb`/*!*/;
SET TIMESTAMP=1525098731/*!*/;
SET @@session.pseudo_thread_id=9/*!*/;
SET @@session.foreign_key_checks=1, @@session.sql_auto_is_null=0, @@session.unique_checks=1, @@session.autocommit=1/*!*/;
SET @@session.sql_mode=1168113696/*!*/;
SET @@session.auto_increment_increment=1, @@session.auto_increment_offset=1/*!*/;
/*!\C utf8mb4 *//*!*/;
SET @@session.character_set_client=255,@@session.collation_connection=255,@@session.collation_server=255/*!*/;
SET @@session.lc_time_names=0/*!*/;
SET @@session.collation_database=DEFAULT/*!*/;
/*!80005 SET @@session.default_collation_for_utf8mb4=255*//*!*/;
DROP TABLE `tab` /* generated by server */
/*!*/;
# at 368
#180430 22:32:21 server id 1  end_log_pos 443 CRC32 0x50e5acb7  Anonymous_GTID  last_committed=1        sequence_number=2       rbr_only=no     original_committed_timestamp=1525098741628960   immediate_commit_timestamp=1525098741628960     transaction_length=302
# original_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
# immediate_commit_timestamp=1525098741628960 (2018-04-30 22:32:21.628960 CST)
/*!80001 SET @@session.original_commit_timestamp=1525098741628960*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 443
#180430 22:32:21 server id 1  end_log_pos 670 CRC32 0xe1353dd6  Query   thread_id=9     exec_time=0     error_code=0    Xid = 23
SET TIMESTAMP=1525098741/*!*/;
create table tab(
   id INT NOT NULL AUTO_INCREMENT,
   user VARCHAR(100) NOT NULL,
   clicks INT NOT NULL,
   PRIMARY KEY (id)
)
/*!*/;
# at 670
#180430 22:36:53 server id 1  end_log_pos 745 CRC32 0xcf436fbb  Anonymous_GTID  last_committed=2        sequence_number=3       rbr_only=yes    original_committed_timestamp=1525099013988373   immediate_commit_timestamp=1525099013988373     transaction_length=301
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
# immediate_commit_timestamp=1525099013988373 (2018-04-30 22:36:53.988373 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099013988373*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 745
#180430 22:36:53 server id 1  end_log_pos 823 CRC32 0x71c64dd2  Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099013/*!*/;
BEGIN
/*!*/;
# at 823
#180430 22:36:53 server id 1  end_log_pos 890 CRC32 0x63792f6b  Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 890
#180430 22:36:53 server id 1  end_log_pos 940 CRC32 0xf2dade22  Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=1
###   @2='Mary'
###   @3=1
# at 940
#180430 22:36:53 server id 1  end_log_pos 971 CRC32 0x7db3e61e  Xid = 25
COMMIT/*!*/;
# at 971
#180430 22:37:06 server id 1  end_log_pos 1046 CRC32 0xd05dd12c         Anonymous_GTID  last_committed=3        sequence_number=4       rbr_only=yes    original_committed_timestamp=1525099026328547   immediate_commit_timestamp=1525099026328547     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
# immediate_commit_timestamp=1525099026328547 (2018-04-30 22:37:06.328547 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099026328547*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1046
#180430 22:37:06 server id 1  end_log_pos 1124 CRC32 0x80f259e0         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099026/*!*/;
BEGIN
/*!*/;
# at 1124
#180430 22:37:06 server id 1  end_log_pos 1191 CRC32 0x255903ba         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1191
#180430 22:37:06 server id 1  end_log_pos 1240 CRC32 0xe76bfc79         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=2
###   @2='Bob'
###   @3=1
# at 1240
#180430 22:37:06 server id 1  end_log_pos 1271 CRC32 0x83cddfef         Xid = 26
COMMIT/*!*/;
# at 1271
#180430 22:37:15 server id 1  end_log_pos 1346 CRC32 0x7095baee         Anonymous_GTID  last_committed=4        sequence_number=5       rbr_only=yes    original_committed_timestamp=1525099035811597   immediate_commit_timestamp=1525099035811597     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
# immediate_commit_timestamp=1525099035811597 (2018-04-30 22:37:15.811597 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099035811597*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1346
#180430 22:37:15 server id 1  end_log_pos 1433 CRC32 0x70ef97e2         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099035/*!*/;
BEGIN
/*!*/;
# at 1433
#180430 22:37:15 server id 1  end_log_pos 1500 CRC32 0x75f1f399         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1500
#180430 22:37:15 server id 1  end_log_pos 1566 CRC32 0x256bd4b8         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=1
### SET
###   @1=1
###   @2='Mary'
###   @3=2
# at 1566
#180430 22:37:15 server id 1  end_log_pos 1597 CRC32 0x93c86579         Xid = 27
COMMIT/*!*/;
# at 1597
#180430 22:37:27 server id 1  end_log_pos 1672 CRC32 0xe8bd63e7         Anonymous_GTID  last_committed=5        sequence_number=6       rbr_only=yes    original_committed_timestamp=1525099047219517   immediate_commit_timestamp=1525099047219517     transaction_length=300
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
# immediate_commit_timestamp=1525099047219517 (2018-04-30 22:37:27.219517 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099047219517*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1672
#180430 22:37:27 server id 1  end_log_pos 1750 CRC32 0x5356c3c7         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099047/*!*/;
BEGIN
/*!*/;
# at 1750
#180430 22:37:27 server id 1  end_log_pos 1817 CRC32 0x37e6b1ce         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 1817
#180430 22:37:27 server id 1  end_log_pos 1866 CRC32 0x6ab1bbe6         Write_rows: table id 96 flags: STMT_END_F
### INSERT INTO `Apache Flinkdb`.`tab`
### SET
###   @1=3
###   @2='Llz'
###   @3=1
# at 1866
#180430 22:37:27 server id 1  end_log_pos 1897 CRC32 0x3b62b153         Xid = 28
COMMIT/*!*/;
# at 1897
#180430 22:37:36 server id 1  end_log_pos 1972 CRC32 0x603134c1         Anonymous_GTID  last_committed=6        sequence_number=7       rbr_only=yes    original_committed_timestamp=1525099056866022   immediate_commit_timestamp=1525099056866022     transaction_length=324
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
# immediate_commit_timestamp=1525099056866022 (2018-04-30 22:37:36.866022 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099056866022*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 1972
#180430 22:37:36 server id 1  end_log_pos 2059 CRC32 0xe17df4e4         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099056/*!*/;
BEGIN
/*!*/;
# at 2059
#180430 22:37:36 server id 1  end_log_pos 2126 CRC32 0x53888b05         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2126
#180430 22:37:36 server id 1  end_log_pos 2190 CRC32 0x85f34996         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=2
###   @2='Bob'
###   @3=1
### SET
###   @1=2
###   @2='Bob'
###   @3=2
# at 2190
#180430 22:37:36 server id 1  end_log_pos 2221 CRC32 0x877f1e23         Xid = 29
COMMIT/*!*/;
# at 2221
#180430 22:37:45 server id 1  end_log_pos 2296 CRC32 0xfbc7e868         Anonymous_GTID  last_committed=7        sequence_number=8       rbr_only=yes    original_committed_timestamp=1525099065089940   immediate_commit_timestamp=1525099065089940     transaction_length=326
/*!50718 SET TRANSACTION ISOLATION LEVEL READ COMMITTED*//*!*/;
# original_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
# immediate_commit_timestamp=1525099065089940 (2018-04-30 22:37:45.089940 CST)
/*!80001 SET @@session.original_commit_timestamp=1525099065089940*//*!*/;
SET @@SESSION.GTID_NEXT= 'ANONYMOUS'/*!*/;
# at 2296
#180430 22:37:45 server id 1  end_log_pos 2383 CRC32 0x8a514364         Query   thread_id=9     exec_time=0     error_code=0
SET TIMESTAMP=1525099065/*!*/;
BEGIN
/*!*/;
# at 2383
#180430 22:37:45 server id 1  end_log_pos 2450 CRC32 0xdf18ca60         Table_map: `Apache Flinkdb`.`tab` mapped to number 96
# at 2450
#180430 22:37:45 server id 1  end_log_pos 2516 CRC32 0xd50de69f         Update_rows: table id 96 flags: STMT_END_F
### UPDATE `Apache Flinkdb`.`tab`
### WHERE
###   @1=1
###   @2='Mary'
###   @3=2
### SET
###   @1=1
###   @2='Mary'
###   @3=3
# at 2516
#180430 22:37:45 server id 1  end_log_pos 2547 CRC32 0x94f89393         Xid = 30
COMMIT/*!*/;
SET @@SESSION.GTID_NEXT= 'AUTOMATIC' /* added by MySQLbinlog */ /*!*/;
DELIMITER ;
# End of log file
/*!50003 SET COMPLETION_TYPE=@OLD_COMPLETION_TYPE*/;
/*!50530 SET @@SESSION.PSEUDO_SLAVE_MODE=0*/;
  • 梳理操作和binlog的记录关系

    DML binlog-header(timestamp) data
    insert into blink_tab(user, clicks) values ('Mary', 1);

    1525099013

    (2018/4/30 22:36:53)

    ## INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=1
    insert into blink_tab(user, clicks) values ('Bob', 1);

    1525099026

    (2018/4/30 22:37:06)

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=1 
     update blink_tab set clicks=2 where user='Mary';

     1525099035

    (2018/4/30 22:37:15)

     ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=1
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=2
     insert into blink_tab(user, clicks) values ('Llz', 1);

    1525099047

    (2018/4/30 22:37:27) 

    ### INSERT INTO blinkdb.blink_tab
    ### SET
    ### @1=3
    ### @2='Llz'
    ### @3=1 
     update blink_tab set clicks=2 where user='Bob';

     1525099056

    (2018/4/30 22:37:36)

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=2
    ### @2='Bob'
    ### @3=1
    ### SET
    ### @1=2
    ### @2='Bob'
    ### @3=2 
     update blink_tab set clicks=3 where user='Mary';

    1525099065

    (2018/4/30 22:37:45) 

    ### UPDATE blinkdb.blink_tab
    ### WHERE
    ### @1=1
    ### @2='Mary'
    ### @3=2
    ### SET
    ### @1=1
    ### @2='Mary'
    ### @3=3 
  • 简化一下binlog

timestamp user clicks
1525099013 Mary 1
1525099026 Bob 1
1525099035 Mary 2
1525099047 LIz 1
1525099056 Bob 2
1525099065 Mary 3
  • replay binlog会得到如下表数据(按timestamp顺序)
user clicks
Mary 3
Bob 2
LIz 1
  • 表与binlog的关系简单示意如下
    image

流表对偶(duality)性

前面我花费了一些时间介绍了MySQL主备复制机制和binlog的数据格式,binlog中携带时间戳,我们将所有表的操作都按时间进行记录下来形成binlog,而对binlog的event进行重放的过程就是流数据处理的过程,重放的结果恰恰又形成了一张表。也就是表的操作会形成携带时间的事件流,对流的处理又会形成一张不断变化的表,表和流具有等价性,可以互转。随着时间推移,DML操作不断进行,那么表的内容也不断变化,具体如下:

image

如上图所示内容,流和表具备相同的特征:

  • 表 - Schema,Data,DML操作时间
  • 流 - Schema,Data, Data处理时间

我们发现,虽然大多数表上面没有明确的显示出DML操作时间,但本质上数据库系统里面是有数据操作时间信息的,这个和流上数据的处理时间(processing time)/产生时间(event-time)相对应。流与表具备相同的特征,可以信息无损的相互转换,我称之为流表对偶(duality)性。

上面我们描述的表,在流上称之为动态表(Dynamic Table),原因是在流上面任何一个事件的到来都是对表上数据的一次更新(包括插入和删除),表的内容是不断的变化的,任何一个时刻流的状态和表的快照一一对应。流与动态表(Dynamic Table)在时间维度上面具有等价性,这种等价性我们称之为流和动态表(Dynamic Table)的对偶(duality)性。

小结

本篇主要介绍Apache Flink作为一个流计算平台为什么可以为用户提供SQL API。其根本原因是如果将流上的数据看做是结构化的数据,流任务的核心是将一个具有时间属性的结构化数据变成同样具有时间属性的另一个结构化数据,而表的数据变化过程binlog恰恰就是一份具有时间属性的流数据,流与表具有信息无损的相互转换的特性,这种流表对偶性也决定了Apache Flink可以采用SQL作为流任务的开发语言。

关于点赞和评论

本系列文章难免有很多缺陷和不足,真诚希望读者对有收获的篇章给予点赞鼓励,对有不足的篇章给予反馈和建议,先行感谢大家!

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
1月前
|
存储 人工智能 大数据
The Past, Present and Future of Apache Flink
本文整理自阿里云开源大数据负责人王峰(莫问)在 Flink Forward Asia 2024 上海站主论坛开场的分享,今年正值 Flink 开源项目诞生的第 10 周年,借此时机,王峰回顾了 Flink 在过去 10 年的发展历程以及 Flink社区当前最新的技术成果,最后展望下一个十年 Flink 路向何方。
353 33
The Past, Present and Future of Apache Flink
|
3月前
|
SQL Java API
Apache Flink 2.0-preview released
Apache Flink 社区正积极筹备 Flink 2.0 的发布,这是自 Flink 1.0 发布以来的首个重大更新。Flink 2.0 将引入多项激动人心的功能和改进,包括存算分离状态管理、物化表、批作业自适应执行等,同时也包含了一些不兼容的变更。目前提供的预览版旨在让用户提前尝试新功能并收集反馈,但不建议在生产环境中使用。
966 13
Apache Flink 2.0-preview released
|
3月前
|
存储 缓存 算法
分布式锁服务深度解析:以Apache Flink的Checkpointing机制为例
【10月更文挑战第7天】在分布式系统中,多个进程或节点可能需要同时访问和操作共享资源。为了确保数据的一致性和系统的稳定性,我们需要一种机制来协调这些进程或节点的访问,避免并发冲突和竞态条件。分布式锁服务正是为此而生的一种解决方案。它通过在网络环境中实现锁机制,确保同一时间只有一个进程或节点能够访问和操作共享资源。
148 3
|
4月前
|
SQL 消息中间件 关系型数据库
Apache Doris Flink Connector 24.0.0 版本正式发布
该版本新增了对 Flink 1.20 的支持,并支持通过 Arrow Flight SQL 高速读取 Doris 中数据。
|
5月前
|
消息中间件 监控 数据挖掘
基于RabbitMQ与Apache Flink构建实时分析系统
【8月更文第28天】本文将介绍如何利用RabbitMQ作为数据源,结合Apache Flink进行实时数据分析。我们将构建一个简单的实时分析系统,该系统能够接收来自不同来源的数据,对数据进行实时处理,并将结果输出到另一个队列或存储系统中。
343 2
|
4月前
|
消息中间件 资源调度 API
Apache Flink 流批融合技术介绍
本文源自阿里云高级研发工程师周云峰在Apache Asia Community OverCode 2024的分享,内容涵盖从“流批一体”到“流批融合”的演进、技术解决方案及社区进展。流批一体已在API、算子和引擎层面实现统一,但用户仍需手动配置作业模式。流批融合旨在通过动态调整优化策略,自动适应不同场景需求。文章详细介绍了如何通过量化指标(如isProcessingBacklog和isInsertOnly)实现这一目标,并展示了针对不同场景的具体优化措施。此外,还概述了社区当前进展及未来规划,包括将优化方案推向Flink社区、动态调整算子流程结构等。
460 31
Apache Flink 流批融合技术介绍
|
3月前
|
分布式计算 监控 大数据
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
大数据-148 Apache Kudu 从 Flink 下沉数据到 Kudu
107 1
|
3月前
|
数据挖掘 物联网 数据处理
深入探讨Apache Flink:实时数据流处理的强大框架
在数据驱动时代,企业需高效处理实时数据流。Apache Flink作为开源流处理框架,以其高性能和灵活性成为首选平台。本文详细介绍Flink的核心特性和应用场景,包括实时流处理、强大的状态管理、灵活的窗口机制及批处理兼容性。无论在实时数据分析、金融服务、物联网还是广告技术领域,Flink均展现出巨大潜力,是企业实时数据处理的理想选择。随着大数据需求增长,Flink将继续在数据处理领域发挥重要作用。
292 0
|
3月前
|
消息中间件 druid Kafka
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
从Apache Flink到Kafka再到Druid的实时数据传输,用于分析/决策
109 0
|
5月前
|
Java 微服务 Spring
驾驭复杂性:Spring Cloud在微服务构建中的决胜法则
【8月更文挑战第31天】Spring Cloud是在Spring Framework基础上打造的微服务解决方案,提供服务发现、配置管理、消息路由等功能,适用于构建复杂的微服务架构。本文介绍如何利用Spring Cloud搭建微服务,包括Eureka服务发现、Config Server配置管理和Zuul API网关等组件的配置与使用。通过Spring Cloud,可实现快速开发、自动化配置,并提升系统的伸缩性和容错性,尽管仍需面对分布式事务等挑战,但其强大的社区支持有助于解决问题。
99 0

相关产品

  • 实时计算 Flink版
  • 推荐镜像

    更多