flink问题之使用debezium-json format报错如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:flink 1.11 es未定义pk的sink问题


根据文档[1]的描述,1.11的es sql connector如果在ddl里没有声明primary key,将会使用append模式sink数据,并使用es本身生成的id作为document_id。但是我在测试时发现,如果我的ddl里没有定义primary key,写入时没有正确生成document_id,反而是将index作为id生成了。导致只有最新的一条记录。下面是我的ddl定义: 不确定是我配置使用的方式不对,还是确实存在bug。。

CREATE TABLE ES6_SENSORDATA_OUTPUT ( event varchar, user_id varchar, distinct_id varchar, _date varchar, _event_time varchar, recv_time varchar, _browser_version varchar, path_name varchar, _search varchar, event_type varchar, _current_project varchar, message varchar, stack varchar, component_stack varchar, _screen_width varchar, _screen_height varchar ) WITH ( 'connector' = 'elasticsearch-6', 'hosts' = '<ES_YUNTU.SERVERS>', 'index' = 'flink_sensordata_target_event', 'document-type' = 'default', 'document-id.key-delimiter' = '$', 'sink.bulk-flush.interval' = '1000', 'failure-handler' = 'fail', 'format' = 'json' )

[1]https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/elasticsearch.html#key-handling


参考回答:

如果没有定义主键,ES connector 会把 _id设为null[1],这样ES的Java Client会将_id设为一个随机值[2].

所以应该不会出现您说的这种情况。您那里的ES有没有请求日志之类的,看一下Flink发过来的请求是什么样的。

[1] https://github.com/apache/flink/blob/f0eeaec530e001ab02cb889dfe217e25913660c4/flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/RowElasticsearchSinkFunction.java#L102

[2] https://github.com/elastic/elasticsearch/blob/977230a0ce89a55515dc6ef6452e9f059d9356a2/core/src/main/java/org/elasticsearch/action/index/IndexRequest.java#L509


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372256


问题二:单流的一条数据,需要先sink 至mysql,再sink至kafka,并保证两sink的原子性以及s


请问下,有没有大佬做过类似的事情?

另外,flink side out功能,可以将单流分成多流,但是不是分成多流后,但两条流sink的时候,是不是没法保证sink时候的时序?


参考回答:

你可以先用 map 再用 addSink,这样他们的调用被 chain 在一起,可以达到先写入 mysql ,再写入 kafka 的目的。

datastream.map(new MySQLSinkMapFunction()).addSink(new

FlinkKafkaProducer()).

也就是将 mysql sink 伪装成了一个 MapFunction,里面先做了 写 mysql 的动作,写成功后再将数据输出到下游。

另外,如果要在 SQL 中解决这个需求的话,会比较麻烦,因为标准语法中没有这么个语法支持这个功能。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372259


问题三:flink使用debezium-json format报错


log4j:WARN No appenders could be found for logger (org.apache.flink.table.module.ModuleManager). log4j:WARN Please initialize the log4j system properly. Exception in thread "main" org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue. Current node is TableSourceScan(table=[[default_catalog, default_database, ddd]], fields=[id, age]) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitChildren$2(FlinkChangelogModeInferenceProgram.scala:626) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:233) at scala.collection.Iterator.foreach(Iterator.scala:937) at scala.collection.Iterator.foreach$(Iterator.scala:937) at scala.collection.AbstractIterator.foreach(Iterator.scala:1425) at scala.collection.IterableLike.foreach(IterableLike.scala:70) at scala.collection.IterableLike.foreach$(IterableLike.scala:69) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike.map(TraversableLike.scala:233) at scala.collection.TraversableLike.map$(TraversableLike.scala:226) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.visitChildren(FlinkChangelogModeInferenceProgram.scala:614) at org.apache.flink.table.planner.plan.optimize.program.FlinkChangelogModeInferenceProgram$SatisfyUpdateKindTraitVisitor.$anonfun$visitSink$1(FlinkChangelogModeInferenceProgram.scala:690) at scala.collection.TraversableLike.$anonfun$flatMap$1(TraversableLike.scala:240)


参考回答:

这是 changgelog 里的一个bug[1], 在1.11.1和master上已经修复,1.11.1社区已经在准备中了。 [1] https://issues.apache.org/jira/browse/FLINK-18461 https://issues.apache.org/jira/browse/FLINK-18461


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372264


问题四:Flink es7 connector认证问题


