一、项目概述
1.1 项目简介
本项目将构建一个智能微信比价机器人,能够:
✅ 实时抓取:用Scrapy抓取京东商品价格信息
✅ 智能比价:分析商品历史价格趋势
✅ 微信交互:通过Flask接收用户查询并返回结果
✅ 定时监控:自动监控商品价格变化
✅ 价格预警:低于目标价格时自动提醒
1.2 技术栈
爬虫框架:Scrapy + Selenium(动态渲染)
Web框架:Flask + RESTful API
微信集成:itchat / Wechaty
数据存储:Redis + MySQL
任务调度:APScheduler
部署运维:Docker + Nginx
1.3 核心功能
🛍️ 商品搜索:通过微信发送商品名,返回比价结果
📊 价格趋势:展示商品30天价格走势
🔔 降价提醒:设置目标价,降价自动通知
⭐ 收藏管理:收藏关注商品,一键查询
📈 数据分析:智能推荐最佳购买时机
二、环境配置
2.1 项目结构
jd_price_bot/
├── spiders/ # Scrapy爬虫
│ ├── jd_spider.py # 京东商品爬虫
│ ├── price_monitor.py # 价格监控爬虫
│ └── middlewares.py # 中间件
├── web/ # Flask应用
│ ├── app.py # Flask主应用
│ ├── routes/ # 路由模块
│ ├── models/ # 数据模型
│ └── templates/ # 模板文件
├── services/ # 业务服务
│ ├── wechat_service.py # 微信服务
│ ├── price_service.py # 价格服务
│ └── notification_service.py # 通知服务
├── utils/ # 工具类
│ ├── database.py # 数据库连接
│ ├── cache.py # 缓存工具
│ └── config.py # 配置管理
├── requirements.txt # 依赖包
├── docker-compose.yml # Docker配置
└── README.md
2.2 环境配置
requirements.txt
# Web框架
Flask==2.3.3
flask-restful==0.3.10
flask-sqlalchemy==3.0.5
flask-apscheduler==1.12.4
# 爬虫相关
Scrapy==2.11.0
selenium==4.15.0
requests==2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3
# 微信相关
itchat==1.3.10
wechaty==0.8.21
# 数据库
redis==5.0.1
mysql-connector-python==8.1.0
sqlalchemy==2.0.23
# 数据处理
pandas==2.1.3
numpy==1.25.2
matplotlib==3.7.2
# 工具类
python-dotenv==1.0.0
APScheduler==3.10.4
schedule==1.2.0
2.3 配置文件
utils/config.py
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
    """Central application configuration loaded from environment variables.

    Every setting falls back to a development-friendly default so the app
    can boot without a .env file.
    """

    # --- Base ---
    SECRET_KEY = os.getenv('SECRET_KEY', 'jd-price-bot-secret-key')
    DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'

    # --- Databases ---
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = os.getenv('MYSQL_PORT', '3306')
    MYSQL_USER = os.getenv('MYSQL_USER', 'root')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'password')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'jd_price_bot')

    # --- WeChat ---
    WECHAT_QR_CODE_PATH = os.getenv('WECHAT_QR_CODE_PATH', './qr_code.png')
    WECHAT_AUTO_LOGIN = os.getenv('WECHAT_AUTO_LOGIN', 'True').lower() == 'true'

    # --- Spider ---
    JD_SEARCH_URL = 'https://search.jd.com/Search'
    JD_PRODUCT_URL = 'https://item.jd.com/{}.html'
    SPIDER_DELAY = int(os.getenv('SPIDER_DELAY', '2'))
    MAX_PRODUCTS_PER_SEARCH = int(os.getenv('MAX_PRODUCTS_PER_SEARCH', '20'))

    # --- Price monitoring ---
    PRICE_CHECK_INTERVAL = int(os.getenv('PRICE_CHECK_INTERVAL', '3600'))  # seconds (1 hour)
    PRICE_HISTORY_DAYS = int(os.getenv('PRICE_HISTORY_DAYS', '30'))

    # --- Notifications ---
    PRICE_DROP_THRESHOLD = float(os.getenv('PRICE_DROP_THRESHOLD', '0.1'))  # 10% drop
    ENABLE_PRICE_ALERT = os.getenv('ENABLE_PRICE_ALERT', 'True').lower() == 'true'

    # --- Security ---
    # BUGFIX: ''.split(',') returns [''], which is truthy, so an unset
    # whitelist used to behave as a one-entry whitelist containing the
    # empty string and blocked every user.  Filter blank entries so an
    # empty/unset variable yields [] (whitelist check disabled).
    USER_WHITELIST = [u.strip() for u in os.getenv('USER_WHITELIST', '').split(',') if u.strip()]
    MAX_REQUESTS_PER_MINUTE = int(os.getenv('MAX_REQUESTS_PER_MINUTE', '30'))

    # --- Logging ---
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
    LOG_FILE = os.getenv('LOG_FILE', './logs/jd_bot.log')
