助力工业物联网,工业大数据项目之数据采集【四】

本文涉及的产品
云数据库 RDS MySQL,集群系列 2核4GB
推荐场景:
搭建个人博客
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
云数据库 RDS PostgreSQL,集群系列 2核4GB
简介: 助力工业物联网,工业大数据项目之数据采集【四】

01:Sqoop命令回顾

  • 目标:掌握Sqoop常用命令的使用
  • 路径
  • step1:语法
  • step2:数据库参数
  • step3:导入参数
  • step4:导出参数
  • step5:其他参数
  • 实施
  • 语法
sqoop import | export \
--数据库连接参数
--HDFS或者Hive的连接参数
--配置参数
  • 数据库参数
  • –connect jdbc:mysql://hostname:3306
  • –username
  • –password
  • –table
  • –columns
  • –where
  • -e/–query
  • 导入参数
  • –delete-target-dir
  • –target-dir
  • –hcatalog-database
  • –hcatalog-table
  • 导出参数
  • –export-dir
  • –hcatalog-database
  • –hcatalog-table
  • 其他参数
  • -m
  • 连接Oracle语法
--connect jdbc:oracle:thin:@OracleServer:OraclePort:OracleSID
  • 测试采集Oracle数据
  • 进入
docker exec -it sqoop bash
  • 测试
sqoop import \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
--username ciss \
--password 123456 \
--table CISS4.CISS_BASE_AREAS \
--target-dir /test/full_imp/ciss4.ciss_base_areas \
--fields-terminated-by "\t" \
-m 1
  • 查看结果

  • 小结
  • 掌握Sqoop常用命令的使用

02:YARN资源调度及配置

  • 目标实现YARN的资源调度配置
  • 实施
  • 常用端口记住:排错
  • NameNode:8020,50070
  • ResourceManager:8032,8088
  • JobHistoryServer:19888
  • Master:7077,8080
  • HistoryServer:18080
  • YARN调度策略
  • FIFO:不用
  • 单队列,队列内部FIFO,所有资源只给一个程序运行
  • Capacity:Apache
  • 多队列,队列内部FIFO,资源分配给不同的队列,队列内部所有资源只给一个程序运行
  • Fair:CDH
  • 多队列,队列内部共享资源,队列内部的资源可以给多个程序运行
  • YARN面试题
  • 程序提交成功,但是不运行而且不报错,什么问题,怎么解决?
  • 资源问题:APPMaster就没有启动
  • 环境问题
  • NodeManager进程问题:进程存在,但不工作
  • 机器资源不足导致YARN或者HDFS服务停止:磁盘超过90%,所有服务不再工作
  • 解决:实现监控告警:80%,邮件告警
  • YARN中程序运行失败的原因遇到过哪些?
  • 代码逻辑问题
  • 资源问题:Container
  • Application / Driver:管理进程
  • MapTask和ReduceTask / Executor:执行进程
  • 解决问题:配置进程给定更多的资源
  • 问题1:程序已提交YARN,但是无法运行,报错:Application is added to the scheduler and is not activated. User’s AM resource limit exceeded.
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
  • 配置文件:${HADOOP_HOME}/etc/hadoop/capacity-scheduler.xml
  • 属性功能:指定队列最大可使用的资源容量大小百分比,默认为0.2,指定越大,AM能使用的资源越多
  • 问题2:程序提交,运行失败,报错:无法申请Container
yarn.scheduler.minimum-allocation-mb=512
  • 配置文件:${HADOOP_HOME}/etc/hadoop/yarn-site.xml
  • 属性功能:指定AM为每个Container申请的最小内存,默认为1G,申请不足1G,默认分配1G,值过大,会导致资源不足,程序失败,该值越小,能够运行的程序就越多
  • 问题3:怎么提高YARN集群的并发度?
  • 物理资源、YARN资源、Container资源、进程资源
  • YARN资源配置
yarn.nodemanager.resource.cpu-vcores=8
yarn.nodemanager.resource.memory-mb=8192
  • Container资源
yarn.scheduler.minimum-allocation-vcores=1
yarn.scheduler.maximum-allocation-vcores=32
yarn.scheduler.minimum-allocation-mb=1024
yarn.scheduler.maximum-allocation-mb=8192
  • MR Task资源
mapreduce.map.cpu.vcores=1
mapreduce.map.memory.mb=1024
mapreduce.reduce.cpu.vcores=1
mapreduce.reduce.memory.mb=1024
  • Spark Executor资源
