基于spark的医疗大数据可视化大屏项目
项目背景
在当今的医疗领域,数据驱动的决策变得日益重要。心力衰竭作为常见的心血管疾病,其临床数据的分析对于改善患者治疗结果至关重要。本文将介绍如何利用Apache Spark进行大规模心力衰竭临床数据的分析,并结合机器学习模型,构建一个交互式的可视化大屏,以直观展示数据分析结果。
数据读取与清洗
使用PySpark库,我们首先读取CSV文件中的心力衰竭临床记录数据,并进行必要的数据清洗工作,包括处理缺失值和异常值。
from pyspark.sql import SparkSession def read_data_csv(spark): df = spark.read.csv("heart_failure_clinical_records_dataset.csv", header=True, inferSchema=True) df.show() return df
数据分析
我们设计了多个SQL查询,以分析心力衰竭患者的不同临床特征:
1、患者年龄分布:分析不同年龄段患者的死亡事件频率。
def age_death(spark): df = spark.sql(""" SELECT (CASE WHEN age >= 0 AND age < 50 THEN '0-49' WHEN age >= 50 AND age < 70 THEN '50-69' WHEN age >= 70 THEN '70+' END) AS age_group, sum(DEATH_EVENT) AS frequency FROM heart GROUP BY age_group; """) df.show() return df
2、性别比例:统计患者性别分布。
def sex_ana(spark): df = spark.sql(""" SELECT sex, COUNT(*) AS count FROM heart GROUP BY sex; """) df.show() return df
3、糖尿病与CPK水平:对比糖尿病与非糖尿病患者的肌酸磷酸激酶(CPK)水平。
#3.糖尿病与非糖尿病患者的肌酸磷酸激酶(CPK)水平对比 def average_cpk(spark): df = spark.sql(""" SELECT diabetes, AVG(creatinine_phosphokinase) AS average_cpk FROM heart GROUP BY diabetes; """) df.show() return df
4、死亡事件时间分布:统计每个时间段内的死亡事件数量。
#4.每个时间段的死亡事件数量 def death_sum(spark): df = spark.sql(""" SELECT time, COUNT(*) AS death_count FROM heart WHERE death_event = 1 GROUP BY time ORDER BY time; """) df.show() return df
5、射血分数区间分布:分析不同射血分数区间的患者数量。
def hypertension_prevalence(spark): df = spark.sql(""" SELECT (CASE WHEN ejection_fraction < 30 THEN '< 30%' WHEN ejection_fraction >= 30 AND ejection_fraction < 40 THEN '30-39%' WHEN ejection_fraction >= 40 AND ejection_fraction < 50 THEN '40-49%' WHEN ejection_fraction >= 50 AND ejection_fraction < 60 THEN '50-59%' WHEN ejection_fraction >= 60 THEN '60% and Above' END) AS ef_range, COUNT(*) AS patient_count FROM heart GROUP BY ef_range ORDER BY ef_range; """) df.show() return df
6、血小板计数与死亡事件:探索血小板计数与死亡事件之间的关系。
#6.血小板计数与死亡事件的关系 def platelet_range_count(spark): df = spark.sql(""" SELECT (CASE WHEN platelets < 100000 THEN '< 100000' WHEN platelets >= 100000 AND platelets < 150000 THEN '100000-150000' WHEN platelets >= 1500000 AND platelets < 300000 THEN '150000-300000' WHEN platelets >= 300000 AND platelets < 450000 THEN '300000-450000' WHEN platelets >= 450000 THEN '450000 and Above' END) AS platelet_range, SUM(CASE WHEN death_event = 1 THEN 1 ELSE 0 END) AS death_count FROM heart GROUP BY platelet_range; """) df.show() return df
7、糖尿病与死亡事件:分析糖尿病患者的死亡事件数量。
#7.糖尿病与死亡事件的关系 def death_rate_diabetes(spark): df = spark.sql(""" SELECT diabetes, SUM(death_event) AS death_events FROM heart GROUP BY diabetes; """) df.show() return df
机器学习模型
使用Pandas和Scikit-learn库对数据进行预处理,并应用KMeans聚类算法来识别心力衰竭患者中的不同亚群。
数据预处理
我们选择特征列,使用StandardScaler进行数据标准化处理。
from sklearn.preprocessing import StandardScaler scaler = StandardScaler() scaled_features = scaler.fit_transform(features)
确定最优聚类数
通过Elbow Method和轮廓系数(Silhouette Score)分析,我们确定最佳的聚类数。
K_values = range(2, 50) wcss_values = [] for K in K_values: # 创建KMeans实例 kmeans = KMeans(n_clusters=K, random_state=2) # 拟合模型 kmeans.fit(scaled_features) # 计算WCSS wcss = kmeans.inertia_ wcss_values.append(wcss)
绘制WCSS与K值的关系图
K_values = range(2, 50) wcss_values = [] for K in K_values: # 创建KMeans实例 kmeans = KMeans(n_clusters=K, random_state=2) # 拟合模型 kmeans.fit(scaled_features) # 计算WCSS wcss = kmeans.inertia_ wcss_values.append(wcss) # 绘制WCSS与K值的关系图 plt.plot(K_values, wcss_values, 'bo-') plt.xlabel('Number of clusters (K)') plt.ylabel('WCSS') plt.title('The Elbow Method showing the optimal k') plt.show()
绘制轮廓系数与K值的关系图
# 计算每个K值的轮廓系数 silhouette_scores = [] for K in K_values: kmeans = KMeans(n_clusters=K, random_state=42) kmeans.fit(scaled_features) labels = kmeans.labels_ score = silhouette_score(scaled_features, labels) silhouette_scores.append(score) # 绘制轮廓系数与K值的关系图 plt.plot(K_values, silhouette_scores, 'ro-') plt.xlabel('Number of clusters (K)') plt.ylabel('Silhouette Score') plt.title('Silhouette Scores for varying K') plt.show()
可视化大屏设计
结合Spark分析结果和机器学习模型的输出,我们设计了一个可视化大屏,该大屏包括以下组件:
年龄分布图表:展示不同年龄段患者的死亡事件频率。
性别比例饼图:直观展示患者性别分布。
CPK水平对比图:通过箱线图展示糖尿病与非糖尿病患者的CPK水平对比。
时间序列图表:展示随时间变化的死亡事件数量。
射血分数分布图:条形图展示不同射血分数区间的患者数量。
血小板计数与死亡事件:通过堆叠条形图展示不同血小板计数范围的死亡事件数量。
糖尿病与死亡事件图表:散点图展示糖尿病患者的死亡事件数量。
import json from service.task_service import get_age_death, get_diabetes_cpk, get_death_sum, get_hypertension_prevalence, \ get_sex_ana, get_diabetes_death class SourceDataDemo: def __init__(self): self.title = '' a,b = get_sex_ana() self.counter = {'name': '女性患者数量', 'value': b} self.counter2 = {'name': '男性患者数量', 'value': a} self.echart1_data = { 'title': '患者年龄分布', 'data': get_age_death() } self.echart2_data = { 'title': '糖尿病与非糖尿病患者的(CPK)水平对比', 'data': get_diabetes_cpk() } self.echarts3_1_data = { 'title': '死亡情况', 'data': get_death_sum() } self.echarts3_2_data = { 'title': '性别分布', 'data': [ {"name": "男性", "value": a}, {"name": "女性", "value": b}, ] } self.echarts3_3_data = { 'title': '糖尿病情况', 'data': get_diabetes_death() } self.echart4_data = { 'title': '患者死亡时间', 'data': [ {"name": "女性", "value": [3, 4, 3, 4, 3, 4, 3, 6, 2, 4, 2, 4, 3, 4, 3, 4, 3, 4, 3, 6, 2, 4, 4]}, {"name": "男性", "value": [5, 3, 5, 6, 1, 5, 3, 5, 6, 4, 6, 4, 8, 3, 5, 6, 1, 5, 3, 7, 2, 5, 8]}, ], 'xAxis': ['01', '02', '03', '04', '05', '06', '07', '08', '09', '11', '12', '13', '14', '15', '16', '17', '18', '19', '20', '21', '22', '23', '24'], } self.echart5_data = { 'title': '不同射血分数区间的患者分布', 'data': get_hypertension_prevalence() } self.echart6_data = { 'title': '血小板计数与死亡事件的关系', 'data': get_diabetes_death() } self.map_1_data = { 'symbolSize': 100, 'data': [ {'name': '海门', 'value': 239}, {'name': '鄂尔多斯', 'value': 231}, {'name': '招远', 'value': 203}, ] } @property def echart1(self): data = self.echart1_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'series': [i.get("value") for i in data.get('data')] } return echart @property def echart2(self): data = self.echart2_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'series': [i.get("value") for i in data.get('data')] } return echart @property def echarts3_1(self): data = self.echarts3_1_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'data': data.get('data'), } return echart @property def echarts3_2(self): data = self.echarts3_2_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'data': data.get('data'), } return echart @property def echarts3_3(self): data = self.echarts3_3_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'data': data.get('data'), } return echart @property def echart4(self): data = self.echart4_data echart = { 'title': data.get('title'), 'names': [i.get("name") for i in data.get('data')], 'xAxis': data.get('xAxis'), 'data': data.get('data'), } return echart @property def echart5(self): data = self.echart5_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'series': [i.get("value") for i in data.get('data')], 'data': data.get('data'), } return echart @property def echart6(self): data = self.echart6_data echart = { 'title': data.get('title'), 'xAxis': [i.get("name") for i in data.get('data')], 'data': data.get('data'), } return echart @property def map_1(self): data = self.map_1_data echart = { 'symbolSize': data.get('symbolSize'), 'data': data.get('data'), } return echart class SourceData(SourceDataDemo): def __init__(self): """ 按照 SourceDataDemo 的格式覆盖数据即可 """ super().__init__() self.title = '心力衰竭数据可视化大屏' if __name__ == '__main__': sd = SourceData() print(sd.echart1())