基于spark的医疗大数据可视化大屏项目

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
简介: 基于spark的医疗大数据可视化大屏项目

基于spark的医疗大数据可视化大屏项目

项目背景

在当今的医疗领域,数据驱动的决策变得日益重要。心力衰竭作为常见的心血管疾病,其临床数据的分析对于改善患者治疗结果至关重要。本文将介绍如何利用Apache Spark进行大规模心力衰竭临床数据的分析,并结合机器学习模型,构建一个交互式的可视化大屏,以直观展示数据分析结果。

数据读取与清洗

使用PySpark库,我们首先读取CSV文件中的心力衰竭临床记录数据,并进行必要的数据清洗工作,包括处理缺失值和异常值。


from pyspark.sql import SparkSession

def read_data_csv(spark):
    df = spark.read.csv("heart_failure_clinical_records_dataset.csv", header=True, inferSchema=True)
    df.show()
    return df



数据分析

我们设计了多个SQL查询,以分析心力衰竭患者的不同临床特征:

1、患者年龄分布:分析不同年龄段患者的死亡事件频率。

def age_death(spark):

    df = spark.sql("""
    SELECT
    (CASE WHEN age >= 0 AND age < 50 THEN '0-49'
         WHEN age >= 50 AND age < 70 THEN '50-69'
         WHEN age >= 70 THEN '70+'
    END) AS age_group,
    sum(DEATH_EVENT) AS frequency
    FROM
        heart
    GROUP BY
    age_group;
    """)

    df.show()

    return df


2、性别比例:统计患者性别分布。

def sex_ana(spark):
    df = spark.sql("""
        SELECT
    sex,
    COUNT(*) AS count
FROM
    heart
GROUP BY
    sex;
        """)

    df.show()

    return df


3、糖尿病与CPK水平:对比糖尿病与非糖尿病患者的肌酸磷酸激酶(CPK)水平。

#3.糖尿病与非糖尿病患者的肌酸磷酸激酶(CPK)水平对比
def average_cpk(spark):
    df = spark.sql("""
    SELECT
    diabetes,
    AVG(creatinine_phosphokinase) AS average_cpk
FROM heart
GROUP BY diabetes;
        """)

    df.show()
    return df



4、死亡事件时间分布:统计每个时间段内的死亡事件数量。

#4.每个时间段的死亡事件数量
def death_sum(spark):
    df = spark.sql("""
        SELECT
        time,
        COUNT(*) AS death_count
    FROM heart
    WHERE death_event = 1
    GROUP BY time
    ORDER BY time;
            """)

    df.show()
    return df



5、射血分数区间分布:分析不同射血分数区间的患者数量。

def hypertension_prevalence(spark):
    df = spark.sql("""
            SELECT
        (CASE WHEN ejection_fraction < 30 THEN '< 30%'
             WHEN ejection_fraction >= 30 AND ejection_fraction < 40 THEN '30-39%'
             WHEN ejection_fraction >= 40 AND ejection_fraction < 50 THEN '40-49%'
             WHEN ejection_fraction >= 50 AND ejection_fraction < 60 THEN '50-59%'
             WHEN ejection_fraction >= 60 THEN '60% and Above'
            END) AS ef_range,
            COUNT(*) AS patient_count
        FROM heart
        GROUP BY ef_range
        ORDER BY ef_range;
                """)

    df.show()

    return df




6、血小板计数与死亡事件:探索血小板计数与死亡事件之间的关系。

#6.血小板计数与死亡事件的关系
def platelet_range_count(spark):
    df = spark.sql("""
                SELECT
    (CASE WHEN platelets < 100000 THEN '< 100000'
         WHEN platelets >= 100000 AND platelets < 150000 THEN '100000-150000'
         WHEN platelets >= 1500000 AND platelets < 300000 THEN '150000-300000'
         WHEN platelets >= 300000 AND platelets < 450000 THEN '300000-450000'
         WHEN platelets >= 450000 THEN '450000 and Above'
    END) AS platelet_range,
    SUM(CASE WHEN death_event = 1 THEN 1 ELSE 0 END) AS death_count
FROM heart
GROUP BY platelet_range;
                    """)

    df.show()

    return df


