下载地址:https://www.pan38.com/share.php?code=pvvmX 提取码:1557
本项目包含数据库操作、订单状态管理和自动接单三项功能:主程序负责处理订单流程,配置文件存储系统参数,requirements.txt 列出所需依赖。使用前需要先配置 SMTP 服务器信息。
import sqlite3
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
class OrderSystem:
    """SQLite-backed order queue that emails a notice when an order completes.

    Orders are stored in an ``orders`` table and move through the statuses
    ``pending`` -> ``processing`` -> ``completed``. Completing an order
    triggers a best-effort e-mail notification through the SMTP server
    described by ``self.smtp_config``.
    """

    def __init__(self, db_name='orders.db'):
        """Open (or create) the database and make sure the schema exists.

        Args:
            db_name: Path of the SQLite database file; ``':memory:'`` gives
                a throw-away in-memory database.
        """
        self.conn = sqlite3.connect(db_name)
        self.create_tables()
        # Placeholder SMTP settings; replace with real server credentials
        # before notifications can actually be delivered.
        self.smtp_config = {
            'server': 'smtp.example.com',
            'port': 587,
            'username': 'your_email@example.com',
            'password': 'your_password'
        }

    def create_tables(self):
        """Create the ``orders`` table if it does not exist yet."""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS orders (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                customer_name TEXT NOT NULL,
                order_details TEXT NOT NULL,
                status TEXT DEFAULT 'pending',
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        self.conn.commit()

    def add_order(self, customer_name, order_details):
        """Insert a new order with the default ``pending`` status.

        Args:
            customer_name: Name of the customer placing the order.
            order_details: Free-text description of the order.

        Returns:
            The integer row id of the newly inserted order.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO orders (customer_name, order_details)
            VALUES (?, ?)
        ''', (customer_name, order_details))
        self.conn.commit()
        return cursor.lastrowid

    def process_orders(self):
        """Process every order that is currently ``pending``."""
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT id FROM orders WHERE status = 'pending'
        ''')
        # Materialize the ids first: processing issues UPDATEs and commits
        # on the same connection while we iterate.
        pending_ids = [row[0] for row in cursor.fetchall()]
        for order_id in pending_ids:
            self.process_single_order(order_id)

    def process_single_order(self, order_id):
        """Mark one order as ``processing``, handle it, then complete it.

        Args:
            order_id: Primary key of the order to process.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE orders SET status = 'processing'
            WHERE id = ?
        ''', (order_id,))
        self.conn.commit()
        # Concrete order-handling logic can be added here.
        print(f"Processing order {order_id}")
        # Simulate the work having finished.
        self.complete_order(order_id)

    def complete_order(self, order_id):
        """Mark an order as ``completed`` and notify the customer.

        Args:
            order_id: Primary key of the order to complete.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            UPDATE orders SET status = 'completed'
            WHERE id = ?
        ''', (order_id,))
        self.conn.commit()
        self.send_notification(order_id)

    def send_notification(self, order_id):
        """Send a completion e-mail for the given order (best effort).

        Does nothing when the order does not exist. Any failure while
        building or sending the message is reported on stdout and
        swallowed, so a mail outage never aborts order processing.

        Args:
            order_id: Primary key of the order to notify about.
        """
        cursor = self.conn.cursor()
        cursor.execute('''
            SELECT customer_name, order_details FROM orders WHERE id = ?
        ''', (order_id,))
        order = cursor.fetchone()
        if not order:
            return
        customer_name, order_details = order
        message = (
            "\n"
            f"Dear {customer_name},\n"
            f"Your order (ID: {order_id}) has been processed successfully.\n"
            f"Order details: {order_details}\n"
            "Thank you for your business!\n"
        )
        try:
            msg = MIMEText(message)
            msg['Subject'] = f'Order #{order_id} Completed'
            msg['From'] = self.smtp_config['username']
            # NOTE(review): the schema stores no e-mail address, so the
            # recipient is derived from the customer name as a placeholder.
            msg['To'] = f'{customer_name}@example.com'
            # Without a timeout an unreachable server blocks indefinitely;
            # an optional 'timeout' key in smtp_config overrides the default.
            timeout = self.smtp_config.get('timeout', 10)
            with smtplib.SMTP(self.smtp_config['server'],
                              self.smtp_config['port'],
                              timeout=timeout) as server:
                server.starttls()
                server.login(self.smtp_config['username'],
                             self.smtp_config['password'])
                server.send_message(msg)
            print(f"Notification sent for order {order_id}")
        except Exception as e:
            # Deliberately broad: notification is best-effort only.
            print(f"Failed to send notification: {str(e)}")

    def close(self):
        """Close the underlying database connection."""
        self.conn.close()
# Script entry point: add two sample orders, process them, then clean up.
if __name__ == '__main__':
    system = OrderSystem()
    try:
        # Example: add new orders.
        system.add_order('John Doe', 'Website development project')
        system.add_order('Jane Smith', 'Mobile app design')
        # Process all pending orders.
        system.process_orders()
    finally:
        # Always release the database connection, even if processing fails.
        system.close()
