• 关于

    hdfs 2 api

    的搜索结果

问题

Hadoop的HDFS的java client jar包在哪下载?

因为相信,所以看见。 2020-05-27 10:03:13 8 浏览量 回答数 1

回答

 云HBase团队为大家提供了一个github项目供大家参考使用上面的三种方式来开发Spark分析HBase的程序,项目地址; https://github.com/lw309637554/alicloud-hbase-spark-examples?spm=a2c4e.11153940.blogcont573569.14.320377b4U14MDa依赖项:需要下载云HBase及云Phoenix的client包分析HFILE:需要先开通云HBase的HDFS访问权限,参考文档在hbase shell中对表生成snapshot表“snapshot 'sourceTable', ‘snapshotName'”在项目中配置自己的hdfs-sit.xml文件,然后通过直读HDFS的方式分析snapshot表具体的exampleRDD API对应:org.apache.spark.hbase.NativeRDDAnalyzeSQL API对应:org.apache.spark.sql.execution.datasources.hbase.SqlAnalyze分析HFILE对应:org.apache.spark.hfile.SparkAnalyzeHFILE

hiekay 2019-12-02 01:43:14 0 浏览量 回答数 0

问题

如何使用flink将增量数据写入配置单元

flink小助手 2019-12-01 19:22:00 899 浏览量 回答数 1

阿里云高校特惠,助力学生创业梦!0元体验,快速入门云计算!

学生动手场景应用,快速了解并掌握云服务器的各种新奇玩法!

问题

在将Flink数据集写入hdfs时如何创建Job对象

flink小助手 2019-12-01 19:22:01 774 浏览量 回答数 1

回答

