大数据实战项目之电商数仓(二)

本文涉及的产品
云原生大数据计算服务 MaxCompute,5000CU*H 100GB 3个月
云原生大数据计算服务MaxCompute,500CU*H 100GB 3个月
云原生数据仓库AnalyticDB MySQL版,基础版 8ACU 100GB 1个月
简介: 大数据实战项目之电商数仓(二)

大数据实战项目之电商数仓(一):https://developer.aliyun.com/article/1535211

flume数据采集通道搭建

flume第一层采集通道

设置flume的配置文件f1.conf

#a1是agent的名称,a1中定义了一个叫r1的source,如果有多个,使用空格间隔
a1.sources = r1
a1.sinks = k1
a1.channels = c1
#组名名.属性名=属性值
a1.sources.r1.type=TAILDIR
a1.sources.r1.filegroups=f1
#读取/tmp/logs/app-yyyy-mm-dd.log ^代表以xxx开头$代表以什么结尾 .代表匹配任意字符
#+代表匹配任意位置
a1.sources.r1.filegroups.f1=/tmp/logs/^app.+.log$
#JSON文件的保存位置
a1.sources.r1.positionFile=/opt/module/flume/test/log_position.json

#定义拦截器
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.atguigu.dw.flume.MyInterceptor$Builder


#定义sink
a1.sinks.k1.type=logger

#定义chanel
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000

#连接组件 同一个source可以对接多个channel,一个sink只能从一个channel拿数据!
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1

flume第一层通道的启动和关闭脚本f1

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

cmd=cmd
if [ $1 = start ]
then
        cmd="nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f1.conf -Dflume.root.logger=DEBUG,console > /home/atguigu/f1.log 2>&1 &"
elif [ $1 = stop ]
then
        cmd="ps -ef | grep f1.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi


#在hadoop102和hadoop103开启采集
for i in hadoop102 hadoop103
do
        ssh $i $cmd
done
flume第二层采集通道

设置flume的配置文件f1.conf

#配置文件编写
a1.sources = r1 r2
a1.sinks = k1 k2
a1.channels = c1 c2

#配置source
a1.sources.r1.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r1.kafka.topics=topic_start
a1.sources.r1.kafka.consumer.auto.offset.reset=earliest
a1.sources.r1.kafka.consumer.group.id=CG_Start

a1.sources.r2.type=org.apache.flume.source.kafka.KafkaSource
a1.sources.r2.kafka.bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
a1.sources.r2.kafka.topics=topic_event
a1.sources.r2.kafka.consumer.auto.offset.reset=earliest
a1.sources.r2.kafka.consumer.group.id=CG_Event
#配置channel
a1.channels.c1.type=file
a1.channels.c1.checkpointDir=/opt/module/flume/c1/checkpoint
#启动备用checkpoint
a1.channels.c1.useDualCheckpoints=true
a1.channels.c1.backupCheckpointDir=/opt/module/flume/c1/backupcheckpoint
#event存储的目录
a1.channels.c1.dataDirs=/opt/module/flume/c1/datas


a1.channels.c2.type=file
a1.channels.c2.checkpointDir=/opt/module/flume/c2/checkpoint
a1.channels.c2.useDualCheckpoints=true
a1.channels.c2.backupCheckpointDir=/opt/module/flume/c2/backupcheckpoint
a1.channels.c2.dataDirs=/opt/module/flume/c2/datas


#sink
a1.sinks.k1.type = hdfs
#一旦路径中含有基于时间的转义序列,要求event的header中必须有timestamp=时间戳,如果没有需要将useLocalTimeStamp = true
a1.sinks.k1.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_start/%Y-%m-%d
a1.sinks.k1.hdfs.filePrefix = logstart-

a1.sinks.k1.hdfs.batchSize = 1000

#文件的滚动
#60秒滚动生成一个新的文件
a1.sinks.k1.hdfs.rollInterval = 30
#设置每个文件到128M时滚动
a1.sinks.k1.hdfs.rollSize = 134217700
#禁用基于event数量的文件滚动策略
a1.sinks.k1.hdfs.rollCount = 0
#指定文件使用LZO压缩格式
a1.sinks.k1.hdfs.fileType = CompressedStream 
a1.sinks.k1.hdfs.codeC = lzop
#a1.sinks.k1.hdfs.round = true
#a1.sinks.k1.hdfs.roundValue = 10
#a1.sinks.k1.hdfs.roundUnit = second