7、糖尿病与死亡事件:分析糖尿病患者的死亡事件数量。

#7.糖尿病与死亡事件的关系
def death_rate_diabetes(spark):
    df = spark.sql("""
        SELECT
      diabetes,
      SUM(death_event) AS death_events
    FROM
      heart
    GROUP BY
    diabetes;
                        """)

    df.show()

    return df


机器学习模型

使用Pandas和Scikit-learn库对数据进行预处理,并应用KMeans聚类算法来识别心力衰竭患者中的不同亚群。

数据预处理

我们选择特征列,使用StandardScaler进行数据标准化处理。

from sklearn.preprocessing import StandardScaler

scaler = StandardScaler()
scaled_features = scaler.fit_transform(features)

确定最优聚类数

通过Elbow Method和轮廓系数(Silhouette Score)分析,我们确定最佳的聚类数。

K_values = range(2, 50)
wcss_values = []

for K in K_values:
    # 创建KMeans实例
    kmeans = KMeans(n_clusters=K, random_state=2)
    # 拟合模型
    kmeans.fit(scaled_features)
    # 计算WCSS
    wcss = kmeans.inertia_
    wcss_values.append(wcss)

绘制WCSS与K值的关系图

K_values = range(2, 50)
wcss_values = []

for K in K_values:
    # 创建KMeans实例
    kmeans = KMeans(n_clusters=K, random_state=2)
    # 拟合模型
    kmeans.fit(scaled_features)
    # 计算WCSS
    wcss = kmeans.inertia_
    wcss_values.append(wcss)

# 绘制WCSS与K值的关系图
plt.plot(K_values, wcss_values, 'bo-')
plt.xlabel('Number of clusters (K)')
plt.ylabel('WCSS')
plt.title('The Elbow Method showing the optimal k')
plt.show()

绘制轮廓系数与K值的关系图

# 计算每个K值的轮廓系数
silhouette_scores = []
for K in K_values:
    kmeans = KMeans(n_clusters=K, random_state=42)
    kmeans.fit(scaled_features)
    labels = kmeans.labels_
    score = silhouette_score(scaled_features, labels)
    silhouette_scores.append(score)

# 绘制轮廓系数与K值的关系图
plt.plot(K_values, silhouette_scores, 'ro-')
plt.xlabel('Number of clusters (K)')
plt.ylabel('Silhouette Score')
plt.title('Silhouette Scores for varying K')
plt.show()

可视化大屏设计

结合Spark分析结果和机器学习模型的输出,我们设计了一个可视化大屏,该大屏包括以下组件:

年龄分布图表:展示不同年龄段患者的死亡事件频率。

性别比例饼图:直观展示患者性别分布。

CPK水平对比图:通过箱线图展示糖尿病与非糖尿病患者的CPK水平对比。

时间序列图表:展示随时间变化的死亡事件数量。

射血分数分布图:条形图展示不同射血分数区间的患者数量。

血小板计数与死亡事件:通过堆叠条形图展示不同血小板计数范围的死亡事件数量。

糖尿病与死亡事件图表:散点图展示糖尿病患者的死亡事件数量。

import json

from service.task_service import get_age_death, get_diabetes_cpk, get_death_sum, get_hypertension_prevalence, \
    get_sex_ana, get_diabetes_death