本文档介绍文件存储HDFS和对象存储OSS之间的数据迁移过程。您可以将文件存储HDFS数据迁移到对象存储OSS,也可以将对象存储OSS的数据迁移到文件存储HDFS上。 背景信息 阿里云文件存储HDFS是面向阿里云ECS实例及容器服务等计算资源的文件存储服务。文件存储HDFS允许您就像在Hadoop分布式文件系统中管理和访问数据,并对热数据提供高性能的数据访问能力。对象存储OSS是海量、安全、低成本、高可靠的云存储服务,并提供标准型、归档型等多种存储类型供选择。客户可以在文件存储HDFS和对象存储OSS之间实现数据迁移,从而实现热、温、冷数据的合理分层,在实现对热数据的高性能访问的同时,有效控制存储成本。 准备工作 挂载文件系统,详情请参见挂载文件系统。 验证文件系统和计算节点之间的连通性。 执行以下命令,在文件存储HDFS上创建目录(如:/dfs_links)。 hadoop fs -mkdir /dfs_links 执行以下命令,验证连通性。 hadoop fs -ls dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/dfs_links 其中f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com为文件存储HDFS挂载点域名,请根据实际情况进行修改。 如果命令正常执行无输出结果,则表示连通成功。如果连通失败,请参见创建文件系统实例后,为什么无法访问文件存储HDFS?进行排查。 准备迁移工具。 单击emr-tools下载迁移工具安装包。 将迁移工具安装包上传计算节点的本地目录。 说明 该计算节点必须运行着Hadoop的YARN服务,或者是YARN集群中可以提交作业的计算节点。因为emr-tools迁移工具需要借助Hadoop数据迁移工具DistCp实现数据的迁移。 执行以下命令,解压安装包。 tar jxf emr-tools.tar.bz2 将文件存储HDFS数据迁移到对象存储OSS 进入emr-tools工具安装包解压后所在的目录,使用hdfs2oss4emr.sh脚本将文件存储HDFS上的数据迁移到对象存储OSS上,具体命令如下所示。 cd emr-tools ./hdfs2oss4emr.sh dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/HDFS2OSS/data/data_1000g oss://accessKeyId:accessKeySecret@bucket-name.oss-cn-hangzhou.aliyuncs.com/HDFS2OSS/data/data_1000g 参数说明如下表所示。 参数 说明 accessKeyId 访问对象存储OSS API的密钥。获取方式请参见如何获取AccessKeyId和AccessKeySecret。 accessKeySecret bucket-name.oss-cn-hangzhou.aliyuncs.com 对象存储OSS的访问域名,包括bucket名称和endpoint地址。 执行以上命令后,系统将启动一个Hadoop MapReduce任务(DistCp)。 任务执行完成后,查看迁移结果。 如果回显包含如下类似信息,说明迁移成功。 19/03/27 08:48:58 INFO mapreduce.Job: Job job_1553599949635_0014 completed successfully 19/03/27 08:48:59 INFO mapreduce.Job: Counters: 38 File System Counters FILE: Number of bytes read=0 FILE: Number of bytes written=2462230 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=1000001748624 HDFS: Number of bytes written=0 HDFS: Number of read operations=40124 HDFS: Number of large read operations=0 HDFS: Number of write operations=40 OSS: Number of bytes read=0 OSS: Number of bytes written=1000000000000 OSS: Number of read operations=0 OSS: Number of large read operations=0 OSS: Number of write operations=0 Job Counters Launched map tasks=20 Other local map tasks=20 Total time spent by all maps in occupied slots (ms)=65207738 Total time spent by all reduces in occupied slots (ms)=0 Total time spent by all map tasks (ms)=65207738 Total vcore-milliseconds taken by all map tasks=65207738 Total megabyte-milliseconds taken by all map tasks=66772723712 Map-Reduce Framework Map input records=10002 Map output records=0 Input split bytes=2740 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=58169 CPU time spent (ms)=5960400 Physical memory (bytes) snapshot=4420333568 Virtual memory (bytes) snapshot=42971959296 Total committed heap usage (bytes)=2411724800 File Input Format Counters Bytes Read=1745884 File Output Format Counters Bytes Written=0 org.apache.hadoop.tools.mapred.CopyMapper$Counter BYTESCOPIED=1000000000000 BYTESEXPECTED=1000000000000 COPY=10002 copy from dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/path/on/dfs to oss://accessKeyId:accessKeySecret@bucket-name.oss-cn-hangzhou.aliyuncs.com/path/on/oss does succeed !!! 迁移完成后,您可以通过osscmd工具,执行以下命令查看对象存储OSS上的数据情况。 osscmd ls oss://bucket-name/HDFS2OSS/data/data_1000g 将对象存储OSS数据迁移到文件存储HDFS 进入emr-tools工具安装包解压后所在的目录,使用hdfs2oss4emr.sh脚本将对象存储OSS上的数据迁移到文件存储HDFS上,具体命令如下所示。 cd emr-tools ./hdfs2oss4emr.sh oss://accessKeyId:accessKeySecret@bucket-name.oss-cn-hangzhou.aliyuncs.com/OSS2HDFS/oss/1000g dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/OSS2HDFS/data/data_1000g 参数说明如下表所示。 参数 说明 accessKeyId 访问对象存储OSS API的密钥。获取方式请参见如何获取AccessKeyId和AccessKeySecret。 accessKeySecret bucket-name.oss-cn-hangzhou.aliyuncs.com 对象存储OSS的访问域名,包括bucket名称和endpoint地址。 执行以上命令后,系统将启动一个Hadoop MapReduce任务(DistCp)。 任务执行完成后,查看迁移结果。 如果回显包含如下类似信息,说明迁移成功。 19/03/23 21:59:23 INFO mapreduce.Job: Counters: 38 File System Counters DFS: Number of bytes read=2335687 DFS: Number of bytes written=999700000000 DFS: Number of read operations=60218 DFS: Number of large read operations=0 DFS: Number of write operations=20076 FILE: Number of bytes read=0 FILE: Number of bytes written=2575367 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 OSS: Number of bytes read=0 OSS: Number of bytes written=0 OSS: Number of read operations=0 OSS: Number of large read operations=0 OSS: Number of write operations=0 Job Counters Launched map tasks=21 Other local map tasks=21 Total time spent by all maps in occupied slots (ms)=36490484 Total time spent by all reduces in occupied slots (ms)=0 Total time spent by all map tasks (ms)=36490484 Total vcore-milliseconds taken by all map tasks=36490484 Total megabyte-milliseconds taken by all map tasks=37366255616 Map-Reduce Framework Map input records=10018 Map output records=0 Input split bytes=2856 Spilled Records=0 Failed Shuffles=0 Merged Map outputs=0 GC time elapsed (ms)=1064802 CPU time spent (ms)=10370840 Physical memory (bytes) snapshot=6452363264 Virtual memory (bytes) snapshot=45328142336 Total committed heap usage (bytes)=4169138176 File Input Format Counters Bytes Read=2332831 File Output Format Counters Bytes Written=0 org.apache.hadoop.tools.mapred.CopyMapper$Counter BYTESCOPIED=999700000000 BYTESEXPECTED=999700000000 COPY=10018 copy from oss://accessKeyId:accessKeySecret@bucket-name.oss-cn-hangzhou.aliyuncs.com/path/on/oss to dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/path/on/dfs does succeed !!! 迁移完成后,您可以执行以下命令查看文件存储HDFS上的数据情况。 hadoop fs -ls dfs://f-xxxxxxxxxxxxxxx.cn-xxxxxxx.dfs.aliyuncs.com:10290/OSS2HDFS/data/data_1000g 常见问题 迁移过程出现异常提示:Cannot obtain block length for LocatedBlock。 从原生HDFS往对象存储OSS/文件存储HDFS迁移数据时,可能会遇到这个问题。遇到该问题时,请执行hdfs fsck / –openforwrite命令,检查当前是否有文件处于写入状态尚未关闭。 如果有处于写入状态的文件时,需判断文件是否有效。 如果文件无效,则直接删除文件。 hdfs rm 如果文件有效,则不能直接删除,请考虑恢复问题文件租约。 hdfs debug recoverLease -path -retries 对于正在写入的文件,进行迁移时会遗漏掉最新写入的数据吗? Hadoop兼容文件系统提供单写者多读者并发语义,针对同一个文件,同一时刻可以有一个写者写入和多个读者读出。以文件存储HDFS到对象存储OSS的数据迁移为例,数据迁移任务打开文件存储HDFS的文件F,根据当前系统状态决定文件F的长度L,将L字节迁移到对象存储OSS。如果在数据迁移过程中,有并发的写者写入,文件F的长度将超过L,但是数据迁移任务无法感知到最新写入的数据。因此,建议您在做数据迁移时,避免往迁移的文件中写入数据。

