深入浅出:图解淘宝分布式数据库TDDL(及开源替代方案)

简介: TDDL(淘宝分布式数据层)是阿里自研的数据库中间件,支持分库分表、读写分离与分布式事务,历经多年演进并开源。其核心通过SQL解析、智能路由、重写与结果聚合,透明化处理海量数据与高并发,兼容MySQL协议,助力系统线性扩展与高可用。(239字)

一、TDDL概述
1.1 什么是TDDL?
TDDL(Taobao Distributed Data Layer)​ 是淘宝自主研发的分布式数据库中间件,是解决海量数据存储和高并发访问的核心技术架构。
1.2 核心定位
应用层

TDDL(SQL路由、分片、读写分离、事务协调)

物理数据库集群(MySQL/Oracle/PostgreSQL等)
1.3 发展历程
2008年:淘宝面临数据库瓶颈,开始研发TDDL
2010年:TDDL 1.0上线,支持基本分库分表
2013年:TDDL 2.0支持读写分离、分布式事务
2016年:TDDL 3.0(DRDS)支持弹性扩容、多租户
2020年:开源版本TDDL-Lite发布
二、TDDL核心架构
2.1 整体架构图
graph TB
subgraph "应用层"
A1[应用服务1]
A2[应用服务2]
A3[应用服务N]
end

subgraph "TDDL中间件层"
    B1[SQL解析器]
    B2[路由决策器]
    B3[SQL重写器]
    B4[结果聚合器]
    B5[事务管理器]
    B6[连接池管理器]
end

subgraph "数据库层"
    C1[主库1]
    C2[从库1]
    C3[主库2]
    C4[从库2]
    C5[主库N]
    C6[从库N]
end

A1 --> B1
A2 --> B1
A3 --> B1

B1 --> B2
B2 --> B3
B3 --> B6
B6 --> C1
B6 --> C2
B6 --> C3
B6 --> C4
B6 --> C5
B6 --> C6

B4 --> A1
B4 --> A2
B4 --> A3

