1. Requirements Analysis
Requirements for the news website:
- PV (page views)
- UV (unique visitors)
- number of registered users
- popular sections
Data processing pipeline:
data source -> Kafka -> Spark Streaming
2. Data Preparation
(1) Data format
Website log format:
date,timestamp,userid,pageid,section,action
Log field descriptions (fields in a record are separated by single spaces):
- date: date in yyyy-MM-dd format
- timestamp: timestamp in milliseconds
- userid: user ID
- pageid: page ID
- section: news section
- action: user action, one of two types: viewing a page (view) or registering (register)
Sample data:
2020-12-20 1608451521565 364 422 fashion view
2020-12-20 1608451521565 38682 708 aviation view
2020-12-20 1608451521565 65444 270 internet view
2020-12-20 1608451521565 4805 250 tv-show view
2020-12-20 1608451521565 1130 743 movie view
2020-12-20 1608451521565 85320 605 carton view
2020-12-20 1608451521565 null 581 movie view
2020-12-20 1608451521565 null null null register
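A record like the ones above can be mapped onto a small Java object before analysis. The parser below is only a minimal sketch for illustration (the NewsLog class and its parse method are not part of the original project): it splits a line on single spaces in the field order listed above and treats the literal string "null" as a missing value.

public class NewsLog {

    public String date;     // yyyy-MM-dd
    public long timestamp;  // milliseconds
    public Long userid;     // null for unregistered visitors
    public Long pageid;     // null for register events
    public String section;  // null for register events
    public String action;   // "view" or "register"

    // Parse one space-separated log line into a NewsLog object.
    public static NewsLog parse(String line) {
        String[] f = line.split(" ");
        NewsLog log = new NewsLog();
        log.date = f[0];
        log.timestamp = Long.parseLong(f[1]);
        log.userid = "null".equals(f[2]) ? null : Long.parseLong(f[2]);
        log.pageid = "null".equals(f[3]) ? null : Long.parseLong(f[3]);
        log.section = "null".equals(f[4]) ? null : f[4];
        log.action = f[5];
        return log;
    }
}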
Start the Kafka console consumer to check the data arriving on the topic:
bin/kafka-console-consumer.sh --bootstrap-server bigdata-pro-m04:9092 --topic spark
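If the spark topic does not exist yet, it can be created first. The command below is a sketch that assumes a Kafka release new enough (2.2+) for kafka-topics.sh to accept --bootstrap-server (older versions use --zookeeper instead); the partition and replication counts are only example values:

bin/kafka-topics.sh --create --bootstrap-server bigdata-pro-m04:9092 --topic spark --partitions 1 --replication-factor 1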
(2) A real-time data generator written in Java
The generator below produces the real-time data stream and pushes it to Kafka: it keeps generating random log records in batches of 1,000, pausing for one second after every 100 records sent.
package com.kfk.spark.news_analysis_project;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Properties;
import java.util.Random;

/**
 * Access log Kafka producer
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/12
 * @time : 7:51 PM
 */
public class AccessProducer extends Thread {

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
    private static String date;

    // News sections
    private static String[] sections = new String[] {
            "country", "international", "sport", "entertainment", "movie",
            "carton", "tv-show", "technology", "internet", "car",
            "military", "funny", "fashion", "aviation", "government"};

    private static Random random = new Random();
    private static int[] newOldUserArr = new int[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

    private Producer<String, String> producer;
    private String topic;

    /**
     * Constructor
     * @param topic
     */
    public AccessProducer(String topic) {
        this.topic = topic;
        producer = new KafkaProducer<String, String>(createProducerConfig());
        date = simpleDateFormat.format(new Date());
    }

    /**
     * createProducerConfig
     * @return
     */
    public Properties createProducerConfig() {
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("bootstrap.servers", "bigdata-pro-m04:9092");
        return properties;
    }

    @Override
    public void run() {
        int counter = 0;
        while (true) {
            // Generate a batch of 1000 records
            for (int i = 0; i < 1000; i++) {
                String log = null;
                // Roughly 1 in 10 records is a register event, the rest are page views
                if (newOldUserArr[random.nextInt(10)] == 1) {
                    log = getRegisterLog();
                } else {
                    log = getAccessLog();
                }
                // Send the record to Kafka
                producer.send(new ProducerRecord<String, String>(topic, log));
                counter++;
                if (counter == 100) {
                    counter = 0;
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }

    /**
     * Generate a page-view record
     * @return
     */
    private static String getAccessLog() {
        StringBuffer stringBuffer = new StringBuffer("");

        // Timestamp
        long timestamp = System.currentTimeMillis();

        // Random userid; about 1/10 of visitors are unregistered and get a null userid
        Long userid = 0L;
        int newOldUser = newOldUserArr[random.nextInt(10)];
        if (newOldUser == 1) {
            userid = null;
        } else {
            userid = (long) random.nextInt(100000);
        }

        // Random pageid out of 1000 pages
        long pageid = random.nextInt(1000);

        // Random section
        String section = sections[random.nextInt(sections.length)];

        // Fixed action: view
        String action = "view";

        return stringBuffer.append(date).append(" ")
                .append(timestamp).append(" ")
                .append(userid).append(" ")
                .append(pageid).append(" ")
                .append(section).append(" ")
                .append(action).toString();
    }

    /**
     * Generate a register record
     * @return
     */
    private static String getRegisterLog() {
        StringBuffer stringBuffer = new StringBuffer("");

        // Timestamp
        long timestamp = System.currentTimeMillis();

        // New users have a null userid
        Long userid = null;

        // pageid is null for register events
        Long pageid = null;

        // section is null for register events
        String section = null;

        // Fixed action: register
        String action = "register";

        return stringBuffer.append(date).append(" ")
                .append(timestamp).append(" ")
                .append(userid).append(" ")
                .append(pageid).append(" ")
                .append(section).append(" ")
                .append(action).toString();
    }

    public static void main(String[] args) {
        AccessProducer accessProducer = new AccessProducer("spark");
        accessProducer.start();
    }
}
3. Implementation
Each requirement is implemented in its own method; the processing flow and sample results are shown as comments inside the code.
For the data model, see the earlier article on analyzing this news website project with Spark SQL:
https://blog.csdn.net/weixin_45366499/article/details/111119234
package com.kfk.spark.common;

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/14
 * @time : 8:23 PM
 */
public class CommStreamingContext {

    public static JavaStreamingContext getJssc() {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("CommStreamingContext");
        // 5-second batch interval
        return new JavaStreamingContext(conf, Durations.seconds(5));
    }
}
package com.kfk.spark.news_analysis_project;

import com.kfk.spark.common.CommStreamingContext;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import scala.Tuple2;

import java.util.*;

/**
 * @author : 蔡政洁
 * @email :caizhengjie888@icloud.com
 * @date : 2020/12/20
 * @time : 4:11 PM
 */
public class NewsRealTime {

    /**
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 38682 708 aviation view
     * ...
     * @param args
     */
    public static void main(String[] args) throws InterruptedException {

        JavaStreamingContext jssc = CommStreamingContext.getJssc();

        // Connect Spark Streaming to Kafka
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "bigdata-pro-m04:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "streaming_kafka_1");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        // Topic to subscribe to
        Collection<String> topics = Collections.singletonList("spark");

        // Kafka data source
        JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
                jssc,
                LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
        );

        /**
         * stream -> map -> JavaDStream
         */
        JavaDStream<String> accessDstream = stream.map(new Function<ConsumerRecord<String, String>, String>() {
            @Override
            public String call(ConsumerRecord<String, String> v1) throws Exception {
                return v1.value();
            }
        });

        /**
         * accessDstream -> filter -> action(view)
         */
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                String action = lines[5];
                String actionValue = "view";
                return actionValue.equals(action);
            }
        });

        // Page views (PV)
        calculatePagePV(filterDstream);

        // Unique visitors (UV)
        calculatePageUV(filterDstream);

        // Number of registered users
        calculateRegisterCount(accessDstream);

        // Popular sections (only view records carry a section, so the filtered stream is used)
        calculateUserSectionPV(filterDstream);

        jssc.start();
        jssc.awaitTermination();
    }

    /**
     * Page views (PV)
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data flow:
     * filterDstream -> mapToPair -> <2020-12-20_422,1> -> reduceByKey -> <2020-12-20_422,5>
     *
     * @param filterDstream
     */
    public static void calculatePagePV(JavaDStream<String> filterDstream) {

        /**
         * filterDstream -> mapToPair -> <2020-12-20_422,1>
         */
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[3], 1);
            }
        });

        /**
         * pairDstream -> reduceByKey -> <2020-12-20_422,5>
         */
        JavaPairDStream<String, Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        pvStream.print();

        /**
         * Sample output:
         * (2020-12-21_16,1)
         * (2020-12-21_548,1)
         * (2020-12-21_881,1)
         * (2020-12-21_27,1)
         * (2020-12-21_771,1)
         * (2020-12-21_344,2)
         * (2020-12-21_313,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,1)
         * (2020-12-21_366,1)
         * ...
         */
    }

    /**
     * Unique visitors (UV)
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 364 422 fashion view
     * 2020-12-20 1608451521565 365 422 fashion view
     * 2020-12-20 1608451521565 366 422 fashion view
     * 2020-12-20 1608451521565 367 422 fashion view
     * 2020-12-20 1608451521565 367 453 fashion view
     *
     * Data flow:
     * Step 1: map
     * (2020-12-20,364,422)
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * Step 2: rdd -> distinct
     * (2020-12-20,364,422)
     * (2020-12-20,365,422)
     * (2020-12-20,366,422)
     * (2020-12-20,367,422)
     * (2020-12-20,367,453)
     *
     * Step 3: mapToPair
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_422,1>
     * <2020-12-20_453,1>
     *
     * Step 4: reduceByKey
     * <2020-12-20_422,4>
     * <2020-12-20_453,1>
     *
     * @param filterDstream
     */
    public static void calculatePageUV(JavaDStream<String> filterDstream) {

        /**
         * filterDstream -> map -> (2020-12-20,364,422)
         */
        JavaDStream<String> mapDstream = filterDstream.map(new Function<String, String>() {
            @Override
            public String call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return line[0] + "," + line[2] + "," + line[3];
            }
        });

        /**
         * mapDstream -> distinct
         */
        JavaDStream<String> distinctDstream = mapDstream.transform(new Function<JavaRDD<String>, JavaRDD<String>>() {
            @Override
            public JavaRDD<String> call(JavaRDD<String> lines) throws Exception {
                return lines.distinct();
            }
        });

        /**
         * distinctDstream -> mapToPair -> <2020-12-20_422,1>
         */
        JavaPairDStream<String, Integer> pairDstream = distinctDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(",");
                return new Tuple2<>(line[0] + "_" + line[2], 1);
            }
        });

        /**
         * pairDstream -> reduceByKey -> <2020-12-20_422,4>
         */
        JavaPairDStream<String, Integer> uvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        uvStream.print();

        /**
         * Sample output:
         * (2020-12-21_492,1)
         * (2020-12-21_85,2)
         * (2020-12-21_18,1)
         * (2020-12-21_27,2)
         * (2020-12-21_825,1)
         * (2020-12-21_366,1)
         * (2020-12-21_89,1)
         * (2020-12-21_14,2)
         * (2020-12-21_69,1)
         * (2020-12-21_188,1)
         * ...
         */
    }

    /**
     * Number of registered users: simply keep the records where action = register
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data flow:
     * accessDstream -> filter -> action(register) -> mapToPair -> reduceByKey
     *
     * @param accessDstream
     */
    public static void calculateRegisterCount(JavaDStream<String> accessDstream) {

        /**
         * accessDstream -> filter -> action(register)
         */
        JavaDStream<String> filterDstream = accessDstream.filter(new Function<String, Boolean>() {
            @Override
            public Boolean call(String v1) throws Exception {
                String[] lines = v1.split(" ");
                String action = lines[5];
                String actionValue = "register";
                return actionValue.equals(action);
            }
        });

        /**
         * filterDstream -> mapToPair -> <2020-12-20_register,1>
         */
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[5], 1);
            }
        });

        /**
         * pairDstream -> reduceByKey -> <2020-12-20_register,5>
         */
        JavaPairDStream<String, Integer> registerCountStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        registerCountStream.print();

        /**
         * Sample output:
         * (2020-12-21_register,11)
         */
    }

    /**
     * Popular sections
     * input data:
     * 2020-12-20 1608451521565 364 422 fashion view
     *
     * Data flow:
     * filterDstream -> mapToPair -> <2020-12-20_fashion,1> -> reduceByKey -> <2020-12-20_fashion,5>
     *
     * @param filterDstream
     */
    public static void calculateUserSectionPV(JavaDStream<String> filterDstream) {

        /**
         * filterDstream -> mapToPair -> <2020-12-20_fashion,1>
         */
        JavaPairDStream<String, Integer> pairDstream = filterDstream.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String lines) throws Exception {
                String[] line = lines.split(" ");
                return new Tuple2<String, Integer>(line[0] + "_" + line[4], 1);
            }
        });

        /**
         * pairDstream -> reduceByKey -> <2020-12-20_fashion,5>
         */
        JavaPairDStream<String, Integer> pvStream = pairDstream.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        pvStream.print();

        /**
         * Sample output:
         * (2020-12-21_internet,16)
         * (2020-12-21_military,24)
         * (2020-12-21_aviation,21)
         * (2020-12-21_carton,19)
         * (2020-12-21_government,25)
         * (2020-12-21_tv-show,19)
         * (2020-12-21_country,14)
         * (2020-12-21_movie,13)
         * (2020-12-21_international,16)
         * ...
         */
    }
}
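For readers on Java 8 or later, the anonymous Function/PairFunction classes above can be replaced by lambda expressions. The snippet below is a minimal sketch of calculatePagePV rewritten in that style; the method name calculatePagePVLambda is purely illustrative, the logic and imports are the same as in NewsRealTime above.

    // Illustrative lambda-style variant of calculatePagePV (Java 8+), same logic as above.
    public static void calculatePagePVLambda(JavaDStream<String> filterDstream) {
        filterDstream
                // <date_pageid, 1> for every view record
                .mapToPair(line -> {
                    String[] f = line.split(" ");
                    return new Tuple2<>(f[0] + "_" + f[3], 1);
                })
                // sum the counts per page within each 5-second batch
                .reduceByKey(Integer::sum)
                .print();
    }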
No deeper business analysis is done here; you can extend this with what was covered earlier and combine the operators flexibly, for example as sketched below.
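As one illustration of combining operators, the sketch below sorts each batch's section counts in descending order before printing, so print() effectively shows the hottest sections first. The method name printTopSections and its wiring are hypothetical, not part of the original project; it would take the pvStream built inside calculateUserSectionPV as input and assumes the same imports as NewsRealTime.

    // Hypothetical extension: rank the per-batch section counts in descending order.
    public static void printTopSections(JavaPairDStream<String, Integer> sectionPvStream) {
        sectionPvStream
                .transformToPair(rdd -> rdd
                        // swap to <count, date_section> so the RDD can be sorted by count
                        .mapToPair(t -> new Tuple2<>(t._2(), t._1()))
                        .sortByKey(false)
                        // swap back to <date_section, count>
                        .mapToPair(t -> new Tuple2<>(t._2(), t._1())))
                .print();
    }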