A News Website Project Case Study Based on Spark Streaming

Summary: study notes

1. Requirements Analysis


The news website needs four real-time metrics:

  • PV (page views)
  • UV (unique visitors)
  • number of registered users
  • hot (popular) sections

Data pipeline:

data source  -> Kafka  ->  Spark Streaming


2. Data Preparation


(1) Data format

Website log format:

date,timestamp,userid,pageid,section,action

Log fields:

date: date, in yyyy-MM-dd format
timestamp: timestamp in milliseconds
userid: user id ("null" for unregistered visitors)
pageid: page id
section: site section
action: user behavior, one of two kinds: viewing a page (view) or registering (register)

Sample data:

2020-12-20 1608451521565 364 422 fashion view
2020-12-20 1608451521565 38682 708 aviation view
2020-12-20 1608451521565 65444 270 internet view
2020-12-20 1608451521565 4805 250 tv-show view
2020-12-20 1608451521565 1130 743 movie view
2020-12-20 1608451521565 85320 605 carton view
2020-12-20 1608451521565 null 581 movie view
2020-12-20 1608451521565 null null null register
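
Each line splits on single spaces into six fields; note that missing values appear as the literal string "null". A minimal Java sketch (not part of the original code) of the parsing used throughout the streaming job, with illustrative variable names:

// Illustrative parse of one log line
String line = "2020-12-20 1608451521565 364 422 fashion view";
String[] f = line.split(" ");
String date = f[0];                       // yyyy-MM-dd
long timestamp = Long.parseLong(f[1]);    // epoch millis
String userid = f[2];                     // "null" for unregistered visitors
String pageid = f[3];
String section = f[4];
String action = f[5];                     // "view" or "register"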

Start a Kafka console consumer to watch the topic:

bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark
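
If the topic does not exist yet, create it first with Kafka's kafka-topics.sh tool (the exact flags vary by Kafka version).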

(2) A real-time data generator written in Java

The generator below continuously produces random log records and sends them to Kafka. It sleeps for one second after every 100 records sent, so the effective rate is roughly 100 records per second.

package com.kfk.spark.news_analysis_project;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;
/**
 * Kafka producer for access logs
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/12
 * @time : 7:51 PM
 */
public class AccessProducer extends Thread{
    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private static String date;
    // site sections
    private static String[] sections = new String[] {"country", "international", "sport",
            "entertainment", "movie", "carton",
            "tv-show", "technology", "internet",
            "car", "military", "funny",
            "fashion", "aviation", "government"};
    private static Random random = new Random();
    // used to draw a 1-in-10 chance (register vs. view)
    private static int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
    private Producer<String,String> producer;
    private String topic;
    /**
     * Constructor
     * @param topic Kafka topic to write to
     */
    public AccessProducer(String topic){
        this.topic = topic;
        producer = new KafkaProducer<String, String>(createProducerConfig());
        date = simpleDateFormat.format(new Date());
    }
    /**
     * createProducerConfig
     * @return Kafka producer configuration
     */
    public Properties createProducerConfig(){
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", "bigdata-pro-m04:9092");
        return properties;
    }
    @Override
    public void run(){
        int counter = 0;
        while (true){
            // generate a batch of 1000 records
            for (int i = 0;i < 1000;i++){
                String log = null;
                // about 1 in 10 records is a register event, the rest are page views
                if (newOldUserArr[random.nextInt(10)] == 1){
                    log = getRegisterLog();
                } else {
                    log = getAccessLog();
                }
                // send the record to Kafka
                producer.send(new ProducerRecord<String, String>(topic,log));
                counter++;
                // pause for one second after every 100 records
                if (counter == 100){
                    counter = 0;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
    /**
     * Generate a page-view record
     * (the original code had the names of this method and getRegisterLog swapped)
     * @return one log line with action "view"
     */
    private static String getAccessLog(){
        StringBuffer stringBuffer = new StringBuffer("");
        // current timestamp
        long timestamp = System.currentTimeMillis();
        // random userid out of 100,000 users; about 1 in 10 visitors is unregistered (userid = null)
        Long userid = 0L;
        int newOldUser = newOldUserArr[random.nextInt(10)];
        if (newOldUser == 1){
            userid = null;
        } else {
            userid = (long)random.nextInt(100000);
        }
        // random pageid out of 1,000 pages
        long pageid = random.nextInt(1000);
        // random section
        String section = sections[random.nextInt(sections.length)];
        // fixed action: view
        String action = "view";
        return stringBuffer.append(date).append(" ")
                .append(timestamp).append(" ")
                .append(userid).append(" ")
                .append(pageid).append(" ")
                .append(section).append(" ")
                .append(action).toString();
    }
    /**
     * Generate a register record
     * @return one log line with action "register"
     */
    private static String getRegisterLog(){
        StringBuffer stringBuffer = new StringBuffer("");
        // current timestamp
        long timestamp = System.currentTimeMillis();
        // a registering user has no userid yet, so it is null
        Long userid = null;
        // pageid is always null for register events
        Long pageid = null;
        // section is always null for register events
        String section = null;
        // fixed action: register
        String action = "register";
        return stringBuffer.append(date).append(" ")
                .append(timestamp).append(" ")
                .append(userid).append(" ")
                .append(pageid).append(" ")
                .append(section).append(" ")
                .append(action).toString();
    }
    public static void main(String[] args) {
        AccessProducer accessProducer = new AccessProducer("spark");
        accessProducer.start();
    }
}
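
One caveat: the producer is never closed, so records still buffered in the client can be lost when the JVM exits. A minimal sketch (not in the original code) that could be added to the constructor to flush and close on shutdown:

// Hypothetical addition: flush buffered records before the JVM exits
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    producer.flush();   // force-send anything still buffered
    producer.close();   // release network resources
}));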


3. Implementation


Each metric is implemented in its own method; the processing flow and sample results are shown as comments.

For the underlying data model, see the companion article, a Spark SQL case study of the same news website project:

https://blog.csdn.net/weixin_45366499/article/details/111119234

package com.kfk.spark.common;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 PM
 */
public class CommStreamingContext {
    public static JavaStreamingContext getJssc(){
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        return new JavaStreamingContext(conf, Durations.seconds(5));
    }
}
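
The streaming job below connects to Kafka using the direct approach and implements each of the four metrics. Every batch covers 5 seconds of data, as configured in CommStreamingContext above.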
package com.kfk.spark.news_analysis_project;
import com.kfk.spark.common.CommStreamingContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;
import java.util.*;
/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/20
 * @time : 4:11 PM
 */
public class NewsRealTime {
    /**
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 38682 708 aviation view
     * ...
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {
        JavaStreamingContext jssc = CommStreamingContext.getJssc();
        // connect Spark Streaming to Kafka
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "streaming_kafka_1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);
        // topic(s) to subscribe to
        Collection<String> topics = Collections.singletonList("spark");
        // Kafka as the data source (direct stream)
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );
        /**
         * stream -> map -> JavaDStream
         */
        JavaDStream<String> accessDstream = stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> v1) throws Exception {
                return v1.value();
            }
        });
        /**
         * accessDStream -> filter -> action(view)
         */
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                return "view".equals(lines[5]);
            }
        });
        // page PV
        calculatePagePV(filterDstream);
        // page UV
        calculatePageUV(filterDstream);
        // number of registered users (the method filters register records itself)
        calculateRegisterCount(accessDstream);
        // hot sections (view records only, so pass the filtered stream; otherwise
        // register records would be counted under the literal section "null")
        calculateUserSectionPV(filterDstream);
        jssc.start();
        jssc.awaitTermination();
    }
    /**
     * Page PV
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data flow:
     * filterDstream -> mapToPair -> <2020-12-20_422,1> -> reduceByKey -> <2020-12-20_422,5>
     *
     * @param filterDstream stream of view records
     */
    public static void calculatePagePV(JavaDStream<String> filterDstream){
        /**
         * filterDstream -> mapToPair -> <2020-12-20_422,1>
         */
        JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String,Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String,Integer>(line[0] + "_" + line[3], 1);
            }
        });
        /**
         * pairDstream -> reduceByKey -> <2020-12-20_422,5>
         */
        JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        pvStream.print();
        /**
         * (2020-12-21_16,1)
         * (2020-12-21_548,1)
         * (2020-12-21_881,1)
         * (2020-12-21_27,1)
         * (2020-12-21_771,1)
         * (2020-12-21_344,2)
         * (2020-12-21_313,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,1)
         * (2020-12-21_366,1)
         * ...
         */
    }
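    /**
     * Sketch (not in the original): reduceByKey above counts within each
     * 5-second batch only, even though the key contains the date. To keep a
     * running PV per day/page across batches, updateStateByKey can carry the
     * count forward. This assumes jssc.checkpoint(...) has been set and
     * requires imports of java.util.List and org.apache.spark.api.java.Optional.
     */
    public static void calculateRunningPagePV(JavaPairDStream<String, Integer> pairDstream){
        JavaPairDStream<String, Integer> runningPv = pairDstream.updateStateByKey(
                (List<Integer> values, Optional<Integer> state) -> {
                    // start from the previous total (0 for a new key) and add this batch
                    int sum = state.orElse(0);
                    for (Integer v : values) {
                        sum += v;
                    }
                    return Optional.of(sum);
                });
        runningPv.print();
    }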
    /**
     * Page UV
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 365 422 fashion view
     * 2020-12-20 1608451521565 366 422 fashion view
     * 2020-12-20 1608451521565 367 422 fashion view
     * 2020-12-20 1608451521565 367 453 fashion view
     *
     * Data flow:
     * Step 1: map
     * (2020-12-20,364,422)
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * Step 2: rdd -> distinct
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * Step 3: mapToPair
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_453,1>
     *
     * Step 4: reduceByKey
     * <2020-12-20_422,4>
     * <2020-12-20_453,1>
     *
     * @param filterDstream stream of view records
     */
    public static void calculatePageUV(JavaDStream<String> filterDstream){
        /**
         * filterDstream -> map -> (2020-12-20,364,422)
         */
        JavaDStream<String> mapDstream = filterDstream.map(new Function<String, String>() {
            @Override
            public String call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return line[0] + "," + line[2] + "," + line[3];
            }
        });
        /**
         * mapDstream -> distinct
         */
        JavaDStream<String> distinctDstream = mapDstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> lines) throws Exception {
                return lines.distinct();
            }
        });
        /**
         * distinctDstream -> mapToPair -> <2020-12-20_422,1>
         */
        JavaPairDStream<String,Integer> pairDstream = distinctDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(",");
                return new Tuple2<>(line[0] + "_" + line[2], 1);
            }
        });
        /**
         * pairDstream -> reduceByKey -> <2020-12-20_422,4>
         */
        JavaPairDStream<String,Integer> uvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        uvStream.print();
        /**
         * (2020-12-21_492,1)
         * (2020-12-21_85,2)
         * (2020-12-21_18,1)
         * (2020-12-21_27,2)
         * (2020-12-21_825,1)
         * (2020-12-21_366,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,2)
         * (2020-12-21_69,1)
         * (2020-12-21_188,1)
         * ...
         */
    }
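    // Note (added): distinct() above runs inside transform() on each 5-second
    // batch RDD, so the UV computed here is per batch. A true daily UV would
    // need state across batches (e.g. updateStateByKey) or an external store
    // such as a Redis set keyed by date.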
    /**
     * Number of registered users: just keep the records with action = register
     * input data:
     * 2020-12-20 1608451521565 null null null register
     *
     * Data flow:
     * accessDStream -> filter -> action(register) -> mapToPair -> reduceByKey
     *
     * @param accessDstream unfiltered stream of all records
     */
    public static void calculateRegisterCount(JavaDStream<String> accessDstream){
        /**
         * accessDStream -> filter -> action(register)
         */
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                return "register".equals(lines[5]);
            }
        });
        /**
         * filterDstream -> mapToPair -> <2020-12-20_register,1>
         */
        JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String,Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String,Integer>(line[0] + "_" + line[5], 1);
            }
        });
        /**
         * pairDstream -> reduceByKey -> <2020-12-20_register,5>
         */
        JavaPairDStream<String,Integer> registerCountStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        registerCountStream.print();
        /**
         * (2020-12-21_register,11)
         */
    }
    /**
     * Hot sections (per-section PV)
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data flow:
     * filterDstream -> mapToPair -> <2020-12-20_fashion,1> -> reduceByKey -> <2020-12-20_fashion,5>
     *
     * @param filterDstream stream of view records
     */
    public static void calculateUserSectionPV(JavaDStream<String> filterDstream){
        /**
         * filterDstream -> mapToPair -> <2020-12-20_fashion,1>
         */
        JavaPairDStream<String,Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String,Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String,Integer>(line[0] + "_" + line[4], 1);
            }
        });
        /**
         * pairDstream -> reduceByKey -> <2020-12-20_fashion,5>
         */
        JavaPairDStream<String,Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        });
        pvStream.print();
        /**
         * (2020-12-21_internet,16)
         * (2020-12-21_military,24)
         * (2020-12-21_aviation,21)
         * (2020-12-21_carton,19)
         * (2020-12-21_government,25)
         * (2020-12-21_tv-show,19)
         * (2020-12-21_country,14)
         * (2020-12-21_movie,13)
         * (2020-12-21_international,16)
         * ...
         */
    }
}
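
calculateUserSectionPV only counts per-section PV; to actually rank the hot sections within each batch, the counts can be sorted inside a transform. A sketch (not in the original code), operating on the pvStream computed above:

// Sketch: order <date_section, count> pairs by count, descending, per batch
JavaPairDStream<String, Integer> sortedStream = pvStream.transformToPair(rdd ->
        rdd.mapToPair(t -> new Tuple2<Integer, String>(t._2, t._1))    // swap to <count, key>
           .sortByKey(false)                                           // sort descending by count
           .mapToPair(t -> new Tuple2<String, Integer>(t._2, t._1)));  // swap back to <key, count>
sortedStream.print();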

No deeper business analysis is done here; feel free to extend the case with what you have learned so far and combine the operators flexibly!