2.2 核心组件详解

  1. SQL解析器
    class SQLParser:
    """SQL解析器"""

    def parse(self, sql: str) -> ParsedSQL:

     """
     解析SQL语句
    
     Args:
         sql: SQL语句
    
     Returns:
         解析后的SQL对象
     """
     # 1. 词法分析
     tokens = self._lexical_analysis(sql)
    
     # 2. 语法分析
     ast = self._syntax_analysis(tokens)
    
     # 3. 语义分析
     parsed_sql = self._semantic_analysis(ast)
    
     return parsed_sql
    

    def _lexical_analysis(self, sql: str) -> List[Token]:

     """词法分析"""
     # 识别关键字、标识符、运算符等
     tokens = []
     # 实现细节...
     return tokens
    

    def _syntax_analysis(self, tokens: List[Token]) -> ASTNode:

     """语法分析"""
     # 构建抽象语法树
     root = ASTNode(type='SELECT')
     # 实现细节...
     return root
    

    def _semantic_analysis(self, ast: ASTNode) -> ParsedSQL:

     """语义分析"""
     parsed = ParsedSQL()
    
     # 提取表名
     parsed.table_names = self._extract_table_names(ast)
    
     # 提取查询条件
     parsed.where_conditions = self._extract_where_conditions(ast)
    
     # 提取聚合函数
     parsed.aggregate_functions = self._extract_aggregate_functions(ast)
    
     # 判断是否跨分片
     parsed.is_cross_shard = self._check_cross_shard(parsed)
    
     return parsed
    
  2. 路由决策器
    class Router:
    """路由决策器"""

    def init(self, sharding_config: ShardingConfig):

     self.config = sharding_config
    

    def route(self, parsed_sql: ParsedSQL) -> List[RouteResult]:

     """
     路由决策
    
     Args:
         parsed_sql: 解析后的SQL
    
     Returns:
         路由结果列表
     """
     results = []
    
     # 单表查询
     if len(parsed_sql.table_names) == 1:
         table_name = parsed_sql.table_names[0]
         sharding_key = self._extract_sharding_key(parsed_sql)
    
         if sharding_key is not None:
             # 根据分片键计算目标分片
             shard_id = self._calculate_shard_id(sharding_key)
             database = self._get_database_by_shard(shard_id)
    
             results.append(RouteResult(
                 database=database,
                 table_suffix=f"_{shard_id}",
                 sql=parsed_sql.original_sql
             ))
         else:
             # 全分片扫描
             for shard_id in range(self.config.shard_count):
                 database = self._get_database_by_shard(shard_id)
                 results.append(RouteResult(
                     database=database,
                     table_suffix=f"_{shard_id}",
                     sql=self._rewrite_table_name(parsed_sql, shard_id)
                 ))
    
     # 多表JOIN(复杂路由)
     elif len(parsed_sql.table_names) > 1:
         # 检查是否同库JOIN
         if self._is_same_shard_join(parsed_sql):
             # 同库JOIN优化
             results = self._route_same_shard_join(parsed_sql)
         else:
             # 跨库JOIN,需要特殊处理
             results = self._route_cross_shard_join(parsed_sql)
    
     return results
    

    def _extract_sharding_key(self, parsed_sql: ParsedSQL) -> Optional[str]:

     """提取分片键"""
     # 从WHERE条件中提取分片键
     for condition in parsed_sql.where_conditions:
         if condition.column == self.config.sharding_key:
             return condition.value
    
     return None
    

    def _calculate_shard_id(self, sharding_key: str) -> int:

     """计算分片ID"""
     # 哈希分片算法
     hash_value = hash(sharding_key)
     return hash_value % self.config.shard_count
    
  3. SQL重写器
    class SQLRewriter:
    """SQL重写器"""

    def rewrite(self, sql: str, route_result: RouteResult) -> str:

     """
     重写SQL语句
    
     Args:
         sql: 原始SQL
         route_result: 路由结果
    
     Returns:
         重写后的SQL
     """
     # 替换表名
     original_table = route_result.original_table
     new_table = f"{original_table}{route_result.table_suffix}"
     sql = sql.replace(original_table, new_table)
    
     # 处理分页
     if "LIMIT" in sql.upper():
         sql = self._rewrite_limit(sql, route_result)
    
     # 处理聚合函数
     if any(func in sql.upper() for func in ['COUNT', 'SUM', 'AVG']):
         sql = self._rewrite_aggregate(sql, route_result)
    
     return sql
    

    def _rewrite_limit(self, sql: str, route_result: RouteResult) -> str:

     """重写LIMIT子句"""
     # 解析LIMIT
     import re
     match = re.search(r'LIMIT\s+(\d+)(?:\s*,\s*(\d+))?', sql, re.IGNORECASE)
    
     if match:
         if match.group(2):  # LIMIT offset, count
             offset = int(match.group(1))
             count = int(match.group(2))
    
             # 分布式LIMIT需要特殊处理
             if route_result.is_cross_shard:
                 # 每个分片需要计算自己的LIMIT
                 new_limit = f"LIMIT {offset + count}"
                 sql = re.sub(r'LIMIT\s+\d+\s*,\s*\d+', new_limit, sql, flags=re.IGNORECASE)
    
     return sql
    
  4. 结果聚合器
    class ResultAggregator:
    """结果聚合器"""

    def aggregate(self, results: List[QueryResult]) -> QueryResult:

     """
     聚合多个分片的查询结果
    
     Args:
         results: 分片查询结果列表
    
     Returns:
         聚合后的结果
     """
     if not results:
         return QueryResult(rows=[], columns=[])
    
     # 简单查询(无聚合函数)
     if self._is_simple_query(results):
         return self._aggregate_simple_results(results)
    
     # 聚合查询
     elif self._is_aggregate_query(results):
         return self._aggregate_aggregate_results(results)
    
     # 跨分片JOIN
     elif self._is_join_query(results):
         return self._aggregate_join_results(results)
    
     else:
         raise Exception("不支持的结果聚合类型")
    

    def _aggregate_simple_results(self, results: List[QueryResult]) -> QueryResult:

     """聚合简单查询结果"""
     aggregated_rows = []
    
     for result in results:
         aggregated_rows.extend(result.rows)
    
     # 排序(如果原查询有ORDER BY)
     if hasattr(results[0], 'sort_key'):
         aggregated_rows.sort(key=lambda row: row[results[0].sort_key])
    
     return QueryResult(rows=aggregated_rows, columns=results[0].columns)
    

    def _aggregate_aggregate_results(self, results: List[QueryResult]) -> QueryResult:

     """聚合聚合查询结果"""
     # 处理COUNT
     count = 0
     for result in results:
         if result.rows and len(result.rows[0]) > 0:
             count += result.rows[0][0]  # 假设第一列是COUNT
    
     # 处理SUM
     sum_value = 0
     for result in results:
         if result.rows and len(result.rows[0]) > 1:
             sum_value += result.rows[0][1]
    
     # 处理AVG
     avg_value = sum_value / count if count > 0 else 0
    
     return QueryResult(
         rows=[[count, sum_value, avg_value]],
         columns=['count', 'sum', 'avg']
     )
    

    三、TDDL核心特性详解
    3.1 分库分表策略
    3.1.1 分片算法
    class ShardingAlgorithm:
    """分片算法基类"""

    def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:

     """获取分片ID"""
     raise NotImplementedError
    

