从理论到工程实践——用户画像入门宝典(二)

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
实时计算 Flink 版,5000CU*H 3个月
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
简介: 用户画像是大数据顶层应用中最重要的一环,搭建一套适合本公司体系的用户画像尤为重要。但是,用户画像的资料往往理论居多,实践少,更少有工程化的实战案例。本文档结合了常见的用户画像架构,使用Elasticsearch作为底层存储支撑,用户画像的检索和可视化效率得到了大幅度的提升。文档从用户画像的理论到实践均有所涉及,大家可以参照此文档完成用户画像系统从0到1的搭建。

五、标签开发


数据接入

数据的接入可以通过将数据实时写入Kafka进行接入,不管是直接的写入还是通过oracle和mysql的实时接入方式,比如oracle的ogg,mysql的binlog

ogg

Golden Gate(简称OGG)提供异构环境下交易数据的实时捕捉、变换、投递。

通过OGG可以实时的将oracle中的数据写入Kafka中。

image.png

对生产系统影响小:实时读取交易日志,以低资源占用实现大交易量数据实时复制

以交易为单位复制,保证交易一致性:只同步已提交的数据

高性能

  • 智能的交易重组和操作合并
  • 使用数据库本地接口访问
  • 并行处理体系

binlog

MySQL 的二进制日志 binlog 可以说是 MySQL 最重要的日志,它记录了所有的 DDLDML 语句(除了数据查询语句select、show等),以事件形式记录,还包含语句所执行的消耗的时间,MySQL的二进制日志是事务安全型的。binlog 的主要目的是复制和恢复

image.png

通过这些手段,可以将数据同步到kafka也就是我们的实时系统中来。

Flink接入Kafka数据

Apache Kafka Connector可以方便对kafka数据的接入。

依赖

<dependency>  <groupId>org.apache.flink</groupId>  <artifactId>flink-connector-kafka_2.11</artifactId>  <version>1.9.0</version></dependency>
构建FlinkKafkaConsumer

必须有的:

1.topic名称

2.用于反序列化Kafka数据的DeserializationSchema / KafkaDeserializationSchema

3.配置参数:“bootstrap.servers” “group.id” (kafka0.8还需要 “zookeeper.connect”)

val properties = new Properties()properties.setProperty("bootstrap.servers", "localhost:9092")// only required for Kafka 0.8properties.setProperty("zookeeper.connect", "localhost:2181")properties.setProperty("group.id", "test")stream = env    .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))    .print()
时间戳和水印

在许多情况下,记录的时间戳(显式或隐式)嵌入记录本身。另外,用户可能想要周期性地或以不规则的方式发出水印。

我们可以定义好Timestamp Extractors / Watermark Emitters,通过以下方式将其传递给消费者

val env = StreamExecutionEnvironment.getExecutionEnvironment()val myConsumer = new FlinkKafkaConsumer[String](...)myConsumer.setStartFromEarliest()      // start from the earliest record possiblemyConsumer.setStartFromLatest()        // start from the latest recordmyConsumer.setStartFromTimestamp(...)  // start from specified epoch timestamp (milliseconds)myConsumer.setStartFromGroupOffsets()  // the default behaviour//指定位置//val specificStartOffsets = new java.util.HashMap[KafkaTopicPartition, java.lang.Long]()//specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L)//myConsumer.setStartFromSpecificOffsets(specificStartOffsets)val stream = env.addSource(myConsumer)
检查点

启用Flink的检查点后,Flink Kafka Consumer将使用主题中的记录,并以一致的方式定期检查其所有Kafka偏移以及其他操作的状态。如果作业失败,Flink会将流式程序恢复到最新检查点的状态,并从存储在检查点中的偏移量开始重新使用Kafka的记录。

如果禁用了检查点,则Flink Kafka Consumer依赖于内部使用的Kafka客户端的自动定期偏移提交功能。

如果启用了检查点,则Flink Kafka Consumer将在检查点完成时提交存储在检查点状态中的偏移量。

val env = StreamExecutionEnvironment.getExecutionEnvironment()env.enableCheckpointing(5000) // checkpoint every 5000 msecs

Flink消费Kafka完整代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaConsumer {    
public static void main(String[] args) throws Exception {        
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        Properties properties = new Properties();        
properties.setProperty("bootstrap.servers", "localhost:9092");        properties.setProperty("group.id", "test");        //构建FlinkKafkaConsumer        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties);        //指定偏移量        
myConsumer.setStartFromEarliest();        
DataStream<String> stream = env                
.addSource(myConsumer);        
env.enableCheckpointing(5000);        
stream.print();        
env.execute("Flink Streaming Java API Skeleton");    
}

这样数据已经实时的接入我们系统中,可以在Flink中对数据进行处理了,那么如何对标签进行计算呢?标签的计算过程极大的依赖于数据仓库的能力,所以拥有了一个好的数据仓库,标签也就很容易计算出来了。

数据仓库基础知识

数据仓库是指一个面向主题的、集成的、稳定的、随时间变化的数据的集合,以用于支持管理决策的过程。

(1)面向主题 业务数据库中的数据主要针对事物处理任务,各个业务系统之间是各自分离的。而数据仓库中的数据是按照一定的主题进行组织的

(2)集成 数据仓库中存储的数据是从业务数据库中提取出来的,但并不是原有数据的简单复制,而是经过了抽取、清理、转换(ETL)等工作。业务数据库记录的是每一项业务处理的流水账,这些数据不适合于分析处理,进入数据仓库之前需要经过系列计算,同时抛弃一些分析处理不需要的数据。

(3)稳定 操作型数据库系统中一般只存储短期数据,因此其数据是不稳定的,记录的是系统中数据变化的瞬态。数据仓库中的数据大多表示过去某一时刻的数据,主要用于查询、分析,不像业务系统中数据库一样经常修改。一般数据仓库构建完成,主要用于访问

image.png

OLTP 联机事务处理 OLTP是传统关系型数据库的主要应用,主要用于日常事物、交易系统的处理 1、数据量存储相对来说不大 2、实时性要求高,需要支持事物 3、数据一般存储在关系型数据库(oracle或mysql中)

OLAP 联机分析处理 OLAP是数据仓库的主要应用,支持复杂的分析查询,侧重决策支持 1、实时性要求不是很高,ETL一般都是T+1的数据;2、数据量很大;3、主要用于分析决策;

星形模型是最常用的数据仓库设计结构。由一个事实表和一组维表组成,每个维表都有一个维主键。该模式核心是事实表,通过事实表将各种不同的维表连接起来,各个维表中的对象通过事实表与另一个维表中的对象相关联,这样建立各个维表对象之间的联系 维表:用于存放维度信息,包括维的属性和层次结构;事实表:是用来记录业务事实并做相应指标统计的表。同维表相比,事实表记录数量很多。

image.png

雪花模型是对星形模型的扩展,每一个维表都可以向外连接多个详细类别表。除了具有星形模式中维表的功能外,还连接对事实表进行详细描述的维度,可进一步细化查看数据的粒度 例如:地点维表包含属性集{location_id,街道,城市,省,国家}。这种模式通过地点维度表的city_id与城市维度表的city_id相关联,得到如{101,“解放大道10号”,“武汉”,“湖北省”,“中国”}、{255,“解放大道85号”,“武汉”,“湖北省”,“中国”}这样的记录。星形模型是最基本的模式,一个星形模型有多个维表,只存在一个事实表。在星形模式的基础上,用多个表来描述一个复杂维,构造维表的多层结构,就得到雪花模型。

image.png

清晰数据结构:每一个数据分层都有它的作用域,这样我们在使用表的时候能更方便地定位和理解 脏数据清洗:屏蔽原始数据的异常 屏蔽业务影响:不必改一次业务就需要重新接入数据 数据血缘追踪:简单来讲可以这样理解,我们最终给业务呈现的是能直接使用的一张业务表,但是它的来源有很多,如果有一张来源表出问题了,我们希望能够快速准确地定位到问题,并清楚它的危害范围。减少重复开发:规范数据分层,开发一些通用的中间层数据,能够减少极大的重复计算。把复杂问题简单化。将一个复杂的任务分解成多个步骤来完成,每一层只处理单一的步骤,比较简单和容易理解。便于维护数据的准确性,当数据出现问题之后,可以不用修复所有的数据,只需要从有问题的步骤开始修复。

image.png

数据仓库的数据直接对接OLAP或日志类数据, 用户画像只是站在用户的角度,对数据仓库数据做进一步的建模加工。因此每天画像标签相关数据的调度依赖上游数据仓库相关任务执行完成。

在了解了数据仓库以后,我们就可以进行标签的计算了。在开发好标签的逻辑以后,将数据写入hive和druid中,完成实时与离线的标签开发工作。

Flink 与Hive和 Druid集成

Flink+Hive

Flink从1.9开始支持集成Hive,在Flink1.10版本,标志着对 Blink的整合宣告完成,随着对 Hive 的生产级别集成,Hive作为数据仓库系统的绝对核心,承担着绝大多数的离线数据ETL计算和数据管理,期待Flink未来对Hive的完美支持。

而 HiveCatalog 会与一个 Hive Metastore 的实例连接,提供元数据持久化的能力。要使用 Flink 与 Hive 进行交互,用户需要配置一个 HiveCatalog,并通过 HiveCatalog 访问 Hive 中的元数据。

添加依赖

要与Hive集成,需要在Flink的lib目录下添加额外的依赖jar包,以使集成在Table API程序或SQL Client中的SQL中起作用。或者,可以将这些依赖项放在文件夹中,并分别使用Table API程序或SQL Client 的-C-l选项将它们添加到classpath中。本文使用第一种方式,即将jar包直接复制到$FLINK_HOME/lib目录下。本文使用的Hive版本为2.3.4(对于不同版本的Hive,可以参照官网选择不同的jar包依赖),总共需要3个jar包,如下:

  • flink-connector-hive_2.11-1.10.0.jar
  • flink-shaded-hadoop-2-uber-2.7.5-8.0.jar
  • hive-exec-2.3.4.jar

添加Maven依赖

<!-- Flink Dependency -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-hive_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-table-api-java-bridge_2.11</artifactId>
  <version>1.10.0</version>
  <scope>provided</scope>
</dependency>
<!-- Hive Dependency -->
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>${hive.version}</version>
    <scope>provided</scope>
</dependency>

实例代码

package com.flink.sql.hiveintegration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
public class FlinkHiveIntegration {
    public static void main(String[] args) throws Exception {
        EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inBatchMode() // Batch模式,默认为StreamingMode
                .build();
        //使用StreamingMode
       /* EnvironmentSettings settings = EnvironmentSettings
                .newInstance()
                .useBlinkPlanner() // 使用BlinkPlanner
                .inStreamingMode() // StreamingMode
                .build();*/
        TableEnvironment tableEnv = TableEnvironment.create(settings);
        String name = "myhive";      // Catalog名称,定义一个唯一的名称表示
        String defaultDatabase = "qfbap_ods";  // 默认数据库名称
        String hiveConfDir = "/opt/modules/apache-hive-2.3.4-bin/conf";  // hive-site.xml路径
        String version = "2.3.4";       // Hive版本号
        HiveCatalog hive = new HiveCatalog(name, defaultDatabase, hiveConfDir, version);
        tableEnv.registerCatalog("myhive", hive);
        tableEnv.useCatalog("myhive");
        // 创建数据库,目前不支持创建hive表
        String createDbSql = "CREATE DATABASE IF NOT EXISTS myhive.test123";
        tableEnv.sqlUpdate(createDbSql);
    }
}

Flink+Druid

可以将Flink分析好的数据写回kafka,然后在druid中接入数据,也可以将数据直接写入druid,以下为示例代码:

依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.flinkdruid</groupId>
    <artifactId>FlinkDruid</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>FlinkDruid</name>
    <description>Flink Druid Connection</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>

示例代码

@SpringBootApplication
public class FlinkDruidApp {
    private static String url = "http://localhost:8200/v1/post/wikipedia";
    private static RestTemplate template;
    private static HttpHeaders headers;
    FlinkDruidApp() {
        template = new RestTemplate();
        headers = new HttpHeaders();
        headers.setAccept(Arrays.asList(MediaType.APPLICATION_JSON));
        headers.setContentType(MediaType.APPLICATION_JSON);
    }
    public static void main(String[] args) throws Exception {
        SpringApplication.run(FlinkDruidApp.class, args);
        // Creating Flink Execution Environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //Define data source
        DataSet<String> data = env.readTextFile("/wikiticker-2015-09-12-sampled.json");
        // Trasformation on the data
        data.map(x -> {
            return httpsPost(x).toString();
        }).print();
    }
    // http post method to post data in Druid
    private static ResponseEntity<String> httpsPost(String json) {
        HttpEntity<String> requestEntity =
                new HttpEntity<>(json, headers);
        ResponseEntity<String> response =
                template.exchange("http://localhost:8200/v1/post/wikipedia", HttpMethod.POST, requestEntity,
                        String.class);
        return response;
    }
    @Bean
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
}

标签的开发工作繁琐,需要不断的开发并且优化,但是如何将做好的标签提供出去产生真正的价值呢?下一章,我们将介绍用户画像产品化。


六、用户画像产品化


在开发好用户标签以后,如何将标签应用到实际其实是一个很重要的问题。只有做好产品的设计才能让标签发挥真正的价值,本文将介绍用户画像的产品化过程。

1、标签展示

image.png

首先是标签展示功能,这个主要供业务人员和研发人员使用,是为了更直观的看见整个的用户标签体系。

不同的标签体系会有不同的层级,那么这个页面的设计就需要我们展示成树状的结构,方便以后的扩展。

在最后一个层级,比如自然性别,可以设计一个统计页面,在进入页面后,可以展示相应的数据统计情况,

可以更直观看见标签中值得比例,也可以为业务提供好的建议,另外可以对标签的具体描述进行展示,起到一个说明的作用,还可以展示标签按天的波动情况,观察标签的变化情况。

image.png

这一部分的数据来源呢?之前也提到过,这些标签的元数据信息都存在mysql中,方便我们查询。

所以树状图和标签描述信息需要去mysql中获取,而比例等图表数据则是从Hbase,Hive中查询获取的,当然也有直接通过ES获取的。但是每天的标签历史波动情况,还是要通过每天跑完标签后存在mysql中作为历史记录进行展示。

2、标签查询

这一功能可以提供给研发人员和业务人员使用。

标签查询功能其实就是对用户进行全局画像的过程,对于一个用户的全量标签信息,我们是需要对其进行展示的。

image.png

输入用户id后,可以查看该用户的属性信息、行为信息、风控属性等信息。从多方位了解一个具体的用户特征。

这些已经是标签的具体信息了,由于是对单一id的查找,从hive中获取会造成查询速度的问题,所以我们更建议从Hbase或者ES中查询获取,这样查询效率和实时性都能获得极大的提升。

3、标签管理

这一功能是提供给研发人员使用的。

对于标签,不能每一次新增一个标签都进行非常大改动,这样是非常耗费人力的,所以必须要有可以对标签进行管理的功能。

这里定义了标签的基本信息,开发方式,开发人员等等,在完成标签的开发以后,直接在此页面对标签进行录入,就可以完成标签的上线工作,让业务人员可以对标签进行使用。

image.png

新增和编辑标签的页面,可以提供下拉框或者输入框提供信息录入的功能。

image.png

之前已经提到过,这些标签的元数据信息都保存在了Mysql中,只要完成对其的新增和修改就可以了。

4、用户分群

作为用户画像最核心的功能,用户分群功能。是用户画像与业务系统建立联系的桥梁,也是用户画像的价值所在。

这项功能主要供业务人员使用。

此功能允许用户自定义的圈定一部分人员,圈定的规则就是对于标签的条件约束。

在圈定好人群以后,可以对这部分人群提供与业务系统的外呼系统,客服系统,广告系统,Push系统的交互,达到真正的精细化运营的目的。

image.png

对于标签规则的判断,需要将记录好的规则存储于Mysql中,在进行人群计算时又需要将规则解析成可计算的逻辑。不管是解析成Sql或者其他的查询语言都难度巨大,这对于研发是一个非常大的挑战。

image.png

在此功能中,还可以增加人群对比的功能,对不同人群的不同标签进行圈定,对比。这对于查询性能也是一个巨大的考验。

image.png

但是,用户分群功能作为用户画像的核心是我们必须要实现的。对于技术架构,Hbase更擅长与KV形式的查询,对于多维度查询性能较差,所以可以采取ES索引,在ES查询出Hbase的Rowkey,再去查询Hbase的方式。也有很多公司选择整体迁移到ES中完成此项工作。

相关文章
|
7月前
|
SQL 安全 数据挖掘
课7-隐语SCQL的架构详细拆解
SCQL是安全协作查询语言,针对多⽅隐私保护的数据分析。它在不泄露数据隐私的情况下,允许互不信任的参与⽅联合分析数据。SCQL采用半诚实安全模型,支持多⽅协作(N大于等于2方),并提供MySQL兼容的SQL接口。关键特性包括列级别授权(CCL)、多种密态协议支持和跨多种数据源接入。CCL是列控制列表,定义数据使用约束。SCQL架构包括SCDB(不参与计算)和SCQLEngine(部署在数据参与⽅),通过流程图和架构图展示其工作原理,适用于医疗研究、联合营销和保险理赔等场景。
|
Java 程序员
收藏!阿里毕玄16篇文章,深度讲解Java开发、系统设计、职业发展
阿里毕玄结合自己的经历深度讲解Java开发、系统设计、职业发展等问题,快来一键收藏吧。
34853 1
|
7月前
|
存储 分布式计算 分布式数据库
大数据技术原理与应用 期末复习 知识点全总结(林子雨版
大数据技术原理与应用 期末复习 知识点全总结(林子雨版
1099 1
|
7月前
|
消息中间件 资源调度 大数据
学了1年大数据,来测测你大数据技术掌握程度?大数据综合复习之面试题15问(思维导图+问答库)
学了1年大数据,来测测你大数据技术掌握程度?大数据综合复习之面试题15问(思维导图+问答库)
77 0
|
数据可视化 数据挖掘 大数据
大数据可视化理论与案例分析|青训营笔记
通过本篇文章,可以帮助读者对数据可视化的概念和原理有一个整体的认知,并且介绍了数据可视化中常见的可视化图表的种类和使用场景。
284 0
大数据可视化理论与案例分析|青训营笔记
|
数据采集 前端开发 大数据
创业公司如何做数据分析(一)开篇
本文将按照“WHY->WHAT->HOW”的思考方式来阐述下面三个问题:创业公司为什么需要做数据分析?创业公司做数据分析,需要做哪些事情?如何实现这些数据上的需求?从而基于“数据驱动”来做决策、运营与产品。
9071 0
|
机器学习/深度学习 SQL 数据采集
数据分析理论与实践 | 青训营笔记
埋点:埋点数据是指上报的记录着触发原因和状态信息的日志数据。按照上报方来看,可以划分为"服务端埋点”和"客户端埋点”,按照上报形式,可以划分为"代码埋点”、“可视化全埋点” 。
167 0
数据分析理论与实践 | 青训营笔记
|
存储 SQL 分布式计算
从理论到工程实践——用户画像入门宝典(一)
用户画像是大数据顶层应用中最重要的一环,搭建一套适合本公司体系的用户画像尤为重要。但是,用户画像的资料往往理论居多,实践少,更少有工程化的实战案例。 本文档结合了常见的用户画像架构,使用Elasticsearch作为底层存储支撑,用户画像的检索和可视化效率得到了大幅度的提升。文档从用户画像的理论到实践均有所涉及,大家可以参照此文档完成用户画像系统从0到1的搭建。
735 1
从理论到工程实践——用户画像入门宝典(一)
|
运维 Cloud Native 算法
Java性能优化学习1:理论基础学习与分析
性能:使用有限的资源在有限的时间内完成工作。 最主要的衡量因素就是时间,所以很多衡量指标,都可以把时间作为横轴。
|
数据采集 分布式计算 大数据
从面试官的角度谈谈大数据面试
作为一只老鸟,我的面试经验还算丰富,无论是作为面试者还是面试官。其实这篇对于面试者来说也是有意义的,毕竟知己知彼,百战不殆,知道对方会从哪些方面问问题,从哪些方面考核,才能更好地提前做好准备。 首先,我觉得面试官有责任保证面试过程是一次高效的交流。你要获取到你需要的信息,对面试者做全方位的考量;面试者也要获取到他需要的信息,面试官(若面试成功很大可能是自己的上级)的水平,公司技术要求水平,自己是否适合这家公司,公司是否需要自己。面试是一个双向选择的过程,面试官在选人,面试者在选公司。而面试者了解这家公司最直接的途径就是通过面试官。
258 0