实战!用Scrapy+Flask构建京东商品比价微信机器人

简介: 本项目是一个基于微信的京东智能比价机器人,集成Scrapy+Selenium爬虫、Flask API与itchat/Wechaty微信服务,支持商品搜索、实时比价、30天价格趋势分析、降价自动预警及收藏管理,全栈可部署(Docker+Nginx),助力用户省钱决策。(239字)

一、项目概述
1.1 项目简介
本项目将构建一个智能微信比价机器人,能够:
✅ 实时抓取:用Scrapy抓取京东商品价格信息
✅ 智能比价:分析商品历史价格趋势
✅ 微信交互:通过Flask接收用户查询并返回结果
✅ 定时监控:自动监控商品价格变化
✅ 价格预警:低于目标价格时自动提醒
1.2 技术栈
爬虫框架:Scrapy + Selenium(动态渲染)
Web框架:Flask + RESTful API
微信集成:itchat / Wechaty
数据存储:Redis + MySQL
任务调度:APScheduler
部署运维:Docker + Nginx
1.3 核心功能
🛍️ 商品搜索:通过微信发送商品名,返回比价结果
📊 价格趋势:展示商品30天价格走势
🔔 降价提醒:设置目标价,降价自动通知
⭐ 收藏管理:收藏关注商品,一键查询
📈 数据分析:智能推荐最佳购买时机
二、环境配置
2.1 项目结构
jd_price_bot/
├── spiders/ # Scrapy爬虫
│ ├── jd_spider.py # 京东商品爬虫
│ ├── price_monitor.py # 价格监控爬虫
│ └── middlewares.py # 中间件
├── web/ # Flask应用
│ ├── app.py # Flask主应用
│ ├── routes/ # 路由模块
│ ├── models/ # 数据模型
│ └── templates/ # 模板文件
├── services/ # 业务服务
│ ├── wechat_service.py # 微信服务
│ ├── price_service.py # 价格服务
│ └── notification_service.py # 通知服务
├── utils/ # 工具类
│ ├── database.py # 数据库连接
│ ├── cache.py # 缓存工具
│ └── config.py # 配置管理
├── requirements.txt # 依赖包
├── docker-compose.yml # Docker配置
└── README.md
2.2 环境配置

requirements.txt

Web框架

Flask==2.3.3
flask-restful==0.3.10
flask-sqlalchemy==3.0.5
flask-apscheduler==1.12.4

爬虫相关

Scrapy==2.11.0
selenium==4.15.0
requests==2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3

微信相关

itchat==1.3.10
wechaty==0.8.21

数据库

redis==5.0.1
mysql-connector-python==8.1.0
sqlalchemy==2.0.23

数据处理

pandas==2.1.3
numpy==1.25.2
matplotlib==3.7.2

工具类

python-dotenv==1.0.0
APScheduler==3.10.4
schedule==1.2.0
2.3 配置文件

utils/config.py

import os
from dotenv import load_dotenv

load_dotenv()

class Config:

# 基础配置
SECRET_KEY = os.getenv('SECRET_KEY', 'jd-price-bot-secret-key')
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'

# 数据库配置
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
MYSQL_PORT = os.getenv('MYSQL_PORT', '3306')
MYSQL_USER = os.getenv('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'password')
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'jd_price_bot')

# 微信配置
WECHAT_QR_CODE_PATH = os.getenv('WECHAT_QR_CODE_PATH', './qr_code.png')
WECHAT_AUTO_LOGIN = os.getenv('WECHAT_AUTO_LOGIN', 'True').lower() == 'true'

# 爬虫配置
JD_SEARCH_URL = 'https://search.jd.com/Search'
JD_PRODUCT_URL = 'https://item.jd.com/{}.html'
SPIDER_DELAY = int(os.getenv('SPIDER_DELAY', '2'))
MAX_PRODUCTS_PER_SEARCH = int(os.getenv('MAX_PRODUCTS_PER_SEARCH', '20'))

# 价格监控配置
PRICE_CHECK_INTERVAL = int(os.getenv('PRICE_CHECK_INTERVAL', '3600'))  # 1小时
PRICE_HISTORY_DAYS = int(os.getenv('PRICE_HISTORY_DAYS', '30'))

# 通知配置
PRICE_DROP_THRESHOLD = float(os.getenv('PRICE_DROP_THRESHOLD', '0.1'))  # 10%降价
ENABLE_PRICE_ALERT = os.getenv('ENABLE_PRICE_ALERT', 'True').lower() == 'true'

# 安全配置
USER_WHITELIST = os.getenv('USER_WHITELIST', '').split(',')  # 用户白名单
MAX_REQUESTS_PER_MINUTE = int(os.getenv('MAX_REQUESTS_PER_MINUTE', '30'))

# 日志配置
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
LOG_FILE = os.getenv('LOG_FILE', './logs/jd_bot.log')

三、核心模块实现
3.1 京东爬虫实现(Scrapy)
3.1.1 商品搜索爬虫

spiders/jd_spider.py

import scrapy
import json
import re
from urllib.parse import quote, urlencode
from typing import Dict, List, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time

class JDSpider(scrapy.Spider):
name = 'jd_spider'
allowed_domains = ['jd.com', 'search.jd.com']

