Abstract: How can we build a stable, high-concurrency instant messaging (IM) system architecture? This question comes up whenever we build a social IM app or a feature like WeChat Moments. In such scenarios, a basic requirement is to deliver a user's messages and new Moments posts to the user's friends promptly and accurately. To achieve this, we usually assign an incremental serial number or ID to each message the user sends or each new post in the user's feed.
Social IM apps and features like WeChat Moments must deliver a user's messages and new posts to the user's friends promptly and accurately. This is usually achieved by assigning an incremental serial number or ID to each message or update from the user. With these IDs, the receiving end can confirm that it has received every message and can process the messages in the correct order.
When the total message volume or the write concurrency is high, NoSQL storage products are often used to store the messages. However, common NoSQL products lack column auto-increment, so an external component is usually needed to generate incrementing serial numbers or IDs. This complicates the overall architecture and increases end-to-end latency.
Cloud Product Solution
The auto-increment feature of primary key columns in Alibaba Cloud Table Store is well suited to solving these problems. It works as follows. When creating a table, you declare one primary key column as an auto-increment column. When the app writes a new row, it does not supply an actual value for that column, only a placeholder. On receiving the row, Table Store generates the value itself and guarantees that, within the same partitioning key, a value generated later is always greater than one generated earlier.
The auto-increment feature of primary key columns has the following characteristics:
Table Store's architecture and its implementation of auto-increment primary key columns ensure that the values generated for an auto-increment column are unique and strictly increasing.
A table can have multiple primary key columns, and the first one must be the partitioning key. To keep data evenly distributed, the partitioning key column cannot be an auto-increment column.
Because of this restriction, values are incremented per partitioning key value, not globally across the table.
Any primary key columns except the partitioning key column can be set as the auto-incrementing column.
Currently, only one primary key column can be set as the auto-incrementing column for each table.
Attribute columns cannot be set as auto-increment columns.
The values generated by the auto-incrementing column are 64-bit signed long integers.
Auto-increment is configured per table, so the same instance can contain both tables with auto-incrementing columns and tables without them.
The auto-incrementing column can be set only when you create a table. You cannot set an auto-incrementing column in an existing table.
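To make the per-partitioning-key rule concrete, here is a toy in-memory model of the semantics. It is not Table Store's implementation, and the class and method names are hypothetical. The sketch hands out consecutive values for readability; the characteristics above only guarantee unique, strictly increasing values, so do not assume real IDs are consecutive.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of auto-increment scoped to the partitioning key (hypothetical names).
class AutoIncrementSim {
    // Last value handed out for each partitioning key value.
    private final Map<String, Long> lastValueByPartition = new HashMap<>();

    // Returns the next value for the given partitioning key. Synchronized so that
    // concurrent writers to one partition still observe strictly increasing values.
    synchronized long nextValue(String partitionKey) {
        long next = lastValueByPartition.getOrDefault(partitionKey, 0L) + 1;
        lastValueByPartition.put(partitionKey, next);
        return next;
    }

    public static void main(String[] args) {
        AutoIncrementSim sim = new AutoIncrementSim();
        System.out.println(sim.nextValue("a1b2")); // 1
        System.out.println(sim.nextValue("a1b2")); // 2
        System.out.println(sim.nextValue("ffe0")); // 1: another partition has its own sequence
    }
}
```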
The next section explains how to use the auto-increment feature of primary key columns of Table Store in a specific scenario.
Application Scenario
In this example scenario, we build an IM chat tool to illustrate the role and usage of the auto-increment feature.
Features
This IM chat tool needs to support the following features:
One-on-one chat
Group chat
Multi-terminal message synchronization for the same user
Existing architecture
Step 1: Determine a messaging model
The figure above shows the messaging model.
The sender's client pushes each message to the backend system.
The backend system first stores the message temporarily.
Only after the message is stored successfully does the backend push it to the receiver's client.
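The ordering in this model (store first, push only after the store succeeds) can be sketched as follows. `Store`, `Pusher`, and `handle` are hypothetical names; a real backend would call its storage system and push service at these points.

```java
// Hypothetical sketch of the store-then-push order in the messaging model.
class StoreThenPush {
    // Stand-in for the backend's storage system.
    interface Store {
        boolean save(String receiverId, String content);
    }

    // Stand-in for the push channel to the receiver's client.
    interface Pusher {
        void push(String receiverId, String content);
    }

    // Returns true if the message was stored and pushed.
    static boolean handle(Store store, Pusher pusher, String receiverId, String content) {
        if (!store.save(receiverId, content)) {
            return false; // Never push a message that was not stored.
        }
        pusher.push(receiverId, content);
        return true;
    }

    public static void main(String[] args) {
        handle((r, c) -> true, (r, c) -> System.out.println("push to " + r + ": " + c), "bob", "hi");
    }
}
```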
Step 2: Determine the backend architecture
The backend architecture consists of two parts: the logic layer and the storage layer.
The logic layer includes the application server, the queuing service, and the auto-incrementing ID generator. It is the core of the backend and runs the core services, including receiving messages, pushing messages and notifications, and write fan-out of group messages.
The storage layer is mainly used for persistence of message data and other data as needed.
For one-on-one chat, the sender first sends a message to the application server. The application server stores the message in the table under the receiver's key and notifies its message push service that a new message has arrived. The push service then uses the ID of the last message pushed to that receiver as the starting key, reads every message with a greater ID from the storage system, and pushes those messages to the receiver.
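A minimal sketch of the push step, assuming an in-memory `TreeMap` stands in for the receiver's rows in the storage system and that the push service keeps one cursor (`lastPushedId`) per receiver. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of the one-on-one push step for a single receiver.
class InboxPusher {
    // Stand-in for the receiver's rows in the storage system: messageId -> content.
    private final NavigableMap<Long, String> inbox = new TreeMap<>();
    // ID of the last message pushed to this receiver.
    private long lastPushedId = 0;

    // Application server: store the message under the receiver.
    void store(long messageId, String content) {
        inbox.put(messageId, content);
    }

    // Push service: read every message with an ID greater than lastPushedId, then advance the cursor.
    List<String> pushNew() {
        NavigableMap<Long, String> unread = inbox.tailMap(lastPushedId, false);
        List<String> batch = new ArrayList<>(unread.values());
        if (!unread.isEmpty()) {
            lastPushedId = unread.lastKey();
        }
        return batch;
    }

    public static void main(String[] args) {
        InboxPusher pusher = new InboxPusher();
        pusher.store(1, "hi");
        pusher.store(2, "how are you?");
        System.out.println(pusher.pushNew()); // [hi, how are you?]
        pusher.store(3, "bye");
        System.out.println(pusher.pushNew()); // [bye]
    }
}
```

Because the cursor only moves forward, a message stored with an ID smaller than the cursor would never be pushed. This is why message IDs must increase in the order the messages are written.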
For group chat, the messaging logic is more complex. It uses an asynchronous queue to perform write fan-out: a message sent to a group is stored once for each member of the group.
The figure above shows the group messaging process without the storage layer.
The process uses write fan-out rather than read fan-out for two reasons:
Groups usually have few members, so the extra storage cost is low, and data compression lowers it further.
Because each message is written to every member's own storage space (inbox), the push service only needs to check each receiver's inbox. Group chat then follows the same processing logic as one-on-one chat and is easy to implement.
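Write fan-out can be sketched with one in-memory inbox per user. The class, its methods, and the `sender: content` message format are hypothetical; in the article, each inbox corresponds to the rows stored under one `receive_id`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of write fan-out: one inbox per user, held in memory.
class GroupFanOut {
    private final Map<String, List<String>> inboxes = new HashMap<>();

    // Copy the group message into the inbox of every group member.
    void sendToGroup(String sender, List<String> members, String content) {
        for (String member : members) {
            inboxes.computeIfAbsent(member, k -> new ArrayList<>()).add(sender + ": " + content);
        }
    }

    // The push path reads a single inbox, exactly as in one-on-one chat.
    List<String> inboxOf(String user) {
        return inboxes.getOrDefault(user, new ArrayList<>());
    }

    public static void main(String[] args) {
        GroupFanOut chat = new GroupFanOut();
        chat.sendToGroup("alice", Arrays.asList("alice", "bob", "carol"), "dinner at 7?");
        System.out.println(chat.inboxOf("carol")); // [alice: dinner at 7?]
    }
}
```

Each group message costs one write per member, which is acceptable because groups are usually small; in exchange, reading stays a single-inbox lookup.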
After the sender sends a message, the sender's client pushes it to the application server. The application server routes the message to a queue according to the receiver ID, so messages to the same receiver always land in the same queue. Each queue processes its messages in order: for each new message, it obtains a new message ID from the auto-incrementing ID generator and writes the message to Table Store. Only after that write succeeds does it process the next message.
Messages to the same receiver always go to the same queue, but a single queue can hold messages for many receivers.
Data in each queue is processed serially. Every message written to Table Store receives a new ID that is greater than all previous IDs. To keep message IDs strictly increasing even if writing the previous message fails, a user-level lock is held while writing to the storage system, blocking other messages until the previous write succeeds. The lock guarantees that all messages are written in order and is released as soon as a write succeeds.
If a queue fails, all of its messages must be reprocessed. They are moved to a new queue, which must assign message IDs greater than any ID issued so far. Because the new queue cannot know the greatest ID issued so far and cannot generate incrementing IDs on its own, a global auto-incrementing ID generator is required.
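The queue-based write path can be sketched as follows, assuming receivers are routed by `hashCode` modulo the queue count, a single `AtomicLong` plays the global auto-incrementing ID generator, and "writing to Table Store" means appending to a list. These choices and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the queue-based write path.
class QueueWritePath {
    // Stand-in for the global ID generator. Because IDs come only from here, a replacement
    // queue created after a failure still gets IDs greater than all earlier ones.
    private final AtomicLong idGenerator = new AtomicLong();
    // Each pending message is {receiverId, content}.
    private final List<Deque<String[]>> queues = new ArrayList<>();
    // Stand-in for Table Store: rows formatted as "messageId|receiverId|content".
    final List<String> storedRows = new ArrayList<>();

    QueueWritePath(int queueCount) {
        for (int i = 0; i < queueCount; i++) {
            queues.add(new ArrayDeque<>());
        }
    }

    // The same receiver always maps to the same queue; one queue may serve many receivers.
    int queueFor(String receiverId) {
        return Math.floorMod(receiverId.hashCode(), queues.size());
    }

    void enqueue(String receiverId, String content) {
        queues.get(queueFor(receiverId)).addLast(new String[] {receiverId, content});
    }

    // Drains one queue serially: get a new ID, write the row, and only then take the next message.
    // A real write can fail, which is why the architecture above holds a user-level lock until it succeeds.
    void drain(int queueIndex) {
        Deque<String[]> queue = queues.get(queueIndex);
        while (!queue.isEmpty()) {
            String[] message = queue.pollFirst();
            long id = idGenerator.incrementAndGet();
            storedRows.add(id + "|" + message[0] + "|" + message[1]);
        }
    }

    public static void main(String[] args) {
        QueueWritePath path = new QueueWritePath(4);
        path.enqueue("bob", "hi");
        path.enqueue("bob", "are you there?");
        path.drain(path.queueFor("bob"));
        System.out.println(path.storedRows); // [1|bob|hi, 2|bob|are you there?]
    }
}
```

Because each queue is drained strictly in order, a receiver with a long backlog delays every other receiver that shares its queue, which is the first problem described under Challenges.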
In multi-terminal scenarios, when an online terminal goes offline, the application server saves the terminal's session to a separate table. When the terminal comes back online, the session is restored from the storage system and the unread messages are pushed to the terminal.
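A sketch of multi-terminal sync, under the hypothetical simplification that a terminal's saved session is just the ID of the last message delivered to it; the text does not specify what the real session contains. All names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: a session is reduced to the last message ID delivered to a terminal.
class TerminalSessions {
    private final NavigableMap<Long, String> inbox = new TreeMap<>(); // one user's messages by ID
    private final Map<String, Long> savedSessions = new HashMap<>();  // terminalId -> last delivered ID

    void store(long messageId, String content) {
        inbox.put(messageId, content);
    }

    // Terminal goes offline: the application server saves its session.
    void goOffline(String terminalId, long lastDeliveredId) {
        savedSessions.put(terminalId, lastDeliveredId);
    }

    // Terminal comes back online: restore the session and return the unread messages in ID order.
    List<String> comeOnline(String terminalId) {
        long lastDeliveredId = savedSessions.getOrDefault(terminalId, 0L);
        return new ArrayList<>(inbox.tailMap(lastDeliveredId, false).values());
    }

    public static void main(String[] args) {
        TerminalSessions sessions = new TerminalSessions();
        sessions.store(1, "hi");
        sessions.store(2, "lunch?");
        sessions.goOffline("phone", 1);
        sessions.goOffline("laptop", 2);
        System.out.println(sessions.comeOnline("phone"));  // [lunch?]
        System.out.println(sessions.comeOnline("laptop")); // []
    }
}
```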
Step 3: Determine a storage system
We selected Alibaba Cloud Table Store as the storage system for the following reasons:
Both single-row writes and multi-row batch writes are supported, meeting high-concurrency write requirements.
Range reads and paging are supported for massive numbers of messages.
Data lifecycle management (time to live) clears expired data automatically, reducing storage costs. For details, see the Table Store documentation.
The Alibaba Cloud Table Store is a commercial cloud service with proven reliability and stability.
Table Store is cost-effective, and discounted packages are available for scenarios with large data volumes.
Table Store offers high read/write performance: for chat messages, latency can be as low as milliseconds or even microseconds.
Step 4: Determine a table structure
The determined table structure of the Table Store is as follows:
| Primary key sequence | Primary key name | Primary key value | Description |
|---|---|---|---|
| 1 | partition_key | First four characters of md5(receive_id) | Partitioning key for ensuring even data distribution |
| 2 | receive_id | receive_id | Receiver's user ID |
| 3 | message_id | message_id | Message ID |
A Table Store table structure consists of two parts: primary key columns and attribute columns. A table can have up to four primary key columns, and the first one is the partitioning key.
You must define the primary key columns before using the table, and they cannot be changed afterward. Attribute columns, by contrast, are schema-free: each row can have different attribute columns, so you only need to design the primary key columns.
The first primary key column is the partitioning key, which should spread data and requests evenly and avoid hotspots. Because messages are ultimately read by receiver, the receiver's ID could serve as the partitioning key. For an even better distribution, you can instead use part of the MD5 value of the receiver's ID, such as its first four characters.
Because the partitioning key holds only part of the hashed receiver ID, the full receiver ID must also be stored to locate a receiver's messages. The receiver's ID therefore becomes the second primary key column.
The third primary key column is the message ID. Its values must be monotonically increasing so that the latest messages can be queried.
Attribute columns store the message content and metadata.
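Deriving the `partition_key` value can be sketched with the JDK's `MessageDigest`. The helper names and the lowercase hex encoding are assumptions for illustration; the design only says to use the first four characters of the MD5 value of the receiver's ID.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helpers for computing partition_key from receive_id.
class PartitionKeys {
    // Lowercase hex encoding of md5(input).
    static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e); // every JRE must provide MD5
        }
    }

    // First four characters of the digest.
    static String partitionKey(String receiveId) {
        return md5Hex(receiveId).substring(0, 4);
    }

    public static void main(String[] args) {
        System.out.println(partitionKey("hello")); // 5d41
    }
}
```

Four hex characters give 16^4 = 65,536 possible partitioning key values, and the full `receive_id` in the second primary key column keeps receivers that share a prefix apart.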
So far, we have built a complete chat system with high concurrency and high performance. However, it still faces certain challenges.
Challenges
When messages to multiple users share a queue, they are processed serially, and a lock is needed to keep each user's message IDs strictly increasing. As a result, if one user receives a massive number of messages, the queue that user shares fills up and other users' messages wait behind them, which delays messages for those users.
When chat volume spikes during major events or holidays, queue capacity must be expanded; otherwise, overall system latency may rise sharply, or the system may even crash from overload.
The second problem can be solved by adding hardware, but the first cannot. Is there a better solution to both problems?
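A back-of-the-envelope illustration of the first problem, using hypothetical numbers that are not from the source: suppose user A has $N_A = 10{,}000$ messages queued ahead of one message for user B in the same queue, and each serialized write takes $t_{\text{write}} = 2$ ms. Then B's message waits

$$
t_{\text{wait}}(B) = N_A \times t_{\text{write}} = 10\,000 \times 2\ \text{ms} = 20\ \text{s}
$$

Every receiver that happens to share user A's queue pays this delay, regardless of how little traffic it has itself.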
New Architecture
Both problems stem from having to force message IDs to be strictly increasing in the application layer. The auto-increment feature of primary key columns moves that responsibility into Table Store and simplifies the upper application layer.
The new architecture, with the auto-increment feature of Table Store primary key columns enabled, is as follows:
The main difference from the previous architecture is the removal of two components: the queuing service and the auto-incrementing ID generator.
When the application server receives a message, it writes the message directly to Table Store. It does not supply an actual value for message_id, the auto-increment primary key column; it supplies a placeholder instead, and Table Store generates the value.
In the new architecture, auto-increment happens inside Table Store. Even when multiple application servers write messages for the same receiver, Table Store still processes those messages serially and gives each one a unique, strictly increasing message ID. The queuing service is therefore no longer needed, which solves the first problem.
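The following toy model illustrates this: several application servers (threads) write to the same receiver with no queue and no application-side ID generator, and the storage side assigns each ID. The synchronized `put` method stands in for Table Store's internal handling, which this sketch does not claim to reproduce; `DirectWriteStore` and `runServers` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Toy model of the new write path (hypothetical names).
class DirectWriteStore {
    private long lastId = 0;
    private final List<Long> committedIds = new ArrayList<>(); // message IDs in commit order

    // Plays the role of a PutRow with the AUTO_INCREMENT placeholder and RT_PK: the storage side
    // picks message_id and returns it. content would be stored as an attribute column.
    synchronized long put(String receiverId, String content) {
        lastId++;
        committedIds.add(lastId);
        return lastId;
    }

    synchronized List<Long> committedIds() {
        return new ArrayList<>(committedIds);
    }

    // Simulates several application servers writing to the same receiver concurrently.
    static DirectWriteStore runServers(int servers, int messagesPerServer) {
        DirectWriteStore store = new DirectWriteStore();
        ExecutorService pool = Executors.newFixedThreadPool(servers);
        for (int s = 0; s < servers; s++) {
            final int server = s;
            pool.submit(() -> {
                for (int i = 0; i < messagesPerServer; i++) {
                    store.put("bob", "server " + server + ", message " + i);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return store;
    }

    public static void main(String[] args) {
        List<Long> ids = runServers(4, 250).committedIds();
        System.out.println(ids.size() + " messages, last ID " + ids.get(ids.size() - 1));
    }
}
```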
In addition, because Table Store is a cloud service with a pay-as-you-go option, you do not need to plan system capacity, which solves the second problem.
In the previous architecture, messages to the same user had to go through the same queue. In the new architecture, those messages can be processed concurrently by multiple queues, so a message surge for one user no longer affects other users; instead, the load is spread evenly across all queues.
With the auto-increment feature of primary key columns, application servers write data directly to Table Store without waiting in queues or requesting message IDs, which improves performance.
Implementation
The architecture above can be implemented with the Table Store Java SDK. Java SDK v4.2.0 supports the auto-increment feature of primary key columns. For the SDK documentation and download, see the Java SDK v4.2.0 documentation.
Step 1: Create a table
According to the aforementioned design, the table structure is as follows:
| Primary key sequence | Primary key name | Primary key value | Description |
|---|---|---|---|
| 1 | partition_key | First four characters of hash(receive_id) | Partitioning key for ensuring even data distribution; MD5 can be used as the hash function. |
| 2 | receive_id | receive_id | Receiver's user ID |
| 3 | message_id | message_id | Message ID |
```java
// Imports for the enclosing class.
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

private static void createTable(SyncClient client) {
    TableMeta tableMeta = new TableMeta("message_table");
    // The first column is the partitioning key.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));
    // The second column is the receiver ID.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING));
    // The third column is the auto-incrementing message ID: type INTEGER with the AUTO_INCREMENT option.
    tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));

    int timeToLive = -1; // Data never expires. Set a TTL (in seconds) to have expired data removed automatically.
    int maxVersions = 1; // Keep only one version. Multiple versions are also supported.
    TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);

    CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);
    client.createTable(request);
}
```
Running the code above creates a table whose third primary key column, message_id, is an auto-incrementing column.
Step 2: Write data
Both the PutRow and BatchWriteRow write modes support the auto-increment feature of primary key columns. When writing data, do not supply a value for message_id, the third (auto-incrementing) column; supply only a placeholder.

```java
// Imports for the enclosing class.
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

private static void putRow(SyncClient client, String receive_id, String content) {
    // Construct the primary key.
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    // First column: the first four characters of hash(receive_id).
    // hash() is an application helper, not part of the SDK (for example, a hex-encoded MD5 digest).
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(0, 4)));
    // Second column: the receiver ID.
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));
    // Third column: the auto-incrementing message ID. Table Store generates the value,
    // so pass only the AUTO_INCREMENT placeholder.
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT);
    PrimaryKey primaryKey = primaryKeyBuilder.build();

    RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey);
    // RT_PK returns the primary key, including the generated message_id, in the response.
    // If no ReturnType is set, no row is returned.
    rowPutChange.setReturnType(ReturnType.RT_PK);
    // Attribute column: the message content.
    rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content)));

    // Write the row to Table Store.
    PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));

    // Print the returned primary key.
    Row returnRow = response.getRow();
    if (returnRow != null) {
        System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString());
    }

    // Print the capacity units (CUs) consumed.
    CapacityUnit cu = response.getConsumedCapacity().getCapacityUnit();
    System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());
    System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());
}
```
Step 3: Read data
When reading data, use GetRange to fetch new messages. The range starts at the message_id of the last message read plus 1 and ends at INF_MAX, so each call returns every message newer than the last one read, which you can then push to the client.

```java
// Imports for the enclosing class.
import com.alicloud.openservices.tablestore.SyncClient;
import com.alicloud.openservices.tablestore.model.*;

private static void getRange(SyncClient client, String receive_id, long lastMessageId) {
    RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("message_table");
    // First four characters of hash(receive_id); hash() is the same application helper used when writing.
    String partitionKey = hash(receive_id).substring(0, 4);

    // Start of the range (inclusive).
    PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(partitionKey));
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));
    // lastMessageId is the message_id of the last message already read; start right after it.
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.fromLong(lastMessageId + 1));
    rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());

    // End of the range (exclusive).
    primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();
    primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(partitionKey));
    primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));
    primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX);
    rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());
    rangeRowQueryCriteria.setMaxVersions(1);

    System.out.println("GetRange result is:");
    while (true) {
        GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));
        for (Row row : getRangeResponse.getRows()) {
            System.out.println(row);
        }
        // A non-null nextStartPrimaryKey means more rows remain; continue reading from there.
        if (getRangeResponse.getNextStartPrimaryKey() != null) {
            rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());
        } else {
            break;
        }
    }
}
```