Large-Scale Instant Messaging Hosting on Alibaba Cloud

本文涉及的产品
表格存储 Tablestore,50G 2个月
简介: How can we build a stable, high-concurrency instant messaging (IM) system architecture?

Abstract: How can we build a stable, high-concurrency instant messaging (IM) system architecture? This is a common requirement when building a social networking IM or apps like WeChat Moments. In such scenarios, it is a basic requirement to update a user's messages and new posts in his/her Moments to the user's friends in a timely and accurate manner. To achieve this, we usually need to set an incremental serial number or ID for each message sent by the user or each new post in the user's feed.

How can we build a stable, high-concurrency IM system architecture? This requirement is frequently seen when we build a social networking IM or apps like WeChat Moments. In such scenarios, it is a basic requirement to update a user messages and new posts in his/her Moments to the user's friends in a timely and accurate manner. To achieve this, an incremental serial number or ID is usually required for each message or update by the user. With this mechanism, all messages can be completely processed by receiving terminals in the correct order. This mechanism ensures that all messages are integrate and processed in the correct order by the receiving end.

If the total number of messages or the concurrency of sent messages is high, NoSQL storage products are often used to store the messages. However, common NoSQL products do not have the column auto-increment feature and thus external components are usually required to increment the serial numbers and IDs of messages. This complicates the overall architecture and aggravates the latency of the entire link.

Cloud Production Solution

The auto-increment feature of primary key columns provided by Alibaba Cloud Table Store is perfect for solving the problems mentioned above. The specific process is as follows. Declare a primary key column as an auto-increment column when creating a table. As a result, the app does not have to enter an actual value but enters only a placeholder in the auto-increment column when writing a row of new data. Table Store automatically generates a value for the auto-increment column upon receiving the row of new data, while ensuring that the value generated later is greater than the one generated earlier in the same partitioning key range.

The auto-increment feature of primary key columns has the following characteristics:

The system architecture exclusive to Table Store and the implementation through auto-incrementing primary key columns ensure that the value generated for the auto-incrementing column is unique and strictly incrementing.

Currently, multiple primary keys are supported while the first primary key must be the partitioning key. To ensure even distribution of data, the partitioning key column cannot be set as an auto-incrementing column.

Given that restriction, the auto-increment of the primary key column is actually at the partitioning key level.

Any primary key columns except the partitioning key column can be set as the auto-incrementing column.

Currently, only one primary key column can be set as the auto-incrementing column for each table.

The attribute column is not allowed to be set as auto-incrementing.

The values generated by the auto-incrementing column are 64-bit signed long integers.

The auto-incrementing column feature is table-oriented, namely the same instance can have both tables with auto-incrementing columns and tables without auto-incrementing columns.

The auto-incrementing column can be set only when you create a table. You cannot set an auto-incrementing column in an existing table.

The next section explains how to use the auto-increment feature of primary key columns of Table Store in a specific scenario.

Application Scenario

In this example scenario, we will build an IM chat tool to describe the roles and usage of the auto-increment feature.

Features

This IM chat tool needs to support the following features:

One-on-one chat

Group chat

Multi-terminal message synchronization for the same user

Existing architecture

Step 1: Determine a messaging model

37_1

The figure above shows the messaging model.

A message sent by the sender will be pushed to the background system by the sender's client.

The background system will first store the message temporarily.

After storing the message successfully, the background system will push the message to the receiver's client.

Step 2: Determine background architecture

37_2

The background architecture mainly consists of two parts: the logic layer and the storage layer.

The logic layer includes the application server, queuing service, and auto-incrementing ID generator. This layer is the core of the entire background architecture and is responsible for executing various core services including message receiving, pushing and notification and group message copy-on-write.

The storage layer is mainly used for persistence of message data and other data as needed.

For one-on-one chat, the sender first sends a message to the application server. The application server stores the message to the table where the primary key is the receiver while notifying the message push service in the application server of a new message. Finally, the message push service uses the ID of the last message pushed to the receiver as the initial primary key value to read all the messages with an ID greater than the ID from the storage system and push the messages to the receiver.

For group chat, the messaging logic is even more complicated. Specifically, the logic uses an asynchronous queue to complete the extensive writing of messages, namely a message sent to the group will be stored for each member of the group.

37_3

The figure above shows the group messaging process without the storage layer.

The process uses extensive writing but not extensive reading for the following two reasons:

The number of members in a group is usually small, and storage costs are low. The cost is even lower with data compression.

