大数据实战项目之电商数仓(二)

简介: 大数据实战项目之电商数仓(二)

大数据实战项目之电商数仓(一):https://developer.aliyun.com/article/1535211

flume数据采集通道搭建

flume第一层采集通道

设置flume的配置文件f1.conf

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON文件的保存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder


#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume第一层通道的启动和关闭脚本f1

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

cmd=cmd
if [ $1 = start ]
then
        cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
        cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi


#在hadoop102和hadoop103开启采集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done
flume第二层采集通道

设置flume的配置文件f1.conf

#配置文件编写
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#启动备用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event存储的目录
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#文件的滚动
#60秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基于event数量的文件滚动策略
a1.sinks.k1.hdfs.rollCount = 0
#指定文件使用LZO压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#连接组件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

flume写入hdfs采用lzo格式需要先向core-site.xml添加相关压缩格式的配置

<property>
    <name>io.compression.codecs</name>
    <value>
         com.hadoop.compression.lzo.LzoCodec,
         com.hadoop.compression.lzo.LzopCodec
     </value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
 </property>

flume第二层通道的启动和关闭脚本f2

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

if [ $1 = start ]
then
        ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
        ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi

数据采集集群一键启动脚本

#!/bin/bash
if(($#!=1))
then
  echo 请输入start或stop!
  exits;
fi
#编写函数,这个函数的功能为返回集群中启动成功的broker的数量
function countKafkaBrokers()
{
  count=0
  for((i=102;i<=104;i++))
  do
    result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
    count=$[$result+$count]
  done
  #函数可以定义返回值,如果定义,返回函数最后一行命令的状态(返回0,代表成功,非0,即为异常)
  return $count
}
#启动
if [ $1 = start ]
then
  zk start
  hd start
  kf start
  #保证kafka集群已经启动,才能启动f1,f2采集通道
  while [ 1 ]
  do
    countKafkaBrokers
    if(($?==3))
    then
      break
    fi
    sleep 2s
  done
  f1 start
  f2 start
  #查看已经启动进程
  xcall jps
elif [ $1 = stop ]
then
  f1 stop
  f2 stop
  kf stop
  #在kafka没有停止完成之前,不能停止zk集群
  while [ 1 ]
        do
                countKafkaBrokers
                if(($?==0))
                then
                        break
                fi
                sleep 2s
        done
  zk stop
  hd stop
  #查看还剩那些进程
  xcall jps
else
  echo echo 请输入start或stop!
fi

HDFS-HA配置

配置nameservice,编写hdfs-sitx.xml

vim hdfs-site.xml
• 1
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>hadoop101:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>hadoop101:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>hadoop102:50070</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>hadoop102:8020</value>
</property>
<property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
 <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>

<!-- 使用隔离机制时需要ssh无秘钥登录-->
        <property>
         <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
        </property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop103:50090</value>
</property>
<!--配置故障自动转义-->
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>

(2)编写core-site.xml

<!-- 指定HDFS中NameNode的地址 -->
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
</property>
 <!--配置hadoop运行时临时文件-->
  <property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/tmp</value>
 </property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>-->
<!--配置zookeeper地址-->
  <property>
   <name>ha.zookeeper.quorum</name>  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
 </property>

启动journalnode

xcall hadoop-daemon.sh start journalnode
• 1

在nn1上对namenode进行格式化

hadoop namenode -format
hdfs namenode -bootstrapStandby

在nn2上对namenode信息进行拷贝

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh

ResouceManager-HA配置

(1)编写yarn-site.xml

<property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop101</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop102</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop101:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop102:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!--启用自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
 <property>
     <name>yarn.resourcemanager.store.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
相关实践学习
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
4月前
|
存储 分布式计算 大数据
基于Python大数据的的电商用户行为分析系统
本系统基于Django、Scrapy与Hadoop技术,构建电商用户行为分析平台。通过爬取与处理海量用户数据,实现行为追踪、偏好分析与个性化推荐,助力企业提升营销精准度与用户体验,推动电商智能化发展。
|
4月前
|
机器学习/深度学习 人工智能 供应链
别再靠拍脑袋进货了!用大数据让电商库存“自己会算”
别再靠拍脑袋进货了!用大数据让电商库存“自己会算”
323 10
|
5月前
|
SQL 缓存 分布式计算
【跨国数仓迁移最佳实践5】MaxCompute近线查询解决方案助力物流电商等实时场景实现高效查询
本系列文章将围绕东南亚头部科技集团的真实迁移历程展开,逐步拆解 BigQuery 迁移至 MaxCompute 过程中的关键挑战与技术创新。本篇为第5篇,解析跨国数仓迁移背后的性能优化技术。 注:客户背景为东南亚头部科技集团,文中用 GoTerra 表示。
278 8
|
7月前
|
存储 SQL 监控
数据中台架构解析:湖仓一体的实战设计
在数据量激增的数字化时代,企业面临数据分散、使用效率低等问题。数据中台作为统一管理与应用数据的核心平台,结合湖仓一体架构,打通数据壁垒,实现高效流转与分析。本文详解湖仓一体的设计与落地实践,助力企业构建统一、灵活的数据底座,驱动业务决策与创新。
|
11月前
|
分布式计算 运维 监控
Dataphin离线数仓搭建深度测评:数据工程师的实战视角
作为一名金融行业数据工程师,我参与了阿里云Dataphin智能研发版的评测。通过《离线数仓搭建》实践,体验了其在数据治理中的核心能力。Dataphin在环境搭建、管道开发和任务管理上显著提效,如测试环境搭建从3天缩短至2小时,复杂表映射效率提升50%。产品支持全链路治理、智能提效和架构兼容,帮助企业降低40%建设成本,缩短60%需求响应周期。建议加强行业模板库和移动适配功能,进一步提升使用体验。
|
6月前
|
机器学习/深度学习 搜索推荐 数据可视化
Java 大视界 -- Java 大数据机器学习模型在电商用户流失预测与留存策略制定中的应用(217)
本文探讨 Java 大数据与机器学习在电商用户流失预测与留存策略中的应用。通过构建高精度预测模型与动态分层策略,助力企业提前识别流失用户、精准触达,实现用户留存率与商业价值双提升,为电商应对用户流失提供技术新思路。
|
7月前
|
存储 SQL 分布式计算
MaxCompute x 聚水潭:基于近实时数仓解决方案构建统一增全量一体化数据链路
聚水潭作为中国领先的电商SaaS ERP服务商,致力于为88,400+客户提供全链路数字化解决方案。其核心ERP产品助力企业实现数据驱动的智能决策。为应对业务扩展带来的数据处理挑战,聚水潭采用MaxCompute近实时数仓Delta Table方案,有效提升数据新鲜度和计算效率,提效比例超200%,资源消耗显著降低。未来,聚水潭将进一步优化数据链路,结合MaxQA实现实时分析,赋能商家快速响应市场变化。
343 0
|
10月前
|
存储 消息中间件 Java
抖音集团电商流量实时数仓建设实践
本文基于抖音集团电商数据工程师姚遥在Flink Forward Asia 2024的分享,围绕电商流量数据处理展开。内容涵盖业务挑战、电商流量建模架构、流批一体实践、大流量任务调优及总结展望五个部分。通过数据建模与优化,实现效率、质量、成本和稳定性全面提升,数据质量达99%以上,任务性能提升70%。未来将聚焦自动化、低代码化与成本优化,探索更高效的流批一体化方案。
649 12
抖音集团电商流量实时数仓建设实践
|
12月前
|
SQL 缓存 数据处理
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)
Apache Doris 提出“数据无界”和“湖仓无界”理念,提供高效的数据管理方案。本文聚焦三个典型应用场景:湖仓分析加速、多源联邦分析、湖仓数据处理,深入介绍 Apache Doris 的最佳实践,帮助企业快速响应业务需求,提升数据处理和分析效率
746 3
数据无界、湖仓无界,Apache Doris 湖仓一体典型场景实战指南(下篇)