大数据实战项目之电商数仓(二)

本文涉及的产品
云原生数据仓库AnalyticDB MySQL版,8核32GB 100GB 1个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
简介: 大数据实战项目之电商数仓(二)

大数据实战项目之电商数仓(一):https://developer.aliyun.com/article/1535211

flume数据采集通道搭建

flume第一层采集通道

设置flume的配置文件f1.conf

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON文件的保存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder


#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume第一层通道的启动和关闭脚本f1

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

cmd=cmd
if [ $1 = start ]
then
        cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
        cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi


#在hadoop102和hadoop103开启采集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done
flume第二层采集通道

设置flume的配置文件f1.conf

#配置文件编写
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#启动备用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event存储的目录
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#文件的滚动
#60秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基于event数量的文件滚动策略
a1.sinks.k1.hdfs.rollCount = 0
#指定文件使用LZO压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#连接组件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

flume写入hdfs采用lzo格式需要先向core-site.xml添加相关压缩格式的配置

<property>
    <name>io.compression.codecs</name>
    <value>
         com.hadoop.compression.lzo.LzoCodec,
         com.hadoop.compression.lzo.LzopCodec
     </value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
 </property>

flume第二层通道的启动和关闭脚本f2

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

if [ $1 = start ]
then
        ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
        ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi

数据采集集群一键启动脚本

#!/bin/bash
if(($#!=1))
then
  echo 请输入start或stop!
  exits;
fi
#编写函数,这个函数的功能为返回集群中启动成功的broker的数量
function countKafkaBrokers()
{
  count=0
  for((i=102;i<=104;i++))
  do
    result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
    count=$[$result+$count]
  done
  #函数可以定义返回值,如果定义,返回函数最后一行命令的状态(返回0,代表成功,非0,即为异常)
  return $count
}
#启动
if [ $1 = start ]
then
  zk start
  hd start
  kf start
  #保证kafka集群已经启动,才能启动f1,f2采集通道
  while [ 1 ]
  do
    countKafkaBrokers
    if(($?==3))
    then
      break
    fi
    sleep 2s
  done
  f1 start
  f2 start
  #查看已经启动进程
  xcall jps
elif [ $1 = stop ]
then
  f1 stop
  f2 stop
  kf stop
  #在kafka没有停止完成之前,不能停止zk集群
  while [ 1 ]
        do
                countKafkaBrokers
                if(($?==0))
                then
                        break
                fi
                sleep 2s
        done
  zk stop
  hd stop
  #查看还剩那些进程
  xcall jps
else
  echo echo 请输入start或stop!
fi

HDFS-HA配置

配置nameservice,编写hdfs-sitx.xml

vim hdfs-site.xml
• 1
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>hadoop101:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>hadoop101:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>hadoop102:50070</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>hadoop102:8020</value>
</property>
<property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
 <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>

<!-- 使用隔离机制时需要ssh无秘钥登录-->
        <property>
         <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
        </property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop103:50090</value>
</property>
<!--配置故障自动转义-->
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>

(2)编写core-site.xml

<!-- 指定HDFS中NameNode的地址 -->
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
</property>
 <!--配置hadoop运行时临时文件-->
  <property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/tmp</value>
 </property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>-->
<!--配置zookeeper地址-->
  <property>
   <name>ha.zookeeper.quorum</name>  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
 </property>

启动journalnode

xcall hadoop-daemon.sh start journalnode
• 1

在nn1上对namenode进行格式化

hadoop namenode -format
hdfs namenode -bootstrapStandby

在nn2上对namenode信息进行拷贝

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh

ResouceManager-HA配置

(1)编写yarn-site.xml

<property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop101</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop102</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop101:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop102:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!--启用自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
 <property>
     <name>yarn.resourcemanager.store.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
相关实践学习
数据库实验室挑战任务-初级任务
本场景介绍如何开通属于你的免费云数据库,在RDS-MySQL中完成对学生成绩的详情查询,执行指定类型SQL。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
2天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之有什么命令可以看到当前账号拥有哪些项目的什么权限
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
2天前
|
存储 SQL 分布式计算
MaxCompute产品使用问题之如何查看项目空间耗用的存储大小
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3天前
|
机器学习/深度学习 分布式计算 DataWorks
MaxCompute产品使用问题之一个项目只能绑定一个dataworks工作空间吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
23小时前
|
存储 数据采集 分布式计算
利用大数据技术优化电商返利系统的效率
利用大数据技术优化电商返利系统的效率
|
2天前
|
存储 数据采集 分布式计算
利用大数据技术优化电商返利系统的效率
利用大数据技术优化电商返利系统的效率
|
2天前
|
存储 分布式计算 大数据
MaxCompute产品使用问题之购买包年包月资源可以让同一个地域下的两个项目共用吗
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
14天前
|
分布式计算 数据可视化 大数据
基于spark的医疗大数据可视化大屏项目
基于spark的医疗大数据可视化大屏项目
|
15天前
|
Cloud Native 数据管理 OLAP
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
337 2
云原生数据仓库AnalyticDB产品使用合集之是否可以创建表而不使用分区
|
15天前
|
SQL Cloud Native 关系型数据库
云原生数据仓库AnalyticDB产品使用合集之如何进行一键诊断
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。
350 7
|
15天前
|
存储 SQL Cloud Native
云原生数据仓库AnalyticDB产品使用合集之热数据存储空间在什么地方查看
阿里云AnalyticDB提供了全面的数据导入、查询分析、数据管理、运维监控等功能,并通过扩展功能支持与AI平台集成、跨地域复制与联邦查询等高级应用场景,为企业构建实时、高效、可扩展的数据仓库解决方案。以下是对AnalyticDB产品使用合集的概述,包括数据导入、查询分析、数据管理、运维监控、扩展功能等方面。