[database]
name = orders.db
[email]
server = smtp.example.com
port = 587
username = your_email@example.com
password = your_password
from_address = noreply@example.com
[settings]
sqlite3
smtplib
email
代码语言:txt
AI代码解释
import cv2
import pytesseract
import numpy as np
import pyautogui
import time
from PIL import ImageGrab
class OrderProcessor:
    """Screen-scraping bot that looks for new orders and clicks "accept".

    It captures the screen, locates UI elements by template matching,
    reads the order text with Tesseract OCR and clicks the accept button
    when the text contains the "new order" marker.
    """

    def __init__(self):
        """Set the Tesseract executable path and the template image files."""
        pytesseract.pytesseract.tesseract_cmd = r'C:\Program Files\Tesseract-OCR\tesseract.exe'
        self.template_images = {
            'accept_button': 'accept_template.png',
            'order_area': 'order_area_template.png'
        }

    def capture_screen(self, region=None):
        """Capture the screen, optionally restricted to a region.

        Args:
            region: Optional bounding box passed to ``ImageGrab.grab`` as
                ``bbox``; the whole screen is captured when omitted.

        Returns:
            The image returned by ``ImageGrab.grab``.
        """
        if region:
            return ImageGrab.grab(bbox=region)
        return ImageGrab.grab()

    def preprocess_image(self, image):
        """Convert an image to a binary black/white array to help OCR.

        Args:
            image: Screenshot as produced by ``capture_screen``.

        Returns:
            The thresholded grayscale array (threshold 150, max value 255).
        """
        # PIL screenshots are RGB-ordered, so convert with RGB2GRAY; using
        # BGR2GRAY would swap the red and blue luminance weights.
        gray = cv2.cvtColor(np.array(image), cv2.COLOR_RGB2GRAY)
        _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)
        return thresh

    def find_template(self, template_name, threshold=0.8):
        """Locate a UI element on screen by template matching.

        Args:
            template_name: Key into ``self.template_images``.
            threshold: Minimum normalized correlation score for a match.

        Returns:
            A list of ``(x, y)`` top-left positions whose match score is at
            least ``threshold``; empty when nothing matches.

        Raises:
            KeyError: If ``template_name`` is not a known template.
            FileNotFoundError: If the template image cannot be loaded.
        """
        template_path = self.template_images[template_name]
        template = cv2.imread(template_path, 0)
        if template is None:
            # imread signals failure by returning None rather than raising.
            raise FileNotFoundError(
                f"Template image could not be read: {template_path}"
            )
        screen = np.array(self.capture_screen())
        gray_screen = cv2.cvtColor(screen, cv2.COLOR_RGB2GRAY)
        res = cv2.matchTemplate(gray_screen, template, cv2.TM_CCOEFF_NORMED)
        loc = np.where(res >= threshold)
        # np.where yields (rows, cols); reverse to get (x, y) pairs and
        # convert to plain ints for downstream coordinate arithmetic.
        return [(int(x), int(y)) for x, y in zip(*loc[::-1])]

    def extract_order_text(self, region):
        """OCR the text inside the given screen region.

        Args:
            region: Bounding box of the area to read.

        Returns:
            The recognized text with surrounding whitespace stripped.
        """
        img = self.capture_screen(region)
        processed_img = self.preprocess_image(img)
        text = pytesseract.image_to_string(processed_img, lang='chi_sim+eng')
        return text.strip()

    def auto_scroll(self, pixels=300):
        """Scroll the page down and wait one second for it to settle.

        Args:
            pixels: Scroll amount; passed negated to ``pyautogui.scroll``.
        """
        pyautogui.scroll(-pixels)
        time.sleep(1)

    def process_new_orders(self):
        """Run the main loop: find orders, accept new ones, scroll, repeat.

        This loops forever and never returns.
        """
        while True:
            # Locate the order areas currently visible on screen.
            order_positions = self.find_template('order_area')
            if not order_positions:
                self.auto_scroll()
                continue
            for pos in order_positions:
                # Text region of the order, relative to the matched position.
                text_region = (pos[0]+50, pos[1]+30, pos[0]+400, pos[1]+150)
                order_text = self.extract_order_text(text_region)
                if "新订单" in order_text:
                    # Locate and click the accept button.
                    # NOTE(review): the first accept button found on screen
                    # is clicked, regardless of which order matched.
                    accept_buttons = self.find_template('accept_button')
                    if accept_buttons:
                        pyautogui.click(accept_buttons[0][0]+20, accept_buttons[0][1]+20)
                        print(f"已接受订单: {order_text}")
                        time.sleep(2)
            self.auto_scroll()
            time.sleep(5)
# Script entry point: start the endless order-accepting loop.
if __name__ == '__main__':
    processor = OrderProcessor()
    processor.process_new_orders()