三、核心模块实现
3.1 京东爬虫实现(Scrapy)
3.1.1 商品搜索爬虫
spiders/jd_spider.py
import scrapy
import json
import re
from urllib.parse import quote, urlencode
from typing import Dict, List, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time
class JDSpider(scrapy.Spider):
    """Searches search.jd.com for a keyword and yields product summaries.

    Uses a headless Selenium Chrome session because JD renders the result
    grid client-side.
    """

    name = 'jd_spider'
    allowed_domains = ['jd.com', 'search.jd.com']

    def __init__(self, keyword: str = None, max_results: int = 20, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.keyword = keyword
        self.max_results = max_results
        self.driver = None
        self.setup_driver()

    def setup_driver(self):
        """Start a headless Chrome with a desktop user agent."""
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')
        self.driver = webdriver.Chrome(options=options)
        self.driver.implicitly_wait(10)

    def start_requests(self):
        """Kick off the search when a keyword was supplied."""
        if self.keyword:
            url = self.build_search_url(self.keyword)
            yield scrapy.Request(url=url, callback=self.parse_search_results)

    def build_search_url(self, keyword: str) -> str:
        """Build the JD search URL for *keyword*."""
        params = {
            'keyword': keyword,
            'enc': 'utf-8',
            'wq': keyword,
            'pvid': self.generate_pvid()
        }
        return f"https://search.jd.com/Search?{urlencode(params)}"

    def generate_pvid(self) -> str:
        """Random 16-char page-view id mimicking the site's own parameter."""
        import uuid
        return str(uuid.uuid4()).replace('-', '')[:16]

    def parse_search_results(self, response):
        """Walk the rendered result grid and yield up to max_results items."""
        try:
            # Wait until at least one result cell is rendered.
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, ".gl-item"))
            )
            products = self.driver.find_elements(By.CSS_SELECTOR, ".gl-item")
            results = []
            # Idiom fix: the old enumerate() index was never used.
            for product in products[:self.max_results]:
                try:
                    product_info = self.extract_product_info(product)
                    if product_info:
                        results.append(product_info)
                        self.logger.info(f"提取商品: {product_info['title'][:30]}")
                except Exception as e:
                    self.logger.error(f"提取商品信息失败: {e}")
                    continue
            yield {
                'keyword': self.keyword,
                'results': results,
                'count': len(results)
            }
        except TimeoutException:
            self.logger.error("搜索页面加载超时")
        finally:
            # The driver is only needed for one search; release it eagerly.
            self.close_driver()

    def extract_product_info(self, product_element) -> Optional[Dict]:
        """Extract one result cell into a dict, or None when it has no SKU.

        NOTE(review): Selenium's find_element raises NoSuchElementException
        rather than returning None, so the `if element else ...` guards are
        effectively dead — missing sub-elements are handled by the outer
        except instead.
        """
        try:
            # SKU id: cells without one are ads/placeholders.
            data_sku = product_element.get_attribute('data-sku')
            if not data_sku:
                return None
            # Title
            title_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a em")
            title = title_element.text.strip() if title_element else ""
            # Price
            price_element = product_element.find_element(By.CSS_SELECTOR, ".p-price strong i")
            price = float(price_element.text) if price_element and price_element.text else 0.0
            # Shop
            shop_element = product_element.find_element(By.CSS_SELECTOR, ".p-shop a")
            shop = shop_element.text.strip() if shop_element else ""
            # Comment count
            comment_element = product_element.find_element(By.CSS_SELECTOR, ".p-commit a")
            comment_text = comment_element.text.strip() if comment_element else ""
            comment_count = self.extract_comment_count(comment_text)
            # Product link
            link_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a")
            link = link_element.get_attribute('href') if link_element else ""
            # Image (lazy-loaded images keep the URL in data-lazy-img)
            img_element = product_element.find_element(By.CSS_SELECTOR, ".p-img img")
            image_url = img_element.get_attribute('src') or img_element.get_attribute('data-lazy-img')
            return {
                'product_id': data_sku,
                'title': title,
                'price': price,
                'shop': shop,
                'comment_count': comment_count,
                'link': link,
                'image_url': image_url,
                'crawl_time': int(time.time())
            }
        except Exception as e:
            self.logger.error(f"提取商品信息异常: {e}")
            return None

    def extract_comment_count(self, comment_text: str) -> int:
        """Parse a JD comment-count string such as '2万+' or '1500+'.

        BUGFIX: the old code both substituted '万' with '0000' in the text
        *and* multiplied by 10000 afterwards, so '2万+' came out as
        200,000,000 instead of 20,000.  Now only the multiplication is kept.
        """
        if not comment_text:
            return 0
        match = re.search(r'(\d+(\.\d+)?)', comment_text.replace('+', ''))
        if not match:
            return 0
        count = float(match.group(1))
        if '万' in comment_text:
            count *= 10000
        return int(count)

    def close_driver(self):
        """Terminate the Selenium browser if it is still running."""
        if self.driver:
            self.driver.quit()

    def closed(self, reason):
        """Scrapy shutdown hook — ensure the browser is terminated.

        BUGFIX: the old code called super().closed(reason), but the base
        scrapy.Spider defines no `closed` method, so every shutdown raised
        AttributeError.
        """
        self.close_driver()
