基于API的印度股市数据对接指南
一、引言:为什么要关注印度股市API
随着全球金融市场的互联互通,印度作为世界第五大经济体,其资本市场为开发者提供了丰富的数据机会。NSE(国家证券交易所)和BSE(孟买证券交易所)拥有超过5000家上市公司,日交易额超过700亿美元。对于金融科技开发者而言,能够高效获取这些数据是构建投资分析工具、量化策略和行情应用的基础。
本文将以技术实践的角度,详细介绍如何通过一套通用的API接口对接印度股票市场数据,涵盖实时行情、历史K线、基本面数据等核心功能。
二、环境准备与基础配置
2.1 API访问凭证
任何第三方数据接口都需要身份验证。以下示例展示了如何配置基础请求参数:
# config.py - 配置文件
API_CONFIG = {
'base_url': 'https://api.stocktv.top',
'ws_url': 'wss://ws-api.stocktv.top/connect',
'api_key': 'your_actual_key_here', # 替换为实际密钥
'headers': {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept': 'application/json'
},
'timeout': 10
}
# 印度市场标识码
INDIA_MARKET = {
'country_id': 14,
'exchanges': {
'nse': 46, # 国家证券交易所
'bse': 74 # 孟买证券交易所
}
}
2.2 通用请求封装
创建一个可重用的请求处理器,处理认证和错误:
# api_client.py
import requests
import json
import time
from typing import Optional, Dict, Any
class IndiaStockAPIClient:
def __init__(self, api_key: str):
self.api_key = api_key
self.base_url = API_CONFIG['base_url']
self.session = requests.Session()
self.session.headers.update(API_CONFIG['headers'])
def _make_request(self, endpoint: str, params: Dict[str, Any]) -> Optional[Dict]:
"""通用请求方法,包含重试机制"""
params['key'] = self.api_key
max_retries = 3
for attempt in range(max_retries):
try:
response = self.session.get(
f"{self.base_url}{endpoint}",
params=params,
timeout=API_CONFIG['timeout']
)
if response.status_code == 200:
data = response.json()
if data.get('code') == 200:
return data.get('data')
else:
print(f"API Error: {data.get('message')}")
return None
elif response.status_code == 429:
# 频率限制,等待后重试
wait_time = (attempt + 1) * 5
print(f"Rate limited. Waiting {wait_time}s...")
time.sleep(wait_time)
continue
else:
print(f"HTTP Error {response.status_code}")
return None
except requests.exceptions.RequestException as e:
print(f"Request failed (attempt {attempt+1}): {e}")
if attempt < max_retries - 1:
time.sleep(2 ** attempt) # 指数退避
return None
三、核心数据接口实现
3.1 市场数据获取
获取印度交易所的股票列表是构建任何应用的第一步:
def get_market_list(self, exchange: str = 'nse', page: int = 1,
page_size: int = 100) -> Optional[Dict]:
"""获取交易所股票列表"""
endpoint = "/stock/stocks"
params = {
'countryId': INDIA_MARKET['country_id'],
'exchangeId': INDIA_MARKET['exchanges'][exchange],
'page': page,
'pageSize': page_size
}
data = self._make_request(endpoint, params)
if data:
return {
'total': data.get('total', 0),
'current_page': data.get('current', 1),
'stocks': data.get('records', [])
}
return None
# 使用示例
client = IndiaStockAPIClient(API_CONFIG['api_key'])
market_data = client.get_market_list('nse', page_size=50)
if market_data:
print(f"NSE交易所共有 {market_data['total']} 只股票")
for stock in market_data['stocks'][:5]:
print(f"{stock['symbol']}: {stock['name']} - ₹{stock['last']}")
3.2 实时行情查询
对于交易类应用,实时行情数据至关重要:
def get_real_time_quotes(self, symbols: list) -> Dict[str, Any]:
"""获取多只股票实时行情"""
# 首先获取股票ID(需要先查询symbol到id的映射)
stock_ids = self._resolve_symbols_to_ids(symbols)
if not stock_ids:
return {
}
endpoint = "/stock/queryStocks"
params = {
'id': ','.join(map(str, stock_ids))}
data = self._make_request(endpoint, params)
return self._process_quote_data(data)
def _process_quote_data(self, raw_data: list) -> Dict:
"""处理行情数据"""
processed = {
}
for item in raw_data:
symbol = item.get('symbol')
processed[symbol] = {
'price': item.get('last'),
'change': item.get('chg'),
'change_pct': item.get('chgPct'),
'volume': item.get('volume'),
'high': item.get('high'),
'low': item.get('low'),
'timestamp': item.get('timestamp'),
'market_cap': self._format_market_cap(item.get('fundamentalMarketCap'))
}
return processed
def _format_market_cap(self, value):
"""格式化市值显示"""
if value >= 1e12:
return f"₹{value/1e12:.2f}T"
elif value >= 1e9:
return f"₹{value/1e9:.2f}B"
elif value >= 1e6:
return f"₹{value/1e6:.2f}M"
return f"₹{value}"
3.3 历史K线数据获取
技术分析需要完整的历史价格数据:
def get_historical_data(self, symbol: str, interval: str = 'P1D',
limit: int = 100) -> pd.DataFrame:
"""
获取历史K线数据
Args:
symbol: 股票代码
interval: 时间间隔 (PT5M, PT1H, P1D, P1W, P1M)
limit: 数据点数量
Returns:
pandas DataFrame with OHLCV数据
"""
# 获取股票ID
stock_id = self._get_stock_id(symbol)
if not stock_id:
return pd.DataFrame()
endpoint = "/stock/kline"
params = {
'pid': stock_id,
'interval': interval
}
data = self._make_request(endpoint, params)
return self._parse_kline_data(data, limit)
def _parse_kline_data(self, raw_data: list, limit: int) -> pd.DataFrame:
"""解析K线数据为DataFrame"""
if not raw_data:
return pd.DataFrame()
df = pd.DataFrame(raw_data[:limit])
# 重命名列
column_mapping = {
'time': 'timestamp',
'open': 'open',
'high': 'high',
'low': 'low',
'close': 'close',
'volume': 'volume'
}
df = df.rename(columns=column_mapping)
df['timestamp'] = pd.to_datetime(df['timestamp'], unit='ms')
df.set_index('timestamp', inplace=True)
# 计算技术指标
df['sma_20'] = df['close'].rolling(window=20).mean()
df['sma_50'] = df['close'].rolling(window=50).mean()
return df
四、WebSocket实时数据流
4.1 WebSocket客户端实现
对于高频交易或实时监控应用,WebSocket是更高效的选择:
import asyncio
import websockets
import json
from datetime import datetime
class RealTimeDataStream:
def __init__(self, api_key: str):
self.api_key = api_key
self.ws_url = f"{API_CONFIG['ws_url']}?key={api_key}"
self.connected = False
self.subscriptions = set()
async def connect(self):
"""建立WebSocket连接"""
try:
self.websocket = await websockets.connect(self.ws_url)
self.connected = True
print(f"[{datetime.now()}] WebSocket连接已建立")
# 启动心跳任务
asyncio.create_task(self._keep_alive())
except Exception as e:
print(f"连接失败: {e}")
self.connected = False
async def subscribe(self, symbols: list):
"""订阅股票行情"""
if not self.connected:
await self.connect()
subscription_msg = {
'action': 'subscribe',
'symbols': symbols,
'data_types': ['quote', 'trade'] # 订阅行情和交易数据
}
await self.websocket.send(json.dumps(subscription_msg))
self.subscriptions.update(symbols)
async def start_streaming(self, callback):
"""启动数据流"""
while self.connected:
try:
message = await self.websocket.recv()
data = json.loads(message)
# 处理不同类型的数据
if data.get('type') == 'stock':
await callback(self._process_stock_update(data))
elif data.get('type') == 'heartbeat':
pass # 心跳包,不做处理
except websockets.exceptions.ConnectionClosed:
print("连接已关闭,尝试重连...")
await self.connect()
except Exception as e:
print(f"数据处理错误: {e}")
async def _keep_alive(self):
"""维持连接心跳"""
while self.connected:
await asyncio.sleep(30)
try:
await self.websocket.send(json.dumps({
'action': 'ping'}))
except:
self.connected = False
break
def _process_stock_update(self, data: Dict) -> Dict:
"""处理股票更新数据"""
return {
'symbol': data.get('symbol'),
'price': data.get('last'),
'volume': data.get('volume'),
'bid': data.get('bid', {
}),
'ask': data.get('ask', {
}),
'timestamp': datetime.fromtimestamp(data.get('ts', 0)/1000),
'exchange': data.get('exchangeId')
}
4.2 实时数据处理器示例
class RealTimeProcessor:
def __init__(self):
self.price_alerts = {
}
self.volume_spikes = {
}
async def handle_data(self, update: Dict):
"""处理实时数据更新"""
symbol = update['symbol']
price = update['price']
volume = update['volume']
# 价格预警检查
await self._check_price_alerts(symbol, price)
# 成交量异常检测
await self._detect_volume_spike(symbol, volume)
# 更新本地缓存
self._update_cache(symbol, update)
# 记录日志
self._log_update(update)
async def _check_price_alerts(self, symbol: str, price: float):
"""检查价格触发预警"""
if symbol in self.price_alerts:
alert_price = self.price_alerts[symbol]
if price >= alert_price['high']:
print(f"⚠️ {symbol} 触及高价预警: {price}")
elif price <= alert_price['low']:
print(f"⚠️ {symbol} 触及低价预警: {price}")
def set_alert(self, symbol: str, high: float = None, low: float = None):
"""设置价格预警"""
self.price_alerts[symbol] = {
'high': high, 'low': low}
五、完整应用示例:印度股票监控仪表板
5.1 数据层实现
# data_manager.py
import pandas as pd
import sqlite3
from contextlib import contextmanager
from datetime import datetime, timedelta
class StockDataManager:
def __init__(self, db_path: str = 'india_stocks.db'):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""初始化数据库表"""
with self._get_connection() as conn:
conn.execute('''
CREATE TABLE IF NOT EXISTS stock_prices (
id INTEGER PRIMARY KEY AUTOINCREMENT,
symbol TEXT NOT NULL,
price REAL NOT NULL,
volume INTEGER,
timestamp DATETIME NOT NULL,
UNIQUE(symbol, timestamp)
)
''')
conn.execute('''
CREATE INDEX IF NOT EXISTS idx_symbol_time
ON stock_prices(symbol, timestamp DESC)
''')
@contextmanager
def _get_connection(self):
"""数据库连接上下文管理器"""
conn = sqlite3.connect(self.db_path)
try:
yield conn
conn.commit()
finally:
conn.close()
def save_price_data(self, symbol: str, price_data: Dict):
"""保存价格数据到数据库"""
with self._get_connection() as conn:
conn.execute('''
INSERT OR REPLACE INTO stock_prices
(symbol, price, volume, timestamp)
VALUES (?, ?, ?, ?)
''', (
symbol,
price_data['price'],
price_data.get('volume', 0),
price_data['timestamp']
))
def get_historical_prices(self, symbol: str, days: int = 30) -> pd.DataFrame:
"""获取历史价格数据"""
query = '''
SELECT symbol, price, volume, timestamp
FROM stock_prices
WHERE symbol = ?
AND timestamp >= ?
ORDER BY timestamp ASC
'''
cutoff_date = datetime.now() - timedelta(days=days)
with self._get_connection() as conn:
df = pd.read_sql_query(
query,
conn,
params=(symbol, cutoff_date)
)
if not df.empty:
df['timestamp'] = pd.to_datetime(df['timestamp'])
df.set_index('timestamp', inplace=True)
return df
5.2 分析层实现
# analytics.py
import numpy as np
from typing import Tuple
class StockAnalyzer:
@staticmethod
def calculate_returns(prices: pd.Series) -> pd.Series:
"""计算收益率序列"""
returns = prices.pct_change().dropna()
return returns
@staticmethod
def calculate_volatility(returns: pd.Series, window: int = 20) -> pd.Series:
"""计算波动率"""
volatility = returns.rolling(window=window).std() * np.sqrt(252)
return volatility
@staticmethod
def calculate_support_resistance(prices: pd.Series,
window: int = 20) -> Tuple[float, float]:
"""计算支撑位和阻力位"""
support = prices.rolling(window=window).min().iloc[-1]
resistance = prices.rolling(window=window).max().iloc[-1]
return support, resistance
@staticmethod
def generate_signals(prices: pd.Series,
short_window: int = 10,
long_window: int = 30) -> pd.DataFrame:
"""生成交易信号"""
signals = pd.DataFrame(index=prices.index)
signals['price'] = prices
# 计算移动平均
signals['short_ma'] = prices.rolling(window=short_window).mean()
signals['long_ma'] = prices.rolling(window=long_window).mean()
# 生成信号
signals['signal'] = 0
signals.loc[signals['short_ma'] > signals['long_ma'], 'signal'] = 1
signals.loc[signals['short_ma'] < signals['long_ma'], 'signal'] = -1
# 计算持仓变化
signals['positions'] = signals['signal'].diff()
return signals
5.3 主应用程序
# main.py
import asyncio
from datetime import datetime
import matplotlib.pyplot as plt
class IndiaStockMonitorApp:
def __init__(self, api_key: str):
self.api_client = IndiaStockAPIClient(api_key)
self.data_manager = StockDataManager()
self.analyzer = StockAnalyzer()
self.monitored_symbols = ['RELIANCE', 'TCS', 'INFY', 'HDFCBANK']
async def run_monitoring(self):
"""运行监控任务"""
print("印度股票监控系统启动...")
print(f"监控标的: {', '.join(self.monitored_symbols)}")
while True:
try:
# 获取实时数据
quotes = self.api_client.get_real_time_quotes(
self.monitored_symbols
)
if quotes:
# 保存数据
for symbol, data in quotes.items():
self.data_manager.save_price_data(symbol, data)
# 显示实时行情
self._display_quotes(quotes)
# 执行分析
await self._perform_analysis()
# 间隔60秒
await asyncio.sleep(60)
except KeyboardInterrupt:
print("\n监控已停止")
break
except Exception as e:
print(f"监控错误: {e}")
await asyncio.sleep(30)
def _display_quotes(self, quotes: Dict):
"""显示实时行情"""
print(f"\n{'='*50}")
print(f"印度股市行情 - {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"{'='*50}")
for symbol, data in quotes.items():
trend = "↑" if data['change_pct'] > 0 else "↓"
color = "\033[92m" if data['change_pct'] > 0 else "\033[91m"
reset = "\033[0m"
print(f"{symbol:15} {color}₹{data['price']:8.2f} {trend} "
f"{data['change_pct']:5.2f}%{reset} "
f"成交量: {data['volume']:,}")
async def _perform_analysis(self):
"""执行技术分析"""
print("\n技术分析:")
for symbol in self.monitored_symbols[:2]: # 分析前两只股票
historical_data = self.data_manager.get_historical_prices(
symbol, days=30
)
if not historical_data.empty:
prices = historical_data['price']
# 计算技术指标
returns = self.analyzer.calculate_returns(prices)
volatility = self.analyzer.calculate_volatility(returns)
support, resistance = self.analyzer.calculate_support_resistance(prices)
current_price = prices.iloc[-1]
print(f"\n{symbol}:")
print(f" 当前价格: ₹{current_price:.2f}")
print(f" 支撑位: ₹{support:.2f}")
print(f" 阻力位: ₹{resistance:.2f}")
print(f" 波动率: {volatility.iloc[-1]:.2%}" if len(volatility) > 0 else " 波动率: 计算中")
def generate_report(self, symbol: str, days: int = 30):
"""生成分析报告"""
data = self.data_manager.get_historical_prices(symbol, days)
if data.empty:
print(f"没有找到 {symbol} 的历史数据")
return
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
# 价格走势
axes[0, 0].plot(data.index, data['price'])
axes[0, 0].set_title(f'{symbol} 价格走势')
axes[0, 0].set_ylabel('价格 (₹)')
axes[0, 0].grid(True, alpha=0.3)
# 成交量
axes[0, 1].bar(data.index, data['volume'], alpha=0.7)
axes[0, 1].set_title('成交量')
axes[0, 1].set_ylabel('成交量')
# 收益率分布
returns = self.analyzer.calculate_returns(data['price'])
axes[1, 0].hist(returns.dropna(), bins=50, alpha=0.7)
axes[1, 0].set_title('收益率分布')
axes[1, 0].set_xlabel('收益率')
# 移动平均线
signals = self.analyzer.generate_signals(data['price'])
axes[1, 1].plot(signals.index, signals['price'], label='价格')
axes[1, 1].plot(signals.index, signals['short_ma'], label='10日均线')
axes[1, 1].plot(signals.index, signals['long_ma'], label='30日均线')
axes[1, 1].set_title('移动平均线')
axes[1, 1].legend()
plt.tight_layout()
plt.savefig(f'{symbol}_analysis_report.png', dpi=150, bbox_inches='tight')
print(f"分析报告已保存为 {symbol}_analysis_report.png")
# 运行应用
async def main():
# 从环境变量获取API密钥
import os
api_key = os.getenv('STOCK_API_KEY', 'your_api_key_here')
app = IndiaStockMonitorApp(api_key)
# 启动监控
monitor_task = asyncio.create_task(app.run_monitoring())
# 运行2分钟后停止并生成报告
await asyncio.sleep(120)
monitor_task.cancel()
# 生成分析报告
app.generate_report('RELIANCE')
print("程序执行完成")
if __name__ == "__main__":
asyncio.run(main())
六、开发注意事项与最佳实践
6.1 时区处理
印度标准时间(IST)比UTC快5小时30分钟,处理时间数据时需要特别注意:
from datetime import datetime
import pytz
def convert_to_ist(utc_time: datetime) -> datetime:
"""UTC时间转换为IST"""
ist = pytz.timezone('Asia/Kolkata')
return utc_time.astimezone(ist)
def get_market_hours():
"""获取印度股市交易时间"""
ist = pytz.timezone('Asia/Kolkata')
now = datetime.now(ist)
# 交易日判断(周一至周五)
if now.weekday() >= 5: # 5=周六, 6=周日
return False, "非交易日"
# 交易时间判断(9:15-15:30 IST)
market_open = now.replace(hour=9, minute=15, second=0, microsecond=0)
market_close = now.replace(hour=15, minute=30, second=0, microsecond=0)
is_trading = market_open <= now <= market_close
next_event = market_close if is_trading else market_open
return is_trading, next_event
6.2 错误处理与重试机制
from tenacity import retry, stop_after_attempt, wait_exponential
class RobustAPIClient(IndiaStockAPIClient):
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=4, max=10)
)
def get_market_data_with_retry(self, **kwargs):
"""带重试机制的数据获取"""
return self.get_market_list(**kwargs)
def handle_api_errors(self, error):
"""统一的错误处理"""
error_handlers = {
400: "请求参数错误",
401: "认证失败,请检查API密钥",
403: "访问被拒绝",
404: "请求的资源不存在",
429: "请求频率超限,请稍后重试",
500: "服务器内部错误",
503: "服务暂时不可用"
}
error_msg = error_handlers.get(error.status_code, "未知错误")
print(f"API错误 {error.status_code}: {error_msg}")
# 记录错误日志
self.log_error({
'timestamp': datetime.now(),
'endpoint': error.url,
'status_code': error.status_code,
'message': error_msg
})
6.3 性能优化建议
import hashlib
from functools import lru_cache
class OptimizedStockClient(IndiaStockAPIClient):
def __init__(self, api_key: str, cache_ttl: int = 300):
super().__init__(api_key)
self.cache_ttl = cache_ttl
self._cache = {
}
def _get_cache_key(self, endpoint: str, params: Dict) -> str:
"""生成缓存键"""
param_str = json.dumps(params, sort_keys=True)
return hashlib.md5(f"{endpoint}{param_str}".encode()).hexdigest()
@lru_cache(maxsize=128)
def get_static_data(self, symbol: str) -> Dict:
"""获取静态数据(使用内存缓存)"""
# 静态数据变化频率低,适合缓存
endpoint = "/stock/stockInfo"
params = {
'symbol': symbol, 'key': self.api_key}
cache_key = self._get_cache_key(endpoint, params)
if cache_key in self._cache:
cache_entry = self._cache[cache_key]
if datetime.now() - cache_entry['timestamp'] < timedelta(seconds=self.cache_ttl):
return cache_entry['data']
data = self._make_request(endpoint, params)
if data:
self._cache[cache_key] = {
'data': data,
'timestamp': datetime.now()
}
return data
七、实际应用场景扩展
7.1 多市场数据对比分析
class ComparativeAnalyzer:
def __init__(self, api_clients: Dict):
self.clients = api_clients # 不同市场的API客户端
async def compare_market_performance(self, symbols_map: Dict[str, list]):
"""比较不同市场相似股票的表现"""
results = {
}
for market, symbols in symbols_map.items():
client = self.clients.get(market)
if client:
market_data = {
}
for symbol in symbols:
quote = client.get_real_time_quotes([symbol])
if quote:
market_data[symbol] = quote.get(symbol, {
})
results[market] = market_data
# 生成对比报告
self._generate_comparison_report(results)
return results
7.2 实时预警系统
class AlertSystem:
def __init__(self):
self.alerts = {
}
self.triggered_alerts = []
def add_price_alert(self, symbol: str, condition: str,
value: float, callback: callable):
"""添加价格预警"""
alert_id = f"{symbol}_{condition}_{value}"
self.alerts[alert_id] = {
'symbol': symbol,
'condition': condition,
'value': value,
'callback': callback,
'active': True
}
return alert_id
def check_alerts(self, current_prices: Dict[str, float]):
"""检查预警条件"""
for alert_id, alert in self.alerts.items():
if not alert['active']:
continue
symbol = alert['symbol']
current_price = current_prices.get(symbol)
if current_price is None:
continue
triggered = False
condition = alert['condition']
target_value = alert['value']
if condition == "above" and current_price > target_value:
triggered = True
elif condition == "below" and current_price < target_value:
triggered = True
elif condition == "change_pct" and abs(current_price) > target_value:
triggered = True
if triggered:
alert['callback']({
'symbol': symbol,
'current_price': current_price,
'condition': condition,
'target_value': target_value,
'timestamp': datetime.now()
})
self.triggered_alerts.append(alert_id)
alert['active'] = False # 触发后停用
八、总结
通过本文的技术实现,我们构建了一个完整的印度股票数据对接系统。这个系统具备以下特点:
- 模块化设计:分离了数据获取、存储、分析和展示层
- 实时处理能力:支持HTTP轮询和WebSocket两种数据获取方式
- 容错机制:实现了完善的错误处理和重试逻辑
- 数据分析功能:内置了基础的技术分析指标计算
- 可扩展架构:易于添加新的数据源或分析功能
在实际开发中,开发者可以根据具体需求调整和扩展这个基础框架。例如,可以添加机器学习模型进行价格预测,或者集成更多金融市场的数据源进行跨市场分析。
印度股市的国际化程度不断提高,对于金融科技开发者而言,掌握其数据接口技术将为构建跨国投资工具奠定坚实基础。无论是个人投资者使用的移动应用,还是机构使用的专业分析平台,稳定可靠的数据接口都是系统成功的关键。
重要提示:生产环境使用请确保遵守相关数据使用协议,合理控制请求频率,并实现适当的数据缓存策略以优化性能。