class HashShardingAlgorithm(ShardingAlgorithm):
"""哈希分片算法"""

def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:
    hash_value = hash(str(sharding_key))
    return abs(hash_value) % shard_count

class RangeShardingAlgorithm(ShardingAlgorithm):
"""范围分片算法"""

def __init__(self, ranges: List[Tuple[int, int]]):
    self.ranges = ranges

def get_shard_id(self, sharding_key: Any, shard_count: int) -> int:
    for i, (min_val, max_val) in enumerate(self.ranges):
        if min_val <= sharding_key <= max_val:
            return i
    return -1  # 无匹配分片

class TimeBasedShardingAlgorithm(ShardingAlgorithm):
"""基于时间的分片算法"""

def get_shard_id(self, sharding_key: datetime, shard_count: int) -> int:
    # 按月分片
    month = sharding_key.month
    year = sharding_key.year

    # 计算相对于基准时间的月份偏移
    base_year = 2020
    month_offset = (year - base_year) * 12 + (month - 1)

    return month_offset % shard_count

3.1.2 分片配置示例

sharding-config.yaml

tables:
user_orders:
sharding_key: user_id
algorithm: hash
shard_count: 8
databases:

  - db_0
  - db_1
  - db_2
  - db_3
  - db_4
  - db_5
  - db_6
  - db_7
actual_tables:
  - user_orders_0
  - user_orders_1
  - user_orders_2
  - user_orders_3
  - user_orders_4
  - user_orders_5
  - user_orders_6
  - user_orders_7

product_info:
sharding_key: product_id
algorithm: range
ranges:

  - [1, 1000000]    # 分片0
  - [1000001, 2000000]  # 分片1
  - [2000001, 3000000]  # 分片2
databases:
  - db_products_0
  - db_products_1
  - db_products_2

3.2 读写分离
3.2.1 读策略配置
class ReadStrategy:
"""读策略"""

MASTER_ONLY = "master_only"      # 只读主库
SLAVE_ONLY = "slave_only"        # 只读从库
MASTER_SLAVE = "master_slave"   # 主从负载均衡
SLAVE_FIRST = "slave_first"     # 优先从库,失败切主库

class ReadWriteSplitter:
"""读写分离器"""

def __init__(self, config: ReadWriteConfig):
    self.config = config
    self.master_pool = self._create_connection_pool(config.master)
    self.slave_pools = [self._create_connection_pool(slave) for slave in config.slaves]

def get_connection(self, sql: str, is_write: bool = False) -> Connection:
    """获取数据库连接"""
    if is_write or self._must_use_master(sql):
        return self.master_pool.get_connection()

    # 读操作,根据策略选择
    strategy = self.config.read_strategy

    if strategy == ReadStrategy.MASTER_ONLY:
        return self.master_pool.get_connection()

    elif strategy == ReadStrategy.SLAVE_ONLY:
        return self._get_slave_connection()

    elif strategy == ReadStrategy.MASTER_SLAVE:
        # 负载均衡
        if random.random() < 0.7:  # 70%走从库
            return self._get_slave_connection()
        else:
            return self.master_pool.get_connection()

    elif strategy == ReadStrategy.SLAVE_FIRST:
        try:
            return self._get_slave_connection()
        except Exception as e:
            # 从库失败,切到主库
            return self.master_pool.get_connection()

def _get_slave_connection(self) -> Connection:
    """获取从库连接"""
    # 简单轮询
    slave_pool = self.slave_pools[self.current_slave_index]
    self.current_slave_index = (self.current_slave_index + 1) % len(self.slave_pools)
    return slave_pool.get_connection()

def _must_use_master(self, sql: str) -> bool:
    """检查是否必须使用主库"""
    upper_sql = sql.upper()

    # 写操作
    if any(keyword in upper_sql for keyword in ['INSERT', 'UPDATE', 'DELETE']):
        return True

    # 事务操作
    if 'BEGIN' in upper_sql or 'COMMIT' in upper_sql or 'ROLLBACK' in upper_sql:
        return True

    # 包含LAST_INSERT_ID()等函数
    if 'LAST_INSERT_ID()' in upper_sql:
        return True

    return False

3.3 分布式事务
3.3.1 2PC(两阶段提交)
sequenceDiagram
participant C as Coordinator(TDDL)
participant P1 as Participant(分片1)
participant P2 as Participant(分片2)

C->>P1: prepare(transaction_id)
C->>P2: prepare(transaction_id)

P1-->>C: ready
P2-->>C: ready

C->>P1: commit(transaction_id)
C->>P2: commit(transaction_id)

P1-->>C: committed
P2-->>C: committed

3.3.2 2PC实现
class TwoPhaseCommitCoordinator:
"""两阶段提交协调器"""

def __init__(self, participants: List[DatabaseParticipant]):
    self.participants = participants
    self.transactions = {}  # 事务状态记录

def execute_transaction(self, transaction_id: str, operations: List[Operation]) -> bool:
    """执行分布式事务"""
    try:
        # 阶段1:准备阶段
        prepare_results = []
        for participant in self.participants:
            result = participant.prepare(transaction_id, operations)
            prepare_results.append(result)

        # 检查所有参与者是否准备就绪
        all_ready = all(result.status == 'ready' for result in prepare_results)

        if not all_ready:
            # 有参与者准备失败,回滚
            self.rollback(transaction_id)
            return False

        # 阶段2:提交阶段
        commit_results = []
        for participant in self.participants:
            result = participant.commit(transaction_id)
            commit_results.append(result)

        # 检查提交结果
        all_committed = all(result.status == 'committed' for result in commit_results)

        if not all_committed:
            # 提交失败,需要人工干预
            self._handle_commit_failure(transaction_id)
            return False

        return True

    except Exception as e:
        self.rollback(transaction_id)
        return False

def rollback(self, transaction_id: str):
    """回滚事务"""
    for participant in self.participants:
        try:
            participant.rollback(transaction_id)
        except Exception as e:
            # 记录回滚失败,需要人工干预
            self._log_rollback_failure(transaction_id, participant)

四、TDDL开源替代方案
4.1 主流替代方案对比
特性

TDDL

ShardingSphere

MyCat

Vitess

开发语言​

Java

Java

Java

Go

核心能力​

分库分表+读写分离

