一、接口概览
1.1 接口简介
item_get接口是懂车帝开放平台的核心接口之一,用于获取车辆的完整详细信息,包括车辆基本参数、配置信息、图片、价格、评测数据、用户评价等全方位数据。
1.2 接口特性
数据全面:返回车辆的全维度信息,涵盖技术参数、配置、图片、价格等
实时更新:数据与懂车帝APP/网站保持同步
结构化返回:JSON格式,字段定义清晰,便于解析
权限分级:支持不同数据粒度的访问权限控制
二、准备工作
2.1 环境配置
requirements.txt
requests>=2.28.0
python-dotenv>=1.0.0
pydantic>=2.0.0
aiohttp>=3.8.0
redis>=4.5.0
2.2 认证配置
config.py
import os
from dotenv import load_dotenv
load_dotenv()
class DongchediConfig:
    """Static configuration for the Dongchedi open-platform client.

    Credentials and the base URL are read from environment variables when
    this class body is executed.
    """

    # Dongchedi API credentials and endpoint
    DONGCHEDI_APP_KEY = os.getenv('DONGCHEDI_APP_KEY')
    DONGCHEDI_APP_SECRET = os.getenv('DONGCHEDI_APP_SECRET')
    DONGCHEDI_API_BASE = os.getenv(
        'DONGCHEDI_API_BASE',
        'https://openapi.dongchedi.com/api/v1',
    )

    # Request settings
    REQUEST_TIMEOUT = 30
    MAX_RETRIES = 3
    DEFAULT_PAGE_SIZE = 20

    # Cache settings
    CACHE_TTL = 3600  # one hour
三、接口详解
3.1 接口地址
GET /vehicle/{vehicle_id}
3.2 请求参数详解
公共参数
参数名
类型
必填
说明
示例
app_key
string
是
应用标识
dcd_app_2024
timestamp
int
是
时间戳
1706774400
sign
string
是
请求签名
详见 3.3 签名算法
format
string
否
返回格式
json(默认)
version
string
是
API版本
1.0
业务参数
参数名
类型
必填
说明
示例
vehicle_id
int
是
车辆ID
12345
include_images
bool
否
是否包含图片
true
include_specs
bool
否
是否包含详细配置
true
include_prices
bool
否
是否包含价格信息
true
include_reviews
bool
否
是否包含用户评价
true
include_ratings
bool
否
是否包含评分数据
true
include_dealers
bool
否
是否包含经销商信息
true
fields
string
否
指定返回字段
id,title,price,brand,series
3.3 签名算法
import hashlib
import hmac
from typing import Dict, Any
def generate_signature(params: Dict[str, Any], app_secret: str) -> str:
    """Build the request signature for a Dongchedi API call.

    Args:
        params: Request parameters. Entries whose value is ``None`` and any
            existing ``sign`` entry are left out of the signed string.
        app_secret: Application secret; appended to the signed string and
            also used as the HMAC key.

    Returns:
        Hexadecimal HMAC-SHA256 digest of the signed string.
    """
    import json  # local: this snippet's import header does not include json

    def _render(value: Any) -> str:
        # Containers are serialised as compact JSON; everything else via str().
        if isinstance(value, (list, dict)):
            return json.dumps(value, separators=(',', ':'))
        return str(value)

    # 1. Drop None values and the sign field; keys are then sorted ascending.
    signable = {k: v for k, v in params.items() if v is not None and k != 'sign'}

    # 2. Concatenate key+value pairs, 3. append the secret.
    sign_str = ''.join(f"{key}{_render(signable[key])}" for key in sorted(signable))
    sign_str += app_secret

    # 4. HMAC-SHA256 keyed with the secret.
    return hmac.new(
        app_secret.encode('utf-8'),
        sign_str.encode('utf-8'),
        hashlib.sha256,
    ).hexdigest()
四、完整代码实现
4.1 Python完整实现
import requests
import time
import hashlib
import hmac
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from urllib.parse import urlencode
import redis
@dataclass
class VehicleBasicInfo:
    """Basic vehicle information."""

    vehicle_id: int
    title: str
    brand: str
    series: str
    model: str
    year: int
    fuel_type: str
    transmission: str
    body_type: str
    displacement: str
    power: str
    torque: str
    fuel_consumption: str
    msrp: float
    market_price: float
    create_time: str
    update_time: str
@dataclass
class VehicleImage:
    """Vehicle image information."""

    image_id: int
    url: str
    title: str
    type: str  # exterior, interior, detail, 360
    size: str  # small, medium, large, original
    width: int
    height: int
@dataclass
class VehicleSpec:
    """A single vehicle configuration entry."""

    category: str
    name: str
    value: str
    description: str
    is_standard: bool
@dataclass
class VehiclePrice:
    """Vehicle price information."""

    price_type: str  # msrp, dealer_price, market_price, min_price, max_price
    amount: float
    currency: str
    region: str
    update_time: str
@dataclass
class VehicleReview:
    """A single user review of a vehicle."""

    review_id: int
    user_id: int
    user_name: str
    rating: float
    title: str
    content: str
    create_time: str
    helpful_count: int
    comment_count: int
    pros: List[str]
    cons: List[str]
@dataclass
class VehicleRating:
    """Aggregated rating scores for a vehicle."""

    overall: float
    exterior: float
    interior: float
    performance: float
    comfort: float
    fuel_economy: float
    safety: float
    technology: float
    review_count: int
    recommend_rate: float