1934890530796658 2020-03-31 02:35:12 0 浏览量 回答数 0

问题

flink在执行job时checkpoint报错

雅拓 2019-12-01 19:40:10 597 浏览量 回答数 1

问题

阿里云搭建hadoop chd,shell操作正常,使用java api操作出现异常。报错

因为相信,所以看见。 2020-05-27 12:59:29 3 浏览量 回答数 1

问题

新手求助:格式化HDFS文件系统 报错 namenode?报错

爱吃鱼的程序员 2020-06-10 11:15:03 0 浏览量 回答数 1

问题

hbase表删除后hdfs上数据文件已经没了,但是meta表仍然存在region信息

Pafii 2020-08-26 20:07:46 1 浏览量 回答数 0

问题

flink state.backend是rocksdb,存储在hdfs上,经常遇到checkpoint执行不成功的情况 checkpoint超时过期的原因(设置checkpoint超时为60s)

莱昂007 2019-12-01 20:27:51 3407 浏览量 回答数 1

问题

如何通过livy Programmatic API提交批处理jar Spark作业

flink小助手 2019-12-01 19:26:00 899 浏览量 回答数 1

回答

将Mysql迁移到Hbase主要有三种方法: 1、Put API Put API可能是将数据快速导入HBase表的最直接的方法。但是在导入【大量数据】时不建议使用!但是可以作为简单数据迁移的选择,直接写个代码批量处理,开发简单、方便、可控强。 2、MapReduce Job 推荐使用sqoop,它的底层实现是mapreduce,数据并行导入的,这样无须自己开发代码,过滤条件通过query参数可以实现。 Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将MySQL中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到Mysql中。 参考Index of /docs。 采用如下命令:sqoop import --connect jdbc:mysql://localhost/db --username root -P --table mysql_order --columns "id,name" --hbase-table hbase_order --column-family f --hbase-row-key id --query "select id,name from mysql_order where..." -m 1 3、采用Bulk load装载数据 bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用。 需要将MySQL的表数据导出为TSV格式(因为后面使用Import TSV工具),还需要确保有一个字段可以表示HBase表行的row key。 “答案来源于网络,供您参考”

牧明 2019-12-02 02:14:54 0 浏览量 回答数 0

回答

将Mysql迁移到Hbase主要有三种方法: 1、Put API Put API可能是将数据快速导入HBase表的最直接的方法。但是在导入【大量数据】时不建议使用!但是可以作为简单数据迁移的选择,直接写个代码批量处理,开发简单、方便、可控强。 2、MapReduce Job 推荐使用sqoop,它的底层实现是mapreduce,数据并行导入的,这样无须自己开发代码,过滤条件通过query参数可以实现。 Sqoop是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将MySQL中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到Mysql中。 参考Index of /docs。 采用如下命令:sqoop import --connect jdbc:mysql://localhost/db --username root -P --table mysql_order --columns "id,name" --hbase-table hbase_order --column-family f --hbase-row-key id --query "select id,name from mysql_order where..." -m 1 3、采用Bulk load装载数据 bulk-load的作用是用mapreduce的方式将hdfs上的文件装载到hbase中,对于海量数据装载入hbase非常有用。 需要将MySQL的表数据导出为TSV格式(因为后面使用Import TSV工具),还需要确保有一个字段可以表示HBase表行的row key。 答案来源网络,供参考,希望对您有帮助

问问小秘 2019-12-02 02:19:21 0 浏览量 回答数 0

问题

flink checkpoint 在 window 操作下 全局配置失效的问题。

千狼 2019-12-01 20:26:05 1871 浏览量 回答数 3

问题

hadoop 2.7.2 安装 在zkfc 格式化时报错?报错

爱吃鱼的程序员 2020-06-09 11:46:10 0 浏览量 回答数 1

回答