By extensively writing messages to each member's storage space (inbox), the system only needs to check the inbox of each member when pushing messages to each receiver. Under these conditions, the processing logic of group chat is the same as that of single chat and is easy to implement.

After a message is sent by the sender, it will be pushed to the application server by the sender's client. The application server distributes the message to a queue (where the messages to the same receiver are stored in the same queue) according to the receiver ID. In the queue, messages will be processed in order. Specifically, a new message ID will be obtained for a new message from the auto-incrementing ID generator, and then the message will be written to the Table Store. After the message is written successfully, the next message will be written in the same way above.

Normally, messages to the same receiver are stored in the same queue. However, a single queue may contain messages to multiple receivers.

37_4

Data in each queue will be processed serially. Whenever a piece of data is written to the Table Store, a new ID will be assigned by the system, which is greater than any previous IDs. To ensure message IDs are strictly-incrementing and to avoid strict increment failure because of writing failure of the previous message, a user-level lock is required to prevent other messages from being written until the previous writing operation succeeds when data is written to the storage system. This lock ensures that all messages are written in order and will be released once a writing operation succeeds.

In the previous step, all messages need to be reprocessed if the queue fails. In this situation, these messages will be moved to a new queue and new message IDs greater than any previous IDs are required by the new queue. However, the new queue does not know what the greatest ID is so far and requires a global auto-increment ID generator as it cannot generate auto-incrementing IDs by itself.

37_5

In multi-terminal scenarios, when an online terminal goes offline, the session of the terminal will be stored to another table by the application server. When the terminal comes online again, the session will be restored from the storage system and unread messages will be pushed to the terminal.

Step 3: Determine a storage system

We selected Alibaba Cloud Table Store as the storage system for the following reasons:

Write operations support both single-row writing and multi-row batch writing modes to meet high-concurrency data writing requirements.

Range-specific reading and paging are supported for massive messages.

Data lifecycle management is supported to clear expired data automatically, saving storage costs. For details, see this document.

The Alibaba Cloud Table Store is a commercial cloud service with proven reliability and stability.

The Table Store is cost-effective, and special-offer packages are available for users with mass data scenarios.

The storage system features high reading/writing performance. For chat messages, the latency is as low as milliseconds or even microseconds.

Step 4: Determine a table structure

The determined table structure of the Table Store is as follows:

Primary key sequence Primary key name Primary key value Description
1 partition_key First 4 digits of md5(receive_id) Partitioning key for ensuring even data distribution
2 receive_id receive_id Receiver's user ID
3 message_id message_id Message ID

The table structure of Table Store consists of two parts: primary key columns and property columns. Primary key columns support up to four primary keys where the first primary key is the partitioning key.

Before using the storage system, you must determine the structure of primary key columns, which cannot be changed during use. Comparatively, property columns are schema-free and customizable. Given that property columns for each row of data can be different, you only need to design the structure of primary key columns.

The first primary key is the partitioning key, which is designed to ensure the even distribution of data and requests without hotspot congestion. Given that messages are eventually read according to receivers, you can use the receiver's ID as the partitioning key. To ensure better data distribution, you can also use part of the MD5 value (such as the first four characters) of the receiver's ID for the same purpose. By doing this, you can ensure better even data distribution.

To locate the messages to the receiver, the full receiver's ID must be stored, as the first primary key uses only part of the receiver's ID. Therefore, you can use the receiver's ID as the second primary key.

The third primary key can be the message ID, and the values of this primary key must be monotonically incrementing so that the latest message can be queried.

Property columns can store message content and metadata.

So far, a complete chat system has been built with high-concurrency processing capabilities and high performance. However, this system is also exposed to certain challenges.

Challenges

When messages to multiple users are in the same queue, those messages will be processed serially. To ensure the IDs of the messages are strict-incrementing, a lock is required during the processing. This introduces a challenge: if the number of messages to a user is massive, the total number of messages in the queue where the user resides is high and this may congest other users' messages, resulting in delayed messages for other users.

When the chat message volume becomes too large during major events or holidays, the capacity of the queue must be expanded, or the overall system latency may increase dramatically or even cause the system to crash due to overload.

Problem 2 can be resolved by adding hardware devices but problem 1 cannot be solved in the same manner. Is there better solution to both problems?

New Architecture

The complexity of both problems lies in forcible strict-incrementing messages. However, using the auto-incrementing feature of primary key columns simplifies the upper application layer.

