实战指南:使用API获取BSE股票实时数据
在当今全球化的投资环境中,印度股市作为近年来表现最强劲的新兴市场之一,吸引了众多开发者和投资者的关注。孟买证券交易所(BSE)作为亚洲历史最悠久的交易所,其实时数据对于量化交易、投资决策和金融应用开发具有重要意义。本文将详细介绍如何通过API接口获取BSE股票的实时数据,为开发者提供完整的技术实现方案。
一、准备工作:获取API密钥与基础配置
1.1 获取API密钥
所有API请求都需要有效的API密钥进行身份验证。
API密钥格式示例:MY4b781f618e3f43c4b055f25fa619xxxx
1.2 基础参数配置
在API系统中,印度市场的唯一标识符为 countryId=14。主要交易所ID如下:
- NSE(印度国家证券交易所):
exchangeId=46 - BSE(孟买证券交易所):
exchangeId=74
基础URL:https://api.stocktv.top
WebSocket URL:wss://ws-api.stocktv.top/connect
二、核心实时数据接口详解
2.1 获取BSE股票实时列表
这是获取BSE交易所所有股票实时行情的基础接口,包含最新价、涨跌幅、成交量等核心指标。
接口地址:GET /stock/stocks
请求参数:
| 参数名 | 必填 | 说明 |
| :--- | :--- | :--- |
| countryId | 是 | 印度市场ID,固定为14 |
| exchangeId | 否 | BSE交易所ID,此处应填74 |
| pageSize | 否 | 每页数量 |
| page | 否 | 页码 |
| key | 是 | 你的API密钥 |
返回示例:
{
"code": 200,
"message": "操作成功",
"data": {
"records": [
{
"id": 946725,
"name": "Inventure Growth Securities",
"symbol": "IGSL",
"exchangeId": 74,
"last": 2.46,
"chgPct": -20.13,
"volume": 1234567,
"technicalDay": "strong_buy",
"countryNameTranslated": "India"
}
]
}
}
关键字段说明:
last:最新成交价(实时同步)chgPct:涨跌幅百分比volume:成交量(反映市场流动性)technicalDay:日线技术指标建议(如strong_buy)
2.2 获取单只股票实时行情
针对特定股票获取详细的实时行情数据,包括买卖盘口信息。
接口地址:GET /market/todayMarket
请求参数:
symbol:股票代码(如ADANIENT.NS)key:您的API Key
示例请求:
curl --location 'https://api.stocktv.top/market/todayMarket?key=YOUR_API_KEY&symbol=ADANIENT.NS'
响应示例:
{
"code": 200,
"message": "操作成功",
"data": {
"lastPrice": "2145.65",
"previous_close": "2133.35",
"open": "2138.00",
"high": "2150.00",
"low": "2135.20",
"volume": "1234567",
"bid": "2145.60 x 100",
"ask": "2145.70 x 80",
"pc": "+12.30",
"pcp": "+0.58%",
"time": "15:29:45",
"timestamp": "1717728251"
}
}
2.3 获取BSE指数实时数据
监控BSE Sensex指数的实时波动,把握市场整体走势。
接口地址:GET /stock/indices
请求参数:
countryId:14(印度)key:您的API Key
返回字段:
isOpen:交易所开关盘状态last:实时点位high:当日最高low:当日最低chgPct:涨跌幅
三、WebSocket实时数据订阅
对于对延迟极其敏感的量化交易场景,WebSocket提供了毫秒级的实时数据推送能力。
3.1 WebSocket连接配置
class BSEWebSocket {
constructor(apiKey) {
this.wsUrl = `wss://ws-api.stocktv.top/connect?key=${
apiKey}`;
this.socket = null;
}
connect() {
this.socket = new WebSocket(this.wsUrl);
this.socket.onopen = () => {
// 订阅Sensex指数和BSE股票
const subscribeMsg = {
action: "subscribe",
symbols: ["SENSEX", "RELIANCE.BO", "TATASTEEL.BO"],
countryId: 14 // 印度国家代码
};
this.socket.send(JSON.stringify(subscribeMsg));
};
this.socket.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('实时行情更新:', data);
// 处理实时数据更新
};
this.socket.onerror = (error) => {
console.error('WebSocket错误:', error);
};
}
}
3.2 实时数据推送格式
WebSocket推送的数据包含以下关键信息:
symbol:股票代码last:最新价volume:成交量bid:买一价ask:卖一价timestamp:毫秒级时间戳
四、代码实战示例
4.1 Python实时数据获取
import requests
import pandas as pd
from datetime import datetime
class BSERealTimeAPI:
def __init__(self, api_key):
self.base_url = "https://api.stocktv.top"
self.api_key = api_key
self.headers = {
"User-Agent": "PythonStockClient/1.0",
"Accept-Encoding": "gzip"
}
def get_bse_stocks_realtime(self, page_size=20):
"""获取BSE股票实时列表"""
url = f"{self.base_url}/stock/stocks"
params = {
"countryId": 14,
"exchangeId": 74,
"pageSize": page_size,
"page": 1,
"key": self.api_key
}
try:
response = requests.get(url, params=params, headers=self.headers, timeout=10)
data = response.json()
if data.get('code') == 200:
stocks = data['data']['records']
df = pd.DataFrame(stocks)
# 添加时间戳
df['update_time'] = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"成功获取{len(stocks)}只BSE股票实时数据")
return df
else:
print(f"请求失败: {data.get('message')}")
return None
except Exception as e:
print(f"网络错误: {e}")
return None
def get_single_stock_realtime(self, symbol):
"""获取单只股票实时行情"""
url = f"{self.base_url}/market/todayMarket"
params = {
"symbol": symbol,
"key": self.api_key
}
try:
response = requests.get(url, params=params, headers=self.headers, timeout=5)
data = response.json()
if data.get('code') == 200:
stock_data = data['data']
print(f"{symbol} 实时行情:")
print(f"最新价: {stock_data.get('lastPrice')}")
print(f"涨跌幅: {stock_data.get('pcp')}")
print(f"成交量: {stock_data.get('volume')}")
print(f"买一价: {stock_data.get('bid')}")
print(f"卖一价: {stock_data.get('ask')}")
return stock_data
else:
print(f"获取失败: {data.get('message')}")
return None
except Exception as e:
print(f"请求错误: {e}")
return None
def get_bse_indices(self):
"""获取BSE指数实时数据"""
url = f"{self.base_url}/stock/indices"
params = {
"countryId": 14,
"key": self.api_key
}
try:
response = requests.get(url, params=params, headers=self.headers, timeout=5)
data = response.json()
if data.get('code') == 200:
indices = data['data']
# 过滤BSE相关指数
bse_indices = [idx for idx in indices if 'BSE' in idx.get('name', '')]
return bse_indices
else:
print(f"获取指数失败: {data.get('message')}")
return []
except Exception as e:
print(f"请求错误: {e}")
return []
# 使用示例
if __name__ == "__main__":
# 初始化API客户端
api = BSERealTimeAPI(api_key="YOUR_API_KEY")
# 1. 获取BSE股票实时列表
bse_stocks = api.get_bse_stocks_realtime(10)
if bse_stocks is not None:
print("\n--- BSE股票实时行情 ---")
print(bse_stocks[['symbol', 'name', 'last', 'chgPct', 'volume']].head())
# 2. 获取单只股票实时行情
api.get_single_stock_realtime("RELIANCE.NS")
# 3. 获取BSE指数
bse_indices = api.get_bse_indices()
if bse_indices:
print("\n--- BSE指数实时数据 ---")
for idx in bse_indices:
print(f"{idx.get('name')}: {idx.get('last')} ({idx.get('chgPct')}%)")
4.2 Node.js实时数据监控
const axios = require('axios');
const WebSocket = require('ws');
class BSERealTimeMonitor {
constructor(apiKey) {
this.apiKey = apiKey;
this.baseUrl = 'https://api.stocktv.top';
this.wsUrl = `wss://ws-api.stocktv.top/connect?key=${
apiKey}`;
this.wsConnection = null;
}
async fetchBSEStocks() {
try {
const response = await axios.get(`${
this.baseUrl}/stock/stocks`, {
params: {
countryId: 14,
exchangeId: 74,
pageSize: 15,
key: this.apiKey
},
timeout: 10000
});
if (response.data.code === 200) {
const stocks = response.data.data.records;
console.log(`获取到${
stocks.length}只BSE股票实时数据`);
// 实时数据监控
stocks.forEach(stock => {
console.log(`${
stock.symbol}: ${
stock.last} (${
stock.chgPct}%)`);
});
return stocks;
} else {
console.error('API返回错误:', response.data.message);
return null;
}
} catch (error) {
console.error('请求失败:', error.message);
return null;
}
}
connectWebSocket(symbols = ['SENSEX']) {
this.wsConnection = new WebSocket(this.wsUrl);
this.wsConnection.on('open', () => {
console.log('WebSocket连接已建立');
// 订阅股票
const subscribeMsg = {
action: 'subscribe',
symbols: symbols,
countryId: 14
};
this.wsConnection.send(JSON.stringify(subscribeMsg));
});
this.wsConnection.on('message', (data) => {
const parsedData = JSON.parse(data);
this.handleRealTimeData(parsedData);
});
this.wsConnection.on('error', (error) => {
console.error('WebSocket错误:', error);
});
this.wsConnection.on('close', () => {
console.log('WebSocket连接已关闭');
});
}
handleRealTimeData(data) {
// 实时数据处理逻辑
const timestamp = new Date().toISOString();
console.log(`[${
timestamp}] ${
data.symbol}: ${
data.last} 成交量: ${
data.volume}`);
// 可以添加更多业务逻辑,如:
// 1. 价格预警
// 2. 成交量异常检测
// 3. 数据存储
// 4. 实时图表更新
}
disconnectWebSocket() {
if (this.wsConnection) {
this.wsConnection.close();
}
}
}
// 使用示例
async function main() {
const monitor = new BSERealTimeMonitor('YOUR_API_KEY');
// 获取BSE股票列表
const stocks = await monitor.fetchBSEStocks();
if (stocks) {
// 提取前5只股票的代码用于WebSocket订阅
const symbolsToMonitor = stocks.slice(0, 5).map(stock => stock.symbol);
symbolsToMonitor.push('SENSEX'); // 添加Sensex指数
// 建立WebSocket连接进行实时监控
monitor.connectWebSocket(symbolsToMonitor);
// 模拟运行一段时间后断开
setTimeout(() => {
monitor.disconnectWebSocket();
console.log('监控结束');
}, 300000); // 运行5分钟
}
}
main();
五、实时数据应用场景
5.1 量化交易系统
通过实时数据接口,可以构建高频交易策略:
- 价格突破预警
- 成交量异常检测
- 市场情绪分析
- 套利机会识别
5.2 实时监控仪表盘
开发企业级监控系统:
- 投资组合实时估值
- 风险指标监控
- 市场异常预警
- 业绩归因分析
5.3 移动端实时行情
集成到移动应用中:
- 自选股实时提醒
- 股价波动推送
- 技术指标实时计算
- 新闻事件关联分析
5.4 金融数据门户
构建专业行情网站:
- 实时涨跌排行榜
- 板块轮动监控
- 资金流向分析
- 市场热度指标
六、最佳实践与性能优化
6.1 数据缓存策略
import time
from functools import lru_cache
class CachedBSEData:
def __init__(self, api_client):
self.api = api_client
self.cache_ttl = 60 # 缓存60秒
@lru_cache(maxsize=128)
def get_cached_stocks(self, page_size):
"""带缓存的股票数据获取"""
current_time = time.time()
cache_key = f"stocks_{page_size}"
# 检查缓存是否过期
if hasattr(self, '_last_fetch_time') and \
(current_time - self._last_fetch_time) < self.cache_ttl:
return self._cached_data
# 获取新数据
data = self.api.get_bse_stocks_realtime(page_size)
self._cached_data = data
self._last_fetch_time = current_time
return data
6.2 错误处理与重试机制
import time
from requests.exceptions import RequestException
class RobustBSEClient:
def __init__(self, api_key, max_retries=3):
self.api_key = api_key
self.max_retries = max_retries
self.base_url = "https://api.stocktv.top"
def fetch_with_retry(self, endpoint, params):
"""带指数退避的重试机制"""
for attempt in range(self.max_retries):
try:
response = requests.get(
f"{self.base_url}/{endpoint}",
params=params,
timeout=10
)
if response.status_code == 200:
data = response.json()
if data.get('code') == 200:
return data['data']
else:
print(f"API业务错误: {data.get('message')}")
except RequestException as e:
if attempt == self.max_retries - 1:
raise e
# 指数退避
wait_time = 2 ** attempt
print(f"第{attempt+1}次重试,等待{wait_time}秒...")
time.sleep(wait_time)
return None
6.3 实时数据质量监控
class DataQualityMonitor:
def __init__(self):
self.last_update_time = {
}
self.data_gaps = {
}
def check_data_freshness(self, symbol, current_time):
"""检查数据新鲜度"""
if symbol in self.last_update_time:
time_diff = current_time - self.last_update_time[symbol]
if time_diff > 300: # 超过5分钟无更新
print(f"警告: {symbol} 数据更新延迟 {time_diff}秒")
return False
self.last_update_time[symbol] = current_time
return True
def detect_anomalies(self, current_price, historical_prices):
"""检测价格异常"""
if len(historical_prices) < 10:
return False
avg_price = sum(historical_prices) / len(historical_prices)
std_dev = (sum((p - avg_price) ** 2 for p in historical_prices) / len(historical_prices)) ** 0.5
# 检查是否超过3个标准差
if abs(current_price - avg_price) > 3 * std_dev:
print(f"异常检测: 价格波动超过3个标准差")
return True
return False