。。。。hdfs  先fs -put csv hadoopdir把文件上传到hadoop 然后,使用流api啥的处理 ######回复 @孤独小桃子 : 就是hadoop fs -put这个命令把我要处理的文件上传到hadoop集群里,那这样的化我的shell脚本是不是也要改改?我去看看~######hadoop fs -put 把文件上传到hdfs, hdfs是分布式的,一个文件会按大小,切开,分散到不同的节点机器上。你的shell,此时也不再是在本地运行命令,要用hadoop的流式api,看看官方手册吧######map生成的东西都会被写在本地,reduce输出会生成part…的hdfs里面,不知道你想表达什么。######我想知道我要处理的数据文件应该存放在哪里,是存放在主机还是说分散存放在所有机器######你的意思是两台跟一台没啥区别是吧,Hadoop分布式环境应对数据量很大的情况,比如t级别,你的两个CVS文件,就是用n台效果也是一样的,电线杆当筷子了###### @lgscofield 是真的分布式的,不是伪分布式。######回复 @颠覆 : 你的hadoop环境是伪分布式还是真正的分布式呢,真分布式基本上是能看出性能提升的,其实10g的文件也不算大,hadoop体现不出优势######我不知道是不是那样使用分布式的,2台电脑处理的是10G的CSV文件,用一台时间太久,由于现在做测试行不行所有用的文件比较小,但是我看网上说的貌似时间也应该会有明显的减少吧。一台运行的时候用了接近3个小时,2台启动hadoop后,按照和一台的方法一样运行时间好像差不多。######那必须是放在master节点。###### @颠覆 是的,建议你去看一下Hadoop分布式的原理,map/reduce的工作机制######哦哦,只要放在master节点就可以了啊。具体处理数据的操作和单台电脑处理数据一样吗?

kun坤 2020-06-14 23:07:36 0 浏览量 回答数 0

回答

。。。。hdfs  先fs -put csv hadoopdir把文件上传到hadoop 然后,使用流api啥的处理 ######回复 @孤独小桃子 : 就是hadoop fs -put这个命令把我要处理的文件上传到hadoop集群里,那这样的化我的shell脚本是不是也要改改?我去看看~######hadoop fs -put 把文件上传到hdfs, hdfs是分布式的,一个文件会按大小,切开,分散到不同的节点机器上。你的shell,此时也不再是在本地运行命令,要用hadoop的流式api,看看官方手册吧######map生成的东西都会被写在本地,reduce输出会生成part…的hdfs里面,不知道你想表达什么。######我想知道我要处理的数据文件应该存放在哪里,是存放在主机还是说分散存放在所有机器######你的意思是两台跟一台没啥区别是吧,Hadoop分布式环境应对数据量很大的情况,比如t级别,你的两个CVS文件,就是用n台效果也是一样的,电线杆当筷子了######@lgscofield 是真的分布式的,不是伪分布式。######回复 @颠覆 : 你的hadoop环境是伪分布式还是真正的分布式呢,真分布式基本上是能看出性能提升的,其实10g的文件也不算大,hadoop体现不出优势######我不知道是不是那样使用分布式的,2台电脑处理的是10G的CSV文件,用一台时间太久,由于现在做测试行不行所有用的文件比较小,但是我看网上说的貌似时间也应该会有明显的减少吧。一台运行的时候用了接近3个小时,2台启动hadoop后,按照和一台的方法一样运行时间好像差不多。######那必须是放在master节点。######@颠覆 是的,建议你去看一下Hadoop分布式的原理,map/reduce的工作机制######哦哦,只要放在master节点就可以了啊。具体处理数据的操作和单台电脑处理数据一样吗?

kun坤 2020-06-02 17:22:34 0 浏览量 回答数 0

问题

如何实时获取第三方api接口数据?

1123778158879581 2020-10-23 09:49:59 0 浏览量 回答数 0

问题

Hive向分区表导入数据File not found: File does no?400报错

爱吃鱼的程序员 2020-05-30 23:50:57 0 浏览量 回答数 1

问题

hive执行hadoop任务报错,求救!?报错

爱吃鱼的程序员 2020-06-22 18:11:13 0 浏览量 回答数 1

问题

Flink程序打成jar包在集群中无法运行

木子民 2020-01-10 18:31:09 25 浏览量 回答数 1

问题

flink用IDEA本地运行可以读取HDFS数据,然后把项目打包提交到flink集群,无法读取HDFS数据,出现以下错误,这是为何?

八戒八戒2333 2019-12-01 19:42:04 1927 浏览量 回答数 2

回答

所以HDFS上的ZIP会有点棘手,因为它们不能很好地拆分,所以你必须为每个执行程序处理1个或更多的zip文件。这也是为数不多的情况之一,你可能不得不退回,SparkContext因为出于某种原因,Spark中的二进制文件支持并不是那么好。https://spark.apache.org/docs/2.4.0/api/scala/index.html#org.apache.spark.SparkContext有一个readBinaryFiles可以访问zip二进制数据的地方,然后您可以使用java或scala中的常规ZIP处理。

社区小助手 2019-12-02 01:47:50 0 浏览量 回答数 0

问题

Spark 【问答合集】

社区小助手 2019-12-01 19:41:25 37184 浏览量 回答数 4

回答