The new architecture with Table Store auto-incrementing feature of primary key columns enabled is as follows:

37_6

The major difference from the simplified architecture lies in the removal of two components: the queuing service and the auto-incrementing ID generator.

When a message is received by the application server, the message will be directly written to the Table Store. No definite value needs to be entered during data writing but you must define a specific placeholder for the message_id of the auto-incrementing primary key column. The value will then be generated inside the Table Store.

In the new architecture, auto-increment operations are executed inside the Table Store. Even when multiple application servers write messages to the same receiver in the Table Store, the storage system still can ensure that all of the messages are processed serially and that each message has a unique and strict-incrementing message ID. Therefore, the queuing service is no longer required in the new architecture. Eventually, this completely solves problem 1.

On the other hand, given that the Table Store is a cloud service and supports the pay-as-you-go payment option, you do not need to consider the capacity of the system, which completely solves problem 2.

In the former architecture, messages to the same user must be in the same queue. Comparatively, those messages can be concurrently processed by multiple queues, and a message volume surge for a single user does not affect other users, instead, the load is evenly distributed to all queues.

By using the auto-increment feature of primary key columns, application servers can directly write data to the Table Store without waiting in queues and obtaining message IDs, delivering better performance.

Implementation

With the architecture diagram above, you can implement it using a Java SDK. Java SDK v4.2.0 already supports the auto-increment feature of primary key columns. To download the documentation and installer of the SDK, visit Java SDK v4.2.0 documentation.

Step 1: Create a table

According to the aforementioned design, the table structure is as follows:

Primary key sequence Primary key name Primary key value Description
1 partition_key First four digits of hash(receive_id) Partitioning key for ensuring even data distribution, and MD5 values can be used as the hash function.
2 receive_id receive_id Receiver's user ID
3 message_id message_id Message ID
The primary key in the third column is message_id and this column is the auto-incrementing primary key column. When creating the table, set the property and type of the message_id column as AUTO_INCREMENT and INTEGER respectively.
private static void createTable(SyncClient client) {         TableMeta = new TableMeta("message_table");          // The first column is the partitioning key         tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("partition_key", PrimaryKeyType.STRING));          // The second column is the receiverID         tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("receive_id", PrimaryKeyType.STRING));          // The third column is the auto-incrementing message ID. The value type is INTEGER and its attribute is PKO_AUTO_INCREMENT         tableMeta.addPrimaryKeyColumn(new PrimaryKeySchema("message_id", PrimaryKeyType.INTEGER, PrimaryKeyOption.AUTO_INCREMENT));          int timeToLive = -1;  // It never expires. You can also set an expiration period and the data will be removed automatically upon expiration         int maxVersions = 1;  // Only one version is saved. Multiple versions are currently supported.           TableOptions tableOptions = new TableOptions(timeToLive, maxVersions);          CreateTableRequest request = new CreateTableRequest(tableMeta, tableOptions);          client.createTable(request);     }
By executing the code above, you can create a table where the primary key column in the third column is auto-incrementing.
Step 2: Write data
Currently, PutRow and BatchWriteRow data writing modes are supported and both modes support the auto-increment feature of primary key columns. During data writing, the third column message_id is the auto-incrementing column and no value needs to be entered in this column but a placeholder.
private static void putRow(SyncClient client, String receive_id) {         // Construct the primary key         PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();          // Values in the first column are the first four digits of hash(receive_id)λ         primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(4)));          // Values in the second column are the receiver IDs         primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));          // The third column stores the message ID, auto-incrementing. This value is generated by Table Store. You do not need to enter the real value here, just enter a placeholder: AUTO_INCREMENT.          primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.AUTO_INCREMENT);         PrimaryKey primaryKey = primaryKeyBuilder.build();          RowPutChange rowPutChange = new RowPutChange("message_table", primaryKey);          // The return type is set as RT_PK, which includes the value of the primary key column in the returned result. If ReturnType is not set, no result will return by default.         rowPutChange.setReturnType(ReturnType.RT_PK);          //Add the attribute column. The message content.          rowPutChange.addColumn(new Column("content", ColumnValue.fromString(content)));          //Write data to Table Store         PutRowResponse response = client.putRow(new PutRowRequest(rowPutChange));          // Print the returned PK column.         Row returnRow = response.getRow();         if (returnRow != null) {             System.out.println("PrimaryKey:" + returnRow.getPrimaryKey().toString());         }          // Print the CU used.          CapacityUnit  cu = response.getConsumedCapacity().getCapacityUnit();         System.out.println("Read CapacityUnit:" + cu.getReadCapacityUnit());         System.out.println("Write CapacityUnit:" + cu.getWriteCapacityUnit());     }
Step 3: Read data
During data reading, read the latest message using GetRange. The starting value of the message_id primary key column is the message_id value of the previous message plus 1 and the ending value is INF_MAX. In this way, you can always read the latest message and send it to the client.
private static void getRange(SyncClient client, String receive_id, String lastMessageId) {         RangeRowQueryCriteria rangeRowQueryCriteria = new RangeRowQueryCriteria("message_table");          // Set the starting primary key          PrimaryKeyBuilder primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();          // Values in the first column are the first four digits of hash(receive_id).         primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(4)));          // Values in the second column are the receiver IDs.         primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));          // Values in the third column are the message IDs, starting from the last message.         primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.fromLong(lastMessageId + 1));         rangeRowQueryCriteria.setInclusiveStartPrimaryKey(primaryKeyBuilder.build());          // Set the ending primary key.         primaryKeyBuilder = PrimaryKeyBuilder.createPrimaryKeyBuilder();          // Values in the first column are the first four digits of hash(receive_id).         primaryKeyBuilder.addPrimaryKeyColumn("partition_key", PrimaryKeyValue.fromString(hash(receive_id).substring(4)));          // Values in the second column are the receiver IDs.         primaryKeyBuilder.addPrimaryKeyColumn("receive_id", PrimaryKeyValue.fromString(receive_id));          // Values in the third column are the message IDs.         primaryKeyBuilder.addPrimaryKeyColumn("message_id", PrimaryKeyValue.INF_MAX);         rangeRowQueryCriteria.setExclusiveEndPrimaryKey(primaryKeyBuilder.build());          rangeRowQueryCriteria.setMaxVersions(1);          System.out.println("GetRange result is:");         while (true) {             GetRangeResponse getRangeResponse = client.getRange(new GetRangeRequest(rangeRowQueryCriteria));             for (Row row : getRangeResponse.getRows()) {                 System.out.println(row);             }              // If nextStartPrimaryKey is not null, data reading will continue.             if (getRangeResponse.getNextStartPrimaryKey() != null) {               rangeRowQueryCriteria.setInclusiveStartPrimaryKey(getRangeResponse.getNextStartPrimaryKey());             } else {                 break;             }         }     }
相关实践学习
消息队列+Serverless+Tablestore:实现高弹性的电商订单系统
基于消息队列以及函数计算,快速部署一个高弹性的商品订单系统,能够应对抢购场景下的高并发情况。
阿里云表格存储使用教程
表格存储(Table Store)是构建在阿里云飞天分布式系统之上的分布式NoSQL数据存储服务,根据99.99%的高可用以及11个9的数据可靠性的标准设计。表格存储通过数据分片和负载均衡技术,实现数据规模与访问并发上的无缝扩展,提供海量结构化数据的存储和实时访问。 产品详情:https://www.aliyun.com/product/ots
目录
相关文章
|
负载均衡 大数据 Linux
|
网络协议 关系型数据库 Linux
Cloud platform build management Topic | Cloud computing (FREE)
云平台构建及管理习题(试读)
168 0
|
存储 负载均衡 网络性能优化
|
存储 弹性计算 API
|
存储 弹性计算 分布式计算
NHCP H2: Computer Resource Management Topic | Cloud computing
HCNP云计算2:计算机资源管理
149 0
|
网络协议 安全 Unix
Admin & Engineer & Services Topic | Cloud computing (FREE)
云计算 Admin & Engineer & Services 习题(试读)
140 0
|
监控 网络协议 安全
Security Topic | Cloud computing (FREE)
云计算 Security 习题(试读)
122 0
|
存储 搜索推荐 Shell
Hype Cycle for Cloud Platform Services, 2022 -- Gartner
Hype Cycle for Cloud Platform Services, 2022 -- Gartner
279 0
Hype Cycle for Cloud Platform Services, 2022 -- Gartner
|
分布式计算 关系型数据库 数据库
New Product Launch: Alibaba Cloud Data Integration
Support online real-time & offline data exchange between all data sources, networks and locations with Alibaba Cloud Data Integration.
14605 0
New Product Launch: Alibaba Cloud Data Integration