Hands-On: Building a JD.com Price-Comparison WeChat Bot with Scrapy + Flask

Overview: This project is a WeChat-based intelligent price-comparison bot for JD.com. It combines a Scrapy + Selenium crawler, a Flask API, and an itchat/Wechaty WeChat service to provide product search, real-time price comparison, 30-day price-trend analysis, automatic price-drop alerts, and favorites management. The full stack is deployable (Docker + Nginx), helping users make money-saving purchase decisions.

1. Project Overview
1.1 Introduction
This project builds an intelligent WeChat price-comparison bot that can:
✅ Real-time scraping: fetch JD.com product prices with Scrapy
✅ Smart comparison: analyze historical price trends
✅ WeChat interaction: receive user queries and return results via Flask
✅ Scheduled monitoring: automatically track product price changes
✅ Price alerts: notify users when a price drops below their target
1.2 Technology Stack
Crawler: Scrapy + Selenium (dynamic rendering)
Web framework: Flask + RESTful API
WeChat integration: itchat / Wechaty
Storage: Redis + MySQL
Task scheduling: APScheduler
Deployment: Docker + Nginx
1.3 Core Features
🛍️ Product search: send a product name via WeChat, get comparison results back
📊 Price trends: show a product's 30-day price history
🔔 Price-drop alerts: set a target price, get notified automatically
⭐ Favorites: bookmark products for one-tap lookup
📈 Analytics: recommend the best time to buy
2. Environment Setup
2.1 Project Structure
jd_price_bot/
├── spiders/ # Scrapy spiders
│ ├── jd_spider.py # JD product spider
│ ├── price_monitor.py # Price monitoring spider
│ └── middlewares.py # Middlewares
├── web/ # Flask application
│ ├── app.py # Main Flask app
│ ├── routes/ # Route modules
│ ├── models/ # Data models
│ └── templates/ # Templates
├── services/ # Business services
│ ├── wechat_service.py # WeChat service
│ ├── price_service.py # Price service
│ └── notification_service.py # Notification service
├── utils/ # Utilities
│ ├── database.py # Database connections
│ ├── cache.py # Cache helpers
│ └── config.py # Configuration
├── requirements.txt # Dependencies
├── docker-compose.yml # Docker configuration
└── README.md
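The tree above lists docker-compose.yml, but this chunk never shows its contents. A minimal sketch is given below; the service layout follows the tech stack in section 1.2, while the image tags, ports, and build command are assumptions, not part of the original project:

```yaml
version: "3.8"
services:
  web:
    build: .                      # Flask app image built from the project Dockerfile
    command: python web/app.py
    ports:
      - "5000:5000"
    env_file: .env                # configuration loaded by utils/config.py
    depends_on:
      - redis
      - mysql
  redis:
    image: redis:7-alpine
  mysql:
    image: mysql:8.0
    environment:
      MYSQL_DATABASE: jd_price_bot
      MYSQL_ROOT_PASSWORD: password
  nginx:
    image: nginx:alpine           # reverse proxy in front of the Flask app
    ports:
      - "80:80"
    depends_on:
      - web
```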
2.2 Dependencies

requirements.txt

# Web framework
Flask==2.3.3
flask-restful==0.3.10
flask-sqlalchemy==3.0.5
flask-apscheduler==1.12.4

# Crawling
Scrapy==2.11.0
selenium==4.15.0
requests==2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3

# WeChat
itchat==1.3.10
wechaty==0.8.21

# Databases
redis==5.0.1
mysql-connector-python==8.1.0
sqlalchemy==2.0.23

# Data processing
pandas==2.1.3
numpy==1.25.2
matplotlib==3.7.2

# Utilities
python-dotenv==1.0.0
APScheduler==3.10.4
schedule==1.2.0
2.3 Configuration

utils/config.py

import os
from dotenv import load_dotenv

load_dotenv()

class Config:

    # Base settings
    SECRET_KEY = os.getenv('SECRET_KEY', 'jd-price-bot-secret-key')
    DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'

    # Database settings
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = os.getenv('MYSQL_PORT', '3306')
    MYSQL_USER = os.getenv('MYSQL_USER', 'root')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'password')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'jd_price_bot')

    # WeChat settings
    WECHAT_QR_CODE_PATH = os.getenv('WECHAT_QR_CODE_PATH', './qr_code.png')
    WECHAT_AUTO_LOGIN = os.getenv('WECHAT_AUTO_LOGIN', 'True').lower() == 'true'

    # Spider settings
    JD_SEARCH_URL = 'https://search.jd.com/Search'
    JD_PRODUCT_URL = 'https://item.jd.com/{}.html'
    SPIDER_DELAY = int(os.getenv('SPIDER_DELAY', '2'))
    MAX_PRODUCTS_PER_SEARCH = int(os.getenv('MAX_PRODUCTS_PER_SEARCH', '20'))

    # Price monitoring
    PRICE_CHECK_INTERVAL = int(os.getenv('PRICE_CHECK_INTERVAL', '3600'))  # 1 hour
    PRICE_HISTORY_DAYS = int(os.getenv('PRICE_HISTORY_DAYS', '30'))

    # Notifications
    PRICE_DROP_THRESHOLD = float(os.getenv('PRICE_DROP_THRESHOLD', '0.1'))  # 10% drop
    ENABLE_PRICE_ALERT = os.getenv('ENABLE_PRICE_ALERT', 'True').lower() == 'true'

    # Security: filter out empty strings so an unset whitelist stays falsy
    USER_WHITELIST = [u for u in os.getenv('USER_WHITELIST', '').split(',') if u]
    MAX_REQUESTS_PER_MINUTE = int(os.getenv('MAX_REQUESTS_PER_MINUTE', '30'))

    # Logging
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
    LOG_FILE = os.getenv('LOG_FILE', './logs/jd_bot.log')
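Config reads its values from a .env file via python-dotenv. An illustrative .env matching the variables above might look like this (every value is a placeholder to be replaced with your own):

```ini
SECRET_KEY=change-me
DEBUG=False
REDIS_URL=redis://localhost:6379/0
MYSQL_HOST=localhost
MYSQL_PORT=3306
MYSQL_USER=root
MYSQL_PASSWORD=password
MYSQL_DATABASE=jd_price_bot
SPIDER_DELAY=2
PRICE_CHECK_INTERVAL=3600
PRICE_DROP_THRESHOLD=0.1
USER_WHITELIST=Alice,Bob
LOG_LEVEL=INFO
LOG_FILE=./logs/jd_bot.log
```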

3. Core Module Implementation
3.1 JD.com Spider (Scrapy)
3.1.1 Product Search Spider

spiders/jd_spider.py

import scrapy
import json
import re
from urllib.parse import quote, urlencode
from typing import Dict, List, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time

class JDSpider(scrapy.Spider):
    name = 'jd_spider'
    allowed_domains = ['jd.com', 'search.jd.com']

def __init__(self, keyword: str = None, max_results: int = 20, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.keyword = keyword
    self.max_results = max_results
    self.driver = None
    self.setup_driver()

def setup_driver(self):
    """设置Selenium WebDriver"""
    options = webdriver.ChromeOptions()
    options.add_argument('--headless')
    options.add_argument('--no-sandbox')
    options.add_argument('--disable-dev-shm-usage')
    options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')

    self.driver = webdriver.Chrome(options=options)
    self.driver.implicitly_wait(10)

def start_requests(self):
    """开始请求"""
    if self.keyword:
        url = self.build_search_url(self.keyword)
        yield scrapy.Request(url=url, callback=self.parse_search_results)

def build_search_url(self, keyword: str) -> str:
    """构建搜索URL"""
    params = {
        'keyword': keyword,
        'enc': 'utf-8',
        'wq': keyword,
        'pvid': self.generate_pvid()
    }
    return f"https://search.jd.com/Search?{urlencode(params)}"

def generate_pvid(self) -> str:
    """生成随机pvid"""
    import uuid
    return str(uuid.uuid4()).replace('-', '')[:16]

def parse_search_results(self, response):
    """Parse the search results page (rendered with Selenium)"""
    try:
        # Load the page in Selenium, since the product list is rendered by JS
        self.driver.get(response.url)

        # Wait for the page to finish loading
        WebDriverWait(self.driver, 10).until(
            EC.presence_of_element_located((By.CSS_SELECTOR, ".gl-item"))
        )

        # Collect the product list
        products = self.driver.find_elements(By.CSS_SELECTOR, ".gl-item")
        results = []

        for i, product in enumerate(products[:self.max_results]):
            try:
                product_info = self.extract_product_info(product)
                if product_info:
                    results.append(product_info)
                    self.logger.info(f"提取商品: {product_info['title'][:30]}")
            except Exception as e:
                self.logger.error(f"提取商品信息失败: {e}")
                continue

        # 返回结果
        yield {
            'keyword': self.keyword,
            'results': results,
            'count': len(results)
        }

    except TimeoutException:
        self.logger.error("搜索页面加载超时")
    finally:
        self.close_driver()

def extract_product_info(self, product_element) -> Optional[Dict]:
    """提取商品信息"""
    try:
        # 商品ID
        data_sku = product_element.get_attribute('data-sku')
        if not data_sku:
            return None

        # 商品标题
        title_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a em")
        title = title_element.text.strip() if title_element else ""

        # 价格
        price_element = product_element.find_element(By.CSS_SELECTOR, ".p-price strong i")
        price = float(price_element.text) if price_element and price_element.text else 0.0

        # 店铺
        shop_element = product_element.find_element(By.CSS_SELECTOR, ".p-shop a")
        shop = shop_element.text.strip() if shop_element else ""

        # 评论数
        comment_element = product_element.find_element(By.CSS_SELECTOR, ".p-commit a")
        comment_text = comment_element.text.strip() if comment_element else ""
        comment_count = self.extract_comment_count(comment_text)

        # 商品链接
        link_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a")
        link = link_element.get_attribute('href') if link_element else ""

        # 图片
        img_element = product_element.find_element(By.CSS_SELECTOR, ".p-img img")
        image_url = img_element.get_attribute('src') or img_element.get_attribute('data-lazy-img')

        return {
            'product_id': data_sku,
            'title': title,
            'price': price,
            'shop': shop,
            'comment_count': comment_count,
            'link': link,
            'image_url': image_url,
            'crawl_time': int(time.time())
        }

    except Exception as e:
        self.logger.error(f"提取商品信息异常: {e}")
        return None

def extract_comment_count(self, comment_text: str) -> int:
    """Parse the comment count, handling suffixes like '+' and '万' (10,000)"""
    if not comment_text:
        return 0

    # Match the leading number, then scale by 10,000 if '万' is present.
    # (Do not pre-replace '万' with zeros, or the count would be scaled twice.)
    match = re.search(r'(\d+(\.\d+)?)', comment_text)
    if match:
        count = float(match.group(1))
        if '万' in comment_text:
            count *= 10000
        return int(count)
    return 0

def close_driver(self):
    """关闭浏览器"""
    if self.driver:
        self.driver.quit()

def closed(self, reason):
    """Clean up resources when the spider closes"""
    # scrapy.Spider has no base closed() implementation, so don't call super()
    self.close_driver()
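The comment-count normalization above (turning strings such as '500+' or '2.3万+' into integers) is easy to verify outside the spider. A standalone sketch of the same logic (the function name here is ours, not part of the project code):

```python
import re

def parse_comment_count(text: str) -> int:
    """Normalize JD comment-count strings like '500+' or '2.3万+' to integers."""
    if not text:
        return 0
    match = re.search(r'(\d+(?:\.\d+)?)', text)
    if not match:
        return 0
    count = float(match.group(1))
    if '万' in text:          # '万' means 10,000
        count *= 10_000
    return int(count)

print(parse_comment_count('500+'))    # 500
print(parse_comment_count('2.3万+'))  # 23000
print(parse_comment_count(''))        # 0
```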

Price Detail Spider

class JDPriceSpider(scrapy.Spider):
    name = 'jd_price_spider'

def __init__(self, product_ids: List[str] = None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.product_ids = product_ids or []

def start_requests(self):
    """开始请求"""
    for product_id in self.product_ids:
        url = f"https://item.jd.com/{product_id}.html"
        yield scrapy.Request(
            url=url,
            callback=self.parse_product_page,
            meta={'product_id': product_id}
        )

def parse_product_page(self, response):
    """解析商品详情页"""
    product_id = response.meta['product_id']

    try:
        # 提取价格信息
        price_script = response.xpath('//script[contains(text(), "price:")]/text()').get()
        price = self.extract_price_from_script(price_script)

        # 提取促销信息
        promotions = self.extract_promotions(response)

        # 提取库存信息
        stock = self.extract_stock_info(response)

        yield {
            'product_id': product_id,
            'price': price,
            'promotions': promotions,
            'stock': stock,
            'crawl_time': int(time.time())
        }

    except Exception as e:
        self.logger.error(f"解析商品页面失败: {e}")

def extract_price_from_script(self, script_text: str) -> float:
    """从JavaScript中提取价格"""
    if not script_text:
        return 0.0

    try:
        # 匹配价格模式
        price_patterns = [
            r'"price"\s*:\s*"([\d.]+)"',
            r"'price'\s*:\s*'([\d.]+)'",
            r'price\s*=\s*([\d.]+)'
        ]

        for pattern in price_patterns:
            match = re.search(pattern, script_text)
            if match:
                return float(match.group(1))

        return 0.0
    except:
        return 0.0

def extract_promotions(self, response) -> List[str]:
    """提取促销信息"""
    promotions = []

    # 提取优惠券
    coupons = response.xpath('//div[contains(@class, "coupon-item")]//text()').getall()
    coupons_text = ' '.join([c.strip() for c in coupons if c.strip()])
    if coupons_text:
        promotions.append(f"优惠券: {coupons_text}")

    # 提取促销活动
    promo_elements = response.xpath('//div[contains(@class, "promotion-item")]')
    for promo in promo_elements:
        promo_text = promo.xpath('.//text()').getall()
        promo_text = ' '.join([p.strip() for p in promo_text if p.strip()])
        if promo_text:
            promotions.append(promo_text)

    return promotions

def extract_stock_info(self, response) -> str:
    """提取库存信息"""
    stock_text = response.xpath('//div[contains(@class, "store-prompt")]//text()').get()
    if stock_text:
        return stock_text.strip()

    # 检查是否有货
    buy_btn = response.xpath('//a[contains(@class, "btn-addtocart")]')
    if buy_btn:
        return "有货"
    else:
        return "无货"

3.1.2 Price Monitoring Spider

spiders/price_monitor.py

import scrapy
from scrapy import signals
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List

class JDPriceMonitor(scrapy.Spider):
    name = 'jd_price_monitor'
    custom_settings = {
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_DELAY': 3,
        'AUTOTHROTTLE_ENABLED': True
    }

def __init__(self, product_ids: List[str] = None, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self.product_ids = product_ids or []
    self.price_history = {}
    self.redis_client = None

@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
    spider = super().from_crawler(crawler, *args, **kwargs)
    crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
    crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
    return spider

def spider_opened(self, spider):
    """爬虫启动时"""
    from utils.cache import get_redis_client
    self.redis_client = get_redis_client()

    # 加载需要监控的商品
    if not self.product_ids:
        self.product_ids = self.load_monitored_products()

def spider_closed(self, spider):
    """爬虫关闭时"""
    if self.redis_client:
        self.redis_client.close()

def load_monitored_products(self) -> List[str]:
    """从数据库加载需要监控的商品"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()
        products = price_service.get_monitored_products()
        return [p.product_id for p in products]
    except Exception as e:
        self.logger.error(f"加载监控商品失败: {e}")
        return []

def start_requests(self):
    """开始请求"""
    for product_id in self.product_ids:
        url = f"https://item.jd.com/{product_id}.html"
        yield scrapy.Request(
            url=url,
            callback=self.parse_product_price,
            meta={'product_id': product_id},
            dont_filter=True
        )

def parse_product_price(self, response):
    """解析商品价格"""
    product_id = response.meta['product_id']

    try:
        # 提取价格
        price = self.extract_price(response)

        if price > 0:
            # 保存价格记录
            self.save_price_record(product_id, price)

            # 检查价格变化
            price_change = self.check_price_change(product_id, price)

            yield {
                'product_id': product_id,
                'price': price,
                'price_change': price_change,
                'timestamp': int(time.time()),
                'status': 'success'
            }
        else:
            yield {
                'product_id': product_id,
                'price': 0.0,
                'timestamp': int(time.time()),
                'status': 'failed',
                'error': '价格提取失败'
            }

    except Exception as e:
        self.logger.error(f"价格监控失败: {e}")
        yield {
            'product_id': product_id,
            'price': 0.0,
            'timestamp': int(time.time()),
            'status': 'failed',
            'error': str(e)
        }

def extract_price(self, response, product_id: str = '') -> float:
    """Extract the price, trying several selectors in turn.

    Pass the SKU id so the exact 'J-p-<sku>' class can be matched;
    without it, only the generic contains() selector applies.
    """
    price_selectors = [
        f'//span[@class="price J-p-{product_id}"]/text()',
        f'//strong[@class="J-p-{product_id}"]/i/text()',
        '//span[contains(@class, "price J-p-")]/text()',
    ]

    for selector in price_selectors:
        try:
            price_text = response.xpath(selector).get()
            if price_text:
                # Strip currency symbols (both half- and full-width)
                price_text = price_text.replace('¥', '').replace('￥', '').strip()
                if price_text and price_text.replace('.', '', 1).isdigit():
                    return float(price_text)
        except (ValueError, AttributeError):
            continue

    return 0.0

def save_price_record(self, product_id: str, price: float):
    """保存价格记录"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()
        price_service.save_price_record(product_id, price)

        # 缓存最新价格
        if self.redis_client:
            cache_key = f"jd:price:{product_id}"
            self.redis_client.setex(
                cache_key, 
                3600,  # 1小时
                json.dumps({
                    'price': price,
                    'timestamp': int(time.time())
                })
            )
    except Exception as e:
        self.logger.error(f"保存价格记录失败: {e}")

def check_price_change(self, product_id: str, current_price: float) -> Dict:
    """Compare the current price against recent history"""
    try:
        from services.price_service import PriceService
        price_service = PriceService()

        # Fetch recent price history
        history = price_service.get_price_history(product_id, days=7)
        if not history:
            return {'change': 0.0, 'type': 'new'}

        # Compute the percentage change against the latest recorded price
        last_price = history[-1].price
        change = current_price - last_price
        change_percent = (change / last_price) * 100 if last_price > 0 else 0.0

        # Fire an alert when the drop exceeds the configured threshold (default 10%)
        from utils.config import Config
        if change_percent < -Config.PRICE_DROP_THRESHOLD * 100:
            self.trigger_price_alert(product_id, current_price, change_percent)

        return {
            'change': change_percent,
            'type': 'drop' if change_percent < 0 else 'rise',
            'last_price': last_price
        }

    except Exception as e:
        self.logger.error(f"检查价格变化失败: {e}")
        return {'change': 0.0, 'type': 'unknown'}

def trigger_price_alert(self, product_id: str, price: float, change_percent: float):
    """触发价格预警"""
    try:
        from services.notification_service import NotificationService
        notification_service = NotificationService()

        # 获取商品信息
        from services.price_service import PriceService
        price_service = PriceService()
        product = price_service.get_product_info(product_id)

        if product:
            message = f"🚨 价格预警!\n商品:{product.title[:30]}...\n当前价格:¥{price}\n降价幅度:{abs(change_percent):.1f}%"
            notification_service.send_price_alert(product_id, message)
    except Exception as e:
        self.logger.error(f"发送价格预警失败: {e}")
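The percentage arithmetic that drives check_price_change and the alert threshold can be verified in isolation. A minimal sketch with plain numbers and no database (the helper name is ours):

```python
def price_change_percent(last_price: float, current_price: float) -> float:
    """Percentage change from last_price to current_price; negative = drop."""
    if last_price <= 0:
        return 0.0
    return (current_price - last_price) / last_price * 100

# A drop from ¥129.0 to ¥99.0 is about -23.3%, which would trip a 10% alert threshold
change = price_change_percent(129.0, 99.0)
print(round(change, 1))   # -23.3
print(change < -10)       # True: trigger_price_alert would fire
```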

3.2 Flask Web Application
3.2.1 Main Application

web/app.py

from flask import Flask, request, jsonify, render_template
from flask_restful import Api
from flask_apscheduler import APScheduler
import logging
import time
from utils.config import Config
from utils.database import db, init_db
from services.wechat_service import WeChatService
from services.price_service import PriceService
from services.notification_service import NotificationService

# Configure logging
logging.basicConfig(
    level=getattr(logging, Config.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(Config.LOG_FILE),
        logging.StreamHandler()
    ]
)

app = Flask(__name__)
app.config.from_object(Config)

# Initialize the database
init_db(app)

# Initialize the API
api = Api(app)

# Initialize the scheduler
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# Initialize services
wechat_service = WeChatService()
price_service = PriceService()
notification_service = NotificationService()

# Register blueprints
from web.routes.wechat_routes import wechat_bp
from web.routes.price_routes import price_bp
from web.routes.product_routes import product_bp

app.register_blueprint(wechat_bp, url_prefix='/wechat')
app.register_blueprint(price_bp, url_prefix='/price')
app.register_blueprint(product_bp, url_prefix='/product')

@app.route('/')
def index():
    """Home page"""
    return render_template('index.html')

@app.route('/health')
def health_check():
    """Health check"""
    return jsonify({'status': 'healthy', 'timestamp': int(time.time())})

@app.route('/search')
def search_products():
    """Search products"""
    keyword = request.args.get('keyword', '')
    if not keyword:
        return jsonify({'error': '请输入搜索关键词'}), 400

    try:
        results = price_service.search_products(keyword)
        return jsonify({
            'keyword': keyword,
            'results': results,
            'count': len(results)
        })
    except Exception as e:
        logging.error(f"搜索失败: {e}")
        return jsonify({'error': '搜索失败'}), 500

# Scheduled task
@scheduler.task('interval', id='monitor_prices', hours=1)
def monitor_prices():
    """Monitor prices on a schedule"""
    try:
        from spiders.price_monitor import JDPriceMonitor
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings

        # Note: CrawlerProcess can only be started once per process.
        # For a recurring job, run the spider out of process instead
        # (e.g. `scrapy crawl jd_price_monitor` via subprocess, or scrapyd).
        settings = get_project_settings()
        process = CrawlerProcess(settings)
        process.crawl(JDPriceMonitor)
        process.start()

        logging.info("价格监控任务执行完成")
    except Exception as e:
        logging.error(f"价格监控任务失败: {e}")

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)
3.2.2 WeChat Routes

web/routes/wechat_routes.py

from flask import Blueprint, request, jsonify
from services.wechat_service import WeChatService
from services.price_service import PriceService
import logging

wechat_bp = Blueprint('wechat', __name__)
wechat_service = WeChatService()
price_service = PriceService()

@wechat_bp.route('/message', methods=['POST'])
def handle_wechat_message():
    """Handle an incoming WeChat message"""
    try:
        data = request.json
        if not data:
            return jsonify({'error': '无效的请求数据'}), 400

        message_type = data.get('type', '')
        content = data.get('content', '')
        user_id = data.get('user_id', '')

        # Check user permissions
        if not wechat_service.validate_user(user_id):
            return jsonify({'error': '用户无权限'}), 403

        # Dispatch by message type
        if message_type == 'text':
            response = handle_text_message(content, user_id)
        elif message_type == 'image':
            response = handle_image_message(content, user_id)
        else:
            response = {'type': 'text', 'content': '暂不支持此类型消息'}

        return jsonify(response)

    except Exception as e:
        logging.error(f"处理微信消息失败: {e}")
        return jsonify({'error': '处理消息失败'}), 500

def handle_text_message(content: str, user_id: str) -> dict:
    """Handle a text message"""

    # Help command
    if content in ['帮助', 'help', '?']:
        return {
            'type': 'text',
            'content': get_help_message()
        }

    # Search products
    if content.startswith('搜索 ') or content.startswith('search '):
        keyword = content.replace('搜索 ', '').replace('search ', '')
        return search_products(keyword, user_id)

    # Query a price
    if content.startswith('价格 ') or content.startswith('price '):
        product_id = content.replace('价格 ', '').replace('price ', '')
        return get_product_price(product_id, user_id)

    # Set a price alert
    if content.startswith('预警 ') or content.startswith('alert '):
        parts = content.split(' ')
        if len(parts) >= 3:
            try:
                product_id = parts[1]
                target_price = float(parts[2])
            except ValueError:
                return {'type': 'text', 'content': '预警格式:预警 商品ID 目标价'}
            return set_price_alert(product_id, target_price, user_id)

    # Query price history
    if content.startswith('历史 ') or content.startswith('history '):
        product_id = content.replace('历史 ', '').replace('history ', '')
        return get_price_history(product_id, user_id)

    # Add to favorites
    if content.startswith('收藏 ') or content.startswith('fav '):
        product_id = content.replace('收藏 ', '').replace('fav ', '')
        return add_to_favorites(product_id, user_id)

    # List favorites
    if content in ['收藏列表', 'favorites']:
        return get_favorites(user_id)

    # Default reply
    return {
        'type': 'text',
        'content': '请输入"帮助"查看可用命令'
    }

def get_help_message() -> str:
    """Return the help text"""
    return """
🤖 京东比价机器人使用指南:

🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑

💰 查询价格:
价格 100012345678
price 100012345678

📊 历史价格:
历史 100012345678
history 100012345678

🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999

⭐ 收藏管理:
收藏 100012345678
收藏列表

📈 价格趋势图:
趋势 100012345678
trend 100012345678
"""

def search_products(keyword: str, user_id: str) -> dict:
    """Search products and format a reply"""
    try:
        results = price_service.search_products(keyword, limit=5)

        if not results:
            return {
                'type': 'text',
                'content': f'未找到与"{keyword}"相关的商品'
            }

        # Build the reply message
        message = f"🔍 搜索“{keyword}”结果:\n\n"
        for i, product in enumerate(results, 1):
            message += f"{i}. {product['title'][:30]}...\n"
            message += f"   💰 ¥{product['price']} | 👑 {product['shop']}\n"
            message += f"   📞 评论: {product['comment_count']}\n"
            message += f"   🔗 ID: {product['product_id']}\n\n"

        message += "输入“价格 ID”查看详情,如:价格 100012345678"

        return {
            'type': 'text',
            'content': message
        }

    except Exception as e:
        logging.error(f"搜索商品失败: {e}")
        return {
            'type': 'text',
            'content': '搜索失败,请稍后重试'
        }

def get_product_price(product_id: str, user_id: str) -> dict:
    """Get a product's current price"""
    try:
        product = price_service.get_product_info(product_id)
        if not product:
            return {
                'type': 'text',
                'content': f'未找到商品 ID: {product_id}'
            }

        # Current price
        current_price = price_service.get_current_price(product_id)

        # Price change over the past week
        price_change = price_service.get_price_change(product_id, days=7)

        message = f"📱 {product.title}\n\n"
        message += f"💰 当前价格: ¥{current_price}\n"

        if price_change:
            change_type = "📈" if price_change['change'] > 0 else "📉"
            message += f"{change_type} 7天变化: {price_change['change']:.1f}%\n"

        message += f"🏪 店铺: {product.shop}\n"
        message += f"📞 评论: {product.comment_count}\n\n"
        message += "输入“历史 ID”查看价格趋势"

        return {
            'type': 'text',
            'content': message
        }

    except Exception as e:
        logging.error(f"获取商品价格失败: {e}")
        return {
            'type': 'text',
            'content': '获取价格失败,请稍后重试'
        }

def set_price_alert(product_id: str, target_price: float, user_id: str) -> dict:
    """Set a price alert"""
    try:
        success = price_service.set_price_alert(user_id, product_id, target_price)

        if success:
            return {
                'type': 'text',
                'content': f'✅ 预警设置成功!\n当价格 ≤ ¥{target_price} 时会通知您'
            }
        else:
            return {
                'type': 'text',
                'content': '预警设置失败,请检查商品ID'
            }

    except Exception as e:
        logging.error(f"设置价格预警失败: {e}")
        return {
            'type': 'text',
            'content': '预警设置失败,请稍后重试'
        }
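The '预警 <id> <price>' branch splits the message on spaces and converts the third token to a float. That parsing can be sketched and tested on its own (the helper name is ours, not part of the project code):

```python
from typing import Optional, Tuple

def parse_alert_command(content: str) -> Optional[Tuple[str, float]]:
    """Parse '预警 <product_id> <target_price>' / 'alert <product_id> <target_price>'."""
    parts = content.split()
    if len(parts) < 3 or parts[0] not in ('预警', 'alert'):
        return None
    try:
        return parts[1], float(parts[2])
    except ValueError:      # non-numeric target price
        return None

print(parse_alert_command('预警 100012345678 5999'))   # ('100012345678', 5999.0)
print(parse_alert_command('alert 100012345678 abc'))  # None
```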

3.3 Core Services
3.3.1 WeChat Service

services/wechat_service.py

import itchat
import os
import threading
import time
from typing import Dict, List, Optional
from utils.config import Config
import logging

class WeChatService:
    """WeChat service"""

def __init__(self):
    self.is_logged_in = False
    self.friends = []
    self.msg_handler = None
    self.login_thread = None
    self.qr_path = Config.WECHAT_QR_CODE_PATH

def login(self) -> bool:
    """Log in to WeChat"""
    try:
        # itchat generates and saves the login QR code itself (picDir);
        # there is no need to build one manually with the qrcode library
        print(f"请扫描二维码登录微信: {self.qr_path}")
        itchat.auto_login(
            hotReload=Config.WECHAT_AUTO_LOGIN,
            picDir=self.qr_path
        )

        self.is_logged_in = True
        # get_friends() returns a list of friend dicts
        self.friends = itchat.get_friends(update=True)

        # Start listening for messages in the background
        threading.Thread(target=self.start_listening, daemon=True).start()

        logging.info("微信登录成功")
        return True

    except Exception as e:
        logging.error(f"微信登录失败: {e}")
        return False

def start_listening(self):
    """启动消息监听"""
    @itchat.msg_register(itchat.content.TEXT)
    def handle_text_message(msg):
        try:
            user_id = msg['FromUserName']
            content = msg['Text']

            # 转发到Flask处理
            if self.msg_handler:
                response = self.msg_handler(user_id, content)
                if response:
                    self.send_message(user_id, response)

        except Exception as e:
            logging.error(f"处理微信消息失败: {e}")

    itchat.run()

def send_message(self, user_id: str, message: str) -> bool:
    """发送微信消息"""
    try:
        itchat.send(message, toUserName=user_id)
        return True
    except Exception as e:
        logging.error(f"发送微信消息失败: {e}")
        return False

def validate_user(self, user_id: str) -> bool:
    """验证用户权限"""
    if not Config.USER_WHITELIST:
        return True

    # 检查用户是否在白名单中
    for friend in self.friends:
        if friend['UserName'] == user_id:
            return friend['NickName'] in Config.USER_WHITELIST

    return False

def get_friend_info(self, user_id: str) -> Optional[Dict]:
    """获取好友信息"""
    for friend in self.friends:
        if friend['UserName'] == user_id:
            return {
                'nickname': friend['NickName'],
                'remark_name': friend['RemarkName'],
                'sex': friend['Sex']
            }
    return None

def set_message_handler(self, handler):
    """设置消息处理器"""
    self.msg_handler = handler

3.3.2 Price Service

services/price_service.py

from typing import Dict, List, Optional
from datetime import datetime, timedelta
from utils.database import db
from web.models.product_model import Product, PriceHistory, PriceAlert
import logging
import json
from utils.cache import get_redis_client

class PriceService:
    """Price service"""

def __init__(self):
    self.redis_client = get_redis_client()

def search_products(self, keyword: str, limit: int = 10) -> List[Dict]:
    """Search products"""
    try:
        # Check the cache first
        cache_key = f"jd:search:{keyword}"
        cached = self.redis_client.get(cache_key)
        if cached:
            return json.loads(cached)

        # Run the search spider, collecting items via the item_scraped signal
        from spiders.jd_spider import JDSpider
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings
        from scrapy import signals

        settings = get_project_settings()
        process = CrawlerProcess(settings)

        results = []

        def collect_results(item, response, spider):
            results.extend(item.get('results', []))

        crawler = process.create_crawler(JDSpider)
        crawler.signals.connect(collect_results, signal=signals.item_scraped)
        # Note: CrawlerProcess.start() blocks and can only run once per process;
        # in production, run spiders out of process (scrapyd, subprocess) instead.
        process.crawl(crawler, keyword=keyword, max_results=limit)
        process.start()

        # Cache the results for 30 minutes
        if results:
            self.redis_client.setex(cache_key, 1800, json.dumps(results))

        return results

    except Exception as e:
        logging.error(f"搜索商品失败: {e}")
        return []

def get_product_info(self, product_id: str) -> Optional[Product]:
    """Fetch product info from the database, or crawl JD.com as a fallback"""
    try:
        product = Product.query.filter_by(product_id=product_id).first()
        if product:
            return product

        # Fall back to crawling the product page
        # (JDPriceSpider is defined in spiders/jd_spider.py)
        from spiders.jd_spider import JDPriceSpider
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings
        from scrapy import signals

        settings = get_project_settings()
        process = CrawlerProcess(settings)

        result = None

        def collect_result(item, response, spider):
            nonlocal result
            result = item

        crawler = process.create_crawler(JDPriceSpider)
        crawler.signals.connect(collect_result, signal=signals.item_scraped)
        process.crawl(crawler, product_ids=[product_id])
        process.start()

        if result:
            # Persist to the database
            product = Product(
                product_id=product_id,
                title=result.get('title', ''),
                shop=result.get('shop', ''),
                comment_count=result.get('comment_count', 0),
                image_url=result.get('image_url', ''),
                link=result.get('link', '')
            )
            db.session.add(product)
            db.session.commit()
            return product

        return None

    except Exception as e:
        logging.error(f"获取商品信息失败: {e}")
        return None

def get_current_price(self, product_id: str) -> float:
    """获取当前价格"""
    try:
        # 检查缓存
        cache_key = f"jd:price:{product_id}"
        cached = self.redis_client.get(cache_key)
        if cached:
            data = json.loads(cached)
            return data.get('price', 0.0)

        # 从数据库获取最新价格
        price_record = PriceHistory.query.filter_by(
            product_id=product_id
        ).order_by(PriceHistory.timestamp.desc()).first()

        if price_record:
            # 更新缓存
            self.redis_client.setex(
                cache_key, 
                3600, 
                json.dumps({
                    'price': price_record.price,
                    'timestamp': int(price_record.timestamp.timestamp())
                })
            )
            return price_record.price

        return 0.0

    except Exception as e:
        logging.error(f"获取当前价格失败: {e}")
        return 0.0

def save_price_record(self, product_id: str, price: float) -> bool:
    """保存价格记录"""
    try:
        # 检查是否重复记录
        latest_record = PriceHistory.query.filter_by(
            product_id=product_id
        ).order_by(PriceHistory.timestamp.desc()).first()

        if latest_record and latest_record.price == price:
            # 价格未变化,不重复记录
            return True

        # 保存新记录
        price_record = PriceHistory(
            product_id=product_id,
            price=price,
            timestamp=datetime.now()
        )
        db.session.add(price_record)
        db.session.commit()

        logging.info(f"保存价格记录: {product_id} - ¥{price}")
        return True

    except Exception as e:
        logging.error(f"保存价格记录失败: {e}")
        db.session.rollback()
        return False

def get_price_history(self, product_id: str, days: int = 30) -> List[PriceHistory]:
    """获取价格历史"""
    try:
        start_date = datetime.now() - timedelta(days=days)
        return PriceHistory.query.filter(
            PriceHistory.product_id == product_id,
            PriceHistory.timestamp >= start_date
        ).order_by(PriceHistory.timestamp.asc()).all()

    except Exception as e:
        logging.error(f"获取价格历史失败: {e}")
        return []

def get_price_change(self, product_id: str, days: int = 7) -> Optional[Dict]:
    """Get the price change over the last `days` days"""
    try:
        history = self.get_price_history(product_id, days=days)
        if len(history) < 2:
            return None

        current_price = history[-1].price
        old_price = history[0].price

        change = current_price - old_price
        change_percent = (change / old_price) * 100 if old_price > 0 else 0.0

        return {
            'change': change_percent,  # percent change over the window
            'type': 'drop' if change_percent < 0 else 'rise',
            'current_price': current_price,
            'old_price': old_price
        }

    except Exception as e:
        logging.error(f"Failed to compute price change: {e}")
        return None

def set_price_alert(self, user_id: str, product_id: str, target_price: float) -> bool:
    """Set a price alert"""
    try:
        # Update in place if this user already has an alert for the product
        existing_alert = PriceAlert.query.filter_by(
            user_id=user_id,
            product_id=product_id
        ).first()

        if existing_alert:
            existing_alert.target_price = target_price
            existing_alert.updated_at = datetime.now()
        else:
            alert = PriceAlert(
                user_id=user_id,
                product_id=product_id,
                target_price=target_price,
                created_at=datetime.now(),
                updated_at=datetime.now()
            )
            db.session.add(alert)

        db.session.commit()
        return True

    except Exception as e:
        logging.error(f"Failed to set price alert: {e}")
        db.session.rollback()
        return False

def get_monitored_products(self) -> List[Product]:
    """Get the products that need monitoring"""
    try:
        # All products that have at least one alert set.
        # SELECT DISTINCT on a single column works on MySQL too
        # (Query.distinct() with column arguments is PostgreSQL-only).
        rows = db.session.query(PriceAlert.product_id).distinct().all()
        product_ids = [row.product_id for row in rows]

        return Product.query.filter(Product.product_id.in_(product_ids)).all()

    except Exception as e:
        logging.error(f"Failed to get monitored products: {e}")
        return []
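As a quick sanity check on the arithmetic in get_price_change: a price that moves from ¥5999 to ¥4999 over the window is reported as roughly a 16.7% drop.

```python
# Standalone check of the percent-change formula used in get_price_change
old_price, current_price = 5999.0, 4999.0
change_percent = (current_price - old_price) / old_price * 100
change_type = 'drop' if change_percent < 0 else 'rise'
print(round(change_percent, 1), change_type)  # -16.7 drop
```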

3.3.3 Notification Service

services/notification_service.py

from typing import Dict, List
from utils.database import db
from web.models.product_model import PriceAlert
from services.wechat_service import WeChatService
import logging

class NotificationService:
    """Notification service"""

    def __init__(self):
        self.wechat_service = WeChatService()

    def send_price_alert(self, product_id: str, message: str) -> bool:
        """Send a price alert"""
        try:
            # Users who have set an alert for this product
            alerts = PriceAlert.query.filter_by(product_id=product_id).all()

            # Imported locally to avoid a circular import with the price service
            from services.price_service import PriceService
            price_service = PriceService()

            for alert in alerts:
                try:
                    # Has the current price reached this user's target?
                    current_price = price_service.get_current_price(product_id)

                    if current_price <= alert.target_price:
                        # Notify the user
                        self.wechat_service.send_message(alert.user_id, message)
                        logging.info(f"Sent price alert to user {alert.user_id}: {message}")
                except Exception as e:
                    logging.error(f"Failed to alert user {alert.user_id}: {e}")
                    continue

            return True

        except Exception as e:
            logging.error(f"Failed to send price alerts: {e}")
            return False

    def send_daily_summary(self, user_id: str) -> bool:
        """Send a daily summary"""
        try:
            # Price changes for the products this user follows
            alerts = PriceAlert.query.filter_by(user_id=user_id).all()

            if not alerts:
                return True

            from services.price_service import PriceService
            price_service = PriceService()

            message = "📊 Today's price changes:\n\n"
            has_changes = False

            for alert in alerts:
                # 24-hour price change
                price_change = price_service.get_price_change(alert.product_id, days=1)
                if price_change and abs(price_change['change']) > 1.0:  # moved more than 1%
                    has_changes = True
                    change_icon = "📈" if price_change['change'] > 0 else "📉"
                    message += f"{change_icon} {alert.product.title[:20]}...\n"
                    message += f"   Change: {price_change['change']:.1f}% | Now: ¥{price_change['current_price']}\n\n"

            if has_changes:
                self.wechat_service.send_message(user_id, message)

            return True

        except Exception as e:
            logging.error(f"Failed to send daily summary: {e}")
            return False
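The daily summary only reports items whose 24-hour move exceeds 1% in either direction; the selection rule in isolation (the sample numbers are made up):

```python
# Hypothetical 24h changes in percent, keyed by product ID
changes = {"100012345678": 0.4, "100012345679": -3.2, "100012345680": 5.1}

# Keep only moves larger than 1% in absolute value, as send_daily_summary does
notable = {pid: pct for pid, pct in changes.items() if abs(pct) > 1.0}
for pid, pct in sorted(notable.items()):
    icon = "📈" if pct > 0 else "📉"
    print(f"{icon} {pid}: {pct:.1f}%")
```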

3.4 Data Models

web/models/product_model.py

from utils.database import db
from datetime import datetime

class Product(db.Model):
    """Product model"""
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), unique=True, nullable=False)  # JD product ID
    title = db.Column(db.Text, nullable=False)  # Product title
    shop = db.Column(db.String(200), nullable=False)  # Shop name
    comment_count = db.Column(db.Integer, default=0)  # Review count
    image_url = db.Column(db.Text)  # Image URL
    link = db.Column(db.Text)  # Product link
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f'<Product {self.product_id}: {self.title[:20]}>'

class PriceHistory(db.Model):
    """Price history model"""
    __tablename__ = 'price_history'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    price = db.Column(db.Float, nullable=False)  # Price
    timestamp = db.Column(db.DateTime, nullable=False)  # Time recorded

    # Relationship
    product = db.relationship('Product', backref=db.backref('price_history', lazy=True))

    def __repr__(self):
        return f'<PriceHistory {self.product_id}: ¥{self.price} at {self.timestamp}>'

class PriceAlert(db.Model):
    """Price alert model"""
    __tablename__ = 'price_alerts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(100), nullable=False)  # WeChat user ID
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    target_price = db.Column(db.Float, nullable=False)  # Target price
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationship
    product = db.relationship('Product', backref=db.backref('price_alerts', lazy=True))

    def __repr__(self):
        return f'<PriceAlert {self.user_id} for {self.product_id}: ¥{self.target_price}>'
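The same schema can be sketched in plain SQL to see how the tables relate (SQLite in-memory is used here purely for illustration; the project itself targets MySQL through SQLAlchemy):

```python
import sqlite3

# Illustrative equivalent of the Product and PriceHistory models
conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE products (
    id INTEGER PRIMARY KEY,
    product_id VARCHAR(20) UNIQUE NOT NULL,
    title TEXT NOT NULL,
    shop VARCHAR(200) NOT NULL
);
CREATE TABLE price_history (
    id INTEGER PRIMARY KEY,
    product_id VARCHAR(20) NOT NULL REFERENCES products(product_id),
    price FLOAT NOT NULL,
    timestamp DATETIME NOT NULL
);
""")
conn.execute("INSERT INTO products (product_id, title, shop) VALUES (?, ?, ?)",
             ("100012345678", "iPhone 15", "Apple JD flagship"))
conn.execute("INSERT INTO price_history (product_id, price, timestamp) VALUES (?, ?, ?)",
             ("100012345678", 4999.0, "2024-01-01 10:00:00"))

# Same "latest price" query that get_current_price issues via the ORM
row = conn.execute(
    "SELECT price FROM price_history WHERE product_id = ? "
    "ORDER BY timestamp DESC LIMIT 1", ("100012345678",)).fetchone()
print(row[0])  # 4999.0
```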

4. Deployment Configuration
4.1 Docker Configuration

docker-compose.yml

version: '3.8'

services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - DEBUG=False
      - MYSQL_HOST=mysql
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - mysql
      - redis
    volumes:
      - ./logs:/app/logs
      - ./qr_code.png:/app/qr_code.png
    restart: unless-stopped

  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=password
      - MYSQL_DATABASE=jd_price_bot
    volumes:
      - mysql_data:/var/lib/mysql
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web
    restart: unless-stopped

volumes:
  mysql_data:
4.2 Dockerfile
FROM python:3.11-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    wget \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \
    && apt-get update \
    && apt-get install -y google-chrome-stable

# Install ChromeDriver
# Note: this legacy endpoint only serves drivers up to Chrome 114; for newer
# Chrome builds, download a matching driver from the Chrome for Testing site.
RUN CHROMEDRIVER_VERSION=$(wget -qO- https://chromedriver.storage.googleapis.com/LATEST_RELEASE) \
    && wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/$CHROMEDRIVER_VERSION/chromedriver_linux64.zip \
    && unzip /tmp/chromedriver.zip -d /usr/local/bin/ \
    && rm /tmp/chromedriver.zip

# Copy dependency manifest
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY . .

# Create log directory
RUN mkdir -p logs

# Expose port
EXPOSE 5000

# Start command
CMD ["python", "web/app.py"]
4.3 Nginx Configuration

nginx.conf

events {
    worker_connections 1024;
}

http {
    upstream jd_price_bot {
        server web:5000;
    }

    server {
        listen 80;
        server_name your-domain.com;

        # Redirect to HTTPS
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl;
        server_name your-domain.com;

        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/private.key;

        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;

        location / {
            proxy_pass http://jd_price_bot;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket support
        location /websocket {
            proxy_pass http://jd_price_bot;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}
5. Usage Guide
5.1 Starting the Application

1. Clone the project

git clone https://github.com/your-username/jd-price-bot.git
cd jd-price-bot

2. Configure environment variables

cp .env.example .env

Edit the .env file to set the database, WeChat, and other parameters.

3. Start the services

docker-compose up -d

4. Initialize the database

docker-compose exec web python -c "from utils.database import db, init_db; from web.app import app; init_db(app); db.create_all()"

5. Scan the WeChat QR code to log in

View the generated QR code file:

open qr_code.png
5.2 WeChat Interaction Examples
User: 帮助
Bot:
🤖 JD price-comparison bot guide:

🔍 Search products:
搜索 iPhone 15
search 笔记本电脑

💰 Check a price:
价格 100012345678
price 100012345678

📊 Price history:
历史 100012345678
history 100012345678

🔔 Price alerts:
预警 100012345678 5999
alert 100012345678 5999

⭐ Favorites:
收藏 100012345678
收藏列表

📈 Price trend chart:
趋势 100012345678
trend 100012345678

User: 搜索 iPhone 15
Bot:
🔍 Results for "iPhone 15":

  1. Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...
    💰 ¥4999 | 👑 Apple产品京东自营旗舰店
    📞 Reviews: 200000
    🔗 ID: 100012345678

  2. Apple iPhone 15 (A3092) 256GB 蓝色 支持移动联通电信5G...
    💰 ¥5999 | 👑 Apple产品京东自营旗舰店
    📞 Reviews: 150000
    🔗 ID: 100012345679

Send "价格 <ID>" for details, e.g. 价格 100012345678

User: 价格 100012345678
Bot:
📱 Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...

💰 Current price: ¥4999
📉 7-day change: -5.2%
🏪 Shop: Apple产品京东自营旗舰店
📞 Reviews: 200000

Send "历史 <ID>" to see the price trend

User: 预警 100012345678 4500
Bot:
✅ Alert set!
You will be notified when the price is ≤ ¥4500
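Behind the scenes, a command like 预警 100012345678 4500 only needs to be split into a product ID and a target price. A minimal parser sketch (the real routing lives in the WeChat service; this helper name is hypothetical):

```python
def parse_alert_command(text: str):
    """Parse '预警 <product_id> <target_price>' (or the 'alert' alias).

    Hypothetical helper for illustration, not part of the project's code.
    Returns (product_id, target_price) or None if the text doesn't match.
    """
    parts = text.strip().split()
    if len(parts) != 3 or parts[0] not in ("预警", "alert"):
        return None
    _, product_id, price_text = parts
    try:
        return product_id, float(price_text)
    except ValueError:
        return None

print(parse_alert_command("预警 100012345678 4500"))   # ('100012345678', 4500.0)
print(parse_alert_command("alert 100012345678 abc"))  # None
```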
6. Project Highlights
6.1 Technical Highlights
Smart crawling: Scrapy + Selenium, with support for dynamically rendered pages
Real-time monitoring: scheduled jobs track price changes automatically
WeChat integration: seamless WeChat hookup with multiple interaction styles
Data visualization: price trend charts with buying suggestions
High-performance caching: Redis caching speeds up queries
6.2 Use Cases
Personal shopping: compare prices in real time and catch the best moment to buy
E-commerce operations: monitor competitor prices to inform pricing strategy
Data analysis: study how product prices fluctuate over time
Smart recommendations: suggest purchase timing based on price trends
6.3 Caveats
Compliance: respect JD's robots.txt and throttle your crawl frequency
User privacy: safeguard users' WeChat information
API limits: use third-party services responsibly to avoid bans
Data security: encrypt sensitive data at rest
With this project you will:
✅ Master Scrapy crawler development
✅ Build a Flask RESTful API
✅ Ship a WeChat bot application
✅ Design a high-performance data storage layer
✅ Deploy a complete web application stack
The code includes error handling, logging, and performance tuning throughout, and can serve as a production starting point or a learning reference.
