一、TDDL概述
1.1 什么是TDDL?
TDDL(Taobao Distributed Data Layer) 是淘宝自主研发的分布式数据库中间件,是解决海量数据存储和高并发访问的核心技术架构。
1.2 核心定位
应用层
↓
TDDL(SQL路由、分片、读写分离、事务协调)
↓
物理数据库集群(MySQL/Oracle/PostgreSQL等)
1.3 发展历程
2008年:淘宝面临数据库瓶颈,开始研发TDDL
2010年:TDDL 1.0上线,支持基本分库分表
2013年:TDDL 2.0支持读写分离、分布式事务
2016年:TDDL 3.0(DRDS)支持弹性扩容、多租户
2020年:开源版本TDDL-Lite发布
二、TDDL核心架构
2.1 整体架构图
graph TB
subgraph "应用层"
A1[应用服务1]
A2[应用服务2]
A3[应用服务N]
end
subgraph "TDDL中间件层"
B1[SQL解析器]
B2[路由决策器]
B3[SQL重写器]
B4[结果聚合器]
B5[事务管理器]
B6[连接池管理器]
end
subgraph "数据库层"
C1[主库1]
C2[从库1]
C3[主库2]
C4[从库2]
C5[主库N]
C6[从库N]
end
A1 --> B1
A2 --> B1
A3 --> B1
B1 --> B2
B2 --> B3
B3 --> B6
B6 --> C1
B6 --> C2
B6 --> C3
B6 --> C4
B6 --> C5
B6 --> C6
B4 --> A1
B4 --> A2
B4 --> A3
2.2 核心组件详解
SQL解析器
class SQLParser:
    """SQL parser: turns a raw SQL string into a structured ParsedSQL."""

    def parse(self, sql: str) -> ParsedSQL:
        """Parse a SQL statement.

        Args:
            sql: raw SQL text.

        Returns:
            The parsed SQL object.
        """
        # 1. Lexical analysis: split the text into tokens.
        tokens = self._lexical_analysis(sql)
        # 2. Syntax analysis: build the AST from the token stream.
        ast = self._syntax_analysis(tokens)
        # 3. Semantic analysis: extract tables, predicates, aggregates.
        parsed_sql = self._semantic_analysis(ast)
        return parsed_sql

    def _lexical_analysis(self, sql: str) -> List[Token]:
        """Tokenize the SQL text (keywords, identifiers, operators, ...)."""
        tokens = []
        # Implementation details elided in the original document.
        return tokens

    def _syntax_analysis(self, tokens: List[Token]) -> ASTNode:
        """Build an abstract syntax tree from the token stream."""
        root = ASTNode(type='SELECT')
        # Implementation details elided in the original document.
        return root

    def _semantic_analysis(self, ast: ASTNode) -> ParsedSQL:
        """Semantic analysis: populate a ParsedSQL from the AST."""
        parsed = ParsedSQL()
        # Table names referenced by the statement.
        parsed.table_names = self._extract_table_names(ast)
        # WHERE-clause predicates (used later for shard-key extraction).
        parsed.where_conditions = self._extract_where_conditions(ast)
        # Aggregate functions (COUNT/SUM/AVG/...) that need result merging.
        parsed.aggregate_functions = self._extract_aggregate_functions(ast)
        # Whether the statement spans more than one shard.
        parsed.is_cross_shard = self._check_cross_shard(parsed)
        return parsed