您可以使用Livy通过rest API提交spark作业。请按照以下步骤操作, 首先构建spark应用程序并创建程序集jar并将应用程序jar上载到hadoop集群的集群存储(HDFS)上。使用curl(用于测试)提交作业并使用http client api实现。使用scala中的http客户端提交spark作业的示例代码 import org.apache.http.client.methods.{CloseableHttpResponse, HttpGet, HttpPost, HttpPut}import org.apache.http.entity.StringEntityimport org.apache.http.impl.client.{CloseableHttpClient, HttpClientBuilder}import org.apache.http.util.EntityUtils import scala.util.parsing.json.{JSON, JSONObject} def submitJob(className: String, jarPath:String, extraArgs: List[String]) : JSONObject = { val jobSubmitRequest = new HttpPost(s"${clusterConfig.livyserver}/batches") val data = Map("className"-> className,"file" -> jarPath,"driverMemory" -> "2g","name" -> "LivyTest","proxyUser" -> "hadoop") if(extraArgs != null && !extraArgs.isEmpty) { data + ( "args" -> extraArgs)} val json = new JSONObject(data) println(json.toString()) val params = new StringEntity(json.toString(),"UTF-8")params.setContentType("application/json") jobSubmitRequest.addHeader("Content-Type", "application/json")jobSubmitRequest.addHeader("Accept", "/")jobSubmitRequest.setEntity(params) val client: CloseableHttpClient = HttpClientBuilder.create().build()val response: CloseableHttpResponse = client.execute(jobSubmitRequest)HttpReqUtil.parseHttpResponse(response)._2}

flink小助手 2019-12-02 01:48:12 0 浏览量 回答数 0

问题

弹性容器实例ECI

黄一刀 2020-04-04 01:37:23 90 浏览量 回答数 1

问题

用Sqoop,hdfs导入都mysql时候执行MR时候报错了?报错

爱吃鱼的程序员 2020-06-10 10:21:30 0 浏览量 回答数 1

回答