def __init__(self, keyword: str = None, max_results: int = 20, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.keyword = keyword
    self.max_results = max_results
    self.driver = None
    self.setup_driver()

def setup_driver(self):
    """设置Selenium WebDriver"""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')

    self.driver = webdriver.Chrome(options=options)
    self.driver.implicitly_wait(10)

def start_requests(self):
    """开始请求"""
    if self.keyword:
        url = self.build_search_url(self.keyword)
        yield scrapy.Request(url=url, callback=self.parse_search_results)

def build_search_url(self, keyword: str) -> str:
    """构建搜索URL"""
    params = {
        'keyword': keyword,
        'enc': 'utf-8',
        'wq': keyword,
        'pvid': self.generate_pvid()
    }
    return f"https://search.jd.com/Search?{urlencode(params)}"

def generate_pvid(self) -> str:
    """生成随机pvid"""
    import uuid
    return str(uuid.uuid4()).replace('-', '')[:16]

def parse_search_results(self, response):
    """解析搜索结果页"""
    try:
        # 等待页面加载完成
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".gl-item"))
        )

        # 获取商品列表
        products = self.driver.find_elements(By.CSS_SELECTOR, ".gl-item")
        results = []

        for i, product in enumerate(products[:self.max_results]):
            try:
                product_info = self.extract_product_info(product)
                if product_info:
                    results.append(product_info)
                    self.logger.info(f"提取商品: {product_info['title'][:30]}")
            except Exception as e:
                self.logger.error(f"提取商品信息失败: {e}")
                continue

        # 返回结果
        yield {
            'keyword': self.keyword,
            'results': results,
            'count': len(results)
        }

    except TimeoutException:
        self.logger.error("搜索页面加载超时")
    finally:
        self.close_driver()

def extract_product_info(self, product_element) -> Optional[Dict]:
    """提取商品信息"""
    try:
        # 商品ID
        data_sku = product_element.get_attribute('data-sku')
        if not data_sku:
            return None

        # 商品标题
        title_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a em")
        title = title_element.text.strip() if title_element else ""

        # 价格
        price_element = product_element.find_element(By.CSS_SELECTOR, ".p-price strong i")
        price = float(price_element.text) if price_element and price_element.text else 0.0

        # 店铺
        shop_element = product_element.find_element(By.CSS_SELECTOR, ".p-shop a")
        shop = shop_element.text.strip() if shop_element else ""

        # 评论数
        comment_element = product_element.find_element(By.CSS_SELECTOR, ".p-commit a")
        comment_text = comment_element.text.strip() if comment_element else ""
        comment_count = self.extract_comment_count(comment_text)

        # 商品链接
        link_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a")
        link = link_element.get_attribute('href') if link_element else ""

        # 图片
        img_element = product_element.find_element(By.CSS_SELECTOR, ".p-img img")
        image_url = img_element.get_attribute('src') or img_element.get_attribute('data-lazy-img')

        return {
            'product_id': data_sku,
            'title': title,
            'price': price,
            'shop': shop,
            'comment_count': comment_count,
            'link': link,
            'image_url': image_url,
            'crawl_time': int(time.time())
        }

    except Exception as e:
        self.logger.error(f"提取商品信息异常: {e}")
        return None

def extract_comment_count(self, comment_text: str) -> int:
    """提取评论数量"""
    if not comment_text:
        return 0

    # 匹配数字
    match = re.search(r'(\d+(\.\d+)?)', comment_text.replace('+', '').replace('万', '0000'))
    if match:
        count = float(match.group(1))
        if '万' in comment_text:
            count *= 10000
        return int(count)
    return 0

def close_driver(self):
    """关闭浏览器"""
    if self.driver:
        self.driver.quit()

def closed(self, reason):
    """爬虫关闭时清理资源"""
    self.close_driver()
    super().closed(reason)

价格详情爬虫

class JDPriceSpider(scrapy.Spider):
name = 'jd_price_spider'

