第三部分:实时数据可视化
3.1 WebSocket实时数据推送
// 前端WebSocket连接
class RealtimeDashboard {
constructor() {
this.ws = null;
this.charts = {};
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 10;
}
// 初始化连接
connect() {
this.ws = new WebSocket('ws://localhost:8080/ws/dashboard');
this.ws.onopen = () => {
console.log('WebSocket连接成功');
this.reconnectAttempts = 0;
// 订阅数据流
this.subscribe('realtime_metrics');
};
this.ws.onmessage = (event) => {
const data = JSON.parse(event.data);
this.handleData(data);
};
this.ws.onclose = () => {
console.log('WebSocket连接断开');
this.reconnect();
};
this.ws.onerror = (error) => {
console.error('WebSocket错误:', error);
};
}
// 重连机制
reconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = Math.min(1000 * Math.pow(2, this.reconnectAttempts), 30000);
setTimeout(() => this.connect(), delay);
}
}
// 订阅数据
subscribe(channel) {
this.ws.send(JSON.stringify({
action: 'subscribe',
channel: channel
}));
}
// 处理实时数据
handleData(data) {
switch(data.type) {
case 'realtime_metrics':
this.updateMetrics(data.metrics);
break;
case 'realtime_chart':
this.updateChart(data.chartId, data.dataPoint);
break;
case 'alert':
this.showAlert(data.alert);
break;
}
}
// 更新指标卡片
updateMetrics(metrics) {
// 更新DOM
document.getElementById('gmv').innerText = this.formatNumber(metrics.gmv);
document.getElementById('order_count').innerText = metrics.orderCount;
document.getElementById('user_count').innerText = metrics.userCount;
// 添加动画效果
this.addPulseEffect('gmv');
}
// 更新图表
updateChart(chartId, dataPoint) {
const chart = this.charts[chartId];
if (chart) {
// 追加新数据点
chart.appendData({
seriesIndex: 0,
data: [dataPoint]
});
}
}
// 格式化数字
formatNumber(num) {
if (num >= 10000) {
return (num / 10000).toFixed(1) + '万';
}
return num.toLocaleString();
}
// 添加脉冲效果
addPulseEffect(elementId) {
const element = document.getElementById(elementId);
element.classList.add('pulse');
setTimeout(() => element.classList.remove('pulse'), 500);
}
// 显示告警
showAlert(alert) {
const alertDiv = document.createElement('div');
alertDiv.className = `alert alert-${alert.level}`;
alertDiv.innerHTML = `
<strong>${alert.title}</strong>
<span>${alert.message}</span>
<button onclick="this.parentElement.remove()">×</button>
`;
document.getElementById('alert-container').appendChild(alertDiv);
// 5秒后自动消失
setTimeout(() => alertDiv.remove(), 5000);
}
}
3.2 后端WebSocket服务(Spring Boot)
// WebSocket配置
@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
@Override
public void registerStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/ws")
.setAllowedOrigins("*")
.withSockJS();
}
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.enableSimpleBroker("/topic");
registry.setApplicationDestinationPrefixes("/app");
}
}
// WebSocket控制器
@Controller
public class WebSocketController {
private final SimpMessagingTemplate messagingTemplate;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4);
public WebSocketController(SimpMessagingTemplate messagingTemplate) {
this.messagingTemplate = messagingTemplate;
startDataSimulation();
}
// 模拟实时数据推送
private void startDataSimulation() {
scheduler.scheduleAtFixedRate(() -> {
// 生成实时指标
Map<String, Object> metrics = new HashMap<>();
metrics.put("gmv", Math.random() * 100000);
metrics.put("orderCount", (int)(Math.random() * 1000));
metrics.put("userCount", (int)(Math.random() * 5000));
metrics.put("timestamp", System.currentTimeMillis());
// 推送给所有订阅者
messagingTemplate.convertAndSend("/topic/realtime", metrics);
}, 0, 1, TimeUnit.SECONDS);
}
// 处理客户端订阅
@MessageMapping("/subscribe")
public void subscribe(String channel) {
System.out.println("Client subscribed to: " + channel);
}
}
第四部分:大数据可视化
4.1 数据聚合与采样
处理大数据时,直接渲染所有数据点会导致性能问题。
// 数据采样算法
class DataSampler {
// 1. 系统采样:每隔N个点取一个
systematicSampling(data, targetCount) {
const step = Math.ceil(data.length / targetCount);
const result = [];
for (let i = 0; i < data.length; i += step) {
result.push(data[i]);
}
return result;
}
// 2. 最大三角形采样(LTTB)- 保留视觉特征
lttbSampling(data, threshold) {
if (threshold >= data.length) return data;
const result = [];
const bucketSize = (data.length - 2) / (threshold - 2);
result.push(data[0]); // 第一个点
let lastIndex = 0;
for (let i = 1; i < threshold - 1; i++) {
const avgRangeStart = Math.floor((i - 1) * bucketSize) + 1;
const avgRangeEnd = Math.floor(i * bucketSize) + 1;
let avgX = 0, avgY = 0;
for (let j = avgRangeStart; j < avgRangeEnd; j++) {
avgX += data[j].x;
avgY += data[j].y;
}
avgX /= (avgRangeEnd - avgRangeStart);
avgY /= (avgRangeEnd - avgRangeStart);
let maxArea = -1;
let maxAreaIndex = -1;
for (let j = avgRangeStart; j < avgRangeEnd; j++) {
const area = Math.abs(
(data[lastIndex].x - avgX) * (data[j].y - data[lastIndex].y) -
(data[lastIndex].x - data[j].x) * (avgY - data[lastIndex].y)
);
if (area > maxArea) {
maxArea = area;
maxAreaIndex = j;
}
}
result.push(data[maxAreaIndex]);
lastIndex = maxAreaIndex;
}
result.push(data[data.length - 1]); // 最后一个点
return result;
}
// 3. 自适应降采样
adaptiveSampling(data, maxPoints) {
if (data.length <= maxPoints) return data;
// 计算数据范围
const xValues = data.map(d => d.x);
const minX = Math.min(...xValues);
const maxX = Math.max(...xValues);
// 动态确定桶大小
const range = maxX - minX;
const bucketSize = range / maxPoints;
const buckets = new Map();
for (const point of data) {
const bucketIndex = Math.floor((point.x - minX) / bucketSize);
if (!buckets.has(bucketIndex)) {
buckets.set(bucketIndex, []);
}
buckets.get(bucketIndex).push(point);
}
// 每个桶取代表点(平均值或中位数)
const result = [];
for (const [_, bucketPoints] of buckets) {
const avgX = bucketPoints.reduce((sum, p) => sum + p.x, 0) / bucketPoints.length;
const avgY = bucketPoints.reduce((sum, p) => sum + p.y, 0) / bucketPoints.length;
result.push({ x: avgX, y: avgY });
}
return result;
}
}
4.2 数据聚合与预计算
// 数据预计算服务
@Service
public class DataAggregationService {
private final JdbcTemplate jdbcTemplate;
private final RedisTemplate<String, Object> redisTemplate;
// 按小时聚合
public void aggregateByHour(String metric, LocalDate date) {
String sql = "SELECT HOUR(create_time) as hour, SUM(value) as total " +
"FROM raw_data " +
"WHERE DATE(create_time) = ? " +
"GROUP BY HOUR(create_time)";
List<Map<String, Object>> results = jdbcTemplate.queryForList(sql, date);
for (Map<String, Object> result : results) {
String key = String.format("agg:%s:hour:%s:%02d",
metric, date, result.get("hour"));
redisTemplate.opsForValue().set(key, result.get("total"));
redisTemplate.expire(key, 30, TimeUnit.DAYS);
}
}
// 多级聚合:小时 -> 日 -> 月 -> 年
public void buildRollupTable(String metric, LocalDate startDate, LocalDate endDate) {
// 日聚合
for (LocalDate date = startDate; !date.isAfter(endDate); date = date.plusDays(1)) {
aggregateByHour(metric, date);
aggregateByDay(metric, date);
}
// 月聚合
aggregateByMonth(metric, startDate.getYear(), startDate.getMonthValue());
// 年聚合
aggregateByYear(metric, startDate.getYear());
}
// 查询聚合数据
public Object queryAggregatedData(String metric, String granularity, LocalDate start, LocalDate end) {
String pattern = String.format("agg:%s:%s:*", metric, granularity);
Set<String> keys = redisTemplate.keys(pattern);
Map<String, Object> results = new LinkedHashMap<>();
for (String key : keys) {
Object value = redisTemplate.opsForValue().get(key);
results.put(key, value);
}
return results;
}
}