分库分表+读写分离+分布式事务

分库分表+读写分离

分片+高可用+云原生

协议兼容​

MySQL协议

多数据库协议

MySQL协议

MySQL协议

事务支持​

2PC/XA

2PC/XA/Seata

弱事务支持

2PC

生态成熟度​

高(淘宝内部)

高(Apache顶级项目)

高(CNCF项目)

部署复杂度​


4.2 ShardingSphere(推荐)
4.2.1 架构对比
TDDL架构:
应用 → TDDL客户端 → TDDL服务端 → 数据库

ShardingSphere架构:
应用 → ShardingSphere-JDBC(嵌入式) → 数据库

ShardingSphere-Proxy(独立服务) → 数据库
4.2.2 配置示例

shardingsphere-config.yaml

dataSources:
ds_0:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
jdbcUrl: jdbc:mysql://localhost:3306/ds_0
username: root
password: password
ds_1:
dataSourceClassName: com.zaxxer.hikari.HikariDataSource
jdbcUrl: jdbc:mysql://localhost:3306/ds_1
username: root
password: password

rules:

  • !SHARDING
    tables:
    t_order:

    actualDataNodes: ds_${0..1}.t_order_${0..1}
    tableStrategy:
      standard:
        shardingColumn: order_id
        shardingAlgorithmName: t_order_inline
    keyGenerateStrategy:
      column: order_id
      keyGeneratorName: snowflake
    

    bindingTables:

    • t_order
      defaultDatabaseStrategy:
      none:
      defaultTableStrategy:
      none:

    shardingAlgorithms:
    t_order_inline:

    type: INLINE
    props:
      algorithm-expression: t_order_${order_id % 2}
    

    keyGenerators:
    snowflake:

    type: SNOWFLAKE
    props:
      worker-id: 123
    

    4.2.3 使用示例
    // Spring Boot集成
    @SpringBootApplication
    public class Application {

    public static void main(String[] args) {

      SpringApplication.run(Application.class, args);
    

    }
    }

// 业务代码
@Repository
public class OrderRepository {

@Autowired
private JdbcTemplate jdbcTemplate;

public void createOrder(Order order) {
    String sql = "INSERT INTO t_order (order_id, user_id, amount) VALUES (?, ?, ?)";
    jdbcTemplate.update(sql, order.getOrderId(), order.getUserId(), order.getAmount());
}

public List<Order> getOrdersByUserId(Long userId) {
    String sql = "SELECT * FROM t_order WHERE user_id = ?";
    return jdbcTemplate.query(sql, new OrderRowMapper(), userId);
}

}
4.3 MyCat(轻量级替代)
4.3.1 配置示例


druidparser

test
TESTDB



select user()


4.3.2 分片规则



user_id
mod-long


2

4.4 Vitess(云原生场景)
4.4.1 部署架构

vitess-operator.yaml

apiVersion: planetscale.com/v2
kind: VitessCluster
metadata:
name: example
spec:
images:
vtgate: vitess/vtgate:latest
vttablet: vitess/vttablet:latest
cells:

  • name: zone1
    gateway:
    replicas: 3
    keyspaces:
  • name: commerce
    partitionings:
    • equal:
      parts: 2
      shardTemplate:
      databaseInitScriptSecret:
        name: example-init
      tabletPools:
      - cell: zone1
        type: primary
        replicas: 2
        vttablet:
          extraFlags:
            db_charset: utf8mb4
      
      4.4.2 分片配置
      -- 创建分片表
      CREATE TABLE customers (
      customer_id BIGINT,
      name VARCHAR(100),
      email VARCHAR(100),
      PRIMARY KEY (customer_id)
      ) ENGINE=InnoDB;

-- 配置分片
ALTER VSCHEMA ADD TABLE customers;
ALTER VSCHEMA ON customers ADD VINDEX hash(customer_id) USING hash;
五、TDDL最佳实践
5.1 分片设计原则
5.1.1 分片键选择