class SourceDataDemo:

    def __init__(self):
        self.title = ''
        a,b = get_sex_ana()
        self.counter = {'name': '女性患者数量', 'value': b}
        self.counter2 = {'name': '男性患者数量', 'value': a}
        self.echart1_data = {
            'title': '患者年龄分布',
            'data': get_age_death()
        }
        self.echart2_data = {
            'title': '糖尿病与非糖尿病患者的(CPK)水平对比',
            'data': get_diabetes_cpk()
        }
        self.echarts3_1_data = {
            'title': '死亡情况',
            'data': get_death_sum()
        }
        self.echarts3_2_data = {
            'title': '性别分布',
            'data': [
                {"name": "男性", "value": a},
                {"name": "女性", "value": b},
            ]
        }
        self.echarts3_3_data = {
            'title': '糖尿病情况',
            'data': get_diabetes_death()
        }
        self.echart4_data = {
            'title': '患者死亡时间',
            'data': [
                {"name": "女性", "value": [3, 4, 3, 4, 3, 4, 3, 6, 2, 4, 2, 4, 3, 4, 3, 4, 3, 4, 3, 6, 2, 4, 4]},
                {"name": "男性", "value": [5, 3, 5, 6, 1, 5, 3, 5, 6, 4, 6, 4, 8, 3, 5, 6, 1, 5, 3, 7, 2, 5, 8]},
            ],
            'xAxis': ['01', '02', '03', '04', '05', '06', '07', '08', '09', '11', '12', '13', '14', '15', '16', '17',
                      '18', '19', '20', '21', '22', '23', '24'],
        }
        self.echart5_data = {
            'title': '不同射血分数区间的患者分布',
            'data': get_hypertension_prevalence()
        }

        self.echart6_data = {
            'title': '血小板计数与死亡事件的关系',
            'data': get_diabetes_death()
        }
        self.map_1_data = {
            'symbolSize': 100,
            'data': [
                {'name': '海门', 'value': 239},
                {'name': '鄂尔多斯', 'value': 231},
                {'name': '招远', 'value': 203},
            ]
        }

    @property
    def echart1(self):
        data = self.echart1_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'series': [i.get("value") for i in data.get('data')]
        }
        return echart

    @property
    def echart2(self):
        data = self.echart2_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'series': [i.get("value") for i in data.get('data')]
        }
        return echart

    @property
    def echarts3_1(self):
        data = self.echarts3_1_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'data': data.get('data'),
        }
        return echart

    @property
    def echarts3_2(self):
        data = self.echarts3_2_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'data': data.get('data'),
        }
        return echart

    @property
    def echarts3_3(self):
        data = self.echarts3_3_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'data': data.get('data'),
        }
        return echart

    @property
    def echart4(self):
        data = self.echart4_data
        echart = {
            'title': data.get('title'),
            'names': [i.get("name") for i in data.get('data')],
            'xAxis': data.get('xAxis'),
            'data': data.get('data'),
        }
        return echart

    @property
    def echart5(self):
        data = self.echart5_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'series': [i.get("value") for i in data.get('data')],
            'data': data.get('data'),
        }
        return echart

    @property
    def echart6(self):
        data = self.echart6_data
        echart = {
            'title': data.get('title'),
            'xAxis': [i.get("name") for i in data.get('data')],
            'data': data.get('data'),
        }
        return echart

    @property
    def map_1(self):
        data = self.map_1_data
        echart = {
            'symbolSize': data.get('symbolSize'),
            'data': data.get('data'),
        }
        return echart


class SourceData(SourceDataDemo):

    def __init__(self):
        """
        按照 SourceDataDemo 的格式覆盖数据即可
        """
        super().__init__()
        self.title = '心力衰竭数据可视化大屏'


if __name__ == '__main__':
    sd = SourceData()
    print(sd.echart1())


