一、业务需求分析
根据业务需求,我们需要将Spark处理过的数据写入到MySQL数据库中,再实时推送到web前端页面。其中Spark处理数据并写入MySQL数据库的部分,上篇博客已经讲到:https://blog.csdn.net/weixin_45366499/article/details/108903149
接下来的工作就是将数据库中的数据展示到界面上。
大体流程:
数据库——Java数据服务层——webSocket服务层——前端页面
二、web系统数据处理服务层开发
首先要创建一个web工程,为了和之前spark的工程分开,我新建了一个sparkTest_web工程。
首先要配置好tomcat服务器:
这里的端口我设置为8888,可以根据需求自己设定
安装好tomcat之后,需要添加一下lib包
下面开始编写WeblogService程序:
package com.spark.service;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Read-only data service that queries the {@code webCount} table over JDBC.
 *
 * <p>Every call opens its own connection and releases it before returning.
 * Failures are printed to standard error and reported to the caller as an
 * empty/unfilled result instead of an exception.
 *
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/10/4
 * @time : 2:46 PM
 */
public class WeblogService {

    // NOTE(review): connection settings and credentials are hard-coded in source;
    // consider moving them to external configuration.
    static String url = "jdbc:mysql://node1:3306/test";
    static String username = "root";
    static String password = "199911";