我们生产环境的es7是有用户名密码认证的,使用如下代码启动后会报,这段代码调用了es rest client api,单独使用是没问题的,不过放到 flink 里就报错了 org.elasticsearch.client.ResponseException: method [HEAD], host [xxx], URI [/], status line [HTTP/1.1 401 Unauthorized] ParameterTool pt = ParameterTool.fromArgs(args); String confFile = pt.get("confFile"); pt = ParameterTool.fromPropertiesFile(new File(confFile)); provider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(pt.get("es.user.name"), pt.get("es.user.password")));

esSinkBuilder.setRestClientFactory( (RestClientBuilder restClientBuilder) -> restClientBuilder .setRequestConfigCallback(requestConfigBuilder -> requestConfigBuilder.setSocketTimeout(180000) .setConnectionRequestTimeout(10000) ) .setHttpClientConfigCallback(httpClientBuilder -> { httpClientBuilder.disableAuthCaching(); //禁用 preemptive 身份验证 return httpClientBuilder.setDefaultCredentialsProvider(provider); } ) );


参考回答:

请问您有检查过pt.get("es.user.name"),

pt.get("es.user.password")这两个参数读出来是否都是正确的


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372267


问题五:flink sql 侧输出


大佬们、我们这面主要基于blink sql完成转换计算,但是可能会有延迟数据,现在想把延迟数据通过侧输出保存下来,在table/sql

api中要怎么操作比较合理一点?或者有没有其他处理延迟数据的方式?


参考回答:

Flink SQL/Table 目前还不支持 side output。不过有一个实验性的功能可以处理延迟数据,

你可以给你的作业配上:

table.exec.emit.late-fire.enabled = true

table.exec.emit.late-fire.delay = 1min

同时 TableConfig#setIdleStateRetentionTime 需要配上,表示窗口状态允许保留多久,即 window allowLateness 。

具体可以看下 org.apache.flink.table.planner.plan.utils.WindowEmitStrategy 这个类。


关于本问题的更多回答可点击原文查看:https://developer.aliyun.com/ask/372272

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
SQL Shell API
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之任务提交后出现 "cannot run program "/bin/bash": error=1, 不允许操作" ,是什么原因
|
2月前
|
资源调度 监控 关系型数据库
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
实时计算 Flink版操作报错合集之处理大量Join时报错空指针异常,是什么原因
|
2月前
|
SQL Java Apache
实时计算 Flink版操作报错合集之使用parquet时,怎么解决报错:无法访问到java.uti.Arrays$ArrayList类的私有字段
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
2月前
|
Oracle 关系型数据库 Java
实时计算 Flink版操作报错合集之遇到了关于MySqIValidator类缺失的错误,是什么原因
在使用实时计算Flink版过程中,可能会遇到各种错误,了解这些错误的原因及解决方法对于高效排错至关重要。针对具体问题,查看Flink的日志是关键,它们通常会提供更详细的错误信息和堆栈跟踪,有助于定位问题。此外,Flink社区文档和官方论坛也是寻求帮助的好去处。以下是一些常见的操作报错及其可能的原因与解决策略。
|
10天前
|
XML 存储 JSON
Twaver-HTML5基础学习(19)数据容器(2)_数据序列化_XML、Json
本文介绍了Twaver HTML5中的数据序列化,包括XML和JSON格式的序列化与反序列化方法。文章通过示例代码展示了如何将DataBox中的数据序列化为XML和JSON字符串,以及如何从这些字符串中反序列化数据,重建DataBox中的对象。此外,还提到了用户自定义属性的序列化注册方法。
27 1
|
2月前
|
JSON 前端开发 JavaScript
|
7天前
|
存储 JSON Go
在Gin框架中优雅地处理HTTP请求体中的JSON数据
在Gin框架中优雅地处理HTTP请求体中的JSON数据
|
11天前
|
JSON JavaScript 数据格式
vue写入json数据到文本中+vue引入cdn的用法
vue写入json数据到文本中+vue引入cdn的用法
|
9天前
|
JSON 数据格式
Blob格式转json格式,拿到后端返回的json数据
文章介绍了如何将后端返回的Blob格式数据转换为JSON格式,并处理文件下载和错误提示。
21 0
Blob格式转json格式,拿到后端返回的json数据
|
2月前
|
存储 JSON API
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者
——在成长的路上,我们都是同行者。这篇关于商品详情API接口的文章,希望能帮助到您。期待与您继续分享更多API接口的知识,请记得关注Anzexi58哦! 淘宝API接口(如淘宝开放平台提供的API)允许开发者获取淘宝商品的各种信息,包括商品详情。然而,需要注意的是,直接访问淘宝的商品数据API通常需要商家身份或开发者权限,并且需要遵循淘宝的API使用协议。
淘系API接口(解析返回的json数据)商品详情数据解析助力开发者

相关产品

  • 实时计算 Flink版
  • 下一篇
    无影云桌面