相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
SaaS 模式云数据仓库必修课
本课程由阿里云开发者社区和阿里云大数据团队共同出品,是SaaS模式云原生数据仓库领导者MaxCompute核心课程。本课程由阿里云资深产品和技术专家们从概念到方法,从场景到实践,体系化的将阿里巴巴飞天大数据平台10多年的经过验证的方法与实践深入浅出的讲给开发者们。帮助大数据开发者快速了解并掌握SaaS模式的云原生的数据仓库,助力开发者学习了解先进的技术栈,并能在实际业务中敏捷的进行大数据分析,赋能企业业务。 通过本课程可以了解SaaS模式云原生数据仓库领导者MaxCompute核心功能及典型适用场景,可应用MaxCompute实现数仓搭建,快速进行大数据分析。适合大数据工程师、大数据分析师 大量数据需要处理、存储和管理,需要搭建数据仓库?学它! 没有足够人员和经验来运维大数据平台,不想自建IDC买机器,需要免运维的大数据平台?会SQL就等于会大数据?学它! 想知道大数据用得对不对,想用更少的钱得到持续演进的数仓能力?获得极致弹性的计算资源和更好的性能,以及持续保护数据安全的生产环境?学它! 想要获得灵活的分析能力,快速洞察数据规律特征?想要兼得数据湖的灵活性与数据仓库的成长性?学它! 出品人:阿里云大数据产品及研发团队专家 产品 MaxCompute 官网 https://www.aliyun.com/product/odps&nbsp;
相关文章
|
9天前
|
机器学习/深度学习 分布式计算 算法
Spark快速大数据分析PDF下载读书分享推荐
《Spark快速大数据分析》适合初学者,聚焦Spark实用技巧,同时深入核心概念。作者团队来自Databricks,书中详述Spark 3.0新特性,结合机器学习展示大数据分析。Spark是大数据分析的首选工具,本书助你驾驭这一利器。[PDF下载链接][1]。 ![Spark Book Cover][2] [1]: https://zhangfeidezhu.com/?p=345 [2]: https://i-blog.csdnimg.cn/direct/6b851489ad1944548602766ea9d62136.png#pic_center
25 1
Spark快速大数据分析PDF下载读书分享推荐
|
28天前
|
存储 分布式计算 Hadoop
Spark和Hadoop都是大数据处理领域的重要工具
【6月更文挑战第17天】Spark和Hadoop都是大数据处理领域的重要工具
124 59
|
11天前
|
分布式计算 大数据 Spark
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
《Spark大数据处理:技术、应用与性能优化》深入浅出介绍Spark核心,涵盖部署、实战与性能调优,适合初学者。作者基于微软和IBM经验,解析Spark工作机制,探讨BDAS生态,提供实践案例,助力快速掌握。书中亦讨论性能优化策略。[PDF下载链接](https://zhangfeidezhu.com/?p=347)。![Spark Web UI](https://img-blog.csdnimg.cn/direct/16aaadbb4e13410f8cb2727c3786cc9e.png#pic_center)
26 1
Spark大数据处理:技术、应用与性能优化(全)PDF书籍推荐分享
|
14天前
|
分布式计算 DataWorks 调度
DataWorks产品使用合集之ODPS Spark找不到自己的stdout,该如何解决
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
24 2
|
21天前
|
分布式计算 DataWorks MaxCompute
MaxCompute操作报错合集之在Spark访问OSS时出现证书错误的问题,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
21天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之有什么命令可以看到当前账号拥有哪些项目的什么权限
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
21天前
|
存储 SQL 分布式计算
MaxCompute产品使用问题之如何查看项目空间耗用的存储大小
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
21天前
|
SQL 分布式计算 大数据
MaxCompute操作报错合集之使用spark.sql执行rename分区操作,遇到任务报错退出的情况,该怎么办
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。
|
21天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之一个项目只能绑定一个dataworks工作空间吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
分布式计算 大数据 数据处理
Apache Spark在大数据处理中的应用
Apache Spark是大数据处理的热门工具,由AMPLab开发并捐赠给Apache软件基金会。它以内存计算和优化的执行引擎著称,提供比Hadoop更快的处理速度,支持批处理、交互式查询、流处理和机器学习。Spark架构包括Driver、Master、Worker Node和Executor,核心组件有RDD、DataFrame、Dataset、Spark SQL、Spark Streaming、MLlib和GraphX。文章通过代码示例展示了Spark在批处理、交互式查询和实时数据处理中的应用,并讨论了其优势(高性能、易用性、通用性和集成性)和挑战。【6月更文挑战第11天】
50 6