--driver-memory  #分配给Driver的内存,默认分配1GB
--driver-cores   #分配给Driver运行的CPU核数,默认分配1核
--executor-memory #分配给每个Executor的内存数,默认为1G,所有集群模式都通用的选项
--executor-cores  #分配给每个Executor的核心数,YARN集合和Standalone集群通用的选项
--total-executor-cores NUM  #Standalone模式下用于指定所有Executor所用的总CPU核数
--num-executors NUM #YARN模式下用于指定Executor的个数,默认启动2个
  • 实现:修改问题1中的配置属性
  • 注意:修改完成,要重启YARN

  • [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jgRIa2kT-1673426702988)(Day2_数仓设计及数据采集.assets/image-20210822085238536.png)]
  • 小结
  • 实现YARN的资源调度配置

03:MR的Uber模式

  • 目标:了解MR的Uber模式的配置及应用
  • 实施
  • Spark为什么要比MR要快
  • MR慢
  • 只有Map和Reduce阶段,每个阶段的结果都必须写入磁盘
  • 如果要实现Map1 -> Map2 -> Reduce1 -> Reduce2
  • Mapreduce1:Map1
  • MapReduce2:Map2 -> Reduce1
  • Mapreduce3:Reduce2
  • MapReduce程序处理是进程级别:MapTask进程、ReduceTask进程
  • 问题:MR程序运行在YARN上时,有一些轻量级的作业要频繁的申请资源再运行,性能比较差怎么办?
  • Uber模式
  • 功能:Uber模式下,程序只申请一个AM Container:所有Map Task和Reduce Task,均在这个Container中顺序执行
  • 默认不开启
  • 配置:${HADOOP_HOME}/etc/hadoop/mapred-site.xml
mapreduce.job.ubertask.enable=true
#必须满足以下条件
mapreduce.job.ubertask.maxmaps=9
mapreduce.job.ubertask.maxreduces=1
mapreduce.job.ubertask.maxbytes=128M
yarn.app.mapreduce.am.resource.cpu-vcores=1
yarn.app.mapreduce.am.resource.mb=1536M
  • 特点
  • Uber模式的进程为AM,所有资源的使用必须小于AM进程的资源
  • Uber模式条件不满足,不执行Uber模式
  • Uber模式,会禁用推测执行机制
  • 小结
  • 了解MR的Uber模式的配置及应用

04:Sqoop采集数据格式问题

  • 目标掌握Sqoop采集数据时的问题
  • 路径
  • step1:现象
  • step2:问题
  • step3:原因
  • step4:解决
  • 实施
  • 现象
  • step1:查看Oracle中CISS_SERVICE_WORKORDER表的数据条数
select count(1) as cnt from CISS_SERVICE_WORKORDER;
  • step2:采集CISS_SERVICE_WORKORDER的数据到HDFS上
sqoop import
–connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin
–username ciss
–password 123456
–table CISS4.CISS_SERVICE_WORKORDER
–delete-target-dir
–target-dir /test/full_imp/ciss4.ciss_service_workorder
–fields-terminated-by “\001”
-m 1

- step3:Hive中建表查看数据条数
- 进入Hive容器
  ```
docker exec -it hive bash
  ```
- 连接HiveServer
  ```
  beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
  ```
- 创建测试表
  ```sql
  create external table test_text(
  line string
  )
  location '/test/full_imp/ciss4.ciss_service_workorder';
  ```
- 统计行数
  ```
  select count(*) from test_text;
  ```
  • 问题:Sqoop采集完成后导致HDFS数据与Oracle数据量不符
  • 原因
  • sqoop以文本格式导入数据时,默认的换行符是特殊字符
  • Oracle中的数据列中如果出现了\n、\r、\t等特殊字符,就会被划分为多行
  • Oracle数据
id      name        age
001     zhang\nsan      18
  • Sqoop遇到特殊字段就作为一行
001     zhang
san     18
  • Hive
id      name        age
001     zhang 
san     18
  • 解决
  • 方案一:删除或者替换数据中的换行符
  • –hive-drop-import-delims:删除换行符
  • –hive-delims-replacement char:替换换行符
  • 不建议使用:侵入了原始数据
  • 方案二:使用特殊文件格式:AVRO格式
  • 小结
  • 掌握Sqoop采集数据时的问题

