下载地址:https://www.pan38.com/share.php?code=pvvmX 提取码:8888
通过API与顺风车平台交互,包含登录验证、订单查询和抢单功能。请注意这只是一个技术演示,实际平台可能有更复杂的反自动化机制。建议遵守平台规则,合理使用服务。
import requests
import json
import time
import random
from datetime import datetime
class CarPoolingBot:
def init(self, username, password):
self.session = requests.Session()
self.base_url = "https://api.example-carpool.com/v1"
self.token = None
self.username = username
self.password = password
self.headers = {
"User-Agent": "Mozilla/5.0",
"Content-Type": "application/json"
}
def login(self):
login_url = f"{self.base_url}/auth/login"
payload = {
"username": self.username,
"password": self.password
}
try:
response = self.session.post(
login_url,
data=json.dumps(payload),
headers=self.headers
)
if response.status_code == 200:
self.token = response.json().get("token")
self.headers["Authorization"] = f"Bearer {self.token}"
print(f"登录成功,用户: {self.username}")
return True
else:
print(f"登录失败: {response.text}")
return False
except Exception as e:
print(f"登录异常: {str(e)}")
return False
def get_available_orders(self):
orders_url = f"{self.base_url}/orders/available"
try:
response = self.session.get(
orders_url,
headers=self.headers
)
if response.status_code == 200:
return response.json().get("data", [])
else:
print(f"获取订单失败: {response.text}")
return []
except Exception as e:
print(f"获取订单异常: {str(e)}")
return []
def grab_order(self, order_id):
grab_url = f"{self.base_url}/orders/{order_id}/grab"
try:
response = self.session.post(
grab_url,
headers=self.headers
)
if response.status_code == 200:
print(f"成功抢到订单: {order_id}")
return True
else:
print(f"抢单失败: {response.text}")
return False
except Exception as e:
print(f"抢单异常: {str(e)}")
return False
def monitor_and_grab(self, interval=5, max_attempts=100):
if not self.login():
return False
attempts = 0
while attempts < max_attempts:
orders = self.get_available_orders()
if not orders:
print(f"{datetime.now()} - 没有可用订单,等待中...")
time.sleep(interval)
attempts += 1
continue
for order in orders:
if self.grab_order(order["id"]):
return True
time.sleep(random.uniform(0.5, 1.5))
attempts += 1
time.sleep(interval)
print("达到最大尝试次数,退出")
return False
if name == "main":
bot = CarPoolingBot("your_username", "your_password")
bot.monitor_and_grab()
[credentials]
username = your_username
password = your_password
[settings]
check_interval = 5
max_attempts = 100
retry_delay = 1.0
[api]
base_url = https://api.example-carpool.com/v1
timeout = 30
requests==2.31.0
python-dotenv==1.0.0
schedule==1.2.0