class DongchediAPI:
    """Synchronous client for the Dongchedi open-platform vehicle API."""

    def __init__(self, app_key: str, app_secret: str, sandbox: bool = True,
                 redis_client=None, timeout: float = 30):
        """Create a client.

        Args:
            app_key: Application key sent with every request.
            app_secret: Application secret used to sign requests.
            sandbox: Use the sandbox host when true, the production host
                otherwise.
            redis_client: Optional Redis client; this class only stores it.
            timeout: Timeout in seconds passed to every HTTP request.
        """
        self.app_key = app_key
        self.app_secret = app_secret
        self.base_url = (
            "https://sandbox-openapi.dongchedi.com" if sandbox
            else "https://openapi.dongchedi.com"
        )
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Dongchedi-API-Client/1.0',
            'Accept': 'application/json'
        })
        self.redis = redis_client
        self._access_token = None
        self._token_expires = None

    def _generate_signature(self, params: Dict[str, Any], timestamp: int) -> str:
        """Return the HMAC-SHA256 request signature for ``params``.

        Args:
            params: Request parameters; ``None`` values and the ``sign`` key
                are excluded from the signed string.
            timestamp: Not used by the algorithm (callers already place the
                timestamp inside ``params``); kept for interface stability.

        Returns:
            Hexadecimal digest string.
        """
        def render(value: Any) -> str:
            # Containers are signed as compact JSON, scalars via str().
            if isinstance(value, (list, dict)):
                return json.dumps(value, separators=(',', ':'))
            return str(value)

        signable = {k: v for k, v in params.items() if v is not None and k != 'sign'}
        sign_str = ''.join(f"{key}{render(signable[key])}" for key in sorted(signable))
        sign_str += self.app_secret
        return hmac.new(
            self.app_secret.encode('utf-8'),
            sign_str.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    def _get_access_token(self) -> str:
        """Return a cached access token, requesting a new one when needed.

        Raises:
            Exception: If the token endpoint answers with a non-200 status.
        """
        if (self._access_token and self._token_expires
                and self._token_expires > datetime.now()):
            return self._access_token

        timestamp = int(time.time())
        params = {
            'app_key': self.app_key,
            'timestamp': timestamp,
            'grant_type': 'client_credentials'
        }
        params['sign'] = self._generate_signature(params, timestamp)

        url = f"{self.base_url}/oauth/token"
        # A timeout keeps a stalled token endpoint from blocking forever.
        response = self.session.post(url, data=params, timeout=self.timeout)
        if response.status_code != 200:
            raise Exception(f"获取token失败: {response.status_code} - {response.text}")

        data = response.json()
        self._access_token = data['access_token']
        # Treat the token as expired five minutes early.
        self._token_expires = datetime.now() + timedelta(seconds=data['expires_in'] - 300)
        return self._access_token

    def get_vehicle_detail(
        self,
        vehicle_id: int,
        include_images: bool = True,
        include_specs: bool = True,
        include_prices: bool = True,
        include_reviews: bool = False,
        include_ratings: bool = True,
        include_dealers: bool = False,
        fields: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """Fetch the raw detail payload for one vehicle.

        Args:
            vehicle_id: Vehicle ID.
            include_images: Whether to include images.
            include_specs: Whether to include detailed configuration.
            include_prices: Whether to include price information.
            include_reviews: Whether to include user reviews.
            include_ratings: Whether to include rating data.
            include_dealers: Whether to include dealer information.
            fields: Optional list of field names to return.

        Returns:
            The decoded JSON response.

        Raises:
            VehicleNotFoundException: On HTTP 404.
            Exception: On timeouts, network errors, or any other non-200
                status (including a 401 that persists after one token
                refresh).
        """
        url = f"{self.base_url}/api/v1/vehicle/{vehicle_id}"

        # At most two attempts: the second one runs only after a 401, with a
        # freshly fetched token. This replaces unbounded recursion.
        for attempt in range(2):
            access_token = self._get_access_token()

            params = {
                'app_key': self.app_key,
                'timestamp': int(time.time()),
                'format': 'json',
                'version': '1.0',
                'include_images': str(include_images).lower(),
                'include_specs': str(include_specs).lower(),
                'include_prices': str(include_prices).lower(),
                'include_reviews': str(include_reviews).lower(),
                'include_ratings': str(include_ratings).lower(),
                'include_dealers': str(include_dealers).lower()
            }
            if fields:
                params['fields'] = ','.join(fields)
            params['sign'] = self._generate_signature(params, params['timestamp'])

            headers = {
                'Authorization': f'Bearer {access_token}',
                'Content-Type': 'application/json'
            }

            try:
                response = self.session.get(
                    url,
                    params=params,
                    headers=headers,
                    timeout=self.timeout
                )
                status = response.status_code
                payload = response.json() if status == 200 else None
            except requests.exceptions.Timeout as exc:
                raise Exception("请求超时") from exc
            except requests.exceptions.RequestException as exc:
                raise Exception(f"网络请求异常: {str(exc)}") from exc

            if status == 200:
                return payload
            if status == 404:
                raise VehicleNotFoundException(f"车辆ID {vehicle_id} 不存在")
            if status == 401 and attempt == 0:
                # Token rejected: drop it and retry once with a new one.
                self._access_token = None
                continue
            raise Exception(f"请求失败: {status} - {response.text}")

    def get_vehicle_detail_structured(
        self,
        vehicle_id: int,
        **kwargs
    ) -> Dict[str, Any]:
        """Fetch a vehicle and convert the payload into dataclass instances.

        Args:
            vehicle_id: Vehicle ID.
            **kwargs: Forwarded to :meth:`get_vehicle_detail`.

        Returns:
            Dict with keys ``basic_info``, ``images``, ``specs``, ``prices``,
            ``rating``, ``reviews`` and ``raw_data``.

        Raises:
            Exception: If the payload's ``success`` flag is falsy.
        """
        raw_data = self.get_vehicle_detail(vehicle_id, **kwargs)
        if not raw_data.get('success'):
            raise Exception(f"获取车辆详情失败: {raw_data.get('message')}")

        # ``or`` guards against keys that are present but null.
        data = raw_data.get('data') or {}

        basic_info = VehicleBasicInfo(
            vehicle_id=data.get('id'),
            title=data.get('title', ''),
            brand=(data.get('brand') or {}).get('name', ''),
            series=(data.get('series') or {}).get('name', ''),
            model=data.get('model', ''),
            year=data.get('year', 0),
            fuel_type=data.get('fuel_type', ''),
            transmission=data.get('transmission', ''),
            body_type=data.get('body_type', ''),
            displacement=data.get('displacement', ''),
            power=data.get('power', ''),
            torque=data.get('torque', ''),
            fuel_consumption=data.get('fuel_consumption', ''),
            msrp=data.get('msrp', 0),
            market_price=data.get('market_price', 0),
            create_time=data.get('create_time', ''),
            update_time=data.get('update_time', '')
        )

        images = [
            VehicleImage(
                image_id=item.get('id'),
                url=item.get('url'),
                title=item.get('title'),
                type=item.get('type'),
                size=item.get('size'),
                width=item.get('width'),
                height=item.get('height')
            )
            for item in data.get('images') or []
        ]

        specs = [
            VehicleSpec(
                category=item.get('category'),
                name=item.get('name'),
                value=item.get('value'),
                description=item.get('description'),
                is_standard=item.get('is_standard', False)
            )
            for item in data.get('specs') or []
        ]

        prices = [
            VehiclePrice(
                price_type=item.get('type'),
                amount=item.get('amount'),
                currency=item.get('currency'),
                region=item.get('region'),
                update_time=item.get('update_time')
            )
            for item in data.get('prices') or []
        ]

        rating_data = data.get('rating') or {}
        rating = VehicleRating(
            overall=rating_data.get('overall', 0),
            exterior=rating_data.get('exterior', 0),
            interior=rating_data.get('interior', 0),
            performance=rating_data.get('performance', 0),
            comfort=rating_data.get('comfort', 0),
            fuel_economy=rating_data.get('fuel_economy', 0),
            safety=rating_data.get('safety', 0),
            technology=rating_data.get('technology', 0),
            review_count=rating_data.get('review_count', 0),
            recommend_rate=rating_data.get('recommend_rate', 0)
        )

        reviews = [
            VehicleReview(
                review_id=item.get('id'),
                user_id=item.get('user_id'),
                user_name=item.get('user_name'),
                rating=item.get('rating'),
                title=item.get('title'),
                content=item.get('content'),
                create_time=item.get('create_time'),
                helpful_count=item.get('helpful_count'),
                comment_count=item.get('comment_count'),
                pros=item.get('pros', []),
                cons=item.get('cons', [])
            )
            for item in data.get('reviews') or []
        ]

        return {
            'basic_info': basic_info,
            'images': images,
            'specs': specs,
            'prices': prices,
            'rating': rating,
            'reviews': reviews,
            'raw_data': raw_data
        }

    def batch_get_vehicle_details(
        self,
        vehicle_ids: List[int],
        max_workers: int = 5,
        **kwargs
    ) -> Dict[int, Dict[str, Any]]:
        """Fetch several vehicles concurrently using a thread pool.

        Args:
            vehicle_ids: Vehicle IDs to fetch; duplicates are fetched once.
            max_workers: Size of the thread pool.
            **kwargs: Forwarded to :meth:`get_vehicle_detail`.

        Returns:
            Mapping of vehicle ID to its raw payload, or to
            ``{'error': message}`` when that request failed.
        """
        from concurrent.futures import ThreadPoolExecutor, as_completed

        results = {}
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            future_to_id = {
                executor.submit(self.get_vehicle_detail, vehicle_id, **kwargs): vehicle_id
                for vehicle_id in dict.fromkeys(vehicle_ids)
            }
            for future in as_completed(future_to_id):
                vehicle_id = future_to_id[future]
                try:
                    # The future is already finished here, so no timeout applies.
                    results[vehicle_id] = future.result()
                except Exception as e:
                    print(f"获取车辆 {vehicle_id} 详情失败: {e}")
                    results[vehicle_id] = {'error': str(e)}
        return results
class VehicleNotFoundException(Exception):
    """Raised when the requested vehicle does not exist."""
使用示例
def demo_dongchedi_api():
    """Demonstrate single, structured and batch lookups against the sandbox."""
    # Initialise the client from environment-backed configuration.
    client = DongchediAPI(
        app_key=DongchediConfig.DONGCHEDI_APP_KEY,
        app_secret=DongchediConfig.DONGCHEDI_APP_SECRET,
        sandbox=True
    )

    # Single vehicle, raw payload.
    print("=== 获取单个车辆详情 ===")
    vehicle_detail = client.get_vehicle_detail(12345)
    print(json.dumps(vehicle_detail, ensure_ascii=False, indent=2))

    # Same vehicle, parsed into dataclasses.
    print("\n=== 获取结构化车辆信息 ===")
    structured_info = client.get_vehicle_detail_structured(12345)
    basic_info = structured_info['basic_info']
    print(f"车辆: {basic_info.brand} {basic_info.series} {basic_info.model}")
    print(f"指导价: {basic_info.msrp}万")
    print(f"市场价: {basic_info.market_price}万")
    print(f"综合评分: {structured_info['rating'].overall}")

    # Several vehicles at once.
    print("\n=== 批量获取车辆详情 ===")
    vehicle_ids = [12345, 12346, 12347, 12348]
    batch_results = client.batch_get_vehicle_details(vehicle_ids)
    for vid, result in batch_results.items():
        if 'error' in result:
            print(f"车辆 {vid}: 获取失败 - {result['error']}")
            continue
        # ``or {}`` tolerates payloads whose data/brand is null.
        data = result.get('data') or {}
        brand = data.get('brand') or {}
        print(f"车辆 {vid}: {brand.get('name')} {data.get('model')}")
# Run the demo only when this file is executed as a script.
if __name__ == "__main__":
    demo_dongchedi_api()
4.2 Java实现
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
 * Blocking client for the Dongchedi open-platform vehicle API, built on OkHttp
 * and Jackson.
 */
public class DongchediApiClient {
    private static final Logger logger = LoggerFactory.getLogger(DongchediApiClient.class);

    /** Seconds subtracted from the reported token lifetime so it is refreshed early. */
    private static final int TOKEN_EXPIRY_MARGIN_SECONDS = 300;

    private final String appKey;
    private final String appSecret;
    private final String baseUrl;
    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;
    private String accessToken;
    private LocalDateTime tokenExpires;

    /**
     * @param appKey    application key
     * @param appSecret application secret used for signing
     * @param sandbox   {@code true} for the sandbox host, {@code false} for production
     */
    public DongchediApiClient(String appKey, String appSecret, boolean sandbox) {
        this.appKey = appKey;
        this.appSecret = appSecret;
        this.baseUrl = sandbox ?
                "https://sandbox-openapi.dongchedi.com" :
                "https://openapi.dongchedi.com";
        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .build();
        this.objectMapper = new ObjectMapper();
        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }

    /**
     * Fetches the detail payload of one vehicle.
     *
     * <p>Only the caller-supplied {@code params} are sent as query parameters;
     * entries with a {@code null} value are skipped.
     *
     * @param vehicleId vehicle ID
     * @param params    optional query parameters, may be {@code null}
     * @return the decoded JSON response
     * @throws VehicleNotFoundException on HTTP 404
     * @throws IOException on any other unsuccessful response, including a 401
     *                     that persists after one token refresh
     */
    public Map<String, Object> getVehicleDetail(int vehicleId, Map<String, Object> params) throws IOException {
        return getVehicleDetail(vehicleId, params, true);
    }

    /** Performs the request; retries once with a fresh token when allowed and the server answers 401. */
    private Map<String, Object> getVehicleDetail(int vehicleId, Map<String, Object> params,
                                                 boolean retryOnUnauthorized) throws IOException {
        String token = getAccessToken();

        HttpUrl endpoint = HttpUrl.parse(baseUrl + "/api/v1/vehicle/" + vehicleId);
        if (endpoint == null) {
            throw new IOException("请求失败: 无效的请求地址");
        }
        HttpUrl.Builder urlBuilder = endpoint.newBuilder();
        if (params != null) {
            for (Map.Entry<String, Object> param : params.entrySet()) {
                // A null value would otherwise throw on toString().
                if (param.getValue() != null) {
                    urlBuilder.addQueryParameter(param.getKey(), param.getValue().toString());
                }
            }
        }

        Request request = new Request.Builder()
                .url(urlBuilder.build())
                .addHeader("Authorization", "Bearer " + token)
                .addHeader("Content-Type", "application/json")
                .addHeader("User-Agent", "Dongchedi-Java-Client/1.0")
                .build();

        try (Response response = httpClient.newCall(request).execute()) {
            if (response.isSuccessful()) {
                return readJson(response);
            }
            if (response.code() == 404) {
                throw new VehicleNotFoundException("车辆ID " + vehicleId + " 不存在");
            }
            if (response.code() != 401 || !retryOnUnauthorized) {
                throw new IOException("请求失败: " + response.code() + " - " + response.message());
            }
        }

        // 401 on the first attempt: the response is closed by now, so drop the
        // token and retry exactly once instead of recursing without bound.
        logger.warn("Access token rejected for vehicle {}, refreshing and retrying once", vehicleId);
        this.accessToken = null;
        return getVehicleDetail(vehicleId, params, false);
    }

    /** Returns the cached token, or requests a new one from the token endpoint. */
    private String getAccessToken() throws IOException {
        if (accessToken != null && tokenExpires != null && tokenExpires.isAfter(LocalDateTime.now())) {
            return accessToken;
        }

        long timestamp = System.currentTimeMillis() / 1000;
        Map<String, Object> params = new HashMap<>();
        params.put("app_key", appKey);
        params.put("timestamp", timestamp);
        params.put("grant_type", "client_credentials");
        params.put("sign", generateSignature(params, timestamp));

        FormBody.Builder formBuilder = new FormBody.Builder();
        for (Map.Entry<String, Object> param : params.entrySet()) {
            formBuilder.add(param.getKey(), param.getValue().toString());
        }
        Request request = new Request.Builder()
                .url(baseUrl + "/oauth/token")
                .post(formBuilder.build())
                .build();

        try (Response response = httpClient.newCall(request).execute()) {
            if (!response.isSuccessful()) {
                throw new IOException("获取token失败: " + response.code());
            }
            Map<String, Object> result = readJson(response);
            Object token = result.get("access_token");
            Object expiresIn = result.get("expires_in");
            if (!(token instanceof String) || !(expiresIn instanceof Number)) {
                throw new IOException("获取token失败: 响应缺少 access_token 或 expires_in");
            }
            this.accessToken = (String) token;
            // Number covers both Integer and Long as produced by the JSON parser.
            this.tokenExpires = LocalDateTime.now()
                    .plusSeconds(((Number) expiresIn).longValue() - TOKEN_EXPIRY_MARGIN_SECONDS);
            return accessToken;
        }
    }

    /** Reads the response body and decodes it as a JSON object. */
    @SuppressWarnings("unchecked")
    private Map<String, Object> readJson(Response response) throws IOException {
        ResponseBody body = response.body();
        if (body == null) {
            throw new IOException("请求失败: 响应体为空");
        }
        return objectMapper.readValue(body.string(), Map.class);
    }

    /**
     * Computes the request signature as described in section 3.3 of this
     * guide (the same string the Python client signs): keys sorted ascending,
     * {@code null} values and the {@code sign} key skipped, each key
     * immediately followed by its value, then the app secret appended; the
     * result is HMAC-SHA256'd with the app secret as key.
     *
     * @param params    parameters to sign
     * @param timestamp unused by the algorithm (the timestamp is already part
     *                  of {@code params}); kept for signature stability
     * @return hexadecimal digest
     */
    private String generateSignature(Map<String, Object> params, long timestamp) {
        try {
            StringBuilder signStr = new StringBuilder();
            // TreeMap iterates its String keys in ascending order.
            for (Map.Entry<String, Object> entry : new TreeMap<>(params).entrySet()) {
                if (entry.getValue() == null || "sign".equals(entry.getKey())) {
                    continue;
                }
                signStr.append(entry.getKey()).append(entry.getValue());
            }
            signStr.append(appSecret);

            Mac mac = Mac.getInstance("HmacSHA256");
            mac.init(new SecretKeySpec(appSecret.getBytes(StandardCharsets.UTF_8), "HmacSHA256"));
            byte[] hash = mac.doFinal(signStr.toString().getBytes(StandardCharsets.UTF_8));

            StringBuilder hexString = new StringBuilder(hash.length * 2);
            for (byte b : hash) {
                hexString.append(String.format("%02x", b));
            }
            return hexString.toString();
        } catch (NoSuchAlgorithmException | InvalidKeyException e) {
            throw new RuntimeException("生成签名失败", e);
        }
    }
}
/** Unchecked exception signalling that a vehicle could not be found. */
class VehicleNotFoundException extends RuntimeException {
    /**
     * @param detail human-readable description, returned by {@link #getMessage()}
     */
    public VehicleNotFoundException(final String detail) {
        super(detail);
    }
}
4.3 PHP实现
<?php
/**
 * Blocking cURL-based client for the Dongchedi open-platform vehicle API.
 */
class DongchediApiClient
{
    private $appKey;
    private $appSecret;
    private $baseUrl;
    private $accessToken;
    private $tokenExpires;

    /**
     * @param string $appKey    Application key.
     * @param string $appSecret Application secret used for signing.
     * @param bool   $sandbox   True for the sandbox host, false for production.
     */
    public function __construct($appKey, $appSecret, $sandbox = true)
    {
        $this->appKey = $appKey;
        $this->appSecret = $appSecret;
        $this->baseUrl = $sandbox
            ? 'https://sandbox-openapi.dongchedi.com'
            : 'https://openapi.dongchedi.com';
    }

    /**
     * Fetch the detail payload of one vehicle.
     *
     * Only the caller-supplied $params are sent as query parameters.
     *
     * @param int   $vehicleId Vehicle ID.
     * @param array $params    Optional query parameters.
     * @return array Decoded JSON response.
     * @throws Exception On transport errors, 404, or any other non-200 status
     *                   (including a 401 that persists after one token refresh).
     */
    public function getVehicleDetail($vehicleId, $params = [])
    {
        return $this->requestVehicleDetail($vehicleId, $params, true);
    }

    /**
     * Perform the detail request; retries once with a fresh token on 401 when allowed.
     */
    private function requestVehicleDetail($vehicleId, $params, $retryOnUnauthorized)
    {
        $token = $this->getAccessToken();

        $url = $this->baseUrl . '/api/v1/vehicle/' . $vehicleId;
        if (!empty($params)) {
            $url .= '?' . http_build_query($params);
        }

        list($httpCode, $response) = $this->execute([
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30,
            CURLOPT_HTTPHEADER => [
                'Authorization: Bearer ' . $token,
                'Content-Type: application/json',
                'User-Agent: Dongchedi-PHP-Client/1.0'
            ]
        ]);

        if ($httpCode === 200) {
            return json_decode($response, true);
        } elseif ($httpCode === 404) {
            throw new Exception("车辆ID {$vehicleId} 不存在");
        } elseif ($httpCode === 401 && $retryOnUnauthorized) {
            // Token rejected: drop it and retry exactly once instead of
            // recursing without bound.
            $this->accessToken = null;
            return $this->requestVehicleDetail($vehicleId, $params, false);
        }
        throw new Exception("请求失败: HTTP {$httpCode}");
    }

    /**
     * Return the cached token, or request a new one from the token endpoint.
     */
    private function getAccessToken()
    {
        if ($this->accessToken && $this->tokenExpires && $this->tokenExpires > time()) {
            return $this->accessToken;
        }

        $timestamp = time();
        // PHP array literals use "=>"; the ":" form is a parse error.
        $params = [
            'app_key' => $this->appKey,
            'timestamp' => $timestamp,
            'grant_type' => 'client_credentials'
        ];
        $params['sign'] = $this->generateSignature($params, $timestamp);

        list($httpCode, $response) = $this->execute([
            CURLOPT_URL => $this->baseUrl . '/oauth/token',
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query($params),
            CURLOPT_TIMEOUT => 30
        ]);

        if ($httpCode !== 200) {
            throw new Exception("获取token失败: HTTP {$httpCode}");
        }
        $result = json_decode($response, true);
        if (!is_array($result) || !isset($result['access_token'], $result['expires_in'])) {
            throw new Exception("获取token失败: 响应缺少 access_token 或 expires_in");
        }
        $this->accessToken = $result['access_token'];
        // Treat the token as expired five minutes early.
        $this->tokenExpires = time() + $result['expires_in'] - 300;
        return $this->accessToken;
    }

    /**
     * Run one cURL request.
     *
     * @param array $options Options for curl_setopt_array().
     * @return array [int HTTP status code, string response body]
     * @throws Exception When the transfer itself fails.
     */
    private function execute(array $options)
    {
        $ch = curl_init();
        curl_setopt_array($ch, $options);
        $response = curl_exec($ch);
        $error = curl_error($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        // curl_exec() returns false on transport failure; the status code
        // alone cannot tell that apart from an HTTP error.
        if ($response === false) {
            throw new Exception("网络请求异常: {$error}");
        }
        return [$httpCode, $response];
    }

    /**
     * Compute the request signature as described in section 3.3 of this guide
     * (the same string the Python client signs): keys sorted ascending, null
     * values and the "sign" key skipped, each key immediately followed by its
     * value, then the app secret appended; the result is HMAC-SHA256'd with
     * the app secret as key.
     *
     * @param array $params    Parameters to sign.
     * @param int   $timestamp Unused by the algorithm (the timestamp is
     *                         already part of $params); kept for signature
     *                         stability.
     * @return string Hexadecimal digest.
     */
    private function generateSignature($params, $timestamp)
    {
        // SORT_STRING keeps numeric-looking keys in plain string order.
        ksort($params, SORT_STRING);

        $signStr = '';
        foreach ($params as $key => $value) {
            if ($value === null || $key === 'sign') {
                continue;
            }
            $signStr .= $key . $value;
        }
        $signStr .= $this->appSecret;

        return hash_hmac('sha256', $signStr, $this->appSecret);
    }
}
// Usage example: fetch one vehicle from the sandbox and print a short summary.
try {
    $api = new DongchediApiClient('your_app_key', 'your_app_secret', true);

    $options = [
        'include_images' => 'true',
        'include_specs' => 'true',
        'include_ratings' => 'true',
    ];
    $detail = $api->getVehicleDetail(12345, $options);
    $vehicle = $detail['data'];

    echo "车辆信息:\n";
    echo "品牌: ", $vehicle['brand']['name'], "\n";
    echo "车系: ", $vehicle['series']['name'], "\n";
    echo "车型: ", $vehicle['model'], "\n";
    echo "综合评分: ", $vehicle['rating']['overall'], "\n";
} catch (Exception $error) {
    echo "错误: ", $error->getMessage(), "\n";
}
?>
五、返回结果解析
5.1 成功响应示例
{
"success": true,
"code": 200,
"message": "成功",
"data": {
"id": 12345,
"title": "2023款 宝马3系 325Li M运动套装",
"brand": {
"id": 2,
"name": "宝马",
"logo": "https://img.dongchedi.com/brand/2.png"
},
"series": {
"id": 20,
"name": "3系",
"image": "https://img.dongchedi.com/series/20.jpg"
},
"model": "325Li M运动套装",
"year": 2023,
"fuel_type": "汽油",
"transmission": "8挡手自一体",
"body_type": "轿车",
"displacement": "2.0T",
"power": "184马力/5000-6500rpm",
"torque": "300牛·米/1350-4000rpm",
"fuel_consumption": "6.2L/100km",
"msrp": 34.99,
"market_price": 32.5,
"images": [
{
"id": 1,
"url": "https://img.dongchedi.com/vehicle/12345_1.jpg",
"title": "外观前脸",
"type": "exterior",
"size": "large",
"width": 1920,
"height": 1080
}
],
"specs": [
{
"category": "车身",
"name": "长宽高",
"value": "4838×1827×1454mm",
"description": "车身尺寸",
"is_standard": true
}
],
"prices": [
{
"type": "msrp",
"amount": 34.99,
"currency": "CNY",
"region": "全国",
"update_time": "2023-01-15 10:00:00"
}
],
"rating": {
"overall": 4.5,
"exterior": 4.7,
"interior": 4.3,
"performance": 4.6,
"comfort": 4.4,
"fuel_economy": 4.2,
"safety": 4.8,
"technology": 4.5,
"review_count": 1250,
"recommend_rate": 92.5
},
"reviews": [
{
"id": 1001,
"user_id": 5001,
"user_name": "懂车帝用户",
"rating": 5.0,
"title": "操控性很棒,动力充沛",
"content": "3系的操控性确实名不虚传...",
"create_time": "2023-05-20 14:30:00",
"helpful_count": 25,
"comment_count": 8,
"pros": ["操控好", "动力强", "外观漂亮"],
"cons": ["后排空间一般", "油耗偏高"]
}
],
"create_time": "2022-12-01 09:00:00",
"update_time": "2023-06-15 16:30:00"
}
}
5.2 错误响应示例
{
"success": false,
"code": 404,
"message": "车辆不存在",
"data": null
}
5.3 状态码说明
状态码
说明
处理建议
200
成功
-
400
参数错误
检查请求参数格式
401
认证失败
检查API密钥和签名
403
权限不足
检查API权限范围
404
车辆不存在
检查vehicle_id是否正确
429
请求频率超限
降低请求频率
500
服务器错误
稍后重试
六、高级功能实现
6.1 智能数据解析
class IntelligentVehicleParser:
    """Groups vehicle specs by category and derives numeric technical data."""

    def __init__(self):
        # Category name -> spec names that belong to it.
        self.spec_categories = {
            '车身': ['长宽高', '轴距', '整备质量', '车门数'],
            '发动机': ['排量', '最大功率', '最大扭矩', '气缸数', '燃料形式'],
            '变速箱': ['变速箱类型', '挡位个数'],
            '底盘转向': ['驱动方式', '前悬架', '后悬架'],
            '车轮制动': ['前制动器', '后制动器', '驻车制动'],
            '安全配置': ['气囊数量', 'ABS', 'ESP'],
            '舒适配置': ['座椅材质', '空调类型', '天窗']
        }

    def parse_vehicle_specs(self, specs: List["VehicleSpec"]) -> Dict[str, Dict[str, str]]:
        """Bucket specs by category.

        Args:
            specs: Objects exposing ``name`` and ``value`` attributes.

        Returns:
            ``{category: {spec_name: spec_value}}``. Every known category is
            present (possibly empty); specs with an unknown name go under
            ``'其他'``, which only appears when needed.
        """
        # Reverse lookup; setdefault keeps the first category that lists a name.
        name_to_category = {}
        for category, names in self.spec_categories.items():
            for name in names:
                name_to_category.setdefault(name, category)

        parsed_specs = {category: {} for category in self.spec_categories}
        for spec in specs:
            category = name_to_category.get(spec.name, '其他')
            parsed_specs.setdefault(category, {})[spec.name] = spec.value
        return parsed_specs

    def extract_technical_data(self, vehicle_data: Dict[str, Any]) -> Dict[str, Any]:
        """Derive numeric technical fields from ``vehicle_data['basic_info']``.

        Args:
            vehicle_data: Dict whose ``basic_info`` entry is an object (or
                dict) with ``power``, ``torque``, ``fuel_consumption``,
                ``displacement``, ``transmission`` and ``body_type``. A
                missing entry is treated as all-empty.

        Returns:
            Dict with ``power_kw``, ``torque_nm``, ``fuel_consumption_l_100km``
            (each ``None`` when no number is found), ``displacement_cc``,
            ``transmission_type`` and ``body_type_code``.
        """
        basic_info = vehicle_data.get('basic_info')

        def text_of(field: str) -> str:
            # Works for dataclass-like objects, plain dicts, and None.
            if isinstance(basic_info, dict):
                value = basic_info.get(field)
            else:
                value = getattr(basic_info, field, None)
            return str(value) if value else ''

        power = self._first_number(text_of('power'))
        torque = self._first_number(text_of('torque'))
        fuel = self._first_number(text_of('fuel_consumption'))

        return {
            # The leading number is multiplied by 0.735 (metric hp -> kW).
            # NOTE(review): assumes power is quoted in horsepower — confirm.
            'power_kw': power * 0.735 if power is not None else None,
            'torque_nm': torque,
            'fuel_consumption_l_100km': fuel,
            'displacement_cc': self._parse_displacement(text_of('displacement')),
            'transmission_type': self._classify_transmission(text_of('transmission')),
            'body_type_code': self._classify_body_type(text_of('body_type'))
        }

    @staticmethod
    def _first_number(text: str) -> Optional[float]:
        """Return the first integer or decimal number in ``text``, or ``None``."""
        import re  # local: this snippet has no import header of its own

        match = re.search(r'(\d+(\.\d+)?)', text)
        return float(match.group(1)) if match else None

    def _parse_displacement(self, displacement: str) -> int:
        """Return the first number in ``displacement`` times 1000, or 0 if none."""
        litres = self._first_number(displacement)
        if litres is None:
            return 0
        # round() rather than int(): float products can land just below the
        # intended integer and would be truncated one too low.
        return round(litres * 1000)

    def _classify_transmission(self, transmission: str) -> str:
        """Map a transmission description to AT / DCT / CVT / MT / UNKNOWN."""
        # Checked in order; the first matching keyword wins.
        keyword_groups = (
            ('AT', ('手自一体', 'AT')),
            ('DCT', ('双离合', 'DCT')),
            ('CVT', ('无级变速', 'CVT')),
            ('MT', ('手动', 'MT')),
        )
        for code, keywords in keyword_groups:
            if any(keyword in transmission for keyword in keywords):
                return code
        return 'UNKNOWN'

    def _classify_body_type(self, body_type: str) -> str:
        """Map an exact body-type label to its code, or ``'UNKNOWN'``."""
        mapping = {
            '轿车': 'SEDAN',
            'SUV': 'SUV',
            'MPV': 'MPV',
            '跑车': 'COUPE',
            '皮卡': 'PICKUP',
            '微面': 'VAN'
        }
        return mapping.get(body_type, 'UNKNOWN')
6.2 数据缓存优化
import redis
from functools import lru_cache
class CachedDongchediAPI(DongchediAPI):
    """Dongchedi client that caches successful vehicle payloads in Redis."""

    def __init__(self, app_key, app_secret, redis_client, sandbox=True):
        """Create the client.

        Args:
            app_key: Application key.
            app_secret: Application secret.
            redis_client: Redis client used for caching; ``None`` disables it.
            sandbox: Use the sandbox host when true.
        """
        super().__init__(app_key, app_secret, sandbox)
        self.redis = redis_client
        self.cache_prefix = "dongchedi:vehicle:"

    # No functools.lru_cache here: on an instance method it keeps every
    # instance alive, ignores the Redis TTL, and raises on unhashable keyword
    # values such as a ``fields`` list.
    def get_vehicle_detail_cached(self, vehicle_id: int, **kwargs) -> Dict[str, Any]:
        """Return the vehicle payload, serving it from Redis when available.

        Args:
            vehicle_id: Vehicle ID.
            **kwargs: Forwarded to ``get_vehicle_detail``; they are also part
                of the cache key so different options never share an entry.

        Returns:
            The decoded API payload (cached or fresh).
        """
        cache_key = self._cache_key(vehicle_id, kwargs)

        if self.redis is not None:
            cached = self.redis.get(cache_key)
            if cached:
                return json.loads(cached)

        data = super().get_vehicle_detail(vehicle_id, **kwargs)

        # Only successful payloads are cached.
        if self.redis is not None and data.get('success'):
            self.redis.setex(cache_key, self._calculate_ttl(data), json.dumps(data))
        return data

    def _cache_key(self, vehicle_id: int, options: Dict[str, Any]) -> str:
        """Build the Redis key for a vehicle and a set of request options.

        Without options the key is ``<prefix><vehicle_id>``; with options a
        short digest of them is appended.
        """
        base_key = f"{self.cache_prefix}{vehicle_id}"
        if not options:
            return base_key
        encoded = json.dumps(options, sort_keys=True, default=str).encode('utf-8')
        return f"{base_key}:{hashlib.sha256(encoded).hexdigest()[:16]}"

    def _calculate_ttl(self, vehicle_data: Dict[str, Any]) -> int:
        """Pick a cache lifetime in seconds from the vehicle's model year.

        Older model years are cached longer. A missing or non-numeric year
        gets the shortest lifetime instead of being treated as an old model.
        """
        year = (vehicle_data.get('data') or {}).get('year')
        if not isinstance(year, int) or year <= 0:
            return 3600

        current_year = datetime.now().year
        if year < current_year - 3:
            return 24 * 3600  # 24 hours
        if year < current_year - 1:
            return 12 * 3600  # 12 hours
        return 3600  # 1 hour
6.3 批量处理优化
import asyncio
import aiohttp
class AsyncDongchediAPI:
    """Asynchronous Dongchedi API client; use it as an async context manager."""

    def __init__(self, app_key, app_secret, sandbox=True):
        """Store credentials and choose the sandbox or production host."""
        self.app_key = app_key
        self.app_secret = app_secret
        self.base_url = (
            "https://sandbox-openapi.dongchedi.com" if sandbox
            else "https://openapi.dongchedi.com"
        )
        self.session = None
        self.access_token = None

    async def __aenter__(self):
        """Open the HTTP session and fetch an access token."""
        self.session = aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=30))
        try:
            await self._get_access_token()
        except BaseException:
            # __aexit__ is not called when __aenter__ raises, so close here.
            await self.session.close()
            self.session = None
            raise
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """Close the HTTP session if one is open."""
        if self.session is not None:
            await self.session.close()
            self.session = None

    def _generate_signature(self, params: Dict[str, Any], timestamp: int) -> str:
        """Return the HMAC-SHA256 signature of ``params``.

        Same algorithm as the synchronous client: keys sorted ascending,
        ``None`` values and ``sign`` skipped, key+value concatenated, secret
        appended, HMAC keyed with the secret. ``timestamp`` is unused because
        callers already include it in ``params``.
        """
        import hashlib
        import hmac
        import json

        def render(value: Any) -> str:
            if isinstance(value, (list, dict)):
                return json.dumps(value, separators=(',', ':'))
            return str(value)

        signable = {k: v for k, v in params.items() if v is not None and k != 'sign'}
        sign_str = ''.join(f"{key}{render(signable[key])}" for key in sorted(signable))
        sign_str += self.app_secret
        return hmac.new(
            self.app_secret.encode('utf-8'),
            sign_str.encode('utf-8'),
            hashlib.sha256,
        ).hexdigest()

    async def _get_access_token(self):
        """Request an access token and store it on the instance.

        Raises:
            Exception: If the token endpoint answers with a non-200 status.
        """
        import time

        timestamp = int(time.time())
        params = {
            'app_key': self.app_key,
            'timestamp': timestamp,
            'grant_type': 'client_credentials'
        }
        params['sign'] = self._generate_signature(params, timestamp)

        async with self.session.post(
            f"{self.base_url}/oauth/token",
            data=params
        ) as response:
            if response.status != 200:
                raise Exception(f"获取token失败: {response.status}")
            data = await response.json()
            self.access_token = data['access_token']

    async def get_vehicle_detail_async(self, vehicle_id: int, **kwargs) -> Dict[str, Any]:
        """Fetch one vehicle's raw payload.

        Args:
            vehicle_id: Vehicle ID.
            **kwargs: Optional ``include_images``, ``include_specs``,
                ``include_prices``, ``include_reviews``, ``include_ratings``,
                ``include_dealers`` flags and a ``fields`` list.

        Returns:
            The decoded JSON response.

        Raises:
            Exception: On any non-200 status, including a 401 that persists
                after one token refresh.
        """
        import time

        url = f"{self.base_url}/api/v1/vehicle/{vehicle_id}"

        for attempt in range(2):
            # Public parameters and signature mirror the synchronous client.
            params = {
                'app_key': self.app_key,
                'timestamp': int(time.time()),
                'format': 'json',
                'version': '1.0',
                'include_images': str(kwargs.get('include_images', True)).lower(),
                'include_specs': str(kwargs.get('include_specs', True)).lower(),
                'include_prices': str(kwargs.get('include_prices', True)).lower(),
                'include_reviews': str(kwargs.get('include_reviews', False)).lower(),
                'include_ratings': str(kwargs.get('include_ratings', True)).lower(),
                'include_dealers': str(kwargs.get('include_dealers', False)).lower()
            }
            if kwargs.get('fields'):
                params['fields'] = ','.join(kwargs['fields'])
            params['sign'] = self._generate_signature(params, params['timestamp'])

            headers = {
                'Authorization': f'Bearer {self.access_token}',
                'Content-Type': 'application/json'
            }

            async with self.session.get(
                url,
                params={key: str(value) for key, value in params.items()},
                headers=headers
            ) as response:
                if response.status == 200:
                    return await response.json()
                if response.status != 401 or attempt == 1:
                    raise Exception(f"请求失败: {response.status}")

            # 401 on the first attempt: refresh the token and retry once.
            self.access_token = None
            await self._get_access_token()

    async def batch_get_vehicle_details_async(
        self,
        vehicle_ids: List[int],
        *,
        max_concurrency: int = 10,
        **kwargs
    ) -> Dict[int, Dict[str, Any]]:
        """Fetch several vehicles concurrently.

        Args:
            vehicle_ids: Vehicle IDs to fetch.
            max_concurrency: Upper bound on simultaneous requests.
            **kwargs: Forwarded to :meth:`get_vehicle_detail_async`.

        Returns:
            Mapping of vehicle ID to its payload, or to
            ``{'error': message}`` when that request failed (the same shape
            the synchronous batch method returns).
        """
        limiter = asyncio.Semaphore(max(1, max_concurrency))

        async def fetch_one(vehicle_id):
            async with limiter:
                return await self.get_vehicle_detail_async(vehicle_id, **kwargs)

        outcomes = await asyncio.gather(
            *(fetch_one(vehicle_id) for vehicle_id in vehicle_ids),
            return_exceptions=True
        )

        results = {}
        for vehicle_id, outcome in zip(vehicle_ids, outcomes):
            # BaseException also covers cancellation results from gather().
            if isinstance(outcome, BaseException):
                results[vehicle_id] = {'error': str(outcome)}
            else:
                results[vehicle_id] = outcome
        return results
使用示例
async def demo_async_api():
    """Demonstrate the asynchronous batch lookup."""
    async with AsyncDongchediAPI('your_app_key', 'your_app_secret') as client:
        vehicle_ids = [12345, 12346, 12347, 12348, 12349]
        results = await client.batch_get_vehicle_details_async(vehicle_ids)
        for vehicle_id, result in results.items():
            if 'error' in result:
                print(f"车辆 {vehicle_id}: 获取失败 - {result['error']}")
                continue
            # ``or {}`` tolerates a payload whose data is null.
            model = (result.get('data') or {}).get('model')
            print(f"车辆 {vehicle_id}: {model}")
七、实战应用场景
7.1 汽车电商平台集成
class CarEcommercePlatform:
    """Builds storefront display data from Dongchedi vehicle details."""

    def __init__(self, dongchedi_client):
        """Store the client; it must provide ``get_vehicle_detail_structured``."""
        self.client = dongchedi_client
        self.vehicle_cache = {}

    def get_vehicle_display_info(self, vehicle_id: int) -> Dict[str, Any]:
        """Assemble the display dictionary for one vehicle.

        Args:
            vehicle_id: Vehicle ID.

        Returns:
            Dict with ``vehicle_id``, ``title``, ``year``, ``specs``,
            ``main_image``, ``price_info``, ``rating_info`` and ``features``.
        """
        vehicle_data = self.client.get_vehicle_detail_structured(vehicle_id)
        basic_info = vehicle_data['basic_info']
        images = vehicle_data['images']
        rating = vehicle_data['rating']

        msrp = basic_info.msrp
        market_price = basic_info.market_price
        # Rounded so float noise (e.g. 2.490000000000002) is not shown;
        # None when either price is missing.
        if msrp is None or market_price is None:
            discount = None
        else:
            discount = round(msrp - market_price, 2)

        return {
            'vehicle_id': vehicle_id,
            'title': f"{basic_info.brand} {basic_info.series} {basic_info.model}",
            'year': basic_info.year,
            'specs': {
                'fuel_type': basic_info.fuel_type,
                'transmission': basic_info.transmission,
                'power': basic_info.power,
                'fuel_consumption': basic_info.fuel_consumption
            },
            'main_image': self._get_main_image(images),
            'price_info': {
                'msrp': msrp,
                'market_price': market_price,
                'discount': discount
            },
            'rating_info': {
                'overall': rating.overall,
                'review_count': rating.review_count,
                'recommend_rate': rating.recommend_rate
            },
            'features': self._extract_features(vehicle_data)
        }

    def _get_main_image(self, images: List["VehicleImage"]) -> str:
        """Return the URL of the first large exterior image.

        Falls back to the first image of any kind, or ``''`` when there are
        no images.
        """
        for image in images:
            if image.type == 'exterior' and image.size == 'large':
                return image.url
        return images[0].url if images else ''

    def _extract_features(self, vehicle_data: Dict[str, Any]) -> List[str]:
        """Return the names of the specs flagged as standard equipment.

        NOTE(review): the original snippet called this helper without
        defining it; this minimal version is an assumption about the
        intended output — adjust to the storefront's needs.
        """
        return [
            spec.name
            for spec in vehicle_data.get('specs') or []
            if getattr(spec, 'is_standard', False)
        ]
7.2 汽车对比工具
class VehicleComparisonTool:
    """Side-by-side comparison of several vehicles."""

    def __init__(self, dongchedi_client):
        """Store the client; it must provide ``batch_get_vehicle_details``."""
        self.client = dongchedi_client

    def compare_vehicles(self, vehicle_ids: List[int]) -> Dict[str, Any]:
        """Compare up to five vehicles.

        Args:
            vehicle_ids: Vehicle IDs to compare.

        Returns:
            Dict with ``vehicles``, ``common_specs``, ``price_comparison``
            and ``rating_comparison``. Vehicles whose lookup failed are
            left out.

        Raises:
            Exception: If more than five IDs are given.
        """
        if len(vehicle_ids) > 5:
            raise Exception("最多支持同时对比5辆车")

        vehicle_details = self.client.batch_get_vehicle_details(vehicle_ids)

        comparison_data = {
            'vehicles': [],
            'common_specs': self._get_common_specs(vehicle_details),
            'price_comparison': self._compare_prices(vehicle_details),
            'rating_comparison': self._compare_ratings(vehicle_details)
        }

        for vehicle_id, data in self._successful(vehicle_details):
            comparison_data['vehicles'].append({
                'vehicle_id': vehicle_id,
                'basic_info': {
                    'brand': (data.get('brand') or {}).get('name'),
                    'series': (data.get('series') or {}).get('name'),
                    'model': data.get('model'),
                    'year': data.get('year'),
                    'msrp': data.get('msrp')
                },
                'rating': data.get('rating') or {}
            })
        return comparison_data

    @staticmethod
    def _successful(vehicle_details: Dict[int, Dict[str, Any]]):
        """Yield ``(vehicle_id, data)`` for every lookup without an error."""
        for vehicle_id, detail in vehicle_details.items():
            if 'error' not in detail:
                yield vehicle_id, detail.get('data') or {}

    def _get_common_specs(self, vehicle_details: Dict[int, Dict[str, Any]]) -> List[Dict[str, Any]]:
        """Return specs that at least two vehicles define.

        Spec names keep first-seen order so the result (and its ten-item
        cap) is the same on every run.
        """
        specs_by_vehicle = {}
        spec_names = {}  # dict used as an insertion-ordered set
        for vehicle_id, data in self._successful(vehicle_details):
            specs = data.get('specs') or []
            specs_by_vehicle[vehicle_id] = {
                spec.get('name'): spec.get('value') for spec in specs
            }
            for spec in specs:
                spec_names.setdefault(spec.get('name'))

        common_specs = []
        for spec_name in spec_names:
            spec_values = {
                vehicle_id: values[spec_name]
                for vehicle_id, values in specs_by_vehicle.items()
                if spec_name in values
            }
            if len(spec_values) > 1:
                common_specs.append({'name': spec_name, 'values': spec_values})
        return common_specs[:10]  # cap the number of returned specs

    def _compare_prices(self, vehicle_details: Dict[int, Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
        """Return ``{vehicle_id: {'msrp': ..., 'market_price': ...}}``.

        NOTE(review): the original snippet called this helper without
        defining it; this minimal shape is an assumption — adjust as needed.
        """
        return {
            vehicle_id: {
                'msrp': data.get('msrp'),
                'market_price': data.get('market_price')
            }
            for vehicle_id, data in self._successful(vehicle_details)
        }

    def _compare_ratings(self, vehicle_details: Dict[int, Dict[str, Any]]) -> Dict[int, Dict[str, Any]]:
        """Return ``{vehicle_id: rating_dict}``.

        NOTE(review): the original snippet called this helper without
        defining it; this minimal shape is an assumption — adjust as needed.
        """
        return {
            vehicle_id: data.get('rating') or {}
            for vehicle_id, data in self._successful(vehicle_details)
        }
八、故障排查与优化
8.1 常见问题解决
问题1:签名验证失败
def debug_signature_generation(params, app_secret, timestamp):
    """Print the intermediate signing string and the resulting signature.

    The string is built exactly as the real signer builds it: keys sorted
    ascending, ``None`` values and ``sign`` skipped, key+value concatenated,
    then ``app_secret`` appended.

    Args:
        params: Request parameters (expected to already contain ``app_key``
            and ``timestamp``, as the client's requests do).
        app_secret: Application secret.
        timestamp: Unused; kept so existing calls keep working.

    Returns:
        Hexadecimal HMAC-SHA256 digest.
    """
    import hashlib
    import hmac
    import json

    print("=== 签名调试信息 ===")

    pieces = []
    for key, value in sorted(params.items(), key=lambda item: item[0]):
        if value is None or key == 'sign':
            continue
        if isinstance(value, (list, dict)):
            value = json.dumps(value, separators=(',', ':'))
        pieces.append(f"{key}{value}")
    sign_str = ''.join(pieces) + app_secret

    # WARNING: this line prints the app secret; use for local debugging only.
    print(f"签名字符串: {sign_str}")

    signature = hmac.new(
        app_secret.encode('utf-8'),
        sign_str.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    print(f"计算签名: {signature}")
    return signature
问题2:Token过期处理
def get_vehicle_detail_with_retry(self, vehicle_id: int, max_retries: int = 3,
                                  *, retry_delay: float = 1.0, **kwargs):
    """Fetch vehicle details, retrying when the failure mentions a 401.

    Args:
        vehicle_id: Vehicle ID.
        max_retries: Maximum number of attempts; values below 1 still make
            one attempt.
        retry_delay: Seconds to wait before the next attempt.
        **kwargs: Forwarded to ``get_vehicle_detail``.

    Returns:
        Whatever ``get_vehicle_detail`` returns.

    Raises:
        Exception: The last error, once attempts are exhausted or the error
            text does not contain ``"401"``.
    """
    attempts = max(1, max_retries)
    for attempt in range(attempts):
        try:
            return self.get_vehicle_detail(vehicle_id, **kwargs)
        except Exception as e:
            if "401" in str(e) and attempt < attempts - 1:
                # Token rejected: clear it so the next attempt fetches a new one.
                self._access_token = None
                time.sleep(retry_delay)
                continue
            raise
8.2 性能优化建议
合理使用缓存
多级缓存策略
class MultiLevelCache:
    """Two-level read cache: an in-process dict in front of Redis."""

    def __init__(self, redis_client):
        """Create the cache.

        Args:
            redis_client: Redis client, or ``None`` to use memory only.
        """
        self.memory_cache = {}  # vehicle_id -> (data, expiry as time.time())
        self.redis = redis_client
        self.memory_ttl = 300  # 5 minutes
        self.redis_ttl = 3600  # 1 hour

    def get_vehicle_data(self, vehicle_id):
        """Return cached data for ``vehicle_id``, or ``None`` on a miss.

        Memory is checked first, then Redis; a Redis hit is copied into
        memory for ``memory_ttl`` seconds.
        """
        # 1. In-memory cache.
        entry = self.memory_cache.get(vehicle_id)
        if entry is not None:
            data, expire_time = entry
            if time.time() < expire_time:
                return data
            # Drop the stale entry so expired items do not accumulate.
            del self.memory_cache[vehicle_id]

        # 2. Redis cache.
        if self.redis is not None:
            cached = self.redis.get(f"dongchedi:vehicle:{vehicle_id}")
            if cached:
                data = json.loads(cached)
                self.memory_cache[vehicle_id] = (data, time.time() + self.memory_ttl)
                return data
        return None
批量请求优化
使用异步处理提高吞吐量
async def process_vehicle_batch(vehicle_ids, batch_size=10, fetch=None, delay=0.1):
    """Fetch vehicles in fixed-size concurrent batches.

    Args:
        vehicle_ids: Vehicle IDs to fetch.
        batch_size: Number of requests issued concurrently per batch.
        fetch: Coroutine function taking a vehicle ID, e.g. a client's
            ``get_vehicle_detail_async`` bound method. When omitted, a
            module-level ``get_vehicle_detail_async`` is looked up, as the
            original snippet did.
        delay: Seconds to pause between batches to stay under rate limits.

    Returns:
        List of results in the same order as ``vehicle_ids``.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if fetch is None:
        fetch = get_vehicle_detail_async

    results = []
    for start in range(0, len(vehicle_ids), batch_size):
        if start:
            # Pause only between batches, not after the last one.
            await asyncio.sleep(delay)
        batch = vehicle_ids[start:start + batch_size]
        results.extend(await asyncio.gather(*(fetch(vehicle_id) for vehicle_id in batch)))
    return results
九、最佳实践总结
9.1 安全实践
密钥保护:使用环境变量存储API密钥
HTTPS强制:确保所有请求使用HTTPS
输入验证:验证所有输入参数
错误处理:不暴露敏感错误信息
9.2 性能实践
缓存策略:根据数据更新频率设置合适的缓存时间
批量操作:合并多个请求减少API调用次数
异步处理:使用异步IO提高并发性能
连接复用:使用连接池减少连接建立开销
9.3 代码质量
异常处理:完善的异常处理和重试机制
日志记录:详细记录API调用情况
单元测试:编写测试用例覆盖主要功能
类型注解:使用类型注解提高代码可读性
附录:快速开始模板
quick_start.py
from dongchedi_api import DongchediAPI
# 1. Initialise the client
client = DongchediAPI(
    app_key="your_app_key",
    app_secret="your_app_secret",
    sandbox=True
)

# 2. Fetch one vehicle's details
vehicle_detail = client.get_vehicle_detail(12345)
print(f"车辆: {vehicle_detail['data']['brand']['name']} {vehicle_detail['data']['model']}")

# 3. Fetch the structured form
structured_info = client.get_vehicle_detail_structured(12345)
print(f"排量: {structured_info['basic_info'].displacement}")
print(f"功率: {structured_info['basic_info'].power}")
print(f"综合评分: {structured_info['rating'].overall}")

# 4. Fetch several vehicles at once
vehicle_ids = [12345, 12346, 12347]
batch_results = client.batch_get_vehicle_details(vehicle_ids)
for vid, result in batch_results.items():
    if 'error' not in result:
        data = result.get('data', {})
        print(f"车辆 {vid}: {data.get('brand', {}).get('name')} {data.get('model')}")
通过本攻略,您应该能够:
理解懂车帝item_get接口的完整功能
实现安全的API认证和请求
处理各种错误情况和性能优化
在实际项目中灵活应用该接口
建议根据实际业务需求选择合适的实现方案,并遵循最佳实践,确保系统的稳定性和可维护性。