" 某公司的订单系统使用MySQL存储业务数据和进行简单的查询分析,同时由于MySQL的存储容量和处理速度都存在一定的限制,不适合处理大规模的数据量。为了更好的数据分析和处理,该公司将MySQL的数据同步到Hive中,便于利用Hadoop生态系统中的各种工具来实现离线处理,提高数据分析和处理的效率和速度。其中的order为存储订单的表,products为商品表。
" 该公司需要进行一次促销活动,实时计算和离线计算都在这个过程中发挥重要作用:
实时计算方面,可以通过实时监控用户活动和购买行为,以及实时推荐和搭配商品等方式,提高用户体验和促进消费。例如,当用户浏览某个商品时,系统可以实时分析用户数据,为用户推荐相关的热门商品和促销商品,以提高用户的购买意愿。同时,系统还可以实时计算库存、销售额等数据,以便及时调整库存,保证商品的及时配送。
离线计算方面,可以通过对历史数据的分析和挖掘,进行商品定价、销售策略等方面的决策支持。例如,根据历次促销活动的销售数据、用户行为等信息,系统可以通过离线计算,预测今年的销售量和热门商品,以及优化商品定价和促销策略,以提高销售业绩。
下面以计算促销活动期间GMV为例,介绍Dataphin如何基于Flink流批一体的任务开发流程,其中流任务处理的数据是当天的实时数据,批任务处理的数据为前一天的业务数据。运营人员可以通过分析实时GMV进行快速的活动方案的调整,通过分析对比流批任务计算结果的一致性确保历史成交额数据的准确性。
以下为数据流向的示意图:
MySQL订单系统中的业务数据通过Dataphin离线集成到Hive中,同时通过Flink cdc 任务同步到kafka中便于进行实时加工处理,同时将数据处理和数据存储解耦合,dataphin实时研发流批一体支持一套代码两种计算,Flink流任务负责计算活动当天的实时GMV,而Flink批任务则负责计算前一天的活动GMV,最后将GMV写入到一个MySQL数据库中便于后续的查询分析。
具体的实现思路:
- 命令行登录MySQL并建表:
CREATE TABLE products ( id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, name VARCHAR(255) NOT NULL, description VARCHAR(512) ); ALTER TABLE products AUTO_INCREMENT = 101; INSERT INTO products VALUES (default,"scooter","Small 2-wheel scooter"), (default,"car battery","12V car battery"), (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"), (default,"hammer","12oz carpenter's hammer"), (default,"hammer","14oz carpenter's hammer"), (default,"hammer","16oz carpenter's hammer"), (default,"rocks","box of assorted rocks"), (default,"jacket","water resistent black wind breaker"), (default,"spare tire","24 inch spare tire"); CREATE TABLE orders ( order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY, order_date DATETIME NOT NULL, customer_name VARCHAR(255) NOT NULL, price DECIMAL(10, 5) NOT NULL, product_id INTEGER NOT NULL, order_status BOOLEAN NOT NULL -- Whether order has been placed ) AUTO_INCREMENT = 10001; INSERT INTO orders VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false), (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false), (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false); CREATE TABLE sink ( order_id INT PRIMARY KEY, order_date TIMESTAMP(0), customer_name VARCHAR(2048), `price` VARCHAR(2048), product_id INT, order_status BOOLEAN, name VARCHAR(2048), description VARCHAR(2048), );
dataphin上创建Kafka和MySQL数据源:
Kafka:
MySQL:
dataphin上将MySQL数据集成同步到Hive并一键生成目标表
dataphin上创建元表:
创建MySQL元表: orders_mysql 和 products_mysql,connector选择CDC Source表
创建MySQL元表: mysql_sink,connector选择JDBC读写表
创建Kafka元表:orders_kafka 和 products_kakfa,connector选择upsert-kafka,消息格式选择json
这里需要保证orders_mysql,orders_kafka 与hive表中对应字段的字段类型保持一致
提交发布元表
创建Hive元表:orders_hive 和 products_hive
在V3.9版本中,dataphin实时研发已经支持flink sql访问Hive计算引擎中的物理表,但这里为了安全性、可复用性及灵活性,我们仍采用创建元表的方式。
在dataphin上创建实时计算任务Flink SQL:
orders_to_kafka 和 products_to_kafka,将MySQL中的数据实时同步到kafka中
提交发布计算任务
在dataphin上创建镜像表:mirror_orders 和 mirror_products
mirror_orders:实时表选择 orders_kafka ,离线表选择 orders_hive
mirror_products:实时表选择 products_kakfa,离线表选择 products_hive
提交发布镜像表
在dataphin上创建实时计算任务Flink SQL:
创建计算任务sink,预编译通过并提交
SET mirror_orders.`properties.group.id`='12'; SET mirror_products.`properties.group.id`='12'; -- cast(price as STRING) as INSERT into mysql_sink_yuan select cast(order_id as int) as order_id, cast(order_date as TIMESTAMP) as order_date, customer_name, cast(price as STRING) as price, cast(product_id as int) as product_id, cast(order_status as TINYINT)as order_status, name, description from mirror_orders AS o LEFT JOIN mirror_products AS p ON o.order_id = p.id;
开启离线模式,并配置好上下游依赖:
- 提交发布计算任务,会自动提交流和批两个任务,可以到运维页面的周期任务和实时任务中查看:
实时任务,点击启动实时实例,任务能够正常运行
周期任务,在生产环境对批任务补数据,补数据任务能成功运行,也可以查看历史周期实例的运行情况
Dataphin 智能数据建设与治理:用中台方法论治理企业级好数据
针对各行业大数据建设、治理及应用诉求,结合数据中台方法论,一站式提供全域数据集成、可视建模及规范定义、数据资产治理等能力,助力企业打造标准统一、准确可信、便捷可消费的数据体系。
Dataphin介绍:https://www.lydaas.com/dataphin
Dataphin咨询钉钉群:23381533