好的分片键

good_sharding_keys = [
'user_id', # 高基数,均匀分布
'order_id', # 业务主键
'tenant_id', # 多租户场景
'create_time' # 时间序列
]

差的分片键

bad_sharding_keys = [
'gender', # 基数低,分布不均
'status', # 状态字段,基数低
'is_deleted', # 布尔值,分布极不均匀
]
5.1.2 避免跨分片查询
-- 跨分片查询(性能差)
SELECT COUNT(*) FROM orders WHERE user_id IN (1, 2, 3, 4, 5);

-- 优化为单分片查询
SELECT COUNT() FROM orders WHERE user_id = 1;
SELECT COUNT(
) FROM orders WHERE user_id = 2;
-- ... 然后聚合结果
5.2 扩容策略
5.2.1 在线扩容方案
class OnlineExpansionService:
"""在线扩容服务"""

def expand_shards(self, new_shard_count: int):
    """扩容分片"""
    # 1. 创建新分片数据库
    new_databases = self._create_new_databases(new_shard_count)

    # 2. 配置双写
    self._enable_dual_write()

    # 3. 数据迁移
    self._migrate_data(new_shard_count)

    # 4. 验证数据一致性
    self._verify_data_consistency()

    # 5. 切换流量
    self._switch_traffic(new_shard_count)

    # 6. 清理旧数据
    self._cleanup_old_data()

def _migrate_data(self, new_shard_count: int):
    """数据迁移"""
    # 使用一致性哈希减少数据迁移量
    for old_shard_id in range(self.current_shard_count):
        for record in self._get_records_from_shard(old_shard_id):
            new_shard_id = self._calculate_new_shard_id(record.sharding_key, new_shard_count)

            if new_shard_id != old_shard_id:
                # 迁移数据
                self._migrate_record(record, new_shard_id)

5.3 监控与运维
5.3.1 关键指标监控
class TDDLMonitor:
"""TDDL监控器"""

def __init__(self):
    self.metrics = {}

def collect_metrics(self):
    """收集监控指标"""
    metrics = {
        # 连接池指标
        'connection_pool_active': self._get_active_connections(),
        'connection_pool_idle': self._get_idle_connections(),
        'connection_pool_wait': self._get_waiting_connections(),

        # 性能指标
        'query_qps': self._get_query_qps(),
        'query_latency': self._get_avg_latency(),
        'error_rate': self._get_error_rate(),

        # 分片指标
        'shard_hit_rate': self._get_shard_hit_rate(),
        'cross_shard_queries': self._get_cross_shard_queries(),

        # 事务指标
        'transaction_count': self._get_transaction_count(),
        'transaction_timeout_rate': self._get_transaction_timeout_rate()
    }

    return metrics

def alert_rules(self):
    """告警规则"""
    return {
        'high_error_rate': {
            'condition': 'error_rate > 0.05',
            'message': 'TDDL错误率超过5%'
        },
        'high_latency': {
            'condition': 'query_latency > 1000',
            'message': '查询延迟超过1秒'
        },
        'connection_pool_exhausted': {
            'condition': 'connection_pool_wait > 100',
            'message': '连接池等待数超过100'
        }
    }

六、总结
6.1 TDDL核心价值
透明分片:应用无需感知底层分片细节
高可用性:自动故障转移和读写分离
线性扩展:支持水平扩容应对海量数据
兼容性:保持MySQL协议兼容,迁移成本低
6.2 选择建议
企业级场景:ShardingSphere(功能最全面)
云原生环境:Vitess(Kubernetes友好)
轻量级需求:MyCat(部署简单)
淘宝生态:TDDL(深度集成淘宝技术栈)
6.3 发展趋势
云原生:容器化部署和弹性扩缩容
智能优化:基于AI的SQL优化和路由决策
多模型支持:同时支持关系型和NoSQL数据
HTAP混合负载:支持OLTP和OLAP混合场景
通过本指南,你可以:
✅ 深入理解TDDL架构和核心原理
✅ 掌握分布式数据库中间件关键技术
✅ 选择合适的开源替代方案
✅ 设计高性能的分库分表架构
✅ 实施分布式数据库最佳实践