Checkpoint介绍 checkpoint机制是Flink可靠性的基石,可以保证Flink集群在某个算子因为某些原因(如 异常退出)出现故障时,能够将整个应用流图的状态恢复到故障之前的某一状态,保 证应用流图状态的一致性。Flink的checkpoint机制原理来自“Chandy-Lamport algorithm”算法。 每个需要checkpoint的应用在启动时,Flink的JobManager为其创建一个 CheckpointCoordinator,CheckpointCoordinator全权负责本应用的快照制作。 CheckpointCoordinator周期性的向该流应用的所有source算子发送barrier。 2.当某个source算子收到一个barrier时,便暂停数据处理过程,然后将自己的当前状 态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告 自己快照制作情况,同时向自身所有下游算子广播该barrier,恢复数据处理 3.下游算子收到barrier之后,会暂停自己的数据处理过程,然后将自身的相关状态制作成快照,并保存到指定的持久化存储中,最后向CheckpointCoordinator报告自身 快照情况,同时向自身所有下游算子广播该barrier,恢复数据处理。 每个算子按照步骤3不断制作快照并向下游广播,直到最后barrier传递到sink算子,快照制作完成。 当CheckpointCoordinator收到所有算子的报告之后,认为该周期的快照制作成功; 否则,如果在规定的时间内没有收到所有算子的报告,则认为本周期快照制作失败 如果一个算子有两个输入源,则暂时阻塞先收到barrier的输入源,等到第二个输入源相 同编号的barrier到来时,再制作自身快照并向下游广播该barrier。具体如下图所示 两个输入源 checkpoint 过程 假设算子C有A和B两个输入源 在第i个快照周期中,由于某些原因(如处理时延、网络时延等)输入源A发出的 barrier先到来,这时算子C暂时将输入源A的输入通道阻塞,仅收输入源B的数据。 当输入源B发出的barrier到来时,算子C制作自身快照并向CheckpointCoordinator报 告自身的快照制作情况,然后将两个barrier合并为一个,向下游所有的算子广播。 当由于某些原因出现故障时,CheckpointCoordinator通知流图上所有算子统一恢复到某 个周期的checkpoint状态,然后恢复数据流处理。分布式checkpoint机制保证了数据仅被 处理一次(Exactly Once)。 持久化存储 目前,Checkpoint持久化存储可以使用如下三种: MemStateBackend 该持久化存储主要将快照数据保存到JobManager的内存中,仅适合作为测试以及 快照的数据量非常小时使用,并不推荐用作大规模商业部署。 FsStateBackend 该持久化存储主要将快照数据保存到文件系统中,目前支持的文件系统主要是 HDFS和本地文件。如果使用HDFS,则初始化FsStateBackend时,需要传入以 “hdfs://”开头的路径(即: new FsStateBackend("hdfs:///hacluster/checkpoint")), 如果使用本地文件,则需要传入以“file://”开头的路径(即:new FsStateBackend("file:///Data"))。在分布式情况下,不推荐使用本地文件。如果某 个算子在节点A上失败,在节点B上恢复,使用本地文件时,在B上无法读取节点 A上的数据,导致状态恢复失败。 RocksDBStateBackend RocksDBStatBackend介于本地文件和HDFS之间,平时使用RocksDB的功能,将数 据持久化到本地文件中,当制作快照时,将本地数据制作成快照,并持久化到 FsStateBackend中(FsStateBackend不必用户特别指明,只需在初始化时传入HDFS 或本地路径即可,如new RocksDBStateBackend("hdfs:///hacluster/checkpoint")或new RocksDBStateBackend("file:///Data"))。 如果用户使用自定义窗口(window),不推荐用户使用RocksDBStateBackend。在自 定义窗口中,状态以ListState的形式保存在StatBackend中,如果一个key值中有多 个value值,则RocksDB读取该种ListState非常缓慢,影响性能。用户可以根据应用 的具体情况选择FsStateBackend+HDFS或RocksStateBackend+HDFS。 语法 ​ val env = StreamExecutionEnvironment.getExecutionEnvironment() // start a checkpoint every 1000 ms env.enableCheckpointing(1000) // advanced options: // 设置checkpoint的执行模式,最多执行一次或者至少执行一次 env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) // 设置checkpoint的超时时间 env.getCheckpointConfig.setCheckpointTimeout(60000) // 如果在只做快照过程中出现错误,是否让整体任务失败:true是 false不是 env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false) //设置同一时间有多少 个checkpoint可以同时执行 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1) ​ 例子 需求 假定用户需要每隔1秒钟需要统计4秒中窗口中数据的量,然后对统计的结果值进行checkpoint处理 数据规划 使用自定义算子每秒钟产生大约10000条数据。 
 产生的数据为一个四元组(Long,String,String,Integer)—------(id,name,info,count)。 
 数据经统计后,统计结果打印到终端输出。 
 打印输出的结果为Long类型的数据。 
 开发思路 
 source算子每隔1秒钟发送10000条数据,并注入到Window算子中。 window算子每隔1秒钟统计一次最近4秒钟内数据数量。 每隔1秒钟将统计结果打印到终端 每隔6秒钟触发一次checkpoint,然后将checkpoint的结果保存到HDFS中。 //发送数据形式 case class SEvent(id: Long, name: String, info: String, count: Int) class SEventSourceWithChk extends RichSourceFunction[SEvent]{ private var count = 0L private var isRunning = true private val alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWZYX0987654321" // 任务取消时调用 override def cancel(): Unit = { isRunning = false } //// source算子的逻辑,即:每秒钟向流图中注入10000个元组 override def run(sourceContext: SourceContext[SEvent]): Unit = { while(isRunning) { for (i <- 0 until 10000) { sourceContext.collect(SEvent(1, "hello-"+count, alphabet,1)) count += 1L } Thread.sleep(1000) } } } /** 该段代码是流图定义代码,具体实现业务流程,另外,代码中窗口的触发时间使 用了event time。 */ object FlinkEventTimeAPIChkMain { def main(args: Array[String]): Unit ={ val env = StreamExecutionEnvironment.getExecutionEnvironment env.setStateBackend(new FsStateBackend("hdfs://hadoop01:9000/flink-checkpoint/checkpoint/")) env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE) env.getCheckpointConfig.setCheckpointInterval(6000) env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 应用逻辑 val source: DataStream[SEvent] = env.addSource(new SEventSourceWithChk) source.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[SEvent] { // 设置watermark override def getCurrentWatermark: Watermark = { new Watermark(System.currentTimeMillis()) } // 给每个元组打上时间戳 override def extractTimestamp(t: SEvent, l: Long): Long = { System.currentTimeMillis() } }) .keyBy(0) .window(SlidingEventTimeWindows.of(Time.seconds(4), Time.seconds(1))) .apply(new WindowStatisticWithChk) .print() env.execute() } } //该数据在算子制作快照时用于保存到目前为止算子记录的数据条数。 // 用户自定义状态 class UDFState extends Serializable{ private var count = 0L // 设置用户自定义状态 def setState(s: Long) = count = s // 获取用户自定状态 def getState = count } //该段代码是window算子的代码,每当触发计算时统计窗口中元组数量。 class WindowStatisticWithChk extends WindowFunction[SEvent, Long, Tuple, TimeWindow] with ListCheckpointed[UDFState]{ private var total = 0L // window算子的实现逻辑,即:统计window中元组的数量 override def apply(key: Tuple, window: TimeWindow, input: Iterable[SEvent], out: Collector[Long]): Unit = { var count = 0L for (event <- input) { count += 1L } total += count out.collect(count) } // 从自定义快照中恢复状态 override def restoreState(state: util.List[UDFState]): Unit = { val udfState = state.get(0) total = udfState.getState } // 制作自定义状态快照 override def snapshotState(checkpointId: Long, timestamp: Long): util.List[UDFState] = { val udfList: util.ArrayList[UDFState] = new util.ArrayList[UDFState] val udfState = new UDFState udfState.setState(total) udfList.add(udfState) udfList } } flink-SQL Table API和SQL捆绑在flink-table Maven工件中。必须将以下依赖项添加到你的项目才能使用Table API和SQL: org.apache.flink flink-table_2.11 1.5.0 另外,你需要为Flink的Scala批处理或流式API添加依赖项。对于批量查询,您需要添加: org.apache.flink flink-scala_2.11 1.5.0 Table API和SQL程序的结构 Flink的批处理和流处理的Table API和SQL程序遵循相同的模式; 所以我们只需要使用一种来演示即可 要想执行flink的SQL语句,首先需要获取SQL的执行环境: 两种方式(batch和streaming): // *************** // STREAMING QUERY // *************** val sEnv = StreamExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for streaming queries val sTableEnv = TableEnvironment.getTableEnvironment(sEnv) // *********** // BATCH QUERY // *********** val bEnv = ExecutionEnvironment.getExecutionEnvironment // create a TableEnvironment for batch queries val bTableEnv = TableEnvironment.getTableEnvironment(bEnv) 通过getTableEnvironment可以获取TableEnviromment;这个TableEnviromment是Table API和SQL集成的核心概念。它负责: 在内部目录中注册一个表 注册外部目录 执行SQL查询 注册用户定义的(标量,表格或聚合)函数 转换DataStream或DataSet成Table 持有一个ExecutionEnvironment或一个参考StreamExecutionEnvironment 在内部目录中注册一个表 TableEnvironment维护一个按名称注册的表的目录。有两种类型的表格,输入表格和输出表格。 输入表可以在Table API和SQL查询中引用并提供输入数据。输出表可用于将表API或SQL查询的结果发送到外部系统 输入表可以从各种来源注册: 现有Table对象,通常是表API或SQL查询的结果。 TableSource,它访问外部数据,例如文件,数据库或消息传递系统。 DataStream或DataSet来自DataStream或DataSet程序。 输出表可以使用注册TableSink。 注册一个表 // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // register the Table projTable as table "projectedX" tableEnv.registerTable("projectedTable", projTable) // Table is the result of a simple projection query val projTable: Table = tableEnv.scan("projectedTable ").select(...) 注册一个tableSource TableSource提供对存储在诸如数据库(MySQL,HBase等),具有特定编码(CSV,Apache [Parquet,Avro,ORC],...)的文件的存储系统中的外部数据的访问或者消息传送系统(Apache Kafka,RabbitMQ,...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSource val csvSource: TableSource = new CsvTableSource("/path/to/file", ...) // register the TableSource as table "CsvTable" tableEnv.registerTableSource("CsvTable", csvSource) 注册一个tableSink 注册TableSink可用于将表API或SQL查询的结果发送到外部存储系统,如数据库,键值存储,消息队列或文件系统(使用不同的编码,例如CSV,Apache [Parquet ,Avro,ORC],...) // get a TableEnvironment val tableEnv = TableEnvironment.getTableEnvironment(env) // create a TableSink val csvSink: TableSink = new CsvTableSink("/path/to/file", ...) // define the field names and types val fieldNames: Array[String] = Array("a", "b", "c") val fieldTypes: Array[TypeInformation[_]] = Array(Types.INT, Types.STRING, Types.LONG) // register the TableSink as table "CsvSinkTable" tableEnv.registerTableSink("CsvSinkTable", fieldNames, fieldTypes, csvSink) 例子 //创建batch执行环境 val env = ExecutionEnvironment.getExecutionEnvironment //创建table环境用于batch查询 val tableEnvironment = TableEnvironment.getTableEnvironment(env) //加载外部数据 val csvTableSource = CsvTableSource.builder() .path("data1.csv")//文件路径 .field("id" , Types.INT)//第一列数据 .field("name" , Types.STRING)//第二列数据 .field("age" , Types.INT)//第三列数据 .fieldDelimiter(",")//列分隔符,默认是"," .lineDelimiter("\n")//换行符 .ignoreFirstLine()//忽略第一行 .ignoreParseErrors()//忽略解析错误 .build() //将外部数据构建成表 tableEnvironment.registerTableSource("tableA" , csvTableSource) //TODO 1:使用table方式查询数据 val table = tableEnvironment.scan("tableA").select("id , name , age").filter("name == 'lisi'") //将数据写出去 table.writeToSink(new CsvTableSink("bbb" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) //TODO 2:使用sql方式 // val sqlResult = tableEnvironment.sqlQuery("select id,name,age from tableA where id > 0 order by id limit 2") //// //将数据写出去 // sqlResult.writeToSink(new CsvTableSink("aaaaaa.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)) able和DataStream和DataSet的集成 1:将DataStream或DataSet转换为表格 在上面的例子讲解中,直接使用的是:registerTableSource注册表 对于flink来说,还有更灵活的方式:比如直接注册DataStream或者DataSet转换为一张表。 然后DataStream或者DataSet就相当于表,这样可以继续使用SQL来操作流或者批次的数据 语法: // get TableEnvironment // registration of a DataSet is equivalent Env:DataStream val tableEnv = TableEnvironment.getTableEnvironment(env) val stream: DataStream[(Long, String)] = ... // register the DataStream as Table "myTable" with fields "f0", "f1" tableEnv.registerDataStream("myTable", stream) 例子 object SQLToDataSetAndStreamSet { def main(args: Array[String]): Unit = { // set up execution environment val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) //构造数据 val orderA: DataStream[Order] = env.fromCollection(Seq( Order(1L, "beer", 3), Order(1L, "diaper", 4), Order(3L, "rubber", 2))) val orderB: DataStream[Order] = env.fromCollection(Seq( Order(2L, "pen", 3), Order(2L, "rubber", 3), Order(4L, "beer", 1))) // 根据数据注册表 tEnv.registerDataStream("OrderA", orderA) tEnv.registerDataStream("OrderB", orderB) // union the two tables val result = tEnv.sqlQuery( "SELECT * FROM OrderA WHERE amount > 2 UNION ALL " + "SELECT * FROM OrderB WHERE amount < 2") result.writeToSink(new CsvTableSink("ccc" , "," , 1 , FileSystem.WriteMode.OVERWRITE)) env.execute() } } case class Order(user: Long, product: String, amount: Int) 将表转换为DataStream或DataSet A Table可以转换成a DataStream或DataSet。通过这种方式,可以在Table API或SQL查询的结果上运行自定义的DataStream或DataSet程序 1:将表转换为DataStream 有两种模式可以将 Table转换为DataStream: 1:Append Mode 将一个表附加到流上 2:Retract Mode 将表转换为流 语法格式: // get TableEnvironment. // registration of a DataSet is equivalent // ge val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into an append DataStream of Row val dsRow: DataStream[Row] = tableEnv.toAppendStreamRow // convert the Table into an append DataStream of Tuple2[String, Int] val dsTuple: DataStream[(String, Int)] dsTuple = tableEnv.toAppendStream(String, Int) // convert the Table into a retract DataStream of Row. // A retract stream of type X is a DataStream[(Boolean, X)]. // The boolean field indicates the type of the change. // True is INSERT, false is DELETE. val retractStream: DataStream[(Boolean, Row)] = tableEnv.toRetractStreamRow 例子: object TableTODataSet_DataStream { def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) val env = StreamExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env.fromCollection(data) val table: Table = tEnv.fromDataStream(stream) //TODO 将table转换为DataStream----[数控等离子切割机](http://www.158cnc.com)[http://www.158cnc.com](http://www.158cnc.com)将一个表附加到流上Append Mode val appendStream: DataStream[Peoject] = tEnv.toAppendStream[Peoject](table) //TODO 将表转换为流Retract Mode true代表添加消息,false代表撤销消息 val retractStream: DataStream[(Boolean, Peoject)] = tEnv.toRetractStream[Peoject](table) retractStream.print() env.execute() } } case class Peoject(user: Long, index: Int, content: String) 将表转换为DataSet 语法格式 // get TableEnvironment // registration of a DataSet is equivalent val tableEnv = TableEnvironment.getTableEnvironment(env) // Table with two fields (String name, Integer age) val table: Table = ... // convert the Table into a DataSet of Row val dsRow: DataSet[Row] = tableEnv.toDataSetRow // convert the Table into a DataSet of Tuple2[String, Int] val dsTuple: DataSet[(String, Int)] = tableEnv.toDataSet(String, Int) 例子: case class Peoject(user: Long, index: Int, content: String) object TableTODataSet{ def main(args: Array[String]): Unit = { //构造数据,转换为table val data = List( Peoject(1L, 1, "Hello"), Peoject(2L, 2, "Hello"), Peoject(3L, 3, "Hello"), Peoject(4L, 4, "Hello"), Peoject(5L, 5, "Hello"), Peoject(6L, 6, "Hello"), Peoject(7L, 7, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(8L, 8, "Hello World"), Peoject(20L, 20, "Hello World")) //初始化环境,加载table数据 val env = ExecutionEnvironment.getExecutionEnvironment env.setParallelism(1) val tableEnvironment = TableEnvironment.getTableEnvironment(env) val collection: DataSet[Peoject] = env.fromCollection(data) val table: Table = tableEnvironment.fromDataSet(collection) //TODO 将table转换为dataSet val toDataSet: DataSet[Peoject] = tableEnvironment.toDataSet[Peoject](table) toDataSet.print() // env.execute() } }

凹凹凸曼 2020-06-16 19:23:12 0 浏览量 回答数 0

回答

两者都是用mr模型来进行并行计算: 1)hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。 2)spark用户提交的任务成为application,一个application对应一个sparkcontext,app中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset有TaskSchaduler分发到各个executor中执行,executor的生命周期是和app一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存进行计算。 3)hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系。 spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。

珍宝珠 2019-12-02 03:06:11 0 浏览量 回答数 0

问题

SparkContext无法以master设置为“Yarn”开始

社区小助手 2019-12-01 19:28:47 785 浏览量 回答数 1
阿里云大学 云服务器ECS com域名 网站域名whois查询 开发者平台 小程序定制 小程序开发 国内短信套餐包 开发者技术与产品 云数据库 图像识别 开发者问答 阿里云建站 阿里云备案 云市场 万网 阿里云帮助文档 免费套餐 开发者工具 企业信息查询 小程序开发制作 视频内容分析 企业网站制作 视频集锦 代理记账服务 企业建站模板