    /**
     * Returns the topics with the highest counts (at most 30), ordered by count descending.
     *
     * @return a map with two entries: {@code "titleName"} and {@code "titleCount"}, both
     *         {@code String[]} of equal length holding one element per row returned by the
     *         query (no {@code null} padding when fewer than 30 rows exist); the map is
     *         empty if the query could not be executed
     */
    public Map<String, Object> queryWeblogs() {
        Map<String, Object> retMap = new HashMap<String, Object>();
        List<String> titleNames = new ArrayList<String>();
        List<String> titleCounts = new ArrayList<String>();
        // Top 30 titleName values by count.
        String querySql = "select titleName,count from webCount where 1=1 order by count desc limit 30";

        try {
            // try-with-resources closes the ResultSet, statement and connection in
            // reverse order, even when one of the close() calls itself fails.
            try (Connection conn = openConnection();
                 PreparedStatement pst = conn.prepareStatement(querySql);
                 ResultSet rs = pst.executeQuery()) {
                while (rs.next()) {
                    titleNames.add(rs.getString("titleName"));
                    titleCounts.add(rs.getString("count"));
                }
                // Publish inside the try body so a failure while closing the
                // resources does not discard rows that were read successfully.
                retMap.put("titleName", titleNames.toArray(new String[0]));
                retMap.put("titleCount", titleCounts.toArray(new String[0]));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return retMap;
    }

    /**
     * Counts all rows of the {@code webCount} table.
     *
     * @return a one-element array holding the row count as a string; the element is
     *         {@code null} if the query could not be executed
     */
    public String[] titleCount() {
        String[] titleSums = new String[1];
        String querySql = "select count(1) titleSum from webCount";

        try {
            try (Connection conn = openConnection();
                 PreparedStatement pst = conn.prepareStatement(querySql);
                 ResultSet rs = pst.executeQuery()) {
                if (rs.next()) {
                    titleSums[0] = rs.getString("titleSum");
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        return titleSums;
    }

    /** Loads the MySQL JDBC driver class and opens a new connection using the static settings. */
    private static Connection openConnection() throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        return DriverManager.getConnection(url, username, password);
    }
}
三、基于WebSocket协议的数据推送服务开发
在web/WEB-INF下添加lib包,具体如图所示:
编写WeblogSocket程序:
package com.spark.service; import com.alibaba.fastjson.JSON; import javax.websocket.OnClose; import javax.websocket.OnMessage; import javax.websocket.OnOpen; import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.HashMap; import java.util.Map; /** * @author : 蔡政洁 * @email :caizhengjie888@icloud.com * @date : 2020/10/4 * @time : 2:46 下午 */ @ServerEndpoint("/websocket") public class WeblogSocket { WeblogService weblogService = new WeblogService(); @OnMessage public void onMessage(String message, Session session) throws IOException, InterruptedException { while(true){ Map<String, Object> map = new HashMap<String, Object>(); map.put("titleName", weblogService.queryWeblogs().get("titleName")); map.put("titleCount",weblogService.queryWeblogs().get("titleCount")); map.put("titleSum", weblogService.titleCount()); session.getBasicRemote(). sendText(JSON.toJSONString(map)); Thread.sleep(1000); map.clear(); } } @OnOpen public void onOpen () { System.out.println("Client connected"); } @OnClose public void onClose () { System.out.println("Connection closed"); } }
四、基于ECharts框架的页面展示层开发
首先要下载ECharts和jQuery
还需要添加WebSocket(ws)支持:配置pom.xml文件,添加如下内容:
<dependencies>
    <!-- Java EE 7 API. Scope "provided": used for compilation only and not packaged
         with the application. -->
    <dependency>
        <groupId>javax</groupId>
        <artifactId>javaee-api</artifactId>
        <version>7.0</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
最后再编写index.html文件:
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>Title</title>
    <script src="js/echarts.min.js"></script>
    <script src="js/jquery-3.2.1.js"></script>
    <style>
        body {
            text-align: center;
            background-color: #dbdddd;
        }
        /* Fixed width, height and border so the layout is easy to inspect. */
        .div {
            margin: 0 auto;
            width: 1000px;
            height: 800px;
            border: 1px solid #F00;
        }
    </style>
</head>
<body>
<h1>卡弗卡大数据 — 新闻网话题用户浏览实时统计分析</h1>
<div>
    <div id="main" style="width:863px;height: 700px;float:left;">第一个</div>
    <div id="sum" style="width:800px;height: 700px;float:right;">第二个</div>
</div>
<div>
    <input type="submit" value="实时分析" onclick="start()" />
</div>
<div id="messages"></div>
<script type="text/javascript">
    // Connection to the server endpoint; the server starts pushing statistics
    // after it receives the first message (see start()).
    var webSocket = new WebSocket('ws://localhost:8888/sparkStu_web_war_exploded/websocket');
    var myChart = echarts.init(document.getElementById('main'));
    var myChart_sum = echarts.init(document.getElementById('sum'));

    webSocket.onerror = function(event) {
        onError(event);
    };
    webSocket.onopen = function(event) {
        onOpen(event);
    };
    webSocket.onmessage = function(event) {
        onMessage(event);
    };

    // Parses one pushed JSON payload and refreshes both charts.
    function onMessage(event) {
        var sd = JSON.parse(event.data);
        processingData(sd);
        titleSum(sd.titleSum);
    }

    function onOpen(event) {
    }

    // A WebSocket error event carries no payload, so show a fixed message
    // instead of event.data (which would always be undefined).
    function onError(event) {
        alert('WebSocket 连接出错,请确认后台服务已启动后刷新页面重试。');
    }

    // Click handler of the button: asks the server to start pushing data.
    function start() {
        // send() throws while the socket is still connecting, and is useless
        // once it is closing/closed, so only send on an open connection.
        if (webSocket.readyState !== WebSocket.OPEN) {
            alert('WebSocket 尚未连接成功,暂时无法开始实时分析。');
            return false;
        }
        webSocket.send('hello');
        return false;
    }

    // Renders the horizontal bar chart of topic names against their counts.
    function processingData(json) {
        var rawNames = json.titleName || [];
        var rawCounts = json.titleCount || [];
        var names = [];
        var counts = [];
        // Entries may be null (e.g. fixed-length arrays padded by the sender);
        // drop them pairwise so names and counts stay aligned.
        for (var i = 0; i < rawNames.length; i++) {
            if (rawNames[i] !== null && rawNames[i] !== undefined) {
                names.push(rawNames[i]);
                counts.push(rawCounts[i]);
            }
        }

        var option = {
            backgroundColor: '#ffffff', // background color
            title: {
                text: '新闻话题浏览量【实时】排行',
                subtext: '数据来自搜狗实验室',
                textStyle: {
                    fontWeight: 'normal',
                    // title color
                    color: '#408829'
                }
            },
            tooltip: {
                trigger: 'axis',
                axisPointer: {
                    type: 'shadow'
                }
            },
            legend: {
                data: ['浏览量']
            },
            grid: {
                left: '3%',
                right: '4%',
                bottom: '3%',
                containLabel: true
            },
            xAxis: {
                type: 'value',
                boundaryGap: [0, 0.01]
            },
            yAxis: {
                type: 'category',
                data: names
            },
            series: [
                {
                    name: '浏览量',
                    type: 'bar',
                    label: {
                        normal: {
                            show: true,
                            position: 'insideRight'
                        }
                    },
                    itemStyle: {
                        normal: {color: '#f47209', size: '50px'}
                    },
                    data: counts
                }
            ]
        };
        myChart.setOption(option);
    }

    // Renders the gauge showing the total number of topics.
    // `data` arrives as a one-element array holding the total as a string;
    // a plain number or numeric string is accepted as well.
    function titleSum(data) {
        var total = Number(Array.isArray(data) ? data[0] : data);
        if (isNaN(total)) {
            total = 0;
        }

        var option = {
            backgroundColor: '#fbfbfb', // background color
            title: {
                text: '新闻话题曝光量【实时】统计',
                subtext: '数据来自搜狗实验室'
            },
            tooltip: {
                // The value is a topic count, not a percentage.
                formatter: "{a} <br/>{b} : {c}个话题"
            },
            toolbox: {
                feature: {
                    restore: {},
                    saveAsImage: {}
                }
            },
            series: [
                {
                    name: '业务指标',
                    type: 'gauge',
                    max: 50000,
                    detail: {formatter: '{value}个话题'},
                    data: [{value: total, name: '话题曝光量'}]
                }
            ]
        };
        myChart_sum.setOption(option, true);
    }
</script>
</body>
</html>
五、运行展示
当所有服务都正常启动后,运行tomcat服务器,就能看到可视化界面: