暂时未有相关云产品技术能力~
一.引言 项目实现中需要连接 redis,为了防止因网络抖动或其他原因造成的客户端连接失败,一般需要增加重试机制判断 client 是否连接成功,之前写了一版重连代码发现有 bug,借此机会看下代码 bug 以及如何更好的重连 redis。二.错误代码def getRedisClient(host: String, port: Int, timeout: Int = 0): Jedis = { var redisClient: Jedis = null var attempt = 3 while (attempt > 0 && redisClient == null) { try { if (!timeout.equals(0)) { redisClient = new Jedis(host, port, timeout) } else { redisClient = new Jedis(host, port) } } catch { case e: Throwable => { e.printStackTrace() println(s"redis连接异常 $host 重试次数: $attempt") Thread.sleep(1000) } } finally { attempt -= 1 } } redisClient }该代码为了实现报错重连机制,上线后发现存在下述问题 (短短几行代码这么多 bug o(╥﹏╥)o):A.host 正确,不抛异常,client 不可用出现该情况可能是网络抖动造成,不会进入 try catch 逻辑,但是后续使用该 client 会报错,因为redisClient = new Jedis() 方法没有返回值,所以只初始化无法判断该连接是否有效:public Jedis(final String host, final int port, final int timeout) { super(host, port, timeout); }B.host 错误,抛异常,client 不可用host 错误情况下,直接报错 java.net.UnknownHostExceptionC.client == null 判断问题while 循环 client == null 作为判断条件,但只在第一次生效,因为第一次初始化后,即使是错误的 host 和 ip,client 也会变为 Jedis 对象而不再是 null,从而不会执行后续重试逻辑:编辑三.修改版上面A,B,C三个问题总结下来就是两个问题:(1) Jedis 客户端连接失败时需要抛出异常进入 catch 逻辑(2) while 循环判断时需要新的判断条件这个时候 ping 方法满足我们的需求,该方法返回 String,client 正常时返回 "PONG"(新的判断条件),client 无效时则抛出异常(进入catch逻辑),所以满足了上述要求:public String ping() { checkIsInMulti(); client.ping(); return client.getStatusCodeReply(); }这里将重试次数增加为 10 次,将判断条件修改为 ping 后返回的结果,同时保证 ping 后出问题可以打出异常日志方便后续问题排查:def getRedisClient(host: String, port: Int, timeout: Int = 0): Jedis = { var redisClient: Jedis = null var attempt = 10 var ping = "" while (attempt > 0 && ping != "PONG") { try { if (!timeout.equals(0) && attempt >= 10) { redisClient = new Jedis(host, port, timeout) } else { redisClient = new Jedis(host, port) } ping = redisClient.ping() } catch { case e: Throwable => { e.printStackTrace() println(s"redis连接异常 $host 重试次数: $attempt") Thread.sleep(1000) } } finally { attempt -= 1 } } println(s"初始化成功 ${host} ping: $ping") redisClient }四.能不能再改进一点redis 配置为主从结构,上述方法针对单台 redis 进行 10 次连接,一般情况下不会出问题,但是极端情况下还是会导致服务异常,所以可以在代码中增加另一台 host 的连接,从而保证 slave or master 单点故障的情况下服务仍高可用:def getRedisClient(host: String, port: Int, timeout: Int = 0, other_host: String = ""): Jedis = { var redisClient: Jedis = null var attempt = 20 var ping = "" while (attempt > 0 && ping != "PONG") { try { if (!timeout.equals(0) && attempt >= 10) { redisClient = new Jedis(host, port, timeout) } else if (attempt >= 10) { redisClient = new Jedis(host, port) } else if (!timeout.equals(0)) { redisClient = new Jedis(other_host, port, timeout) } else { redisClient = new Jedis(other_host, port) } ping = redisClient.ping() } catch { case e: Throwable => { e.printStackTrace() if (attempt >= 10) { println(s"redis连接异常 $host 重试次数: $attempt") } else { println(s"redis连接异常 $other_host 重试次数: $attempt") } Thread.sleep(1000) } } finally { attempt -= 1 } } if (attempt >= 10) { println(s"初始化成功 ${host} ping: $ping") } else { println(s"初始化成功 ${other_host} ping: $ping") } redisClient修改后的程序相比最一开始鲁棒性更好,服务也更加高可用,非常的奈斯 (^-^)V
一.引言scala / java 项目引用非官方依赖 jar 包时,需要自定义并打入最终的 jar 包,经过试验以下方案可以实现。二.添加 jar 包到 maven 库 👍第三方自定义 jar 包可以添加到本地 maven 库中,随后即可 mvn package 打入到最终的项目 jar 包中,该方法最方便。创建 install.sh 文件,jar_path 为第三方自定义 jar 包在设备的位置,groupId、artifactId 和 版本号 version 自己定义,执行脚本后 echo 会打印出对应依赖的 pom 格式,粘贴复制到项目 maven 中后打包即可添加该 jar。#!/bin/bash jar_path=./lib/yourself.jar jar_groupId=com.self DartifactId=myjar Dversion=0.1 mvn install:install-file -Dfile=$jar_path -DgroupId=$jar_groupId -DartifactId=$DartifactId -Dversion=$Dversion -Dpackaging=jar -DgeneratePom=true pom="<dependency>\n \t<groupId>$jar_groupId</groupId>\n \t<artifactId>$DartifactId</artifactId>\n \t<version>$Dversion</version>\n </dependency>" echo $pom三.添加 jar 包到 项目目录上面的方法可以将对应第三方 jar 添加至最终的 jar 包中,除此之外,也可以将第三方 jar 包放到项目中,也可以实现上述效果。1.添加 jar 包到项目可以直接在 root 目录下创建 lib 文件夹,也可以在 src/main/resources 文件夹下创建 lib 文件夹,然后将第三方 jar 包拷贝至创建的目录中。编辑2.添加 jar 包到 mavenFile -> Project Structrue -> Libraries -> + -> java 到项目对应位置选择 jar 包即可添加,后面项目中即可正常使用该第三方 jar 包。编辑3.第三方 jar 包打入最终 jar 包正常情况下直接 mvn package 即可将第三方 jar 包打入,如果打包不生效就在 pom 中添加如下 plugin 配置:<plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.3</version> <configuration> <source>1.8</source> <target>1.8</target> <encoding>UTF-8</encoding> <compilerArguments> <extdirs>${project.basedir}/lib</extdirs> </compilerArguments> </configuration> </plugin> </plugins>extdirs 的地址中 ${project.basedir} 是该项目的根目录,后续 lib 为添加的项目地址,如果添加到 src/main/resources/lib 下,则修改为 ${project.basedir}/src/main/resources/lib。四.总结这里推荐使用第一个 👍 的方法,整体比较简洁省事,如果第三方 jar 包通过 --jars 传入,则 lib 下对应的第三方 jar 包只用于编译,此时使用 scope = syetem 的打包方式即可,这样可以压缩最终 jar 包的大小,减小传输量 :<dependency> <groupId>$groupId</groupId> <artifactId>$artifactId</artifactId> <version>$version</version> <scope>system</scope> <systemPath>${project.basedir}/lib/your.jar</systemPath> </dependency>
一.引言编辑数据源创建初始数据集,这里主要以 DataSet 数据源为例,例如从文件或者从 collection 中创建,后续介绍 DataStreaming 的数据源获取方法。创建数据集的机制一般抽象在 InputFormat 后面,这里有点类似 spark 的 sparkContext,Flink 的 ExecutionEnvironment 也提供了很多快捷的方法。主要分为下面几大类,基于文件的和基于集合的 :File-Based 基于文件 readTextFile(path)TextInputFormat -读取文件行并返回字符串。readTextFileWithValue(path) TextValueInputFormat -读取文件行并返回StringValues。StringValues是可变字符串。readCsvFile(path)CsvInputFormat -解析逗号(或其他char)分隔字段的文件。返回由元组或pojo组成的数据集。支持基本java类型及其对应值作为字段类型。readFileOfPrimitives(path, Class) PrimitiveInputFormat—解析以新行(或其他字符序列)分隔的原始数据类型的文件,如String或Integer。 readFileOfPrimitives(path, delimiter, Class)PrimitiveInputFormat—使用给定的分隔符,解析以新行(或其他字符序列)分隔的原始数据类型(如String或Integer)的文件。Collection-Based 基于集合fromCollection(Collection)从Java.util.Collection创建一个数据集。集合中的所有元素必须具有相同的类型,当然也可以是 scala 的。fromCollection(Iterator, Class)从迭代器创建一个数据集。该类指定迭代器返回的元素的数据类型。fromElements(T…) 根据给定的对象序列创建一个数据集。所有对象必须是相同的类型。 fromParallelCollection(SplittableIterator, Class) 从一个迭代器中并行创建一个数据集。该类指定迭代器返回的元素的数据类型。 generateSequence(from, to)在给定的间隔内并行生成数字序列。Generic 泛型 readFile(inputFormat, path) FileInputFormat - 接受文件输入格式。 createInput(inputFormat) / inputFormat 接受通用的输入格式。Tips:介绍前这里先初始化好执行的 ExecutionEnvironment ,后面的示例都将基于改 env 实现。注意最下面 import 的隐式转换,Flink 基于 Scala 时很多方法都需要隐式转换,否则 api 执行会报错。import org.apache.flink.api.scala.{DataSet, ExecutionEnvironment} val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironment import org.apache.flink.api.scala._二.FileBased 基于文件1.readTextFileApi readTextFile 和 spark 的 textFile 很像,它可以读取本地的文件,也可以读取 HDFS 或者集群上的数据,并支持自动识别一些压缩格式文件,就像 textFile 可以直接读取 gz 一样。val textLinesFromLocalFile: DataSet[String] = env.readTextFile("./myfile") val textLinesFromHdfs: DataSet[String] = env.raedTextFile("hdfs://nnHost:nnPort/path/to/my/textfile") //触发程序执行 textLines.print()对于本地和集群的文件都可以直接调用执行,截止到 Flink v1.14.3 该接口支持以下压缩格式:Compressed Method 压缩方法File Extensions 文件扩展名Parallelizable 可压缩DEFLATE.deflatenoGZip.gz, .gzipnoBzip2.bz2noXZ.xznoZStandart.zstno2.readTextFileWithValue从文本文件中读取数据并以StringValue类型返回,StringValue类型为可变字符串。此方法和readTextFile方法类似,只不过是制定了数据类型,返回的不是 DataSet[String] 而是 DataSet[StringValue]val textLines = env.readTextFileWithValue("./yourfile") //触发程序执行 textLines.print()3.readCsvFilecsv 文件内容如下:1,2,3,4,5 2,3,4,5,6 3,4,5,6,7 4,5,6,7,8 5,6,7,8,9A.基础读法[(隐式转换)] 中指定了 csv 文件中各个元素的数据类型val csvInput = env.readCsvFile[(String,String,String,String,String)]("./info.csv") csvInput.print()B.读取指定行includedFields 使用数组,其中数字代表的含义为要保留的数据列对应的列数,这里还支持 ignoreFirstLine = true 参数可以去除带表头的 csv 文件。val csvInput2 = env.readCsvFile[(String, Double)]("./info.csv", includedFields = Array(0, 3)) csvInput2.print()C.读取生成指定类scala 支持 caseClass 快速定义数据类,这里 [] 内代表返回的数据类型,pojoFields 指定对应列的 colName, 由于只给出了三列而原始数据有五列,所以只返回对应三列的数据case class Person(a: String, b: String, c: String) val csvInput3 = env.readCsvFile[Person]("./info.csv", pojoFields = Array("name", "age", "gender")) csvInput3.print()Person(4,5,6) Person(1,2,3) Person(5,6,7) Person(2,3,4) Person(3,4,5)4.readFileOfPrimitives读取一个原始数据类型(如String,Integer)的文件,返回一个对应的原始类型的DataSet集合。这里第一个参数为对应文件 path,第二个参数为分割符,以上面的 csv 文件数据为例,读取文件时会自动分割原始数据,得到类似的 DateSet[1,2,3,4,5,6,......],原始方法中还有一个 class 参数指定输出数据类型,这里隐式方法 env.readFileOfPrimitives[String] 的 [Class] 已经实现了该功能,所以 readFileOfPrimitives(path, delimiter, Class) 可以看作是 readFileOfPrimitives(path, Class) 的一个扩展。val textLinesOfPrimitives = env.readFileOfPrimitives[String]("./info.csv", delimiter = ",") textLinesOfPrimitives.print()Line: 1,2,3,4,5 DataSet.print(): 1 2 3 4 5三. Collection-Based 基于集合1.fromCollection该方法有两个参数类型,一种是直接从 collection 中初始化,还有一种是从 iterator 中初始化,两者基本类似。这里如果是 java 则对应 java.util.Collection ,scala 则对应 scala.collection//1.用Array创建DataSet val dataSet1: DataSet[String] = env.fromCollection(Array("spark", "flink")) dataSet1.print() //2.用Iterable创建DataSet val dataSet2: DataSet[String] = env.fromCollection(Iterable("spark", "flink")) dataSet2.print()由于 collection 中包含多种数据结构,写法相同,下面给出一些可以用于初始化的常见数据结构 :Array,ArrayBuffer,List,ListBuffer,Vector,mutable.Queue,mutable.Stack,Stream,Seq,Set,Iteratable, Iterator,mutable.ArraySeq,mutable.ArrayStack,Map,Range。还有一个特殊的 generateSequence 可以生成 DataSet :val numbers = env.generateSequence(1, 10000000)2.fromElements根据给定的对象序列创建数据集。所有对象必须是相同的类型。这个就比较好理解了,直接给出相同类型的元素即可,fromCollection 和 fromElements 身上都可以看到一丝 spark.parallelize 序列化函数的影子//1.用element创建DataSet(fromElements) val dataSet1: DataSet[String] = env.fromElements("spark", "flink") dataSet1.print() //2.用Tuple创建DataSet(fromElements) val dataSet2: DataSet[(Int, String)] = env.fromElements((1, "spark"), (2, "flink")) dataSet2.print()3. fromParallelCollectionpackage org.apache.flink.util; import java.io.Serializable; import java.util.Iterator; import org.apache.flink.annotation.Public; @Public public abstract class SplittableIterator<T> implements Iterator<T>, Serializable { private static final long serialVersionUID = 200377674313072307L; public SplittableIterator() { } public abstract Iterator<T>[] split(int var1); public Iterator<T> getSplit(int num, int numPartitions) { if (numPartitions >= 1 && num >= 0 && num < numPartitions) { return this.split(numPartitions)[num]; } else { throw new IllegalArgumentException(); } } public abstract int getMaximumNumberOfSplits(); }fromParallelCollection 的参数为 SplittableIterator, SplittableIterator是个抽象类,它定义了抽象方法 split 以及 getMaximumNumberOfSplits;它有两个实现类,分别是LongValueSequenceIterator以及NumberSequenceIterator。两个实现类实现了常用 number 的迭代器实现和 Long 的迭代器实现,有兴趣的小伙伴可以去看下 SplittableIterator 和各自实现类的源码,没兴趣的话你就只需要知道该方法可以并行读取迭代器并返回指定元素的数据类型。val start = System.currentTimeMillis() val it = (0 to 100).iterator val dataSetSingle: DataSet[Int] = env.fromCollection(it) dataSetSingle.print() println("Single thread Cost: ", (System.currentTimeMillis() - start)) val start1 = System.currentTimeMillis() val itSequence = new NumberSequenceIterator(0, 100) val dataSetParellel = env.fromParallelCollection(itSequence) dataSetParellel.print() println("Parallel thread Cost: ", (System.currentTimeMillis() - start1))二者主要体现在并行的效率上 : (Single thread Cost: 3886) (Parallel thread Cost: 939)四.Generic 泛型1.readFileA.ExecutionEnvironment该方法接受文件输入格式,指定 inputFormat 和 path 即可输出文件内容val data = env.readFile(new TextInputFormat(null), "./info.csv") data.print()1,2,3,4,5 3,4,5,6,7 5,6,7,8,9 4,5,6,7,8 2,3,4,5,6B.StreamExecutionEnvironment上述 env 采用 ExecutionEnvironment.getExecutionEnvironment,可以看作是 sparkContext 处理离线任务,还有一种 StreamExecutionEnvironment 可以看作是 StreamingContext 处理流式任务,该 env 也拥有 readFile api :val envStreaming = StreamExecutionEnvironment.getExecutionEnvironment val dataSource = envStreaming.readFile(new TextInputFormat(null), "./info.csv", FileProcessingMode.PROCESS_CONTINUOUSLY, 5000L) dataSource.print() envStreaming.execute()ExecutionEnvironment 执行时只读取文件一次,StreamingExecutionEnvironment 在 PROCESS_CONTINUOUSLY 模式下会根据 interval = 5000L ms 持续扫描文件,如果文件发生修改则重新读取文件内容,这里 interval 可以自定义。如果选择 PROCESS_ONE 模式,则会退化为 ExecutionEnvironment 的 readFIle 即只读一次。2.createInput该方法下接受通用输入格式。该方法和 spark.HadoopRDD 接口比较类似了,自定义的部分比较大。// read a file from the specified path of type SequenceFileInputFormat val tuples = env.createInput(HadoopInputs.readSequenceFile(classOf[IntWritable], classOf[Text], "hdfs://nnHost:nnPort/path/to/file"))五.总结ExecutionEnvironment 模型下主要以静态 DateSet 为 DataSource 并进行后续处理,很多接口的含义和执行与 spark 很类似,其主要思想为批处理,后续介绍 DataSet 常用的 transform 函数与批处理方法。
一.引言Flink 的数据处理主要分三步,第一步 Source 为数据源,分为 DataSet 和 DataStreaming ,后一步为 Transformation 负责处理和转换数据,针对不同的 DataSource,Transformation 可能会存在差异,最后一步是 sink 负责将结果输出。前面介绍了 DataSet 的 Source 和 Transformation,这里介绍下 DataSet 和 DataStreaming 的 Sink 相关 API。编辑Tips:下述代码区分为 DataSet 和 DataStreaming,所以执行环境会有不同:// 二者都需要引入 import org.apache.flink.streaming.api.scala._ // DataSet 选择 val env = ExecutionEnvironment.getExecutionEnvironment // DataStreaming 选择 val env = StreamExecutionEnvironment.getExecutionEnvironment二.DataSet1.writeAsTextA.存储在本机和HDFSwriteAsText 可以根据地址的不同自适应的存储在 Local FileSystem 和 Hdfs System 上:// 读取本地路径并输出到本地 val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath") // 读取 Hdfs 路径并输出到 Hdfs val textLinesOnHdfs = env.readTextFile("hdfs://nnHost:nnPort/file") textLinesOnHdfs.writeAsText("hdfs://nnHost:nnPort/my/result/on/localFS")B.模式选择writeAsText 有下述两种模式可以选择:· WriteMode.NO_OVERWRITE : 指定路径不存在文件,执行写操作· WriteMode.OVERWRITE:指定路径不存在文件,执行写操作;存在文件则进行覆盖,注意这里不是追加import org.apache.flink.core.fs.FileSystem.WriteMode val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath", WriteMode.OVERWRITE)C.输出数量writeAsText 提供 setParallelism 方法,该方法控制输出文件数量,如果输出路径为 output,不设置该方法时会生成 output 文件夹并在文件夹下生成多个文件,如果 setParallelism(1) ,则会生成 output 文件并将全部输出写入该文件中:val textLines = env.readTextFile("InputPath") textLines.writeAsText("OutputPath").setParallelism(1)2.writeAsCsvwriteAsCsv 将输出文件存储为 csv 格式 ,共提供4个参数:def writeAsCsv(filePath : _root_.scala.Predef.String, rowDelimiter : _root_.scala.Predef.String, fieldDelimiter : _root_.scala.Predef.String, writeMode : org.apache.flink.core.fs.FileSystem.WriteMode) : org.apache.flink.api.java.operators.DataSink[T] = { /* compiled code */ }·filePath:输出路径· rowDelimiter:行分割符· fieldDelimiter:csv 文件各字符分隔符· wirteMode:输出模式A.默认输出val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D)) values.writeAsCsv("outputV1.csv")A,1,2.0 B,1,2.0 C,1,2.0 D,1,2.0B.选择参数输出每行数据 \n 分割,每个元祖的字符 "|" 分割val values: DataSet[(String, Int, Double)] = env.fromElements(("A", 1, 2D), ("B", 1, 2D), ("C", 1, 2D), ("D", 1, 2D)) values.writeAsCsv("outputV2.csv", "\n", "|")A|1|2.0 B|1|2.0 C|1|2.0 D|1|2.03.Stdout,Stderrprint \ printToErr 一般多见于测试数据输出,可以将计算结果输出到控制台的 Stdout 或者 Stderr 上。val textLines = env.readTextFile("InputPath") textLines.print() textLines.printToErr()编辑4.Output with OutputFormat 数据也可以根据自己自定义的 Format 进行输出:val textLines = env.readTextFile("InputPath") val textFormat = new TextOutputFormat[String](new Path("OutputPath")) textLines.output(textFormat).setParallelism(1)Tips:Flink 和 Spark 类似,其内部执行逻辑也是 Lazy Mode,因此 writeAsText,writeAsCsv,output 方法均不会触发 Flink 执行代码逻辑,除了 print 和 printToErr 可以直接触发,上述操作需要再额外调用 env.execute() 才会执行 :env.execute()三.DataStreamDataStream 可以将数据写入文件、标准输出、标准错误输出和 Socket。除了官方提供的基本 Sink 组件外,Flink 还额外支持了下述连接器,可以很好地实现工程交互,截止 v1.14.3,Flink 支持一下第三方 Connector:Connector 类别支持方式Apache Kafkasource/sinkApache CassandrasinkAmazon Kinesis Streamssource/sinkElasticsearchsinkFileSystemsinkRabbitMQsource/sinkGoogle PubSubsource/sinkHybrid Source sourceApache NiFi source/sinkApache PulsarsourceTwitter Streaming APIsourceJDBCsink1.Write to SocketDataSet 和 DataStream 相比后者比前者多一个 WriteToSocket 方法,支持将流数据写入到 SocketstreamSource.writeToSocket("ip", port, new SimpleStringSchema());可以通过上述方法将数据写入 Socket,例如写入本机 -> ip = "localhost",port=9999,后续通过下述方法即可监听到上述写到 Socket 中的数据并进行后续的处理逻辑。env.socketTextStream("localhost", 9999)...2.Kafka Consumer / ProducerA.ConsumerFlink 的 Kafka consumer 称为 FlinkKafkaConsumer。它提供对一个或多个 Kafka topics 的访问。需要引入如下依赖:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kafka_2.11</artifactId> <version>1.14.3</version> </dependency>Kafka 配置 :val properties = new Properties() properties.setProperty("bootstrap.servers", "localhost:9092") properties.setProperty("group.id", "test") val stream = env .addSource(new FlinkKafkaConsumer[String]("topic", new SimpleStringSchema(), properties))共有三个参数需要填写:Topic: kafka 消费 topic 名称或名称列表StringSchema: 用于反序列化 kafka 数据的 DeserializationSchema 或者KafkaDeserializationSchema,内部主要需要复写 deserialize 方法用于将 kafka 数据转换并写生成 DataStream。Properties:kafka 配置,除相关配置外,还需要提供 bootstrao.servers 和 消费的 group_id。生成 DataStream 并消费 :val env = StreamExecutionEnvironment.getExecutionEnvironment() val myConsumer = new FlinkKafkaConsumer[String](...) myConsumer.setStartFromEarliest() // 尽可能从最早的记录开始 myConsumer.setStartFromLatest() // 从最新的记录开始 myConsumer.setStartFromTimestamp(...) // 从指定的时间开始(毫秒) myConsumer.setStartFromGroupOffsets() // 默认的方法 val stream = env.addSource(myConsumer) ...B.ProducerFlink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic,需要填写生成 kafka 的 Topic 并将相关输出的参数配置到 properties 中,最后一个参数控制容错语义。val stream: DataStream[String] = ... val properties = new Properties properties.setProperty("bootstrap.servers", "localhost:9092") val myProducer = new FlinkKafkaProducer[String]( "my-topic", // 目标 topic new SimpleStringSchema(), // 序列化 schema properties, // producer 配置 FlinkKafkaProducer.Semantic.EXACTLY_ONCE) // 容错 stream.addSink(myProducer)3.ElasticsearchSink 也支持与 ES 交互,写入 ES 库中,由于版本比较多,因此 maven 也有较多选择:ES 5.x <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch5_2.11</artifactId> <version>1.14.3</version> </dependency> ES 6.x <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch6_2.11</artifactId> <version>1.14.3</version> </dependency> ES 7 及更高版本 <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-elasticsearch7_2.11</artifactId> <version>1.14.3</version> </dependency>下述 Demo 适用于 scala + ES 6.x 及以上:import org.apache.flink.api.common.functions.RuntimeContext import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink import org.apache.http.HttpHost import org.elasticsearch.action.index.IndexRequest import org.elasticsearch.client.Requests import java.util.ArrayList import java.util.List val input: DataStream[String] = ... val httpHosts = new java.util.ArrayList[HttpHost] httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")) httpHosts.add(new HttpHost("10.2.3.1", 9200, "http")) val esSinkBuilder = new ElasticsearchSink.Builder[String]( httpHosts, new ElasticsearchSinkFunction[String] { def process(element: String, ctx: RuntimeContext, indexer: RequestIndexer) { val json = new java.util.HashMap[String, String] json.put("data", element) val rqst: IndexRequest = Requests.indexRequest .index("my-index") .`type`("my-type") .source(json) indexer.add(rqst) } } // 批量请求的配置;下面的设置使 sink 在接收每个元素之后立即提交,否则这些元素将被缓存起来 esSinkBuilder.setBulkFlushMaxActions(1) // 为内部创建的 REST 客户端提供一个自定义配置信息的 RestClientFactory esSinkBuilder.setRestClientFactory(new RestClientFactory { override def configureRestClientBuilder(restClientBuilder: RestClientBuilder): Unit = { restClientBuilder.setDefaultHeaders(...) restClientBuilder.setMaxRetryTimeoutMillis(...) restClientBuilder.setPathPrefix(...) restClientBuilder.setHttpClientConfigCallback(...) } }) // 最后,构建并添加 sink 到作业管道中 input.addSink(esSinkBuilder.build)4.自定义 Sinkimport org.apache.flink.streaming.api.functions.sink.RichSinkFunction如上述方法不能满足工程需求,则可以自己定义 Sink 方式,这里需要继承 RichSinkFunction[T],T 为上一个 DataStream 的数据类型,并重写下述 3 个函数完成 Sink 需求:· open(parameters: Configuration):初始化函数,负责初始化 sink 相关的连接和客户端 client· invoke(value: T): 每个要 sink 的数据都用通过 invoke 方法并处理 T 最终写入· close:关闭 open 函数启动的 connection 和 client四.总结上述方法简单介绍了 DataSet 和 DataStream 的常用 Sink 方式,Flink 主要优势还是体现在 DataStream 流式处理上,所以 DataSet 相对内容较少。Kafka + Flink 是非常常见的处理流程,所以上面主要给出了 Kafka + Flink 的相关示例,如有更多需求可参考官方 API 提供的更详细的方法 -> Flink Connectors。
一.引言DataStream API 得名于特殊的 DataStream 类,该类用于表示 Flink 程序中的数据集合。你可以认为 它们是可以包含重复项的不可变数据集合。这些数据可以是有界(有限)的,也可以是无界(无限)的,但用于处理它们的API是相同的。DataStream 在用法上类似于常规的 Java 集合,但在某些关键方面却大不相同。它们是不可变的,这意味着一旦它们被创建,你就不能添加或删除元素。你也不能简单地察看内部元素,而只能使用 DataStream API 操作来处理它们,DataStream API 操作也叫作转换(transformation)。你可以通过在 Flink 程序中添加 source 创建一个初始的 DataStream。然后,你可以基于 DataStream 派生新的流,并使用 map、filter 等 API 方法把 DataStream 和派生的流连接在一起。和之前相同,一个 DataStrea 的处理主要包含 Source + Transformation + Sink 的组合:编辑 Tips:与之前不同的是,DataSet 的执行环境为:val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironmentDataStreaming 的执行环境为:val env = StreamExecutionEnvironment.getExecutionEnvironment 二.FileBased 基于文件这里大部分接口与 DataSet 类似,由于 env 的不同,得到的最终类型也不同,由 DataSet 变为了 DataStreaming1.readTextFile(path)读取文本文件,例如遵守 TextInputFormat 规范的文件,逐行读取并将它们作为字符串返回。val textLines = env.readTextFile("path")2.readFile(fileInputFormat, path)按照指定的文件输入格式读取(一次)文件。class selfFileInputFormat() extends FileInputFormat[String] { override def reachedEnd(): Boolean = ??? override def nextRecord(ot: String): String = ??? } val dataStream = env.readFile(new selfFileInputFormat(), "")3.readFile(fileInputFormat, path, watchType, interval, pathFilter, typeInfo)上述两个方法是基于 API 直接调用的,底层调用函数为该函数,该方法它基于给定的 fileInputFormat 读取路径 path 上的文件。根据提供的 watchType 的不同,source 可能定期(每 interval 毫秒)监控路径上的新数据。Tips:在底层,Flink 将文件读取过程拆分为两个子任务,即 目录监控 和 数据读取。每个子任务都由一个单独的实体实现。监控由单个非并行(并行度 = 1)任务实现,而读取由多个并行运行的任务执行。后者的并行度和作业的并行度相等。单个监控任务的作用是扫描目录(定期或仅扫描一次,取决于 watchType),找到要处理的文件,将它们划分为 分片,并将这些分片分配给下游 reader。Reader 是将实际获取数据的角色。每个分片只能被一个 reader 读取,而一个 reader 可以一个一个地读取多个分片。FileProcessingMode.PROCESS_CONTINUOUSLY当一个文件被修改时,它的内容会被完全重新处理。这可能会打破 “精确一次” 的语义,因为在文件末尾追加数据将导致重新处理文件的所有内容。FileProcessingMode.PROCESS_ONCEsource 扫描一次路径然后退出,无需等待 reader 读完文件内容。当然,reader 会继续读取数据,直到所有文件内容都读完。关闭 source 会导致在那之后不再有检查点。这可能会导致节点故障后恢复速度变慢,因为作业将从最后一个检查点恢复读取。val env = StreamExecutionEnvironment.getExecutionEnvironment // 自定义 TextFormat class selfFileInputFormat() extends FileInputFormat[String] { override def reachedEnd(): Boolean = ??? override def nextRecord(ot: String): String = ??? } // 隐函数 typeInfo implicit val typeInfo = TypeInformation.of(classOf[String]) // 读取模式 watchType val watchType = FileProcessingMode.PROCESS_CONTINUOUSLY // 文件过滤 fileFilter 也可以加入过滤正在处理的文件逻辑 val filePathFilter = new FilePathFilter { override def filterPath(filePath: Path): Boolean = { filePath.getPath.endsWith(".txt") } } val dataStream = env.readFile(new selfFileInputFormat(), "", watchType, 60L, filePathFilter)三.Collection-Based1.fromCollection(Collection)从 Java Java.util.Collection 创建数据流。集合中的所有元素必须属于同一类型。当然,使用 scala 的转换后,scala 对应的 collection 也可以使用,这里使用方法和 DataSet 类似。val dataStream: DataStream[String] = env.fromCollection(Array("spark", "flink"))2.fromCollection(Iterator, Class)从迭代器获取,class 参数指定返回值返回元素的数据类型val dataStream: DataStream[String] = env.fromCollection(Iterator("spark", "flink"))3.fromElements(T ...)从给定的对象序列中创建数据流。所有的对象必须属于同一类型。val dataStream: DataStream[String] = env.fromElements("spark", "flink")4.fromParellelCollection(SplittableIterator, Class) 从迭代器并行创建数据流。class 参数指定迭代器返回元素的数据类型。val itSequence = new NumberSequenceIterator(0, 100) val dataStream = env.fromParallelCollection(itSequence)5.generateSequence(from, to)val numbers = env.generateSequence(1, 10000000)四.Socket-Based从 Socket 读取。元素可以由分隔符分隔。1.启动 Socket在本地 terminal 执行下列语句并输入一下字符:nc -lk 9999编辑 2.读取 Socketval env = StreamExecutionEnvironment.getExecutionEnvironment val text = env.socketTextStream("localhost", 9999) val counts = text.flatMap { _.toLowerCase.split("\\W+") filter { _.nonEmpty } } .map { (_, 1) } .keyBy(_._1) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .sum(1) counts.print() env.execute("Window Stream WordCount") 上面是使用 keyBy 对 5S 滚动窗口内的单词进行 wordCount 的实例,下面是输出结果:3> (hello,1) 5> (world,1)持续输入一些单词程序会每5s统计一个窗口内的 wordCount。五.AddSource1.官方 API上一边文章提到了 Flink 支持的外部 API 以及对应支持的运行方式,下述 Connector 类别中支持 source 的均可以调用官方 API 和 Maven 依赖进行数据读取与加载生成 DataStream。Connector 类别支持方式Apache Kafkasource/sinkApache CassandrasinkAmazon Kinesis Streamssource/sinkElasticsearchsinkFileSystemsinkRabbitMQsource/sinkGoogle PubSubsource/sinkHybrid Source sourceApache NiFi source/sinkApache PulsarsourceTwitter Streaming APIsourceJDBCsink2. Self-Defined自定义数据源需要继承 RichSourceFunction[T] 并定义数据类型 T, 主要实现 run 方法 - 获取数据与 cancel 方法 - 停止获取数据,这里和 Spark-Streaming 自定义 receiver,Storm 自定义实现 spout 类似,下面例子将以1s为间隔持续从文本中读取新内容并输出:class SourceFromFile extends RichSourceFunction[String] { private var isRunning = true override def run(ctx: SourceFunction.SourceContext[String]): Unit = { val bufferedReader = new BufferedReader(new FileReader("data.txt")) while ( { isRunning }) { val line = bufferedReader.readLine if (!StringUtils.isBlank(line)) { ctx.collect(line) } TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = { isRunning = false } } val dataStream = env.addSource(new SourceFromFile()).print() 六.总结结合上一篇 Flink / Scala - DataSource 之 DataSet 获取数据总结,Flink 两种数据结构的获取 - DataSet / DataStream 就都介绍完了,作为流式处理引擎,Flink 更擅长于处理 DataStream 流式数据,后续也会介绍更多的流式数据处理方法。
一.引言Mac-mini 官方只支持连接2个显示器,ctrl cv 复制粘贴代码和写博客的时候十分不方便,这时候需要外接第三个显示器,下面看看如何操作。Tips:第三个显示器的连接器需要有外置显卡,否则无法点亮,同学们注意~二.外接步骤1.设备绿联40244 USB 转 VGA 连接器如下图所示编辑2.下载驱动USB3.0外置显卡驱动程序(苹果Mac OSX系统),点击立即下载即可编辑3.解压源文件解压文件后有如下文件夹,根据自己的系统选择,系统在关于本机查看:编辑编辑4.安装编辑双击目录下 pkg 文件并开始安装,全程 yes 或者 ok 即可:编辑5.修改 DisplayLink Manger APP 配置到启动台找到 DisplayLink Manager 并打开,系统会弹出 "安全性与隐私" 相关提示,点击进入设置界面,对屏幕录制选项下的对应 APP 打勾即可,打勾后配置结束,正常的话第三个屏幕就会点亮 编辑 编辑Tips:完成上述配置后如果第三台显示器仍未点亮,可能是以下原因:A.连接线未完全连接B.重启电脑以适应新驱动C.重启电脑后仍无效,可以尝试重新插拔三台显示器的数据源线三.效果图博主现在的输出配置为:HDMI + 4k & VGA(typeC) + 1080p & VGA(Usb) + 1080 上一个最终效果图,有问题欢迎交流~编辑
一.引言上一篇文章 Flink / Scala - DataSet 应用 Broadcast Variables 介绍了 DataSet 场景下 Broadcast 的使用,本文将介绍 DataStream 中的 Broadcast 应用场景,与 DataSet 类似,Broadcast 的值是所有 task 公用的,Broadcast State 是为 DataStreaming 所有 task 定制的可实时修改的公用值。二.代码常规介绍DataStream<T> output = dataStream .connect(BroadcastStream) .process( // KeyedBroadcastProcessFunction 中的类型参数表示: // 1. key stream 中的 key 类型 // 2. 非广播流中的元素类型 // 3. 广播流中的元素类型 // 4. 结果的类型,在这里是 string new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() { // 模式匹配逻辑 } );常规使用中我们都包含一个数据流 DataStream,其中包含我们需要处理的数据,如果处理逻辑会随着一个状态值的改变而改变,这是可以引入第二个数据流成为广播流 BroadcastStream,通过调用 DataStream 的 connect 方法,并将 BroadcastStream 参数传入即可获得一个 BroadcastConnectedStream,这时数据同时包含数据流和状态流,需要重写 process 函数处理两个流的数据,根据 DataStream 是否是 Keyd-Stream,Process 方法分为:· keyed 流,那就是 KeyedBroadcastProcessFunction 类型· non-keyed 流,那就是 BroadcastProcessFunction 类型在传入的 BroadcastProcessFunction 或 KeyedBroadcastProcessFunction 中,我们需要实现两个方法。processBroadcastElement() 方法负责处理广播流中的元素,processElement() 负责处理非广播流中的元素。 两个子类型定义如下:public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; } public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> { public abstract void processElement(IN1 value, ReadOnlyContext ctx, Collector<OUT> out) throws Exception; public abstract void processBroadcastElement(IN2 value, Context ctx, Collector<OUT> out) throws Exception; public void onTimer(long timestamp, OnTimerContext ctx, Collector<OUT> out) throws Exception; }需要注意的是 processBroadcastElement() 负责处理广播流的元素,而 processElement() 负责处理另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:得到广播流的存储状态:ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)查询元素的时间戳:ctx.timestamp()查询目前的 Watermark:ctx.currentWatermark()目前的处理时间 (processing time):ctx.currentProcessingTime()产生旁路输出:ctx.output(OutputTag<X> outputTag, X value)三.应用实例上面说的比较官方,下面通过一个简单的例子理解一下 BroadCast Value 和 BroadCast Stream 的用处,上面提到了 BroadCast Stream 作为一个状态流控制 DataStream 的数据输出,下面实现以下功能:DataStream: 定期生成 num - 100+num 的 100 个数字,每次生成周期初始化 num + 100BroadCastStream:不定期传入状态控制输出状态,分为 odd-单数 even-双数Sink:根据 odd 和 even 的状态,print 输出 100 个数字中的单数或者双数1.DataStream5s 中生成 num - (num+100) 的数字,下一批数据比上一批增加 100,这里继承 RichSourceFunction 自定义 Source 来源然后通过 addSource 实现,完整的 DataStream Source 生成方法参考: Flink / Scala - DataSource 之 DataStream 获取数据总结。// 每5s生成一批数据 数据流 case class InputData(num: Int) class SourceFromCollection extends RichSourceFunction[InputData] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = { while ( { isRunning }) { (start to (start + 100)).foreach(num => { ctx.collect(InputData(num)) }) start += 100 TimeUnit.SECONDS.sleep(5) } } override def cancel(): Unit = { isRunning = false } } val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num)上述流根据 num- num+100 的数字生成 InputData 类,并通过 keyBy 生成 Keyd-Stream。2.BroadCastStreamBroadCastStream 广播流即本例中的状态流,这里通过 File 传递状态值并解析,同样是继承 RichFunction 实现自定义的 Source,每 1s 从对应文件读取,获取是否有新的状态传入。// MapStateDescriptor odd: 奇数 even: 偶数 case class FilterState(state: String) // 每s监控一次文件,并读取最新的状态 class SourceFromFile extends RichSourceFunction[String] { private var isRunning = true override def run(ctx: SourceFunction.SourceContext[String]): Unit = { val bufferedReader = new BufferedReader(new FileReader("./data.txt")) while ( { isRunning }) { val line = bufferedReader.readLine if (!StringUtils.isBlank(line)) { ctx.collect(line) } TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = { isRunning = false } } val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState]) // 广播流,广播规则并且创建 BroadCast val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() { override def map(in: String): FilterState = { FilterState(in) } }).broadcast(ruleStateDescriptor)stateDescriptor 负责声明广播状态的类型,这里定义为 MapStateDescriptor ,后续通过 String 类型的 key 即可获取对应的 FilterState,从而决定 DataStream 中的数据如何 sink。3.合并 DataStream 与 BroadCastStreamDataStream.connect(BroadCastStream),由于原始 DataStream 为 keyd-stream,所以使用 keyedBroadcastProcessFunction,共包含四个参数:· ks - keyBy 字段的类型,这里根据 InputData.num keyBy,所以是 Int· IN1 - DataStream 数据流的类型,这里是 InputData· IN2 - BroadCastStream 广播流的类型,这里是 FilterState· OUT - Sink 输出端为直接输出 Print String,所以为 StringkeyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] { // 与之前的 Descriptor 相同 val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState]) override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = { val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey") val filterState = if (filterStateClass == null) { "odd" } else { filterStateClass.state } // 奇数模式 if (filterState == "odd" && inputData.num % 2 != 0) { out.collect(inputData.num.toString) } // 偶数模式 if (filterState == "even" && inputData.num % 2 == 0) { out.collect(inputData.num.toString) } } override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = { // 从广播中获取规则 val broadCastValue = context.getBroadcastState(ruleStateDescriptor) broadCastValue.put("broadcastStateKey", filterState) println(s"Rule Changed: ${filterState.state}") } }).setParallelism(1).print()A. ProcessElement该方法负责输出数据,根据 FilterState 的状态是 odd-单数 还是 even-双数,状态默认为 odd-单数。通过 context.getBroadcastState(StateDescriptor) 方法获取 BroadcastStream 中的 FilterState 数据。注意这里的 StateDescriptor 要与上面初始化的 StateDescriptor 保持一致。B. ProcessBroadcastElement 该方法负责处理 Broadcast 数据流并更新至 context,从而其他 task 节点在执行 processElement 方法时获取最新的状态值,这里 put 的 Key 和上述方法 get 的 Key 需要保持一致,否则获取状态值为 null。4. 测试为了本地测试方便查看,两个 Stream 的 parallelism 都设置为1。状态文件 File 为空,此时默认状态为 odd,输出单数:编辑文件内增加一行 even,并 ctrl s 保存,此时 Broadcast 1s 的间隔检测到新状态 even,处理并更细至各 task,各 task 输出偶数:编辑 再次增加一行 odd,此时输出状态改变,重新修改为输出单数:编辑一个基本的 BroadcastValue 控制 DataStream 的实例就完成了,状态文件夹最终包含两行状态数据:编辑5.完整代码import org.apache.flink.api.common.functions.RichMapFunction import org.apache.flink.api.common.state.MapStateDescriptor import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala._ import org.apache.flink.util.Collector import org.apache.commons.lang3.StringUtils import java.io.BufferedReader import java.io.FileReader import java.util.concurrent.TimeUnit object BroadCastStateDemo { def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 每5s生成一批数据 数据流 case class InputData(num: Int) class SourceFromCollection extends RichSourceFunction[InputData] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[InputData]): Unit = { while ( { isRunning }) { (start to (start + 100)).foreach(num => { ctx.collect(InputData(num)) }) start += 100 TimeUnit.SECONDS.sleep(5) } } override def cancel(): Unit = { isRunning = false } } val keyedStream = env.addSource(new SourceFromCollection()).setParallelism(1).keyBy(_.num) // 每s监控一次文件,并读取最新的状态 class SourceFromFile extends RichSourceFunction[String] { private var isRunning = true override def run(ctx: SourceFunction.SourceContext[String]): Unit = { val bufferedReader = new BufferedReader(new FileReader("/Users/xudong11/flink/src/main/scala/com.weibo.ug.push.flink/DataStreamingDemo/data.txt")) while ( { isRunning }) { val line = bufferedReader.readLine if (!StringUtils.isBlank(line)) { ctx.collect(line) } TimeUnit.SECONDS.sleep(1) } } override def cancel(): Unit = { isRunning = false } } // MapStateDescriptor odd: 奇数 even: 偶数 case class FilterState(state: String) val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState]) // 广播流,广播规则并且创建 BroadCast val ruleStream = env.addSource(new SourceFromFile).setParallelism(1).map(new RichMapFunction[String, FilterState]() { override def map(in: String): FilterState = { FilterState(in) } }).broadcast(ruleStateDescriptor) // 连接两个流 keyedStream.connect(ruleStream).process(new KeyedBroadcastProcessFunction[Int, InputData, FilterState, String] { // 与之前的 Descriptor 相同 val ruleStateDescriptor = new MapStateDescriptor("RulesBroadcastState", classOf[String], classOf[FilterState]) override def processElement(inputData: InputData, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#ReadOnlyContext, out: Collector[String]): Unit = { val filterStateClass = context.getBroadcastState(ruleStateDescriptor).get("broadcastStateKey") val filterState = if (filterStateClass == null) { "odd" } else { filterStateClass.state } // 奇数模式 if (filterState == "odd" && inputData.num % 2 != 0) { out.collect(inputData.num.toString) } // 偶数模式 if (filterState == "even" && inputData.num % 2 == 0) { out.collect(inputData.num.toString) } } override def processBroadcastElement(filterState: FilterState, context: KeyedBroadcastProcessFunction[Int, InputData, FilterState, String]#Context, collector: Collector[String]): Unit = { // 从广播中获取规则 val broadCastValue = context.getBroadcastState(ruleStateDescriptor) broadCastValue.put("broadcastStateKey", filterState) println(s"Rule Changed: ${filterState.state}") } }).setParallelism(1).print() env.execute() } }四.总结1.实现步骤Broadcast Value 通过 DataStream connect BroadCastStream 连接实现,期间注意两个 ProcessFunction 的重写与对应 StateDescriptor 的定制即可。2.数据一致性其次需要注意两个 processFunction 的参数 ctx,在 processElement 中 ctr 是 readOnly,因为一致性的原因,这里只允许 task 读取最新的 State 但不能修改;相反的 processBroadcastElement 方法中的 context 允许修改其中 value 状态的值,注意这里的逻辑要保持全局的一致性(增加随机数随机修改状态值可视作是不保持全局唯一性的操作),否则会造成状态不同从而导致 task 端输出不一致。3.CheckPoint所有的 task 均会对 broadcast state 进行 checkpoint:虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。 这个设计是为了防止在作业恢复后读文件造成的文件热点即 hotspot 。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p(=并行度)。Flink 会保证在恢复状态 / 改变并发的时候数据没有重复且没有缺失。 在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在增大并发的情况下,task 会读取本身的 state,多出来的并发(p_new - p_old)会使用轮询调度算法读取之前 task 的 state。4.State Backendbroadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于所有其他 Operator State,因此不使用 RocksDB state backend。
一.引言使用 Flink 自定义 Source 生成数据时,集群提交任务时显示 org.apache.log4j.Logger@72c927f1 is not serializable. The object probably contains or references non serializable fields. 报错序列化相关错误 :编辑二.问题解决1.Scala Class 初始化不需要对应变量错误代码:val logger = Logger.getLogger(classOf[T])正确代码:通过 scala 延迟加载功能与 @transient 关键字忽略对该变量的序列化,前提是该变量在对应 class[T] 初始化时不需要,如果某个变量在 class[T] 初始化时调用,加了 @transient 关键字会导致该变量为 null 并报错空指针。@transient lazy val logger = Logger.getLogger(classOf[T])2.Scala Class 初始化需要对应变量上述 logger 在 Class 初始化阶段不使用,所以可以使用 @transient 延迟初始化解决问题,还有一些变量的生成无法延迟初始化,例如使用 redis 初始化一些变量,如果使用 @transient 会报如下错误:编辑此时需要将对应无法初始化的类放到 open 初始化函数中,然后变量通过 var 修饰符在 class 内定义,并在 open 函数内执行实际初始化方法,Flink RichSourceFunction open 函数使用方法如下:var redis: Jedis = _ var initValue: T = _ override def open(parameters: Configuration): Unit = { redis = getRedisClient(host, port) initValue = ... (包含redis读取的初始化方法) }3.Java错误代码:private Logger logger = Logger.getLogger(T.class)正确代码:log4j 不能序列化,为了防止 logger 被序列化,需要保持其处于 @transient 或者 static 状态,前者会导致上述相同的问题即 NullPointException,所以这里通过 static + final 修饰。private static final Logger logger = Logger.getLogger(T.class)三.扩展上述错误发生在 class[T] 内的变量 logger,变量无法序列化通过上述方法即可解决,如果是 class[T] 内某个 class 无法序列化,则需要实现 java.io.Serializable 接口,保证该类可以被序列化。上面的序列化问题出现在 BroadcastStream 场景下,由于 broadcastStream 中的类T 中有变量无法序列化导致广播流失效,通过 scala 方法已完美解决。
一.问题分析原始程序使用 EventTime,JobGraph 为 Source + KeyBy + ProcessFunction + Window + Sink 形式,其中 ProcessFunction 内设置了 ValueState 与 onTimer 的机制,由于需要定时更新一些任务需要的实时变量,故引入 BroadcastStream 实现实时变量的不定时更新,经过修改后的 JobGraph 为 Source + keyd-Broadcast-Process + window + Sink,如图所示:编辑任务执行后发现第三步 Co-Process-Broadcast-keyed Stage 部分 Records-Received 正常,但是 Bytes Sent 消失了,即下游没有 (onTimer) 的数据写入到 window ,导致所有数据卡在第三步,window 无法触发:编辑再看下游算子发现 WaterMark 均为空: 编辑这时才想起来,自己的任务设定为 EventTime,原始 Source 设置了时间戳和 watermark 但新增的 BroadcastStream 未设置时间戳和 watermark 遂导致另外一个 Source 不更新 watermark,导致整个任务的 watermark 无法推进,从而导致任务卡死。所以解决方法就是给 BroadcastStream 实现 assignTimestampsAndWatermarks 方法设置时间戳和 watermark 即可。二.问题修复broadcastStream 主题框架如下,如果需要完整的示例可以参考: DataStream Broadcast 示例详解DataStream<T> output = dataStream .connect(BroadcastStream) .process( // KeyedBroadcastProcessFunction 中的类型参数表示: // 1. key stream 中的 key 类型 // 2. 非广播流中的元素类型 // 3. 广播流中的元素类型 // 4. 结果的类型,在这里是 string new KeyedBroadcastProcessFunction<Ks, In1, In2, Out>() { // 模式匹配逻辑 } );下面详细说下 BroadcastStream,任务失败之前我的 BroadcastStream 是这样定义的:MapStateDescriptor<String, T> descriptor = new MapStateDescriptor<>("T", String.class, T.class); BroadcastStream<T> contextStream = env .addSource(new SelfDefinedSource()) .setParallelism(1) .broadcast(descriptor);这里通过自定义的 SelfDefinedSource 函数定期读取更新的在线变量,并后续通过 ctx.collect() 生产到 BroadcastStream 中,由于未对该数据流设置 watermark 导致任务失败,下面继承 BoundedOutOfOrdernessTimestampExtractor 类为该数据流设置 watermark:public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> { public MultiSortContextTSExtrator() { super(Time.seconds(MAX_SEND_EVENT_DELAY)); } @Override public long extractTimestamp(T t) { return System.currentTimeMillis(); } }这里直接设置为 System.currentTimeMillis,如果需要从 T 中获取时间戳也可以生成 T 时绑定其 EventTimeStamp,随后为源数据通过 assign 方法增加时间戳:BroadcastStream<T> contextStream = env .addSource(new SelfDefinedSource()) .assignTimestampsAndWatermarks(newContextTSExtrator()) .setParallelism(1) .broadcast(descriptor);增加完时间戳后,任务正常执行,可以看到流程图上 keyd-Broadcast-Process 部分和 window 部分也都有了实时更新的 watermark,任务正常进行:编辑三.优化processFunction 中处理后的数据设置时间=t 的过期时间,即 eventTime + t 为 onTimer 的时机,BroadCastStream 则时每隔 X 分钟读取线上最新变量并更新最后通过 context.collect(T) 发出,X 分钟通过 Times.sleep() 实现。理想情况每隔数据的间隔为 t ,但根据 stage 和日志中的日志发现每隔元素处理到 sink 的间隔为 t + X,即延时处理的时间与定时读取实时变量的间隔一致。上述新增的 waterMark 为 System.currentimeMilles,但是因为每隔 X 分钟执行一次,所以 broadcastStream 发出的 waterMark 总是落后的,基于这个事实继续排查问题。1.单流 Watermark 机制首先看一下每个数据源的 watermark 更新机制,这个可以在 AssignerWithPeriodicWatermarks 类中看到,默认不需要我们复写:public final Watermark getCurrentWatermark() { long potentialWM = this.currentMaxTimestamp - this.maxOutOfOrderness; if (potentialWM >= this.lastEmittedWatermark) { this.lastEmittedWatermark = potentialWM; } return new Watermark(this.lastEmittedWatermark);将当前元素的时间戳减去最大的迟到容忍时间获取 poetntialWaterMark,如果大于上一次发出的 waterMark,则重新赋值,即取 currentWaterMark = Max(poentitialWaterMark,lastEmittedWaterMark),该值代表 Flink 认为小于该 currentWaterMark 值的数据都到了。2.多流 WaterMark 机制co-broadcast-keyed-stream 属于双流任务,每个流都包含一个 waterMark,两个流同时发出 waterMar,Flink 底层通过继承 TwoInputStreamOperator 接口完成对双流 element 和 waterMark 的处理:public interface TwoInputStreamOperator<IN1, IN2, OUT> extends StreamOperator<OUT> { void processElement1(StreamRecord<IN1> var1) throws Exception; void processElement2(StreamRecord<IN2> var1) throws Exception; void processWatermark1(Watermark var1) throws Exception; void processWatermark2(Watermark var1) throws Exception; void processLatencyMarker1(LatencyMarker var1) throws Exception; void processLatencyMarker2(LatencyMarker var1) throws Exception; }基于我们上面遇到的延时问题,这里需要重点关注 processWatermark 的逻辑,processWatermark 的逻辑放在了 AbstractStreamOperator 抽象类下:public void processWatermark(Watermark mark) throws Exception { if (this.timeServiceManager != null) { this.timeServiceManager.advanceWatermark(mark); } this.output.emitWatermark(mark); } public void processWatermark1(Watermark mark) throws Exception { this.input1Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } } public void processWatermark2(Watermark mark) throws Exception { this.input2Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } }针对 Stream1 和 Stream2 分别执行 processWatermark1 和 processWatermark2,其内部通过 Min 方法选取两个流中最小的一个作为新的最小值 WaterMark,然后再和 lastEmittedWatermark 进行比较再推进整个任务的 watermark 流动。所以针对上述等待时间 X 过长的问题,我们需要提高 BroadcastStream 的 watermark 生产效率,即缩短产生 T 的间隔。3.缩短 Watermark 发送间隔将间隔 X 缩短至非常小的间隔 x,需要给生成的 T 增加一个 valid 属性,给 Source 函数增加一个 epoch 控制,只有当 epoch * x = X 的时候,读取线上更新并发送一个 valid = true 的 T,其余时间发送 valid = false 的 T(null),随后在 ProcessBroadcastValue 时判断 valid 状态,只有为 true 时才更新 Broadcast 的 value 到各个 task。override def run(sourceContext: SourceFunction.SourceContext[T]): Unit = { Client client = new Socket(host, port); while (isRunning) { val t = if (epoch % X == 0) { ... 读取线上变量 re.setValid() // 生效 re } else { val re = new T(null) re } sourceContext.collect(t) TimeUnit.SECONDS.sleep(x) epoch += 1 } }例如之前以 X=300s 的间隔生成 T,将间隔缩短为 x=5s,则推算出 epoch = 300/5 = 60,所以 if 逻辑内为 epoch % 60 == 0,其余时间设计以此类推。4.更优的方案上述方案把间隔从 X 优化至 x,但是任务执行的数据源其实是另一个 DataStream,BroadcastStream 只负责定时更新 DataStream 需要的变量,所以最好的方案是任务 watermark 流动完全取决于 DataStream。这时再回看双流 processWatermark 函数:public void processWatermarkX(Watermark mark) throws Exception { this.input2Watermark = mark.getTimestamp(); long newMin = Math.min(this.input1Watermark, this.input2Watermark); if (newMin > this.combinedWatermark) { this.combinedWatermark = newMin; this.processWatermark(new Watermark(this.combinedWatermark)); } }newMin 是通过取双流最小,然后再以最小的为基准,退化为单流更新 WaterMark 更新逻辑,只取决于 DataStream ,那我们把 Broadcast 的 waterMark 设定为 Max 不就完了,这样每次 newMin 的值都取决于 DataStream,从而保证 watrtMark 流动完全取决于 DataStream,这样最后很短的间隔 x 也没有了,任务执行的延迟完全为 onTimer 设置的 valueState 的过期时间。修改 BroadcastStream 设定时间戳和 watermark 的函数:直接设置为 Watermark 类自带的 MAX_WARTERMARK.getTimeStamp(),自己设置一个非常大的时间戳也可以。public static class ContextTSExtrator extends BoundedOutOfOrdernessTimestampExtractor<T> { public MultiSortContextTSExtrator() { super(Time.seconds(MAX_SEND_EVENT_DELAY)); } @Override public long extractTimestamp(T t) { return Watermark.MAX_WATERMARK.getTimestamp(); } }其值为:static final Watermark MAX_WATERMARK = new Watermark(9223372036854775807L);四.总结通过增加和修改 watermark 逻辑,增加 broadcastStream 的双流任务终于正常执行,且数据流动完全取决于原始数据流,非常的完美。出现这样的问题还是对 Flink 的 watermark 机制不是很清晰,后续还需要继续加深对 watermark 的理解。
一.引言Flink 针对 window 提供了多种自定义 trigger,其中常见的有 CountTrigger 和 ProcessingTimeTrigger,下面通过两个 demo 了解一下两个 Trigger 的内部实现原理与窗口触发的相关知识。二.辅助知识介绍上述两个 Trigger 之前,首先重新回顾下之前提高的 trigger 基础知识。1.Trigger 内部方法· onElement :元素到达后执行的操作· onProcessingTime:到达规定处理时间窗口执行的操作· onEventTime :到达规定事件时间窗口执行的操作· clear : 清除相关 value 变量2.Window Trigger 后的操作· TriggerResult.CONTINUE :跳过,什么都不做· TriggerResult.FIRE :触发窗口计算· TriggerResult.PURGE : 清除窗口元素· TriggerResult.FIRE_AND_PURGE : 触发窗口操作,随后清空窗口元素3.ReducingStateValueReducingStateValue 是一个抽象的统计量,需要用户自己定义其返回类型和对应的 reduce 操作,这里 reduce 并不是减少而是合并的意思,可以理解为 spark 里的 reduce(_ + _) 操作,即针对给定的 object1 和 object2,合成一个单独的 object,定义该变量方法如下:val reduceStateDesc = new ReducingStateDescriptor[T]($key, new ReduceFunction(), classOf[T])T 代表返回变量类型,key 为其 name 标识,ReduceFunction 需要继承 org.apache.flink.api.common.functions.ReduceFunction 实现 reduce(o1: T, o2: T): T 的方法,下面示例生成一个得到两数中最小的一个的 RecuceFunction:class ReduceMin() extends ReduceFunction[Long] { override def reduce(t1: Long, t2: Long): Long = { math.min(1, t2) } }4.Window 默认触发时机dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10)))使用 org.apache.flink.streaming.api.windowing.assigners 类下的窗口例如滚动窗口 TumblingProcessingTimeWindows 时,根据我们设置的时间,窗口的开始和结束时间其实是固定的,待参数确定后,窗口的 start - end 就确定好了,以上述 10s 滚动窗口为例,则窗口默认的开始结束时间为整点,按18:00 开始:18:00:00 - 18:00:10 ,18:00:10 - 18:00:20 ... 18:59:50 - 19:00:00每当窗口到达规定结束时间时,都会默认调用 onProcessingTime 方法,这里不理解也没关系,等下看 demo 即可。三.CountTrigger 详解CountTrigger 按照 element 的个数进行触发,当元素数量达到 count_size 是,触发窗口计算逻辑,其内部统计 count 数量就用到了前面提到的 ReduceStateValue,为了在执行过程中添加运行日志,这里新增一个 SelfDefinedCountTrigger,代码与官方提供的 CountTrigger 完全一致,唯一差别是增加了日志打印。1.SelfDefinedCountTriggertrigger 内增加了触发的时间和触发的元素数量,主要逻辑都在 onElement 函数内,每来一个元素都会对 trigger 的 ReduceStateValue 累加值,这里采用 RecudeSum 函数,对两数求和,当数值达到 countSize 时进行窗口重触发 Fire 并清空 ReduceStateValue 开始新一轮计数;其余时间都返回 TriggerResult.CONTINUE。class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) // 元素过来后执行的操作 override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state 并累加数量 val count = triggerContext.getPartitionedState(countStateDesc) count.add(1L) // 满足数量触发要求 if (count.get() >= maxCount) { // 首先清空计数器 count.clear() val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) println(s"[$date] Window Trigger By Count = ${maxCount}") TriggerResult.FIRE } else { TriggerResult.CONTINUE } } override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { val count = triggerContext.getPartitionedState(countStateDesc) count.clear() } }2.主函数主函数逻辑为从0开始,每s生成30个数字并循环累加,window 采用 10s 聚合的滚动窗口,trigger 采用 count = 30 的 CountTrigger,理论上每s生成的30个元素恰好触发窗口执行逻辑。窗口处理逻辑逻辑也很简单,直接输出当前 window 内的元素个数,元素 min,max 与处理时间:object CountTriggerDemo { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") // 每s生成一批数据 class SourceFromCollection extends RichSourceFunction[String] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(num.toString) if (num % 30 == 0) { TimeUnit.SECONDS.sleep(1) } }) start += 100 } } override def cancel(): Unit = { isRunning = false } } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env .addSource(new SourceFromCollection()) .setParallelism(1) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .trigger(new SelfDefinedCountTrigger(30)) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) val info = elements.toArray.map(_.toInt) val min = info.min val max = info.max val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max==========" out.collect(output) } }).print() env.execute() } }3.执行日志由于启动时并非整数时间,所以第一个窗口只处理了 8s 数据 8:30:02 - 8:30:10,第一个窗口逻辑结束后,后面都是稳定的从 10s 的滑动窗口中按每 30 个元素一次进行触发,可以通过 Window Elem Num 看到 10s 内窗口数据的变化,10s x 30 = 300。编辑4.窗口默认触发机制上面提高了窗口会在默认的 end 时间执行 onProcessingTime 方法,由于方法内只返回了 TriggerResult.CONTINUE 所以不明显,下面在 onProcessingTime 方法中增加日志验证一下:override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { println(s"Window Trigger At Default End Time: $l") TriggerResult.CONTINUE }查看日志:编辑通过日志信息看到了窗口确实会在结束时间执行 onProcessingTime 方法,但是为什么不是整数呢 编辑这里是因为窗口实际触发的 timeStamp 是 window.maxTimestamp() 变量对应的方法,而该方法定义如下:public long maxTimestamp() { return this.end - 1L; }源码在默认 end 对应的时间戳上做了减一的处理,实际窗口结束时间为 1648034659999 + 1 :编辑 所以这里验证了两个问题,第一就是窗口会在默认结束时间调用 onProcessingTime 方法,其次就是窗口的结束时间和真实触发时间相差 1L, window.maxTimestamp + 1 = window.getEnd。四.ProcessingTimeTrigger 详解processingTimeTrigger 是 processTime 对应 Flink 程序 window 的默认 trigger,其根据窗口默认的 start ,end 时间进行触发,还是用 10s 的 TumblingProcessingTimeWindows 窗口加自定义 SelfDefinedProcessingTimeTrigger 进行 demo 展示。1.SelfDefinedProcessingTimeTriggerProcesingTimeTrigger 主要方法为 onElement 和 onProcessingTime ,前者对窗口进行 timeServer 的注册,其过期时间为 window.maxTimestamp,上面也提到了,时间就是 window.getEnd() - 1,后者执行 Fire 触发窗口计算,为了获得更细致的时间信息,这里增加了 processingTime 和 window-start window-end 的相关日志。class SelfDefinedProcessingTimeTrigger() extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") val cla = Calendar.getInstance() // 元素过来后执行的操作 override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { triggerContext.registerProcessingTimeTimer(w.maxTimestamp) TriggerResult.CONTINUE } override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { cla.setTimeInMillis(w.getStart) val start = dateFormat.format(cla.getTime) cla.setTimeInMillis(w.getEnd) val end = dateFormat.format(cla.getTime) println(s"start: $start end: $end processTime: $l maxTimeStamp: ${w.maxTimestamp()} windowStart: ${w.getStart} windowEnd: ${w.getEnd}") TriggerResult.FIRE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { triggerContext.deleteProcessingTimeTimer(w.maxTimestamp) } }2.主函数数据 Source 为自定义 Source,每s生成30个元素,以10s为周期生成滚动窗口,处理逻辑依然为输出窗口的元素个数,min 和 max。object ProcessTimeTriggerDemo { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") // 每s生成一批数据 class SourceFromCollection extends RichSourceFunction[String] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[String]): Unit = { while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(num.toString) if (num % 30 == 0) { TimeUnit.SECONDS.sleep(1) } }) start += 100 } } override def cancel(): Unit = { isRunning = false } } def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env .addSource(new SourceFromCollection()) .setParallelism(1) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .trigger(new SelfDefinedProcessingTimeTrigger()) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) val info = elements.toArray.map(_.toInt) val min = info.min val max = info.max val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max==========" out.collect(output) } }).print() env.execute() } }3.执行日志窗口每个10s执行一次触发,触发 elem 数量为 10x30 = 300,通过示例可以看到 window-start window-end 和其对应的 format 时间形式,以及再次验证 maxTimestamp = window.end - 1。编辑五.总结通过微调官方 Trigger 并增加日志,可以看到最常见的 CountTrigger 和 ProcessingTimeTrigger 的执行逻辑并对加深窗口触发的逻辑,后续将结合 CountTrigger 和 ProcessTimeTrigger 实现自定义的 CountAndTimeTrigger,该 Trigger 结合了 Count 和 ProcessingTime 的触发条件,可以让窗口在满足条数或满足间隔的情况下都触发。
一.引言上一篇文章提到了 CountTrigger && ProcessingTimeTriger,前者 CountTrigger 指定 count 数,当窗口内元素满足逻辑时进行一次触发,后者通过 TimeServer 注册窗口过期时间,到期后进行一次触发,本文自定义 Trigger 实现二者的合并即 Count 和 ProcessingTime 满足任意条件窗口都进行一次触发。二.代码详解1.CountAndProcessingTimeTrigger整体代码如下,主要逻辑包含在 onElement 和 onProcessingTime,前者主要负责根据 count 触发,即实现 CountTrigger 的功能,后者则主要实现 ProcessingTime 的功能,需要预先定义两个 ReduceValue 分别记录 Count 和 Time,ReduceValue 详细用法可参考上文,下面分析主要方法。class CountAndProcessingTimeTrigger(maxCount: Long, interval: Long) extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) // 时间计数器,保存下一次触发的时间 val timeStateDesc = new ReducingStateDescriptor[Long]("interval", new ReduceMin(), classOf[Long]) // 元素过来后执行的操作 override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state 并累加数量 val count = triggerContext.getPartitionedState(countStateDesc) val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc) // 考虑count是否足够 count.add(1L) if (count.get() >= maxCount) { val log = s"CountTrigger Triggered Count: ${count.get()}" println(formatString(log)) count.clear() // 不等于默认窗口的触发时间 if (fireTimestamp.get() != window.maxTimestamp()) { triggerContext.deleteProcessingTimeTimer(fireTimestamp.get()) } fireTimestamp.clear() return TriggerResult.FIRE } // 添加窗口的下次触发时间 val currentTimeStamp = triggerContext.getCurrentProcessingTime if (fireTimestamp.get() == null) { val nextFireTimeStamp = currentTimeStamp + interval triggerContext.registerProcessingTimeTimer(nextFireTimeStamp) fireTimestamp.add(nextFireTimeStamp) } TriggerResult.CONTINUE } override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state val count = triggerContext.getPartitionedState(countStateDesc) // 获取 Interval state val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc) // time default trigger if (time == window.maxTimestamp()) { val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}" println(formatString(log)) count.clear() triggerContext.deleteProcessingTimeTimer(fireTimestamp.get()) fireTimestamp.clear() fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval) triggerContext.registerProcessingTimeTimer(fireTimestamp.get()) return TriggerResult.FIRE } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) { val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}" println(formatString(log)) count.clear() fireTimestamp.clear() fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval) triggerContext.registerProcessingTimeTimer(fireTimestamp.get()) return TriggerResult.FIRE } TriggerResult.CONTINUE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { // 获取 count state val count = triggerContext.getPartitionedState(countStateDesc) // 获取 Interval state val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc) count.clear() fireTimestamp.clear() } }2.onElement每个元素到达时执行 count.add 进行计数,如果满足超过定义的 maxCount 则进行触发操作:---- 达到 MaxCountA.log - 打印 log 标识本次触发来源于 CountTriggerB.count.clear - 清空数值重新累加 count 并触发C.deleteProcessingTime - 清空 TimeServer 计数器,因为触发后不管 Count 还是 ProcessingTime 都要重新计数或计时----- 未达到 MaxCountA.currentTime - 通过 ctx 上下文获取当前 ProcessingTimeB.registerProcessingTimeTimer - 判断时间 value 是否有值,如果没有值则根据 current & interval 计算得到 ProcessingTime 对应的下次窗口触发时间----- 都不满足A.TriggerResult.CONTINUE - 不做触发,等待 TimeServer 到期override def onElement(t: String, time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state 并累加数量 val count = triggerContext.getPartitionedState(countStateDesc) val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc) // 考虑count是否足够 count.add(1L) if (count.get() >= maxCount) { val log = s"CountTrigger Triggered Count: ${count.get()}" println(formatString(log)) count.clear() // 不等于默认窗口的触发时间 if (fireTimestamp.get() != window.maxTimestamp()) { triggerContext.deleteProcessingTimeTimer(fireTimestamp.get()) } fireTimestamp.clear() return TriggerResult.FIRE } // 添加窗口的下次触发时间 val currentTimeStamp = triggerContext.getCurrentProcessingTime if (fireTimestamp.get() == null) { val nextFireTimeStamp = currentTimeStamp + interval triggerContext.registerProcessingTimeTimer(nextFireTimeStamp) fireTimestamp.add(nextFireTimeStamp) } TriggerResult.CONTINUE }3.onProcessingTime到达规定处理时间窗口执行的操作,上文我们讲到了窗口会在两种时机调用 onProcessingTime 方法,一种是达到自己定义的 ProcessintTimeTimer,窗口会进行 Fire 触发,此时触发数据为窗口的部分数据,还有一种是到达 window.maxTimeStamp 即到达 window.getEnd - 1L,此时窗口 Fire 触发的数据为 windowAll 定义的时间范围内所有数据,例如定义 Time.seconds(10),前者触发部分时间数据,后者触发完整的 10s 窗口。----- 到达窗口默认触发时间A.window.maxTimestamp - 到达窗口默认时间,打印对应日志标识B.count.clear - 清空计数状态C.deleteProcessingTime - 清空原计数器,因为这里窗口触发后就要重新计数和计时D.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间E.TriggerResult.FIRE - 进行全数据的窗口触发----- 到达自定义 interval 间隔A.日志标识 - 打印 TimeTriggered 标识本次触发来源自自定义 ProcessingTimeB.clear - 窗口触发后清空原有 count 状态C.registerProcessingTIme - 基于当前 ProcessingTime + interval 注册下次时间D.TriggerResult.FIRE - 触发窗口数据----- 都不满足A.TriggerResult.CONTINUE - 什么都不做override def onProcessingTime(time: Long, window: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state val count = triggerContext.getPartitionedState(countStateDesc) // 获取 Interval state val fireTimestamp = triggerContext.getPartitionedState(timeStateDesc) // time default trigger if (time == window.maxTimestamp()) { val log = s"Window Trigger By maxTimeStamp: $time FireTimestamp: ${fireTimestamp.get()}" println(formatString(log)) count.clear() triggerContext.deleteProcessingTimeTimer(fireTimestamp.get()) fireTimestamp.clear() fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval) triggerContext.registerProcessingTimeTimer(fireTimestamp.get()) return TriggerResult.FIRE } else if (fireTimestamp.get() != null && fireTimestamp.get().equals(time)) { val log = s"TimeTrigger Triggered At: ${fireTimestamp.get()}" println(formatString(log)) count.clear() fireTimestamp.clear() fireTimestamp.add(triggerContext.getCurrentProcessingTime + interval) triggerContext.registerProcessingTimeTimer(fireTimestamp.get()) return TriggerResult.FIRE } TriggerResult.CONTINUE }4.onEventTime因为是基于 Count 和 ProcessingTime,所以 onEventTime 返回 TriggerResult.CONTINUE5.clear清空 Count 和 FireTimestamp 对应的 ReduceValue三.代码实践1.主函数上文 CountTrigger 和 ProcessingTimeTrigger 的 Soucre 都是固定的数据来源,每 s 发送30条数据,为了验证 CountAndProcessingTimeTrigger,这里采用 socket,自定义发送数据实现,本地nc -lk port 即可开启,processFunction 实现对 window 内数据的 min,max 统计和处理时间输出。object CountAndProcessTimeTriggerDemo { val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment env .socketTextStream("localhost", 9999) .setParallelism(1) .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(10))) .trigger(new CountAndProcessingTimeTrigger(10, 5000)) .process(new ProcessAllWindowFunction[String, String, TimeWindow] { override def process(context: Context, elements: Iterable[String], out: Collector[String]): Unit = { val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) val info = elements.toArray.map(_.toInt) val min = info.min val max = info.max val output = s"==========[$date] Window Elem Num: ${elements.size} Min: $min -> Max $max==========" out.collect(output) } }).print() env.execute() } }2.数据验证上述 CountAndProcessintTimeTrigger 设定为 count = 10,interval = 5s编辑颜色触发方式输入流程蓝CountTrigger1-10满足count=10,触发CountTrigger红DeaultTrigger默认触发,全窗口数据为1-10绿ProcessingTimeTrigger11,12输入11,12 触发 ProcessingTimeTrigger黄DeaultTrigger13结束前输入13,全窗口数据为 11-13灰ProcessingTimeTrigger14输入14,等待 ProcessingTimeTrigger 触发白DeaultTrigger默认触发,此时全窗口数据为 14这里解释下为什么 interval 是 5s,但是 ProcessingTimeTrigger 输出日志的时间为 16 和 27,这是因为手动输入到 Socket 会有延迟,如果是机器默认发送数据,日志会修正为 15 和 25 触发 ProcessingTimeTrigger。四.更多CountTrigger 的逻辑比较简单,ProcessingTimeTrigger 这里只是一种定义方法,即窗口结束就重新设定过期时间,也可以跨窗口定义或者不设置整数时间,有兴趣可以自定义尝试。针对 CountTrigger、ProcessingTrigger 还有本文自定义的 CountAndProcessingTimeTrigger 我们都发现窗口触发时只调用了 FIRE,没有调用 FIRE_AND_PURGE 做 clear 清除操作,那窗口数据没有清除会不会越攒越多撑爆存储呢,其实也不会,WindowOperator 函数内置了默认的 onProcessingTime 方法,其内部会判断并调用 clearAllState 方法进行数据清空:编辑 WindowOperator 作为所有窗口处理逻辑的入口,如果我们的 Trigger 返回 TirggerResult.FIRE,窗口会在到达 CleanupTime 时执行 clearAllState 方法清空当前 window 所有状态;如果返回 TriggerResult.FIRE_AND_PURGE,windowOperator 则会调用 Trigger @override 的 clear 方法,例如 ProcessingTimeTrigger 会将窗口的计时器清除,而本例如果返回 FIRE_AND_PURGE 则会同时清空 count 和 fireTimestamp 对应的两个 ReduceValue 的值:编辑
一.引言上一篇文章 Python - openpyxl Excel 操作示例与实践 介绍了如何将数据自动转化至 Excel 并完成自定义标注,节省了大量人工操作的时间,但是后续如果需要将生成的 Excel 和数据发送邮件到指定同学就还需要一步人工操作时间即写邮件发邮件,非常的不奈斯,下面结合 smtplib 库实现自定义邮件的发送,从而实现 数据 -> Excel -> 邮件发送的全自动需求。理想的效果是数据以表格的形式在邮件中展示,并且完整的 Excel 在附件中:编辑二.smtplib 库相关介绍1.smtplib 库简介smtplib - Simple Mail Transfer Protocol 即简单邮件传输协议,它是一组用于由源地址到目的地址传送邮件的规则,由它来控制信件的中转方式。python 的 smtplib 提供了一种很方便的途径发送电子邮件。它对smtp协议进行了简单的封装,发送邮件只需要几个简单参数即可实现邮件的自动发送:import smtplib # 初始化服务 smtpObj = smtplib.SMTP( [host [, port [, local_hostname]]] )host服务器主机,一般由公司IP部门提供porthost 对应的端口号,一般情况为25local_hostnameSMTP 在你的本机上可以这样指定# 发送邮件 SMTP.sendmail(from_addr, to_addrs, msg[, mail_options, rcpt_options])from_addr邮件发送者地址to_addrs字符串列表,邮件发送地址msg要发送的信息,对应 email 类中的多种类型2.smtplib 库常用数据类型Type:Text发送文本信息Multipart用于发送连接多种类型的信息Application用于传输二进制数据Message用户包装 Email 信息Image用户传输图片Audio用于传输音频类数据Video用于传输视频类数据 SubType:text - plain纯文本text - htmlHTML 文档multipart - alternativeHTML邮件的HTML形式和纯文本形式,相同内容使用不同形式表示multipart - form-data主要用于提交时包含附件application/xhtml+xmlXHTML文档application/octet-stream任意的二进制数据application/pdfPDF文档application/mswordMicrosoft Word文件application/vnd.wap.xhtml+xml wap1.0+application/x-www-form-urlencoded使用HTTP的POST方法提交的表单message/rfc822RFC 822形式image/pngPNG 图像image/gifGIF 图像image/jpegJPEG 图像video/mpegMPEG 动画可以通过上一篇文章提到的 openpyxl 库进行文件类型的推断,给定一个 png 图像,guess_type 方法会自动推断对应文件的 MainType 以及 Subtype: ctype, encoding = mimetypes.guess_type('bash.png') if ctype is None or encoding is not None: ctype = 'application/octet-stream' maintype, subtype = ctype.split('/', 1) print(maintype, subtype)image png3.初始化本地服务器 (不推荐)local 模式下直接指定端口号即可,但是需要在本地启动服务器smtpObj = smtplib.SMTP('localhost', 1025) smtpObj.sendmail(sender, receivers, message.as_string())打开终端 terminal 执行:python -m smtpd -n -c DebuggingServer localhost:999发送后会有如下提示:编辑但是有个问题是我们本地一般不会配置邮箱服务器或自己的账号系统,所以推荐使用下面的非 localhost 模式。4.初始化官方服务器 (推荐👍)常用的邮箱有 qq,163,分别对应的官方服务器网址为: smtp.qq.com 和 smtp.163.com,下面初始化 qq邮箱 发送的客户端:smtpObj = smtplib.SMTP('smtp.qq.com', 25) smtpObj.ehlo() smtpObj.starttls() # token - 需要通过qq网页获取 smtpObj.login("xxxxxxx@qq.com", "校验码") smtpObj.sendmail(sender, receivers, message.as_string())使用 qq 邮箱时,client 需要 login 并验证你作为 sender 的资格,第一个参数为你的 qq邮箱地址,第二个校验码需要通过邮箱配置获取,下面看一下获取步骤:A.进入网页版 qq 邮箱编辑 B.选择账户选项编辑C.配置 Smtp下拉账户菜单栏到如下位置,可以看到除了 SMTP 还有很多邮箱服务,开启 SMTP 服务编辑D.发送验证码开启上述 SMTP 服务后会要求你发送验证信息到官方,成功发送后点击 '我已发送' 即可拿到官方发送的校验码,复制粘贴保存下来。编辑网易 163邮箱的配置方法和上面大同小异这里不多赘述。三.smtplib 库常用操作通过上面第二步操作我们已经初始化好 SMTP 的邮件服务器,邮件的常规操作大致如下:A.写字 ✍️B.发链接 🔗C.发图 ⛰D.发附件 📃E.发表格 📚首先初始化 QQ 邮箱服务器,然后一一实践:smtpObj = smtplib.SMTP('smtp.qq.com', 25) smtpObj.ehlo() smtpObj.starttls() # token - 需要通过qq网页获取 smtpObj.login("sender@qq.com", "token")1.写字 ✍️写字主要使用 Text type 下的 plain 模式即可:subject = 'Python SMTP 测试邮件标题' message = MIMEText('Python 邮件发送测试...', 'plain', 'utf-8') message['From'] = Header("BITDDD", 'utf-8') # 发送者 message['To'] = Header("测试账户", 'utf-8') # 接收者 message['Subject'] = Header(subject, 'utf-8') sender = 'sender@qq.com' receivers = ['receiver@qq.com'] smtpObj.sendmail(sender, receivers, message.as_string())执行后在 reveiver 对应的 qq 邮箱处收到如下信息:编辑2.发链接 🔗发链接的操作其实很常见,很多验证信息,广告都是通过网页链接实现:mail_msg = """ <p>Python 邮件发送链接测试...</p> <p><a href="https://blog.csdn.net/BIT_666?type=blog">BITDDD</a></p> """ subject = 'Python SMTP 测试发送链接标题' message = MIMEText(mail_msg, 'html', 'utf-8') message['From'] = Header("BITDDD", 'utf-8') message['To'] = Header("测试账户", 'utf-8') message['Subject'] = Header(subject, 'utf-8') smtpObj.sendmail(sender, receivers, message.as_string())发送后如下,可以修改 mail_msg 里 >BITDDD< 的内容作为链接的中文描述,例如恭喜你中奖了之类的,这样有同学就会误点这个链接了~编辑3.发图 ⛰编辑编辑msgAlternative = MIMEMultipart('alternative') msgRoot.attach(msgAlternative) mail_msg = """ <p>Python 邮件发送图片测试...</p> <p>图片演示:</p> <p><img src="cid:image1"></p> <p>图片演示:</p> <p><img src="cid:image2"></p> """ msgAlternative.attach(MIMEText(mail_msg, 'html', 'utf-8')) # 指定图片为当前目录 fp = open('A.jpg', 'rb') msgImage = MIMEImage(fp.read()) fp.close() # 定义图片 ID,在 HTML 文本中引用 msgImage.add_header('Content-ID', '<image1>') msgRoot.attach(msgImage) # 指定图片为当前目录 fp = open('B.png', 'rb') msgImage2 = MIMEImage(fp.read()) fp.close() # 定义图片 ID,在 HTML 文本中引用 msgImage2.add_header('Content-ID', '<image2>') msgRoot.attach(msgImage2) smtpObj.sendmail(sender, receivers, msgRoot.as_string())这里采用 MIMEMultipart 添加两幅图片, 如果需要继续添加,修改 mail_msg 里的信息,并在后面 attach 相关图片的二进制信息即可:编辑 4.发附件 📃发邮件很多时候需要附带 word、excel、txt 等文件信息,下述示例将添加两个附件并发送,如果有更多附件也可以模仿累加即可, Content-Disposition 里的 filename 对应该文件在邮件中展示的名字:message = MIMEMultipart() message['From'] = Header("BITDDD", 'utf-8') message['To'] = Header("测试账户", 'utf-8') subject = 'Python SMTP 邮件附件测试' message['Subject'] = Header(subject, 'utf-8') #构造附件1,传送当前目录下的 test.txt 文件 att1 = MIMEText(open('test1.txt', 'rb').read(), 'base64', 'utf-8') att1["Content-Type"] = 'application/octet-stream' att1["Content-Disposition"] = 'attachment; filename="test1.txt"' message.attach(att1) # 构造附件2,传送当前目录下的 runoob.txt 文件 att2 = MIMEText(open('test2.txt', 'rb').read(), 'base64', 'utf-8') att2["Content-Type"] = 'application/octet-stream' att2["Content-Disposition"] = 'attachment; filename="test2.txt"' message.attach(att2) smtpObj.sendmail(sender, receivers, message.as_string()) print("邮件发送成功")执行后获得如下邮件:编辑5.发表格 📚表格为上一篇文章得到的自定义数据报表:编辑msgRoot = MIMEMultipart('mixed') msgRoot['From'] = Header("BITDDD", 'utf-8') # 发送者 msgRoot['To'] = Header("测试账户", 'utf-8') # 接收者 subject = 'Python SMTP 邮件表格测试' msgRoot['Subject'] = Header(subject, 'utf-8') df = pd.read_excel("test123.xlsx") # 添加表格 html_msg = get_html_msg(df.to_html(escape=False)) content_html = MIMEText(html_msg, "html", "utf-8") msgRoot.attach(content_html) smtpObj.sendmail(sender, receivers, msgRoot.as_string())通过 DataFrame 的 to_html 方法获取其对应的 HTML 格式,并添加到 Text - html 类下,执行后收到如下邮件: 编辑Tips:get_html_msg 函数如下,内置了生成 html-table 的标准语法,如果需要添加多个表格,可以增加多个 head 和 body 标注表格类型,从而展示多张数据表:def get_html_msg(df_html): head = \ """ <head> <meta charset="utf-8"> <STYLE TYPE="text/css" MEDIA=screen> table.dataframe { border-collapse: collapse; border: 2px solid #a19da2; /*居中显示整个表格*/ margin: auto; } table.dataframe thead { border: 2px solid #91c6e1; background: #f1f1f1; padding: 10px 10px 10px 10px; color: #333333; } table.dataframe tbody { border: 2px solid #91c6e1; padding: 10px 10px 10px 10px; } table.dataframe tr { } table.dataframe th { vertical-align: top; font-size: 14px; padding: 10px 10px 10px 10px; color: #105de3; font-family: arial; text-align: center; } table.dataframe td { text-align: center; padding: 10px 10px 10px 10px; } body { font-family: 宋体; } h1 { color: #5db446 } div.header h2 { color: #0002e3; font-family: 黑体; } div.content h2 { text-align: center; font-size: 28px; text-shadow: 2px 2px 1px #de4040; color: #fff; font-weight: bold; background-color: #008eb7; line-height: 1.5; margin: 20px 0; box-shadow: 10px 10px 5px #888888; border-radius: 5px; } h3 { font-size: 22px; background-color: rgba(0, 2, 227, 0.71); text-shadow: 2px 2px 1px #de4040; color: rgba(239, 241, 234, 0.99); line-height: 1.5; } h4 { color: #e10092; font-family: 楷体; font-size: 20px; text-align: center; } td img { /*width: 60px;*/ max-width: 300px; max-height: 300px; } </STYLE> </head> """ body = \ """ <body> <div align="center" class="header"> <!--标题部分的信息--> <!-- <h1 align="center">我的python邮件,使用了Dataframe转为table </h1> --> </div> <hr> <div class="content"> <!--正文内容--> <h2>第一个Dataframe</h2> <div> <h4></h4> {df_html} </div> <hr> <p style="text-align: center"> <!-- —— 本次报告完 —— --> </p> </div> </body> """.format(df_html=df_html) html_msg = "<html>" + head + body + "</html>" fout = open('table.html', 'w', encoding='UTF-8', newline='') fout.write(html_msg) return html_msg四.smtplib 实践上一篇文章通过 openpyxl 库实现了原始数据到 Excel 的转化:编辑下面结合 smtplib 库实现 Excel 到邮件的转化:编辑这里先分析下需要做哪些事情:A.添加邮件图片 ①B.添加 DataFrame ②C.添加对应 DataFrame 的 Excel 附件 ③完整代码:#!/usr/bin/python # -*- coding: UTF-8 -*- import smtplib from email.mime.image import MIMEImage from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from email.header import Header import pandas as pd import numpy as np from openpyxl.packaging.manifest import mimetypes # 发送者邮箱 sender = 'sender@qq.com' receivers = ['receiver@qq.com'] # ===============================设置邮件标题============================== fromTitle = "BITDDD" receiverTitle = "测试账户" msgRoot = MIMEMultipart('mixed') msgRoot['From'] = Header(fromTitle, 'utf-8') # 发送者 msgRoot['To'] = Header(receiverTitle, 'utf-8') # 接收者 # 邮件主题 subject = 'Python SMTP 邮件测试 By BITDDD' msgRoot['Subject'] = Header(subject, 'utf-8') # ===============================连接服务器============================== smtpObj = smtplib.SMTP('smtp.qq.com', 25) smtpObj.ehlo() smtpObj.starttls() smtpObj.login("sender@qq.com", "token") # ===============================添加图片============================== fp = open('excel.jpg', 'rb') msgAlternative = MIMEMultipart('alternative') mail_msg = """ <p>Python 邮件发送测试...</p> <p>图片演示:</p> <p><img src="cid:image1"></p> """ msgAlternative.attach(MIMEText(mail_msg, 'html', 'utf-8')) # 指定图片为当前目录 msgImage = MIMEImage(fp.read(), subtype) msgImage.add_header('Content-ID', '<image1>') fp.close() msgAlternative.attach(msgImage) msgRoot.attach(msgAlternative) # ===============================添加df============================== df = pd.read_excel("test123.xlsx") # 添加表格 html_msg = get_html_msg(df.to_html(escape=False)) content_html = MIMEText(html_msg, "html", "utf-8") msgRoot.attach(content_html) # =============================添加附件=============================== attachCsv = MIMEText(open(savePath, 'rb').read(), 'base64', 'utf-8') attachCsv["Content-Type"] = 'application/octet-stream' attachCsv["Content-Disposition"] = 'attachment; filename="test.xlsx"' msgRoot.attach(attachCsv) smtpObj.sendmail(sender, receivers, msgRoot.as_string())这里补充一下添加混合类型 MIMEMultipart 时的几种模式:mixed混合类型related内嵌资源如附件alternative文本与超文本上述示例同时使用了 图片、附件、超文本 ,初始化可以采用 mixed 、alternative,如果使用 related ,则会解析异常,DataFrame 的数据会变成二进制 bin 文件发送到 receiver 邮箱中: 编辑五.smtplib 抄送除了发送邮件外,有时还需要抄送其他同学,smtplib 同样支持该操作。A.单独发送sender = 'xxxxA@qq.com' receivers = ['xxxxB@qq.com'] msgRoot = MIMEMultipart('alternative') msgRoot['From'] = Header(fromTitle, 'utf-8') # 发送者 msgRoot['To'] = ','.join(receiverTitle) # 接收者 # 邮件主题 subject = 'Python SMTP 邮件测试 By BITDDD' msgRoot['Subject'] = Header(subject, 'utf-8') smtpObj.sendmail(sender, receivers, msgRoot.as_string())B.设置抄送sender = 'xxxxA@qq.com' receivers = ['xxxxB@qq.com'] cc = ['xxxxC.com', "xxxxD.com"] msgRoot = MIMEMultipart('alternative') msgRoot['From'] = Header(fromTitle, 'utf-8') # 发送者 msgRoot['To'] = ','.join(receiverTitle) # 接收者 msgRoot['Cc'] = ','.join(cc) # 抄送者 # 邮件主题 subject = 'Python SMTP 邮件测试 By BITDDD' msgRoot['Subject'] = Header(subject, 'utf-8') smtpObj.sendmail(sender, receivers + cc, msgRoot.as_string())可以看到增加抄送总共分三步:-> 添加抄送列表 cc-> 将 cc 添加至 msg 中-> sendmail 写成 reveiver + cc编辑六.总结通过 openpyxl + smtplib 实现了数据到表格到邮件的转化,网页版邮箱接收会归类到 广告 或者 垃圾邮件中,需要手动找一下,不过手机上接收没有问题,非常的奈斯~编辑
一.引言上一遍文章介绍了二进制与十进制数字之间的转换,本文介绍现在应用比较广泛的浮点数标准 IEEE754。二.IEEE754 简介1.整体介绍编辑IEEE754 代表二进制浮点算数标准,一般常用的为单精确度32位以及双精确度64位,还有不常用的延伸单精度43位以及延伸双精确度79位,Scala 常用的 Float 和 Double 分别采用了 IEEE754 的单精度32位和双精确度64位的标准。其中包含 Sign + Exponent + Fraction 三个值:SIgn:符号位,0代表正,1代表负,很多时候正数会省略第一位的符号位0Exponent:阶码或阶数,代表指数位Fraction:分数值,对应的M为尾数,表示浮点数的有效数字2.公式对于 32 位的单精度浮点数,IEEE754 表示为:编辑对于 64 位的单精度浮点数,IEEE754 表示为:编辑SIgn:其中第一位代表符号位即正负数M:其中 M ∈ [1,2),写成 1.xxxx 的形式,由于二进制数字保存时第一位总是1,所以此处只需要保留 xxxx 即可,所以这里采用了 1+M 的形式,这样可以节省1位存储位置Exponent:32 位的情况下阶码 E 的取值范围为 8 bit,对应 32 位中的 2-9 位;64 位的情况下阶码E 的取值范围为 11bit,对应 64 位中的 2-12 位。以单精度浮点数为例,它的指数域为 8 bit,固定偏移值为 2^{8-1} - 1 = 127,IEEE754 约定阶码在添加时需要加上对应的偏移量,所以出现了公式最后的表述: E-127,同理 64 位需要增加偏移量 1023。这边的解释看着不太好理解,一会通过示例可以轻松搞定~3.表达形式针对 IEEE754 的上述二进制表达方式,其表述的数字主要分为3个类型:A.规约形式当阶码 E 的二进制值不全为 0 或者 1 时,所表示的值为规格化的值,或者规约形式的浮点数。B.非规约形式当阶码 E 的二进制全为 0,所表示的值为非规格化的值,或者非规约形式的浮点数,此时浮点数的指数 E=1-127 / E=1-1023,有效数字 M 不再加第一位的1,而是还原为 0.xxxx 的形式,从而表示 ±0 或者很接近 0 的小数字。C.特殊形式± 无穷:阶码 E 全为1时且有效数字 M 全为0,根据 S 的大小表示正负无穷大NaN:当 E 全为 1 时,如果有效数字 M 不全为 0,表示这个浮点数不是一个数,即为 NaN三.Double 转换为 IEEE754 十进制浮点数转化为 IEEE754 对应浮点标准数需要首先将十进制浮点数换算为常规二进制表达形式,然后根据 IEEE754 对应的 value 形式并根据 S+E+M 的顺序转化为 IEEE754 的32位、64位形式。1.单精度 Float 转换为 IEEE754 (手工版)给定上一篇文章的示例 Float = 66.59375 ,其二进制对应编码为 1000010.10011,其转换为标准形式为 1.00001010011 * 2^{6} 下面套用公式:编辑S: 66.59375 为正数,所以 s=0M:1 + M = 1.00001010011 所以 1.xxxx 的形式下,M=0.00001010011E: 2^{E-127} = 2^{6} 推出 E-127 = 6 推出 E=133,133 的二进制形式为 10000101根据 S + E + M 的形式进行拼接:IEEE754 66.59375 F = 0 + 10000101 + 00001010011 + 补 0 至 32 位=> 10000101000010100110000000000002.双精度 Double 转换为 IEEE754 (手工版)依旧使用上面示例 Double = 66.59375 = 1000010.10011 = 1.00001010011 * 2^{6},套用公式:编辑S:66.59375 为正数,所以 s=0M:1 + M = 1.00001010011 所以 1.xxxx 的形式下,M=0.00001010011E:2^{E-1023} = 2^{6} 推出 E-1023 = 6 推出 E=1029,1029 的二进制形式为 10000000101根据 S + E + M 的形式进行拼接:IEEE754 66.59375 D = 0 + 10000000101 + 00001010011 + 补 0 至 64 位=> 01000000010100001010011000000000000000000000000000000000000000003.Float / Double 转换为 IEEE754 (代码版)代码的实现主要复刻了上述的手动过程,但是代码并未考虑非规约和特殊值的情况,所以只使用一些常见的规约类型浮点数,主要过程分 3 步:A.根据 num 的正负值判断 s 的值B.排除第一位1,并根据后续的数字获取有效数字 MC.通过 e_dec 计算有效数字 E 的原始值,再根据 Float 或者 Double 的公式对 E 增加偏移量获取 真实 ED. 根据 S + E + M 的顺序并补 0 得到最终的结果,如果不好记可以和 SIM 卡的谐音记在一起def doubleToIEEE754(num: Double, StringType: String): String = { val binaryString = doubleToBin(num) val s = if (num >= 0) { 0 } else { 1 } val m = binaryString.replace(".", "").slice(1, binaryString.length - 1) val e_dec = binaryString.split("\\.")(0).length - 1 val e = if (StringType.toUpperCase().equals("F")) { // V = (-1)^s *(1+M)* 2^(E-127)(单精度) (e_dec + 127).toBinaryString } else if (StringType.toUpperCase().equals("D")) { // V = (-1)^s *(1+M)* 2^(E-1023)(双精度) (e_dec + 1023).toBinaryString } else { "NULL" } val IEEE754String = if (e != "NULL") { val re = s + e + m val length = if (StringType.equals("D")) 64 else if (StringType.equals("F")) 32 else re.length re + repeatString("0", length - re.length) } else { "" } IEEE754String }试验一下上面的示例:val num = 66.59375 println(doubleToIEEE754(num, "D")) println(doubleToIEEE754(num, "F"))0100000001010000101001100000000000000000000000000000000000000000 010000101000010100110000000000004.Float / Double 转换为 IEEE754 (官方 API 版)java 为 Float 和 Double 提供了转化 IEEE754 的 API:Float 32 位:这里要求 num 是 Float 类型val bitF = java.lang.Integer.toBinaryString(java.lang.Float.floatToRawIntBits(num))Double 64 位:这里要求 num 是 Double 类型val bitD = java.lang.Long.toBinaryString(java.lang.Double.doubleToRawLongBits(num)) 不管是手工推导还是代码版本大家都可以和官方 API 得到的结果进行验证,这里需要注意下官方 API 在 num 是正数的情况下得到的结果长度分别为 31 位和 63 位,因为第一位代表正数的 0 自动省略了。四. IEEE754 转换为 Double上面介绍了 Double 转换为 IEEE754 的过程,其中需要二进制的数字进行中间的过度,同样 IEEE754 转换为 Double 也需要二进制数字的转化:A.将 IEEE754 根据 Float / Double 的位数,截出 S + E + MB.根据 Value 的公式,将 S、E、M 代入公式得到对应的二进制形式C.将二进制形式的浮点数转化为十进制,完成 double 的转化def IEEE754ToDouble(binaryString: String, stringType: String): Double = { if (stringType.toUpperCase().equals("F")) { // V = (-1)^s *(1+M)* 2^(E-127)(单精度) val s = binaryString.slice(0, 1) val e = binaryString.slice(1, 9) val m = binaryString.slice(9, binaryString.length) var binFloat = if (e.equals("00000000")) { m } else { "1" + m } val cut = binToInteger(e) - 127 binFloat = binFloat.slice(0, cut+1) + "." + binFloat.slice(cut+1, binFloat.length) val floatNum = binToDouble(binFloat) if (s.equals("0")) { floatNum } else { -1 * floatNum } } else if (stringType.toUpperCase().equals("D")) { // V = (-1)^s *(1+M)* 2^(E-1023)(双精度) val s = binaryString.slice(0, 1) val e = binaryString.slice(1, 12) val m = binaryString.slice(12, binaryString.length) var binDouble = if (e.equals("00000000000")) { m } else { "1" + m } val cut = binToInteger(e) - 1023 binDouble = binDouble.slice(0, cut+1) + "." + binDouble.slice(cut+1, binDouble.length) val doubleNum = binToDouble(binDouble) println(binDouble, doubleNum) if (s.equals("0")) { doubleNum } else { -1 * doubleNum } } else { println("请输入正确的模式!") Double.NaN } }这里并未考虑非规约和特殊值的情况,并且对 Double 的范围也并不完全支持,只是一个思路的拓展,有兴趣的同学可以深化一下该方法~五.验证下面基于上述的互转方法和 API 进行调用和验证:// 双精度 && 单精度 println(repeatString("=", 50)) val floatBit = java.lang.Integer.toBinaryString(java.lang.Float.floatToRawIntBits(num.toFloat)) val floatBitDiy = doubleToIEEE754(num, "F") val floatNum = IEEE754ToDouble("0" + floatBit, "F") println(s"Num: $num FloatNum: $floatNum 单精度: $floatBit 长度: ${floatBit.length}") println("API:" + floatBitDiy) println("DIY:" + "0" + floatBit) println(repeatString("=", 50)) val doubleBit = java.lang.Long.toBinaryString(java.lang.Double.doubleToRawLongBits(num)) val doubleBitDiy = doubleToIEEE754(num, "D") val doubleNum = IEEE754ToDouble("0" + doubleBit, "D") println(s"Num: $num DoubleNum: $doubleNum 双精度: $doubleBit, 长度: ${doubleBit.length}") println("API:" + doubleBitDiy) println("DIY:" + "0" + doubleBit) println(repeatString("=", 50))一些常规的规约数的互相转化还是可以和 API 对应的: 编辑 上面常用的 repeatString 函数为:def repeatString(char: String, n: Int): String = List.fill(n)(char).mkString 其余 BinToInteger 、IntegerToBin,DecimalToBin 以及 BinToDecimal 可以参考之前的文章:二进制与十进制数字之间的转换。六.总结上面通过演示和代码对 IEEE754 浮点数标准做了一些基本的分解,主要就是十进制、二进制还有公式的分解与代入,除此之外对非规约值和特殊值并没有深入探讨,后续有机会可以继续深入。
一.引言现有超市用户购物数据表一张,其字段与数据如下:编辑用户 id 为连续数字编号,性别分别用 0、1表示,年龄使用 xxs 表示,cost 代表用户在超市的消费总额:1 0 00s 100 20220505 2 1 90s 200 20220505 3 1 00s 300 20220505 4 1 70s 400 20220505 5 0 60s 500 20220505 6 0 80s 600 20220505 7 1 80s 700 20220505 8 0 60s 800 20220505 9 0 70s 900 20220505 10 1 10s 1000 20220505下面使用 group by 和 grouping sets 对表中数据进行提取。二.Group By对 hive 表中的数据进行分组汇总时常用 group by 语法1.按年龄分组select user_age,sum(buy_cost) from user_purchasing_info group by user_age;00s 400.0 10s 1000.0 60s 1300.0 70s 1300.0 80s 1300.0 90s 200.02.按性别分组select user_gender,user_age,sum(buy_cost) from user_purchasing_info group by user_gender,user_age;0 2900.0 1 2600.03.按性别、年龄分组 按照多个变量进行 group by 时需要注意 group by 后的字段也需要全部添加:select user_gender,user_age,sum(buy_cost) from user_purchasing_info group by user_gender,user_age;0 00s 100.0 0 60s 1300.0 0 70s 900.0 0 80s 600.0 1 00s 300.0 1 10s 1000.0 1 70s 400.0 1 80s 700.0 1 90s 200.0三.Grouping Sets1.Grouping Sets 示例grouping sets 方法提供一个 () sets,内部支持添加 group by 字段的不同组合,按不同维度分组聚合后,最后将数据合并,类似于根据 sets 内的字段进行多次 group by 操作再最后执行 union all。下面对超市购买表进行聚合,一次性查询按年龄、性别、年龄&性别的聚合结果:select user_gender,user_age,sum(buy_cost) from user_purchasing_info where dt='20220505' group by user_gender,user_age grouping sets((user_gender),(user_age),(user_gender, user_age));编辑可以与上述查询结果相对应,其中蓝框内的结果为按年龄 user_age 进行分组的结果,红框内为按性别 user_gender + 年龄 user_age 进行分组的结果,剩余两行为按性别 user_gender 进行分组的结果。2.Grouping Sets 实战上面示例展示了一次获取不同分组的数据方法,实战场景下用法例如给定数据求按照某个维度的分数据和汇总总数据,对应到上述超市数据即求当天营业额按性别 + 年龄分组和总和数据:select concat_ws('_',nvl(user_gender,'total'),nvl(user_age,'total')),sum(buy_cost) from user_purchasing_info where dt='20220505' group by user_gender,user_age grouping sets((user_gender),(user_gender, user_age),());通过 concat_ws 和 nvl 函数连接了分组字段并使用 nvl 函数排除 NULL 的数据,非常的实用 👍 total_total 5500.0 0_total 2900.0 0_00s 100.0 0_60s 1300.0 0_70s 900.0 0_80s 600.0 1_total 2600.0 1_00s 300.0 1_10s 1000.0 1_70s 400.0 1_80s 700.0 1_90s 200.0四.总结简言之 Grouping Sets = Group By + Union,有几个点需要注意:A.NULL 值上面 Grouping Sets 中分别采用 A、B、A+B 的字段聚合,但是 Select 时 A,B 都添加了,所以对应单独 GroupBy A 或者 B 的数据,最终结果会默认补 NULL,所以上面结果中会有很多 NULL 值。B.空 Set ()Grouping Sets 实战中,我们添加了一个空 Sets(),其等价于:select sum(buy_cost) from user_purchasing_info where dt='20220505';即对整个表计算 sum,此时 user_age、user_gender 都为 NULL,通过 nvl 函数转换为 total_total,所以最终数据得到 total_total: 5500.0 的数据。C.分组函数除了 Grouping Sets 这个聚合功能函数外,类似的函数还有 Grouping_Id、CUBE、RollUp,可以参考:Hive - Cube, Rollup, GroupingId 示例与详解。
一.引言使用 mvn package 打包时,对应项目 jar 包正常大小为 70 MB 左右,今天切换分支并简单修改代码后打包大小变为 120 MB 左右,遂开始排查之旅。二.问题与纠错1.问题定位首先直接 vim 查看 jar 包内容,通过比较发现 70M 和 120M jar 包,二者主要相差在 静态文件 上,这些文件为之前版本 resource 文件夹内的文件。编辑 所以第一步定位完成,jar 包容量增大是因为增加了很多 resource 文件夹内容。2.问题分析<resources> <resource> <directory>src/main/resources</directory> <includes> <include>**/*</include> <include>**/*</include> <include>**/*</include> </includes> <filtering>false</filtering> </resource> </resources>项目 maven 中配置了 resources 相关设置,所以 resources 文件夹内文件打到 jar 包为正常现象。经过比对发现新增文件均为之前 resources 文件夹内删除的文件 ,考虑到今天 git checkout 切换到了之前老的分支,而老分支的无关文件还未从 resources 文件夹中删除,因此老文件的引入是切换老分支导致的。3.问题解决解决这个问题只需要在新分支 mvn package 前执行 mvn clean,清除 target 文件,随后打包即可恢复新版本删除 resource 文件的小包情况。未使用 mvn clean 打新包:编辑使用 mvn clean 打新包:编辑可以看到未 clean 的包打入 228 个 resources 文件,而 clean 的新版只打入 145 个文件,从而导致了 jar 包的变大。这里 mvn clean 的作用是清除 target 文件夹,由于 ignore 中添加了忽略 ./target,所以切换分支时 target 文件夹不受影响,而 maven 打包的资源来自 target 文件夹下的 class 文件夹:编辑切换老分支后打包就会保留老的 resources 文件到 classer 文件夹下,所以再切换新分支打包会依然使用 classes 文件夹从而导致 jar 包变大,mvn clean 命令清除 target 文件重新打新包时会将新包的 resources 文件 copy 到 target/class 文件夹下,所以新 jar 包容量变小。三.总结在 resources 文件夹修改的情况下,切换新老版本打包发生 jar 包容量异常增加时,执行下述命令即可解决:mvn clean package
一.引言Flink 程序内有读取 hbase 的需求,近期任务启动后偶发 sink 端背压 100% 导致无数据写入下游且无明显 exception 报错,重启任务后有较大概率恢复服务,但也有可能继续背压 100% 从而堵塞任务,遂开始排查。二.问题描述程序执行一段时间后,查看监控发现 Source + Process + Sink 端 back pressure 背压全部达到 100% ,很明显是数据发生堵塞编辑查看 on-cpu 无堆栈显示因此排除 cpu 问题,需要进一步查看任务执行、IO、网络等问题,随后查看 off-cpu 的 Flame Graph 看到堆栈最终定位在:org.apache.hadoop.hbase.util.RetryCounter.sleepUntilNextRetry编辑三.问题分析1.堆栈分析上述任务定位在 hbase 的 retryCounter.sleepUnitlNextRetry ,虽然没有看过 Hbase 的源码,但是根据这个堆栈信息大致可以判断是 hbase 读取时遇到问题导致:retry 重试 + sleepUntilNextRetry 等待并重试编辑 二者结合导致任务卡死从而数据流处理堵塞,再影响后续数据,从而导致背压全部达到 100%。2.代码定位off-cpu 的 root 调用为下述语句,非常基本的 hbase get 操作:Result result = hbaseTable.get(sampleGet);按照堆栈看一下底层源码:可以看到 Try 内逻辑真正执行的只有 1行,即 checkZk() 随后 getData(),本地测试 Get 没有问题,所以只能定位到 checkZk() 这里。编辑下面看一下 checkZk 主要负责什么事情:编辑checkZk 初始化新的 Zookeeper,如果初始化失败则返回 unable to create Zookeeper Connection,所以上面集群 hbase 无法获取数据基本定位在 Zk 创建失败。3.问题解决zookeeper 连接失败导致 Hbase Client 初始化失败从而数据无法获取导致 RetryAndSleep,一般服务器无法创建连接都因为访问过多导致,即服务过载,例如 JedisPool 的 resource,其使用有限制,超过后将无法获取连接从而导致获取数据失败。查看 hbaes 对应 zk 下的服务器连接情况:编辑看到某个 ip 下存在大量 zk 连接,通过查询 zk server 的配置,查看当前单台客户端允许的最大连接数已全部被该 ip 占用,从而导致我的 Flink 程序无法初始化 zk。所以下面只需要解决这里连接过多的问题,经过排查发现该 ip 下对应 java 任务存在 zookeeper 泄露,即代码逻辑内不断申请 zookerper 连接,从而导致连接数过多,修改后空闲连接数上升,Flink-Hbase 服务也正常运行。三.总结1.Flink 问题定位Flink 发生问题第一步查看 Excpetion,如果没有 Exception 就查看 Flame Graph,根据 on-cpu 和 off-cpu 的堆栈定位是 cpu 的问题还是自身代码的问题。2.客户端初始化Flink 初始化客户端的代码在 ProcessFunction 的 open 函数内,该方法可以保证一个 TaskManger 只初始化一个 Hbase Connection,所以很难突破单台机器初始化 zk 的限制,同学们在执行任务时也需要注意初始化无论 Hbase,Jedis 等客户端最后不要频繁初始化以及初始化过多。编辑 这里我初始化了 35 个 TaskManager,每个 Manger 上只初始化了一个 connection。3.重启解决问题上面有一个现象是我的任务重启后有一定概率恢复正常,通过上面的问题排查我们也可以得到答案,由于某 ip 下占用过多 connection,如果我的任务恰巧提交到该任务对应的机器,则我的任务无法获取连接导致堵塞,而如果任务提交恰好避开该 ip 对应的机器则代码执行无误,所以任务重启会有一定概率修复。
一.引言Flink 流处理用于处理源源不断的数据,之前介绍过 processFunction,该方法会对单个元素进行处理,除此之外,还有一种批量数据处理的方法就是 TimeWindow 以及 TimeWindowAll,Flink 时间窗口可以看作是对无线数据流设置的有限数据集,即流处理框架下的批处理。窗口下又分为 CountWindow 和 TimeWindow,之前介绍窗口 Trigger 已经介绍过,有兴趣的同学可以回看。本文主要介绍 TimeWindow 且示例均采用 ProcessingTime。二.TimeWindow 简介Flink 的窗口采用左闭右开,其根据定义的时间范围自定义生成窗口范围,常用的有:Tumbling Window - 滚动窗口Sliding Window - 滑动窗口Session Window - 会话窗口1.Tumbling Window - 滚动窗口滚动窗口下各个窗口之间不重叠,且窗口的时长固定,根据 ProcessingTime 或者 EventTime 可以分别创建对应的 TumblingProcessingTimeWindows 与 TumblingEventTimeWindows,窗口的长度可以使用 org.apache.flink.streaming.api.windowing.time.Time 设定 seconds、minutes、hours 、days。编辑根据到来的元素,窗口划定相同时间范围进行元素圈定并生成窗口,由于时间是连续的,所以滚动窗口的窗口前后重合且不会丢失元素。Tips:滚动窗口的时间根据 Time 的设定自动生成范围,例如设置 10s 的滚动窗口:TumblingProcessingTimeWindows.of(Time.seconds(10)以 18:00 开始为例,Flink 会自动生成如下左闭右开的时间窗口,窗口的开始和结束分别对应 window.getStart 和 window.getEnd。18:00:00 - 18:00:10 ,18:00:10 - 18:00:20 ... 18:59:50 - 19:00:002. Sliding Window - 滑动窗口滑动窗口以步长 Slide 不断向前滑动,然后生成 Size 大小的窗口。Slide 决定窗口的生成速度,Slide 较大时每个窗口的范围很大,窗口数量很少,反之 Slide 较小时则会生成的窗口数量会很多。Flink 可以用过如下代码设置滑动窗口:SlidingProcessingTimeWindows.of(Time.minutes(10), Time.seconds(5)Slide > Size:当 Slide > Size 时,滑动窗口会有一部分元素不在 Size 内,从而导致元素丢失。编辑Slide < Size:当 Slide < Size 时,两个滑动窗口 Size 窗口内可能包含同一元素,从而导致元素重复。编辑 Slide = Size:当 Silde = Size 时,滚动窗口变为滑动窗口。编辑3.Session Window - 会话窗口上面两种为常见的窗口模式,还有一种窗口使用较少即 Session Window,该模式下两个窗口之间有一个间隙,称为 Session Gap。当一个窗口 Session Gap 时间内没有收到数据,则窗口关闭。可以通过如下代码设置 Session Window:ProcessingTimeSessionWindows.withGap(Time.minutes(10)) 也可以调用 .withDynamicGap 和 SessionWindowTimeGapExtractor 设置动态的 Session Gap。编辑可以看到由于 Session Window 受数据源的连续性影响,窗口的大小、窗口的起止时间都是不确定的。 三.TimeWindow 与 TimeWindowAll不论是 TimwWindow 还是 TimeWindowAll ,二者都使用上述的 Window 模式,唯一不同的是二者的使用场景。1.TimeWindowTimeWindow 适用于 keyedStream,数据在 keyBy 分流后,window 将不同的 key 分开并生成多个 window,所以 TimeWindows 是并行处理的。数据流每s生成一批 Data 类数据,并累加对应类中的数值 num,数据的并行度为5:dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction[Data, String, String, TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = key + "\t" + elements.toArray.mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).print()红框:其中第一列红框为 TaskId,由于设置并行度为5,所以 TaskId 的值分别为 0,1,2,3,4,这也说明了 TimeWindow 是并行执行,将不同的 key 划分至不同 window绿框:第二列绿框为 TimeWindow 归拢元素对应的 key,我们根据 num 数字的第一位 keyBy,所以其值为 0,1,2,3,4,5,6,7,8,9蓝框:第三列蓝框为 key 对应的数据,可以看到对应数据的第一位均与对应 window 的 key 完全一致,所以这里可以看做是将数据 GroupBy 并汇总至不同 window编辑Tips:TimeWindow 支持并行操作,默认并行度与 SourceStream 一致,也可以手动 setParallelism 设置:dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessWindowFunction[Data, String, String, TimeWindow]{ override def process(key: String, context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = key + "\t" + elements.toArray.mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).setParallelism(2).print()修改并行度为2后,可以看到打印的 TaskId 只有0,1,不再受 SourceStream 并行度为5的影响。 编辑除此之外,因为 TimeWindow 需要根据 key 划分,所以需要数据流为 keyedStream,DataStream 或者 SingleOutputStreamOperator 不支持使用 TimeWindow,只能使用 TimeWindowAll。 2.TimeWindowAllTimeWindowAll 是把所有的数据进行聚合,所以并行度只能为1。这里处理函数有不同,TImeWindow 使用 ProcessWindowFunction,TimeWindowAll 则使用 ProcessAllWindowFunction。dataStream.keyBy(data => { data.num.toString.slice(0, 1) }).windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5))) .process(new ProcessAllWindowFunction[Data, String, TimeWindow] { override def process(context: Context, elements: Iterable[Data], out: Collector[String]): Unit = { val log = elements.toArray.map(_.num).mkString(",") val taskId = getRuntimeContext.getIndexOfThisSubtask out.collect(taskId + "\t" + log) } }).print()WindowAll 的数据都汇聚在同一个 Task 上,所以数据量相比之前并行的 TimeWindow 会大很多。 编辑Tips:TimeWindowAll 的并行度只能为1,因为 window 汇聚一段时间内的所有数据到一个 task 处理,如果像 TimeWindow 上面示例修改并行度会得到如下报错:编辑3.使用场景A.常见使用TimeWindow 和 TimeWindowAll 都适用于对流式数据转化做一定时间范围内的批处理,主要区别在两者的并行度,前者为 Parallel Operator 后者为 Non Parallel Operator,所以 TimeWindow 的适用范围更广,适合一些需要对数据分批分 key处理且数据量较大需要并行处理的场景;而 TimeWindowAll 汇聚一段时间内的所有数据,适合需要汇总所有数据或者数据量不大的任务,这样可以减少并发,例如任务内需要涉及到数据网络 IO,如果并行度过高则容易导致网络服务过载。B.转换TimeWindow 的并行度变成 1 则变为 TimeWindowAll;如果 TimeWindowAll 的数据实在很大,可以先通过一层 TimeWindow 做分区的汇总,随后将数据回收至 TimeWindowAll 做总的汇总,有点类似 Spark 的 groupByKey 和 reduceByKey。四.总结Flink TimeWindow 以及 TimeWindowAll 的基本介绍大致就这些,除了 window 的使用外,还涉及到 window triger 即 window 的触发方式,有需要的同学可以查看:Flink - Scala/Java trigger 简介与使用。
一.引言拷贝脚本提交后报错 line 2: $'\r': command not found,但是这是别的同学可以运行后发给我的,随后开始排查。编辑二.问题解决任务执行错误是因为原始脚本是在 windows 环境下编写而本机 linux 不识别导致,因此需要将对应文件转化为 linux 版,执行下述命令转换文件 format:dos2unix file随后提示:dos2unix: converting file file to Unix format ...同理,如果你的文件是在 linux 编写需要到 windows 环境使用:unix2dos file三.工具安装1.安装 dos2unix安装 dos2unix 需要使用 yumyum install dos2unix2.安装 yum安装 yum 需要使用 brewbrew install yum
一.引言使用 Spark 运行任务打日志经常遇到一个问题就是日志太多,除了自己的 print 日志之外,还有很多 Executor、client 的日志,一方面任务运行期间会占用更多的机器存储,其次也不方便查询自己的 print 日志。下面介绍下常用的日志系统与使用方法。二.常用日志系统编辑 常见的日志系统是 Log4j 和 SLF4J,以 Log4j 为例,针对某个任务设置 logLevel 使用如下语法:Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)其中 Level 包含如下几种类型:LogLevelLevelUseOFF2147483647关闭所有日志记录FATAL50000如其翻译,致命的错误ERROR40000错误信息提示,一般需要 Try CatchWARN30000潜在错误提示INFO20000正常日志信息DEBUG10000细粒度日志,用于应用调试TRACE5000比调试更细粒度的日志信息ALL-2147483648打开所有日志记录protected Level(int level, String levelStr, int syslogEquivalent) { super(level, levelStr, syslogEquivalent); } public static final Level OFF = new Level(2147483647, "OFF", 0); public static final Level FATAL = new Level(50000, "FATAL", 0); public static final Level ERROR = new Level(40000, "ERROR", 3); public static final Level WARN = new Level(30000, "WARN", 4); public static final Level INFO = new Level(20000, "INFO", 6); public static final Level DEBUG = new Level(10000, "DEBUG", 7); public static final Level TRACE = new Level(5000, "TRACE", 7); public static final Level ALL = new Level(-2147483648, "ALL", 7);每一个 level 类都对应一个 level 的 int 字符,如果设置 logLevel 在 DEBUG 级别,则低于其对应 level = 10000 的 TRACE、ALL 类型的日志则不予显示,所以可以根据自己的需求设定 level 级别。Log4j 一般建议使用的类型是 ERROR、WARNING、INFO、DEBUG。三.Spark Logging使用 Log4j 或者 SLF4j 时需要通过 XML 或者 log4j.properties 配置相关信息,还需要引入 对应的 log 依赖,经常出现 NoSuchMethod 的冲突,很不方便,spark 自 2.4+ 引入了 Logging,其基于 SLF4j 定义了一个 Trait,使得日志使用只需要继承接口即可。编辑trait Logging { // Make the log field transient so that objects with Logging can // be serialized and used on another machine @transient private var log_ : Logger = null ... }可以看到这里 Logger 用 @transient 关键字修饰保证其序列化时不会报错。1.输出不同类型日志import org.apache.spark.internal.Logging object LogUtil extends Logging { def main(args: Array[String]): Unit = { logInfo("LogInfo") logWarning("LogWarning") logError("LogError") } }使用对应类继承 Logging 类,对应的函数即可输出对应 level 的日志,LogInfo 会输出日志到 Stdout 中,LogWarning,LogError 会输出日志到 Stderr 中,可以根据自己的需要控制输出日志 level。编辑2.设置日志 Level除了使用 Logging 输出不同 level 类型日志外,spark 也支持设定 LogLevel。val sc = spark.sparkContext sc.setLogLevel("error")setLogLevel 函数允许我们定义 spark 相关日志的级别,该方法将覆盖任何用户定义的日志级别,通过字符串的类型配置,支持如下日志 level:ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN 。 def setLogLevel(logLevel: String) { // let's allow lowercase or mixed case too val upperCased = logLevel.toUpperCase(Locale.ROOT) require(SparkContext.VALID_LOG_LEVELS.contains(upperCased), s"Supplied level $logLevel did not match one of:" + s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}") Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased)) }四.总结1.日志输出Logging 仅适用于 Spark 任务日志,下述方法单独使用 log 函数不会输出任何日志。def main(args: Array[String]): Unit = { logInfo("LogInfo") logWarning("LogWarning") logError("LogError") }2.日志类型Logging 使用 SLF4J,extends Logging 接口后,任务启动后会提示:编辑3.日志级别Spark setLevel 方法会覆盖用户定义的其他 logLevel,所以需要注意覆盖关系。编辑
一.引言现在有一批数据写入多台 Redis 相同 key 的队列中,需要消费 Redis 队列作为 Flink Source,为了提高可用性,下面基于 JedisPool 进行队列的消费。队列数据示例: 1,2,3,4,5、A,B,C,D,E,程序将字符串解析并 split(",") 然后分别写到下游。二.Flink Source By JedisPool1.初始化 JedisPool由于数据量较大,所以同时写入 N 台 Redis 队列,key 均相同,注意这里是 JedisPool 不是 JedisCluster,需要区分二者的概念。def initJedisPool(host: String, port: Int): JedisPool = { val config = new JedisPoolConfig config.setMaxTotal(4) config.setMaxIdle(2) config.setMaxWaitMillis(1000) config.setTestOnBorrow(true) config.setTestOnReturn(true) jedisPool = new JedisPool(config, host, port) jedisPool }需要导入 Jedis 依赖:<dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.7.2</version> </dependency>这里 JedisPool 的几个参数可以参考之前的文章:JedisPool - Java.net.SocketException: Broken pipe (write failed),里面介绍了 TestOnBorrow、TestOnReturn 的含义。2.实现 RedisSource自定义 Source 需要继承 org.apache.flink.streaming.api.functions.source.RichSourceFunction 实现 run 方法和 cancel 方法即可,主要逻辑在 run 方法中,run 方法负责从队列中不断获取数据并 collect 到下游,除此之外,由于需要读取 redis client,所以还需要新增 open 方法进行 client 的初始化。class JedisPoolSourceTest(host: String, port: Int) extends RichSourceFunction[String] { var jedisPool: JedisPool = _ // JedisPool val listKey = "testListKey" // 公用队列 Key override def open(parameters: Configuration): Unit = { initJedisPool(host, port) // 初始化 JedisPool } // 消费 Redis 队列产出数据 override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = { collectData(sourceContext) } // 关闭 Source override def cancel(): Unit = { jedisPool.close() } }3.产出数据 CollectData 上面是继承 RichSourceFunction 完成了整体的架构,下面单独把 collectData 的数据产出逻辑分析一下:def collectData(out: SourceFunction.SourceContext[String]): Unit = { var resource: Jedis = null try { resource = jedisPool.getResource // 获取 Jedis 实例 // while True 保证持续消费 while (true) { // 解析并产出 val info = resource.lpop(listKey) if (info != null) { val sendRe = info.split(",") if (sendRe.nonEmpty) { sendRe.foreach(out.collect) } } // 如果队列为空则 Sleep 1s if (resource.llen(listKey) == 0) { TimeUnit.SECONDS.sleep(1) } } } catch { case e: Exception => { e.printStackTrace() resource.close() // 关闭资源 collectData(out) // 出现异常递归重启,保证 Source 的可用性 } } }collectData 为 run 方法的主类,主要用于消费队列并产出数据到下游,有几个点需要注意:A.获取 Jedis 实例由于本例中使用 JedisPool,所以这里采用 pool.getResource 的方式获取 Jedis,网上也有一些 demo 在这里直接初始化单独 JedisClient,这里采用 pool 的形式主要是考虑到稳定性的情况。B.While True这里没啥太多说的,通过 while true 实现不间断的消费队列数据。C.Sleep(1)这里考虑到队列为空时,如果频繁 while true 访问 Jedis 会造成高 QPS 且无用操作,所以这里检测到队列为空时会加入 1s 的延迟,这个时间也可以根据自己的任务场景灵活修改。D.Try Catch + 递归try-catch 逻辑内除关闭 resource 外还递归调用了 CollectData,该方法参考了 Spark-Streaming 的 receiver 方法,在故障时能够重新申请 redis 连接并重新读取数据,保证 Source 的稳定性,Spark-Streaming 的 receiver 可以参考:Spark Streaming Receiver restart 重启。4.合并多条数据源由于数据量较大,所以数据分布式的写入到多台 Redis 队列中,上面已经实现了 JedisSource,下面则需要将多台 Redis 分别接入 JedisSource 并绑定合成一个统一的 Source。var dataStream: DataStream[String] = null // 初始化原始流 // 遍历多台 Redis 的 Host && Port testHostAndPorts.foreach { case (host, port) => { if (dataStream == null) { dataStream = env.addSource[String](new JedisPoolSourceTest(host, port)) } else { val newStream = env.addSource[String](new JedisPoolSourceTest(host, port)) dataStream = dataStream.union(newStream) // union 合并流 } } }通过 var 构造可变变量,然后不断将 RedisSource union 到 DataStream 中,最终形成统一流。三.测试多 Redis Source1.启动多台 client启动多个 Jedis 连接并向队列 key 推数据:redis-cli -h $host -p $port在不同客户端分别执行:lpush testListKey 0,1,2,3,4 lpush testListKey A,B,C,D,E2.主函数 main主函数逻辑比较简单,主要就是接受队列数据,"," split 并将每个元素输出,最终 print sink 到标准输出。def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 执行环境 // 合并多流 var dataStream: DataStream[String] = null testHostAndPorts.foreach { case (host, port) => { if (dataStream == null) { dataStream = env.addSource[String](new JedisPoolSourceTest(host, port)) } else { val newStream = env.addSource[String](new JedisPoolSourceTest(host, port)) dataStream = dataStream.union(newStream) } } } // 标准输出 dataStream.print() // 执行 env.execute()上面通过 2 台 client 分别 lpush 了数据,这边接收测试也没有问题: 编辑四.尝试与优化1.多线程的尝试上面采用 JedisPool + 单 Resource 的形式进行接受数据,其实最早使用 JedisPool 是为了接入多线程 Source,即在 run 方法内使用线程池提高生产效率:def addTask(poolSize: Int, jedisPool: JedisPool, sourceContext: SourceFunction.SourceContext[String]): Unit = { val executor = Executors.newFixedThreadPool(2) // 初始化线程池 (0 until poolSize).foreach(epoch => { executor.submit(new Runnable { override def run(): Unit = { val resource: Jedis = jedisPool.getResource while (true) { val info = resource.lpop(listKey) if (info != null) { info.split(",").foreach(sourceContext.collect) } if (resource.llen(listKey) == 0) { TimeUnit.SECONDS.sleep(1) } } } }) }) }这里 Runnable 和上面单 Resource 接队列消费逻辑相同,实现后发现可以多线程读取到 Jedis 队列的内容,随后程序发生堵塞,经过调试发现任务卡在 sourceContext.collect,不知道是不是 sourceContext 不支持多线程,有了解的同学也可以评论区讨论一下~2.空闲数据源的优化上述多台 redis 队列并不能保证全天无间断都有数据产出,在 EventTime 场景下,会出现流数据空闲的状态从而影响 WaterMark 的更新影响整个任务的时间推进,为了解决这个问题,可以使用新版的时间戳策略即 WatermarkStrategy:WatermarkStrategy .forBoundedOutOfOrderness[T](Duration.ofSeconds(60)) .withIdleness(Duration.ofMinutes(1))通过 org.apache.flink.api.common.eventtime.WatermarkStrategy 的 withIdleness 方法对数据流进行标记,当数据源在 Duration 规定的时间内未产出数据时将该 Source 标记为空闲状态,这样下游的数据也不需要等待该 Source 的 WaterMark 从而保证数据流的正常推进,待有新数据推入 Source 中,该数据流切换为活跃状态,重新向下游发送其真实 WaterMark 水印。3.数据量过大对于数据过多或过大的场景,lpop 可能会有一定的读取和网络压力,这时候可以采用 lrange + ltrim 的批量读取逻辑,将单条访问改为批量访问,减少 redis 读取压力。其思想与 hscan 代替 hgetAll 批量访问过大数据有一定类似,都是采取分治的思想,只不过前者是由一到多,后者是化整为零。五.总结绑定多台 Redis 源上线后,任务没有问题且支持空流处理,除了 Redis Source 外,还有 Redis Sink 相关的实现,大家可以参考: Flink / Scala - 使用 RedisSink 存储数据,这里使用 SharedJedisPool 代替了 Flink 自带的 RedisCommandsContainer,后续也会单独出一期 RedisCommandsContainer 的 Flink-Jedis 教程。编辑可以看到多台 Redis Source 上线后,这个作业图实在是 🚄。
一.引言Flink 支持增加 DataStream KeyBy 之后 conncet BroadCastStream 形成 BroadConnectedStream,广播流内数据一般为不间断更新的上下文信息,在本例中,需要针对数据流中的用户信息,基于用于信息 + 广播流内的物料库实现推荐逻辑,针对 BroadConnectedStream 流,需要实现 KeyedBroadCastProcessFunction 完成用户流与广播流的处理,主要方法为:ProcessElement - 根据用户流生成用户信息,根据物料库进行推荐ProcessBroadcastElement - 获取物料库,并同步至 Context编辑由于任务启动时第一批物料库生成需要一定时间,而用户流则源源不断,从而导致物料库生成之前的来的用户都没有物料库进行推荐,为了保证不遗漏用户推荐,这里需要实现数据等待逻辑,让先到的用户流等待广播流的物料库生成完毕再进行推荐,从而保证不遗漏用户。 二.While True 尝试一开始尝试带入离线的思维,既然物料库未生成无法完成推荐,则进行 while 判断和 TimeUnit 时间等待,重复判断物料库是否生成并造成线程阻塞,待物料库生成完毕再开始推荐,好处是保证不丢弃一个用户,坏处是前期需要线程堵塞,如果用户流数据过大则背压严重。override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = { materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB // 第一批造成堵塞,知道物料库生成 while (materialDataBase == null) { TimeUnit.SECONDS.sleep(60) materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB } val sendInfos = RankUtil.batchRank(bs.userObjects, materialDataBase) sendInfos.foreach(collector.collect) } override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = { val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor) // 更新 DB if (db.isValid) { broadCastValue.put("MaterialDBContext", db) } }BatchSendInfo 内存储一批待推荐的用户类,下述统称 UserObject,我的思路是 whilt true 检查物料库是否生成,未生成则等待 60s 再重新从 readOnlyContext 上下文中获取,待物料库不为 null 时执行 BatchRank 的批量排序逻辑,看上去很美好,但是实践后得到的是死循环。原因分析:对于当前处理的 bs: BatchSendInfo,其 context 在 processBroadcastElement 后已经不再更新,我理想化的情况是等到新的 MaterialDataBase 传输后在这里更新 context,但是由于 context 在当前 processFunction 内不再更新,所以我的 while true 是死循环,所以这个方案 pass,这个方案只能适用于 MaterialDataBase 在另外线程生成并能更新到当前线程的场景。三.ValueState 缓存尝试 👍还有另外一种方法,就是当物料库不可用时,将先到的数据存到 ValueState 中并设置延时处理,延时时长可以设定为物料库初始化时间左右,待 onTimer 时判断物料库状态,如果物料库初始化成功则执行推荐逻辑,未成功则继续存储至 ValueState,其实本质上和 While True 类似,只不过变成一直存储了,缺点是如果前期数据过多会造成缓存量较大,不过可以通过加大 Heap 或者采用 RocksDB 轻松解决。override def processElement(bs: BatchSendInfo, readOnlyContext: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#ReadOnlyContext, collector: Collector[SendInfo]): Unit = { materialDataBase = readOnlyContext.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB val lastBatchUserObject = state.value val combineBS = if (lastBatchUserObject == null) { bs } else { val allUser = new ArrayBuffer[DpaUserObject]() allUser ++= bs.userObjects allUser ++= lastBatchUserObject.userObjects BatchSendInfo(allUser.toArray, readOnlyContext.getCurrentKey) } if (materialDataBase == null) { // 物料库不可用 readOnlyContext.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime) state.update(combineBS) } else { // 物料库可用 val sendInfos = RankUtil.batchRank(combineBS.userObjects, materialDataBase) sendInfos.foreach(sendInfo => { collector.collect(sendInfo) }) } } override def processBroadcastElement(db: MaterialDataBase, context: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#Context, collector: Collector[SendInfo]): Unit = { val broadCastValue: BroadcastState[String, MaterialDataBase] = context.getBroadcastState(materialDBDescriptor) // 更新 DB if (db.isValid) { broadCastValue.put("MaterialDBContext", db) } }ProcessBroadcastElement 方法未改变,只是修改了 ProcessElement 方法:A.lastBatchUserObject 判断当前 key 是否存在已经缓存的批用户B.CombineBS 用户合并当前 key 需要处理的用户批C.如果物料库为 null,则将当前批用户存入 ValueState 并设置 expire 过期时间,这个时间可以基于你物料库生成时间,例如物料库正常情况下50s生成,则设置60s过期,保证到期后物料库可用,不需要持续缓存D.如果物料库已经可用则直接执行 BatchRank 推荐逻辑所以这里主要就两件事,合并批用户,判断物料库状态决定批用户是存储还是计算。除了 Process 函数,还包含 onTimer 函数:override def onTimer(timestamp: Long, ctx: KeyedBroadcastProcessFunction[Int, BatchSendInfo, MaterialDataBase, SendInfo]#OnTimerContext, out: Collector[SendInfo]): Unit = { val batchBS = state.value() materialDataBase = ctx.getBroadcastState(materialDBDescriptor).get("MaterialDBContext") // DB if (!batchBS.equals(null) && !materialDataBase.equals(null)) { // 物料库可用,批量下发 val sendInfos = RankUtil.batchRank(batchBS.userObjects, materialDataBase) sendInfos.foreach(sendInfo => { out.collect(sendInfo) }) } else { // 清除状态 state.clear() } }onTimer 单独处理到期的批用户,这里重新获取 materialDataBase,如果批用户和物料库都不为 null 则执行批推荐逻辑,否则清理批用户 state.clear(),我这里会损失数据,如果不想损失数据则将 else 逻辑修改为与 ProcessElement 一致,如果物料库经过 expireTime 还未成功,则继续缓存数据,直到下一个 expireTime 周期,循环往复:context.timerService.registerEventTimeTimer(System.currentTimeMillis() + expireTime) state.update(combineBS)四.总结上述代码中的 BatchSendInfo 可以看做是自己的 Source 类,MaterialDataBase 可以看做是自己的广播流上下文,面对需要等到广播流初始化完毕的需求则修改上述对应代码即可,expireTime 则根据广播流变量初始化时间进行设定,缓存方法本地测试缓存159批数据,到期处理159批数据,延迟和存储要求都不高,非常的奈斯~
一.引言大量 id 场景下经常需要通过 id 进行 AB Test,最常见的就是使用尾号 hash 进行分组,但是由于 id 生成规则以及其他因素,按照尾号分组往往会造成 id 不匀,从而导致 AB Test 效果受影响,所以下文采用 md5 加盐 Hash 的方式,得到更均匀的分组与 AB Test 效果。二.实现原理1. id 加盐id 即为用户 uid 或商品 pid,加盐中盐代表盐值,可以指定为任一质数,id 加盐可以理解:saltNum: Int + id: String => String2.MD5 编码通过 MD5 编码将上述加盐的 id 进行编码处理,获取加密后的字节形式,md5 包采用 java 自带的:import java.security.MessageDigest val md5 = MessageDigest.getInstance("MD5") val encoded = md5.digest(saltNum + id)3.字节转16进制将加密后的每个字节转16进制,转换16进制采用 org.apache.commons 自带工具包:import org.apache.commons.codec.binary.Hex val encodeStr = Hex.encodeHexString(encoded)也可以直接使用 String 自带的 format 方法实现转换16进制:val encodeStr = encoded.map("%02x".format(_)).mkString("")4.16进制转10进制将16进制数字截取 TopN,然后将16进制转换为10进制val num = java.lang.Long.parseLong(encodeStr.slice(0, N), 16).toString直接取 TopN 并通过 parseLong 得到新的 10 进制数字。5.Hash 获取新分组通过新的十进制数字取尾号 hash,获取新的分组,上面得到 10 进制数字 num,可以再使用尾号划分,例如对倒数两位取 mod,即可得到 100 个分组,对倒数三位取 mod,即可得到 1000 个分组,依次类推。6.完整实现A.MD5 Hashdef md5Encode(id: String, saltNum: Int, N: Int): String = { val input = saltNum + id // 加盐 val md5 = MessageDigest.getInstance("md5") val encoded = md5.digest(input.getBytes) // md5 编码 val encodeStr = Hex.encodeHexString(encoded) // 转16进制 val num = java.lang.Long.parseLong(encodeStr.slice(0, N), 16).toString // 转10进制 val group = num.slice(num.length - 2, num.length).toInt % 100 // hash group.toString }B.Common Hashdef commonHash(num: String): String = { val group = num.slice(num.length - 2, num.length).toInt % 100 // hash group.toString }三.效果评估对 uid、pid 重新分组主要是为了提高 AB Test 的置信度,而且涉及到工程实现即每个 id 都需要获取对应的 group,所以下面从:-> id 分组均匀程度-> id 分组AB效果程度-> 分组速度三个方面进行评估。1.分组均匀程度由于 uid、pid 为系统生成,一定程度上不能做到完美的 hash 均分,所以需要重 hash 解决,下面分别使用 MD5 Hash 与 Common Hash 做 id 数的分析,指标: [分组 id 数 - 分组 id 平均数]编辑绿线为 MD5,红线为 CommonHash,可以看到 MD5 得到的 100 个分组 id 数相对 CommonHash 分组均匀很多,前者 Std 为 1100+,后者 Std 达到 4000+。2.分组效果均匀程度分组均分后,还要验证下效果是否一致,如果 id 数相同但是同组的 id 表现差异很大,对 AB Test 也会造成很大影响,这里采用 Pid 的销售额作图,指标: [pid 销售额 - pid 销售额均值]编辑绿线为 MD5,红线为 CommonHash,可以看到 pid 在 MD5 hash 后整体表现均匀,而原始的 CommonHash 则存在个别组出现极端坏数据的情况,影响 AB Test。3.分组速度构造 10000 个 id 模拟 Pid,打印执行时间比较:val random = scala.util.Random val testId = (0 to 10000).map(x => random.nextLong()).toArray val st = System.currentTimeMillis() testId.foreach(num => { // md5Encode(num.toString, saltNum, N) commonHash(num.toString) }) println(s"cost: ${System.currentTimeMillis() - st}")编辑MD5 耗时 220ms / 10000,CommonHash 耗时 45ms / 10000,前者大约是后者的 5 倍,但是均匀到 id 上 0.022 ms / id 的耗时也是可以接受的,所以耗时虽然比 CommonHash 慢5倍,但是工业场景下也基本不受影响。四.总结经过上面的分析,该使用什么分组 AB Test 不用我说了吧。
一.引言CountWindow 数量窗口分为滑动窗口与滚动窗口,类似于之前 TimeWindow 的滚动时间与滑动时间,这里滚动窗口不存在元素重复而滑动窗口存在元素重复的情况,下面 demo 场景为非重复场景,所以将采用滚动窗口。二.CountWindow 简介编辑这里最关键的一句话是: A Window that represents a count window. For each count window, we will assign a unique id. Thus this CountWindow can act as namespace part in state. We can attach data to each different CountWindow. 翻译的意思是:表示计数窗口的窗口。对于每个计数窗口,我们将分配一个唯一的 id。因此,此计数窗口可以作为状态中的命名空间部分。我们可以将数据附加到每个不同的 CountWindow。编辑countWindow 共分为两种初始化方式,其中只添加 size 为滚动窗口,即不重复元素的 CountWindow,带 slide 滑动参数则生成滑动窗口,不在本文讨论范围之内。上面提到了对于每个计数的窗口,我们分配一个唯一的 id,这个 id 可以近似看做是以 keyBy 为依据进行数据分流,下面示例将给出解答。 三.任务场景与实现 👍通过自定义 Source 实现无限数据流,我们希望指定 count = N,使得每 N 个数据作为一个 batch 触发一次窗口并计算。1.自定义 Sourcecase class info(num: Int, id: Int)自定义 Source 需要继承 RichSourceFunction,这里我们生成 info 类数据结构,其内部包含两个元素 num 与 id,num 代表其内部的数字信息,id 代表其 keyBy 的分组信息:class SourceFromCollection extends RichSourceFunction[info] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[info]): Unit = { val random = new Random() // 生成随机 id while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(info(num, random.nextInt(4))) // 随机id范围 0,1,2,3 if (num % 20 == 0) { // 每生产20个数据 sleep 1s TimeUnit.SECONDS.sleep(1) } }) start += 100 // 数值累加 } } override def cancel(): Unit = { isRunning = false } }Source 函数每 s 生成 20 个数字和其对应的 randomId,最终输出 info(num, id) 类。2.执行流程主函数主要基于 Source 实现 ProcessWindowFunction,并制定 count=20,每20个元素触发一次窗口计算:val env = StreamExecutionEnvironment.getExecutionEnvironment val dataStream = env.addSource(new SourceFromCollection) // 添加 Source dataStream .keyBy(info => info.id) // 按 random 生成的 id 分配到不同 CountWindow .countWindow(20) // size = 20 的滚动窗口 .process(new ProcessWindowFunction[info, String, Int, GlobalWindow] { override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = { // 输出 id + size + 元素 val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|") out.collect(log) } }).print() env.execute()其中 ProcessWindowFunction 共有四个参数:编辑IN: 输入类型,本例数据流为 DataStream[info],所以 classOf[In] 为 infoOUT: 输出类型,与 Collector 后类型一致,这里输出类型为 StringKEY: Key 即为 keyBy 的 id,这里 key 为 random.nextInt,所以为 int 类型W: org.apache.flink.streaming.api.windowing.windows.window,这里主要分为两类,如果采用时间窗口即 TimeWindow,则对应类型为 TimeWindow,本例中采用 CountWindow,则对应类型为 GlobalWindow。3.运行结果批处理每一批 size = 20,分别输出 id + "\t" + size + "\t" + 批次数据,可以看到每一批触发20条数据,且 id 分别为 0,1,2,3,这里还是根据 process 的并行度来定,如果 process 的并行度设定为2,则很大概率 0,1,2,3 均分至两台 TaskManager 上,如果规定并行度为4,则分别分配到单台 TaskManager 上,也可以根据数据的吞吐,修改并行度与 keyBy 时 nextInt 的范围。编辑四.CountTrigger1.自定义 CountTrigger之前提到过 CountTrigger 也可以实现按 count 数目进行窗口触发,但是有一点不同是 CountTrigger 不会清除窗口内元素,所以多次执行逻辑会重复处理一批数据,具体实现逻辑解析可以参考: Flink - CountTrigger && ProcessingTimeTriger 详解。class SelfDefinedCountTrigger(maxCount: Long) extends Trigger[String, TimeWindow] { // 条数计数器 val countStateDesc = new ReducingStateDescriptor[Long]("count", new ReduceSum(), classOf[Long]) // 元素过来后执行的操作 override def onElement(t: String, l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { // 获取 count state 并累加数量 val count = triggerContext.getPartitionedState(countStateDesc) count.add(1L) // 满足数量触发要求 if (count.get() >= maxCount) { // 首先清空计数器 count.clear() val dateFormat = new SimpleDateFormat("yyyy-MM-dd:HH-mm-ss") val cla = Calendar.getInstance() cla.setTimeInMillis(System.currentTimeMillis()) val date = dateFormat.format(cla.getTime) println(s"[$date] Window Trigger By Count = ${maxCount}") TriggerResult.FIRE } else { TriggerResult.CONTINUE } } override def onProcessingTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def onEventTime(l: Long, w: TimeWindow, triggerContext: Trigger.TriggerContext): TriggerResult = { TriggerResult.CONTINUE } override def clear(w: TimeWindow, triggerContext: Trigger.TriggerContext): Unit = { val count = triggerContext.getPartitionedState(countStateDesc) count.clear() } }2.运行结果之前介绍 Window 触发 CountTrigger 时也实现了基于 Count 的窗口触发机制,但是存在一个问题,CountTrigger 每次达到 count 数量触发,但是不会清除窗口数据,即窗口数据累加同时多次触发窗口:编辑如上,窗口逻辑为计算批次数据的最大最小值,同一个颜色框内为 count=30 多次触发的结果,可以看到 min 一直为同一个数字,max 持续增大,这就是上面提到的问题: 使用 countTrigger 时会造成窗口数据重复触发,所以想要实现无重复 CountWindow 就得最上面的 countWindow 实现。当然上面的执行逻辑也并不是没有使用场景,例如电商平台统计一段时间内商品销售的情况就可以使用 CountTrigger,实时滚动大屏数据展示。五.CountWindow 完整代码下面附上完整代码:package com.weibo.ug.push.flink.Main import org.apache.flink.api.scala.createTypeInformation import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction} import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction import org.apache.flink.streaming.api.windowing.windows.GlobalWindow import org.apache.flink.util.Collector import java.util.concurrent.TimeUnit import com.weibo.ug.push.flink.Main.TestCountWindow.info import scala.util.Random object TestCountWindow { case class info(num: Int, id: Int) def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val dataStream = env.addSource(new SourceFromCollection) dataStream .keyBy(x => x.id) .countWindow(20) .process(new ProcessWindowFunction[info, String, Int, GlobalWindow] { override def process(key: Int, context: Context, elements: Iterable[info], out: Collector[String]): Unit = { val log = key + "\t" + elements.size + "\t" + elements.toArray.map(_.num).mkString("|") out.collect(log) } }).print() env.execute() } } class SourceFromCollection extends RichSourceFunction[info] { private var isRunning = true var start = 0 override def run(ctx: SourceFunction.SourceContext[info]): Unit = { val random = new Random() while (isRunning) { (start until (start + 100)).foreach(num => { ctx.collect(info(num, random.nextInt(4))) if (num % 20 == 0) { TimeUnit.SECONDS.sleep(1) } }) start += 100 } } override def cancel(): Unit = { isRunning = false } }
一.引言Bing 首页的壁纸好看且每日更新,下面介绍如何使用 python 每日自动获取壁纸并保存。编辑二.手动获取自动获取前先介绍下如何手动获取,主要是了解壁纸的网页形式。1.打开开发者模式可以直接 F12 快捷键进入开发模式,在右侧栏中找到 s.cn.bing.net 选项编辑2.打开新的 Tab双击对应位置选择 open in new Tab编辑 即可得到完整壁纸,右键选择另存为即可保存至指定位置:编辑Tips:这里获取的网址连接为:https://s.cn.bing.net/th?id=OHR.LongsPeak_EN-CN6019073969_1920x1080.jpg&rf=LaDigue_1920x1080.jpg其中 https://s.cn.bing.net 为前缀,/th?id=OHR.LongsPeak_EN-CN6019073969_1920x1080.jpg&rf=LaDigue_1920x1080.jpg 为图像后缀,二者拼接即可得到壁纸地址,后续通过 python 爬虫也是基于该地址获取壁纸。三.自动获取1.官方APIBing 官方提供 API 获取线上网页壁纸:官方APIhttps://cn.bing.com/HPImageArchive.aspx?format=js&idx=0&n=1&mkt=zh-CN 主要有 format、idx、n、mkt 四个参数:参数含义format返回数据形式 js - json xml - xmlidx截止天数 0-今天 -1 - 截止至明天 1 截止至昨天n返回数量 mkt地区 zh-CN - 国区测试过程中发现 n 的数量总是返回1。2.Postman 调用接口使用 Postman Get Api 查看下接口返回 json 的大致形式,没有 postman 也不影响后续获取壁纸,获取壁纸只需要 python 即可。编辑可以看到当前壁纸的详细信息,壁纸对应的地址是:"落基山国家公园的朗斯峰,科罗拉多州 (© Andrew R. Slaton/Tandem Stills + Motion),非常的漂亮。其 images 数组内还包含 url ,该 url 形式为:"url": "/th?id=OHR.LongsPeak_ZH-CN5927119555_1920x1080.jpg&rf=LaDigue_1920x1080.jpg&pid=hp"与我们刚才手动寻找打开的 tab 地址只差前缀 https://s.cn.bing.net:https://s.cn.bing.net/th?id=OHR.LongsPeak_EN-CN6019073969_1920x1080.jpg&rf=LaDigue_1920x1080.jpg所以 python 的执行逻辑比较清晰:A.调用 API 获取 JsonB.通过 Json 获取壁纸地址,拼接前缀得到最终壁纸地址C.将对应 content 生成 jpg 保存至本机3.Python 实现#!/usr/bin/python # -*- coding: utf-8 -*- import requests import json headers = { "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/56.0.2924.87 Safari/537.36", "Connection": "close", } def dumpBingWallpaper(): # 解析 URL n = 1 idx = 1 url = "https://www.bing.com/HPImageArchive.aspx?format=js&idx={}&n={}".format(idx, n) res = requests.get(url, headers=headers) res.encoding = 'utf8' jsonData = json.loads(res.text) uri = jsonData['images'][0]['url'] # 获取图像地址与信息 img = requests.get("https://s.cn.bing.net/" + uri, headers=headers).content desc = str(jsonData['images'][0]['copyright']).split(",")[0] dt = jsonData['images'][0]['startdate'] # 输出地址 output = '/Users/xudong11/Desktop/{}.jpg'.format(desc + "_" + dt) out = open(output, 'wb') out.write(img) out.close() if __name__ == "__main__": dumpBingWallpaper()通过 copyright 和 startdate 获取图像简介与日期作为输出图像的名称,运行后在指定位置获取目标壁纸:编辑Tips:通过 chorm 开发者工具获取图像信息的方法需要引入开发者工具包,有兴趣的同学也可以实现:from selenium import webdriver options = webdriver.ChromeOptions() Chrome = webdriver.Chrome(options=options, desired_capabilities=capabilities) wait = WebDriverWait(Chrome, 5)4.定时执行A.定时脚本首先添加定时 shell 脚本 run.sh,PWD 为 python 所在文件夹目录:#!/bin/bash path=${PWD} cd $path python DumpBingPic.pyB.crontab 定时启动20 11 * * * 代表每天 11:20 下载 Bing 图片:20 11 * * * source ~/.bash_profile && cd ${PWD} && sh run.sh编辑无需手动运行脚本,每天定时保存 bing 壁纸,非常的奈斯👍四.Windows 聚焦除了 Bing 壁纸,之前也整理过 windows 聚焦的开机壁纸获取方式,并最终生成 .exe 的可执行文件,windows 的同学有兴趣也可以继续参考:Python 提取Windos聚焦的登陆图片,运行 exe 即可获取最近 windows 聚焦的开机壁纸:编辑
一.引言使用 spark.hadoopFile 读取 RCFile 时,报错 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/crypto/key/KeyProviderTokenIssuer 与 Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/hadoop/fs/BatchListingOperations,修改依赖解决对应问题。二.KeyProviderTokenIssuer由于报错原因为 java.lang.NoClassDefFoundError 而不是 NoSuchMethod,所以基本排除是依赖冲突的高低版本的问题,如果不放心也可以使用编辑器进行全局搜索,这里全局搜索是没有该类的:编辑确实没有该类,提高 hadoop-common 消除该报错:<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency>此时代码已有该类:编辑三.BatchListingOperations修改 pom 后再次运行任务,再次提示 NoClassDefFoundError,根据上面的经验,这里判断还是没有依赖,所以再次修改 hadoop 版本。编辑hadoop-common 提高至 3.3.3:<dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>3.3.3</version> <scope>provided</scope> </dependency>再次运行任务没有问题:编辑四.判断与排查1.如何判断没有依赖还是依赖冲突一般情况下:NoSuchMethod -> 依赖冲突NoClassDefFoundError ->A. 报错有自己代码的异常栈:对应 Class / Object 初始化失败B. 报错均为官方异常栈:无对应依赖2.最简单的确认方式在 idea 或者 maven-tree 中查看报错对应依赖,如果有多个,大概率是依赖冲突,如果一个也搜不到则为无对应依赖,像这样快速查找,即可确定是单个,多个还是没有对应 class。编辑3.如果寻找对应版本博主一般直接将报错贴至 Bing 国际版,像上面的错就可以直接得到版本。编辑编辑4.最确切的版本定位如果要寻找项目真实的版本,可以寻找底层代码的 GitHub,查看你当前调用版本 API 所在 branch 的 pom.xml,其内部的版本即为最确切的版本。例如之前在 Spark - ml.dmlc.xgboost4j / spark 版本匹配与 NoSuchMethodError 解决 一文中我们通过排查源码 pom.xml,才最终定位到当前 spark 版本与 xgboost 指定 spark 版本不一致导致依赖冲突。编辑
一.引言Flink 程序增加 readFile 生成文件流后,最初运行期间 CheckPoint 存储没有问题,待文件流 Finished 后 CheckPoint 存储报错: checkpoint Failure reason: Not all required tasks are currently running,下面分析并解决下。编辑二.错误分析与解决1.问题排查Flink 场景下实现多流 uinon 并存储 ValueState,除了实时流的内容外,还需要将一个固定文件的内容进行 union 处理,所以出现如下 Overview:编辑最左边的 Source: Custom File Source 就是单独增加的文件流,由于是固定文件且逻辑简单,所以执行开始时 Busy 打的比较高,在此期间查看 RockerDB 指定 path 下存储的 chk 是没有问题的。编辑但是由于固定文件内数据量有限,处理完毕后,该 File Source 由 Running 切换至 Finished 状态:编辑此时由于有 Finshed 的节点,所以会空闲一些 Task,这时候在看 checkpoint 存储的报错原因:Not all required tasks are currently running问题应该就出在这里了,由于 File Source 执行完毕后状态转为 Finished,从而导致有 task 状态转换,而存储 checkpoint 需要所有 task 处于 Running 状态,从而导致存储 checkpoint 报错,不过这里存储出问题,不会影响任务的执行。编辑 出错后 checkpoint 大小变为 0。 2.问题分析问题:SourceFile 执行完毕由 Running 切换至 Finished 导致 checkpoint 执行失败解决:只需让 SourceFile 保持 Running 状态即可编辑readFile 支持额外参数 watchType 与 interval,博主之前在在Flink - DataStream 获取数据总结一文中对下述参数进行了分析,这里直接搬运:编辑WatchType 分为 PROCESS_CONTINUOUSLY 和 PROCESS_ONCE。PROCESS_CONTINUOUSLY : 根据 interval 间隔扫描文件检测其状态,当文件被修改后会重新处理文件内容,这将打破 exactly-once 语义。PROCESS_ONCE:示例中默认使用该模式,该模式下只读取一遍随后任务结束。关闭 source 会导致在那之后不会再有检查点。3.问题解决通过前两步的分析,解决任务的办法很简单了,只需在 readFile 时修改 WatchType 为 PROCESS_CONTINUES 即可保证 Source File 处于 Running 状态。.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path)修改后 ↓↓↓ .readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 86400L)修改后再次提交任务:编辑PROCESS_ONCE 执行时 Source File 5min 左右会进入 Finished 状态,修改后 30min 仍然处于 Running 状态。三.新的问题1.问题排查编辑上述修改提交后,程序正常运行一段时间后,整个执行视图丢失 WaterMark,程序不再 sendRecord 并卡住。可以看到 Co-Process-Broadcast-keyed 流接到 7770550 条数据但一条数据也没有发送,查看上面 window 执行窗口也没有 Watermark 显示,所以数据不发送原因应该是 WaterMark 不更新从而导致窗口不触发所以囤积数据。编辑再查看上述界面发现 Source File 的 RecordsReceive 和 RecordsSent 均为0,程序最开始处理文件是有内容的且这两个参数也有值,这里没值应该是重新扫描文件导致没有数据从而产生空流,进而导致 WaterMark 消失。没有 WaterMark 导致任务停滞可以参考:Flink - 新增 BroadcastStream 无 watermark 导致数据流异常。2.问题分析.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 86400L)由于我的文件为固定文件即当天不会更新,所以我设置 Watch 检测文件的间隔为 86400,正好是一天的秒数,这里我怀疑文件重新扫描的 interval 参数我配置有问题,遂查看源码:编辑这里 sleep 采用 Thread.sleep 执行,其单位为 mills,所以这里如果设置一天应该改为 86400000 才对。3.问题解决 (👍)这里只需把 readFile 的 interval 参数调大即可:.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path, FileProcessingMode.PROCESS_CONTINUOUSLY, 86400000L)当然,如果你的文件不是按天更新或者很长一段时间才更新,这个 interval 参数可以设置的很大,从而将对应 Thread 挂起不影响整体任务。调大该参数后,任务不再中途丢失 WaterMark 卡住,正常执行。编辑四.问题总结此次问题的发生还是对基本的 API 掌握不熟悉导致,从而会出现漏掉 WatchType 参数,写错 interval 参数单位的情况,还是要多多学习 API,查看源码与问题分析与记录。下面是源码基于 readFile 参数的解释,有需要的同学也可以到官方 API 系统学习一下。编辑虽然最终解决方案只是增加两个参数,但是分析与排查的过程也很重要。🤓
一.引言parquet 文件常见与 Flink、Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面介绍 Flink 场景下如何读取 Parquet。Parquet 相关知识可以参考:Spark - 一文搞懂 parquet。编辑二.Parquet Read By Scala1.依赖准备与环境初始化import org.apache.hadoop.fs.FileSystem import org.apache.flink.formats.parquet.ParquetRowInputFormat import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment import org.apache.flink.streaming.api.scala._ import org.apache.parquet.hadoop.ParquetFileReader import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{MessageType, PrimitiveType, Type}Flink 读取 parquet 除了正常 Flink 环境相关依赖外,还需要加载单独的 Parquet 组件:<dependency> <groupId>org.apache.parquet</groupId> <artifactId>parquet-avro</artifactId> <version>1.10.0</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet_2.12</artifactId> <version>1.9.0</version> </dependency>本文基于 Flink-1.13.1 + scala-2.12.8 + hadoop-2.6.0 的运行环境,不同版本下可能需要更换上述 parquet 相关依赖。下面初始化 Flink ExecutionEnvironment,因为流式处理的原因,这里初始化环境类型为 Stream:val env = StreamExecutionEnvironment.getExecutionEnvironment2.推断 Schem 读取 Parquetparquet 通过列式存储数据,所以需要 schema 标定每一列的数据类型与名称,与 Spark 类似, Flink 也可以通过 Parquet 文件推断其对应 schema 并读取 Parquet。def readParquetWithInferSchema(env: StreamExecutionEnvironment): Unit = { val filePath = "./test.parquet" val configuration = new org.apache.hadoop.conf.Configuration(true) val parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(filePath)) val schema: MessageType = parquetFileReader.getFileMetaData.getSchema println(s"Schema: $schema") val hdfs: FileSystem = org.apache.hadoop.fs.FileSystem.get(configuration) val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1) rowData.map(row => { val source = row.getField(1) val flag = row.getField(35) source + "\t" + flag }).setParallelism(1).print() }通过 parquetFileReader 获取元数据 MetaData 并获取 parquet 对应 schema,最终通过 env.readFile 方法指定 InputFormat 为 ParquetRowInputFormat 读取 parquet 文件,先看一下打印出来的 schema 形式:编辑由于读取的 parquet 为 SparkSession 生成,所以列名采用了 Spark 的默认形式 _c1,_c2 ...env.execute("ReadParquet")调用执行方法运行上述 print demo 打印最终结果。Tips:这里的 Row 类型为 org.apache.flink.types.Row 而不再是 org.apache.spark.sql.Row,获取元素的方法也不再是 row.getString 或其他,而是采用 getFiled 传入 position 或者 列名 得到,索引从 0 开始。编辑3.指定 schema 读取 Parquet除了 infer 推理得到 schema 外,读取也支持自定义 schema,与 spark 类似,这里也提供了 PrimitiveType 指定每一列的数据类型,并合并为 MessageType 得到最终的 schema。def readParquetWithAssignSchema(env: StreamExecutionEnvironment): Unit = { val filePath = "./test.parquet" val id = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c0") val source = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c1") val flag = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "_c35") val typeArray = Array(id, source, flag) val typeListAsJava = java.util.Arrays.asList(typeArray: _*).asInstanceOf[java.util.List[Type]] val schema = new MessageType("schema", typeListAsJava) println(schema) val rowData = env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(filePath), schema), filePath).setParallelism(1) rowData.map(row => { val source = row.getField(1) val flag = row.getField(2) source + "\t" + flag }).setParallelism(1).print() }上面读取的 test.parquet 有 40+ col,这里只读取第 1,2,35 列,所以单独指定 id,source,flag 三列生成 PrimitiveType 并添加至 MessageType 形成 schema,由于 MessageType 为 Java 参数,所以需要通过 asList + asInstance 进行转化,看一下当前的 schema 情况:编辑env.execute("ReadParquet")调用执行方法执行上述 print 逻辑即可。Tips:这里列名给出了 _c0, _c1,_c35,但是读取是 position 索引只能选取 0,1,2,因为 schema 数量决定了读取 Row 的列数,而 schema 的列名决定了读取的内容,在该 schema 基础下读取 getField(35) 会报数组越界 java.lang.ArrayIndexOutOfBoundsException: 编辑三. Parquet Read By Javajava 读取与 scala 大同小异,主要差别是 map 变为 MapFunction,这里直接贴完整函数方法:import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.formats.parquet.ParquetRowInputFormat; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.types.Row; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.parquet.hadoop.ParquetFileReader; import org.apache.parquet.hadoop.metadata.ParquetMetadata; import org.apache.parquet.schema.MessageType; /** * @title: ReadParquetByJava * @Author DDD * @Date: 2022/7/21 8:36 上午 * @Version 1.0 */ public class ReadParquetByJava { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); String path = "./test.parquet"; Configuration configuration = new org.apache.hadoop.conf.Configuration(true); FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(configuration); ParquetMetadata parquetFileReader = ParquetFileReader.readFooter(configuration, new org.apache.hadoop.fs.Path(path)); MessageType schema = parquetFileReader.getFileMetaData().getSchema(); System.out.println("-----Schema-----"); System.out.println(schema); env.readFile(new ParquetRowInputFormat(new org.apache.flink.core.fs.Path(path), schema), path) .setParallelism(1) .map(new MapFunction<Row, String>() { @Override public String map(Row row) throws Exception { try { String source = String.valueOf(row.getField(1)); String flag = String.valueOf(row.getField(35)); return source + "\t" + flag; } catch (Exception e) { e.printStackTrace(); return null; } } }).print(); env.execute("ReadParquetByJava"); } }四.总结Parquet 通过其列式存储与空间压缩应用于多种大数据场景,上面给出了 parquet 文件转 DataStream 的两种方式,同理也可以使用 DataSet 加载为静态数据,上面两个方法都给出了 hdfs: FileSystem 变量但都没有使用,下面说下使用场景:一般分布式任务读取时对应的 parquet 文件不是一个而是多个,所以需要从目标目录中找出第一个合法的 parquet 文件供 ParquetFileReader 解析对应的 schema,hdfs 的任务就是通过目标路径获取第一个合法文件使用。def getFirstFilePath(hdfsPath: String, hdfs: FileSystem): String = { val files = hdfs.listFiles(new org.apache.hadoop.fs.Path(hdfsPath), false) var flag = true var firstFile = "" while (flag) { if (files.hasNext) { firstFile = files.next().getPath.getName if (!firstFile.equalsIgnoreCase(s"_SUCCESS") && !firstFile.startsWith(".") && firstFile.endsWith(".parquet")) { flag = false } } else { flag = false } } hdfsPath + "/" + firstFile }合法的判断需要三个条件:A.不包含 _SUCCESSB.不以 '.' 开头C.以 '.parquet' 结尾
一.引言 本文将安装 go 语言常用编辑器 GoLand,其与 idea、pycharm 同属 JetBrains 旗下,由于突破试用的限制,下面教程主要安装 2019 版 Goland + Go 1.15.x 版本,有高版本编译器或高版本 Go 语言需求的同学可以忽略后续,如果只是入门熟悉操作可以参考下面教程。二.安装 GoLand1.下载安装包官网下载地址:GoLand/download编辑博主本机为 Mac-mini (M1 2020),版本为 11.2,选择 Version 2019.3.x 版本,由于破解需要,建议下载 Version 2019。2.安装编辑双击 .dmg 安装包,将 GoLand 拖入 Applications:A.进入界面编辑B.选择试用选择 Evaluate for free 免费试用后进入正式界面。编辑C.创建项目编辑D.编辑界面编辑自此 GoLand 安装就结束了。三.Hello World上面步骤搞定后,GoLand 2019 的安装和激活就搞定了,下面进入激动人心的 Hello World 环节。1.创建 goFile编辑2.配置 SDK / GOROOT编辑创建文件后会显示没有配置 GOROOT 的 SDK,点击 Setup GOROOT:编辑选择 1.15.x 版本:编辑待右下角提示 Go 1.15.x Installed 即安装成功。编辑3.可能遇到的坑A.Unpacked SDK is corrupted这里如果选择高版本,例如 1.18.x、1.19.x 会提示下述错误,这是由于 GoLand 2019 与对应高版本 SDK 不匹配导致,重新安装老版本 SDK 即可。编辑B.不提示 Setup GOROOT如果未提示 GOROOT,则打开 Preferences,手动安装 GOROOT 即可。编辑C.网络问题如果 GoLand 网络出现问题无法 DownLoad,可以选择官网直接下载,官方下载链接。编辑下翻找到 Archived versions 选项点击即可获取全部版本。4.运行 HelloWorldpackage HelloWorld import "fmt" func main() { fmt.Printf("Hello World!") }输入上述代码后 main 函数显示为灰色,无法运行,需将 package 的 HelloWorld 换为 main:package main import "fmt" func main() { fmt.Printf("Hello World!") } 此时出现运行提示,点击运行按钮或者 Run 选项:编辑大功告成:编辑四.总结Go语言是Google开发的一种静态强类型、编译型、并发型,并具有垃圾回收功能的编程语言,2009年11月正式宣布推出,其针对 C++、java 等编程软件的缺点并整合,具备编译运行快、简单易用、自动垃圾回收的特点,广泛应用于多种场景,并替代了很多原有 C++、java、PHP 的开发语言。在大数据领域,由于 spark + flink 的体系非常完备,所以开发主要基于 java + scala,scala 偏向于更高阶的 API 与 更少的代码,go 偏向于更少的语法,因此二者在大数据的开发场景下也有较大差异。 配置编译器只是第一步,后面需要不断的学习。
一.引言使用 Flink 1.13.1 + scala 2.11.12 的组合进行 Flink 本地测试是,报错 .NoSuchMethodError: com.twitter.chill.java.Java8ClosureRegistrar.areOnJava8()Z,经过前面多次的 noSuchMethod 的折磨,现在已经轻车熟路,直接开始排查。二.错误分析1.字面含义编辑报错显示 com.twiter.chill.java.Java8ClosureRegistrar 类没有 areOnJava8()Z 这个方法,这个 Z 代表该函数返回值为 boolean,这里再顺便回顾下不同字符代表的函数返回值:ZbooleanBbyteCcharSshortIintJlongFfloatDdoubleL fully-qualified-classfully-qualified-class[ typetype[]( arg-types ) ret-typemethod type<init>构造方法<cinit>静态类初始化代码2.深度分析def isJavaLambda(klass: Class[_]): Boolean = Java8ClosureRegistrar.areOnJava8 && klass.getName().indexOf('/') >= 0既然没有 areOnJava8()Z 这个方法,我们首要的任务就是找到这个该方法所在类 Java8ClosureRegistrar 一看究竟,IDEA 可以快捷搜索直接锁定,可以看到项目内只要一个 Java8ClosureRegistrar.class,所以这里基本排除依赖冲突的问题,大概率是高低版本导致。编辑 如果不是 IDEA,也可以到 maven 库按照字符排序快速搜查:编辑查看 0.9.3 版本的 com.twitter.chill 的 Java8ClosureRegistrar 类,果然没有 areOnJava8()Z,由于我是用的 Flink 版本相对较高,所以大概率匹配了更高版本的 com.twitter.chill,直接进入 maven 库查找高版本 com.twitter.chill:编辑博主使用 scala 2.11,最新的 0.10.0 支持 2.11,所以直接尝试最新的:编辑3.问题解决<!-- https://mvnrepository.com/artifact/com.twitter/chill --> <dependency> <groupId>com.twitter</groupId> <artifactId>chill_2.13</artifactId> <version>0.10.0</version> </dependency>根据 maven 加载依赖的先后顺序,这里直接将新版本依赖加到 dependencies 的首位。在 maven 选项下执行 reload.Project:编辑reload 后任务执行正常:编辑三.总结编辑更新了 0.10.0 版本后,再次查看 Java8ClosureRegistrar 方法,发现依然没有 areOnJava8()Z 的方法,翻了下 GITHUB 上大神的解释:no such method error is a binary incompatibility problem. You seem to have incorrect versions of chill on the classpath.This happens when you have two dependencies that use different versions and the wrong one gets selected. I don't think we can fix this in the library itself. Please reopen if you feel this is a bug we can fix.大致翻译了一下:no such method error 是二进制不兼容问题。您似乎在类路径上有不正确的 chill 版本。 当您有两个使用不同版本的依赖项并且选择了错误的依赖项时,就会发生这种情况。 我不认为我们可以在 library 本身解决这个问题。也是似懂非懂的,可能 areOnJava8()Z 就是验证 java 环境的隐函数吧,有了解的童鞋欢迎科普。总的来说,上面这个错误大致分为两种情况:A.版本过低 B.依赖冲突。最后铺一下博主这两年遇到的 NoSuchMethod 的坑,有需要的大家也可以看一下解决过程,noSuchMethod 这个错误很痛苦,但是解决了很爽:Spark : local 模式 org.apache.hadoop.conf.Configuration.getPassword(Ljava/lang/String;)[CSpark: java.lang.NoSuchMethodError: com.alibaba.fastjson.JSONObject.getOrDefaultJava- MR 读写 orc 之 NoSuchMethodError: hive.ql.exec.vector.VectorizedRowBatch.getMaxSizeSpark - ml.dmlc.xgboost4j / spark 版本匹配与 NoSuchMethodError 解决除此之外,大数据常见异常解决 专栏里还有很多大数据相关的异常与对应解决方法,有兴趣的童鞋可以移步。
一.引言使用 linux 工作中经常遇到 zip,gz,tar 相关的压缩文件,下面整理下几种压缩文件的常用方法。后续示例将基于 TestDir 展开,其中包含一个名为 123 的文件,除此之外还有一个名为的 234 文件编辑二.zipZIP 文件格式一种数据压缩和文档储存的文件格式,原名Deflate,通常使用后缀名“.zip”ZIP,是一种相当简单的分别压缩每个文件的存档格式。1.压缩 zip A.相同路径下 文件、文件夹 压缩zip -r testDir.zip ~/Desktop/TestDir-r 代表 递归压缩子目录下所有文件,后面的两个参数分别代表待生成 xxx.zip 的名称与待压缩目录/文件地址,上述命令会将 TestDir 下所有文件、文件夹压缩至 testDir.zip 中编辑B.不同路径下 文件、文件夹压缩zip -r testDir.zip ~/Desktop/TestDir ~/Desktop/234将 TestDir 下所有内容和 234 文件同时压缩到 testDir.zip 中编辑压缩后在原基础上得到新的 .zip 文件 编辑2.向 .zip 文件添加、删除zip 可以轻松的添加与删除也得益于其采用分别压缩每个文件的方式进行压缩A.添加新建一个 345 文件,将该文件添加至上面生成的 testDir.zip 中zip -m testDir.zip 345编辑 B.删除将 345 文件从 testDir.zip 中删除zip -d testDir.zip 345编辑3.解压 zip unzip -d ddd testDir.zip将 testDir.zip 解压到 ddd 文件夹下,-d 参数表示压缩后显示生成文件完整路劲编辑 4.其他参数编辑下面是一些常用的 zip 指令:-c 将解压缩的结果显示到屏幕上,并对字符做适当的转换。 -l 显示压缩文件内所包含的文件。 -v 执行是时显示详细的信息。 -a 对文本文件进行必要的字符转换。 -b 不要对文本文件进行字符转换。 -C 压缩文件中的文件名称区分大小写。 -j 不处理压缩文件中原有的目录路径。 -L 将压缩文件中的全部文件名改为小写。 -n 解压缩时不要覆盖原有的文件。 -o 不必先询问用户,unzip执行后覆盖原有文件。 -P<密码> 使用zip的密码选项。 -q 执行时不显示任何信息。 -s 将文件名中的空白字符转换为底线字符。 -V 保留VMS的文件版本信息。 -X 解压缩时同时回存文件原来的UID/GID。5.简易压缩、解压 zipA.压缩直接右键选择压缩 xxx 文件即可得到 xxx.zip 文件编辑B.解压缩直接双击对应 .zip 文件即可实现解压三.gzgz文件是一种压缩文件,以 .gz 或者 .tar.gz(.tgz)为扩展名,多见于 Linux、UNIX,与 zip 不同,gz 只能压缩文件,而不能压缩文件夹1.压缩 gzgzip testDir直接压缩文件夹会提示下述信息 ❌ : 编辑A.压缩-不保留源文件gzip testDir/123执行命令后目录下的 123 文件变为 123.gz 编辑B.压缩-保留源文件如果想要保留源文件,可以采用数据流的形式进行压缩,-c 代表将解压后的文件输出到标准输出:gzip -c testDir/123 > TestDir/123.gz执行后 123 文件保留,并生成 123.gz 编辑2.解压 gzA.解压-不保留原gzgunzip TestDir/123.gz编辑 B.解压-保留原gzgunzip -c testDir/123.gz > TestDir/123编辑3.常用参数除了使用 -c 保留源文件外,也可以使用 -r 递归将文件夹下文件都处理为 gz:-a:使用ASCII文字模式。 -c:把解压后的文件输出到标准输出设备。 -f:强行解开压缩文件。 -l:列出压缩文件的相关信息。 -L:显示版本与版权信息。 -n:解压缩时,若压缩文件内含有原来的文件名称及时间戳记,则将其忽略不予处理。 -N:解压缩时,若压缩文件内含有原来的文件名称及时间戳记,则将其回存到解开的文件上。 -q:不显示警告信息。 -r:递归处理,将指定目录下的所有文件及子目录一并处理。 -S:更改压缩字尾字符串。 -t:测试压缩文件是否正确无误。 -v:显示指令执行过程。 -V:显示版本信息。4.常规解压MacOS 下直接双击 .gz 文件即可解压对应 .gz 文件四.tartar是 Unix 和 类 Unix 系统上文件打包工具,可以将多个文件合并为一个文件,打包后的文件名亦为“tar”。tar 代表未被压缩的tar文件。已被压缩的tar文件则追加压缩文件的扩展名,如经过gzip压缩后的tar文件,扩展名为“.tar.gz”,所以经常使用 tar 将文件夹下多个文件合并再使用 gz 压缩,这样就解决了 gz 不能压缩文件夹的问题,但是解压也需要先得到 tar,再将 tar 拆分。1.生成 tar A.tartar czvf test.tar ./testDir编辑 执行命令后将 testDir 内的文件合并至 tar编辑B.tar.gztar zcvf test.tar.gz testDir编辑 执行后生成 test.tar.gz 文件,注意 tar 文件只是将多个文件合并,其本身不具备压缩功能,需结合其他压缩方式对 tar 文件进行压缩编辑2.解压 tarA.tartar xzvf test.tar -C ddd编辑适应 xzvf + -C 实现标准输出,将 tar 文件解压到 ddd 文件夹中,注意这里使用 -C 而不是 -c编辑B.tar.gztar zxvf test.tar.gz -C dddd编辑 将 tar.gz 文件解压到 dddd 文件夹下 编辑3.其他参数-c:新建打包文件,同 -v 一起使用 查看过程中打包文件名 -x:解压文件,-C 解压到对应的文件目录。 -f:后面接要处理的文件 -j:通过bzip2方式压缩或解压,最后以.tar.br2 为后缀。压缩后大小小于.tar.gz -z:通过gzip方式压缩或解压,最后以.tar.gz 为后缀 -v:压缩或解压过程中,显示出来过程 -t:查看打包文件中内容,重点文件名 -u:更新压缩文件中的内容。 -p:保留绝对路径,即允许备份数据中含有根目录 -P:保留数据原来权限及属性。 --exclude =FILE 压缩过程中,不要讲FILE打包tar 主要有以下参数,其中 -c 代表打包,-x 代表解压缩包,以打包 tar 的 tar czvf 为例:c - 新建打包文件 z - 以 .tar.gz 结尾 v - 显示压缩过程 -f 要处理的文件4.常规解压不管是 tar 还是 tar.gz,MacOS 下都支持双击直接解压相关文件编辑五.rarRAR是一种文件压缩与归档的私有文件格式,用于数据压缩与归档打包,简单的说就是将原有的文件数据经过压缩处理之后保存为RAR文件格式后缀名,通常在 Win 环境下,适配为 WinRAR。zip、gz、tar MacOS 原始都支持,由于 rar 多用于 Win,所以 mac 上使用 rar 需要先安装。1.安装 rarA.获取源文件链接:https://pan.baidu.com/s/1ZG9Wk0pO7FebH1TH57meJw 密码:fojn编辑根据自己的电脑版本选择 arm 或者 x64 版本的压缩包并解压得到下述文件:编辑B.安装 rar、unrarcd 到上述目录的地址依次执行:sudo install -c -o $USER rar /usr/local/bin sudo install -c -o $USER unrar /usr/local/bin执行后在命令行输入 rar 或者 unrar,如果有下述显示则代表安装成功:编辑2.压缩 rarrar a testDir.rar testDir 将 testDir 文件夹压缩为 rar 格式编辑编辑3.解压 rarunrar x testDir.rar编辑 解压后源文件保留,并得到解压后的 testDir 文件编辑六.总结上面一次性介绍了 zip、gz、tar、rar 四种文件的压缩方式,下面简单测试下它们的压缩性能,这里找来一个 131 kb 的文件测试一下:编辑# zip zip -r testFile.zip testFile # gz gzip testFile # tar tar czvf testFile.tar testFile # rar rar a testFile.rar testFile执行命令后得到下述4中压缩文件:编辑格式filezipgztarrar大小(byte)9467111185110711135510407简单测试了下压缩效率相近,最节省空间的是 rar,由于日常生活中几种压缩格式主要用于办公文件,因此也不做过多区分。
一.引言使用 spark 读取 parquet 文件,共有 M 个 parquet 文件,于是启动了 P Executor x Q Cores 进行如下 WordCount 代码测试,其中 P x Q = M 即 Core 数目与 parquet 文件数一一对应:编辑 其中每个 row 的 47 列对应的 parquet 文件名样例如下:/usr/hadoop/part-01005-abcd14fd-f70f-40c5-a30e-fb3ea22c192e-c000.snappy.parquet程序为了统计每个 parquet 内文件数量是否一致。由于 P[Executor] x Q[Core] = M [Parquet Num],所以认为每一个 Task 处理一个 Parquet,所以应该每一个日志都打印 Path + wordCount 结果即一一对应,但是实际操作发现 Path + wordCount 存在一对多的情况,遂开始排查。二.Println 线程安全问题由于默认 P x Q = M 的情况下 Task 会与 Parquet 执行一一对应,所以应该每个 partition 都执行 foreachPartition 的 task(1) 操作去打印路径,所以实际打印路径数与读取 parquet 数量不一致时,首先想到的是线程安全的问题,而不是 partition 合并。所以把 println 换成了线程安全的 Spark Logging:object TestReadParquetV2 extends Logging { ... println(s"[$parquetId] Path: $path") ↓ logInfo(s"[$parquetId] Path: $path") ... }切换后重新运行程序发现日志还是存在一对多的情况,且与 println 日志一致:编辑其中 TestReadParquet 是 LogInfo 打印的路径,FileScanRDD 是 Executor 扫描文件打印的日志,这里 scan 扫描的打印日志数量多于 LogInfo 打印的日志,所以排除是打印的线程安全问题导致的,出现一对多的情况就是因为 Spark 内部对多个 Partition 进行了合并,从而导致两边不一致。后续又重新查看了源码发现错怪了 println,其内部也是线程安全的:编辑结论:println 线程安全,误会一场。 三.spark.sql.files.openCostInBytes编辑既然是因为 partition 合并导致了 Task 和 Parquet 的日志没有一一对应,接下来就得找一下 Spark.sql.files 的相关参数了,看看调整后能否一一对应,首先找到了 spark.sql.files.openCostInBytes,浅翻译一下:打开文件的估计成本(以字节数衡量)可以同时扫描。这在将多个文件放入分区时使用。最好是高估,这样小文件的分区会比大文件的分区(先安排)更快。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效。官方的解释看的有点蒙,大致意思是用于合并小文件,该参数默认 4M,表示小于 4M 的小文件会合并到一个分区中。样例中每个 parquet 文件为 8M,我把这个参数改为 1M 看下是否会取消合并:--conf spark.sql.files.openCostInBytes=1048576 \修改 spark-submit 脚本,增加 conf 配置,也可以在代码中 SparkConf 配置:编辑结论: 执行后依然是一对多的情况,没有达到 Task 与 Parquet 的一对一处理要求。 四.spark.sql.files.maxPartitionBytes (👍)编辑openCostInBytes 参数可以看作是 partition 的最小 bytes 要求,刚才试了一下不生效,现在试一下 partition 的最大 bytes 要求,maxPartitionBytes 参数规定了读取文件时要打包到单个分区中的最大字节数。此配置仅在使用基于文件的源(如Parquet、JSON和ORC)时有效:--conf spark.sql.files.maxPartitionBytes=9437184 \样例中每个 parquet 文件为 8M,规定最大 partition 大小为 9M,只要 8+8 合并,16M 就会超过 9M 的限制,下面看下这样配置是否生效:编辑这次没有问题了,Scan 扫描了 3个 RDD,LogInfo 打印了3个 RDD 的日志。结论:将 maxPartitionBytes 调整到文件大小附近可以实现 Partition - Task 的一对一处理。五.spark.sql.parquet.adaptiveFileSplit 该参数是控制 SparkSql 文件切分的自适应开关,但是并不是在读取处切分,而是针对读取内容切分,例如 Parquet 存储10列,程序只用到2列,程序内会进行优化逻辑。--conf spark.sql.parquet.adaptiveFileSplit=false \编辑 结论:adaptiveFileSplit 参数不能实现 Partition - Task 的一对一处理。六.spark.sql.adaptive.enabled编辑spark.sql.adaptive.enabled 参数主要应用于 sql 查询时的自适应操作,在 file 读取操作时添加未生效:--conf spark.sql.adaptive.enabled=false \编辑结论:spark.sql.adaptive.enabled 适用于查询不适用于读取分区合并,不能实现 Task-Partition 一对一操作。七.总结尝试了这么多参数,只有 spark.sql.files.maxPartitionBytes 参数满足了 Task-Partition 一一对应的需求,不过这里也不是真实意义上的一一对应,查看 Executor 的日志可以看到:编辑A.一一对应虽然配置的是 P[Executor] x Q[Core] = M [Parquet Num],预想的 Task-Partition 一对一是每一个 Core 都执行到一个 Parquet,但实际操作中有个 Core 执行了 N 个 Parquet,N > 1,而有一些 Core 则一个 Parquet 都没有执行,所以上述 maxPartitionBytes 近满足了每个 parquet 文件不被拆分的被一个 task 执行,并不能保证每个 core 都只执行1个 parquet,即使资源和core分配满。B.有多有少为什么出现有多有少的情况,因为一些 Executor 指定较快或率先申请到资源启动,所以此时会将其他还未执行或还在等待资源的 Task 调度到快的或者已经执行完的 Executor 上执行,上述任务是小数据量下的 WordCount 所以很快执行完毕,如果每个任务都持续时间较长且分配资源都正常,则可以达到真正的一个 core 执行一个 Parquet。C.Task 数量可以看到,大部分配置或者默认配置都是将小的 parquet 合并,因为这样即节省资源同时运行速度不会受太大影响。实际生产环境中,如果没有特殊的需求可以不完全要求一个 parquet 对应一个 task,如果想要将 task 的量优化到合理的范围内,可以参照自己的程序,例如 Parquet 一共10列,程序内使用 5列,则可以将对应的 maxPartitionBytes 参数调大到相应文件大小的5倍,这样虽然总数据量是5倍,但是实际读取的列数只相当于一个 parquet 的 10列,其次注意 spark 一个 block 默认为 128m,考虑到并行和资源相对充沛的情况下,也无需将 maxPartitionBytes 改的过大。
一.引用parquet 文件常见于 Spark、Hive、Streamin、MapReduce 等大数据场景,通过列式存储和元数据存储的方式实现了高效的数据存储与检索,下面主要讲 parquet 文件在 spark 场景下的存储,读取与使用中可能遇到的坑。编辑二.Parquet 加载方式1.SparkSession.read.parquetSparkSession 位于 org.apache.spark.sql.SparkSession 类下,除了支持读取 parquet 的列式文件外,SparkSession 也支持读取 ORC 列式存储文件,可以参考: Spark 读取 ORC FIleval conf = new SparkConf() .setAppName("ParquetInfo") .setMaster("local") val spark = SparkSession .builder .config(conf) .getOrCreate() spark.read.parquet(path).foreach(row => { val head = row.getString(0) println(head) })读取后会获取一个 Sql.DataFrame,支持常见的 sql 语法操作,如果不想使用 sql 才做也可以通过 .rdd 的方法得到 RDD[Row],随后遍历每个 partition 下的 Iterator[Row] 即可。Tips:后续可以执行 sql 操作,当然也支持初始化 SqlContext 调用 sql 方法,不过用 SparkSession 也可以搞定。val parquetFileDF = spark.read.parquet("path") parquetFileDF.createOrReplaceTempView("tableName") val resultDf = spark.sql("SELECT * FROM tableName") val sqlContext = new SQLContext(sc) sqkContext.sql("xxx")2.SparkContext.HadoopFile使用 hadoopFile 读取时需要指定对应的 K-V 以及 InputFormat 的格式,Parquet 文件对应的 K-V 为 Void-ArrayWritable,其 InputFormat 为: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat,获取 ArrayWritable 后通过索引可以获得 Writable。val sc = spark.sparkContext sc.setLogLevel("error") val parquetInfo = sc.hadoopFile(path, classOf[MapredParquetInputFormat], classOf[Void], classOf[ArrayWritable]) parquetInfo.take(5).foreach(info => { val writable = info._2.get() val head = writable(0) println(writable.length + "\t" + head) }) Tips:需要在 SparkConf 中加入序列化的配置,否则 hadoopFile 方法会报错:编辑.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")writable 需要通过反序列化的方式才能再获取具体内容,所以这里推荐使用 SparkSession 的官方 api 读取,不过可以 RcFile SparkSession 暂不支持直接读取,所以可以用 sc.hadoopRdd 的方法读取同样列式存储的 RcFile 格式文件,可以参考: Spark 读取 RcFile三.Parquet 存储方式1.静态转换Parquet -> Parquet,读取 parquet 生成 Sql.DataFrame 再转存,类似 RDD 的 transform:spark.read.parquet(path) .write.mode(SaveMode.Overwrite) .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") .format("parquet") .save("/split")2.RDD[T*] 转换常规数据 RDD 可以通过加入 import sqlContext.implicits._ 隐式转换的方式由 RDD 转换为 sql.Dataframe,随后完成 parquet 的存储,下面掩饰一个 PairRDD 转换为 df 并存储的方法:import sqlContext.implicits._ val commonStringRdd = sc.emptyRDD[(String, String)].toDF() commonStringRdd.write .mode(SaveMode.Overwrite) .format("parquet") .save("")Tips:SaveModel 分为 Append 追加、Overwrite 覆盖、ErrorIfExists 报错、Ignore 忽略四种模式,前两个比较好理解,后面两个前者代表如果地址已存在则报错,后者如果地址已存在则忽略且不影响原始数据。SaveModel 通过枚举 Enum 的方式实现:编辑详细的 RDD 转换 Sql.DataFrame 可以参考:Spark - RDD / ROW / sql.DataFrame 互转 。3.RDD[Row] 转换如果有生成的 RDD[Row] 就可以直接调用 sqlContext 将该 RDD 转换为 DataFrame。这里 TABLE_SCHEMA 可以看作是每一列数据的描述,类似 Hive 的 column 的信息,主要是字段名和类型,也可以添加额外的信息,sqlContext 将对应的列属性与 Row 一一匹配,如果 Schema 长度没有达到 Row 的总列数,则后续字段都只能读为 Null。val sqlContext = new SQLContext(sc) final val TABLE_SCHEME = StructType(Array( StructField("A", StringType), StructField("B", StringType), StructField("C", StringType), StructField("D", StringType), StructField("E", StringType), StructField("F", StringType), StructField("G", StringType), StructField("H", StringType) )) val commonRowRdd = sc.emptyRDD[Row] sqlContext.createDataFrame(commonRowRdd, TABLE_SCHEME) .write.mode(SaveMode.Overwrite) .format("parquet") .save("/split")Tips:编辑使用上述语法读取时可能会报错: Illegal pattern component: XXX ,这是因为内部 DataFormat 解析的问题,在代码中加入 .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") 即可。spark.read.option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ").parquet(path)四.Parquet 浅析Parquet 由于其开源,支持多平台多系统以及高效的存储和编码方案,使得其非常适合大数据场景下的任务开发,下面简单看下他的两个特性,列式存储和元数据存储:1.列式存储 - 更小的 IOCSV 是最常见的行式存储,对于一些需要单独特征或列的场景,如果是 CSV 文件需要遍历整行并分割,最终获取目标元素,而 Parquet 方式通过列式存储,对于单独的特征可以直接访问,从而提高了执行的效率,减少了数据 IO。CSV: A,B,C,D,E -> Split(",")[col] Parquet: A B C D E -> getString(col)2.元数据存储 - 更高的压缩比Parquet 采用多种编码 encoding 方式,保证数据的高效存储和低空间A.Run Length encoding游程编码,当一行的多列数据有很多重复数据时,可以通过 "X重复了N次" 的记录方法,缩小记录的成本,虽然 N 可能很大,但存储成本很小:[1,2,1,1,1,1,2] -> 1-1,2-1,1-4,2-1B.Dictionary encoding字典编码,顾名思义就是通过映射,保存重复过多的数据,例如 "0" -> "LongString":[LongString, LongString, LongString] -> [0, 0, 0]C.Delta encoding增量编码,适用于 unix 时间戳,时间戳记录为 1970年1月1日以来的秒数,存储时间戳时可以直接减去初始时间戳,减少存储量,比如 1577808000 作为基准,则可以减少很多存储空间:[1577808000, 1577808004, 1577808008] -> [0, 4, 8] 3.存储-压缩对比val st = System.currentTimeMillis() val pairInfo = (0 to 1000000).zipWithIndex.toArray val format = "csv" // csv、json、parquet sc.parallelize(pairInfo).toDF("A", "B") .write .mode(SaveMode.Overwrite) .option("timestampFormat", "yyyy/MM/dd HH:mm:ss ZZ") .save(s"./output/$format") val saveType = "gzip" // text、gzip sc.parallelize(pairInfo).saveAsTextFile(s"./output/$saveType", classOf[GzipCodec]) val cost = System.currentTimeMillis() - st println(s"耗时: $cost")使用上述两种方法分别将 0 到 1000000 的数组存到对应文件,看一下存储的大小:编辑类型TextGzipParquetCSVJSON大小(MB)15.8 4.6813.823.8相比于表格数据 CSV 和 JSON 存储,parquet 提供了更高的压缩比,Amazon S3 集群曾经对比过 CSV 与 parquet 的效率对比,使用 Parquet 可以缩减 87% 的大小,查询的速度快 34 倍 同时可以节省 99.7 的成本,所以在大数据量加经常需要个别列操作的场景下,Parquet 非常适合。4.读取-效率对比再分别读取上述文件:val csv = spark.read.csv(path + "/output/csv").rdd.count() val parquet = spark.read.parquet(path + "/output/parquet").rdd.count() val json = spark.read.json(path + "/output/json").count() val common = sc.textFile(path + "/output/common").count() val gz = sc.textFile(path + "/output/gzip").count()类型TextGzipParquetCSVJSON耗时(ms)14171448495268706766相比 CSV,JSON 是有优势的,但是相对于行数存储的 Text 和 Gzip,执行 count 类的行统计操作显然不是列式存储文件的强项,所以相差很多,如果是大数据下针对某个或几个字段统计,Parquet 会提供相比于行式存储文件更高的性能。 5.selectExpr编辑读取 Parquet 文件除了获取原始的字段内容外,也可以通过 selectExpr 操作获取更多额外的信息,方法位于 org.apache.spark.sql.functions 中,内部包含 collect_list 类似的聚合操作,也包含 count 类似的统计操作,还有 max、min、isnull 等等。spark.read.parquet(path).selectExpr("count('_c1')").rdd.foreach(row => { println(row.getLong(0)) })上述操作通过 selectExpr 获取了 count(_c1) 特征的数量,count Result:5383。其中 _c1 为 Parquet 获取的 sql.DataFrame 的默认 schema,可以通过下述方法获取默认的 schema 信息:val schema = spark.read.parquet(path).schema println(schema)编辑这里截取了一部分,特征名从 _c0 开始依次累加,默认为 _c0,_c1 ,如果自己定义了 schema 的 StructField ,使用 spark.read.schema().parqeut() 读出来的 sql.Dataframe 的 selectExpr 函数内操作使用的列名就要换成自己定义的名称,例如 _c1 我定义为 age,则上述写法要改为 count('age'),再使用 _c1 会报错。更详细的 schema 操作可以参考:Parquet 指定 schema五.总结Spark - Parquet 大致常用的内容就这些,SparkSession 集成了读取 parquet、orc 的 API 非常的便捷,有需要建议直接通过 API 读取而不是 HadoopRdd / HadoopFile 。最后想说 parquet 的命名确实很好玩,parquet 翻译为地板,而不定长的列名存储,如果通过平面展示也颇有地板的感觉。编辑
一.引言使用 Flink - Kafka 接数据 Source 时程序报错:org.apache.flink.runtime.JobException: Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy任务每次启动后持续10min左右,然后 RUNNING -> FAILED,如此重启失败了多次。二.问题现象1.任务 URL 界面对应任务界面可以看到有一个 Source 的 3 个 Task 在任务启动的时间内一直处于 INITIALIZING 状态,直到任务结束。编辑2.yarn 界面上述报错情况下 yarn 界面如下,任务重启后大约 7min 失败编辑三.问题分析与解决1.Source 持续 INITIALIZING 与 周期性 Failed查看异常栈日志:编辑A.持续 INITIALIZING 从下往上看,显示任务被远程的 taskManager close,结合上面 Source 端有 3 个 task 一直处于 INITIALIZING 状态,大概率是因为某 broker crash ,从而导致无法 ping 通该节点,导致超时无法启动,当超时时间大于 flink 规定的心跳周期,任务 INITIALIZING -> FAILED,所以出现URL 的持续 INITIALIZING。B.周期性 Failed再往上是:Flink Recovery is suppressed by FixedDelayRestartBackoffTime,其中 maxNumberRestartAttempt 为 3,此时我们可以到提交客户端的 flink-conf.yaml 查看对应配置:restart-strategy: failure-rate restart-strategy.failure-rate.failure-rate-interval: 2 min可以看到重启策略为 failure-rate 即周期性的重启,其中周期为 2min,结合上面 maxNumberRestartAttempt = 3,这也找到了为什么程序 7min 左右循环退出的原因了,程序有1 min 左右的申请资源和初始化启动的时间,运行期间出现故障,按照 failure-rate 策略重启共计耗时 3 x 2min = 6min,3次尝试后任务仍然无法运行( taskManger 持续 ping 不通导致故障) 达到最大重启次数,任务退出。filure-rate 重启策略如下:重启策略在出现故障后重新启动作业,但当超过故障率(每个时间间隔的故障数)时,作业最终会失败。在两次连续的重启尝试之间,重启策略会等待固定的时间。编辑也可以在代码中配置:val env = StreamExecutionEnvironment.getExecutionEnvironment() env.setRestartStrategy(RestartStrategies.failureRateRestart( 3, // max failures per unit Time.of(5, TimeUnit.MINUTES), //time interval for measuring failure rate Time.of(10, TimeUnit.SECONDS) // delay ))查看更多的重启策略信息可以查看官方 API:Flink 重启策略。2.问题解决上面分析了很多,又是看日志,又是看官方 API,问题解决的方法其实很简单:A.获取持续处于 INITIALIZING 的 task 对应的 Broker 地址找到对应 source 对应 kafka 的 properties 配置中的 bootstrap.servers 参数即可,一般形式为 "broker1:port,broker2:port,broker3:port,..."B.Ping 各个 brokerping $broker分别 ping 对应 broker 对应机器,如果该 broker 正常会快速显示下述类似信息:编辑如果该 broker 异常,则 ping 后 shell 界面处于 _ 的等待状态,此时说明对应 broker 连接异常。C.将异常 broker 从 bootstrap.servers 参数中剔除重启一般情况下 kafka 都会有机器设置的冗余且实现互备,所以正常情况下去掉单台 broker 重启后任务不受影响3.其他系统问题Recovery is suppressed by FixedDelayRestartBackoffTimeStrategy 该报错主要是告知对应任务故障重启,主要异常分析还是要依靠最开始的异常栈确定异常源头,且大部分是与 flink 相关的系统参数,配置有关,例如本例就是 flink 的 source 端无法完全启动导致。除了上述 broker crash 掉点导致的 connection refused 之外A.时间语义不匹配还可能因为任务的时间语义与env设置不匹配导致任务重启并最终故障:// 事件时间 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 处理时间 env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);B.TimeWindowAll 并行度TimeWindowAll 的并行度默认为1,不可以修改,如果代码内 setParallelism > 1 则会在任务 submit 时直接报错C.non serializable field 序列化如果在 Source 函数或者其他需要序列化的类内初始化了不可序列化的变量会在任务启动时报错,需要使该变量支持序列化或采用其他方式解决
一.引言使用 executor 线程池时经常用到 shutdown / shutdownNow + awaitTermination 方法关闭线程池,下面看下几种方法的定义与常见用法。二.API 释义1.shutdown/** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * * <p>This method does not wait for previously submitted tasks to * complete execution. Use {@link #awaitTermination awaitTermination} * to do that. * * @throws SecurityException {@inheritDoc} */ public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(SHUTDOWN); interruptIdleWorkers(); onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } tryTerminate(); }shutdown 主要工作如下:Initiates an orderly shutdown in which previously submittedtasks are executed - 之前提交的继续执行but no new tasks will be accepted. - 不再接收新任务This method does not wait for previously submitted tasks tocomplete execution. - 该方法不会等待以前提交的任务完成,可以配合 awaitTermination 方法等待。2.shutdownNow/** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. These tasks are drained (removed) * from the task queue upon return from this method. * * <p>This method does not wait for actively executing tasks to * terminate. Use {@link #awaitTermination awaitTermination} to * do that. * * <p>There are no guarantees beyond best-effort attempts to stop * processing actively executing tasks. This implementation * cancels tasks via {@link Thread#interrupt}, so any task that * fails to respond to interrupts may never terminate. * * @throws SecurityException {@inheritDoc} */ public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); advanceRunState(STOP); interruptWorkers(); tasks = drainQueue(); } finally { mainLock.unlock(); } tryTerminate(); return tasks; }shutdownNow 主要工作如下:Attempts to stop all actively executing tasks - 尝试停止正在执行的任务halts the processing of waiting tasks - 停止等待的任务and returns a list of the tasks that were awaiting execution - 返回等待列表的任务These tasks are drained (removed) from the task queue upon return from this method. - 从该方法返回时,将这些任务从队列中删除Tips:该任务不会等待主动执行的任务终止,可以配合 awaitTermination 方法等待。该方法尽可能停止主动执行的任务,通过 Thread.interrupt 实现,未能响应中断的任务可能不会停止该方法与 shutdown 差别在 interruptIdleWorkers 和 interruptWorkers,后者会调用 interrutp 方法到正在执行的 worker 上,而前者只会取消等待的任务。3.awaitTermination* Threads waiting in awaitTermination() will return when the * state reaches TERMINATED. public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { for (;;) { if (runStateAtLeast(ctl.get(), TERMINATED)) return true; if (nanos <= 0) return false; nanos = termination.awaitNanos(nanos); } } finally { mainLock.unlock(); } }awaitTermination 方法会等待线程到达 TERMINATED 即已终止的状态。如果线程池已经关闭,则直接返回 true;如果线程池未关闭,该方法会根据 Timeout + TimeUnit 的延时等待线程结束,并根据到期后的线程池状态返回 true 或者 false,注意该方法不会关闭线程池,只负责延时以及检测状态。4.runStateA.状态线程 runState 的几种状态与转换RUNNING:接受新任务并处理排队的任务SHUTDOWN:不接受新任务,但处理排队的任务STOP:不接受新任务,不处理排队任务,并中断正在进行的任务TIDYING:所有任务都已终止,workerCount 为零,线程转换到状态 TIDYING,将运行 terminate() HookTERMINATED:终止()已完成B.转换RUNNING -> SHUTDOWN : 在调用 shutdown() 时,可能隐含在 finalize() 中(RUNNING or SHUTDOWN) -> STOP : 调用 shutdownNow()SHUTDOWN -> TIDYING:当队列和池都为空时STOP -> TIDYING:当池为空时TIDYING -> TERMINATED:当 terminate() 钩子方法完成时三.实践1.shutdown + awaitTermination(500ms)processNumBuffer 为 Runnable 内的逻辑,针对给定的一批数字求出最小,最大值并返回结果字符串保存,共 500000 个数字,每 50000 个数字生成一个 Runnable。执行逻辑后调用 shutdown + awaitTermination。getCurrentThread 方法负责打印当前的可用线程,用来观测调用 shutdown 和 awaitTermination 后线程池中线程的变化。import java.util.concurrent.{CopyOnWriteArraySet, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit} object ExecutorPoolShutdown { // Runnable 内执行逻辑,寻找上下界 def processNumBuffer(nums: Array[Int], taskId: Long): String = { val maxNum = nums.max val minNum = nums.min val log = s"TaskId: $taskId Length: ${nums.length} Min: $minNum Max: $maxNum" log } def main(args: Array[String]): Unit = { // 存储所有 Task 的日志 val logAll = new StringBuffer() // 存储所有可用的 TaskId val taskSet = new CopyOnWriteArraySet[Long]() // 初始化线程池 val executor = new ThreadPoolExecutor(6, 10, 3000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue()) val numIterator = (0 until 500000).iterator // 每50000个数据生成一个 Task numIterator.grouped(50000).foreach(group => { executor.execute(new Runnable() { override def run(): Unit = { val taskId = Thread.currentThread().getId taskSet.add(taskId) // 添加 taskID val res = processNumBuffer(group.toArray, taskId) logAll.append(res + "\n") // 添加统计日志 } }) }) // 调用 shutdown executor.shutdown() // 获取当前线程 println("After shutdown...") getCurrentThread() val st = System.currentTimeMillis() // 调用 awaitTerminatio val state = executor.awaitTermination(500, TimeUnit.MILLISECONDS) val cost = System.currentTimeMillis() - st println(s"Executor State: $state Cost: $cost") // 再次获取当前线程 println("After awaitTermination...") getCurrentThread() println(logAll.toString) println(taskSet.toArray().mkString(",")) } }由于任务逻辑比较简单,调用 shutdow 和 awaitTermation 后线程数均为 5 且是和 pool 无关的线程,说明线程池的 Task 在 shutdow 后就已经全部运行完毕了,这点从 awaitTermation 返回的状态为 true 和等待时间为 0 也可以看出来。最后就是 500000 / 50000 = 10 共计10条 log,再次说明 task 都执行完毕,最后显示本次任务执行中共使用了 6 个 Task。编辑Tips:shutdown 后如果线程池已经结束,则 awaitTermation 方法不会等待,直接返回 true。2.任务延时 + shutdown + awaitTermination(500ms)上面的任务执行速度求最大最小值,执行速度很快,所以不好看到 shutdown 和 awaitTerminatioN 的作用,下面模拟一个运行时间稍长的任务,在原始 Runnable 中加入 sleep(2000),延长程序2s的运行时间:numIterator.grouped(50000).foreach(group => { executor.execute(new Runnable() { override def run(): Unit = { val taskId = Thread.currentThread().getId taskSet.add(taskId) Thread.sleep(2000) val res = processNumBuffer(group.toArray, taskId) logAll.append(res + "\n") } }) })再次运行:编辑这次的结果和上面完全不同,首先是不管是调用 shutdown 还是 awaitTermination,可以看到活跃线程中都包含 pool-1-thread-x,即这两个 API 调用后逻辑仍在运行没有结束;再看 awaitTermination 返回的状态为 false,代表线程池未完全关闭,cost=507ms,等待 500ms 后线程池仍未完全退出,但是主线程已经结束,所以 LogAll 里没有日志加入,最后只打印出了使用过的 TaskId。Tips:shutdown 后线程仍在继续运行,对应前面提到的 shutdown 之后当前运行的任务继续执行,只不过不会增加新任务,而 awaitTermination 后线程依然活跃,对应前面的 awaitTermination 方法只返回线程池关闭状态,不会关闭线程池。3.任务延时 + shutdown + awaitTermination(2000ms)调整 awaitTermination 的等待时间,从500ms提升至2000msval st = System.currentTimeMillis() val state = executor.awaitTermination(2000, TimeUnit.MILLISECONDS) val cost = System.currentTimeMillis() - st println(s"Executor State: $state Cost: $cost")再次运行: 编辑和上面相比,executor 的关闭状态返回的仍然是 false,但是等到的时间延迟至约 2000ms,可以看到随着等待时间增加,一部分 task已经完成了,但是并没有全部完成。将延时时间修改为 5000ms 再次运行:编辑等到 4000ms 时 executor 就结束了,所以 awaitTermination 返回为 true,后续也没有 pool-1-thread-x 相关的 task,最终的输出 log 也完整。4.shutdownNow + awaitTermination(500ms)shutdownNow 相比 shutdown 会多一个返回值,即等待列表的任务。val tasks = executor.shutdownNow() tasks.asScala.foreach(task => { println(task) })运行一下:编辑 由于任务运行很快,所以快速任务下,shutdown 和 shutdownNow 结果相同,awaitTermination 返回为 ture 且未等待。5.任务延时 + shutdownNow + awaitTermination(500ms)numIterator.grouped(50000).foreach(group => { executor.execute(new Runnable() { override def run(): Unit = { val taskId = Thread.currentThread().getId taskSet.add(taskId) Thread.sleep(2000) val res = processNumBuffer(group.toArray, taskId) logAll.append(res + "\n") } }) })任务增加 sleep 2000ms 后再运行一下: 编辑 最上面的任务显示 :Caused by: java.lang.InterruptedException: sleep interrupted即任务 sleep 期间被 interrupt 了,所以执行的 task 结束,和 shutdown 相比,正在执行的线程并不会被 interrupt;下面打印出来4个 Runnable,因为这四个 Task 还在等待队列中,shutdownNow 直接把他们返回了;最后下面因为 executor 已经关闭,所以状态为 true,等待时间为0。6.任务延时 + awaitTermination(xxxms) + shutdownNow通过上面 shotdownNow + awaitTermination 的示例中可以看到,如果任务不能很快执行,那么调用 shotdownNow 的结果就是所有 task 都没结束,任务没有任何改动。如果希望对任务设定一定期间,能完成多少完成多少,可以调整顺序,修改为先 awaitTermination 再 shutdownNow:val st = System.currentTimeMillis() val state = executor.awaitTermination(3000, TimeUnit.MILLISECONDS) val cost = System.currentTimeMillis() - st println(s"Executor State: $state Cost: $cost") println("After awaitTermination...") getCurrentThread() val tasks = executor.shutdownNow() tasks.asScala.foreach(task => { println(task) }) println("After shutdown...") getCurrentThread()调整完顺序后再次运行:编辑等待时间设置为 3000ms,相当于你对你的任务要求是: 3000ms 内能跑完多少算多少,没跑完就不要了;可以看到 awaitTermination 到期后返回状态为 false,说明线程内的任务还未全部结束;再看下面 shutdownNow 后,线程里已经不存在 pool-1-thread-x ,且打印出部分结果,共计6条;最下面是 interrupt 其他正在运行的 task 打印的异常栈,最终程序 exit(0) 正常退出。四.总结经过上面的代码分析,对几个方法进行一下总结:shutdown : 等待执行的任务执行,不再添加新任务shutdownNow:interrupt 当前执行的任务,不再添加新任务,返回等待的任务awaitTermination:不影响线程池开关状态,只返回状态,可以堵塞线程等待一定时间可以结合上面的6个例子以及自己任务的耗时和容忍度,决定怎么组合上面三个 API,如果一定要等到 executor 内的 task 都运行完毕再关闭 executor 且不好估算内部 task 运行时间,可以采用如下操作:executor.shutdown() while (!executor.awaitTermination(500, TimeUnit.MILLISECONDS)) { println("Task is Running...") } println("Task is Finish!")通过 while true 保证线程池内 task 都运行完毕才进行后续操作,不过需要注意 Task 内部不要有死循环,否则会导致无法跳出该 While 循环,整个程序堵塞在这里。
一.引言现在有一批流数据想要存储到 Redis 中,离线可以使用 Spark + foreach 搞定,由于是多流 join 且带状态,所以 SparkStreaming + foreach 也无法实现,而 Flink 不支持 foreach 操作触发 execute,这里采用 RedisSink 代替实现 foreach 逻辑。二.RedisSink 简介1.源码浅析编辑RedisSink 和 KafkaSink 类似都是继承了 RichSinkFunction,其内部主要实现了三个方法以及五个主要变量 :A.五个变量String additionalKey : 附加键,redis 主要是 k-v 存储,也有 k-k-v 式存储,additionalKey 即为 k-k-v 的第一个 kRedisMapper<In> redisSinkMapper : 从 In 中解析 k,v,按指定的 RedisCommand 执行操作RedisCommond redisCommand : redis 指令,例如 set(k, v),lpush(k, v) ...FlinkJedisConfigBase: Redis 配置,分别支持 Redis、RedisPool 、RedisClusterRedisCommandsContainer:redis 容器,根据 FlinkJedisConfigBase 配置以及上面的 commond 执行 k-v、k-k-v 的操作B.三个方法open: 初始化相关参数,主要是基于 FlinkJedisConfigBase 初始化 RedisCommandsContainerclose: 关闭相关 Socket,这里主要关闭 RedisCommandsContainerinvoke: 针对单个 INPUT 基于 Socket 的执行操作,这里主要是执行相关 Jedis、JedisPool、JedisCluster 操作2.底层实现A.FlinkJedisConfigBase编辑FlinkJedisConfigBase 其实只是一个中转类,其内部存储了相关的 jedis 参数,执行 build 初始化时将 FlinkJedisConfigBase 内的参数转到 GenericObjectPoolConfig 中再构造 RedisCommandsContainer编辑B. RedisCommandsContainerRedisCommandsContainer 底层实现基于 Jedis 的 JedisCluster、JedisPool 和 JedisSentinePool,分贝对应 flinkJedisCluster、flinkJedisPool 和 flinkJedisSentine,通过 build 方法和 flinkJedisConfig 实现相关类的初始化。编辑C. RedisCommond这里其实是对 Jedis 指令的封装,目前只支持无返回值的存储命令,例如 lpush、sadd、hset 等等,也可以理解,对于流式程序的最终 sink,在低延迟高吞吐的场景下,尽量避免读取的流量,例如 get、hget 命令很明显不适合在 sink 场景下实现,不过也不是不能实现,继承 RedisCommandsContainer 类即可基于 Jedis 实现其他的 redis 指令。编辑三.RedisSink 示例1.实现需求与辅助类需求: 自定义 Source 实现将 k-v 存储至 redis 中A.K-V 存储类case class SaveInfo(key: String, value: String)B.RedisMapper 命令类这里使用最基础的 SET 命令,将 SaveInfo 的 k-v 存储至对应 redis。import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand, RedisCommandDescription, RedisMapper} class JedisMapper extends RedisMapper[SaveInfo] { override def getCommandDescription: RedisCommandDescription = { new RedisCommandDescription(RedisCommand.SET) } override def getKeyFromData(saveInfo: SaveInfo): String = { saveInfo.key } override def getValueFromData(saveInfo: SaveInfo): String = { saveInfo.value } }2.主函数def main(args: Array[String]): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment // 自定义 Source val sourceArray = (0 to 5).map("TestKey" + _).zipWithIndex.toArray.map { case (k, v) => SaveInfo(k, v.toString) } // 定义 FlinkJedisPool 配置 val flinkJedisPoolConfig = new FlinkJedisPoolConfig.Builder() .setHost(host) .setPort(port) .setTimeout(1000) .setMaxTotal(10) .setMaxIdle(10) .setMinIdle(10) .build() // 初始化 RedisSink val jedisSink = new RedisSink(flinkJedisPoolConfig, new JedisMapper) // 执行 DAG env.fromCollection(sourceArray).addSink(jedisSink) env.execute() }生成测试的有限流,并直接引入 JedisSink,逻辑非常简单。3.运行效果先看下 Source 内的几条数据样式:编辑再看下执行后的 Redis 内容:编辑逻辑执行没有问题。四.总结这里示例了最基本的 JedisSink 方法,即初始化 FlinkJedisPool 进行单条数据的 Invoke 操作,但是一般最好采用批处理的方式,即获取 RedisResource,存储 N 条,return resource,如此循环往复。后续将介绍自定义实现 RedisCommandsContainer 的方法以及如何流转批,一次处理多条 redis 存储 k-v。
2022年08月