路由决策器
class Router:
    """Routing engine: maps a parsed SQL statement to physical shards."""

    def __init__(self, sharding_config: ShardingConfig):
        # BUG FIX: the original spelled this `init`; it must be the
        # `__init__` dunder or the constructor never runs.
        self.config = sharding_config

    def route(self, parsed_sql: ParsedSQL) -> List[RouteResult]:
        """Decide which database/table each (sub)query should hit.

        Args:
            parsed_sql: the parsed SQL statement.

        Returns:
            One RouteResult per target shard.
        """
        results = []
        # Single-table statement.
        if len(parsed_sql.table_names) == 1:
            sharding_key = self._extract_sharding_key(parsed_sql)
            if sharding_key is not None:
                # Shard key present: route to exactly one shard.
                shard_id = self._calculate_shard_id(sharding_key)
                database = self._get_database_by_shard(shard_id)
                results.append(RouteResult(
                    database=database,
                    table_suffix=f"_{shard_id}",
                    sql=parsed_sql.original_sql
                ))
            else:
                # No shard key: broadcast to every shard (full scan).
                for shard_id in range(self.config.shard_count):
                    database = self._get_database_by_shard(shard_id)
                    results.append(RouteResult(
                        database=database,
                        table_suffix=f"_{shard_id}",
                        sql=self._rewrite_table_name(parsed_sql, shard_id)
                    ))
        # Multi-table JOIN (complex routing).
        elif len(parsed_sql.table_names) > 1:
            if self._is_same_shard_join(parsed_sql):
                # All tables co-located: push the JOIN down to one shard.
                results = self._route_same_shard_join(parsed_sql)
            else:
                # Cross-shard JOIN needs special handling.
                results = self._route_cross_shard_join(parsed_sql)
        return results

    def _extract_sharding_key(self, parsed_sql: ParsedSQL) -> Optional[str]:
        """Pull the shard-key value out of the WHERE predicates, if any."""
        for condition in parsed_sql.where_conditions:
            if condition.column == self.config.sharding_key:
                return condition.value
        return None

    def _calculate_shard_id(self, sharding_key: str) -> int:
        """Map a shard-key value to a shard id.

        Uses CRC32 instead of the builtin hash(), whose value for strings is
        randomized per process (PYTHONHASHSEED) and would route the same key
        to different shards in different processes.
        """
        import zlib
        hash_value = zlib.crc32(str(sharding_key).encode('utf-8'))
        return hash_value % self.config.shard_count
SQL重写器
class SQLRewriter:
    """SQL rewriter: adapts a logical SQL statement to one physical shard."""

    def rewrite(self, sql: str, route_result: RouteResult) -> str:
        """Rewrite a logical SQL statement for the routed shard.

        Args:
            sql: the original (logical) SQL.
            route_result: routing decision for the target shard.

        Returns:
            The rewritten, shard-specific SQL.
        """
        # Swap the logical table name for the sharded physical name.
        # NOTE(review): plain substring replace -- a table name occurring
        # inside another identifier would also be rewritten.
        original_table = route_result.original_table
        new_table = f"{original_table}{route_result.table_suffix}"
        sql = sql.replace(original_table, new_table)
        # Pagination needs per-shard adjustment.
        if "LIMIT" in sql.upper():
            sql = self._rewrite_limit(sql, route_result)
        # Aggregates are merged later by the result aggregator.
        if any(func in sql.upper() for func in ['COUNT', 'SUM', 'AVG']):
            sql = self._rewrite_aggregate(sql, route_result)
        return sql

    def _rewrite_limit(self, sql: str, route_result: RouteResult) -> str:
        """Rewrite a LIMIT clause for cross-shard pagination.

        Each shard must return the first offset+count rows; the aggregator
        then re-sorts globally and applies the real offset.
        """
        import re
        match = re.search(r'LIMIT\s+(\d+)(?:\s*,\s*(\d+))?', sql, re.IGNORECASE)
        if match:
            if match.group(2):  # the "LIMIT offset, count" form
                offset = int(match.group(1))
                count = int(match.group(2))
                if route_result.is_cross_shard:
                    # Every shard fetches offset+count rows from the top.
                    new_limit = f"LIMIT {offset + count}"
                    sql = re.sub(r'LIMIT\s+\d+\s*,\s*\d+', new_limit, sql, flags=re.IGNORECASE)
        return sql