价格详情爬虫
class JDPriceSpider(scrapy.Spider):
    """Fetches current price, promotions and stock for specific JD SKUs."""

    name = 'jd_price_spider'

    def __init__(self, product_ids: List[str] = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # None-safe default: an absent list means "no products to crawl".
        self.product_ids = product_ids or []

    def start_requests(self):
        """Yield one detail-page request per SKU."""
        for product_id in self.product_ids:
            url = f"https://item.jd.com/{product_id}.html"
            yield scrapy.Request(
                url=url,
                callback=self.parse_product_page,
                meta={'product_id': product_id}
            )

    def parse_product_page(self, response):
        """Extract price / promotion / stock info from a product page."""
        product_id = response.meta['product_id']
        try:
            # The price lives in an inline <script> blob, not in the DOM.
            price_script = response.xpath('//script[contains(text(), "price:")]/text()').get()
            price = self.extract_price_from_script(price_script)
            promotions = self.extract_promotions(response)
            stock = self.extract_stock_info(response)
            yield {
                'product_id': product_id,
                'price': price,
                'promotions': promotions,
                'stock': stock,
                'crawl_time': int(time.time())
            }
        except Exception as e:
            self.logger.error(f"解析商品页面失败: {e}")

    def extract_price_from_script(self, script_text: str) -> float:
        """Pull a numeric price out of inline JavaScript; 0.0 when absent.

        BUGFIX: a bare `except:` used to wrap the whole method, silently
        swallowing every error (including KeyboardInterrupt).  Only the
        float conversion can realistically fail, so only it is guarded.
        """
        if not script_text:
            return 0.0
        price_patterns = [
            r'"price"\s*:\s*"([\d.]+)"',
            r"'price'\s*:\s*'([\d.]+)'",
            r'price\s*=\s*([\d.]+)'
        ]
        for pattern in price_patterns:
            match = re.search(pattern, script_text)
            if match:
                try:
                    return float(match.group(1))
                except ValueError:
                    continue
        return 0.0

    def extract_promotions(self, response) -> List[str]:
        """Collect coupon and promotion descriptions from the page."""
        promotions = []
        # Coupons
        coupons = response.xpath('//div[contains(@class, "coupon-item")]//text()').getall()
        coupons_text = ' '.join([c.strip() for c in coupons if c.strip()])
        if coupons_text:
            promotions.append(f"优惠券: {coupons_text}")
        # Promotion banners
        promo_elements = response.xpath('//div[contains(@class, "promotion-item")]')
        for promo in promo_elements:
            promo_text = promo.xpath('.//text()').getall()
            promo_text = ' '.join([p.strip() for p in promo_text if p.strip()])
            if promo_text:
                promotions.append(promo_text)
        return promotions

    def extract_stock_info(self, response) -> str:
        """Return the stock banner text, or infer it from the buy button."""
        stock_text = response.xpath('//div[contains(@class, "store-prompt")]//text()').get()
        if stock_text:
            return stock_text.strip()
        # No explicit banner: presence of the add-to-cart button means in stock.
        buy_btn = response.xpath('//a[contains(@class, "btn-addtocart")]')
        if buy_btn:
            return "有货"
        else:
            return "无货"
3.1.2 价格监控爬虫
spiders/price_monitor.py
import scrapy
from scrapy import signals
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List
class JDPriceMonitor(scrapy.Spider):
    """Periodically re-crawls monitored SKUs and records price changes."""

    name = 'jd_price_monitor'
    # Be gentle with JD: serial requests, fixed delay, autothrottle.
    custom_settings = {
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_DELAY': 3,
        'AUTOTHROTTLE_ENABLED': True
    }

    def __init__(self, product_ids: List[str] = None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.product_ids = product_ids or []
        self.price_history = {}
        self.redis_client = None

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        """Hook spider_opened/spider_closed for resource management."""
        spider = super().from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        return spider

    def spider_opened(self, spider):
        """Connect to Redis and load the monitored-product list."""
        from utils.cache import get_redis_client
        self.redis_client = get_redis_client()
        if not self.product_ids:
            self.product_ids = self.load_monitored_products()

    def spider_closed(self, spider):
        """Release the Redis connection."""
        if self.redis_client:
            self.redis_client.close()

    def load_monitored_products(self) -> List[str]:
        """Fetch SKU ids that have at least one active alert."""
        try:
            from services.price_service import PriceService
            price_service = PriceService()
            products = price_service.get_monitored_products()
            return [p.product_id for p in products]
        except Exception as e:
            self.logger.error(f"加载监控商品失败: {e}")
            return []

    def start_requests(self):
        """One request per monitored SKU; dont_filter allows repeat crawls."""
        for product_id in self.product_ids:
            url = f"https://item.jd.com/{product_id}.html"
            yield scrapy.Request(
                url=url,
                callback=self.parse_product_price,
                meta={'product_id': product_id},
                dont_filter=True
            )

    def parse_product_price(self, response):
        """Record the current price and report the change since last crawl."""
        product_id = response.meta['product_id']
        try:
            price = self.extract_price(response)
            if price > 0:
                self.save_price_record(product_id, price)
                price_change = self.check_price_change(product_id, price)
                yield {
                    'product_id': product_id,
                    'price': price,
                    'price_change': price_change,
                    'timestamp': int(time.time()),
                    'status': 'success'
                }
            else:
                yield {
                    'product_id': product_id,
                    'price': 0.0,
                    'timestamp': int(time.time()),
                    'status': 'failed',
                    'error': '价格提取失败'
                }
        except Exception as e:
            self.logger.error(f"价格监控失败: {e}")
            yield {
                'product_id': product_id,
                'price': 0.0,
                'timestamp': int(time.time()),
                'status': 'failed',
                'error': str(e)
            }

    def extract_price(self, response) -> float:
        """Try several selectors to pull the price off the product page.

        BUGFIX: the 'J-p-{}' placeholders were never filled in with the SKU,
        so the two SKU-specific selectors could never match anything; they
        are now formatted with the product id from the request meta.
        """
        product_id = response.meta.get('product_id', '')
        price_selectors = [
            f'//span[@class="price J-p-{product_id}"]/text()',
            f'//strong[@class="J-p-{product_id}"]/i/text()',
            '//span[contains(@class, "price J-p-")]/text()',
            '//script[contains(text(), "price:")]/text()'
        ]
        for selector in price_selectors:
            # Narrowed from a bare `except:` that also hid SystemExit etc.
            try:
                price_text = response.xpath(selector).get()
                if price_text:
                    # Strip currency symbols before the numeric check.
                    price_text = price_text.replace('¥', '').replace('¥', '').strip()
                    if price_text and price_text.replace('.', '').isdigit():
                        return float(price_text)
            except Exception:
                continue
        return 0.0

    def save_price_record(self, product_id: str, price: float):
        """Persist the observation and cache the latest price for 1 hour."""
        try:
            from services.price_service import PriceService
            price_service = PriceService()
            price_service.save_price_record(product_id, price)
            if self.redis_client:
                cache_key = f"jd:price:{product_id}"
                self.redis_client.setex(
                    cache_key,
                    3600,  # 1 hour
                    json.dumps({
                        'price': price,
                        'timestamp': int(time.time())
                    })
                )
        except Exception as e:
            self.logger.error(f"保存价格记录失败: {e}")

    def check_price_change(self, product_id: str, current_price: float) -> Dict:
        """Compare against the last recorded price and fire alerts on drops."""
        try:
            from services.price_service import PriceService
            price_service = PriceService()
            history = price_service.get_price_history(product_id, days=7)
            if not history:
                return {'change': 0.0, 'type': 'new'}
            last_price = history[-1].price
            change = current_price - last_price
            change_percent = (change / last_price) * 100 if last_price > 0 else 0.0
            # NOTE(review): threshold hard-coded at 10% here, while
            # Config.PRICE_DROP_THRESHOLD exists — consider unifying.
            if change_percent < -10:
                self.trigger_price_alert(product_id, current_price, change_percent)
            return {
                'change': change_percent,
                'type': 'drop' if change_percent < 0 else 'rise',
                'last_price': last_price
            }
        except Exception as e:
            self.logger.error(f"检查价格变化失败: {e}")
            return {'change': 0.0, 'type': 'unknown'}

    def trigger_price_alert(self, product_id: str, price: float, change_percent: float):
        """Compose and push a WeChat alert message for a price drop."""
        try:
            from services.notification_service import NotificationService
            notification_service = NotificationService()
            from services.price_service import PriceService
            price_service = PriceService()
            product = price_service.get_product_info(product_id)
            if product:
                message = f"🚨 价格预警!\n商品:{product.title[:30]}...\n当前价格:¥{price}\n降价幅度:{abs(change_percent):.1f}%"
                notification_service.send_price_alert(product_id, message)
        except Exception as e:
            self.logger.error(f"发送价格预警失败: {e}")
3.2 Flask Web应用
3.2.1 主应用
web/app.py
from flask import Flask, request, jsonify, render_template
from flask_restful import Api
from flask_apscheduler import APScheduler
import logging
from utils.config import Config
from utils.database import db, init_db
from services.wechat_service import WeChatService
from services.price_service import PriceService
from services.notification_service import NotificationService
# --- Logging setup ---
import os
import time

# FileHandler fails with FileNotFoundError when the log directory is missing.
os.makedirs(os.path.dirname(Config.LOG_FILE) or '.', exist_ok=True)
logging.basicConfig(
    level=getattr(logging, Config.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(Config.LOG_FILE),
        logging.StreamHandler()
    ]
)

# BUGFIX: Flask(name) raised NameError — the factory argument must be the
# module's __name__.
app = Flask(__name__)
app.config.from_object(Config)

# --- Database ---
init_db(app)

# --- RESTful API ---
api = Api(app)

# --- Scheduled jobs ---
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# --- Services ---
wechat_service = WeChatService()
price_service = PriceService()
notification_service = NotificationService()

# --- Blueprints ---
from web.routes.wechat_routes import wechat_bp
from web.routes.price_routes import price_bp
from web.routes.product_routes import product_bp

app.register_blueprint(wechat_bp, url_prefix='/wechat')
app.register_blueprint(price_bp, url_prefix='/price')
app.register_blueprint(product_bp, url_prefix='/product')


@app.route('/')
def index():
    """Render the dashboard home page."""
    return render_template('index.html')


@app.route('/health')
def health_check():
    """Liveness probe endpoint."""
    # BUGFIX: `time` was referenced here without ever being imported.
    return jsonify({'status': 'healthy', 'timestamp': int(time.time())})


@app.route('/search')
def search_products():
    """Search JD products by keyword (?keyword=...)."""
    keyword = request.args.get('keyword', '')
    if not keyword:
        return jsonify({'error': '请输入搜索关键词'}), 400
    try:
        results = price_service.search_products(keyword)
        return jsonify({
            'keyword': keyword,
            'results': results,
            'count': len(results)
        })
    except Exception as e:
        logging.error(f"搜索失败: {e}")
        return jsonify({'error': '搜索失败'}), 500


@scheduler.task('interval', id='monitor_prices', hours=1)
def monitor_prices():
    """Hourly job: crawl monitored products and record price changes."""
    try:
        from spiders.price_monitor import JDPriceMonitor
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings

        settings = get_project_settings()
        process = CrawlerProcess(settings)
        process.crawl(JDPriceMonitor)
        # NOTE(review): CrawlerProcess.start() blocks and Twisted's reactor
        # cannot be restarted within one process, so this job fails on its
        # second run — consider CrawlerRunner or a subprocess instead.
        process.start()
        logging.info("价格监控任务执行完成")
    except Exception as e:
        logging.error(f"价格监控任务失败: {e}")


# BUGFIX: `if name == 'main'` raised NameError; the standard entry-point
# guard uses the __name__ / '__main__' dunder pair.
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)
3.2.2 微信路由
web/routes/wechat_routes.py
from flask import Blueprint, request, jsonify
import re
from services.wechat_service import WeChatService
from services.price_service import PriceService
import logging

# BUGFIX: Blueprint('wechat', name) raised NameError — the second argument
# must be the importing module's __name__.
wechat_bp = Blueprint('wechat', __name__)
wechat_service = WeChatService()
price_service = PriceService()
@wechat_bp.route('/message', methods=['POST'])
def handle_wechat_message():
    """Entry point for incoming WeChat messages (JSON POST).

    Validates the sender, then dispatches on the message type.
    """
    try:
        payload = request.json
        if not payload:
            return jsonify({'error': '无效的请求数据'}), 400

        msg_type = payload.get('type', '')
        content = payload.get('content', '')
        user_id = payload.get('user_id', '')

        # Reject users that fail the whitelist check.
        if not wechat_service.validate_user(user_id):
            return jsonify({'error': '用户无权限'}), 403

        if msg_type == 'text':
            reply = handle_text_message(content, user_id)
        elif msg_type == 'image':
            reply = handle_image_message(content, user_id)
        else:
            reply = {'type': 'text', 'content': '暂不支持此类型消息'}
        return jsonify(reply)
    except Exception as e:
        logging.error(f"处理微信消息失败: {e}")
        return jsonify({'error': '处理消息失败'}), 500
def handle_text_message(content: str, user_id: str) -> dict:
"""处理文本消息"""
# 帮助命令
if content in ['帮助', 'help', '?']:
return {
'type': 'text',
'content': get_help_message()
}
# 搜索商品
if content.startswith('搜索 ') or content.startswith('search '):
keyword = content.replace('搜索 ', '').replace('search ', '')
return search_products(keyword, user_id)
# 查询价格
if content.startswith('价格 ') or content.startswith('price '):
product_id = content.replace('价格 ', '').replace('price ', '')
return get_product_price(product_id, user_id)
# 设置预警
if content.startswith('预警 ') or content.startswith('alert '):
parts = content.split(' ')
if len(parts) >= 3:
product_id = parts[1]
target_price = float(parts[2])
return set_price_alert(product_id, target_price, user_id)
# 查询历史价格
if content.startswith('历史 ') or content.startswith('history '):
product_id = content.replace('历史 ', '').replace('history ', '')
return get_price_history(product_id, user_id)
# 收藏商品
if content.startswith('收藏 ') or content.startswith('fav '):
product_id = content.replace('收藏 ', '').replace('fav ', '')
return add_to_favorites(product_id, user_id)
# 查看收藏
if content in ['收藏列表', 'favorites']:
return get_favorites(user_id)
# 默认回复
return {
'type': 'text',
'content': '请输入"帮助"查看可用命令'
}
def get_help_message() -> str:
    """Return the static usage guide shown for the 帮助/help/? command."""
    return """
🤖 京东比价机器人使用指南:
🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑
💰 查询价格:
价格 100012345678
price 100012345678
📊 历史价格:
历史 100012345678
history 100012345678
🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999
⭐ 收藏管理:
收藏 100012345678
收藏列表
📈 价格趋势图:
趋势 100012345678
trend 100012345678
"""
def search_products(keyword: str, user_id: str) -> dict:
    """Search JD for *keyword* and format the top 5 results as a chat reply."""
    try:
        products = price_service.search_products(keyword, limit=5)
        if not products:
            return {
                'type': 'text',
                'content': f'未找到与"{keyword}"相关的商品'
            }

        # Assemble the reply line by line, then join once.
        lines = [f"🔍 搜索“{keyword}”结果:\n\n"]
        for idx, item in enumerate(products, 1):
            lines.append(f"{idx}. {item['title'][:30]}...\n")
            lines.append(f" 💰 ¥{item['price']} | 👑 {item['shop']}\n")
            lines.append(f" 📞 评论: {item['comment_count']}\n")
            lines.append(f" 🔗 ID: {item['product_id']}\n\n")
        lines.append("输入“价格 ID”查看详情,如:价格 100012345678")
        return {
            'type': 'text',
            'content': ''.join(lines)
        }
    except Exception as e:
        logging.error(f"搜索商品失败: {e}")
        return {
            'type': 'text',
            'content': '搜索失败,请稍后重试'
        }
def get_product_price(product_id: str, user_id: str) -> dict:
    """Build a chat reply with the current price and 7-day trend of a SKU."""
    try:
        product = price_service.get_product_info(product_id)
        if not product:
            return {
                'type': 'text',
                'content': f'未找到商品 ID: {product_id}'
            }

        current_price = price_service.get_current_price(product_id)
        price_change = price_service.get_price_change(product_id, days=7)

        parts = [
            f"📱 {product.title}\n\n",
            f"💰 当前价格: ¥{current_price}\n",
        ]
        if price_change:
            trend_icon = "📈" if price_change['change'] > 0 else "📉"
            parts.append(f"{trend_icon} 7天变化: {price_change['change']:.1f}%\n")
        parts.append(f"🏪 店铺: {product.shop}\n")
        parts.append(f"📞 评论: {product.comment_count}\n\n")
        parts.append("输入“历史 ID”查看价格趋势")
        return {
            'type': 'text',
            'content': ''.join(parts)
        }
    except Exception as e:
        logging.error(f"获取商品价格失败: {e}")
        return {
            'type': 'text',
            'content': '获取价格失败,请稍后重试'
        }
def set_price_alert(product_id: str, target_price: float, user_id: str) -> dict:
    """Register a target-price alert for the user and confirm in chat."""
    try:
        ok = price_service.set_price_alert(user_id, product_id, target_price)
        if not ok:
            # The service rejected the request (e.g. unknown product id).
            return {
                'type': 'text',
                'content': '预警设置失败,请检查商品ID'
            }
        return {
            'type': 'text',
            'content': f'✅ 预警设置成功!\n当价格 ≤ ¥{target_price} 时会通知您'
        }
    except Exception as e:
        logging.error(f"设置价格预警失败: {e}")
        return {
            'type': 'text',
            'content': '预警设置失败,请稍后重试'
        }
3.3 核心服务
3.3.1 微信服务
services/wechat_service.py
import itchat
import qrcode
import os
import threading
import time
from typing import Dict, List, Optional
from utils.config import Config
import logging
class WeChatService:
    """Wraps itchat login, message listening and sending."""

    def __init__(self):
        # Login state and the friend list cached after login().
        self.is_logged_in = False
        self.friends = {}
        self.msg_handler = None
        self.login_thread = None
        self.qr_path = Config.WECHAT_QR_CODE_PATH

    def login(self) -> bool:
        """Log into WeChat via itchat and start the listener thread.

        Returns True on success, False on any failure.
        """
        try:
            # NOTE(review): this QR encodes a fixed login URL; itchat
            # generates its own session QR during auto_login, so this
            # image is only a placeholder — confirm it is still wanted.
            qr = qrcode.QRCode(version=1, box_size=10, border=5)
            qr.add_data('https://login.weixin.qq.com/')
            qr.make(fit=True)
            qr_img = qr.make_image(fill_color="black", back_color="white")
            qr_img.save(self.qr_path)
            print(f"请扫描二维码登录微信: {self.qr_path}")

            itchat.auto_login(
                hotReload=Config.WECHAT_AUTO_LOGIN,
                picDir=self.qr_path
            )
            self.is_logged_in = True
            self.friends = itchat.get_friends(update=True)

            # Listen for incoming messages in a daemon thread.
            threading.Thread(target=self.start_listening, daemon=True).start()
            logging.info("微信登录成功")
            return True
        except Exception as e:
            logging.error(f"微信登录失败: {e}")
            return False

    def start_listening(self):
        """Register the text-message callback and enter itchat's loop."""
        @itchat.msg_register(itchat.content.TEXT)
        def handle_text_message(msg):
            try:
                sender = msg['FromUserName']
                text = msg['Text']
                # Delegate to the Flask-side handler when one is installed.
                if self.msg_handler:
                    reply = self.msg_handler(sender, text)
                    if reply:
                        self.send_message(sender, reply)
            except Exception as e:
                logging.error(f"处理微信消息失败: {e}")

        itchat.run()

    def send_message(self, user_id: str, message: str) -> bool:
        """Send *message* to *user_id*; True when itchat raised nothing."""
        try:
            itchat.send(message, toUserName=user_id)
            return True
        except Exception as e:
            logging.error(f"发送微信消息失败: {e}")
            return False

    def validate_user(self, user_id: str) -> bool:
        """Check the sender against the configured nickname whitelist.

        An empty whitelist disables the check entirely.
        """
        if not Config.USER_WHITELIST:
            return True
        for friend in self.friends:
            if friend['UserName'] == user_id:
                return friend['NickName'] in Config.USER_WHITELIST
        return False

    def get_friend_info(self, user_id: str) -> Optional[Dict]:
        """Return nickname/remark/sex for a friend, or None when unknown."""
        for friend in self.friends:
            if friend['UserName'] == user_id:
                return {
                    'nickname': friend['NickName'],
                    'remark_name': friend['RemarkName'],
                    'sex': friend['Sex']
                }
        return None

    def set_message_handler(self, handler):
        """Install the callable invoked for every inbound text message."""
        self.msg_handler = handler
3.3.2 价格服务
services/price_service.py
from typing import Dict, List, Optional
from datetime import datetime, timedelta
from utils.database import db
from web.models.product_model import Product, PriceHistory, PriceAlert
import logging
import json
from utils.cache import get_redis_client
class PriceService:
    """价格服务 — product search, price history and alert management.

    Bridges the Scrapy spiders, the SQLAlchemy models and the Redis cache.
    """

    def __init__(self):
        # Shared Redis client used for search/price caching.
        self.redis_client = get_redis_client()

    def search_products(self, keyword: str, limit: int = 10) -> List[Dict]:
        """Search JD products, with a 30-minute Redis cache per keyword."""
        try:
            # Serve from cache when a recent identical search exists.
            cache_key = f"jd:search:{keyword}"
            cached = self.redis_client.get(cache_key)
            if cached:
                return json.loads(cached)
            # Cache miss: run a live crawl.
            from spiders.jd_spider import JDSpider
            from scrapy.crawler import CrawlerProcess
            from scrapy.utils.project import get_project_settings
            settings = get_project_settings()
            process = CrawlerProcess(settings)
            results = []
            # NOTE(review): collect_results is never connected to any Scrapy
            # signal or pipeline, so scraped items never reach `results` and
            # this path can only return [] — it needs an item_scraped hookup.
            # Also, CrawlerProcess.start() cannot be called twice in one
            # process (Twisted reactor cannot be restarted).
            def collect_results(item):
                results.extend(item.get('results', []))
            process.crawl(JDSpider, keyword=keyword, max_results=limit)
            process.start()
            # Cache non-empty results for 30 minutes.
            if results:
                self.redis_client.setex(cache_key, 1800, json.dumps(results))
            return results
        except Exception as e:
            logging.error(f"搜索商品失败: {e}")
            return []

    def get_product_info(self, product_id: str) -> Optional[Product]:
        """Return the Product row, crawling and inserting it when missing."""
        try:
            product = Product.query.filter_by(product_id=product_id).first()
            if product:
                return product
            # Not in the DB yet: fetch details from JD.
            from spiders.jd_price_spider import JDPriceSpider
            from scrapy.crawler import CrawlerProcess
            from scrapy.utils.project import get_project_settings
            settings = get_project_settings()
            process = CrawlerProcess(settings)
            result = None
            # NOTE(review): like collect_results above, collect_result is
            # never wired to a signal, so `result` stays None; additionally
            # JDPriceSpider items carry price/promotions/stock, not
            # title/shop, so the .get('title', '') fields below would be
            # empty even if it were wired up — verify intended item shape.
            def collect_result(item):
                nonlocal result
                result = item
            process.crawl(JDPriceSpider, product_ids=[product_id])
            process.start()
            if result:
                # Persist the freshly crawled product.
                product = Product(
                    product_id=product_id,
                    title=result.get('title', ''),
                    shop=result.get('shop', ''),
                    comment_count=result.get('comment_count', 0),
                    image_url=result.get('image_url', ''),
                    link=result.get('link', '')
                )
                db.session.add(product)
                db.session.commit()
                return product
            return None
        except Exception as e:
            logging.error(f"获取商品信息失败: {e}")
            return None

    def get_current_price(self, product_id: str) -> float:
        """Latest known price: Redis first, then the newest DB record."""
        try:
            # Cache hit: the monitor spider refreshes this key hourly.
            cache_key = f"jd:price:{product_id}"
            cached = self.redis_client.get(cache_key)
            if cached:
                data = json.loads(cached)
                return data.get('price', 0.0)
            # Fall back to the most recent PriceHistory row.
            price_record = PriceHistory.query.filter_by(
                product_id=product_id
            ).order_by(PriceHistory.timestamp.desc()).first()
            if price_record:
                # Re-warm the cache for subsequent lookups (1 hour TTL).
                self.redis_client.setex(
                    cache_key,
                    3600,
                    json.dumps({
                        'price': price_record.price,
                        'timestamp': int(price_record.timestamp.timestamp())
                    })
                )
                return price_record.price
            return 0.0
        except Exception as e:
            logging.error(f"获取当前价格失败: {e}")
            return 0.0

    def save_price_record(self, product_id: str, price: float) -> bool:
        """Append a price observation, skipping exact consecutive duplicates."""
        try:
            # De-duplicate: keep the history sparse when the price is flat.
            latest_record = PriceHistory.query.filter_by(
                product_id=product_id
            ).order_by(PriceHistory.timestamp.desc()).first()
            if latest_record and latest_record.price == price:
                # Unchanged price — nothing new to store.
                return True
            # Insert the new observation.
            price_record = PriceHistory(
                product_id=product_id,
                price=price,
                timestamp=datetime.now()
            )
            db.session.add(price_record)
            db.session.commit()
            logging.info(f"保存价格记录: {product_id} - ¥{price}")
            return True
        except Exception as e:
            logging.error(f"保存价格记录失败: {e}")
            db.session.rollback()
            return False

    def get_price_history(self, product_id: str, days: int = 30) -> List[PriceHistory]:
        """Return the product's price records for the last *days*, oldest first."""
        try:
            start_date = datetime.now() - timedelta(days=days)
            return PriceHistory.query.filter(
                PriceHistory.product_id == product_id,
                PriceHistory.timestamp >= start_date
            ).order_by(PriceHistory.timestamp.asc()).all()
        except Exception as e:
            logging.error(f"获取价格历史失败: {e}")
            return []

    def get_price_change(self, product_id: str, days: int = 7) -> Optional[Dict]:
        """Percentage change between the oldest and newest price in the window.

        Returns None when fewer than two observations exist.
        """
        try:
            history = self.get_price_history(product_id, days=days)
            if len(history) < 2:
                return None
            current_price = history[-1].price
            old_price = history[0].price
            change = current_price - old_price
            # Guard against division by zero on a bad 0.0 record.
            change_percent = (change / old_price) * 100 if old_price > 0 else 0.0
            return {
                'change': change_percent,
                'type': 'drop' if change_percent < 0 else 'rise',
                'current_price': current_price,
                'old_price': old_price
            }
        except Exception as e:
            logging.error(f"计算价格变化失败: {e}")
            return None

    def set_price_alert(self, user_id: str, product_id: str, target_price: float) -> bool:
        """Create or update the user's target-price alert for a product."""
        try:
            # Upsert: one alert per (user, product) pair.
            existing_alert = PriceAlert.query.filter_by(
                user_id=user_id,
                product_id=product_id
            ).first()
            if existing_alert:
                existing_alert.target_price = target_price
                existing_alert.updated_at = datetime.now()
            else:
                alert = PriceAlert(
                    user_id=user_id,
                    product_id=product_id,
                    target_price=target_price,
                    created_at=datetime.now(),
                    updated_at=datetime.now()
                )
                db.session.add(alert)
            db.session.commit()
            return True
        except Exception as e:
            logging.error(f"设置价格预警失败: {e}")
            db.session.rollback()
            return False

    def get_monitored_products(self) -> List[Product]:
        """All products that have at least one alert set.

        NOTE(review): Query.distinct(column) emits DISTINCT ON only on
        PostgreSQL; on MySQL (the configured backend) this degrades to a
        plain DISTINCT over full rows — verify duplicates are acceptable.
        """
        try:
            alerts = PriceAlert.query.distinct(PriceAlert.product_id).all()
            product_ids = [alert.product_id for alert in alerts]
            return Product.query.filter(Product.product_id.in_(product_ids)).all()
        except Exception as e:
            logging.error(f"获取监控商品失败: {e}")
            return []
3.3.3 通知服务
services/notification_service.py
from typing import Dict, List
from utils.database import db
from web.models.product_model import PriceAlert
from services.wechat_service import WeChatService
import logging
class NotificationService:
    """Pushes price alerts and daily summaries to users over WeChat."""

    def __init__(self):
        self.wechat_service = WeChatService()

    def send_price_alert(self, product_id: str, message: str) -> bool:
        """Send *message* to every user whose alert target price is met.

        Returns True unless the alert query itself fails; per-user send
        failures are logged and skipped.
        """
        try:
            # Hoisted out of the loop: one PriceService (and one Redis
            # connection) per call instead of one per alert, and a single
            # price lookup shared by all alerts.
            from services.price_service import PriceService
            price_service = PriceService()
            current_price = price_service.get_current_price(product_id)

            alerts = PriceAlert.query.filter_by(product_id=product_id).all()
            for alert in alerts:
                try:
                    # Only notify users whose target has been reached.
                    if current_price <= alert.target_price:
                        self.wechat_service.send_message(alert.user_id, message)
                        logging.info(f"发送价格预警给用户 {alert.user_id}: {message}")
                except Exception as e:
                    logging.error(f"发送预警给用户 {alert.user_id} 失败: {e}")
                    continue
            return True
        except Exception as e:
            logging.error(f"发送价格预警失败: {e}")
            return False

    def send_daily_summary(self, user_id: str) -> bool:
        """Send a digest of >1% price moves on the user's watched products."""
        try:
            alerts = PriceAlert.query.filter_by(user_id=user_id).all()
            if not alerts:
                return True

            # Hoisted: one PriceService per call, not one per alert.
            from services.price_service import PriceService
            price_service = PriceService()

            message = "📊 今日价格变化摘要:\n\n"
            has_changes = False
            for alert in alerts:
                # 24-hour window per watched product.
                price_change = price_service.get_price_change(alert.product_id, days=1)
                # Only mention items whose price moved by more than 1%.
                if price_change and abs(price_change['change']) > 1.0:
                    has_changes = True
                    change_icon = "📈" if price_change['change'] > 0 else "📉"
                    message += f"{change_icon} {alert.product.title[:20]}...\n"
                    message += f" 变化: {price_change['change']:.1f}% | 当前: ¥{price_change['current_price']}\n\n"
            if has_changes:
                self.wechat_service.send_message(user_id, message)
            return True
        except Exception as e:
            logging.error(f"发送每日摘要失败: {e}")
            return False
3.4 数据模型
web/models/product_model.py
from utils.database import db
from datetime import datetime
class Product(db.Model):
    """A JD product tracked by the bot."""

    # BUGFIX: the attribute was spelled `tablename`; SQLAlchemy only honours
    # the dunder `__tablename__`, so the table name was silently derived
    # from the class name and the FK strings elsewhere did not line up.
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), unique=True, nullable=False)  # JD SKU id
    title = db.Column(db.Text, nullable=False)  # product title
    shop = db.Column(db.String(200), nullable=False)  # shop name
    comment_count = db.Column(db.Integer, default=0)  # review count
    image_url = db.Column(db.Text)  # image URL
    link = db.Column(db.Text)  # product page URL
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f'<Product {self.product_id}: {self.title[:20]}>'
class PriceHistory(db.Model):
    """One observed price for a product at a point in time."""

    # BUGFIX: `tablename` -> dunder `__tablename__`; without it SQLAlchemy
    # derives the table name from the class name and the FK target
    # 'products.product_id' does not resolve as intended.
    __tablename__ = 'price_history'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    price = db.Column(db.Float, nullable=False)  # observed price (CNY)
    timestamp = db.Column(db.DateTime, nullable=False)  # observation time

    # Reverse accessor: product.price_history
    product = db.relationship('Product', backref=db.backref('price_history', lazy=True))

    def __repr__(self):
        return f'<PriceHistory {self.product_id}: ¥{self.price} at {self.timestamp}>'
