大数据实战项目之电商数仓(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生数据仓库AnalyticDB MySQL版,8核32GB 100GB 1个月
简介: 大数据实战项目之电商数仓(二)

大数据实战项目之电商数仓(一):https://developer.aliyun.com/article/1535211

flume数据采集通道搭建

flume第一层采集通道

设置flume的配置文件f1.conf

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON文件的保存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder


#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume第一层通道的启动和关闭脚本f1

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

cmd=cmd
if [ $1 = start ]
then
        cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
        cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi


#在hadoop102和hadoop103开启采集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done
flume第二层采集通道

设置flume的配置文件f1.conf

#配置文件编写
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#启动备用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event存储的目录
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#文件的滚动
#60秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基于event数量的文件滚动策略
a1.sinks.k1.hdfs.rollCount = 0
#指定文件使用LZO压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#连接组件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

flume写入hdfs采用lzo格式需要先向core-site.xml添加相关压缩格式的配置

<property>
    <name>io.compression.codecs</name>
    <value>
         com.hadoop.compression.lzo.LzoCodec,
         com.hadoop.compression.lzo.LzopCodec
     </value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
 </property>

flume第二层通道的启动和关闭脚本f2

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

if [ $1 = start ]
then
        ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
        ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi

数据采集集群一键启动脚本

#!/bin/bash
if(($#!=1))
then
  echo 请输入start或stop!
  exits;
fi
#编写函数,这个函数的功能为返回集群中启动成功的broker的数量
function countKafkaBrokers()
{
  count=0
  for((i=102;i<=104;i++))
  do
    result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
    count=$[$result+$count]
  done
  #函数可以定义返回值,如果定义,返回函数最后一行命令的状态(返回0,代表成功,非0,即为异常)
  return $count
}
#启动
if [ $1 = start ]
then
  zk start
  hd start
  kf start
  #保证kafka集群已经启动,才能启动f1,f2采集通道
  while [ 1 ]
  do
    countKafkaBrokers
    if(($?==3))
    then
      break
    fi
    sleep 2s
  done
  f1 start
  f2 start
  #查看已经启动进程
  xcall jps
elif [ $1 = stop ]
then
  f1 stop
  f2 stop
  kf stop
  #在kafka没有停止完成之前,不能停止zk集群
  while [ 1 ]
        do
                countKafkaBrokers
                if(($?==0))
                then
                        break
                fi
                sleep 2s
        done
  zk stop
  hd stop
  #查看还剩那些进程
  xcall jps
else
  echo echo 请输入start或stop!
fi

HDFS-HA配置

配置nameservice,编写hdfs-sitx.xml

vim hdfs-site.xml
• 1
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>hadoop101:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>hadoop101:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>hadoop102:50070</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>hadoop102:8020</value>
</property>
<property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
 <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>

<!-- 使用隔离机制时需要ssh无秘钥登录-->
        <property>
         <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
        </property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop103:50090</value>
</property>
<!--配置故障自动转义-->
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>

(2)编写core-site.xml

<!-- 指定HDFS中NameNode的地址 -->
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
</property>
 <!--配置hadoop运行时临时文件-->
  <property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/tmp</value>
 </property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>-->
<!--配置zookeeper地址-->
  <property>
   <name>ha.zookeeper.quorum</name>  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
 </property>

启动journalnode

xcall hadoop-daemon.sh start journalnode
• 1

在nn1上对namenode进行格式化

hadoop namenode -format
hdfs namenode -bootstrapStandby

在nn2上对namenode信息进行拷贝

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh

ResouceManager-HA配置

(1)编写yarn-site.xml

<property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop101</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop102</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop101:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop102:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!--启用自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
 <property>
     <name>yarn.resourcemanager.store.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
5天前
|
数据采集 运维 Cloud Native
Flink+Paimon在阿里云大数据云原生运维数仓的实践
构建实时云原生运维数仓以提升大数据集群的运维能力,采用 Flink+Paimon 方案,解决资源审计、拓扑及趋势分析需求。
352 0
Flink+Paimon在阿里云大数据云原生运维数仓的实践
|
21天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之有什么命令可以看到当前账号拥有哪些项目的什么权限
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
21天前
|
存储 SQL 分布式计算
MaxCompute产品使用问题之如何查看项目空间耗用的存储大小
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
21天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之一个项目只能绑定一个dataworks工作空间吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
14天前
|
分布式计算 DataWorks 安全
DataWorks产品使用合集之两个odps数据源绑定了同一个项目, 如何看另外一个数据源的同步数据
DataWorks作为一站式的数据开发与治理平台,提供了从数据采集、清洗、开发、调度、服务化、质量监控到安全管理的全套解决方案,帮助企业构建高效、规范、安全的大数据处理体系。以下是对DataWorks产品使用合集的概述,涵盖数据处理的各个环节。
12 0
|
17天前
|
SQL 关系型数据库 MySQL
详尽分享音乐数据中心数仓综合项目
详尽分享音乐数据中心数仓综合项目
18 0
|
21天前
|
存储 分布式计算 大数据
MaxCompute产品使用问题之购买包年包月资源可以让同一个地域下的两个项目共用吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
1月前
|
分布式计算 数据可视化 大数据
基于spark的医疗大数据可视化大屏项目
基于spark的医疗大数据可视化大屏项目
|
6天前
|
数据采集 自然语言处理 大数据
​「Python大数据」词频数据渲染词云图导出HTML
使用Python,本文展示数据聚类和办公自动化,焦点在于通过jieba分词处理VOC数据,构建词云图并以HTML保存。`wordCloud.py`脚本中,借助pyecharts生成词云,如图所示,关键词如&quot;Python&quot;、&quot;词云&quot;等。示例代码创建了词云图实例,添加词频数据,并输出到&quot;wordCloud.html&quot;。
18 1
​「Python大数据」词频数据渲染词云图导出HTML
|
16天前
|
存储 人工智能 OLAP
深度|大模型时代下,基于湖仓一体的数据智能新范式
本次文根据峰会演讲内容整理:分享在大模型时代基于湖仓一体的数据产品演进,以及我们观察到的一些智能开发相关的新范式。

热门文章

最新文章