一、接口概览
1.1 接口简介
item_search接口是锦程物流开放平台提供的物流订单搜索接口,支持根据多种条件查询物流订单列表,实现订单的模糊搜索、状态筛选、时间范围查询等复杂检索功能。
1.2 核心功能
✅ 关键词搜索:订单号、运单号、收发货人姓名/电话模糊匹配
✅ 多条件筛选:按状态、时间、物流产品等维度筛选
✅ 分页查询:支持大数据量的分页加载
✅ 排序功能:支持按创建时间、更新时间等字段排序
✅ 字段选择:可指定返回字段,优化网络传输
二、准备工作
2.1 环境准备
Python环境
pip install requests python-dotenv
Java环境(Maven)
com.squareup.okhttp3
okhttp
4.10.0
PHP环境
composer require guzzlehttp/guzzle
2.2 配置管理
config.py
import os
from dotenv import load_dotenv
load_dotenv()
class Config:
# API配置
JC_APP_KEY = os.getenv('JC_APP_KEY', '')
JC_APP_SECRET = os.getenv('JC_APP_SECRET', '')
JC_API_ENDPOINT = os.getenv('JC_API_ENDPOINT',
'https://sandbox-api.jc56.com' # 默认沙箱环境
)
# 业务配置
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
REQUEST_TIMEOUT = 30
三、接口详解
3.1 接口地址
POST /v1/item/search
3.2 请求参数详解
公共参数
参数名
类型
必填
说明
示例
app_key
string
是
应用标识
jc_app_2024
timestamp
string
是
请求时间戳
2026-02-01 10:00:00
sign
string
是
请求签名
详见签名算法
sign_method
string
是
签名方法
md5
format
string
否
返回格式
json(默认)
version
string
是
API版本
1.0
业务参数
参数名
类型
必填
说明
示例
keyword
string
否
搜索关键词
JC20260201
order_no
string
否
精确订单号
JC202602010001
waybill_no
string
否
运单号
SF123456789
shipper_name
string
否
发货人姓名
张三
shipper_phone
string
否
发货人电话
13800138000
consignee_name
string
否
收货人姓名
李四
consignee_phone
string
否
收货人电话
13900139000
status
string
否
订单状态
CREATED, PICKED_UP, IN_TRANSIT, DELIVERED
start_time
string
否
开始时间
2026-01-01 00:00:00
end_time
string
否
结束时间
2026-02-01 23:59:59
product_code
string
否
产品编码
JC_EXPRESS
page_no
int
否
页码
1(默认)
page_size
int
否
每页条数
20(默认)
sort_field
string
否
排序字段
create_time, update_time
sort_order
string
否
排序方向
desc(默认), asc
fields
string
否
返回字段
order_no,status,create_time
3.3 签名算法
import hashlib
from typing import Dict, Any
def generate_sign(params: Dict[str, Any], app_secret: str) -> str:
"""
生成API请求签名
Args:
params: 请求参数字典
app_secret: 应用密钥
Returns:
32位大写MD5签名
"""
# 1. 过滤空值和sign参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
# 2. 按键名ASCII升序排序
sorted_keys = sorted(filtered_params.keys())
# 3. 拼接键值对
sign_str = ''
for key in sorted_keys:
# 处理数组和对象类型参数
if isinstance(filtered_params[key], (list, dict)):
value = str(filtered_params[key])
else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 4. 拼接app_secret
sign_str += app_secret
# 5. 计算MD5并转为大写
md5_hash = hashlib.md5()
md5_hash.update(sign_str.encode('utf-8'))
return md5_hash.hexdigest().upper()
测试用例
test_params = {
"app_key": "test_app",
"timestamp": "2026-02-01 10:00:00",
"keyword": "测试",
"page_no": 1,
"page_size": 20
}
app_secret = "test_secret"
signature = generate_sign(test_params, app_secret)
print(f"生成的签名: {signature}")
四、完整代码实现
4.1 Python完整实现
import requests
import json
import hashlib
import time
from typing import Dict, Any, List, Optional
from datetime import datetime
from urllib.parse import urljoin
class JCLogisticsSearchAPI:
"""锦程物流搜索API客户端"""
def __init__(self, app_key: str, app_secret: str, sandbox: bool = True):
"""
初始化API客户端
Args:
app_key: 应用密钥
app_secret: 应用密钥
sandbox: 是否使用沙箱环境
"""
self.app_key = app_key
self.app_secret = app_secret
self.base_url = "https://sandbox-api.jc56.com" if sandbox else "https://api.jc56.com"
self.session = requests.Session()
self.session.headers.update({
"User-Agent": "JCLogistics-Search-Client/1.0",
"Accept": "application/json",
"Content-Type": "application/json; charset=utf-8"
})
def _generate_timestamp(self) -> str:
"""生成标准格式时间戳"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _generate_sign(self, params: Dict[str, Any]) -> str:
"""生成请求签名"""
# 过滤空值和sign参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
# 排序并拼接
sorted_keys = sorted(filtered_params.keys())
sign_str = ''
for key in sorted_keys:
if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':'))
else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 添加密钥并计算MD5
sign_str += self.app_secret
return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def search_orders(
self,
keyword: Optional[str] = None,
order_no: Optional[str] = None,
status_list: Optional[List[str]] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
page_no: int = 1,
page_size: int = 20,
sort_field: str = "create_time",
sort_order: str = "desc",
fields: Optional[List[str]] = None,
**extra_params
) -> Dict[str, Any]:
"""
搜索物流订单
Args:
keyword: 搜索关键词(支持模糊匹配)
order_no: 精确订单号
status_list: 状态筛选列表
start_time: 开始时间(格式:YYYY-MM-DD HH:MM:SS)
end_time: 结束时间(格式:YYYY-MM-DD HH:MM:SS)
page_no: 页码(从1开始)
page_size: 每页数量(1-100)
sort_field: 排序字段
sort_order: 排序方向(desc降序/asc升序)
fields: 指定返回字段列表
**extra_params: 其他查询参数
Returns:
API响应数据
"""
# 构建请求参数
params = {
"app_key": self.app_key,
"timestamp": self._generate_timestamp(),
"sign_method": "md5",
"format": "json",
"version": "1.0",
"page_no": max(1, page_no),
"page_size": min(max(1, page_size), 100),
"sort_field": sort_field,
"sort_order": sort_order,
}
# 添加可选参数
if keyword:
params["keyword"] = keyword
if order_no:
params["order_no"] = order_no
if status_list:
params["status"] = ",".join(status_list)
if start_time:
params["start_time"] = start_time
if end_time:
params["end_time"] = end_time
if fields:
params["fields"] = ",".join(fields)
# 添加额外参数
params.update(extra_params)
# 生成签名
params["sign"] = self._generate_sign(params)
# 发送请求
url = urljoin(self.base_url, "/v1/item/search")
try:
response = self.session.post(
url,
json=params,
timeout=30
)
response.raise_for_status()
result = response.json()
# 验证返回签名(如果返回中有sign字段)
if "sign" in result:
# 注意:这里需要根据实际API返回的签名验证规则实现
pass
return result
except requests.exceptions.Timeout:
return {
"success": False,
"code": "TIMEOUT",
"message": "请求超时",
"data": None
}
except requests.exceptions.RequestException as e:
return {
"success": False,
"code": "REQUEST_ERROR",
"message": f"请求失败: {str(e)}",
"data": None
}
except json.JSONDecodeError:
return {
"success": False,
"code": "INVALID_RESPONSE",
"message": "响应解析失败",
"data": None
}
def search_all_orders(
self,
**search_params
) -> List[Dict[str, Any]]:
"""
获取所有符合条件的订单(自动处理分页)
Args:
**search_params: 搜索参数
Returns:
订单列表
"""
all_orders = []
page_no = 1
while True:
# 获取当前页数据
result = self.search_orders(
page_no=page_no,
page_size=100, # 使用最大页数减少请求次数
**search_params
)
if not result.get("success"):
print(f"第{page_no}页查询失败: {result.get('message')}")
break
data = result.get("data", {})
orders = data.get("list", [])
pagination = data.get("pagination", {})
# 添加当前页数据
all_orders.extend(orders)
# 检查是否还有下一页
current_page = pagination.get("page_no", page_no)
total_pages = pagination.get("total_pages", 0)
has_next = pagination.get("has_next", False)
print(f"已获取第{current_page}页,共{len(orders)}条,总计{len(all_orders)}条")
if not has_next or current_page >= total_pages:
break
page_no += 1
# 避免请求过于频繁
time.sleep(0.5)
return all_orders
使用示例
def demo_search():
"""搜索接口使用演示"""
# 初始化客户端
client = JCLogisticsSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True
)
print("=== 示例1:关键词搜索 ===")
result1 = client.search_orders(
keyword="北京",
page_no=1,
page_size=10
)
print(json.dumps(result1, ensure_ascii=False, indent=2))
print("\n=== 示例2:状态筛选 ===")
result2 = client.search_orders(
status_list=["DELIVERED", "IN_TRANSIT"],
start_time="2026-01-01 00:00:00",
end_time="2026-02-01 23:59:59",
sort_field="create_time",
sort_order="desc"
)
print("\n=== 示例3:获取所有待发货订单 ===")
all_pending_orders = client.search_all_orders(
status_list=["CREATED"],
start_time="2026-01-01 00:00:00"
)
print(f"共找到 {len(all_pending_orders)} 个待发货订单")
if name == "main":
demo_search()
4.2 Java实现
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.TimeUnit;
public class JCLogisticsSearchClient {
private static final Logger logger = LoggerFactory.getLogger(JCLogisticsSearchClient.class);
private final String appKey;
private final String appSecret;
private final String baseUrl;
private final OkHttpClient httpClient;
private final ObjectMapper objectMapper;
public JCLogisticsSearchClient(String appKey, String appSecret, boolean sandbox) {
this.appKey = appKey;
this.appSecret = appSecret;
this.baseUrl = sandbox ? "https://sandbox-api.jc56.com" : "https://api.jc56.com";
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.addInterceptor(new LoggingInterceptor())
.build();
this.objectMapper = new ObjectMapper();
this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
// 搜索参数构建器
public static class SearchParams {
private String keyword;
private String orderNo;
private List<String> statusList;
private String startTime;
private String endTime;
private Integer pageNo = 1;
private Integer pageSize = 20;
private String sortField = "create_time";
private String sortOrder = "desc";
private List<String> fields;
// 省略getter/setter和builder方法
}
public SearchResult searchOrders(SearchParams params) throws IOException {
// 构建请求参数
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("app_key", appKey);
requestParams.put("timestamp", LocalDateTime.now()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
requestParams.put("sign_method", "md5");
requestParams.put("format", "json");
requestParams.put("version", "1.0");
// 添加业务参数
if (params.getKeyword() != null) {
requestParams.put("keyword", params.getKeyword());
}
if (params.getStatusList() != null && !params.getStatusList().isEmpty()) {
requestParams.put("status", String.join(",", params.getStatusList()));
}
// ... 其他参数
// 生成签名
String sign = generateSign(requestParams);
requestParams.put("sign", sign);
// 构建请求
String jsonBody = objectMapper.writeValueAsString(requestParams);
Request request = new Request.Builder()
.url(baseUrl + "/v1/item/search")
.post(RequestBody.create(jsonBody, MediaType.parse("application/json")))
.addHeader("User-Agent", "JCLogistics-Java-Client/1.0")
.build();
// 发送请求
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
String responseBody = response.body().string();
return objectMapper.readValue(responseBody, SearchResult.class);
}
}
// 省略其他辅助方法...
static class SearchResult {
private Boolean success;
private String code;
private String message;
private SearchData data;
// 省略getter/setter
}
static class SearchData {
private List<OrderInfo> list;
private Pagination pagination;
// 省略getter/setter
}
static class Pagination {
private Integer pageNo;
private Integer pageSize;
private Integer totalCount;
private Integer totalPages;
private Boolean hasNext;
// 省略getter/setter
}
}
4.3 PHP实现(Laravel示例)
<?php
namespace App\Services\Logistics;
use GuzzleHttp\Client;
use GuzzleHttp\Exception\RequestException;
use Illuminate\Support\Facades\Cache;
use Illuminate\Support\Facades\Log;
class JCLogisticsSearchService
{
private $appKey;
private $appSecret;
private $baseUrl;
private $client;
private $timeout = 30;
public function __construct($sandbox = true)
{
$this->appKey = config('services.jc_logistics.app_key');
$this->appSecret = config('services.jc_logistics.app_secret');
$this->baseUrl = $sandbox
? 'https://sandbox-api.jc56.com'
: 'https://api.jc56.com';
$this->client = new Client([
'base_uri' => $this->baseUrl,
'timeout' => $this->timeout,
'headers' => [
'User-Agent' => 'JCLogistics-Laravel-Client/1.0',
'Accept' => 'application/json',
]
]);
}
/**
* 搜索订单
*/
public function search(array $params = []): array
{
try {
// 基础参数
$requestParams = [
'app_key' => $this->appKey,
'timestamp' => date('Y-m-d H:i:s'),
'sign_method' => 'md5',
'format' => 'json',
'version' => '1.0',
];
// 合并业务参数
$requestParams = array_merge($requestParams, $params);
// 生成签名
$requestParams['sign'] = $this->generateSign($requestParams);
// 发送请求
$response = $this->client->post('/v1/item/search', [
'json' => $requestParams,
'http_errors' => false
]);
$statusCode = $response->getStatusCode();
$body = $response->getBody()->getContents();
$result = json_decode($body, true);
if ($statusCode !== 200) {
Log::error('锦程物流搜索接口请求失败', [
'status' => $statusCode,
'response' => $result
]);
return [
'success' => false,
'code' => 'HTTP_ERROR',
'message' => '接口请求失败',
'data' => null
];
}
return $result;
} catch (RequestException $e) {
Log::error('锦程物流搜索接口异常', [
'error' => $e->getMessage()
]);
return [
'success' => false,
'code' => 'REQUEST_EXCEPTION',
'message' => $e->getMessage(),
'data' => null
];
}
}
/**
* 带缓存搜索
*/
public function searchWithCache(array $params, int $ttl = 300): array
{
$cacheKey = 'jc_search:' . md5(serialize($params));
return Cache::remember($cacheKey, $ttl, function () use ($params) {
return $this->search($params);
});
}
/**
* 生成签名
*/
private function generateSign(array $params): string
{
// 移除sign参数
unset($params['sign']);
// 过滤空值并按键名排序
$params = array_filter($params, function ($value) {
return $value !== null && $value !== '';
});
ksort($params);
// 拼接字符串
$signStr = '';
foreach ($params as $key => $value) {
if (is_array($value) || is_object($value)) {
$value = json_encode($value, JSON_UNESCAPED_UNICODE);
}
$signStr .= $key . $value;
}
// 添加密钥并计算MD5
$signStr .= $this->appSecret;
return strtoupper(md5($signStr));
}
/**
* 批量搜索所有订单
*/
public function searchAll(array $params = []): array
{
$allOrders = [];
$pageNo = 1;
do {
$params['page_no'] = $pageNo;
$params['page_size'] = 100;
$result = $this->search($params);
if (!$result['success']) {
break;
}
$orders = $result['data']['list'] ?? [];
$pagination = $result['data']['pagination'] ?? [];
$allOrders = array_merge($allOrders, $orders);
$hasNext = $pagination['has_next'] ?? false;
$totalPages = $pagination['total_pages'] ?? 0;
if (!$hasNext || $pageNo >= $totalPages) {
break;
}
$pageNo++;
usleep(500000); // 延迟0.5秒
} while (true);
return $allOrders;
}
}
五、返回结果处理
5.1 成功响应格式
{
"success": true,
"code": "10000",
"message": "成功",
"data": {
"list": [
{
"order_no": "JC202602010001",
"waybill_no": "SF123456789",
"status": "DELIVERED",
"status_desc": "已签收",
"shipper_name": "张三",
"shipper_phone": "13800138000",
"consignee_name": "李四",
"consignee_phone": "13900139000",
"create_time": "2026-02-01 09:00:00",
"update_time": "2026-02-01 16:30:00",
"product_name": "锦程快递",
"weight": 2.5,
"volume": 0.02,
"amount": 25.50
},
{
"order_no": "JC202602010002",
"waybill_no": "SF123456790",
"status": "IN_TRANSIT",
"status_desc": "运输中",
"shipper_name": "王五",
"shipper_phone": "13800138001",
"consignee_name": "赵六",
"consignee_phone": "13900139001",
"create_time": "2026-02-01 10:00:00",
"update_time": "2026-02-01 14:20:00",
"product_name": "锦程快运",
"weight": 15.8,
"volume": 0.15,
"amount": 120.00
}
],
"pagination": {
"page_no": 1,
"page_size": 20,
"total_count": 125,
"total_pages": 7,
"has_next": true,
"has_previous": false
},
"summary": {
"total_amount": 2845.50,
"total_weight": 158.3,
"order_count": 125,
"status_distribution": {
"CREATED": 12,
"PICKED_UP": 28,
"IN_TRANSIT": 45,
"DELIVERED": 40
}
}
}
}
5.2 响应字段说明
订单信息字段
字段名
类型
说明
order_no
string
订单号(唯一标识)
waybill_no
string
运单号
status
string
状态编码
status_desc
string
状态描述
shipper_*
string
发货人信息
consignee_*
string
收货人信息
create_time
string
创建时间
update_time
string
更新时间
product_name
string
产品名称
weight
float
重量(kg)
volume
float
体积(m³)
amount
float
金额(元)
分页信息字段
字段名
类型
说明
page_no
int
当前页码
page_size
int
每页数量
total_count
int
总记录数
total_pages
int
总页数
has_next
bool
是否有下一页
has_previous
bool
是否有上一页
5.3 错误响应处理
class JCLogisticsError(Exception):
"""锦程物流API异常基类"""
pass
class AuthenticationError(JCLogisticsError):
"""认证错误"""
pass
class ValidationError(JCLogisticsError):
"""参数验证错误"""
pass
class RateLimitError(JCLogisticsError):
"""频率限制错误"""
pass
class APIError(JCLogisticsError):
"""API错误"""
def init(self, code, message, response=None):
self.code = code
self.message = message
self.response = response
super().init(f"[{code}] {message}")
def handle_api_response(response: Dict[str, Any]) -> Dict[str, Any]:
"""
统一处理API响应
Args:
response: API响应数据
Returns:
处理后的数据
Raises:
各种业务异常
"""
if not response.get("success"):
code = response.get("code")
message = response.get("message", "未知错误")
# 根据错误码抛出特定异常
if code in ["20001", "20002", "20003"]:
raise AuthenticationError(message)
elif code in ["20004", "20005"]:
raise ValidationError(message)
elif code == "20006":
raise RateLimitError(message)
else:
raise APIError(code, message, response)
return response.get("data", {})
使用示例
try:
result = client.search_orders(keyword="测试")
data = handle_api_response(result)
orders = data.get("list", [])
except AuthenticationError as e:
print(f"认证失败: {e}")
# 重新获取token或提醒用户
except RateLimitError as e:
print(f"请求频率超限: {e}")
# 等待后重试
time.sleep(60)
except APIError as e:
print(f"API错误: {e}")
# 记录日志并通知管理员
logger.error(f"锦程物流API错误: {e.code} - {e.message}")
except Exception as e:
print(f"未知错误: {e}")
六、高级功能实现
6.1 智能搜索建议
class IntelligentSearchService:
"""智能搜索服务"""
def __init__(self, api_client):
self.client = api_client
self.search_history = [] # 搜索历史缓存
def smart_search(self, query: str) -> Dict[str, Any]:
"""
智能搜索:自动识别搜索类型
Args:
query: 搜索查询字符串
Returns:
搜索结果
"""
# 识别查询类型
search_type = self._detect_search_type(query)
# 构建搜索参数
params = self._build_search_params(query, search_type)
# 执行搜索
result = self.client.search_orders(**params)
# 记录搜索历史
self._record_search_history(query, search_type, result)
return result
def _detect_search_type(self, query: str) -> str:
"""
自动识别搜索类型
"""
# 检查是否为订单号(特定格式)
if re.match(r'^JC\d{12}$', query):
return 'order_no'
# 检查是否为运单号
if re.match(r'^[A-Z]{2}\d{9}$', query):
return 'waybill_no'
# 检查是否为手机号
if re.match(r'^1[3-9]\d{9}$', query):
return 'phone'
# 检查是否为日期
if re.match(r'^\d{4}-\d{2}-\d{2}$', query):
return 'date'
# 默认为关键词搜索
return 'keyword'
def _build_search_params(self, query: str, search_type: str) -> Dict[str, Any]:
"""
根据搜索类型构建参数
"""
params = {}
if search_type == 'order_no':
params['order_no'] = query
elif search_type == 'waybill_no':
params['waybill_no'] = query
elif search_type == 'phone':
# 尝试同时搜索发货人和收货人电话
params['shipper_phone'] = query
params['consignee_phone'] = query
elif search_type == 'date':
params['start_time'] = f"{query} 00:00:00"
params['end_time'] = f"{query} 23:59:59"
else:
params['keyword'] = query
return params
6.2 实时搜索提示
// 前端实现示例(Vue.js)
找到 { { searchResults.pagination.total_count }} 条记录
6.3 搜索性能优化
import redis
from functools import lru_cache
from datetime import datetime, timedelta
class OptimizedSearchService:
"""优化搜索服务"""
def __init__(self, api_client, redis_client):
self.api_client = api_client
self.redis = redis_client
self.cache_prefix = "jc_search:"
@lru_cache(maxsize=1000)
def search_with_cache(self, cache_key: str, **params):
"""
使用内存缓存和Redis缓存的搜索
"""
# 先尝试从Redis获取
cached = self.redis.get(cache_key)
if cached:
return json.loads(cached)
# 调用API
result = self.api_client.search_orders(**params)
# 缓存到Redis(根据数据量设置不同过期时间)
if result.get("success"):
data = result.get("data", {})
total_count = data.get("pagination", {}).get("total_count", 0)
# 根据数据量设置缓存时间
if total_count == 0:
ttl = 300 # 5分钟
elif total_count <= 100:
ttl = 1800 # 30分钟
else:
ttl = 3600 # 1小时
self.redis.setex(
cache_key,
ttl,
json.dumps(result)
)
return result
def smart_search_optimized(self, **params) -> Dict[str, Any]:
"""
智能优化搜索
"""
# 生成缓存键
cache_key = self._generate_cache_key(params)
# 检查是否可以使用缓存
if self._can_use_cache(params):
return self.search_with_cache(cache_key, **params)
# 实时搜索
return self.api_client.search_orders(**params)
def _generate_cache_key(self, params: Dict[str, Any]) -> str:
"""生成缓存键"""
# 移除分页参数,因为它们不影响数据本身
cache_params = params.copy()
cache_params.pop('page_no', None)
cache_params.pop('page_size', None)
# 生成唯一键
param_str = json.dumps(cache_params, sort_keys=True)
return f"{self.cache_prefix}{hashlib.md5(param_str.encode()).hexdigest()}"
def _can_use_cache(self, params: Dict[str, Any]) -> bool:
"""判断是否可以使用缓存"""
# 实时性要求高的查询不使用缓存
if params.get('real_time', False):
return False
# 特定状态不使用缓存
real_time_statuses = ['PICKED_UP', 'IN_TRANSIT', 'DELIVERING']
if params.get('status') in real_time_statuses:
return False
return True
def batch_search(self, search_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
"""
批量搜索优化
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
results = []
with ThreadPoolExecutor(max_workers=5) as executor:
# 提交所有搜索任务
future_to_query = {
executor.submit(self.smart_search_optimized, **query): query
for query in search_queries
}
# 收集结果
for future in as_completed(future_to_query):
try:
result = future.result(timeout=10)
results.append(result)
except Exception as e:
query = future_to_query[future]
print(f"查询失败 {query}: {e}")
results.append({
"success": False,
"error": str(e)
})
return results
七、实战应用场景
7.1 物流管理系统集成
class LogisticsManagementSystem:
"""物流管理系统集成示例"""
def __init__(self, jc_client):
self.jc_client = jc_client
self.order_cache = {}
def search_orders_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
根据综合条件搜索订单
Args:
criteria: 搜索条件字典,包含:
- date_range: 日期范围
- status: 状态
- customer_info: 客户信息
- product_type: 产品类型
- amount_range: 金额范围
Returns:
订单列表
"""
# 构建API搜索参数
search_params = {}
# 处理日期范围
if 'date_range' in criteria:
start_date, end_date = criteria['date_range']
search_params['start_time'] = f"{start_date} 00:00:00"
search_params['end_time'] = f"{end_date} 23:59:59"
# 处理状态
if 'status' in criteria:
search_params['status'] = criteria['status']
# 处理客户信息
if 'customer_info' in criteria:
customer = criteria['customer_info']
# 尝试多种搜索方式
if 'phone' in customer:
search_params['shipper_phone'] = customer['phone']
search_params['consignee_phone'] = customer['phone']
elif 'name' in customer:
search_params['keyword'] = customer['name']
# 执行搜索
result = self.jc_client.search_orders(**search_params)
if not result.get('success'):
raise Exception(f"搜索失败: {result.get('message')}")
orders = result.get('data', {}).get('list', [])
# 应用额外的过滤条件
filtered_orders = self._apply_filters(orders, criteria)
return filtered_orders
def get_dashboard_stats(self, start_date: str, end_date: str) -> Dict[str, Any]:
"""
获取仪表板统计信息
"""
# 搜索所有订单
all_orders = self.jc_client.search_all_orders(
start_time=f"{start_date} 00:00:00",
end_time=f"{end_date} 23:59:59"
)
# 计算统计信息
stats = {
'total_orders': len(all_orders),
'total_amount': sum(order.get('amount', 0) for order in all_orders),
'total_weight': sum(order.get('weight', 0) for order in all_orders),
'status_distribution': {},
'daily_trend': self._calculate_daily_trend(all_orders)
}
# 统计状态分布
for order in all_orders:
status = order.get('status', 'UNKNOWN')
stats['status_distribution'][status] = \
stats['status_distribution'].get(status, 0) + 1
return stats
def export_orders_to_excel(self, search_params: Dict[str, Any],
filename: str) -> str:
"""
导出订单到Excel
"""
import pandas as pd
# 获取所有订单
all_orders = self.jc_client.search_all_orders(**search_params)
# 转换为DataFrame
df = pd.DataFrame(all_orders)
# 数据清洗和转换
if not df.empty:
# 选择需要的列
columns = ['order_no', 'waybill_no', 'status_desc',
'shipper_name', 'shipper_phone',
'consignee_name', 'consignee_phone',
'create_time', 'amount', 'weight']
df = df[columns]
# 重命名列
column_mapping = {
'order_no': '订单号',
'waybill_no': '运单号',
'status_desc': '状态',
'shipper_name': '发货人',
'shipper_phone': '发货人电话',
'consignee_name': '收货人',
'consignee_phone': '收货人电话',
'create_time': '创建时间',
'amount': '金额',
'weight': '重量'
}
df = df.rename(columns=column_mapping)
# 导出到Excel
df.to_excel(filename, index=False)
return filename
7.2 订单状态监控系统
class OrderMonitor:
"""订单状态监控系统"""
def __init__(self, jc_client, alert_service):
self.jc_client = jc_client
self.alert_service = alert_service
self.monitored_statuses = {
'DELAYED': 24 * 3600, # 24小时未更新
'EXCEPTION': 0, # 异常状态立即通知
}
def monitor_abnormal_orders(self):
"""
监控异常订单
"""
# 搜索特定状态的订单
abnormal_orders = []
for status in ['EXCEPTION', 'PROBLEM']:
result = self.jc_client.search_orders(
status=status,
page_size=100
)
if result.get('success'):
orders = result.get('data', {}).get('list', [])
abnormal_orders.extend(orders)
# 处理异常订单
for order in abnormal_orders:
self._handle_abnormal_order(order)
def monitor_delayed_orders(self):
"""
监控延迟订单
"""
# 搜索24小时未更新的运输中订单
import datetime
cutoff_time = (datetime.datetime.now() -
datetime.timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
result = self.jc_client.search_orders(
status='IN_TRANSIT',
end_time=cutoff_time,
sort_field='update_time',
sort_order='asc'
)
if result.get('success'):
delayed_orders = result.get('data', {}).get('list', [])
for order in delayed_orders:
self._handle_delayed_order(order)
def _handle_abnormal_order(self, order: Dict[str, Any]):
"""处理异常订单"""
order_no = order.get('order_no')
status_desc = order.get('status_desc', '未知状态')
# 发送告警
alert_message = f"""
订单异常告警!
订单号:{order_no}
状态:{status_desc}
发货人:{order.get('shipper_name')}
收货人:{order.get('consignee_name')}
更新时间:{order.get('update_time')}
"""
self.alert_service.send_alert(
title="物流订单异常",
message=alert_message,
level="HIGH",
recipients=["logistics_manager@company.com"]
)
# 记录到数据库
self._log_abnormal_order(order)
def _handle_delayed_order(self, order: Dict[str, Any]):
"""处理延迟订单"""
order_no = order.get('order_no')
last_update = order.get('update_time')
# 发送通知
notification = f"""
订单运输延迟提醒
订单号:{order_no}
最后更新:{last_update}
已超过24小时未更新状态
请及时跟进处理
"""
self.alert_service.send_alert(
title="订单运输延迟",
message=notification,
level="MEDIUM",
recipients=["customer_service@company.com"]
)
八、故障排查与优化
8.1 常见问题解决
问题1:签名验证失败
症状:返回"签名错误"或"签名验证失败"
解决方案:调试签名生成过程
def debug_signature(params, app_secret):
"""调试签名生成"""
print("=== 签名调试信息 ===")
print(f"原始参数: {params}")
# 1. 过滤并排序参数
filtered = {k: v for k, v in params.items() if v is not None and k != 'sign'}
sorted_keys = sorted(filtered.keys())
print(f"排序后的键: {sorted_keys}")
# 2. 拼接字符串
sign_str = ''
for key in sorted_keys:
value = str(filtered[key])
sign_str += f"{key}{value}"
print(f" {key}: {value}")
sign_str += app_secret
print(f"拼接结果: {sign_str}")
# 3. 计算签名
import hashlib
signature = hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
print(f"计算签名: {signature}")
return signature
问题2:分页数据不准确
症状:分页数据重复或缺失
解决方案:使用稳定的排序字段
def stable_pagination_search(self, **params):
"""
稳定的分页搜索
"""
# 确保有排序字段
if 'sort_field' not in params:
params['sort_field'] = 'order_no' # 使用唯一字段排序
if 'sort_order' not in params:
params['sort_order'] = 'asc'
# 处理游标分页
last_order_no = None
all_orders = []
while True:
if last_order_no:
# 使用游标进行分页
params['cursor'] = last_order_no
result = self.search_orders(**params)
if not result.get('success'):
break
orders = result.get('data', {}).get('list', [])
if not orders:
break
all_orders.extend(orders)
last_order_no = orders[-1].get('order_no')
# 检查是否还有更多数据
pagination = result.get('data', {}).get('pagination', {})
if not pagination.get('has_next', False):
break
# 避免频繁请求
time.sleep(0.1)
return all_orders
问题3:API限流处理
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
class RateLimitedClient:
"""带限流处理的客户端"""
def __init__(self, api_client):
self.api_client = api_client
self.rate_limiter = RateLimiter(max_calls=100, period=60) # 60秒100次
def is_rate_limit_error(self, result):
"""判断是否为限流错误"""
return result.get('code') == '20005' # 频率限制错误码
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception(lambda x: isinstance(x, dict) and
x.get('code') == '20005')
)
def search_with_retry(self, **params):
"""带重试的搜索"""
with self.rate_limiter:
result = self.api_client.search_orders(**params)
if self.is_rate_limit_error(result):
raise result # 触发重试
return result
8.2 性能优化建议
合理使用缓存
多级缓存策略
class MultiLevelCache:
def init(self):
self.memory_cache = {} # 内存缓存
self.redis_cache = redis.Redis() # Redis缓存
self.local_ttl = 60 # 内存缓存60秒
self.redis_ttl = 300 # Redis缓存5分钟
def get(self, key):
# 1. 检查内存缓存
if key in self.memory_cache:
item = self.memory_cache[key]
if time.time() < item['expire']:
return item['data']
# 2. 检查Redis缓存
cached = self.redis_cache.get(key)
if cached:
data = json.loads(cached)
# 更新内存缓存
self.memory_cache[key] = {
'data': data,
'expire': time.time() + self.local_ttl
}
return data
return None
连接池管理
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_pool():
"""创建带连接池的会话"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # 连接池大小
pool_maxsize=100,
pool_block=False
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session
批量请求优化
from concurrent.futures import ThreadPoolExecutor, as_completed
import asyncio
async def batch_search_async(search_queries):
"""异步批量搜索"""
async with aiohttp.ClientSession() as session:
tasks = []
for query in search_queries:
task = self._async_search(session, query)
tasks.append(task)
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
九、监控与告警
9.1 监控指标
class APIMonitor:
"""API监控"""
def __init__(self):
self.metrics = {
'total_requests': 0,
'successful_requests': 0,
'failed_requests': 0,
'average_response_time': 0,
'error_codes': {},
'last_error_time': None
}
def record_request(self, success, response_time, error_code=None):
"""记录请求指标"""
self.metrics['total_requests'] += 1
if success:
self.metrics['successful_requests'] += 1
else:
self.metrics['failed_requests'] += 1
if error_code:
self.metrics['error_codes'][error_code] = \
self.metrics['error_codes'].get(error_code, 0) + 1
self.metrics['last_error_time'] = time.time()
# 更新平均响应时间(移动平均)
alpha = 0.1 # 平滑因子
old_avg = self.metrics['average_response_time']
self.metrics['average_response_time'] = \
alpha * response_time + (1 - alpha) * old_avg
def get_health_status(self):
"""获取健康状态"""
total = self.metrics['total_requests']
if total == 0:
return "UNKNOWN"
success_rate = self.metrics['successful_requests'] / total
if success_rate > 0.95:
return "HEALTHY"
elif success_rate > 0.8:
return "DEGRADED"
else:
return "UNHEALTHY"
9.2 告警配置
alert_rules.yaml
alert_rules:
name: "api_error_rate_high"
condition: "error_rate > 0.1"
duration: "5m"
severity: "warning"
message: "API错误率超过10%"name: "api_response_time_slow"
condition: "response_time_p95 > 5000"
duration: "10m"
severity: "warning"
message: "API响应时间P95超过5秒"name: "api_unavailable"
condition: "success_rate == 0"
duration: "2m"
severity: "critical"
message: "API完全不可用"
十、最佳实践总结
10.1 安全实践
密钥管理:使用环境变量或密钥管理服务,不要硬编码在代码中
访问控制:为不同环境使用不同的API密钥
请求验证:验证所有输入参数,防止注入攻击
HTTPS强制:确保所有请求都使用HTTPS
10.2 性能实践
合理分页:根据实际需求设置合适的page_size
缓存策略:对不常变的数据使用缓存
连接复用:使用连接池减少连接建立开销
异步处理:批量操作使用异步方式提高吞吐量
10.3 代码质量
错误处理:完善的异常处理和重试机制
日志记录:详细记录请求和响应信息
单元测试:编写测试用例覆盖主要功能
代码审查:定期进行代码审查和安全检查
10.4 运维实践
监控告警:实时监控API调用情况
容量规划:根据业务量预估API调用频率
版本管理:记录API版本变更,制定升级计划
文档维护:保持接口文档的及时更新
附录:快速开始模板quick_start.py
from jc_logistics import JCLogisticsSearchAPI
1. 初始化客户端
client = JCLogisticsSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True # 测试环境
)
2. 简单搜索
result = client.search_orders(keyword="测试订单")
print(f"找到 {len(result['data']['list'])} 条记录")
3. 条件搜索
result = client.search_orders(
status_list=["DELIVERED"],
start_time="2026-01-01 00:00:00",
end_time="2026-02-01 23:59:59",
page_size=50
)
4. 获取所有数据
all_orders = client.search_all_orders(
keyword="重要客户",
status_list=["IN_TRANSIT", "DELIVERED"]
)
print(f"总共找到 {len(all_orders)} 条订单")
通过本攻略,您应该能够全面掌握锦程物流item_search接口的对接和使用。建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。