结果聚合器
class ResultAggregator:
"""结果聚合器"""def aggregate(self, results: List[QueryResult]) -> QueryResult:
""" 聚合多个分片的查询结果 Args: results: 分片查询结果列表 Returns: 聚合后的结果 """ if not results: return QueryResult(rows=[], columns=[]) # 简单查询(无聚合函数) if self._is_simple_query(results): return self._aggregate_simple_results(results) # 聚合查询 elif self._is_aggregate_query(results): return self._aggregate_aggregate_results(results) # 跨分片JOIN elif self._is_join_query(results): return self._aggregate_join_results(results) else: raise Exception("不支持的结果聚合类型")def _aggregate_simple_results(self, results: List[QueryResult]) -> QueryResult:
"""聚合简单查询结果""" aggregated_rows = [] for result in results: aggregated_rows.extend(result.rows) # 排序(如果原查询有ORDER BY) if hasattr(results[0], 'sort_key'): aggregated_rows.sort(key=lambda row: row[results[0].sort_key]) return QueryResult(rows=aggregated_rows, columns=results[0].columns)def _aggregate_aggregate_results(self, results: List[QueryResult]) -> QueryResult:
"""聚合聚合查询结果""" # 处理COUNT count = 0 for result in results: if result.rows and len(result.rows[0]) > 0: count += result.rows[0][0] # 假设第一列是COUNT # 处理SUM sum_value = 0 for result in results: if result.rows and len(result.rows[0]) > 1: sum_value += result.rows[0][1] # 处理AVG avg_value = sum_value / count if count > 0 else 0 return QueryResult( rows=[[count, sum_value, avg_value]], columns=['count', 'sum', 'avg'] )三、TDDL核心特性详解
3.1 分库分表策略
3.1.1 分片算法
class ShardingAlgorithm:
    """Base class for sharding algorithms."""

    def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:
        """Return the shard id (0..shard_count-1) for the given key."""
        raise NotImplementedError
class HashShardingAlgorithm(ShardingAlgorithm):
    """Hash-based sharding: key -> stable hash -> modulo shard count."""

    def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:
        # Use CRC32 rather than the builtin hash(): string hashing is
        # randomized per process (PYTHONHASHSEED), so hash()-based routing
        # would send the same key to different shards after a restart.
        # crc32 is already unsigned in Python 3, so no abs() is needed.
        import zlib
        hash_value = zlib.crc32(str(sharding_key).encode('utf-8'))
        return hash_value % shard_count
class RangeShardingAlgorithm(ShardingAlgorithm):
    """Range-based sharding: shard i owns the i-th [min, max] interval."""

    def __init__(self, ranges: List[Tuple[int, int]]):
        # Inclusive (min, max) bounds, one tuple per shard, in shard order.
        self.ranges = ranges

    def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:
        for shard_index, (min_val, max_val) in enumerate(self.ranges):
            if min_val <= sharding_key <= max_val:
                return shard_index
        # Sentinel: no configured interval covers the key.
        return -1
class TimeBasedShardingAlgorithm(ShardingAlgorithm):
    """Time-based sharding: one shard per month, cycling over shard_count."""

    def get_shard_id(self, sharding_key: datetime, shard_count: int) -> int:
        # Months elapsed since January of the base year.
        base_year = 2020
        month_offset = (sharding_key.year - base_year) * 12 + (sharding_key.month - 1)
        return month_offset % shard_count
3.1.2 分片配置示例
sharding-config.yaml
tables:
user_orders:
sharding_key: user_id
algorithm: hash
shard_count: 8
databases:
- db_0
- db_1
- db_2
- db_3
- db_4
- db_5
- db_6
- db_7
actual_tables:
- user_orders_0
- user_orders_1
- user_orders_2
- user_orders_3
- user_orders_4
- user_orders_5
- user_orders_6
- user_orders_7
product_info:
sharding_key: product_id
algorithm: range
ranges:
- [1, 1000000] # 分片0
- [1000001, 2000000] # 分片1
- [2000001, 3000000] # 分片2
databases:
- db_products_0
- db_products_1
- db_products_2
3.2 读写分离
3.2.1 读策略配置
class ReadStrategy:
    """Read-routing strategies understood by the read/write splitter."""
    MASTER_ONLY = "master_only"    # always read from the master
    SLAVE_ONLY = "slave_only"      # always read from a slave
    MASTER_SLAVE = "master_slave"  # load-balance reads between master and slaves
    SLAVE_FIRST = "slave_first"    # prefer a slave, fall back to the master