a1.sinks.k2.type = hdfs
a1.sinks.k2.hdfs.path = hdfs://hadoop102:9000/origin_data/gmall/log/topic_event/%Y-%m-%d
a1.sinks.k2.hdfs.filePrefix = logevent-
a1.sinks.k2.hdfs.batchSize = 1000
a1.sinks.k2.hdfs.rollInterval = 30
a1.sinks.k2.hdfs.rollSize = 134217700
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.fileType = CompressedStream 
a1.sinks.k2.hdfs.codeC = lzop
#a1.sinks.k2.hdfs.round = true
#a1.sinks.k2.hdfs.roundValue = 10
#a1.sinks.k2.hdfs.roundUnit = second

#连接组件
a1.sources.r1.channels=c1
a1.sources.r2.channels=c2
a1.sinks.k1.channel=c1
a1.sinks.k2.channel=c2

flume写入hdfs采用lzo格式需要先向core-site.xml添加相关压缩格式的配置

<property>
    <name>io.compression.codecs</name>
    <value>
         com.hadoop.compression.lzo.LzoCodec,
         com.hadoop.compression.lzo.LzopCodec
     </value>
  </property>
  <property>
    <name>io.compression.codec.lzo.class</name>
    <value>com.hadoop.compression.lzo.LzoCodec</value>
 </property>

flume第二层通道的启动和关闭脚本f2