class PriceAlert(db.Model):
    """A user's target-price alert for one product."""

    # BUGFIX: `tablename` -> dunder `__tablename__` (see Product/PriceHistory).
    __tablename__ = 'price_alerts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(100), nullable=False)  # WeChat user id
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    target_price = db.Column(db.Float, nullable=False)  # notify when price <= this
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    # Reverse accessor: product.price_alerts
    product = db.relationship('Product', backref=db.backref('price_alerts', lazy=True))

    def __repr__(self):
        return f'<PriceAlert {self.user_id} for {self.product_id}: ¥{self.target_price}>'
四、部署配置
4.1 Docker配置
docker-compose.yml
# docker-compose stack: Flask app + MySQL + Redis behind nginx.
# Reconstructed nesting — the flattened copy was not valid YAML.
version: '3.8'

services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - DEBUG=False
      - MYSQL_HOST=mysql
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - mysql
      - redis
    volumes:
      - ./logs:/app/logs
      - ./qr_code.png:/app/qr_code.png
    restart: unless-stopped

  mysql:
    image: mysql:8.0
    environment:
      - MYSQL_ROOT_PASSWORD=password
      - MYSQL_DATABASE=jd_price_bot
    volumes:
      - mysql_data:/var/lib/mysql
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web
    restart: unless-stopped