class ReadWriteSplitter:
    """Read/write splitter: writes go to the master, reads are routed
    according to the configured ReadStrategy."""

    def __init__(self, config: ReadWriteConfig):
        self.config = config
        self.master_pool = self._create_connection_pool(config.master)
        self.slave_pools = [self._create_connection_pool(slave) for slave in config.slaves]
        # Round-robin cursor for slave selection.  BUG FIX: the original
        # never initialized this attribute, so _get_slave_connection raised
        # AttributeError on the first read.
        self.current_slave_index = 0

    def get_connection(self, sql: str, is_write: bool = False) -> Connection:
        """Return a pooled connection appropriate for the given statement.

        Args:
            sql: the SQL about to be executed (inspected for write hints).
            is_write: force master routing when the caller knows it writes.
        """
        if is_write or self._must_use_master(sql):
            return self.master_pool.get_connection()
        # Read path: pick a backend according to the configured strategy.
        strategy = self.config.read_strategy
        if strategy == ReadStrategy.MASTER_ONLY:
            return self.master_pool.get_connection()
        elif strategy == ReadStrategy.SLAVE_ONLY:
            return self._get_slave_connection()
        elif strategy == ReadStrategy.MASTER_SLAVE:
            # Weighted load balancing: roughly 70% of reads hit the slaves.
            if random.random() < 0.7:
                return self._get_slave_connection()
            else:
                return self.master_pool.get_connection()
        elif strategy == ReadStrategy.SLAVE_FIRST:
            try:
                return self._get_slave_connection()
            except Exception:
                # Slave unavailable: degrade to the master.
                return self.master_pool.get_connection()
        # Unknown strategy: fall back to the master instead of implicitly
        # returning None (the original had no fallback branch).
        return self.master_pool.get_connection()

    def _get_slave_connection(self) -> Connection:
        """Pick the next slave pool by simple round-robin."""
        slave_pool = self.slave_pools[self.current_slave_index]
        self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_pools)
        return slave_pool.get_connection()

    def _must_use_master(self, sql: str) -> bool:
        """Heuristic: statements that must run on the master.

        Note: plain substring matching, so a SELECT mentioning e.g. a column
        named 'update_time' is also routed to the master (conservative).
        """
        upper_sql = sql.upper()
        # DML writes.
        if any(keyword in upper_sql for keyword in ['INSERT', 'UPDATE', 'DELETE']):
            return True
        # Explicit transaction control.
        if 'BEGIN' in upper_sql or 'COMMIT' in upper_sql or 'ROLLBACK' in upper_sql:
            return True
        # Session-local functions such as LAST_INSERT_ID().
        if 'LAST_INSERT_ID()' in upper_sql:
            return True
        return False
3.3 分布式事务
3.3.1 2PC(两阶段提交)
sequenceDiagram
participant C as Coordinator(TDDL)
participant P1 as Participant(分片1)
participant P2 as Participant(分片2)
C->>P1: prepare(transaction_id)
C->>P2: prepare(transaction_id)
P1-->>C: ready
P2-->>C: ready
C->>P1: commit(transaction_id)
C->>P2: commit(transaction_id)
P1-->>C: committed
P2-->>C: committed
3.3.2 2PC实现
class TwoPhaseCommitCoordinator:
    """Coordinator for the two-phase commit (2PC) protocol."""

    def __init__(self, participants: List[DatabaseParticipant]):
        self.participants = participants
        self.transactions = {}  # per-transaction state bookkeeping

    def execute_transaction(self, transaction_id: str, operations: List[Operation]) -> bool:
        """Run a distributed transaction.

        Args:
            transaction_id: globally unique transaction identifier.
            operations: the operations to apply on every participant.

        Returns:
            True if all participants prepared and committed, else False.
        """
        try:
            # Phase 1: ask every participant to prepare (vote).
            votes = [p.prepare(transaction_id, operations) for p in self.participants]
            if not all(v.status == 'ready' for v in votes):
                # At least one participant voted no -- abort everywhere.
                self.rollback(transaction_id)
                return False
            # Phase 2: commit on every participant.
            acks = [p.commit(transaction_id) for p in self.participants]
            if not all(a.status == 'committed' for a in acks):
                # Partial commit: cannot be undone automatically, flag for
                # manual intervention.
                self._handle_commit_failure(transaction_id)
                return False
            return True
        except Exception:
            self.rollback(transaction_id)
            return False

    def rollback(self, transaction_id: str):
        """Ask every participant to roll back, tolerating individual failures."""
        for p in self.participants:
            try:
                p.rollback(transaction_id)
            except Exception:
                # Rollback failure needs manual intervention; record it.
                self._log_rollback_failure(transaction_id, p)
四、TDDL开源替代方案
4.1 主流替代方案对比
特性
TDDL
ShardingSphere
MyCat
Vitess
开发语言
Java
Java
Java
Go
核心能力
分库分表+读写分离
分库分表+读写分离+分布式事务
分库分表+读写分离
分片+高可用+云原生
协议兼容
MySQL协议
多数据库协议
MySQL协议
MySQL协议
事务支持
2PC/XA
2PC/XA/Seata
弱事务支持
2PC
生态成熟度
高(淘宝内部)
高(Apache顶级项目)
中
高(CNCF项目)
部署复杂度
中
中
低
高
4.2 ShardingSphere(推荐)
4.2.1 架构对比
TDDL架构:
应用 → TDDL客户端 → TDDL服务端 → 数据库
ShardingSphere架构:
应用 → ShardingSphere-JDBC(嵌入式) → 数据库
↓
ShardingSphere-Proxy(独立服务) → 数据库
4.2.2 配置示例
shardingsphere-config.yaml
dataSources:
ds_0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
jdbcUrl: jdbc:mysql://localhost:3306/ds_0
username: root
password: password
ds_1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
jdbcUrl: jdbc:mysql://localhost:3306/ds_1
username: root
password: password
rules:
!SHARDING
tables:
    t_order:
      actualDataNodes: ds_${0..1}.t_order_${0..1}
      tableStrategy:
        standard:
          shardingColumn: order_id
          shardingAlgorithmName: t_order_inline
      keyGenerateStrategy:
        column: order_id
        keyGeneratorName: snowflake
  bindingTables:
- t_order
defaultDatabaseStrategy:
none:
defaultTableStrategy:
none:
shardingAlgorithms:
    t_order_inline:
      type: INLINE
      props:
        algorithm-expression: t_order_${order_id % 2}
  keyGenerators:
    snowflake:
      type: SNOWFLAKE
      props:
        worker-id: 123
4.2.3 使用示例
// Spring Boot集成
@SpringBootApplication
public class Application {public static void main(String[] args) {
SpringApplication.run(Application.class, args);}
}- t_order
// Business-layer data access; sharding is transparent to this code.
@Repository
public class OrderRepository {

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /** Insert a single order row (routed to one shard by order_id). */
    public void createOrder(Order order) {
        String sql = "INSERT INTO t_order (order_id, user_id, amount) VALUES (?, ?, ?)";
        jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getAmount());
    }

    /** Load all orders belonging to one user. */
    public List<Order> getOrdersByUserId(Long userId) {
        String sql = "SELECT * FROM t_order WHERE user_id = ?";
        return jdbcTemplate.query(sql, new OrderRowMapper(), userId);
    }
}
4.3 MyCat(轻量级替代)
4.3.1 配置示例
druidparser
test
TESTDB
select user()
4.3.2 分片规则
user_id
mod-long
2
4.4 Vitess(云原生场景)
4.4.1 部署架构
vitess-operator.yaml
apiVersion: planetscale.com/v2
kind: VitessCluster
metadata:
name: example
spec:
images:
vtgate: vitess/vtgate:latest
vttablet: vitess/vttablet:latest
cells:
- name: zone1
gateway:
replicas: 3
  keyspaces:
    - name: commerce
      partitionings:
        - equal:
            parts: 2
      shardTemplate:
        databaseInitScriptSecret:
          name: example-init
        tabletPools:
          - cell: zone1
            type: primary
            replicas: 2
            vttablet:
              extraFlags:
                db_charset: utf8mb4