#!/bin/bash
if(($#!=1))
then
        echo 请输入start或stop!
        exit;
fi

if [ $1 = start ]
then
        ssh hadoop104 "nohup flume-ng agent -c $FLUME_HOME/conf/ -n a1 -f $FLUME_HOME/myagents/f2.conf -Dflume.root.logger=INFO,console > /home/atguigu/f2.log 2>&1 &"
elif [ $1 = stop ]
then
        ssh hadoop104 "ps -ef | grep f2.conf | grep -v grep | awk -F ' ' '{print \$2}' | xargs kill -9"
else
        echo 请输入start或stop!
fi

数据采集集群一键启动脚本

#!/bin/bash
if(($#!=1))
then
  echo 请输入start或stop!
  exits;
fi
#编写函数,这个函数的功能为返回集群中启动成功的broker的数量
function countKafkaBrokers()
{
  count=0
  for((i=102;i<=104;i++))
  do
    result=$(ssh hadoop$i "jps | grep Kafka | wc -l")
    count=$[$result+$count]
  done
  #函数可以定义返回值,如果定义,返回函数最后一行命令的状态(返回0,代表成功,非0,即为异常)
  return $count
}
#启动
if [ $1 = start ]
then
  zk start
  hd start
  kf start
  #保证kafka集群已经启动,才能启动f1,f2采集通道
  while [ 1 ]
  do
    countKafkaBrokers
    if(($?==3))
    then
      break
    fi
    sleep 2s
  done
  f1 start
  f2 start
  #查看已经启动进程
  xcall jps
elif [ $1 = stop ]
then
  f1 stop
  f2 stop
  kf stop
  #在kafka没有停止完成之前,不能停止zk集群
  while [ 1 ]
        do
                countKafkaBrokers
                if(($?==0))
                then
                        break
                fi
                sleep 2s
        done
  zk stop
  hd stop
  #查看还剩那些进程
  xcall jps
else
  echo echo 请输入start或stop!
fi

HDFS-HA配置

配置nameservice,编写hdfs-sitx.xml

vim hdfs-site.xml
• 1
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.namenode.rpc-address.mycluster.nn1</name>
  <value>hadoop101:8020</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn1</name>
  <value>hadoop101:50070</value>
</property>
<property>
  <name>dfs.namenode.http-address.mycluster.nn2</name>
  <value>hadoop102:50070</value>
</property>

<property>
  <name>dfs.namenode.rpc-address.mycluster.nn2</name>
  <value>hadoop102:8020</value>
</property>
<property>
<property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://hadoop101:8485;hadoop102:8485;hadoop103:8485/mycluster</value>
</property>
<property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/data</value>
</property>
 <!-- 配置隔离机制,即同一时刻只能有一台服务器对外响应 -->
        <property>
                <name>dfs.ha.fencing.methods</name>
                <value>sshfence</value>
        </property>

<!-- 使用隔离机制时需要ssh无秘钥登录-->
        <property>
         <name>dfs.ha.fencing.ssh.private-key-files</name>
        <value>/home/atguigu/.ssh/id_rsa</value>
        </property>
<property>
  <name>dfs.client.failover.proxy.provider.mycluster</name>
  <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
      <name>dfs.namenode.secondary.http-address</name>
      <value>hadoop103:50090</value>
</property>
<!--配置故障自动转义-->
 <property>
   <name>dfs.ha.automatic-failover.enabled</name>
   <value>true</value>

(2)编写core-site.xml

<!-- 指定HDFS中NameNode的地址 -->
<property>
        <name>fs.defaultFS</name>
        <value>hdfs://mycluster</value>
</property>
 <!--配置hadoop运行时临时文件-->
  <property>
  <name>hadoop.tmp.dir</name>
  <value>/opt/module/hadoop-2.7.2/data/tmp</value>
 </property>
<!-- 指定Hadoop运行时产生文件的存储目录 -->
<!--<property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
</property>-->
<!--配置zookeeper地址-->
  <property>
   <name>ha.zookeeper.quorum</name>  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
 </property>

启动journalnode

xcall hadoop-daemon.sh start journalnode
• 1

在nn1上对namenode进行格式化

hadoop namenode -format
hdfs namenode -bootstrapStandby

在nn2上对namenode信息进行拷贝

stop-all.sh
hdfs zkfc -formatZK
start-dfs.sh

ResouceManager-HA配置

(1)编写yarn-site.xml

<property>
                <name>yarn.nodemanager.aux-services</name>
                <value>mapreduce_shuffle</value>
</property>
<property>
  <name>yarn.resourcemanager.cluster-id</name>
  <value>cluster1</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.rm-ids</name>
  <value>rm1,rm2</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm1</name>
  <value>hadoop101</value>
</property>
<property>
  <name>yarn.resourcemanager.hostname.rm2</name>
  <value>hadoop102</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm1</name>
  <value>hadoop101:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.webapp.address.rm2</name>
  <value>hadoop102:8088</value>
</property>
<property>
  <name>yarn.resourcemanager.zk-address</name>
  <value>hadoop101:2181,hadoop102:2181,hadoop103:2181</value>
</property>
<!--启用自动恢复-->
    <property>
        <name>yarn.resourcemanager.recovery.enabled</name>
        <value>true</value>
</property>
<!--指定resourcemanager的状态信息存储在zookeeper集群-->
 <property>
     <name>yarn.resourcemanager.store.class</name>
   <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
</property>
<property>
  <name>yarn.resourcemanager.ha.enabled</name>
  <value>true</value>
</property>
<!-- 日志聚集功能使能 -->
<property>
<name>yarn.log-aggregation-enable</name>
<value>true</value>
</property>

<!-- 日志保留时间设置7天 -->
<property>
<name>yarn.log-aggregation.retain-seconds</name>
<value>604800</value>
</property>
相关实践学习
AnalyticDB MySQL海量数据秒级分析体验
快速上手AnalyticDB MySQL,玩转SQL开发等功能!本教程介绍如何在AnalyticDB MySQL中,一键加载内置数据集,并基于自动生成的查询脚本,运行复杂查询语句,秒级生成查询结果。
阿里云云原生数据仓库AnalyticDB MySQL版 使用教程
云原生数据仓库AnalyticDB MySQL版是一种支持高并发低延时查询的新一代云原生数据仓库,高度兼容MySQL协议以及SQL:92、SQL:99、SQL:2003标准,可以对海量数据进行即时的多维分析透视和业务探索,快速构建企业云上数据仓库。 了解产品 https://www.aliyun.com/product/ApsaraDB/ads
相关文章
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
144 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
1月前
|
2月前
|
分布式计算 大数据 Serverless
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
在2024云栖大会开源大数据专场上,阿里云宣布推出实时计算Flink产品的新一代向量化流计算引擎Flash,该引擎100%兼容Apache Flink标准,性能提升5-10倍,助力企业降本增效。此外,EMR Serverless Spark产品启动商业化,提供全托管Serverless服务,性能提升300%,并支持弹性伸缩与按量付费。七猫免费小说也分享了其在云上数据仓库治理的成功实践。其次 Flink Forward Asia 2024 将于11月在上海举行,欢迎报名参加。
253 6
云栖实录 | 开源大数据全面升级:Native 核心引擎、Serverless 化、湖仓架构引领云上大数据发展
|
1月前
|
分布式计算 大数据 OLAP
AnalyticDB与大数据生态集成:Spark & Flink
【10月更文挑战第25天】在大数据时代,实时数据处理和分析变得越来越重要。AnalyticDB(ADB)是阿里云推出的一款完全托管的实时数据仓库服务,支持PB级数据的实时分析。为了充分发挥AnalyticDB的潜力,将其与大数据处理工具如Apache Spark和Apache Flink集成是非常必要的。本文将从我个人的角度出发,分享如何将AnalyticDB与Spark和Flink集成,构建端到端的大数据处理流水线,实现数据的实时分析和处理。
72 1
|
2月前
|
机器学习/深度学习 监控 搜索推荐
电商平台如何精准抓住你的心?揭秘大数据背后的神秘推荐系统!
【10月更文挑战第12天】在信息爆炸时代,数据驱动决策成为企业优化决策的关键方法。本文以某大型电商平台的商品推荐系统为例,介绍其通过收集用户行为数据,经过预处理、特征工程、模型选择与训练、评估优化及部署监控等步骤,实现个性化商品推荐,提升用户体验和销售额的过程。
107 1
|
1月前
|
并行计算 数据挖掘 大数据
Python数据分析实战:利用Pandas处理大数据集
Python数据分析实战:利用Pandas处理大数据集
|
2月前
|
消息中间件 存储 druid
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
大数据-156 Apache Druid 案例实战 Scala Kafka 订单统计
54 3
|
1月前
|
数据采集 分布式计算 OLAP
最佳实践:AnalyticDB在企业级大数据分析中的应用案例
【10月更文挑战第22天】在数字化转型的大潮中,企业对数据的依赖程度越来越高。如何高效地处理和分析海量数据,从中提取有价值的洞察,成为企业竞争力的关键。作为阿里云推出的一款实时OLAP数据库服务,AnalyticDB(ADB)凭借其强大的数据处理能力和亚秒级的查询响应时间,已经在多个行业和业务场景中得到了广泛应用。本文将从个人的角度出发,分享多个成功案例,展示AnalyticDB如何助力企业在广告投放效果分析、用户行为追踪、财务报表生成等领域实现高效的数据处理与洞察发现。
84 0
|
2月前
|
Oracle 大数据 数据挖掘
企业内训|大数据产品运营实战培训-某电信运营商大数据产品研发中心
本课程是TsingtaoAI专为某电信运营商的大数据产品研发中心的产品支撑组设计,旨在深入探讨大数据在电信运营商领域的应用与运营策略。通过密集的培训,从数据的本质与价值出发,系统解析大数据工具和技术的最新进展,深入剖析行业内外的实践案例。课程涵盖如何理解和评估数据、如何有效运用大数据技术、以及如何在不同业务场景中实现数据的价值转化。
65 0
|
4月前
|
数据采集 人工智能 安全
AI大数据处理与分析实战--体育问卷分析
本文是关于使用AI进行大数据处理与分析的实战案例,详细记录了对深圳市义务教育阶段学校“每天一节体育课”网络问卷的分析过程,包括数据概览、交互Prompt、代码处理、年级和学校维度的深入分析,以及通过AI工具辅助得出的分析结果和结论。

热门文章

最新文章