字段解释
session_id:唯一标识我们的会话
session_server_time:最早访问的那条session创建的时间
landing_url:对应第一个访问的URL,即同属于一个session的最早时间那个
domain:域名
数据解释
观察前面两个表格的数据,表一共有10条,表二的pageview_count与click_count字段的数据加起来正好也是10条的。
b. 定义Schema信息
既然我们是准备用Parquet格式进行存储,那么就要定义好我们的Schema,如:
{"namespace": "com.shaonaiyi.spark.session", "type": "record", "name": "TrackerLog", "fields": [ {"name": "log_type", "type": "string"}, {"name": "log_server_time", "type": "string"}, {"name": "cookie", "type": "string"}, {"name": "ip", "type": "string"}, {"name": "url", "type": "string"} ] }
{"namespace": "com.shaonaiyi.spark.session", "type": "record", "name": "TrackerSession", "fields": [ {"name": "session_id", "type": "string"}, {"name": "session_server_time", "type": "string"}, {"name": "cookie", "type": "string"}, {"name": "cookie_label", "type": "string"}, {"name": "ip", "type": "string"}, {"name": "landing_url", "type": "string"}, {"name": "pageview_count", "type": "int"}, {"name": "click_count", "type": "int"}, {"name": "domain", "type": "string"}, {"name": "domain_label", "type": "string"} ] }
将上面的Schema信息保存成两个文件,分别是:TrackerLog.avsc
、TrackerSession.avsc
,现在我们的准备工作已经完成了,接下来我们就可以开始构建一个项目了。
0x02 编程实现
1. 构建Maven项目
本人博客里面有很多教程,这里不再重复,相信学到这里的人,已经对这些基础知识相当熟练。
需要注意的是:
1、包名要与前面Schema信息定义的一致,如我的是:com.shaonaiyi.spark.session
,当然,你可以继续先操作,不创建包名先。
2. 编码前准备工作
a. 引入依赖及插件(完整pom.xml
文件如下)
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.shaonaiyi</groupId> <artifactId>spark-sessioncut</artifactId> <version>1.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>2.2.0</version> <!--<scope>provided</scope>--> </dependency> <dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.8.1</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> <testExcludes> <testExclude>/src/test/**</testExclude> </testExcludes> <encoding>utf-8</encoding> </configuration> </plugin> <plugin> <groupId>net.alchim31.maven</groupId> <artifactId>scala-maven-plugin</artifactId> <version>3.1.6</version> <executions> <execution> <goals> <goal>compile</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.apache.avro</groupId> <artifactId>avro-maven-plugin</artifactId> <version>1.7.7</version> <executions> <execution> <phase>generate-sources</phase> <goals> <goal>schema</goal> </goals> <configuration> <sourceDirectory>${project.basedir}/src/main/avro</sourceDirectory> <outputDirectory>${project.basedir}/src/main/java</outputDirectory> </configuration> </execution> </executions> </plugin> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> </configuration> <executions> <execution> <id>make-assembly</id> <!-- this is used for inheritance merges --> <phase>package</phase> <!-- 指定在打包节点执行jar包合并操作 --> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> </plugins> </build> </project>
b. 引入Avro Schema文件
将TrackerLog.avsc、TrackerSession.avsc两个文件上传到项目的avro文件夹(自己新创建)
c. 引入数据源文件并引入Scala SDK
将数据源文件(visit_log.txt、cookie_label.txt)复制到项目的data文件夹(自己新创建),因为在开发的时候,我们一般是先将一部分测试的数据上传到项目来测试,测试完成后,再将路径修改为HDFS上的路径即可,这是开发的流程。创建scala代码源文件夹并引入Scala SDK,下面是目前的项目结构,请参考:
3. 实现源数据的获取
a. 根据Schema文件生成Java对应的类(因为Maven里引入了avro-maven-plugin
插件,所以直接点击Maven里的compile
即可生成相应的类)
编译完后,可以看到已经生成了相对应的文件,而且自动创建了包:
b. 新建一个工具类RawLogParserUtil
:
package com.shaonaiyi.session import com.shaonaiyi.spark.session.TrackerLog /** * @Auther: shaonaiyi@163.com * @Date: 2019/9/12 09:40 * @Description: 将每一行原始日志解析成TrackerLog对象 */ object RawLogParserUtil { def parse(line: String): Option[TrackerLog] = { if (line.startsWith("#")) None else { val fields = line.split("\\|") val trackerLog = new TrackerLog() trackerLog.setLogType(fields(0)) trackerLog.setLogServerTime(fields(1)) trackerLog.setCookie(fields(2)) trackerLog.setIp(fields(3)) trackerLog.setUrl(fields(4)) Some(trackerLog) } } }