(3)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
RDS MySQL Serverless 高可用系列,价值2615元额度,1个月
简介: 1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到kafka;2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;3)将结果数据写入到mysql;4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;5)在平台上通过拖拽式构建各种数据应用,数据展示;

(1)sparkstreaming从kafka接入实时数据流最终实现数据可视化展示,我们先看下整体方案架构:
image.png

(2)方案说明:
1)我们通过kafka与各个业务系统的数据对接,将各系统中的数据实时接到kafka;
2)通过sparkstreaming接入kafka数据流,定义时间窗口和计算窗口大小,业务计算逻辑处理;
3)将结果数据写入到mysql;
4)通过可视化平台接入mysql数据库,这里使用的是NBI大数据可视化构建平台;
5)在平台上通过拖拽式构建各种数据应用,数据展示;
(3)代码演示:
定义一个kafka生产者,模拟数据源

package com.producers;

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;
import java.util.Random;

/**
 * Created by lj on 2022-07-18.
 */
public class Kafaka_Producer {
   
    public final static String bootstrapServers = "127.0.0.1:9092";

    public static void main(String[] args) {
   
        Properties props = new Properties();
        //设置Kafka服务器地址
        props.put("bootstrap.servers", bootstrapServers);
        //设置数据key的序列化处理类
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //设置数据value的序列化处理类
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        try {
   
            int i = 0;
            Random r=new Random();  
            String[] lang = {
   "flink","spark","hadoop","hive","hbase","impala","presto","superset","nbi"};

            while(true) {
   
                Thread.sleep(2000);
                WaterSensor waterSensor = new WaterSensor(lang[r.nextInt(lang.length)]+"_kafka",i,i);
                i++;

                String msg = JSONObject.toJSONString(waterSensor);
                System.out.println(msg);
                RecordMetadata recordMetadata = producer.send(new ProducerRecord<>("kafka_data_waterSensor", null, null,  msg)).get();
//                System.out.println("recordMetadata: {"+ recordMetadata +"}");
            }

        } catch (Exception e) {
   
            System.out.println(e.getMessage());
        }
    }
}

根据业务需要,定义各种消息对象

package com.pojo;

import java.io.Serializable;
import java.util.Date;

/**
 * Created by lj on 2022-07-13.
 */
public class WaterSensor implements Serializable {
   
    public String id;
    public long ts;
    public int vc;

    public WaterSensor(){
   

    }

    public WaterSensor(String id,long ts,int vc){
   
        this.id = id;
        this.ts = ts;
        this.vc = vc;
    }

    public int getVc() {
   
        return vc;
    }

    public void setVc(int vc) {
   
        this.vc = vc;
    }

    public String getId() {
   
        return id;
    }

    public void setId(String id) {
   
        this.id = id;
    }

    public long getTs() {
   
        return ts;
    }

    public void setTs(long ts) {
   
        this.ts = ts;
    }
}

sparkstreaming数据流计算

package com.examples;

import com.alibaba.fastjson.JSONObject;
import com.pojo.WaterSensor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

import java.util.*;

/**
 * Created by lj on 2022-07-18.
 */
public class SparkSql_Kafka {
   
    private static String appName = "spark.streaming.demo";
    private static String master = "local[*]";
    private static String topics = "kafka_data_waterSensor";
    private static String brokers = "127.0.0.1:9092";

    public static void main(String[] args) {
   
        //初始化sparkConf
        SparkConf sparkConf = new SparkConf().setMaster(master).setAppName(appName);

        //获得JavaStreamingContext
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.minutes(3));

        /**
         * 设置日志的级别: 避免日志重复
         */
        ssc.sparkContext().setLogLevel("ERROR");

        Collection<String> topicsSet = new HashSet<>(Arrays.asList(topics.split(",")));
        //kafka相关参数,必要!缺了会报错
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("metadata.broker.list", brokers) ;
        kafkaParams.put("bootstrap.servers", brokers);
        kafkaParams.put("group.id", "group1");
        kafkaParams.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //通过KafkaUtils.createDirectStream(...)获得kafka数据,kafka相关参数由kafkaParams指定
        JavaInputDStream<ConsumerRecord<Object,Object>> lines = KafkaUtils.createDirectStream(
                ssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.Subscribe(topicsSet, kafkaParams)
        );

        JavaDStream<WaterSensor> mapDStream = lines.map(new Function<ConsumerRecord<Object, Object>, WaterSensor>() {
   
            @Override
            public WaterSensor call(ConsumerRecord<Object, Object> s) throws Exception {
   
                WaterSensor waterSensor = JSONObject.parseObject(s.value().toString(),WaterSensor.class);
                return waterSensor;
            }
        }).window(Durations.minutes(9), Durations.minutes(6));      //指定窗口大小 和 滑动频率 必须是批处理时间的整数倍;

