一、接口概览
1.1 接口简介
item_search接口是懂车帝开放平台的核心搜索接口,支持多维度车辆检索,包括新车、二手车、车型对比、资讯搜索等。该接口返回结构化的搜索结果,支持分页、排序、字段筛选等高级功能。
1.2 核心功能
✅ 多类型搜索:新车、二手车、车型、资讯、视频等
✅ 多条件筛选:品牌、价格、级别、排量、配置等
✅ 智能排序:按相关性、价格、销量、评分等排序
✅ 分页查询:支持大数据量的分页加载
✅ 字段选择:可指定返回字段,优化网络传输
二、准备工作
2.1 环境配置
requirements.txt
requests>=2.28.0
python-dotenv>=1.0.0
pydantic>=2.0.0
aiohttp>=3.8.0
redis>=4.5.0
2.2 认证配置
config.py
import os
from dotenv import load_dotenv
load_dotenv()
class DongchediConfig:
# 懂车帝API配置
DONGCHEDI_APP_KEY = os.getenv('DONGCHEDI_APP_KEY')
DONGCHEDI_APP_SECRET = os.getenv('DONGCHEDI_APP_SECRET')
DONGCHEDI_API_BASE = os.getenv('DONGCHEDI_API_BASE',
'https://openapi.dongchedi.com/api/v1'
)
# 请求配置
REQUEST_TIMEOUT = 30
MAX_RETRIES = 3
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
# 缓存配置
CACHE_TTL = 3600 # 1小时
SEARCH_CACHE_TTL = 1800 # 30分钟
三、接口详解
3.1 接口地址
GET /search
3.2 请求参数详解
公共参数
参数名
类型
必填
说明
示例
app_key
string
是
应用标识
dcd_app_2024
timestamp
int
是
时间戳
1706774400
sign
string
是
请求签名
详见签名算法
format
string
否
返回格式
json(默认)
version
string
是
API版本
1.0
搜索参数
参数名
类型
必填
说明
示例
q
string
否
搜索关键词
"宝马3系"
search_type
string
否
搜索类型
new_car/used_car/model/news/video
brand_id
int
否
品牌ID
2(宝马)
series_id
int
否
车系ID
20(3系)
price_min
float
否
最低价格(万元)
20.0
price_max
float
否
最高价格(万元)
50.0
level
string
否
车辆级别
A级/B级/C级/SUV/MPV
fuel_type
string
否
燃油类型
gasoline/diesel/hybrid/electric
transmission
string
否
变速箱
manual/automatic/cvt/dct
displacement_min
float
否
最小排量(L)
1.5
displacement_max
float
否
最大排量(L)
3.0
year_min
int
否
最小年份
2020
year_max
int
否
最大年份
2023
region
string
否
地区
北京/上海/广州
sort_by
string
否
排序字段
relevance/price/sales/rating
sort_order
string
否
排序方向
asc/desc(默认desc)
page
int
否
页码
1(默认)
per_page
int
否
每页条数
20(默认)
fields
string
否
返回字段
id,title,price,brand,rating
include_facets
bool
否
是否包含聚合信息
true
四、完整代码实现
4.1 Python完整实现
import requests
import time
import hashlib
import hmac
import json
from typing import Dict, Any, List, Optional
from datetime import datetime, timedelta
from dataclasses import dataclass
from urllib.parse import urlencode
import redis
@dataclass
class SearchResultItem:
"""搜索结果项"""
id: int
type: str # new_car, used_car, model, news, video
title: str
description: str
url: str
image_url: str
price: Optional[float] = None
original_price: Optional[float] = None
brand: Optional[str] = None
series: Optional[str] = None
model: Optional[str] = None
year: Optional[int] = None
fuel_type: Optional[str] = None
transmission: Optional[str] = None
rating: Optional[float] = None
review_count: Optional[int] = None
sales: Optional[int] = None
publish_time: Optional[str] = None
view_count: Optional[int] = None
like_count: Optional[int] = None
@dataclass
class SearchFacet:
"""搜索聚合信息"""
field: str
values: List[Dict[str, Any]] # [{value: "宝马", count: 100}, ...]
@dataclass
class SearchResult:
"""搜索结果"""
success: bool
code: int
message: str
data: Dict[str, Any]
items: List[SearchResultItem]
pagination: Dict[str, Any]
facets: List[SearchFacet]
total_count: int
search_time: float
class DongchediSearchAPI:
"""懂车帝搜索API客户端"""
def __init__(self, app_key: str, app_secret: str, sandbox: bool = True, redis_client=None):
self.app_key = app_key
self.app_secret = app_secret
self.base_url = "https://sandbox-openapi.dongchedi.com" if sandbox else "https://openapi.dongchedi.com"
self.session = requests.Session()
self.session.headers.update({
'User-Agent': 'Dongchedi-Search-API/1.0',
'Accept': 'application/json'
})
self.redis = redis_client
self._access_token = None
self._token_expires = None
def _generate_signature(self, params: Dict[str, Any], timestamp: int) -> str:
"""生成请求签名"""
# 过滤并排序参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
sorted_keys = sorted(filtered_params.keys())
# 拼接参数字符串
sign_str = ''
for key in sorted_keys:
if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':'))
else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 添加app_secret
sign_str += self.app_secret
# 计算HMAC-SHA256签名
signature = hmac.new(
self.app_secret.encode('utf-8'),
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def _get_access_token(self) -> str:
"""获取访问令牌"""
# 检查token是否有效
if self._access_token and self._token_expires and self._token_expires > datetime.now():
return self._access_token
# 获取新token
timestamp = int(time.time())
params = {
'app_key': self.app_key,
'timestamp': timestamp,
'grant_type': 'client_credentials'
}
# 生成签名
signature = self._generate_signature(params, timestamp)
params['sign'] = signature
# 请求token
url = f"{self.base_url}/oauth/token"
response = self.session.post(url, data=params)
if response.status_code == 200:
data = response.json()
self._access_token = data['access_token']
self._token_expires = datetime.now() + timedelta(seconds=data['expires_in'] - 300) # 提前5分钟过期
return self._access_token
else:
raise Exception(f"获取token失败: {response.status_code} - {response.text}")
def search(
self,
query: Optional[str] = None,
search_type: Optional[str] = None,
brand_id: Optional[int] = None,
series_id: Optional[int] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
level: Optional[str] = None,
fuel_type: Optional[str] = None,
transmission: Optional[str] = None,
displacement_min: Optional[float] = None,
displacement_max: Optional[float] = None,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
region: Optional[str] = None,
sort_by: str = "relevance",
sort_order: str = "desc",
page: int = 1,
per_page: int = 20,
fields: Optional[List[str]] = None,
include_facets: bool = False
) -> SearchResult:
"""
执行搜索
Args:
query: 搜索关键词
search_type: 搜索类型
brand_id: 品牌ID
series_id: 车系ID
price_min: 最低价格
price_max: 最高价格
level: 车辆级别
fuel_type: 燃油类型
transmission: 变速箱类型
displacement_min: 最小排量
displacement_max: 最大排量
year_min: 最小年份
year_max: 最大年份
region: 地区
sort_by: 排序字段
sort_order: 排序方向
page: 页码
per_page: 每页条数
fields: 返回字段列表
include_facets: 是否包含聚合信息
Returns:
搜索结果
"""
# 获取访问令牌
access_token = self._get_access_token()
# 构建请求参数
params = {
'app_key': self.app_key,
'timestamp': int(time.time()),
'format': 'json',
'version': '1.0',
'sort_by': sort_by,
'sort_order': sort_order,
'page': page,
'per_page': min(per_page, DongchediConfig.MAX_PAGE_SIZE)
}
# 添加可选参数
if query:
params['q'] = query
if search_type:
params['search_type'] = search_type
if brand_id:
params['brand_id'] = brand_id
if series_id:
params['series_id'] = series_id
if price_min:
params['price_min'] = price_min
if price_max:
params['price_max'] = price_max
if level:
params['level'] = level
if fuel_type:
params['fuel_type'] = fuel_type
if transmission:
params['transmission'] = transmission
if displacement_min:
params['displacement_min'] = displacement_min
if displacement_max:
params['displacement_max'] = displacement_max
if year_min:
params['year_min'] = year_min
if year_max:
params['year_max'] = year_max
if region:
params['region'] = region
if fields:
params['fields'] = ','.join(fields)
if include_facets:
params['include_facets'] = 'true'
# 生成签名
signature = self._generate_signature(params, params['timestamp'])
params['sign'] = signature
# 添加认证头
headers = {
'Authorization': f'Bearer {access_token}',
'Content-Type': 'application/json'
}
# 发送请求
url = f"{self.base_url}/api/v1/search"
try:
start_time = time.time()
response = self.session.get(
url,
params=params,
headers=headers,
timeout=DongchediConfig.REQUEST_TIMEOUT
)
search_time = time.time() - start_time
if response.status_code == 200:
result = response.json()
# 解析结果
items = self._parse_search_items(result.get('data', {}).get('items', []))
pagination = result.get('data', {}).get('pagination', {})
facets = self._parse_facets(result.get('data', {}).get('facets', []))
total_count = pagination.get('total_count', 0)
return SearchResult(
success=result.get('success', False),
code=result.get('code', 0),
message=result.get('message', ''),
data=result.get('data', {}),
items=items,
pagination=pagination,
facets=facets,
total_count=total_count,
search_time=search_time
)
elif response.status_code == 401:
# Token过期,重新获取
self._access_token = None
return self.search(
query=query, search_type=search_type, brand_id=brand_id, series_id=series_id,
price_min=price_min, price_max=price_max, level=level, fuel_type=fuel_type,
transmission=transmission, displacement_min=displacement_min, displacement_max=displacement_max,
year_min=year_min, year_max=year_max, region=region, sort_by=sort_by, sort_order=sort_order,
page=page, per_page=per_page, fields=fields, include_facets=include_facets
)
else:
return SearchResult(
success=False,
code=response.status_code,
message=f"HTTP {response.status_code}",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
except requests.exceptions.Timeout:
return SearchResult(
success=False,
code=408,
message="请求超时",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
except requests.exceptions.RequestException as e:
return SearchResult(
success=False,
code=500,
message=f"网络请求异常: {str(e)}",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
def _parse_search_items(self, items_data: List[Dict[str, Any]]) -> List[SearchResultItem]:
"""解析搜索结果项"""
items = []
for item_data in items_data:
try:
item = SearchResultItem(
id=item_data.get('id'),
type=item_data.get('type', ''),
title=item_data.get('title', ''),
description=item_data.get('description', ''),
url=item_data.get('url', ''),
image_url=item_data.get('image_url', ''),
price=item_data.get('price'),
original_price=item_data.get('original_price'),
brand=item_data.get('brand'),
series=item_data.get('series'),
model=item_data.get('model'),
year=item_data.get('year'),
fuel_type=item_data.get('fuel_type'),
transmission=item_data.get('transmission'),
rating=item_data.get('rating'),
review_count=item_data.get('review_count'),
sales=item_data.get('sales'),
publish_time=item_data.get('publish_time'),
view_count=item_data.get('view_count'),
like_count=item_data.get('like_count')
)
items.append(item)
except Exception as e:
print(f"解析搜索结果项失败: {e}, 数据: {item_data}")
continue
return items
def _parse_facets(self, facets_data: List[Dict[str, Any]]) -> List[SearchFacet]:
"""解析聚合信息"""
facets = []
for facet_data in facets_data:
facet = SearchFacet(
field=facet_data.get('field', ''),
values=facet_data.get('values', [])
)
facets.append(facet)
return facets
def search_all(
self,
max_pages: int = 10,
**search_params
) -> List[SearchResultItem]:
"""
获取所有符合条件的搜索结果(自动处理分页)
Args:
max_pages: 最大页数限制
**search_params: 搜索参数
Returns:
所有搜索结果项
"""
all_items = []
page = 1
while page <= max_pages:
result = self.search(page=page, **search_params)
if not result.success:
print(f"第{page}页查询失败: {result.message}")
break
# 添加当前页数据
all_items.extend(result.items)
pagination = result.pagination
print(f"已获取第{page}页,共{len(result.items)}条,总计{len(all_items)}条")
# 检查是否还有下一页
has_next = pagination.get('has_next', False)
total_pages = pagination.get('total_pages', 0)
if not has_next or page >= total_pages:
break
page += 1
# 避免请求过于频繁
time.sleep(0.5)
return all_items
def search_new_cars(
self,
query: Optional[str] = None,
brand_id: Optional[int] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
level: Optional[str] = None,
**kwargs
) -> SearchResult:
"""
搜索新车
"""
return self.search(
query=query,
search_type='new_car',
brand_id=brand_id,
price_min=price_min,
price_max=price_max,
level=level,
**kwargs
)
def search_used_cars(
self,
query: Optional[str] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
region: Optional[str] = None,
**kwargs
) -> SearchResult:
"""
搜索二手车
"""
return self.search(
query=query,
search_type='used_car',
price_min=price_min,
price_max=price_max,
year_min=year_min,
year_max=year_max,
region=region,
**kwargs
)
def search_models(
self,
query: Optional[str] = None,
brand_id: Optional[int] = None,
series_id: Optional[int] = None,
**kwargs
) -> SearchResult:
"""
搜索车型
"""
return self.search(
query=query,
search_type='model',
brand_id=brand_id,
series_id=series_id,
**kwargs
)
def search_news(
self,
query: Optional[str] = None,
**kwargs
) -> SearchResult:
"""
搜索资讯
"""
return self.search(
query=query,
search_type='news',
**kwargs
)
def search_videos(
self,
query: Optional[str] = None,
**kwargs
) -> SearchResult:
"""
搜索视频
"""
return self.search(
query=query,
search_type='video',
**kwargs
)
使用示例
def demo_search_api():
"""搜索API使用演示"""
# 初始化客户端
client = DongchediSearchAPI(
app_key=DongchediConfig.DONGCHEDI_APP_KEY,
app_secret=DongchediConfig.DONGCHEDI_APP_SECRET,
sandbox=True
)
print("=== 示例1:新车搜索 ===")
result = client.search_new_cars(
query="宝马3系",
price_min=25,
price_max=40,
per_page=5
)
if result.success:
for item in result.items:
print(f"{item.brand} {item.model} - {item.price}万 - 评分{item.rating}")
print("\n=== 示例2:二手车搜索 ===")
result = client.search_used_cars(
query="奥迪A4L",
price_min=15,
price_max=25,
year_min=2018,
year_max=2021,
region="北京"
)
print("\n=== 示例3:车型搜索 ===")
result = client.search_models(
query="SUV",
level="SUV",
price_min=20,
price_max=50,
include_facets=True
)
if result.success:
print(f"找到 {result.total_count} 个车型")
for facet in result.facets:
if facet.field == 'brand':
print("品牌分布:")
for value in facet.values[:5]:
print(f" {value['value']}: {value['count']}个")
if name == "main":
demo_search_api()
4.2 Java实现
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class DongchediSearchClient {
private static final Logger logger = LoggerFactory.getLogger(DongchediSearchClient.class);
private final String appKey;
private final String appSecret;
private final String baseUrl;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
private String accessToken;
private LocalDateTime tokenExpires;
public DongchediSearchClient(String appKey, String appSecret, boolean sandbox) {
this.appKey = appKey;
this.appSecret = appSecret;
this.baseUrl = sandbox ?
"https://sandbox-openapi.dongchedi.com" :
"https://openapi.dongchedi.com";
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build();
this.objectMapper = new ObjectMapper();
this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
public SearchResult search(SearchParams params) throws IOException {
// 获取访问令牌
String token = getAccessToken();
// 构建请求URL
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl + "/api/v1/search").newBuilder();
// 添加公共参数
urlBuilder.addQueryParameter("app_key", appKey);
urlBuilder.addQueryParameter("timestamp", String.valueOf(System.currentTimeMillis() / 1000));
urlBuilder.addQueryParameter("format", "json");
urlBuilder.addQueryParameter("version", "1.0");
// 添加搜索参数
if (params.getQuery() != null) {
urlBuilder.addQueryParameter("q", params.getQuery());
}
if (params.getSearchType() != null) {
urlBuilder.addQueryParameter("search_type", params.getSearchType());
}
if (params.getBrandId() != null) {
urlBuilder.addQueryParameter("brand_id", params.getBrandId().toString());
}
// ... 其他参数
// 生成签名
Map<String, String> signParams = new HashMap<>();
for (String name : urlBuilder.build().queryParameterNames()) {
signParams.put(name, urlBuilder.build().queryParameter(name));
}
String signature = generateSignature(signParams, Long.parseLong(signParams.get("timestamp")));
urlBuilder.addQueryParameter("sign", signature);
// 构建请求
Request request = new Request.Builder()
.url(urlBuilder.build())
.addHeader("Authorization", "Bearer " + token)
.addHeader("Content-Type", "application/json")
.addHeader("User-Agent", "Dongchedi-Java-Client/1.0")
.build();
// 发送请求
try (Response response = httpClient.newCall(request).execute()) {
if (response.isSuccessful()) {
String responseBody = response.body().string();
return objectMapper.readValue(responseBody, SearchResult.class);
} else {
throw new IOException("请求失败: " + response.code());
}
}
}
// 省略其他方法...
}
class SearchParams {
private String query;
private String searchType;
private Integer brandId;
private Integer seriesId;
private Double priceMin;
private Double priceMax;
private String level;
private String fuelType;
private String transmission;
private Double displacementMin;
private Double displacementMax;
private Integer yearMin;
private Integer yearMax;
private String region;
private String sortBy = "relevance";
private String sortOrder = "desc";
private Integer page = 1;
private Integer perPage = 20;
private List fields;
private Boolean includeFacets = false;
// 省略getter/setter方法
}
4.3 PHP实现
<?php
class DongchediSearchService
{
private $appKey;
private $appSecret;
private $baseUrl;
private $accessToken;
private $tokenExpires;
public function __construct($appKey, $appSecret, $sandbox = true)
{
$this->appKey = $appKey;
$this->appSecret = $appSecret;
$this->baseUrl = $sandbox
? 'https://sandbox-openapi.dongchedi.com'
: 'https://openapi.dongchedi.com';
}
public function search($params = [])
{
// 获取访问令牌
$token = $this->getAccessToken();
// 构建请求参数
$requestParams = [
'app_key' => $this->appKey,
'timestamp' => time(),
'format' => 'json',
'version' => '1.0'
];
// 合并搜索参数
$requestParams = array_merge($requestParams, $params);
// 生成签名
$signature = $this->generateSignature($requestParams);
$requestParams['sign'] = $signature;
// 发送请求
$url = $this->baseUrl . '/api/v1/search?' . http_build_query($requestParams);
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 30,
CURLOPT_HTTPHEADER => [
'Authorization: Bearer ' . $token,
'User-Agent: Dongchedi-PHP-Client/1.0'
]
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpCode === 200) {
return json_decode($response, true);
} else {
throw new Exception("请求失败: HTTP {$httpCode}");
}
}
public function searchNewCars($query = null, $brandId = null, $priceMin = null, $priceMax = null, $level = null, $page = 1, $perPage = 20)
{
$params = [
'search_type' => 'new_car',
'page' => $page,
'per_page' => $perPage
];
if ($query) $params['q'] = $query;
if ($brandId) $params['brand_id'] = $brandId;
if ($priceMin) $params['price_min'] = $priceMin;
if ($priceMax) $params['price_max'] = $priceMax;
if ($level) $params['level'] = $level;
return $this->search($params);
}
// 省略其他搜索方法...
private function getAccessToken()
{
// 检查token是否有效
if ($this->accessToken && $this->tokenExpires && $this->tokenExpires > time()) {
return $this->accessToken;
}
// 获取新token
$timestamp = time();
$params = [
'app_key' => $this->appKey,
'timestamp'] = $timestamp,
'grant_type' => 'client_credentials'
];
$signature = $this->generateSignature($params);
$params['sign'] = $signature;
$ch = curl_init();
curl_setopt_array($ch, [
CURLOPT_URL => $this->baseUrl . '/oauth/token',
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => http_build_query($params),
CURLOPT_TIMEOUT => 30
]);
$response = curl_exec($ch);
$httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
curl_close($ch);
if ($httpCode === 200) {
$result = json_decode($response, true);
$this->accessToken = $result['access_token'];
$this->tokenExpires = time() + $result['expires_in'] - 300; // 提前5分钟过期
return $this->accessToken;
} else {
throw new Exception("获取token失败: HTTP {$httpCode}");
}
}
private function generateSignature($params)
{
// 移除sign参数并排序
unset($params['sign']);
ksort($params);
// 拼接参数字符串
$paramStr = '';
foreach ($params as $key => $value) {
$paramStr .= $key . '=' . $value . '&';
}
$paramStr = rtrim($paramStr, '&');
// 构建签名字符串
$signStr = $this->appKey . $paramStr . $params['timestamp'] . $this->appSecret;
// 计算HMAC-SHA256
return hash_hmac('sha256', $signStr, $this->appSecret);
}
}
// 使用示例
try {
$service = new DongchediSearchService('your_app_key', 'your_app_secret');
$result = $service->searchNewCars('宝马3系', null, 25, 40);
if ($result['success']) {
foreach ($result['data']['items'] as $item) {
echo "{$item['brand']} {$item['model']} - {$item['price']}万\n";
}
}
} catch (Exception $e) {
echo "错误: " . $e->getMessage() . "\n";
}
?>
五、返回结果解析
5.1 成功响应示例
{
"success": true,
"code": 200,
"message": "成功",
"data": {
"items": [
{
"id": 12345,
"type": "new_car",
"title": "2023款 宝马3系 325Li M运动套装",
"description": "2023款宝马3系325Li M运动套装,搭载2.0T发动机,最大功率184马力...",
"url": "https://www.dongchedi.com/vehicle/12345",
"image_url": "https://img.dongchedi.com/vehicle/12345.jpg",
"price": 34.99,
"original_price": 34.99,
"brand": "宝马",
"series": "3系",
"model": "325Li M运动套装",
"year": 2023,
"fuel_type": "汽油",
"transmission": "自动",
"rating": 4.5,
"review_count": 1250,
"sales": 500,
"publish_time": "2023-01-15 10:00:00"
},
{
"id": 12346,
"type": "new_car",
"title": "2023款 宝马3系 320Li 时尚型",
"description": "2023款宝马3系320Li时尚型,搭载2.0T发动机,最大功率156马力...",
"url": "https://www.dongchedi.com/vehicle/12346",
"image_url": "https://img.dongchedi.com/vehicle/12346.jpg",
"price": 29.99,
"original_price": 29.99,
"brand": "宝马",
"series": "3系",
"model": "320Li 时尚型",
"year": 2023,
"fuel_type": "汽油",
"transmission": "自动",
"rating": 4.3,
"review_count": 850,
"sales": 300,
"publish_time": "2023-01-15 10:00:00"
}
],
"pagination": {
"page": 1,
"per_page": 20,
"total_count": 125,
"total_pages": 7,
"has_next": true,
"has_previous": false
},
"facets": [
{
"field": "brand",
"values": [
{"value": "宝马", "count": 75},
{"value": "奥迪", "count": 30},
{"value": "奔驰", "count": 20}
]
},
{
"field": "price_range",
"values": [
{"value": "20-30万", "count": 40},
{"value": "30-40万", "count": 60},
{"value": "40-50万", "count": 25}
]
}
]
}
}
5.2 错误响应示例
{
"success": false,
"code": 400,
"message": "参数错误:price_min不能大于price_max",
"data": null
}
5.3 状态码说明
状态码
说明
处理建议
200
成功
-
400
参数错误
检查请求参数格式
401
认证失败
检查API密钥和签名
403
权限不足
检查API权限范围
404
数据不存在
检查搜索条件
429
请求频率超限
降低请求频率
500
服务器错误
稍后重试
六、高级功能实现
6.1 智能搜索建议
class IntelligentSearchService:
"""智能搜索服务"""
def __init__(self, api_client):
self.client = api_client
self.search_history = []
def smart_search(self, query: str, search_type: str = "auto") -> SearchResult:
"""
智能搜索:自动识别搜索类型和参数
Args:
query: 搜索查询字符串
search_type: 搜索类型(auto自动识别)
Returns:
搜索结果
"""
# 自动识别搜索类型
if search_type == "auto":
detected_type = self._detect_search_type(query)
else:
detected_type = search_type
# 解析查询参数
search_params = self._parse_query_params(query, detected_type)
# 执行搜索
result = self.client.search(search_type=detected_type, **search_params)
# 记录搜索历史
self._record_search_history(query, detected_type, result)
return result
def _detect_search_type(self, query: str) -> str:
"""自动识别搜索类型"""
import re
# 检查是否为价格范围
if re.match(r'^\d+-\d+万$', query):
return 'new_car'
# 检查是否为车型关键词
car_keywords = ['SUV', 'MPV', '轿车', '跑车', '皮卡']
for keyword in car_keywords:
if keyword in query:
return 'model'
# 检查是否为二手车相关
used_car_keywords = ['二手车', '二手', '过户', '里程']
for keyword in used_car_keywords:
if keyword in query:
return 'used_car'
# 检查是否为资讯关键词
news_keywords = ['新闻', '资讯', '报道', '评测']
for keyword in news_keywords:
if keyword in query:
return 'news'
# 检查是否为视频关键词
video_keywords = ['视频', '试驾', '测评', 'vlog']
for keyword in video_keywords:
if keyword in query:
return 'video'
# 默认为新车搜索
return 'new_car'
def _parse_query_params(self, query: str, search_type: str) -> Dict[str, Any]:
"""解析查询参数"""
params = {'query': query}
import re
if search_type == 'new_car':
# 解析价格范围
price_match = re.search(r'(\d+)-(\d+)万', query)
if price_match:
params['price_min'] = float(price_match.group(1))
params['price_max'] = float(price_match.group(2))
# 解析级别
level_mapping = {
'A级': 'A',
'B级': 'B',
'C级': 'C',
'SUV': 'SUV',
'MPV': 'MPV'
}
for keyword, level in level_mapping.items():
if keyword in query:
params['level'] = level
break
elif search_type == 'used_car':
# 解析年份范围
year_match = re.search(r'(\d+)-(\d+)年', query)
if year_match:
params['year_min'] = int(year_match.group(1))
params['year_max'] = int(year_match.group(2))
return params
6.2 数据缓存优化
import redis
from functools import lru_cache
class CachedDongchediSearchAPI(DongchediSearchAPI):
"""带缓存的懂车帝搜索API"""
def __init__(self, app_key, app_secret, redis_client, sandbox=True):
super().__init__(app_key, app_secret, sandbox)
self.redis = redis_client
self.cache_prefix = "dongchedi:search:"
def search_cached(self, cache_key: str, **params) -> SearchResult:
"""
带缓存的搜索
"""
# 检查缓存
if self.redis:
cached = self.redis.get(cache_key)
if cached:
data = json.loads(cached)
return SearchResult(**data)
# 调用API
result = super().search(**params)
# 缓存结果
if self.redis and result.success:
# 根据搜索类型设置缓存时间
ttl = self._calculate_ttl(params.get('search_type'))
self.redis.setex(
cache_key,
ttl,
json.dumps(result.__dict__)
)
return result
def _calculate_ttl(self, search_type: str) -> int:
"""根据搜索类型计算缓存时间"""
ttl_config = {
'new_car': 3600, # 1小时
'used_car': 1800, # 30分钟
'model': 7200, # 2小时
'news': 900, # 15分钟
'video': 1800 # 30分钟
}
return ttl_config.get(search_type, 1800) # 默认30分钟
def get_cache_key(self, **params) -> str:
"""生成缓存键"""
# 移除分页参数
cache_params = params.copy()
cache_params.pop('page', None)
cache_params.pop('per_page', None)
# 生成唯一键
param_str = json.dumps(cache_params, sort_keys=True)
return f"{self.cache_prefix}{hashlib.md5(param_str.encode()).hexdigest()}"
6.3 批量处理优化
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchSearchProcessor:
"""批量搜索处理器"""
def __init__(self, api_client):
self.client = api_client
def batch_search_by_queries(
self,
search_queries: List[Dict[str, Any]],
max_workers: int = 3
) -> Dict[str, SearchResult]:
"""
批量执行多个搜索查询
Args:
search_queries: 搜索查询列表
max_workers: 最大并发数
Returns:
查询标识到结果的映射
"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
# 提交所有搜索任务
future_to_query = {
executor.submit(self.client.search, **query): query.get('query', 'unknown')
for query in search_queries
}
# 收集结果
for future in as_completed(future_to_query):
query_id = future_to_query[future]
try:
result = future.result(timeout=30)
results[query_id] = result
except Exception as e:
print(f"查询 {query_id} 失败: {e}")
results[query_id] = SearchResult(
success=False,
code=500,
message=str(e),
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
return results
def analyze_search_trends(
self,
search_results: Dict[str, SearchResult]
) -> Dict[str, Any]:
"""
分析搜索趋势
"""
analysis = {
'total_queries': len(search_results),
'success_rate': 0,
'avg_search_time': 0,
'total_items': 0,
'top_brands': [],
'price_distribution': {}
}
successful_searches = 0
total_search_time = 0
brand_counts = {}
price_ranges = {}
for query_id, result in search_results.items():
if result.success:
successful_searches += 1
total_search_time += result.search_time
analysis['total_items'] += result.total_count
# 统计品牌分布
for item in result.items:
if item.brand:
brand_counts[item.brand] = brand_counts.get(item.brand, 0) + 1
# 统计价格分布
for item in result.items:
if item.price:
price_range = self._get_price_range(item.price)
price_ranges[price_range] = price_ranges.get(price_range, 0) + 1
# 计算指标
analysis['success_rate'] = successful_searches / len(search_results) * 100
analysis['avg_search_time'] = total_search_time / successful_searches if successful_searches > 0 else 0
# 获取热门品牌
analysis['top_brands'] = sorted(brand_counts.items(), key=lambda x: x[1], reverse=True)[:5]
analysis['price_distribution'] = price_ranges
return analysis
def _get_price_range(self, price: float) -> str:
"""获取价格区间"""
if price < 10:
return "10万以下"
elif price < 20:
return "10-20万"
elif price < 30:
return "20-30万"
elif price < 50:
return "30-50万"
elif price < 100:
return "50-100万"
else:
return "100万以上"
七、实战应用场景
7.1 汽车比价平台
class CarPriceComparison:
"""汽车比价平台"""
def __init__(self, api_client):
self.client = api_client
self.price_history = {}
def compare_prices_by_model(
self,
model: str,
regions: List[str],
year_range: Tuple[int, int] = (2020, 2023)
) -> Dict[str, Any]:
"""
按车型比较地区价格差异
"""
search_results = {}
for region in regions:
result = self.client.search_new_cars(
query=model,
region=region,
year_min=year_range[0],
year_max=year_range[1],
sort_by="price",
sort_order="asc"
)
if result.success:
search_results[region] = result
else:
print(f"地区 {region} 搜索失败: {result.message}")
# 分析价格差异
price_analysis = self._analyze_regional_prices(search_results)
return {
'model': model,
'year_range': year_range,
'regions': regions,
'search_results': search_results,
'price_analysis': price_analysis,
'recommendation': self._generate_recommendation(price_analysis)
}
def _analyze_regional_prices(self, search_results: Dict[str, SearchResult]) -> Dict[str, Any]:
"""分析地区价格差异"""
analysis = {
'regional_prices': {},
'avg_prices': {},
'price_differences': {}
}
all_prices = []
for region, result in search_results.items():
if result.success and result.items:
prices = [item.price for item in result.items if item.price]
if prices:
avg_price = sum(prices) / len(prices)
analysis['regional_prices'][region] = prices
analysis['avg_prices'][region] = avg_price
all_prices.extend(prices)
# 计算价格差异
if len(analysis['avg_prices']) > 1:
min_price_region = min(analysis['avg_prices'], key=analysis['avg_prices'].get)
max_price_region = max(analysis['avg_prices'], key=analysis['avg_prices'].get)
min_price = analysis['avg_prices'][min_price_region]
max_price = analysis['avg_prices'][max_price_region]
analysis['price_differences'] = {
'min_price_region': min_price_region,
'min_price': min_price,
'max_price_region': max_price_region,
'max_price': max_price,
'max_difference': max_price - min_price,
'difference_rate': (max_price - min_price) / min_price * 100
}
return analysis
7.2 汽车推荐系统
class CarRecommender:
"""汽车推荐系统"""
def __init__(self, api_client):
self.client = api_client
self.user_preferences = {}
def recommend_cars_by_preferences(
self,
user_prefs: Dict[str, Any],
max_results: int = 10
) -> List[SearchResultItem]:
"""
根据用户偏好推荐汽车
Args:
user_prefs: 用户偏好
max_results: 最大推荐数量
Returns:
推荐汽车列表
"""
# 构建搜索参数
search_params = {
'search_type': 'new_car',
'sort_by': 'rating',
'sort_order': 'desc',
'per_page': 50
}
# 应用用户偏好
if 'budget' in user_prefs:
budget = user_prefs['budget']
search_params['price_max'] = budget
if 'preferred_brands' in user_prefs:
# 对多个品牌进行搜索
pass
if 'vehicle_type' in user_prefs:
search_params['level'] = user_prefs['vehicle_type']
if 'fuel_type' in user_prefs:
search_params['fuel_type'] = user_prefs['fuel_type']
# 执行搜索
result = self.client.search(**search_params)
if not result.success:
return []
# 应用推荐算法
scored_cars = []
for item in result.items:
score = self._calculate_recommendation_score(item, user_prefs)
scored_cars.append((item, score))
# 按得分排序
scored_cars.sort(key=lambda x: x[1], reverse=True)
return [car for car, score in scored_cars[:max_results]]
def _calculate_recommendation_score(
self,
car: SearchResultItem,
user_prefs: Dict[str, Any]
) -> float:
"""计算推荐得分"""
score = 0
# 价格得分
if 'budget' in user_prefs and car.price:
budget = user_prefs['budget']
price_diff = abs(car.price - budget)
price_score = max(0, 100 - price_diff * 2) # 每差1万减2分
score += price_score * 0.3
# 品牌偏好得分
if 'preferred_brands' in user_prefs and car.brand:
preferred_brands = user_prefs['preferred_brands']
if car.brand in preferred_brands:
score += 50
# 评分得分
if car.rating:
score += car.rating * 10 # 4.5分 = 45分
# 销量得分
if car.sales:
sales_score = min(car.sales / 100, 20) # 每100台销量加1分,最多20分
score += sales_score
return score
八、故障排查与优化
8.1 常见问题解决
问题1:签名验证失败
def debug_signature_generation(params, app_secret, timestamp):
"""调试签名生成过程"""
print("=== 签名调试信息 ===")
# 排序参数
filtered_params = {k: v for k, v in params.items() if v is not None and k != 'sign'}
sorted_keys = sorted(filtered_params.keys())
sign_str = ''
for key in sorted_keys:
if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':'))
else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
print(f" {key}: {value}")
sign_str += app_secret
print(f"签名字符串: {sign_str}")
# 计算签名
import hmac
signature = hmac.new(
app_secret.encode('utf-8'),
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
print(f"计算签名: {signature}")
return signature
问题2:分页数据不准确
def stable_pagination_search(self, **params):
"""稳定的分页搜索"""
# 确保有稳定的排序字段
if 'sort_by' not in params:
params['sort_by'] = 'id' # 使用唯一字段排序
if 'sort_order' not in params:
params['sort_order'] = 'desc'
# 使用游标分页
all_items = []
last_id = None
while True:
if last_id:
# 使用游标进行分页
params['cursor'] = last_id
result = self.search(**params)
if not result.success or not result.items:
break
all_items.extend(result.items)
last_id = result.items[-1].id
# 检查是否还有下一页
pagination = result.pagination
if not pagination.get('has_next', False):
break
# 避免频繁请求
time.sleep(0.5)
return all_items
8.2 性能优化建议
合理使用缓存
多级缓存策略
class MultiLevelCache:
def init(self, redis_client):
self.memory_cache = {}
self.redis = redis_client
self.memory_ttl = 300 # 5分钟
self.redis_ttl = 1800 # 30分钟
def get_search_data(self, cache_key):
# 1. 检查内存缓存
if cache_key in self.memory_cache:
data, expire_time = self.memory_cache[cache_key]
if time.time() < expire_time:
return data
# 2. 检查Redis缓存
if self.redis:
cached = self.redis.get(cache_key)
if cached:
data = json.loads(cached)
# 更新内存缓存
self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl)
return data
return None
连接池管理
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_optimized_session():
"""创建优化的会话"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=100,
pool_block=False
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
九、最佳实践总结
9.1 安全实践
密钥管理:使用环境变量存储API密钥
HTTPS强制:确保所有请求使用HTTPS
输入验证:验证所有输入参数
错误处理:不暴露敏感错误信息
9.2 性能实践
缓存策略:根据搜索类型设置合适的缓存时间
批量操作:合并多个请求减少API调用次数
分页优化:使用游标分页提高稳定性
异步处理:批量操作使用异步方式提高吞吐量
9.3 业务实践
搜索建议:提供智能搜索建议功能
聚合分析:利用facet数据进行统计分析
个性化推荐:基于用户行为优化推荐
趋势分析:记录和分析搜索趋势
附录:快速开始模板
quick_start.py
from dongchedi_search import DongchediSearchAPI
1. 初始化客户端
client = DongchediSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True
)
2. 简单搜索
result = client.search(query="宝马3系")
if result.success:
for item in result.items:
print(f"{item.brand} {item.model} - {item.price}万")
3. 新车搜索
result = client.search_new_cars(
query="SUV",
price_min=20,
price_max=50,
level="SUV"
)
4. 获取所有数据
all_items = client.search_all(
query="奥迪A4L",
search_type="new_car",
max_pages=5
)
print(f"共找到 {len(all_items)} 条记录")
通过本攻略,您应该能够:
理解懂车帝item_search接口的完整功能
实现安全的API认证和请求
处理各种搜索条件和分页逻辑
构建高性能的汽车搜索应用
在实际业务中灵活应用该接口
建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。