4.4.2 分片配置
-- 创建分片表
CREATE TABLE customers (
customer_id BIGINT,
name VARCHAR(100),
email VARCHAR(100),
PRIMARY KEY (customer_id)
) ENGINE=InnoDB;
- equal:
-- 配置分片
ALTER VSCHEMA ADD TABLE customers;
ALTER VSCHEMA ON customers ADD VINDEX hash(customer_id) USING hash;
五、TDDL最佳实践
5.1 分片设计原则
5.1.1 分片键选择
好的分片键
# Good sharding keys: high cardinality, evenly distributed values.
good_sharding_keys = [
    'user_id',      # high cardinality, uniform distribution
    'order_id',     # business primary key
    'tenant_id',    # multi-tenant scenarios
    'create_time',  # time-series data
]
差的分片键
# Poor sharding keys: low cardinality leads to badly skewed shards.
bad_sharding_keys = [
    'gender',      # low cardinality, uneven distribution
    'status',      # state flag, few distinct values
    'is_deleted',  # boolean, extremely skewed
]
5.1.2 避免跨分片查询
-- 跨分片查询(性能差)
SELECT COUNT(*) FROM orders WHERE user_id IN (1, 2, 3, 4, 5);
-- 优化为单分片查询
SELECT COUNT(*) FROM orders WHERE user_id = 1;
SELECT COUNT(*) FROM orders WHERE user_id = 2;
-- ... 然后聚合结果
5.2 扩容策略
5.2.1 在线扩容方案
class OnlineExpansionService:
    """Online (no-downtime) shard expansion workflow."""

    def expand_shards(self, new_shard_count: int):
        """Expand to `new_shard_count` shards via dual-write + migration."""
        # 1. Provision the databases for the new shards.
        new_databases = self._create_new_databases(new_shard_count)
        # 2. Enable dual-write so old and new layouts stay in sync.
        self._enable_dual_write()
        # 3. Copy existing rows to their new shards.
        self._migrate_data(new_shard_count)
        # 4. Verify old and new shards agree before cutting over.
        self._verify_data_consistency()
        # 5. Route live traffic to the new shard layout.
        self._switch_traffic(new_shard_count)
        # 6. Remove the now-stale copies on the old shards.
        self._cleanup_old_data()

    def _migrate_data(self, new_shard_count: int):
        """Move only records whose shard assignment changes.

        NOTE(review): relies on `self.current_shard_count` being set
        elsewhere -- it is never initialized in this class as shown.
        """
        # Consistent hashing keeps most keys on their old shard,
        # minimizing the amount of data that has to move.
        for old_shard_id in range(self.current_shard_count):
            for record in self._get_records_from_shard(old_shard_id):
                new_shard_id = self._calculate_new_shard_id(record.sharding_key, new_shard_count)
                if new_shard_id != old_shard_id:
                    # Relocate the record to its new shard.
                    self._migrate_record(record, new_shard_id)
5.3 监控与运维
5.3.1 关键指标监控
class TDDLMonitor:
    """Collects TDDL runtime metrics and defines the alerting rules."""

    def __init__(self):
        self.metrics = {}  # last collected snapshot (filled by callers)

    def collect_metrics(self):
        """Snapshot the current monitoring metrics as a flat dict."""
        return {
            # Connection-pool health.
            'connection_pool_active': self._get_active_connections(),
            'connection_pool_idle': self._get_idle_connections(),
            'connection_pool_wait': self._get_waiting_connections(),
            # Query performance.
            'query_qps': self._get_query_qps(),
            'query_latency': self._get_avg_latency(),
            'error_rate': self._get_error_rate(),
            # Sharding effectiveness.
            'shard_hit_rate': self._get_shard_hit_rate(),
            'cross_shard_queries': self._get_cross_shard_queries(),
            # Distributed transactions.
            'transaction_count': self._get_transaction_count(),
            'transaction_timeout_rate': self._get_transaction_timeout_rate(),
        }

    def alert_rules(self):
        """Static alert thresholds evaluated against collect_metrics()."""
        return {
            'high_error_rate': {
                'condition': 'error_rate > 0.05',
                'message': 'TDDL错误率超过5%'
            },
            'high_latency': {
                'condition': 'query_latency > 1000',
                'message': '查询延迟超过1秒'
            },
            'connection_pool_exhausted': {
                'condition': 'connection_pool_wait > 100',
                'message': '连接池等待数超过100'
            }
        }
六、总结
6.1 TDDL核心价值
透明分片:应用无需感知底层分片细节
高可用性:自动故障转移和读写分离
线性扩展:支持水平扩容应对海量数据
兼容性:保持MySQL协议兼容,迁移成本低
6.2 选择建议
企业级场景:ShardingSphere(功能最全面)
云原生环境:Vitess(Kubernetes友好)
轻量级需求:MyCat(部署简单)
淘宝生态:TDDL(深度集成淘宝技术栈)
6.3 发展趋势
云原生:容器化部署和弹性扩缩容
智能优化:基于AI的SQL优化和路由决策
多模型支持:同时支持关系型和NoSQL数据
HTAP混合负载:支持OLTP和OLAP混合场景
通过本指南,你可以:
✅ 深入理解TDDL架构和核心原理
✅ 掌握分布式数据库中间件关键技术
✅ 选择合适的开源替代方案
✅ 设计高性能的分库分表架构
✅ 实施分布式数据库最佳实践