        mapDStream.foreachRDD(new VoidFunction2<JavaRDD<WaterSensor>, Time>() {
   
            @Override
            public void call(JavaRDD<WaterSensor> waterSensorJavaRDD, Time time) throws Exception {
   
                SparkSession spark = JavaSparkSessionSingleton.getInstance(waterSensorJavaRDD.context().getConf());

                Dataset<Row> dataFrame = spark.createDataFrame(waterSensorJavaRDD, WaterSensor.class);
                // 创建临时表
                dataFrame.createOrReplaceTempView("log");
                Dataset<Row> result = spark.sql("select * from log");
                System.out.println("========= " + time + "=========");
                //输出前20条数据
                result.show();

                //数据写入mysql
                writeDataToMysql(result);
            }
        });

        //开始作业
        ssc.start();
        try {
   
            ssc.awaitTermination();
        } catch (Exception e) {
   
            e.printStackTrace();
        } finally {
   
            ssc.close();
        }
    }
}

NBI大数据可视化构建平台对接mysql,构建数据应用:
image.png
image.png

NBI可视化

相关文章
|
8月前
|
消息中间件 分布式计算 Kafka
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)
SparkStreaming(SparkStreaming概述、入门、Kafka数据源、DStream转换、输出、关闭)(一)
114 5
|
3月前
|
消息中间件 监控 Kafka
Apache Kafka 成为实时数据流处理的关键组件
【10月更文挑战第8天】随着大数据技术的发展,Apache Kafka 成为实时数据流处理的关键组件。Kafka Manager 提供了一个简洁易用的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件修改、启动服务、创建和管理 Topic 等操作,帮助你快速上手。
66 3
|
2月前
|
消息中间件 监控 Kafka
Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面
随着大数据技术的发展,Apache Kafka 成为处理实时数据流的关键组件。Kafka Manager 提供了一个简洁的 Web 界面,方便管理和监控 Kafka 集群。本文详细介绍了 Kafka Manager 的部署步骤和基本使用方法,包括配置文件的修改、启动命令、API 示例代码等,帮助你快速上手并有效管理 Kafka 集群。
58 0
|
5月前
|
消息中间件 Java Kafka
Kafka不重复消费的终极秘籍!解锁幂等性、偏移量、去重神器,让你的数据流稳如老狗,告别数据混乱时代!
【8月更文挑战第24天】Apache Kafka作为一款领先的分布式流处理平台,凭借其卓越的高吞吐量与低延迟特性,在大数据处理领域中占据重要地位。然而,在利用Kafka进行数据处理时,如何有效避免重复消费成为众多开发者关注的焦点。本文深入探讨了Kafka中可能出现重复消费的原因,并提出了四种实用的解决方案:利用消息偏移量手动控制消费进度;启用幂等性生产者确保消息不被重复发送;在消费者端实施去重机制;以及借助Kafka的事务支持实现精确的一次性处理。通过这些方法,开发者可根据不同的应用场景灵活选择最适合的策略,从而保障数据处理的准确性和一致性。
390 9
|
5月前
|
消息中间件 负载均衡 Java
"深入Kafka核心:探索高效灵活的Consumer机制,以Java示例展示数据流的优雅消费之道"
【8月更文挑战第10天】在大数据领域,Apache Kafka凭借其出色的性能成为消息传递与流处理的首选工具。Kafka Consumer作为关键组件,负责优雅地从集群中提取并处理数据。它支持消息的负载均衡与容错,通过Consumer Group实现消息的水平扩展。下面通过一个Java示例展示如何启动Consumer并消费数据,同时体现了Kafka Consumer设计的灵活性与高效性,使其成为复杂消费场景的理想选择。
145 4
|
5月前
|
vr&ar 图形学 开发者
步入未来科技前沿:全方位解读Unity在VR/AR开发中的应用技巧,带你轻松打造震撼人心的沉浸式虚拟现实与增强现实体验——附详细示例代码与实战指南
【8月更文挑战第31天】虚拟现实(VR)和增强现实(AR)技术正深刻改变生活,从教育、娱乐到医疗、工业,应用广泛。Unity作为强大的游戏开发引擎,适用于构建高质量的VR/AR应用,支持Oculus Rift、HTC Vive、Microsoft HoloLens、ARKit和ARCore等平台。本文将介绍如何使用Unity创建沉浸式虚拟体验,包括设置项目、添加相机、处理用户输入等,并通过具体示例代码展示实现过程。无论是完全沉浸式的VR体验,还是将数字内容叠加到现实世界的AR应用,Unity均提供了所需的一切工具。
196 0
|
7月前
|
消息中间件 数据挖掘 Kafka
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
使用 Flume 将 CSV 数据导入 Kafka:实现实时数据流
170 2
|
7月前
|
消息中间件 关系型数据库 MySQL
使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南
使用Flink实现Kafka到MySQL的数据流转换:一个基于Flink的实践指南
670 1
|
7月前
|
消息中间件 分布式计算 Kafka
利用Spark将Kafka数据流写入HDFS
利用Spark将Kafka数据流写入HDFS
111 0
|
7月前
|
消息中间件 关系型数据库 MySQL
使用Flink实现MySQL到Kafka的数据流转换
使用Flink实现MySQL到Kafka的数据流转换
133 0