05:问题解决:Avro格式

  • 目标:掌握使用Avro格式解决采集换行问题
  • 路径
  • step1:常见格式介绍
  • step2:Avro格式特点
  • step3:Sqoop使用Avro格式
  • step4:使用测试
  • 实施
  • 常见格式介绍
类型 介绍
TextFile Hive默认的文件格式,最简单的数据格式,便于查看和编辑,耗费存储空间,I/O性能较低
SequenceFile 含有键值对的二进制文件,优化磁盘利用率和I/O,并行操作数据,查询效率高,但存储空间消耗最大
AvroFile 特殊的二进制文件,设计的主要目标是为了满足schema evolution,Schema和数据保存在一起
OrcFile 列式存储,Schema存储在footer中,不支持schema evolution,高度压缩比并包含索引,查询速度非常快
ParquetFile 列式存储,与Orc类似,压缩比不如Orc,但是查询性能接近,支持的工具更多,通用性更强
  • SparkCore缺点:RDD【数据】:没有Schema
  • SparkSQL优点:DataFrame【数据 + Schema】
  • Schema:列的信息【名称、类型】
  • Avro格式特点
  • 优点
  • 二进制数据存储,性能好、效率高
  • 使用JSON描述模式,支持场景更丰富
  • Schema和数据统一存储,消息自描述
  • 模式定义允许定义数据的排序
  • 缺点
  • 只支持Avro自己的序列化格式
  • 少量列的读取性能比较差,压缩比较低
  • 场景:基于行的大规模结构化数据写入、列的读取非常多或者Schema变更操作比较频繁的场景
  • Sqoop使用Avro格式
  • 选项
--as-avrodatafile                                     Imports data to Avro datafiles
  • 注意:如果使用了MR的Uber模式,必须在程序中加上以下参数避免类冲突问题
-Dmapreduce.job.user.classpath.first=true
  • 使用测试
sqoop import \
-Dmapreduce.job.user.classpath.first=true \
--connect jdbc:oracle:thin:@oracle.bigdata.cn:1521:helowin \
--username ciss \
--password 123456 \
--table CISS4.CISS_SERVICE_WORKORDER \
--delete-target-dir \
--target-dir /test/full_imp/ciss4.ciss_service_workorder \
--as-avrodatafile \
--fields-terminated-by "\001" \
-m 1
  • Hive中建表
  • 进入Hive容器
docker exec -it hive bash
  • 连接HiveServer
beeline -u jdbc:hive2://hive.bigdata.cn:10000 -n root -p 123456
  • 创建测试表
create external table test_avro(
line string
)
stored as avro
location '/test/full_imp/ciss4.ciss_service_workorder';
  • 统计行数
select count(*) from test_avro;
  • 小结
  • 掌握如何使用Avro格式解决采集换行问题

06:Sqoop增量采集方案回顾

  • 目标:回顾Sqoop增量采集方案
  • 路径
  • step1:Append
  • step2:Lastmodified
  • step3:特殊方式
  • 实施
  • Append
  • 要求:必须有一列自增的值,按照自增的int值进行判断
  • 特点:只能导入增加的数据,无法导入更新的数据
  • 场景:数据只会发生新增,不会发生更新的场景
  • 代码
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_tohdfs \
--target-dir /sqoop/import/test02 \
--fields-terminated-by '\t' \
--check-column id \
--incremental append \
--last-value 0 \
-m 1
  • Lastmodified
  • 要求:必须包含动态时间变化这一列,按照数据变化的时间进行判断
  • 特点:既导入新增的数据也导入更新的数据
  • 场景:一般无法满足要求,所以不用
  • 代码
sqoop import \
--connect jdbc:mysql://node3:3306/sqoopTest \
--username root \
--password 123456 \
--table tb_lastmode \
--target-dir /sqoop/import/test03 \
--fields-terminated-by '\t' \
--incremental lastmodified \
--check-column lastmode \
--last-value '2021-06-06 16:09:32' \
-m 1
  • 特殊方式
  • 要求:每次运行的输出目录不能相同
  • 特点:自己实现增量的数据过滤,可以实现新增和更新数据的采集
  • 场景:一般用于自定义增量采集每天的分区数据到Hive
  • 代码
