Image 1.
Concept of Dynamic Tables
The concept of dynamic tables is analogous to that of materialized views in a database. Like static batch tables, dynamic tables can be queried with Structured Query Language (SQL), and a query on a dynamic table produces a new dynamic table. Dynamic tables and streams can be converted into each other without loss (they are duals). The key extension to the existing application programming interface (API) is that the content of a table changes over time. The current streaming tables can be regarded as dynamic tables in append mode.
Conversion from Stream to Dynamic Table
When a stream is converted into a table, whether the table schema defines a key determines the conversion mode. Let us look at these modes.
Append Mode
If the table schema does not define a key, append mode is used for the conversion. Each new record in the stream is appended to the table as a new row. Once appended, a row is never updated or deleted (this refers to the table itself; new tables derived from it by queries are not considered here).
Image 2.
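To make append mode concrete, here is a minimal Python sketch that models a table as a list of rows. It is a simplified model, not Flink code; the function name `append_mode` and the click fields `user` and `url` are hypothetical.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]

def append_mode(stream: List[Row]) -> List[Row]:
    """Convert a stream into a table in append mode (no key is defined)."""
    table: List[Row] = []
    for record in stream:
        # Every record becomes a new row; existing rows are never updated or deleted.
        table.append(dict(record))
    return table

# Hypothetical click stream: the same user may appear several times.
clicks = [
    {"user": "Mary", "url": "./home"},
    {"user": "Bob", "url": "./cart"},
    {"user": "Mary", "url": "./prod?id=1"},
]
table = append_mode(clicks)
print(len(table))  # 3 rows, two of them for Mary
```

Because no key is defined, the two records for Mary stay as two separate rows.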
Replace Mode
If the table schema defines a key, a record whose key is not yet in the table is inserted as a new row, while a record whose key already exists replaces the row with that key, as illustrated in the image below.
Image 3.
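The replace behavior can be sketched in a few lines of Python by modeling the keyed table as a dictionary. This is an illustration under simplifying assumptions, not Flink's implementation; `replace_mode` and the sensor records are hypothetical.

```python
from typing import Dict, List, Tuple

def replace_mode(stream: List[Tuple[str, int]]) -> Dict[str, int]:
    """Convert a keyed stream into a table in replace mode (key -> current value)."""
    table: Dict[str, int] = {}
    for key, value in stream:
        # A new key inserts a row; an existing key replaces the row for that key.
        table[key] = value
    return table

# Hypothetical (sensor id, temperature) records; sensor-1 is reported twice.
readings = [("sensor-1", 20), ("sensor-2", 18), ("sensor-1", 22)]
print(replace_mode(readings))  # {'sensor-1': 22, 'sensor-2': 18}
```

Unlike append mode, the table holds one row per key, so the second reading for sensor-1 overwrites the first.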
Conversion from Dynamic Table to Stream
The table-to-stream operation refers to sending all changes downstream in the form of change log streams. Two modes exist for this step: retraction mode and update mode.
Retraction Mode
In retraction mode, an insertion into or a deletion from the dynamic table generates an insert event or a delete event, respectively. An update generates two change events: a delete event for the previous version of the record (the one sent earlier with the same key), followed by an insert event for the current record, as shown in the figure below.
Image 4.
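The following Python sketch turns a sequence of table changes into a retraction stream. It assumes a simple change format `(operation, key, value)` and encodes events as `("+", row)` for inserts and `("-", row)` for deletes; both encodings and the function name are hypothetical, not Flink's internal representation. Note that the converter must remember the last row it emitted per key so that it can retract it later.

```python
from typing import Dict, List, Optional, Tuple

# A table change: (operation, key, new value); the value is None for deletes.
Change = Tuple[str, str, Optional[int]]
# A retraction-stream event: ("+", row) inserts a row, ("-", row) retracts it.
Event = Tuple[str, Tuple[str, int]]

def to_retraction_stream(changes: List[Change]) -> List[Event]:
    emitted: Dict[str, int] = {}  # last row sent per key, needed to retract it later
    events: List[Event] = []
    for op, key, value in changes:
        if op in ("insert", "update"):
            assert value is not None, "inserts and updates carry a value"
            if op == "update":
                # An update becomes two events: retract the old row, then insert the new one.
                events.append(("-", (key, emitted[key])))
            emitted[key] = value
            events.append(("+", (key, value)))
        elif op == "delete":
            events.append(("-", (key, emitted.pop(key))))
        else:
            raise ValueError(f"unknown operation: {op}")
    return events

# Hypothetical per-user click counts changing over time.
changes: List[Change] = [
    ("insert", "Mary", 1),
    ("insert", "Bob", 1),
    ("update", "Mary", 2),
    ("delete", "Bob", None),
]
for event in to_retraction_stream(changes):
    print(event)
```

The update of Mary's count appears as the pair `('-', ('Mary', 1))`, `('+', ('Mary', 2))`.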
Update Mode
Update mode requires the dynamic table to define a key. Each change event is a key-value pair. The key is the value of the table key in the current record. For inserts and updates, the value is the new record. For deletes, the value is empty, which indicates that the key has been deleted, as shown in the figure below.
Image 5.
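For comparison with retraction mode, here is a Python sketch of update mode using the same hypothetical change format. An update needs only one event because the key identifies the row to overwrite, and `None` stands in for the empty value of a delete. The `replay` helper (also hypothetical) rebuilds the table from the stream, illustrating the lossless conversion between tables and streams described at the start of this article.

```python
from typing import Dict, List, Optional, Tuple

Change = Tuple[str, str, Optional[int]]   # (operation, key, new value or None)
UpdateEvent = Tuple[str, Optional[int]]   # (key, value); None means the key was deleted

def to_update_stream(changes: List[Change]) -> List[UpdateEvent]:
    events: List[UpdateEvent] = []
    for op, key, value in changes:
        if op in ("insert", "update"):
            events.append((key, value))  # the value is the new record
        elif op == "delete":
            events.append((key, None))   # an empty value marks the deletion of the key
        else:
            raise ValueError(f"unknown operation: {op}")
    return events

def replay(events: List[UpdateEvent]) -> Dict[str, int]:
    """Rebuild the keyed table from the update stream."""
    table: Dict[str, int] = {}
    for key, value in events:
        if value is None:
            table.pop(key, None)
        else:
            table[key] = value
    return table

changes: List[Change] = [
    ("insert", "Mary", 1),
    ("insert", "Bob", 1),
    ("update", "Mary", 2),
    ("delete", "Bob", None),
]
events = to_update_stream(changes)
print(events)          # [('Mary', 1), ('Bob', 1), ('Mary', 2), ('Bob', None)]
print(replay(events))  # {'Mary': 2}
```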
Querying Dynamic Tables
Like the content of a dynamic table, the result of a query on it changes over time. To better understand this, let us look at an example.
Table A is a dynamic table, and its snapshot at time t is denoted as A[t]. Applying a query q to this snapshot is denoted as q(A[t]).
Image 6.
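The snapshot notation can be illustrated with a short Python sketch. The table contents and the helper names `snapshot` and `q` are hypothetical; `q` stands for a grouped count evaluated on a static snapshot A[t].

```python
from collections import Counter
from typing import Dict, List, Tuple

# Hypothetical append-only dynamic table A: (arrival time, user)
A: List[Tuple[int, str]] = [(1, "Mary"), (2, "Bob"), (3, "Mary"), (4, "Liz")]

def snapshot(table: List[Tuple[int, str]], t: int) -> List[Tuple[int, str]]:
    """A[t]: the rows that have arrived up to and including time t."""
    return [row for row in table if row[0] <= t]

def q(rows: List[Tuple[int, str]]) -> Dict[str, int]:
    """q: SELECT user, COUNT(*) FROM A GROUP BY user, evaluated on a static snapshot."""
    return dict(Counter(user for _, user in rows))

for t in range(1, 5):
    print(t, q(snapshot(A, t)))
```

At t = 3, Mary's count changes from 1 to 2, so the sequence of results q(A[1]), q(A[2]), ... is itself a dynamic table that receives updates, not only appends.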
Query Limitations
Because a stream is infinite, a dynamic table is unbounded. When querying an unbounded table, you need to make sure that the query is well-defined and meaningful.
1. In practice, Flink converts the query into a continuous streaming application, and the running query is evaluated only at the current logical time. Therefore, querying the table at an arbitrary point in time (A[t]) is not supported.
2. The most intuitive principle is that both the state and the computation required by the query must be bounded. Flink therefore supports queries that can be computed incrementally, which fall into two classes:
- Queries that continuously update the current result: such a query can generate insert, update, and delete changes.
The query is expressed as Q(t+1) = q'(Q(t), c(T, t, t+1)), where Q(t) is the previous result of query q, c(T, t, t+1) represents the changes to Table T from t to t+1, and q' is the incremental version of q.
- Queries that generate an append-only table, where new results are computed directly from the tail of the input table.
The query is expressed as Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t), where q'' is an incremental version of q that does not need the previous result Q(t), and c(T, t-x, t+1) represents the last x+1 records of Table T, where x depends on the query semantics. For example, a window aggregation over the past hour requires at least all the data from the past hour as its state.
Examples of supported queries include:
- SELECT ... WHERE, which is evaluated independently on each row
- GROUP BY clauses on rowtime (such as time-based window aggregates)
- OVER windows (row windows) with ORDER BY rowtime
- ORDER BY rowtime
3. When the input tables are sufficiently small, every record in them can be accessed. For example, you can join two stream tables of fixed size (i.e., with a fixed number of keys).
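The two incremental forms from point 2 can be sketched in Python. Form 1 uses a grouped count as q and updates the previous result with only the newly appended rows; form 2 computes a sum over the current row and the x preceding rows, in the spirit of an OVER window, keeping only the last x+1 records as state and appending each new result. The function names and inputs are hypothetical, and the sketch ignores time attributes.

```python
from collections import Counter, deque
from typing import Deque, Dict, List

# Form 1: Q(t+1) = q'(Q(t), c(T, t, t+1)) for q = SELECT user, COUNT(*) FROM T GROUP BY user.
def q_prime(prev: Dict[str, int], change: List[str]) -> Dict[str, int]:
    """Update the previous result with the rows appended to T between t and t+1."""
    result = dict(prev)
    for user in change:
        result[user] = result.get(user, 0) + 1  # inserts a new row or updates an existing one
    return result

# Form 2: Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t) for a sum over the current row and x preceding rows.
def append_only_sums(values: List[int], x: int) -> List[int]:
    tail: Deque[int] = deque(maxlen=x + 1)  # the only state kept: the last x+1 records
    result: List[int] = []
    for v in values:
        tail.append(v)
        result.append(sum(tail))  # q'' reads only the tail; its output is appended to Q(t)
    return result

# Hypothetical input: the users whose clicks arrive in each time step.
steps = [["Mary"], ["Bob"], ["Mary", "Liz"]]
Q: Dict[str, int] = {}
for change in steps:
    Q = q_prime(Q, change)
# The incremental result matches recomputing q over all rows seen so far.
assert Q == dict(Counter(u for step in steps for u in step))
print(Q)                                    # {'Mary': 2, 'Bob': 1, 'Liz': 1}
print(append_only_sums([1, 2, 3, 4], x=1))  # [1, 3, 5, 7]
```

In form 1 the state is the whole previous result, which may need updates; in form 2 earlier results are never changed, so only the tail of the input has to be kept.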
Bounded Intermediate State
As mentioned above, some incremental queries need to keep certain data (part of the input data or intermediate results) as state. To prevent the query from failing, the space required by the query must be bounded and must not grow indefinitely over time. There are two main causes of state growth:
- Intermediate state that grows without being constrained by a time predicate (for example, a growing number of aggregation keys)
- State that is bounded in time but must be kept to handle late data (for example, window aggregations)
Although the "Last Result Offset" parameter, described below, can resolve the second case, the first case must be detected by the optimizer. Queries whose intermediate state grows without a time constraint should be rejected, and the optimizer should explain how to fix the query by requiring an appropriate time predicate. Take the following query as an example:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page
As the number of users and pages grows, the intermediate state grows over time. Adding a time predicate bounds the required storage space:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() -- only last hour
GROUP BY user, page
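The effect of the time predicate on state size can be simulated in Python. This sketch assumes rows arrive in rowtime order and takes "now" to be the rowtime of the latest row, which is a simplification of the `now()` in the SQL above; the schema and names are hypothetical.

```python
from collections import Counter, deque
from typing import Deque, Iterable, Tuple

Event = Tuple[int, str, str]  # (rowtime in minutes, user, page), a hypothetical schema

def unbounded_counts(events: Iterable[Event]) -> Counter:
    """GROUP BY user, page without a time predicate: state keeps every key ever seen."""
    counts: Counter = Counter()
    for _, user, page in events:
        counts[(user, page)] += 1
    return counts

class LastHourCounts:
    """GROUP BY user, page restricted to rows with rowtime in [now - 60, now]."""

    def __init__(self, window_minutes: int = 60) -> None:
        self.window = window_minutes
        self.buffer: Deque[Event] = deque()  # rows still inside the time predicate
        self.counts: Counter = Counter()

    def add(self, event: Event) -> None:
        now = event[0]  # assumes rows arrive in rowtime order
        self.buffer.append(event)
        self.counts[(event[1], event[2])] += 1
        # Evict rows that no longer satisfy the predicate and retract their counts.
        while self.buffer and self.buffer[0][0] < now - self.window:
            _, user, page = self.buffer.popleft()
            self.counts[(user, page)] -= 1
            if self.counts[(user, page)] == 0:
                del self.counts[(user, page)]

# One new user per minute for ten hours: the number of (user, page) keys keeps growing.
events = [(minute, f"user{minute}", "home") for minute in range(600)]
last_hour = LastHourCounts()
for e in events:
    last_hour.add(e)
print(len(unbounded_counts(events)))  # 600
print(len(last_hour.counts))          # 61
```

Without the predicate, the state grows with every new key; with it, the state never holds more than one hour of rows (61 one-minute rowtimes here, since BETWEEN is inclusive).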
Not all attributes keep growing. If you tell the optimizer the domain size of an attribute, it can infer that the intermediate state will not grow over time, and queries without time predicates can then be accepted.
val sensorT: Table = sensors
.toTable('id, 'loc, 'stime, 'temp)
.attributeDomain('loc, Domain.constant) // domain of 'loc is not growing
env.registerTable("sensors", sensorT)
SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc
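Why a constant domain keeps the state bounded can be seen in a small Python sketch of the query above: the incremental AVG needs one (sum, count) pair per location, no matter how many readings arrive. The sketch does not model the optimizer hint itself; the function name and the readings are hypothetical.

```python
from typing import Dict, Iterable, Tuple

def avg_temp_by_loc(readings: Iterable[Tuple[str, float]]) -> Dict[str, float]:
    """SELECT loc, AVG(temp) FROM sensors GROUP BY loc, maintained incrementally."""
    state: Dict[str, Tuple[float, int]] = {}  # loc -> (sum, count): one entry per location
    for loc, temp in readings:
        total, count = state.get(loc, (0.0, 0))
        state[loc] = (total + temp, count + 1)
    return {loc: total / count for loc, (total, count) in state.items()}

# Hypothetical readings: 2,000 records, but only two locations.
readings = [("room-a", 20.0), ("room-b", 18.0)] * 1000
print(avg_temp_by_loc(readings))  # {'room-a': 20.0, 'room-b': 18.0}
```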
Timing of Result Computation and Refinement
Certain relational operators must wait for data to arrive before they can compute a result. For example, if a window closes at 10:30, the operator must wait until at least 10:30 to compute its result. Flink's logical clock (which determines when it is 10:30) depends on whether event time or processing time is used. With processing time, the logical time is the wall-clock time of each machine. With event time, the logical clock is determined by the watermarks provided by the source. Because of out-of-order and late data in event-time mode, Flink may need to wait for some time to make results more complete. On the other hand, in some cases you may want early results that are refined later. Therefore, there are different requirements for computing, refining, and finalizing results.
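The event-time logical clock can be sketched in Python with a simplified watermark: the largest event time seen so far minus a fixed maximum delay. This watermark rule is an assumption chosen for illustration and is not a description of Flink's built-in watermark generators; times are minutes after 10:00, and `fire_position` is a hypothetical name.

```python
from typing import List, Optional

def fire_position(event_times: List[int], window_end: int, max_delay: int) -> Optional[int]:
    """Index of the arrival after which an event-time window ending at window_end can fire.

    Simplified clock (an assumption): watermark = largest event time seen - max_delay.
    """
    max_seen: Optional[int] = None
    for i, event_time in enumerate(event_times):
        max_seen = event_time if max_seen is None else max(max_seen, event_time)
        if max_seen - max_delay >= window_end:
            return i  # the logical clock has passed the end of the window
    return None  # the watermark never reached the end of the window

# Event times in minutes after 10:00, arriving out of order; the window ends at 10:30.
arrivals = [25, 29, 27, 31, 33, 36]
print(fire_position(arrivals, window_end=30, max_delay=0))  # 3 (the 10:31 record)
print(fire_position(arrivals, window_end=30, max_delay=5))  # 5 (the 10:36 record)
```

A larger delay makes the window fire later, which leaves more time for out-of-order records to arrive at the cost of higher result latency.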
The following figure shows how different parameter settings control early results and the refinement of results.
Image 7.
- "First Result Offset" is the time at which the first early result is computed. It is relative to the time at which the complete result is computed (for example, the 10:30 end time of a window). If it is set to -10 minutes, the first result for a window ending at 10:30 is computed at logical time 10:20. The default value is 0, which means that the result is computed at the end of the window.
- "Complete Result Offset" is the time at which the complete result is computed, relative to the time at which the result is first complete (for example, the end of the window). If it is set to +5 minutes, the complete result for a window ending at 10:30 is produced at 10:35. This parameter mitigates the impact of late data. The default value is 0, which means that the result computed at the end of the window is the complete result.
- "Update Rate" is the interval (a time interval or a count) at which the result is updated before the complete result is computed. For example, with an update rate of 5 minutes, a 30-minute tumbling window ending at 10:30, a "First Result Offset" of -15 minutes, and a "Complete Result Offset" of +2 minutes, the first result is produced at 10:15, updated results at 10:20, 10:25, and 10:30, and the complete result at 10:32.
- "Last Updates Switch" determines whether updated results are computed for late data after the complete result has been sent, until the computation state is cleared.
- "Last Result Offset" is the time at which the last result is computed. At this time the internal state is cleared, and data that arrives afterwards is dropped. Setting a Last Result Offset therefore means that results are approximate and their accuracy is not guaranteed.
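The parameters above can be modeled in a short Python sketch that computes the emission schedule for one window and classifies a record by its logical arrival time. The assumptions are labeled: all offsets are in minutes relative to the window end, "Last Result Offset" is measured the same way, "Update Rate" is a time interval, and every function name is hypothetical rather than a Flink API.

```python
from typing import List

def emission_times(window_end: int, update_rate: int,
                   first_offset: int = 0, complete_offset: int = 0) -> List[int]:
    """Logical times (minutes after midnight) at which results for one window are emitted."""
    if update_rate <= 0:
        raise ValueError("update_rate must be positive")
    first = window_end + first_offset        # first (early) result
    complete = window_end + complete_offset  # complete result
    if first > complete:
        raise ValueError("the first result cannot come after the complete result")
    times: List[int] = []
    t = first
    while t < complete:  # early and updated results, every update_rate minutes
        times.append(t)
        t += update_rate
    times.append(complete)
    return times

def late_record_outcome(arrival: int, window_end: int, complete_offset: int,
                        last_result_offset: int, last_updates: bool) -> str:
    """Hypothetical handling of a record for the window, by logical arrival time."""
    if arrival <= window_end + complete_offset:
        return "included in the complete result"
    if last_updates and arrival <= window_end + last_result_offset:
        return "triggers a late update"
    return "dropped"  # state already cleared, or late updates switched off

def hhmm(minutes: int) -> str:
    return f"{minutes // 60:02d}:{minutes % 60:02d}"

# The example from the text: window ends at 10:30, offsets -15 and +2 minutes, rate 5 minutes.
end = 10 * 60 + 30
print([hhmm(t) for t in emission_times(end, update_rate=5, first_offset=-15, complete_offset=2)])
# ['10:15', '10:20', '10:25', '10:30', '10:32']
print(late_record_outcome(end + 5, end, complete_offset=2, last_result_offset=10, last_updates=True))
# triggers a late update
```

With the default offsets of 0, `emission_times` returns only the window end, matching the defaults described above.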
Conclusion
This article explored dynamic tables. It discussed how streams and dynamic tables are converted into each other and the modes used for these conversions. It also looked at the restrictions on queries over dynamic tables and at the parameters that control when results are computed, refined, and finalized.