下载地址:https://www.pan38.com/share.php?code=pvvmX 提取码:8888
该实现包含API交互层、订单处理核心逻辑和配置系统,支持多线程实时监控和智能接单。使用时需替换有效API令牌并配置筛选参数。代码包含异常处理和日志输出功能,建议部署在云服务器实现24小时运行。
import requests
import json
import time
import hashlib
from threading import Thread
from queue import Queue
class HuoLaLaAPI:
def init(self, token):
self.base_url = "https://api.huolala.cn/v3"
self.headers = {
"User-Agent": "Mozilla/5.0",
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
self.session = requests.Session()
def _request(self, method, endpoint, data=None):
url = f"{self.base_url}/{endpoint}"
try:
response = self.session.request(
method,
url,
headers=self.headers,
json=data,
timeout=10
)
response.raise_for_status()
return response.json()
except Exception as e:
print(f"API请求错误: {str(e)}")
return None
def get_available_orders(self, radius=10, lat=None, lng=None):
params = {
"radius": radius,
"page": 1,
"size": 50
}
if lat and lng:
params.update({"lat": lat, "lng": lng})
return self._request("GET", "orders/available", params)
def accept_order(self, order_id):
data = {"order_id": order_id}
return self._request("POST", "orders/accept", data)
class OrderProcessor:
def init(self, api):
self.api = api
self.order_queue = Queue()
self.running = True
def start_monitoring(self):
monitor_thread = Thread(target=self._monitor_orders)
process_thread = Thread(target=self._process_orders)
monitor_thread.start()
process_thread.start()
def _monitor_orders(self):
while self.running:
orders = self.api.get_available_orders()
if orders and orders.get("data"):
for order in orders["data"]:
if self._filter_order(order):
self.order_queue.put(order)
time.sleep(5)
def _process_orders(self):
while self.running:
if not self.order_queue.empty():
order = self.order_queue.get()
result = self.api.accept_order(order["id"])
if result and result.get("success"):
print(f"成功接单: {order['id']}")
else:
print(f"接单失败: {order['id']}")
time.sleep(1)
def _filter_order(self, order):
# 示例筛选逻辑:只接距离5公里内且价格大于50的订单
return (order.get("distance", 100) <= 5
and order.get("price", 0) > 50)
if name == "main":
API_TOKEN = "your_api_token_here" # 替换为实际token
api = HuoLaLaAPI(API_TOKEN)
processor = OrderProcessor(api)
try:
print("货拉拉接单脚本启动...")
processor.start_monitoring()
while True:
time.sleep(1)
except KeyboardInterrupt:
processor.running = False
print("脚本已停止")
[api]
token = your_api_token_here
base_url = https://api.huolala.cn/v3
[filter]
max_distance = 5
min_price = 50
max_wait_time = 30
[location]
latitude = 39.9042 # 北京纬度
longitude = 116.4074 # 北京经度
requests==2.31.0
python-dotenv==1.0.0
schedule==1.2.0
geopy==2.4.0