相关文章
|
9天前
|
人工智能 自然语言处理 Shell
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
|
6天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
4234 13
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
|
5天前
|
人工智能 安全 机器人
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI助手,支持钉钉、飞书等多平台接入。本教程手把手指导Linux下部署与钉钉机器人对接,涵盖环境配置、模型选择(如Qwen)、权限设置及调试,助你快速打造私有、安全、高权限的专属AI助理。(239字)
3359 9
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
|
8天前
|
人工智能 JavaScript 应用服务中间件
零门槛部署本地AI助手:Windows系统Moltbot(Clawdbot)保姆级教程
Moltbot(原Clawdbot)是一款功能全面的智能体AI助手,不仅能通过聊天互动响应需求,还具备“动手”和“跑腿”能力——“手”可读写本地文件、执行代码、操控命令行,“脚”能联网搜索、访问网页并分析内容,“大脑”则可接入Qwen、OpenAI等云端API,或利用本地GPU运行模型。本教程专为Windows系统用户打造,从环境搭建到问题排查,详细拆解全流程,即使无技术基础也能顺利部署本地AI助理。
6894 15
|
6天前
|
存储 人工智能 机器人
OpenClaw是什么?阿里云OpenClaw(原Clawdbot/Moltbot)一键部署官方教程参考
OpenClaw是什么?OpenClaw(原Clawdbot/Moltbot)是一款实用的个人AI助理,能够24小时响应指令并执行任务,如处理文件、查询信息、自动化协同等。阿里云推出的OpenClaw一键部署方案,简化了复杂配置流程,用户无需专业技术储备,即可快速在轻量应用服务器上启用该服务,打造专属AI助理。本文将详细拆解部署全流程、进阶功能配置及常见问题解决方案,确保不改变原意且无营销表述。
4468 4
|
3天前
|
人工智能 机器人 Linux
OpenClaw(Clawdbot、Moltbot)汉化版部署教程指南(零门槛)
OpenClaw作为2026年GitHub上增长最快的开源项目之一,一周内Stars从7800飙升至12万+,其核心优势在于打破传统聊天机器人的局限,能真正执行读写文件、运行脚本、浏览器自动化等实操任务。但原版全英文界面对中文用户存在上手门槛,汉化版通过覆盖命令行(CLI)与网页控制台(Dashboard)核心模块,解决了语言障碍,同时保持与官方版本的实时同步,确保新功能最快1小时内可用。本文将详细拆解汉化版OpenClaw的搭建流程,涵盖本地安装、Docker部署、服务器远程访问等场景,同时提供环境适配、问题排查与国内应用集成方案,助力中文用户高效搭建专属AI助手。
2275 5
|
8天前
|
人工智能 JavaScript API
零门槛部署本地 AI 助手:Clawdbot/Meltbot 部署深度保姆级教程
Clawdbot(Moltbot)是一款智能体AI助手,具备“手”(读写文件、执行代码)、“脚”(联网搜索、分析网页)和“脑”(接入Qwen/OpenAI等API或本地GPU模型)。本指南详解Windows下从Node.js环境搭建、一键安装到Token配置的全流程,助你快速部署本地AI助理。(239字)
4537 23
|
14天前
|
人工智能 API 开发者
Claude Code 国内保姆级使用指南:实测 GLM-4.7 与 Claude Opus 4.5 全方案解
Claude Code是Anthropic推出的编程AI代理工具。2026年国内开发者可通过配置`ANTHROPIC_BASE_URL`实现本地化接入:①极速平替——用Qwen Code v0.5.0或GLM-4.7,毫秒响应,适合日常编码;②满血原版——经灵芽API中转调用Claude Opus 4.5,胜任复杂架构与深度推理。
8308 12