volumes:
  mysql_data:
4.2 Dockerfile
FROM python:3.11-slim

WORKDIR /app

# System build deps (curl is required below to resolve the ChromeDriver version)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    wget \
    curl \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Google Chrome (needed by Selenium for JD's client-rendered pages)
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \
    && sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \
    && apt-get update \
    && apt-get install -y google-chrome-stable

# ChromeDriver matching the latest release.
# BUGFIX: the version lookup was missing command substitution —
# `CHROMEDRIVER_VERSION=curl -sS ...` assigned the literal string "curl".
RUN CHROMEDRIVER_VERSION=$(curl -sS https://chromedriver.storage.googleapis.com/LATEST_RELEASE) \
    && wget -O /tmp/chromedriver.zip https://chromedriver.storage.googleapis.com/$CHROMEDRIVER_VERSION/chromedriver_linux64.zip \
    && unzip /tmp/chromedriver.zip -d /usr/local/bin/ \
    && rm /tmp/chromedriver.zip

# Python dependencies first, to keep the layer cacheable
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Application code
COPY . .

# Log directory expected by utils/config.py defaults
RUN mkdir -p logs

EXPOSE 5000

CMD ["python", "web/app.py"]
4.3 Nginx配置
nginx.conf
events {
    worker_connections 1024;
}

http {
    # Upstream pool: the Flask container (docker-compose service "web").
    upstream jd_price_bot {
        server web:5000;
    }

    # Plain-HTTP vhost: permanent redirect to HTTPS.
    server {
        listen 80;
        server_name your-domain.com;
        return 301 https://$server_name$request_uri;
    }

    # TLS vhost proxying to the Flask app.
    server {
        listen 443 ssl;
        server_name your-domain.com;
        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/private.key;
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;

        location / {
            proxy_pass http://jd_price_bot;
            # Preserve client host/IP and original scheme for the app.
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket upgrade support
        location /websocket {
            proxy_pass http://jd_price_bot;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}
五、使用指南
5.1 启动应用
1. 克隆项目
git clone https://github.com/your-username/jd-price-bot.git
cd jd-price-bot
2. 配置环境变量
cp .env.example .env
编辑.env文件,配置数据库、微信等参数
3. 启动服务
docker-compose up -d
4. 初始化数据库
docker-compose exec web python -c "from utils.database import db, init_db; from web.app import app; init_db(app); db.create_all()"
5. 扫描微信二维码登录
查看生成的二维码文件
open qr_code.png
5.2 微信交互示例
用户:帮助
机器人:
🤖 京东比价机器人使用指南:
🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑
💰 查询价格:
价格 100012345678
price 100012345678
📊 历史价格:
历史 100012345678
history 100012345678
🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999
⭐ 收藏管理:
收藏 100012345678
收藏列表
📈 价格趋势图:
趋势 100012345678
trend 100012345678
用户:搜索 iPhone 15
机器人:
🔍 搜索“iPhone 15”结果:
Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...
💰 ¥4999 | 👑 Apple产品京东自营旗舰店
📞 评论: 200000
🔗 ID: 100012345678
2. Apple iPhone 15 (A3092) 256GB 蓝色 支持移动联通电信5G...
💰 ¥5999 | 👑 Apple产品京东自营旗舰店
📞 评论: 150000
🔗 ID: 100012345679
输入“价格 ID”查看详情,如:价格 100012345678
用户:价格 100012345678
机器人:
📱 Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...
💰 当前价格: ¥4999
📉 7天变化: -5.2%
🏪 店铺: Apple产品京东自营旗舰店
📞 评论: 200000
输入“历史 ID”查看价格趋势
用户:预警 100012345678 4500
机器人:
✅ 预警设置成功!
当价格 ≤ ¥4500 时会通知您
六、项目特点
6.1 技术亮点
智能爬虫:结合Scrapy+Selenium,支持动态页面渲染
实时监控:定时任务自动监控价格变化
微信集成:无缝对接微信,支持多种交互方式
数据可视化:生成价格趋势图,提供购买建议
高性能缓存:Redis缓存提升查询性能
6.2 应用场景
个人购物:实时比价,抓住最佳购买时机
电商运营:监控竞品价格,制定定价策略
数据分析:分析商品价格波动规律
智能推荐:基于价格趋势推荐购买时间
6.3 注意事项
合规使用:遵守京东Robots协议,控制爬取频率
用户隐私:妥善保管用户微信信息
API限制:合理使用第三方服务,避免被封禁
数据安全:加密存储敏感信息
通过本项目,你可以:
✅ 掌握Scrapy爬虫开发技巧
✅ 实现Flask RESTful API开发
✅ 构建微信机器人应用
✅ 设计高性能数据存储方案
✅ 部署完整的Web应用系统
项目代码已包含完整的错误处理、日志记录、性能优化,可直接用于生产环境或作为学习参考。