五、标签开发
数据接入
数据的接入可以通过将数据实时写入Kafka进行接入,不管是直接的写入还是通过oracle和mysql的实时接入方式,比如oracle的ogg,mysql的binlog
ogg
Golden Gate(简称OGG)提供异构环境下交易数据的实时捕捉、变换、投递。
通过OGG可以实时的将oracle中的数据写入Kafka中。
对生产系统影响小:实时读取交易日志,以低资源占用实现大交易量数据实时复制
以交易为单位复制,保证交易一致性:只同步已提交的数据
高性能
- 智能的交易重组和操作合并
- 使用数据库本地接口访问
- 并行处理体系
binlog
MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDL
和 DML
语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复。
通过这些手段,可以将数据同步到kafka也就是我们的实时系统中来。
Flink接入Kafka数据
Apache Kafka Connector可以方便对kafka数据的接入。
依赖
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.9.0</version></dependency>
构建FlinkKafkaConsumer
必须有的:
1.topic名称
2.用于反序列化Kafka数据的DeserializationSchema / KafkaDeserializationSchema
3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)
val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")// only required for Kafka 0.8properties.setProperty("zookeeper.connect", "localhost:2181")properties.setProperty("group.id", "test")stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties)) .print()
时间戳和水印
在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。
我们可以定义好Timestamp Extractors / Watermark Emitters,通过以下方式将其传递给消费者
val env = StreamExecutionEnvironment.getExecutionEnvironment()val myConsumer = new FlinkKafkaConsumer[String](...)myConsumer.setStartFromEarliest() // start from the earliest record possiblemyConsumer.setStartFromLatest() // start from the latest recordmyConsumer.setStartFromTimestamp(...) // start from specified epoch timestamp (milliseconds)myConsumer.setStartFromGroupOffsets() // the default behaviour//指定位置//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)val stream = env.addSource(myConsumer)
检查点
启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用Kafka的记录。
如果禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交功能。
如果启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。
val env = StreamExecutionEnvironment.getExecutionEnvironment()env.enableCheckpointing(5000) // checkpoint every 5000 msecs
Flink消费Kafka完整代码:
import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import java.util.Properties; public class KafkaConsumer { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); Properties properties = new Properties(); properties.setProperty("bootstrap.servers", "localhost:9092"); properties.setProperty("group.id", "test"); //构建FlinkKafkaConsumer FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties); //指定偏移量 myConsumer.setStartFromEarliest(); DataStream<String> stream = env .addSource(myConsumer); env.enableCheckpointing(5000); stream.print(); env.execute("Flink Streaming Java API Skeleton"); }
这样数据已经实时的接入我们系统中,可以在Flink中对数据进行处理了,那么如何对标签进行计算呢?标签的计算过程极大的依赖于数据仓库的能力,所以拥有了一个好的数据仓库,标签也就很容易计算出来了。
数据仓库基础知识
数据仓库是指一个面向主题的、集成的、稳定的、随时间变化的数据的集合,以用于支持管理决策的过程。
(1)面向主题 业务数据库中的数据主要针对事物处理任务,各个业务系统之间是各自分离的。而数据仓库中的数据是按照一定的主题进行组织的
(2)集成 数据仓库中存储的数据是从业务数据库中提取出来的,但并不是原有数据的简单复制,而是经过了抽取、清理、转换(ETL)等工作。业务数据库记录的是每一项业务处理的流水账,这些数据不适合于分析处理,进入数据仓库之前需要经过系列计算,同时抛弃一些分析处理不需要的数据。
(3)稳定 操作型数据库系统中一般只存储短期数据,因此其数据是不稳定的,记录的是系统中数据变化的瞬态。数据仓库中的数据大多表示过去某一时刻的数据,主要用于查询、分析,不像业务系统中数据库一样经常修改。一般数据仓库构建完成,主要用于访问
OLTP 联机事务处理 OLTP是传统关系型数据库的主要应用,主要用于日常事物、交易系统的处理 1、数据量存储相对来说不大 2、实时性要求高,需要支持事物 3、数据一般存储在关系型数据库(oracle或mysql中)
OLAP 联机分析处理 OLAP是数据仓库的主要应用,支持复杂的分析查询,侧重决策支持 1、实时性要求不是很高,ETL一般都是T+1的数据;2、数据量很大;3、主要用于分析决策;
星形模型是最常用的数据仓库设计结构。由一个事实表和一组维表组成,每个维表都有一个维主键。该模式核心是事实表,通过事实表将各种不同的维表连接起来,各个维表中的对象通过事实表与另一个维表中的对象相关联,这样建立各个维表对象之间的联系 维表:用于存放维度信息,包括维的属性和层次结构;事实表:是用来记录业务事实并做相应指标统计的表。同维表相比,事实表记录数量很多。
雪花模型是对星形模型的扩展,每一个维表都可以向外连接多个详细类别表。除了具有星形模式中维表的功能外,还连接对事实表进行详细描述的维度,可进一步细化查看数据的粒度 例如:地点维表包含属性集{location_id,街道,城市,省,国家}。这种模式通过地点维度表的city_id与城市维度表的city_id相关联,得到如{101,“解放大道10号”,“武汉”,“湖北省”,“中国”}、{255,“解放大道85号”,“武汉”,“湖北省”,“中国”}这样的记录。星形模型是最基本的模式,一个星形模型有多个维表,只存在一个事实表。在星形模式的基础上,用多个表来描述一个复杂维,构造维表的多层结构,就得到雪花模型。
清晰数据结构:每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解 脏数据清洗:屏蔽原始数据的异常 屏蔽业务影响:不必改一次业务就需要重新接入数据 数据血缘追踪:简单来讲可以这样理解,我们最终给业务呈现的是能直接使用的一张业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。把复杂问题简单化。将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。
数据仓库的数据直接对接OLAP或日志类数据, 用户画像只是站在用户的角度,对数据仓库数据做进一步的建模加工。因此每天画像标签相关数据的调度依赖上游数据仓库相关任务执行完成。
在了解了数据仓库以后,我们就可以进行标签的计算了。在开发好标签的逻辑以后,将数据写入hive和druid中,完成实时与离线的标签开发工作。
Flink 与Hive和 Druid集成
Flink+Hive
Flink从1.9开始支持集成Hive,在Flink1.10版本,标志着对 Blink的整合宣告完成,随着对 Hive 的生产级别集成,Hive作为数据仓库系统的绝对核心,承担着绝大多数的离线数据ETL计算和数据管理,期待Flink未来对Hive的完美支持。
而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。
添加依赖
要与Hive集成,需要在Flink的lib目录下添加额外的依赖jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以将这些依赖项放在文件夹中,并分别使用Table API程序或SQL Client 的-C
或-l
选项将它们添加到classpath中。本文使用第一种方式,即将jar包直接复制到$FLINK_HOME/lib目录下。本文使用的Hive版本为2.3.4(对于不同版本的Hive,可以参照官网选择不同的jar包依赖),总共需要3个jar包,如下:
- flink-connector-hive_2.11-1.10.0.jar
- flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
- hive-exec-2.3.4.jar
添加Maven依赖
<!-- Flink Dependency --> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-hive_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-api-java-bridge_2.11</artifactId> <version>1.10.0</version> <scope>provided</scope> </dependency> <!-- Hive Dependency --> <dependency> <groupId>org.apache.hive</groupId> <artifactId>hive-exec</artifactId> <version>${hive.version}</version> <scope>provided</scope> </dependency>
实例代码
package com.flink.sql.hiveintegration; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.catalog.hive.HiveCatalog; public class FlinkHiveIntegration { public static void main(String[] args) throws Exception { EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inBatchMode() // Batch模式,默认为StreamingMode .build(); //使用StreamingMode /* EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() // 使用BlinkPlanner .inStreamingMode() // StreamingMode .build();*/ TableEnvironment tableEnv = TableEnvironment.create(settings); String name = "myhive"; // Catalog名称,定义一个唯一的名称表示 String defaultDatabase = "qfbap_ods"; // 默认数据库名称 String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf"; // hive-site.xml路径 String version = "2.3.4"; // Hive版本号 HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version); tableEnv.registerCatalog("myhive", hive); tableEnv.useCatalog("myhive"); // 创建数据库,目前不支持创建hive表 String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123"; tableEnv.sqlUpdate(createDbSql); } }
Flink+Druid
可以将Flink分析好的数据写回kafka,然后在druid中接入数据,也可以将数据直接写入druid,以下为示例代码:
依赖
<?xml version="1.0" encoding="UTF-8"?> <project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.1.8.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.flinkdruid</groupId> <artifactId>FlinkDruid</artifactId> <version>0.0.1-SNAPSHOT</version> <name>FlinkDruid</name> <description>Flink Druid Connection</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-core</artifactId> <version>1.9.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>1.9.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
示例代码
@SpringBootApplication public class FlinkDruidApp { private static String url = "http://localhost:8200/v1/post/wikipedia"; private static RestTemplate template; private static HttpHeaders headers; FlinkDruidApp() { template = new RestTemplate(); headers = new HttpHeaders(); headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON)); headers.setContentType(MediaType.APPLICATION_JSON); } public static void main(String[] args) throws Exception { SpringApplication.run(FlinkDruidApp.class, args); // Creating Flink Execution Environment ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); //Define data source DataSet<String> data = env.readTextFile("/wikiticker-2015-09-12-sampled.json"); // Trasformation on the data data.map(x -> { return httpsPost(x).toString(); }).print(); } // http post method to post data in Druid private static ResponseEntity<String> httpsPost(String json) { HttpEntity<String> requestEntity = new HttpEntity<>(json, headers); ResponseEntity<String> response = template.exchange("http://localhost:8200/v1/post/wikipedia", HttpMethod.POST, requestEntity, String.class); return response; } @Bean public RestTemplate restTemplate() { return new RestTemplate(); } }
标签的开发工作繁琐,需要不断的开发并且优化,但是如何将做好的标签提供出去产生真正的价值呢?下一章,我们将介绍用户画像产品化。
六、用户画像产品化
在开发好用户标签以后,如何将标签应用到实际其实是一个很重要的问题。只有做好产品的设计才能让标签发挥真正的价值,本文将介绍用户画像的产品化过程。
1、标签展示
首先是标签展示功能,这个主要供业务人员和研发人员使用,是为了更直观的看见整个的用户标签体系。
不同的标签体系会有不同的层级,那么这个页面的设计就需要我们展示成树状的结构,方便以后的扩展。
在最后一个层级,比如自然性别,可以设计一个统计页面,在进入页面后,可以展示相应的数据统计情况,
可以更直观看见标签中值得比例,也可以为业务提供好的建议,另外可以对标签的具体描述进行展示,起到一个说明的作用,还可以展示标签按天的波动情况,观察标签的变化情况。
这一部分的数据来源呢?之前也提到过,这些标签的元数据信息都存在mysql中,方便我们查询。
所以树状图和标签描述信息需要去mysql中获取,而比例等图表数据则是从Hbase,Hive中查询获取的,当然也有直接通过ES获取的。但是每天的标签历史波动情况,还是要通过每天跑完标签后存在mysql中作为历史记录进行展示。
2、标签查询
这一功能可以提供给研发人员和业务人员使用。
标签查询功能其实就是对用户进行全局画像的过程,对于一个用户的全量标签信息,我们是需要对其进行展示的。
输入用户id后,可以查看该用户的属性信息、行为信息、风控属性等信息。从多方位了解一个具体的用户特征。
这些已经是标签的具体信息了,由于是对单一id的查找,从hive中获取会造成查询速度的问题,所以我们更建议从Hbase或者ES中查询获取,这样查询效率和实时性都能获得极大的提升。
3、标签管理
这一功能是提供给研发人员使用的。
对于标签,不能每一次新增一个标签都进行非常大改动,这样是非常耗费人力的,所以必须要有可以对标签进行管理的功能。
这里定义了标签的基本信息,开发方式,开发人员等等,在完成标签的开发以后,直接在此页面对标签进行录入,就可以完成标签的上线工作,让业务人员可以对标签进行使用。
新增和编辑标签的页面,可以提供下拉框或者输入框提供信息录入的功能。
之前已经提到过,这些标签的元数据信息都保存在了Mysql中,只要完成对其的新增和修改就可以了。
4、用户分群
作为用户画像最核心的功能,用户分群功能。是用户画像与业务系统建立联系的桥梁,也是用户画像的价值所在。
这项功能主要供业务人员使用。
此功能允许用户自定义的圈定一部分人员,圈定的规则就是对于标签的条件约束。
在圈定好人群以后,可以对这部分人群提供与业务系统的外呼系统,客服系统,广告系统,Push系统的交互,达到真正的精细化运营的目的。
对于标签规则的判断,需要将记录好的规则存储于Mysql中,在进行人群计算时又需要将规则解析成可计算的逻辑。不管是解析成Sql或者其他的查询语言都难度巨大,这对于研发是一个非常大的挑战。
在此功能中,还可以增加人群对比的功能,对不同人群的不同标签进行圈定,对比。这对于查询性能也是一个巨大的考验。
但是,用户分群功能作为用户画像的核心是我们必须要实现的。对于技术架构,Hbase更擅长与KV形式的查询,对于多维度查询性能较差,所以可以采取ES索引,在ES查询出Hbase的Rowkey,再去查询Hbase的方式。也有很多公司选择整体迁移到ES中完成此项工作。