- 请列举会引起Shuffle过程的Spark算子,并简述功能。
reduceBykey:
groupByKey:
…ByKey:
- 简述Spark的两种核心Shuffle(HashShuffle与SortShuffle)的工作流程(包括未优化的HashShuffle、优化的HashShuffle、普通的SortShuffle与bypass的SortShuffle)(重点)
未经优化的HashShuffle:
优化后的Shuffle:
普通的SortShuffle:
当 shuffle read task 的 数 量 小 于 等 于 spark.shuffle.sort。
bypassMergeThreshold 参数的值时(默认为 200),就会启用 bypass 机制。
reduceByKey:按照key进行聚合,在shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]。
groupByKey:按照key进行分组,直接进行shuffle。
开发指导:reduceByKey比groupByKey,建议使用。但是需要注意是否会影响业务逻辑。
- 关系:
两者都是用来改变RDD的partition数量的,repartition底层调用的就是coalesce方法:coalesce(numPartitions, shuffle = true)
- 区别:
repartition一定会发生shuffle,coalesce根据传入的参数来判断是否发生shuffle
一般情况下增大rdd的partition数量使用repartition,减少partition数量时使用coalesce
都是做RDD持久化的
cache:内存,不会截断血缘关系,使用计算过程中的数据缓存。
checkpoint:磁盘,截断血缘关系,在ck之前必须没有任何任务提交才会生效,ck过程会额外提交一次任务。
累加器(accumulator)是Spark中提供的一种分布式的变量机制,其原理类似于mapreduce,即分布式的改变,然后聚合这些改变。累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。而广播变量用来高效分发较大的对象。
共享变量出现的原因:
通常在向 Spark 传递函数时,比如使用 map() 函数或者用 filter() 传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。
Spark的两个共享变量,累加器与广播变量,分别为结果聚合与广播这两种常见的通信模式突破了这一限制。
使用foreachPartition代替foreach,在foreachPartition内获取数据库的连接。
- RDD
优点:
编译时类型安全
编译时就能检查出类型错误
面向对象的编程风格
直接通过类名点的方式来操作数据
缺点:
序列化和反序列化的性能开销
无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化。
GC的性能开销,频繁的创建和销毁对象, 势必会增加GC
- DataFrame
DataFrame引入了schema和off-heap
schema : RDD每一行的数据, 结构都是一样的,这个结构就存储在schema中。 Spark通过schema就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据, 而结构的部分就可以省略了。
- DataSet
DataSet结合了RDD和DataFrame的优点,并带来的一个新的概念Encoder。
当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果,而不用反序列化整个对象。Spark还没有提供自定义Encoder的API,但是未来会加入。
三者之间的转换:
join和sql中的inner join操作很相似,返回结果是前面一个集合和后面一个集合中匹配成功的,过滤掉关联不上的。
leftJoin类似于SQL中的左外关联left outer join,返回结果以第一个RDD为主,关联不上的记录为空。
部分场景下可以使用left semi join替代left join:
因为 left semi join 是 in(keySet) 的关系,遇到右表重复记录,左表会跳过,性能更高,而 left join 则会一直遍历。但是left semi join 中最后 select 的结果中只许出现左表中的列名,因为右表只有 join key 参与关联计算了
一、基于Receiver的方式
这种方式使用Receiver来获取数据。Receiver是使用Kafka的高层次Consumer API来实现的。receiver从Kafka中获取的数据都是存储在Spark Executor的内存中的(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后Spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启用高可靠机制,让数据零丢失,就必须启用Spark Streaming的预写日志机制(Write Ahead Log,WAL)。该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
二、基于Direct的方式
这种新的不基于Receiver的直接方式,是在Spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用Receiver来接收数据后,这种方式会周期性地查询Kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
优点如下:
简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。Spark会创建跟Kafka partition一样多的RDD partition,并且会并行从Kafka中读取数据。所以在Kafka partition和RDD partition之间,有一个一对一的映射关系。
高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,Kafka自己本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。而基于direct的方式,不依赖Receiver,不需要开启WAL机制,只要Kafka中作了数据的复制,那么就可以通过Kafka的副本进行恢复。
一次且仅一次的事务机制。
三、对比:
基于receiver的方式,是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的。这是消费Kafka数据的传统方式。这种方式配合着WAL机制可以保证数据零丢失的高可靠性,但是却无法保证数据被处理一次且仅一次,可能会处理两次。因为Spark和ZooKeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,Spark Streaming自己就负责追踪消费的offset,并保存在checkpoint中。Spark自己一定是同步的,因此可以保证数据是消费一次且仅消费一次。
在实际生产环境中大都用Direct方式
窗口函数就是在原来定义的SparkStreaming计算批次大小的基础上再次进行封装,每次计算多个批次的数据,同时还需要传递一个滑动步长的参数,用来设置当次计算任务完成之后下一次从什么地方开始计算。
图中time1就是SparkStreaming计算批次大小,虚线框以及实线大框就是窗口的大小,必须为批次的整数倍。虚线框到大实线框的距离(相隔多少批次),就是滑动步长。
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount") val sc = new SparkContext(conf) sc.textFile("/input") .flatMap(_.split(" ")) .map((_,1)) .reduceByKey(_+_) .saveAsTextFile("/output") sc.stop() |
方法1:
(1)按照key对数据进行聚合(groupByKey)
(2)将value转换为数组,利用scala的sortBy或者sortWith进行排序(mapValues)数据量太大,会OOM。
方法2:
(1)取出所有的key
(2)对key进行迭代,每次取出一个key利用spark的排序算子进行排序
方法3:
(1)自定义分区器,按照key进行分区,使不同的key进到不同的分区
(2)对每个分区运用spark的排序算子进行排序
这里举个例子。比如我们有几百个文件,会有几百个map出现,读取之后进行join操作,会非常的慢。这个时候我们可以进行coalesce操作,比如240个map,我们合成60个map,也就是窄依赖。这样再shuffle,过程产生的文件数会大大减少。提高join的时间性能。
Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能。基于流执行引擎,Flink提供了诸多更高抽象层的API以便用户编写分布式任务:DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和Python。DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala。Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala。此外,Flink还针对特定的应用领域提供了领域库,例如:Flink ML,Flink的机器学习库,提供了机器学习Pipelines API并实现了多种机器学习算法。Gelly,Flink的图计算库,提供了图计算的相关API及多种图计算算法实现。
架构模型上:Spark Streaming 的task运行依赖driver 和 executor和worker,当然driver和excutor还依赖于集群管理器Standalone或者yarn等。而Flink运行时主要是JobManager、TaskManage和TaskSlot。另外一个最核心的区别是:Spark Streaming 是微批处理,运行的时候需要指定批处理的时间,每次运行 job 时处理一个批次的数据;Flink 是基于事件驱动的,事件可以理解为消息。事件驱动的应用程序是一种状态应用程序,它会从一个或者多个流中注入事件,通过触发计算更新状态,或外部动作对注入的事件作出反应。
任务调度上:Spark Streaming的调度分为构建 DGA 图,划分 stage,生成 taskset,调度 task等步骤,而Flink首先会生成 StreamGraph,接着生成 JobGraph,然后将 jobGraph 提交给 Jobmanager 由它完成 jobGraph 到 ExecutionGraph 的转变,最后由 jobManager 调度执行。
时间机制上:flink 支持三种时间机制事件时间,注入时间,处理时间,同时支持 watermark 机制处理滞后数据。Spark Streaming 只支持处理时间,Structured streaming则支持了事件时间和watermark机制。
容错机制上:二者保证exactly-once的方式不同。spark streaming 通过保存offset和事务的方式;Flink 则使用两阶段提交协议来解决这个问题。
Flink中默认提供了八大分区策略(也叫分区器)。八大分区策略继承关系图
ChannelSelector: 接口,决定将记录写入哪个Channel。有3个方法:
- void setup(int numberOfChannels): 初始化输出Channel的数量。
- int selectChannel(T record): 根据当前记录以及Channel总数,决定应将记录写入下游哪个Channel。八大分区策略的区别主要在这个方法的实现上。
- boolean isBroadcast(): 是否是广播模式。决定了是否将记录写入下游所有Channel。
- StreamPartitioner:抽象类,也是所有流分区器的基类。
注意:
这里以及下边提到的Channel可简单理解为下游Operator的某个实例。Flink 中改变并行度,默认RebalancePartitioner分区策略。分区策略,可在Flink WebUI上直观看出,如REBALANCE,即使用了RebalancePartitioner分区策略;SHUFFLE,即使用了ShufflePartitioner分区策略。
- GlobalPartitioner: DataStream => DataStream
GlobalPartitioner,GLOBAL分区。将记录输出到下游Operator的第一个实例。
selectChannel实现
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //对每条记录,只选择下游operator的第一个Channel return 0; } |
API使用
dataStream .setParallelism(2) // 采用GLOBAL分区策略重分区 .global() .print() .setParallelism(1); |
- ShufflePartitioner: DataStream => DataStream
ShufflePartitioner,SHUFFLE分区。将记录随机输出到下游Operator的每个实例。
selectChannel实现
private Random random = new Random(); @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //对每条记录,随机选择下游operator的某个Channel return random.nextInt(numberOfChannels); } |
API使用
dataStream .setParallelism(2) // 采用SHUFFLE分区策略重分区 .shuffle() .print() .setParallelism(4); |
- RebalancePartitioner: DataStream => DataStream
RebalancePartitioner,REBALANCE分区。将记录以循环的方式输出到下游Operator的每个实例。
selectChannel实现
public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //第一条记录,输出到下游的第一个Channel;第二条记录,输出到下游的第二个Channel...如此循环 nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels; return nextChannelToSendTo; } |
API使用
dataStream .setParallelism(2) // 采用REBALANCE分区策略重分区 .rebalance() .print() .setParallelism(4); |
- RescalePartitioner: DataStream => DataStream
RescalePartitioner,RESCALE分区。基于上下游Operator的并行度,将记录以循环的方式输出到下游Operator的每个实例。举例: 上游并行度是2,下游是4,则上游一个并行度以循环的方式将记录输出到下游的两个并行度上;上游另一个并行度以循环的方式将记录输出到下游另两个并行度上。若上游并行度是4,下游并行度是2,则上游两个并行度将记录输出到下游一个并行度上;上游另两个并行度将记录输出到下游另一个并行度上。
selectChannel实现
private int nextChannelToSendTo = -1; @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { if (++nextChannelToSendTo >= numberOfChannels) { nextChannelToSendTo = 0; } return nextChannelToSendTo; } |
API使用
dataStream .setParallelism(2) // 采用RESCALE分区策略重分区 .rescale() .print() .setParallelism(4); |
- BroadcastPartitioner: DataStream => DataStream
BroadcastPartitioner,BROADCAST分区。广播分区将上游数据集输出到下游Operator的每个实例中。适合于大数据集Join小数据集的场景。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { //广播分区不支持选择Channel,因为会输出到下游每个Channel中 throw new UnsupportedOperationException("Broadcast partitioner does not support select channels."); } @Override public boolean isBroadcast() { //启用广播模式,此时Channel选择器会选择下游所有Channel return true; } |
API使用
dataStream .setParallelism(2) // 采用BROADCAST分区策略重分区 .broadcast() .print() .setParallelism(4); |
- ForwardPartitioner
ForwardPartitioner,FORWARD分区。将记录输出到下游本地的operator实例。ForwardPartitioner分区器要求上下游算子并行度一样。上下游Operator同属一个SubTasks。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { return 0; } |
API使用
dataStream .setParallelism(2) // 采用FORWARD分区策略重分区 .forward() .print() .setParallelism(2); |
- KeyGroupStreamPartitioner(HASH方式):
KeyGroupStreamPartitioner,HASH分区。将记录按Key的Hash值输出到下游Operator实例。
selectChannel实现
@Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } return KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, numberOfChannels); } // KeyGroupRangeAssignment中的方法 public static int assignKeyToParallelOperator(Object key, int maxParallelism, int parallelism) { return computeOperatorIndexForKeyGroup(maxParallelism, parallelism, assignToKeyGroup(key, maxParallelism)); } // KeyGroupRangeAssignment中的方法 public static int assignToKeyGroup(Object key, int maxParallelism) { return computeKeyGroupForKeyHash(key.hashCode(), maxParallelism); } // KeyGroupRangeAssignment中的方法 public static int computeKeyGroupForKeyHash(int keyHash, int maxParallelism) { return MathUtils.murmurHash(keyHash) % maxParallelism; } |
API使用
dataStream .setParallelism(2) // 采用HASH分区策略重分区 .keyBy((KeySelector<Tuple3<String, Integer, String>, String>) value -> value.f0) .print() .setParallelism(4); |
- CustomPartitionerWrapper
CustomPartitionerWrapper,CUSTOM分区。通过Partitioner实例的partition方法(自定义的)将记录输出到下游。
selectChannel实现
Partitioner<K> partitioner; KeySelector<T, K> keySelector; public CustomPartitionerWrapper(Partitioner<K> partitioner, KeySelector<T, K> keySelector) { this.partitioner = partitioner; this.keySelector = keySelector; } @Override public int selectChannel(SerializationDelegate<StreamRecord<T>> record) { K key; try { key = keySelector.getKey(record.getInstance().getValue()); } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance(), e); } return partitioner.partition(key, numberOfChannels); } |
自定义分区器将指定的Key分到指定的分区
// 自定义分区器,将不同的Key(用户ID)分到指定的分区 // key: 根据key的值来分区 // numPartitions: 下游算子并行度 static class CustomPartitioner implements Partitioner<String> { @Override public int partition(String key, int numPartitions) { switch (key){ case "user_1": return 0; case "user_2": return 1; case "user_3": return 2; default: return 3; } } } |
使用自定义分区器
dataStream .setParallelism(2) // 采用CUSTOM分区策略重分区 .partitionCustom(new CustomPartitioner(),0) .print() .setParallelism(4); |
- Flink的并行度有了解吗?Flink中设置并行度需要注意什么?
Flink程序由多个任务(Source、Transformation、Sink)组成。任务被分成多个并行实例来执行,每个并行实例处理任务的输入数据的子集。任务的并行实例的数量称之为并行度。Flink中人物的并行度可以从多个不同层面设置:操作算子层面(Operator Level)、执行环境层面(Execution Environment Level)、客户端层面(Client Level)、系统层面(System Level)。Flink可以设置好几个level的parallelism,其中包括Operator Level、Execution Environment Level、Client Level、System Level在flink-conf.yaml中通过parallelism.default配置项给所有execution environments指定系统级的默认parallelism;在ExecutionEnvironment里头可以通过setParallelism来给operators、data sources、data sinks设置默认的parallelism;如果operators、data sources、data sinks自己有设置parallelism则会覆盖ExecutionEnvironment设置的parallelism。
重启策略种类:固定延迟重启策略(Fixed Delay Restart Strategy)故障率重启策略(Failure Rate Restart Strategy)无重启策略(No Restart Strategy)Fallback重启策略(Fallback Restart Strategy)
Flink提供了一个分布式缓存,类似于hadoop,可以使用户在并行函数中很方便的读取本地文件,并把它放在taskmanager节点中,防止task重复拉取。
此缓存的工作机制如下:程序注册一个文件或者目录(本地或者远程文件系统,例如hdfs或者s3),通过ExecutionEnvironment注册缓存文件并为它起一个名称。
当程序执行,Flink自动将文件或者目录复制到所有taskmanager节点的本地文件系统,仅会执行一次。用户可以通过这个指定的名称查找文件或者目录,然后从taskmanager节点的本地文件系统访问它。
在Flink中,同一个算子可能存在若干个不同的并行实例,计算过程可能不在同一个Slot中进行,不同算子之间更是如此,因此不同算子的计算数据之间不能像Java数组之间一样互相访问,而广播变量Broadcast便是解决这种情况的。我们可以把广播变量理解为是一个公共的共享变量,我们可以把一个dataset 数据集广播出去,然后不同的task在节点上都能够获取到,这个数据在每个节点上只会存在一份。
- Tumbling Time Window
假如我们需要统计每一分钟中用户购买的商品的总数,需要将用户的行为事件按每一分钟进行切分,这种切分被成为翻滚时间窗口(Tumbling Time Window)。翻滚窗口能将数据流切分成不重叠的窗口,每一个事件只能属于一个窗口。
- Sliding Time Window
我们可以每30秒计算一次最近一分钟用户购买的商品总数。这种窗口我们称为滑动时间窗口(Sliding Time Window)。在滑窗中,一个元素可以对应多个窗口。
- Tumbling Count Window
当我们想要每100个用户购买行为事件统计购买总数,那么每当窗口中填满100个元素了,就会对窗口进行计算,这种窗口我们称之为翻滚计数窗口(Tumbling Count Window),上图所示窗口大小为3个。
- Session Window
在这种用户交互事件流中,我们首先想到的是将事件聚合到会话窗口中(一段用户持续活跃的周期),由非活跃的间隙分隔开。如上图所示,就是需要计算每个用户在活跃期间总共购买的商品数量,如果用户30秒没有活动则视为会话断开(假设raw data stream是单个用户的购买行为流)。一般而言,window 是在无限的流上定义了一个有限的元素集合。这个集合可以是基于时间的,元素个数的,时间和个数结合的,会话间隙的,或者是自定义的。Flink 的 DataStream API 提供了简洁的算子来满足常用的窗口操作,同时提供了通用的窗口机制来允许用户自己定义窗口分配逻辑。
Flink流计算中可能有各种方式来保存状态:
- 窗口操作
- 使用了KV操作的函数
- 继承了CheckpointedFunction的函数
当开始做checkpointing的时候,状态会被持久化到checkpoints里来规避数据丢失和状态恢复。选择的状态存储策略不同,会导致状态持久化如何和checkpoints交互。
Flink内部提供了这些状态后端:
- MemoryStateBackend
- FsStateBackend
- RocksDBStateBackend
如果没有其他配置,系统将使用MemoryStateBackend。
Flink中的时间与现实世界中的时间是不一致的,在flink中被划分为事件时间,摄入时间,处理时间三种。如果以EventTime为基准来定义时间窗口将形成EventTimeWindow,要求消息本身就应该携带EventTime如果以IngesingtTime为基准来定义时间窗口将形成IngestingTimeWindow,以source的systemTime为准。如果以ProcessingTime基准来定义时间窗口将形成ProcessingTimeWindow,以operator的systemTime为准。
Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件的,处理乱序事件通常用watermark机制结合window来实现。详细参考:https://www.jianshu.com/p/1c2542f11da0
TableEnvironment是Table API和SQL集成的核心概念。它负责:
- 在内部catalog中注册表
- 注册外部catalog
- 执行SQL查询
- 注册用户定义(标量,表或聚合)函数
- 将DataStream或DataSet转换为表
- 持有对ExecutionEnvironment或StreamExecutionEnvironment的引用
StreamSQL API的执行原理如下:
- 用户使用对外提供Stream SQL的语法开发业务应用;
- 用calcite对StreamSQL进行语法检验,语法检验通过后,转换成calcite的逻辑树节点;最终形成calcite的逻辑计划;
- 采用Flink自定义的优化规则和calcite火山模型、启发式模型共同对逻辑树进行优化,生成最优的Flink物理计划;
- 对物理计划采用janino codegen生成代码,生成用低阶API DataStream 描述的流应用,提交到Flink平台执行详细参考:https://cloud.tencent.com/developer/article/1471612
Flink设计者认为:有限流处理是无限流处理的一种特殊情况,它只不过在某个时间点停止而已。Flink通过一个底层引擎同时支持流处理和批处理。详细参考:
https://cloud.tencent.com/developer/article/1501348
大概的原理,上游的task产生数据后,会写在本地的缓存中,然后通知JM自己的数据已经好了,JM通知下游的Task去拉取数据,下游的Task然后去上游的Task拉取数据,形成链条。
但是在何时通知JM?这里有一个设置,比如pipeline还是blocking,pipeline意味着上游哪怕产生一个数据,也会去通知,blocking则需要缓存的插槽存满了才会去通知,默认是pipeline。
虽然生产数据的是Task,但是一个TaskManager中的所有Task共享一个NetworkEnvironment,下游的Task利用ResultPartitionManager主动去上游Task拉数据,底层利用的是Netty和TCP实现网络链路的传输。
那么,一直都在说Flink的背压是一种自然的方式,为什么是自然的了?
从上面的图中下面的链路中可以看到,当下游的process逻辑比较慢,无法及时处理数据时,他自己的local buffer中的消息就不能及时被消费,进而导致netty无法把数据放入local buffer,进而netty也不会去socket上读取新到达的数据,进而在tcp机制中,tcp也不会从上游的socket去读取新的数据,上游的netty也是一样的逻辑,它无法发送数据,也就不能从上游的localbuffer中消费数据,所以上游的localbuffer可能就是满的,上游的operator或者process在处理数据之后进行collect.out的时候申请不能本地缓存,导致上游的process被阻塞。这样,在这个链路上,就实现了背压。
如果还有相应的上游,则会一直反压上去,一直影响到source,导致source也放慢从外部消息源读取消息的速度。一旦瓶颈解除,网络链路畅通,则背压也会自然而然的解除。
Flink基于分布式快照与可部分重发的数据源实现了容错。用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。
详细参考:https://www.jianshu.com/p/1fca8fb61f86
- Flink在使用Window时出现数据倾斜,你有什么解决办法?
注意:这里window产生的数据倾斜指的是不同的窗口内积攒的数据量不同,主要是由源头数据的产生速度导致的差异。核心思路:1.重新设计key 2.在窗口计算前做预聚合可以参考这个:https://blog.csdn.net/it_lee_j_h/article/details/88641894
- Flink任务,delay极高,请问你有什么调优策略?
首先要确定问题产生的原因,找到最耗时的点,确定性能瓶颈点。比如任务频繁反压,找到反压点。主要通过:资源调优、作业参数调优。资源调优即是对作业中的Operator的并发数(parallelism)、CPU(core)、堆内存(heap_memory)等参数进行调优。作业参数调优包括:并行度的设置,State的设置,checkpoint的设置。
SKU(库存量单位):一台银色、128G内存的、支持联通网络的iPhoneX
SPU(标准产品单位):iPhoneX
标签 |
含义 |
|
id |
订单编号 |
|
total_amount |
订单金额 |
|
order_status |
订单状态 |
|
user_id |
用户id |
|
payment_way |
支付方式 |
|
out_trade_no |
支付流水号 |
|
create_time |
创建时间 |
|
operate_time |
操作时间 |
|
- 订单详情表(order_detail)
标签 |
含义 |
|
id |
订单编号 |
|
order_id |
订单号 |
|
user_id |
用户id |
|
sku_id |
商品id |
|
sku_name |
商品名称 |
|
order_price |
商品价格 |
|
sku_num |
商品数量 |
|
create_time |
创建时间 |
|
- 商品表
标签 |
含义 |
|
id |
skuId |
|
spu_id |
spuid |
|
price |
价格 |
|
sku_name |
商品名称 |
|
sku_desc |
商品描述 |
|
weight |
重量 |
|
tm_id |
品牌id |
|
category3_id |
品类id |
|
create_time |
创建时间 |
|
- 用户表
标签 |
含义 |
|
id |
用户id |
|
name |
姓名 |
|
birthday |
生日 |
|
gender |
性别 |
|
邮箱 |
||
user_level |
用户等级 |
|
create_time |
创建时间 |
|
- 商品一级分类表
标签 |
含义 |
|
id |
id |
|
name |
名称 |
|
- 商品二级分类表
标签 |
含义 |
|
id |
id |
|
name |
名称 |
|
category1_id |
一级品类id |
|
- 商品三级分类表
标签 |
含义 |
|
id |
id |
|
name |
名称 |
|
Category2_id |
二级品类id |
|
- 支付流水表
标签 |
含义 |
|
id |
编号 |
|
out_trade_no |
对外业务编号 |
|
order_id |
订单编号 |
|
user_id |
用户编号 |
|
alipay_trade_no |
支付宝交易流水编号 |
|
total_amount |
支付金额 |
|
subject |
交易内容 |
|
payment_type |
支付类型 |
|
payment_time |
支付时间 |
|
订单表跟订单详情表有什么区别?
订单表的订单状态会变化,订单详情表不会,因为没有订单状态。
订单表记录user_id,订单id订单编号,订单的总金额order_status,支付方式,订单状态等。
订单详情表记录user_id,商品sku_id ,具体的商品信息(商品名称sku_name,价格order_price,数量sku_num)