def __init__(self, product_ids: List[str] = None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.product_ids = product_ids or []

def start_requests(self):
    """开始请求"""
    for product_id in self.product_ids:
        url = f"https://item.jd.com/{product_id}.html"
        yield scrapy.Request(
            url=url,
            callback=self.parse_product_page,
            meta={'product_id': product_id}
        )

def parse_product_page(self, response):
    """解析商品详情页"""
    product_id = response.meta['product_id']

    try:
        # 提取价格信息
        price_script = response.xpath('//script[contains(text(), "price:")]/text()').get()
        price = self.extract_price_from_script(price_script)

        # 提取促销信息
        promotions = self.extract_promotions(response)

        # 提取库存信息
        stock = self.extract_stock_info(response)

        yield {
            'product_id': product_id,
            'price': price,
            'promotions': promotions,
            'stock': stock,
            'crawl_time': int(time.time())
        }

    except Exception as e:
        self.logger.error(f"解析商品页面失败: {e}")

def extract_price_from_script(self, script_text: str) -> float:
    """从JavaScript中提取价格"""
    if not script_text:
        return 0.0

    try:
        # 匹配价格模式
        price_patterns = [
            r'"price"\s*:\s*"([\d.]+)"',
            r"'price'\s*:\s*'([\d.]+)'",
            r'price\s*=\s*([\d.]+)'
        ]

        for pattern in price_patterns:
            match = re.search(pattern, script_text)
            if match:
                return float(match.group(1))

        return 0.0
    except:
        return 0.0

def extract_promotions(self, response) -> List[str]:
    """提取促销信息"""
    promotions = []

    # 提取优惠券
    coupons = response.xpath('//div[contains(@class, "coupon-item")]//text()').getall()
    coupons_text = ' '.join([c.strip() for c in coupons if c.strip()])
    if coupons_text:
        promotions.append(f"优惠券: {coupons_text}")

    # 提取促销活动
    promo_elements = response.xpath('//div[contains(@class, "promotion-item")]')
    for promo in promo_elements:
        promo_text = promo.xpath('.//text()').getall()
        promo_text = ' '.join([p.strip() for p in promo_text if p.strip()])
        if promo_text:
            promotions.append(promo_text)

    return promotions

def extract_stock_info(self, response) -> str:
    """提取库存信息"""
    stock_text = response.xpath('//div[contains(@class, "store-prompt")]//text()').get()
    if stock_text:
        return stock_text.strip()

    # 检查是否有货
    buy_btn = response.xpath('//a[contains(@class, "btn-addtocart")]')
    if buy_btn:
        return "有货"
    else:
        return "无货"

3.1.2 价格监控爬虫

spiders/price_monitor.py

import scrapy
from scrapy import signals
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List

class JDPriceMonitor(scrapy.Spider):
name = 'jd_price_monitor'
custom_settings = {
'CONCURRENT_REQUESTS': 1,
'DOWNLOAD_DELAY': 3,
'AUTOTHROTTLE_ENABLED': True
}

def __init__(self, product_ids: List[str] = None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.product_ids = product_ids or []
    self.price_history = {}
    self.redis_client = None

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
    spider = super().from_crawler(crawler, *args, **kwargs)
    crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
    crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
    return spider

def spider_opened(self, spider):
    """爬虫启动时"""
    from utils.cache import get_redis_client
    self.redis_client = get_redis_client()

    # 加载需要监控的商品
    if not self.product_ids:
        self.product_ids = self.load_monitored_products()

def spider_closed(self, spider):
    """爬虫关闭时"""
    if self.redis_client:
        self.redis_client.close()

def load_monitored_products(self) -> List[str]:
    """从数据库加载需要监控的商品"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()
        products = price_service.get_monitored_products()
        return [p.product_id for p in products]
    except Exception as e:
        self.logger.error(f"加载监控商品失败: {e}")
        return []

def start_requests(self):
    """开始请求"""
    for product_id in self.product_ids:
        url = f"https://item.jd.com/{product_id}.html"
        yield scrapy.Request(
            url=url,
            callback=self.parse_product_price,
            meta={'product_id': product_id},
            dont_filter=True
        )

def parse_product_price(self, response):
    """解析商品价格"""
    product_id = response.meta['product_id']

    try:
        # 提取价格
        price = self.extract_price(response)

        if price > 0:
            # 保存价格记录
            self.save_price_record(product_id, price)

            # 检查价格变化
            price_change = self.check_price_change(product_id, price)

            yield {
                'product_id': product_id,
                'price': price,
                'price_change': price_change,
                'timestamp': int(time.time()),
                'status': 'success'
            }
        else:
            yield {
                'product_id': product_id,
                'price': 0.0,
                'timestamp': int(time.time()),
                'status': 'failed',
                'error': '价格提取失败'
            }

    except Exception as e:
        self.logger.error(f"价格监控失败: {e}")
        yield {
            'product_id': product_id,
            'price': 0.0,
            'timestamp': int(time.time()),
            'status': 'failed',
            'error': str(e)
        }

def extract_price(self, response) -> float:
    """提取价格"""
    # 多种方式提取价格
    price_selectors = [
        '//span[@class="price J-p-{}"]/text()',
        '//strong[@class="J-p-{}"]/i/text()',
        '//span[contains(@class, "price J-p-")]/text()',
        '//script[contains(text(), "price:")]/text()'
    ]

    for selector in price_selectors:
        try:
            price_text = response.xpath(selector).get()
            if price_text:
                # 清理价格文本
                price_text = price_text.replace('¥', '').replace('¥', '').strip()
                if price_text and price_text.replace('.', '').isdigit():
                    return float(price_text)
        except:
            continue

    return 0.0

def save_price_record(self, product_id: str, price: float):
    """保存价格记录"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()
        price_service.save_price_record(product_id, price)

        # 缓存最新价格
        if self.redis_client:
            cache_key = f"jd:price:{product_id}"
            self.redis_client.setex(
                cache_key, 
                3600,  # 1小时
                json.dumps({
                    'price': price,
                    'timestamp': int(time.time())
                })
            )
    except Exception as e:
        self.logger.error(f"保存价格记录失败: {e}")

def check_price_change(self, product_id: str, current_price: float) -> Dict:
    """检查价格变化"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()

        # 获取历史价格
        history = price_service.get_price_history(product_id, days=7)
        if not history:
            return {'change': 0.0, 'type': 'new'}

        # 计算价格变化
        last_price = history[-1].price
        change = current_price - last_price
        change_percent = (change / last_price) * 100 if last_price > 0 else 0.0

        # 检查是否触发预警
        if change_percent < -10:  # 降价超过10%
            self.trigger_price_alert(product_id, current_price, change_percent)

        return {
            'change': change_percent,
            'type': 'drop' if change_percent < 0 else 'rise',
            'last_price': last_price
        }

    except Exception as e:
        self.logger.error(f"检查价格变化失败: {e}")
        return {'change': 0.0, 'type': 'unknown'}

def trigger_price_alert(self, product_id: str, price: float, change_percent: float):
    """触发价格预警"""
    try:
        from services.notification_service import NotificationService
        notification_service = NotificationService()

        # 获取商品信息
        from services.price_service import PriceService
        price_service = PriceService()
        product = price_service.get_product_info(product_id)

        if product:
            message = f"🚨 价格预警!\n商品:{product.title[:30]}...\n当前价格:¥{price}\n降价幅度:{abs(change_percent):.1f}%"
            notification_service.send_price_alert(product_id, message)
    except Exception as e:
        self.logger.error(f"发送价格预警失败: {e}")

3.2 Flask Web应用
3.2.1 主应用

web/app.py

from flask import Flask, request, jsonify, render_template
from flask_restful import Api
from flask_apscheduler import APScheduler
import logging
from utils.config import Config
from utils.database import db, init_db
from services.wechat_service import WeChatService
from services.price_service import PriceService
from services.notification_service import NotificationService

配置日志

logging.basicConfig(
level=getattr(logging, Config.LOG_LEVEL),
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(Config.LOG_FILE),
logging.StreamHandler()
]
)

app = Flask(name)
app.config.from_object(Config)

初始化数据库

init_db(app)

初始化API

api = Api(app)

初始化定时任务

scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

初始化服务

wechat_service = WeChatService()
price_service = PriceService()
notification_service = NotificationService()

注册路由

from web.routes.wechat_routes import wechat_bp
from web.routes.price_routes import price_bp
from web.routes.product_routes import product_bp

app.register_blueprint(wechat_bp, url_prefix='/wechat')
app.register_blueprint(price_bp, url_prefix='/price')
app.register_blueprint(product_bp, url_prefix='/product')

@app.route('/')
def index():
"""首页"""
return render_template('index.html')

@app.route('/health')
def health_check():
"""健康检查"""
return jsonify({'status': 'healthy', 'timestamp': int(time.time())})

@app.route('/search')
def search_products():
"""搜索商品"""
keyword = request.args.get('keyword', '')
if not keyword:
return jsonify({'error': '请输入搜索关键词'}), 400

try:
    results = price_service.search_products(keyword)
    return jsonify({
        'keyword': keyword,
        'results': results,
        'count': len(results)
    })
except Exception as e:
    logging.error(f"搜索失败: {e}")
    return jsonify({'error': '搜索失败'}), 500

定时任务

@scheduler.task('interval', id='monitor_prices', hours=1)
def monitor_prices():
"""定时监控价格"""
try:
from spiders.price_monitor import JDPriceMonitor
from scrapy.crawler import CrawlerProcess
from scrapy.utils.project import get_project_settings

    # 启动价格监控爬虫
    settings = get_project_settings()
    process = CrawlerProcess(settings)
    process.crawl(JDPriceMonitor)
    process.start()

    logging.info("价格监控任务执行完成")
except Exception as e:
    logging.error(f"价格监控任务失败: {e}")

if name == 'main':
app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)
3.2.2 微信路由

web/routes/wechat_routes.py

from flask import Blueprint, request, jsonify
import re
from services.wechat_service import WeChatService
from services.price_service import PriceService
import logging

wechat_bp = Blueprint('wechat', name)
wechat_service = WeChatService()
price_service = PriceService()

@wechat_bp.route('/message', methods=['POST'])
def handle_wechat_message():
"""处理微信消息"""
try:
data = request.json
if not data:
return jsonify({'error': '无效的请求数据'}), 400

    message_type = data.get('type', '')
    content = data.get('content', '')
    user_id = data.get('user_id', '')

    # 验证用户权限
    if not wechat_service.validate_user(user_id):
        return jsonify({'error': '用户无权限'}), 403

    # 处理不同类型的消息
    if message_type == 'text':
        response = handle_text_message(content, user_id)
    elif message_type == 'image':
        response = handle_image_message(content, user_id)
    else:
        response = {'type': 'text', 'content': '暂不支持此类型消息'}

    return jsonify(response)

except Exception as e:
    logging.error(f"处理微信消息失败: {e}")
    return jsonify({'error': '处理消息失败'}), 500

def handle_text_message(content: str, user_id: str) -> dict:
"""处理文本消息"""

# 帮助命令
if content in ['帮助', 'help', '?']:
    return {
        'type': 'text',
        'content': get_help_message()
    }

# 搜索商品
if content.startswith('搜索 ') or content.startswith('search '):
    keyword = content.replace('搜索 ', '').replace('search ', '')
    return search_products(keyword, user_id)

# 查询价格
if content.startswith('价格 ') or content.startswith('price '):
    product_id = content.replace('价格 ', '').replace('price ', '')
    return get_product_price(product_id, user_id)

# 设置预警
if content.startswith('预警 ') or content.startswith('alert '):
    parts = content.split(' ')
    if len(parts) >= 3:
        product_id = parts[1]
        target_price = float(parts[2])
        return set_price_alert(product_id, target_price, user_id)

# 查询历史价格
if content.startswith('历史 ') or content.startswith('history '):
    product_id = content.replace('历史 ', '').replace('history ', '')
    return get_price_history(product_id, user_id)

# 收藏商品
if content.startswith('收藏 ') or content.startswith('fav '):
    product_id = content.replace('收藏 ', '').replace('fav ', '')
    return add_to_favorites(product_id, user_id)

# 查看收藏
if content in ['收藏列表', 'favorites']:
    return get_favorites(user_id)

# 默认回复
return {
    'type': 'text',
    'content': '请输入"帮助"查看可用命令'
}

def get_help_message() -> str:
"""获取帮助信息"""
return """
🤖 京东比价机器人使用指南:

🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑

💰 查询价格:
价格 100012345678
price 100012345678

📊 历史价格:
历史 100012345678
history 100012345678

🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999

⭐ 收藏管理:
收藏 100012345678
收藏列表

📈 价格趋势图:
趋势 100012345678
trend 100012345678
"""

def search_products(keyword: str, user_id: str) -> dict:
"""搜索商品"""
try:
results = price_service.search_products(keyword, limit=5)

    if not results:
        return {
            'type': 'text',
            'content': f'未找到与"{keyword}"相关的商品'
        }

    # 构建回复消息
    message = f"🔍 搜索“{keyword}”结果:\n\n"
    for i, product in enumerate(results, 1):
        message += f"{i}. {product['title'][:30]}...\n"
        message += f"   💰 ¥{product['price']} | 👑 {product['shop']}\n"
        message += f"   📞 评论: {product['comment_count']}\n"
        message += f"   🔗 ID: {product['product_id']}\n\n"

    message += "输入“价格 ID”查看详情,如:价格 100012345678"

    return {
        'type': 'text',
        'content': message
    }

except Exception as e:
    logging.error(f"搜索商品失败: {e}")
    return {
        'type': 'text',
        'content': '搜索失败,请稍后重试'
    }

def get_product_price(product_id: str, user_id: str) -> dict:
"""获取商品价格"""
try:
product = price_service.get_product_info(product_id)
if not product:
return {
'type': 'text',
'content': f'未找到商品 ID: {product_id}'
}

    # 获取当前价格
    current_price = price_service.get_current_price(product_id)

    # 获取价格变化
    price_change = price_service.get_price_change(product_id, days=7)

    message = f"📱 {product.title}\n\n"
    message += f"💰 当前价格: ¥{current_price}\n"

    if price_change:
        change_type = "📈" if price_change['change'] > 0 else "📉"
        message += f"{change_type} 7天变化: {price_change['change']:.1f}%\n"

    message += f"🏪 店铺: {product.shop}\n"
    message += f"📞 评论: {product.comment_count}\n\n"
    message += "输入“历史 ID”查看价格趋势"

    return {
        'type': 'text',
        'content': message
    }

except Exception as e:
    logging.error(f"获取商品价格失败: {e}")
    return {
        'type': 'text',
        'content': '获取价格失败,请稍后重试'
    }

def set_price_alert(product_id: str, target_price: float, user_id: str) -> dict:
"""设置价格预警"""
try:
success = price_service.set_price_alert(user_id, product_id, target_price)

    if success:
        return {
            'type': 'text',
            'content': f'✅ 预警设置成功!\n当价格 ≤ ¥{target_price} 时会通知您'
        }
    else:
        return {
            'type': 'text',
            'content': '预警设置失败,请检查商品ID'
        }

except Exception as e:
    logging.error(f"设置价格预警失败: {e}")
    return {
        'type': 'text',
        'content': '预警设置失败,请稍后重试'
    }

3.3 核心服务
3.3.1 微信服务

services/wechat_service.py

import itchat
import qrcode
import os
import threading
import time
from typing import Dict, List, Optional
from utils.config import Config
import logging

class WeChatService:
"""微信服务"""

def __init__(self):
    self.is_logged_in = False
    self.friends = {}
    self.msg_handler = None
    self.login_thread = None
    self.qr_path = Config.WECHAT_QR_CODE_PATH

def login(self) -> bool:
    """登录微信"""
    try:
        # 创建二维码
        qr = qrcode.QRCode(version=1, box_size=10, border=5)
        qr.add_data('https://login.weixin.qq.com/')
        qr.make(fit=True)

        img = qr.make_image(fill_color="black", back_color="white")
        img.save(self.qr_path)
        print(f"请扫描二维码登录微信: {self.qr_path}")

        # 登录
        itchat.auto_login(
            hotReload=Config.WECHAT_AUTO_LOGIN,
            picDir=self.qr_path
        )

        self.is_logged_in = True
        self.friends = itchat.get_friends(update=True)

        # 启动消息监听
        threading.Thread(target=self.start_listening, daemon=True).start()

        logging.info("微信登录成功")
        return True

    except Exception as e:
        logging.error(f"微信登录失败: {e}")
        return False

def start_listening(self):
    """启动消息监听"""
    @itchat.msg_register(itchat.content.TEXT)
    def handle_text_message(msg):
        try:
            user_id = msg['FromUserName']
            content = msg['Text']

            # 转发到Flask处理
            if self.msg_handler:
                response = self.msg_handler(user_id, content)
                if response:
                    self.send_message(user_id, response)

        except Exception as e:
            logging.error(f"处理微信消息失败: {e}")

    itchat.run()

def send_message(self, user_id: str, message: str) -> bool:
    """发送微信消息"""
    try:
        itchat.send(message, toUserName=user_id)
        return True
    except Exception as e:
        logging.error(f"发送微信消息失败: {e}")
        return False

def validate_user(self, user_id: str) -> bool:
    """验证用户权限"""
    if not Config.USER_WHITELIST:
        return True

    # 检查用户是否在白名单中
    for friend in self.friends:
        if friend['UserName'] == user_id:
            return friend['NickName'] in Config.USER_WHITELIST

    return False

def get_friend_info(self, user_id: str) -> Optional[Dict]:
    """获取好友信息"""
    for friend in self.friends:
        if friend['UserName'] == user_id:
            return {
                'nickname': friend['NickName'],
                'remark_name': friend['RemarkName'],
                'sex': friend['Sex']
            }
    return None

def set_message_handler(self, handler):
    """设置消息处理器"""
    self.msg_handler = handler

3.3.2 价格服务

services/price_service.py

from typing import Dict, List, Optional
from datetime import datetime, timedelta
from utils.database import db
from web.models.product_model import Product, PriceHistory, PriceAlert
import logging
import json
from utils.cache import get_redis_client

class PriceService:
"""价格服务"""

def __init__(self):
    self.redis_client = get_redis_client()

def search_products(self, keyword: str, limit: int = 10) -> List[Dict]:
    """搜索商品"""
    try:
        # 检查缓存
        cache_key = f"jd:search:{keyword}"
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # 调用爬虫搜索
        from spiders.jd_spider import JDSpider
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings

        settings = get_project_settings()
        process = CrawlerProcess(settings)

        results = []
        def collect_results(item):
            results.extend(item.get('results', []))

        process.crawl(JDSpider, keyword=keyword, max_results=limit)
        process.start()

        # 缓存结果
        if results:
            self.redis_client.setex(cache_key, 1800, json.dumps(results))  # 30分钟

        return results

    except Exception as e:
        logging.error(f"搜索商品失败: {e}")
        return []

def get_product_info(self, product_id: str) -> Optional[Product]:
    """获取商品信息"""
    try:
        product = Product.query.filter_by(product_id=product_id).first()
        if product:
            return product

        # 从京东获取商品信息
        from spiders.jd_price_spider import JDPriceSpider
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings

        settings = get_project_settings()
        process = CrawlerProcess(settings)

        result = None
        def collect_result(item):
            nonlocal result
            result = item

        process.crawl(JDPriceSpider, product_ids=[product_id])
        process.start()

        if result:
            # 保存到数据库
            product = Product(
                product_id=product_id,
                title=result.get('title', ''),
                shop=result.get('shop', ''),
                comment_count=result.get('comment_count', 0),
                image_url=result.get('image_url', ''),
                link=result.get('link', '')
            )
            db.session.add(product)
            db.session.commit()
            return product

        return None

    except Exception as e:
        logging.error(f"获取商品信息失败: {e}")
        return None

def get_current_price(self, product_id: str) -> float:
    """获取当前价格"""
    try:
        # 检查缓存
        cache_key = f"jd:price:{product_id}"
        cached = self.redis_client.get(cache_key)
        if cached:
            data = json.loads(cached)
            return data.get('price', 0.0)

        # 从数据库获取最新价格
        price_record = PriceHistory.query.filter_by(
            product_id=product_id
        ).order_by(PriceHistory.timestamp.desc()).first()

        if price_record:
            # 更新缓存
            self.redis_client.setex(
                cache_key, 
                3600, 
                json.dumps({
                    'price': price_record.price,
                    'timestamp': int(price_record.timestamp.timestamp())
                })
            )
            return price_record.price

        return 0.0

    except Exception as e:
        logging.error(f"获取当前价格失败: {e}")
        return 0.0

def save_price_record(self, product_id: str, price: float) -> bool:
    """保存价格记录"""
    try:
        # 检查是否重复记录
        latest_record = PriceHistory.query.filter_by(
            product_id=product_id
        ).order_by(PriceHistory.timestamp.desc()).first()

        if latest_record and latest_record.price == price:
            # 价格未变化,不重复记录
            return True

        # 保存新记录
        price_record = PriceHistory(
            product_id=product_id,
            price=price,
            timestamp=datetime.now()
        )
        db.session.add(price_record)
        db.session.commit()

        logging.info(f"保存价格记录: {product_id} - ¥{price}")
        return True

    except Exception as e:
        logging.error(f"保存价格记录失败: {e}")
        db.session.rollback()
        return False

def get_price_history(self, product_id: str, days: int = 30) -> List[PriceHistory]:
    """获取价格历史"""
    try:
        start_date = datetime.now() - timedelta(days=days)
        return PriceHistory.query.filter(
            PriceHistory.product_id == product_id,
            PriceHistory.timestamp >= start_date
        ).order_by(PriceHistory.timestamp.asc()).all()

    except Exception as e:
        logging.error(f"获取价格历史失败: {e}")
        return []

def get_price_change(self, product_id: str, days: int = 7) -> Optional[Dict]:
    """获取价格变化"""
    try:
        history = self.get_price_history(product_id, days=days)
        if len(history) < 2:
            return None

        current_price = history[-1].price
        old_price = history[0].price

        change = current_price - old_price
        change_percent = (change / old_price) * 100 if old_price > 0 else 0.0

        return {
            'change': change_percent,
            'type': 'drop' if change_percent < 0 else 'rise',
            'current_price': current_price,
            'old_price': old_price
        }

    except Exception as e:
        logging.error(f"计算价格变化失败: {e}")
        return None

def set_price_alert(self, user_id: str, product_id: str, target_price: float) -> bool:
    """设置价格预警"""
    try:
        # 检查是否已存在
        existing_alert = PriceAlert.query.filter_by(
            user_id=user_id,
            product_id=product_id
        ).first()

        if existing_alert:
            existing_alert.target_price = target_price
            existing_alert.updated_at = datetime.now()
        else:
            alert = PriceAlert(
                user_id=user_id,
                product_id=product_id,
                target_price=target_price,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )
            db.session.add(alert)

        db.session.commit()
        return True

    except Exception as e:
        logging.error(f"设置价格预警失败: {e}")
        db.session.rollback()
        return False

def get_monitored_products(self) -> List[Product]:
    """获取需要监控的商品"""
    try:
        # 获取所有设置了预警的商品
        alerts = PriceAlert.query.distinct(PriceAlert.product_id).all()
        product_ids = [alert.product_id for alert in alerts]

        return Product.query.filter(Product.product_id.in_(product_ids)).all()

    except Exception as e:
        logging.error(f"获取监控商品失败: {e}")
        return []

3.3.3 通知服务

services/notification_service.py

from typing import Dict, List
from utils.database import db
from web.models.product_model import PriceAlert
from services.wechat_service import WeChatService
import logging

class NotificationService:
"""通知服务"""

def __init__(self):
    self.wechat_service = WeChatService()

def send_price_alert(self, product_id: str, message: str) -> bool:
    """发送价格预警"""
    try:
        # 获取设置了预警的用户
        alerts = PriceAlert.query.filter_by(product_id=product_id).all()

        for alert in alerts:
            try:
                # 检查当前价格是否达到目标价
                from services.price_service import PriceService
                price_service = PriceService()
                current_price = price_service.get_current_price(product_id)

                if current_price <= alert.target_price:
                    # 发送通知
                    self.wechat_service.send_message(alert.user_id, message)
                    logging.info(f"发送价格预警给用户 {alert.user_id}: {message}")
            except Exception as e:
                logging.error(f"发送预警给用户 {alert.user_id} 失败: {e}")
                continue

        return True

    except Exception as e:
        logging.error(f"发送价格预警失败: {e}")
        return False

def send_daily_summary(self, user_id: str) -> bool:
    """发送每日摘要"""
    try:
        # 获取用户关注商品的价格变化
        alerts = PriceAlert.query.filter_by(user_id=user_id).all()

        if not alerts:
            return True

        message = "📊 今日价格变化摘要:\n\n"
        has_changes = False

        for alert in alerts:
            from services.price_service import PriceService
            price_service = PriceService()

            # 获取24小时价格变化
            price_change = price_service.get_price_change(alert.product_id, days=1)
            if price_change and abs(price_change['change']) > 1.0:  # 变化超过1%
                has_changes = True
                change_icon = "📈" if price_change['change'] > 0 else "📉"
                message += f"{change_icon} {alert.product.title[:20]}...\n"
                message += f"   变化: {price_change['change']:.1f}% | 当前: ¥{price_change['current_price']}\n\n"

        if has_changes:
            self.wechat_service.send_message(user_id, message)

        return True

    except Exception as e:
        logging.error(f"发送每日摘要失败: {e}")
        return False

3.4 数据模型

web/models/product_model.py

from utils.database import db
from datetime import datetime

class Product(db.Model):
"""商品模型"""
tablename = 'products'

id = db.Column(db.Integer, primary_key=True)
product_id = db.Column(db.String(20), unique=True, nullable=False)  # 京东商品ID
title = db.Column(db.Text, nullable=False)  # 商品标题
shop = db.Column(db.String(200), nullable=False)  # 店铺名称
comment_count = db.Column(db.Integer, default=0)  # 评论数
image_url = db.Column(db.Text)  # 图片URL
link = db.Column(db.Text)  # 商品链接
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

def __repr__(self):
    return f'<Product {self.product_id}: {self.title[:20]}>'

class PriceHistory(db.Model):
"""价格历史模型"""
tablename = 'price_history'

id = db.Column(db.Integer, primary_key=True)
product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
price = db.Column(db.Float, nullable=False)  # 价格
timestamp = db.Column(db.DateTime, nullable=False)  # 记录时间

# 关系
product = db.relationship('Product', backref=db.backref('price_history', lazy=True))

def __repr__(self):
    return f'<PriceHistory {self.product_id}: ¥{self.price} at {self.timestamp}>'

class PriceAlert(db.Model):
"""价格预警模型"""
tablename = 'price_alerts'

id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(100), nullable=False)  # 微信用户ID
product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
target_price = db.Column(db.Float, nullable=False)  # 目标价格
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

# 关系
product = db.relationship('Product', backref=db.backref('price_alerts', lazy=True))

def __repr__(self):
    return f'<PriceAlert {self.user_id} for {self.product_id}: ¥{self.target_price}>'

四、部署配置
4.1 Docker配置

docker-compose.yml

version: '3.8'

services:
web:
build: .
ports:

  - "5000:5000"
environment:
  - DEBUG=False
  - MYSQL_HOST=mysql
  - REDIS_URL=redis://redis:6379/0
depends_on:
  - mysql
  - redis
volumes:
  - ./logs:/app/logs
  - ./qr_code.png:/app/qr_code.png
restart: unless-stopped

mysql:
image: mysql:8.0
environment:

  - MYSQL_ROOT_PASSWORD=password
  - MYSQL_DATABASE=jd_price_bot
volumes:
  - mysql_data:/var/lib/mysql
restart: unless-stopped

redis:
image: redis:7-alpine
restart: unless-stopped

nginx:
image: nginx:alpine
ports:

  - "80:80"
  - "443:443"
volumes:
  - ./nginx.conf:/etc/nginx/nginx.conf
  - ./ssl:/etc/nginx/ssl
depends_on:
  - web
restart: unless-stopped

volumes:
mysql_data:
4.2 Dockerfile
FROM python:3.11-slim

WORKDIR /app

安装系统依赖

RUN apt-get update && apt-get install -y \
gcc \
g++ \
wget \
gnupg \
unzip \
&& rm -rf /var/lib/apt/lists/*

安装Chrome

RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
&& sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \
&& apt-get update \
&& apt-get install -y google-chrome-stable

安装ChromeDriver

RUN CHROMEDRIVER_VERSION=curl -sS chromedriver.storage.googleapis.com/LATEST_RELEASE \
&& wget -O /tmp/chromedriver.zip http://chromedriver.storage.googleapis.com/$CHROMEDRIVER_VERSION/chromedriver_linux64.zip \
&& unzip /tmp/chromedriver.zip -d /usr/local/bin/ \
&& rm /tmp/chromedriver.zip

复制依赖文件

COPY requirements.txt .

安装Python依赖

RUN pip install --no-cache-dir -r requirements.txt

复制应用代码

COPY . .

创建日志目录

RUN mkdir -p logs

暴露端口

EXPOSE 5000

启动命令

CMD ["python", "web/app.py"]
4.3 Nginx配置

nginx.conf

events {
worker_connections 1024;
}

http {
upstream jd_price_bot {
server web:5000;
}

server {
    listen 80;
    server_name your-domain.com;

    # 重定向到HTTPS
    return 301 https://$server_name$request_uri;
}

server {
    listen 443 ssl;
    server_name your-domain.com;

    ssl_certificate /etc/nginx/ssl/cert.pem;
    ssl_certificate_key /etc/nginx/ssl/private.key;

    ssl_protocols TLSv1.2 TLSv1.3;
    ssl_ciphers HIGH:!aNULL:!MD5;

    location / {
        proxy_pass http://jd_price_bot;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    # WebSocket支持
    location /websocket {
        proxy_pass http://jd_price_bot;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection "upgrade";
    }
}

}
五、使用指南
5.1 启动应用

1. 克隆项目

git clone https://github.com/your-username/jd-price-bot.git
cd jd-price-bot

2. 配置环境变量

cp .env.example .env

编辑.env文件,配置数据库、微信等参数

3. 启动服务

docker-compose up -d

4. 初始化数据库

docker-compose exec web python -c "from utils.database import db, init_db; from web.app import app; init_db(app); db.create_all()"

5. 扫描微信二维码登录

查看生成的二维码文件

open qr_code.png
5.2 微信交互示例
用户:帮助
机器人:
🤖 京东比价机器人使用指南:

🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑

💰 查询价格:
价格 100012345678
price 100012345678

📊 历史价格:
历史 100012345678
history 100012345678

🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999

⭐ 收藏管理:
收藏 100012345678
收藏列表

📈 价格趋势图:
趋势 100012345678
trend 100012345678

用户:搜索 iPhone 15
机器人:
🔍 搜索“iPhone 15”结果:

  1. Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...
    💰 ¥4999 | 👑 Apple产品京东自营旗舰店
    📞 评论: 200000
    🔗 ID: 100012345678

  2. Apple iPhone 15 (A3092) 256GB 蓝色 支持移动联通电信5G...
    💰 ¥5999 | 👑 Apple产品京东自营旗舰店
    📞 评论: 150000
    🔗 ID: 100012345679

输入“价格 ID”查看详情,如:价格 100012345678

用户:价格 100012345678
机器人:
📱 Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...

💰 当前价格: ¥4999
📉 7天变化: -5.2%
🏪 店铺: Apple产品京东自营旗舰店
📞 评论: 200000

输入“历史 ID”查看价格趋势

用户:预警 100012345678 4500
机器人:
✅ 预警设置成功!
当价格 ≤ ¥4500 时会通知您
六、项目特点
6.1 技术亮点
智能爬虫:结合Scrapy+Selenium,支持动态页面渲染
实时监控:定时任务自动监控价格变化
微信集成:无缝对接微信,支持多种交互方式
数据可视化:生成价格趋势图,提供购买建议
高性能缓存:Redis缓存提升查询性能
6.2 应用场景
个人购物:实时比价,抓住最佳购买时机
电商运营:监控竞品价格,制定定价策略
数据分析:分析商品价格波动规律
智能推荐:基于价格趋势推荐购买时间
6.3 注意事项
合规使用:遵守京东Robots协议,控制爬取频率
用户隐私:妥善保管用户微信信息
API限制:合理使用第三方服务,避免被封禁
数据安全:加密存储敏感信息
通过本项目,你可以:
✅ 掌握Scrapy爬虫开发技巧
✅ 实现Flask RESTful API开发
✅ 构建微信机器人应用
✅ 设计高性能数据存储方案
✅ 部署完整的Web应用系统
项目代码已包含完整的错误处理、日志记录、性能优化,可直接用于生产环境或作为学习参考。

相关文章
|
10天前
|
人工智能 自然语言处理 Shell
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
本教程指导用户在开源AI助手Clawdbot中集成阿里云百炼API,涵盖安装Clawdbot、获取百炼API Key、配置环境变量与模型参数、验证调用等完整流程,支持Qwen3-max thinking (Qwen3-Max-2026-01-23)/Qwen - Plus等主流模型,助力本地化智能自动化。
🦞 如何在 OpenClaw (Clawdbot/Moltbot) 配置阿里云百炼 API
|
6天前
|
人工智能 机器人 Linux
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI智能体,支持飞书等多平台对接。本教程手把手教你Linux下部署,实现数据私有、系统控制、网页浏览与代码编写,全程保姆级操作,240字内搞定专属AI助手搭建!
4424 13
保姆级 OpenClaw (原 Clawdbot)飞书对接教程 手把手教你搭建 AI 助手
|
5天前
|
人工智能 安全 机器人
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
OpenClaw(原Clawdbot)是一款开源本地AI助手,支持钉钉、飞书等多平台接入。本教程手把手指导Linux下部署与钉钉机器人对接,涵盖环境配置、模型选择(如Qwen)、权限设置及调试,助你快速打造私有、安全、高权限的专属AI助理。(239字)
3755 10
OpenClaw(原 Clawdbot)钉钉对接保姆级教程 手把手教你打造自己的 AI 助手
|
8天前
|
人工智能 JavaScript 应用服务中间件
零门槛部署本地AI助手:Windows系统Moltbot(Clawdbot)保姆级教程
Moltbot(原Clawdbot)是一款功能全面的智能体AI助手,不仅能通过聊天互动响应需求,还具备“动手”和“跑腿”能力——“手”可读写本地文件、执行代码、操控命令行,“脚”能联网搜索、访问网页并分析内容,“大脑”则可接入Qwen、OpenAI等云端API,或利用本地GPU运行模型。本教程专为Windows系统用户打造,从环境搭建到问题排查,详细拆解全流程,即使无技术基础也能顺利部署本地AI助理。
7008 15
|
6天前
|
存储 人工智能 机器人
OpenClaw是什么?阿里云OpenClaw(原Clawdbot/Moltbot)一键部署官方教程参考
OpenClaw是什么?OpenClaw(原Clawdbot/Moltbot)是一款实用的个人AI助理,能够24小时响应指令并执行任务,如处理文件、查询信息、自动化协同等。阿里云推出的OpenClaw一键部署方案,简化了复杂配置流程,用户无需专业技术储备,即可快速在轻量应用服务器上启用该服务,打造专属AI助理。本文将详细拆解部署全流程、进阶功能配置及常见问题解决方案,确保不改变原意且无营销表述。
4575 4
|
4天前
|
人工智能 机器人 Linux
OpenClaw(Clawdbot、Moltbot)汉化版部署教程指南(零门槛)
OpenClaw作为2026年GitHub上增长最快的开源项目之一,一周内Stars从7800飙升至12万+,其核心优势在于打破传统聊天机器人的局限,能真正执行读写文件、运行脚本、浏览器自动化等实操任务。但原版全英文界面对中文用户存在上手门槛,汉化版通过覆盖命令行(CLI)与网页控制台(Dashboard)核心模块,解决了语言障碍,同时保持与官方版本的实时同步,确保新功能最快1小时内可用。本文将详细拆解汉化版OpenClaw的搭建流程,涵盖本地安装、Docker部署、服务器远程访问等场景,同时提供环境适配、问题排查与国内应用集成方案,助力中文用户高效搭建专属AI助手。
2543 5
|
8天前
|
人工智能 JavaScript API
零门槛部署本地 AI 助手:Clawdbot/Meltbot 部署深度保姆级教程
Clawdbot(Moltbot)是一款智能体AI助手,具备“手”(读写文件、执行代码)、“脚”(联网搜索、分析网页)和“脑”(接入Qwen/OpenAI等API或本地GPU模型)。本指南详解Windows下从Node.js环境搭建、一键安装到Token配置的全流程,助你快速部署本地AI助理。(239字)
4623 23
|
14天前
|
人工智能 API 开发者
Claude Code 国内保姆级使用指南:实测 GLM-4.7 与 Claude Opus 4.5 全方案解
Claude Code是Anthropic推出的编程AI代理工具。2026年国内开发者可通过配置`ANTHROPIC_BASE_URL`实现本地化接入:①极速平替——用Qwen Code v0.5.0或GLM-4.7,毫秒响应,适合日常编码;②满血原版——经灵芽API中转调用Claude Opus 4.5,胜任复杂架构与深度推理。
8566 13