sqoop  import \
--connect jdbc:mysql://node3:3306/db_order \
--username root \
--password-file file:///export/data/sqoop.passwd \
--query "select * from tb_order where substring(create_time,1,10) = '2021-09-14' or substring(update_time,1,10) = '2021-09-14' and \$CONDITIONS " \
--delete-target-dir \
--target-dir /nginx/logs/tb_order/daystr=2021-09-14 \
--fields-terminated-by '\t' \
-m 1
  • 小结
  • 回顾Sqoop增量采集方案


相关实践学习
钉钉群中如何接收IoT温控器数据告警通知
本实验主要介绍如何将温控器设备以MQTT协议接入IoT物联网平台,通过云产品流转到函数计算FC,调用钉钉群机器人API,实时推送温湿度消息到钉钉群。
阿里云AIoT物联网开发实战
本课程将由物联网专家带你熟悉阿里云AIoT物联网领域全套云产品,7天轻松搭建基于Arduino的端到端物联网场景应用。 开始学习前,请先开通下方两个云产品,让学习更流畅: IoT物联网平台:https://iot.console.aliyun.com/ LinkWAN物联网络管理平台:https://linkwan.console.aliyun.com/service-open
目录
相关文章
|
16天前
|
监控 供应链 安全
物联网卡在工业领域的应用
物联网卡在工业领域的应用极大地推动了行业的智能化、自动化和高效化进程。以下是物联网卡在工业领域中各操作类型中的具体应用作用:
|
2月前
|
消息中间件 存储 传感器
RabbitMQ 在物联网 (IoT) 项目中的应用案例
【8月更文第28天】随着物联网技术的发展,越来越多的设备被连接到互联网上以收集和传输数据。这些设备可以是传感器、执行器或其他类型的硬件。为了有效地管理这些设备并处理它们产生的大量数据,需要一个可靠的消息传递系统。RabbitMQ 是一个流行的开源消息中间件,它提供了一种灵活的方式来处理和转发消息,非常适合用于物联网环境。
75 1
|
2月前
|
分布式计算 搜索推荐 物联网
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
大数据及AI典型场景实践问题之通过KafKa+OTS+MaxCompute完成物联网系统技术重构如何解决
|
2月前
|
消息中间件 数据采集 关系型数据库
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
大数据-业务数据采集-FlinkCDC 读取 MySQL 数据存入 Kafka
42 1
|
2月前
|
数据采集 关系型数据库 MySQL
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
大数据-业务数据采集-FlinkCDC The MySQL server is not configured to use a ROW binlog_format
34 1
|
2月前
|
数据采集 大数据
大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property
大数据-业务数据采集-FlinkCDC DebeziumSourceFunction via the 'serverTimezone' configuration property
29 1
|
2月前
|
JSON 关系型数据库 大数据
大数据-业务数据采集-FlinkCDC
大数据-业务数据采集-FlinkCDC
51 1
|
2月前
|
机器学习/深度学习 设计模式 人工智能
面向对象方法在AIGC和大数据集成项目中的应用
【8月更文第12天】随着人工智能生成内容(AIGC)和大数据技术的快速发展,企业面临着前所未有的挑战和机遇。AIGC技术能够自动产生高质量的内容,而大数据技术则能提供海量数据的支持,两者的结合为企业提供了强大的竞争优势。然而,要充分利用这些技术,就需要构建一个既能处理大规模数据又能高效集成机器学习模型的集成框架。面向对象编程(OOP)以其封装性、继承性和多态性等特点,在构建这样的复杂系统中扮演着至关重要的角色。
53 3
|
3月前
|
弹性计算 分布式计算 大数据
MaxCompute产品使用合集之如何将用户A从项目空间A申请的表权限需要改为用户B
MaxCompute作为一款全面的大数据处理平台,广泛应用于各类大数据分析、数据挖掘、BI及机器学习场景。掌握其核心功能、熟练操作流程、遵循最佳实践,可以帮助用户高效、安全地管理和利用海量数据。以下是一个关于MaxCompute产品使用的合集,涵盖了其核心功能、应用场景、操作流程以及最佳实践等内容。
|
3月前
|
分布式计算 运维 DataWorks
MaxCompute操作报错合集之用户已在DataWorks项目中,并有项目的开发和运维权限,下载数据时遇到报错,该如何解决
MaxCompute是阿里云提供的大规模离线数据处理服务,用于大数据分析、挖掘和报表生成等场景。在使用MaxCompute进行数据处理时,可能会遇到各种操作报错。以下是一些常见的MaxCompute操作报错及其可能的原因与解决措施的合集。

热门文章

最新文章

下一篇
无影云桌面