Flink依赖问题之connector hive依赖冲突如何解决

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。

问题一:Apache Flink常见问题汇总【精品问答】

我看1.11的java.sql.Timestamp 对应的是Flink的TIMESTAMP(9),跟之前默认的TIMESTAMP(3)有区别,而且之前1.10的Timestamp(3)是带时区UTC的,现在这个类型不带时区了。想问下这个具体调整应该如何适配?

*来自志愿者整理的flink邮件归档



参考答案:我通过flink sql 定义了一个es sink,其中有个字段类型定义为了 eventTime TIMESTAMP(9) WITH LOCAL TIME ZONE。 在尝试写入时,报了如下的异常。看来json parser无法解析这种类型。请问下大神们,我应该怎么写入一个UTC日期的时间类型?格式类似 2020-07-15T12:00:00.000Z

java.lang.UnsupportedOperationException: Not support to parse type: TIMESTAMP(9) WITH LOCAL TIME ZONE

at org.apache.flink.formats.json.JsonRowDataSerializationSchema.createNotNullConverter(JsonRowDataSerializationSchema.java:184)

*来自志愿者整理的flink邮件归档




关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370296?spm=a2c6h.12873639.article-detail.53.6f9243783Lv0fl



问题二:connector hive依赖冲突

大佬们,下面连接hive的依赖包的哪个传递依赖导致的jar包冲突,我从1.9到1.11每次在maven按照官方文档引包都会出现依赖冲突。。。。1.9刚发布的时候对下面的引包有做依赖排除,后来文档改了

// Flink's Hive connector.Contains flink-hadoop-compatibility and flink-orc jars

flink-connector-hive_2.11-1.11.0.jar

// Hive dependencies

hive-exec-2.3.4.jar

*来自志愿者整理的flink邮件归档



参考答案:

1.9和1.10时候排除一些传递依赖后在idea和打uber jar在集群环境都可以运行,不排除传递依赖的话在idea运行不了;

1.11现在只在本地测哪,不排除传递依赖idea运行不了,集群环境还没弄,但是我感觉在idea直接run这个功能好多人都需要,文档是不是可以改进一下*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370251?spm=a2c6h.12873639.article-detail.54.6f9243783Lv0fl



问题三:flink1.11 pyflink stream job 退出

python flink_cep_example.py 过几秒就退出了,应该一直运行不退出的啊。 代码如下,使用了MATCH_RECOGNIZE:

s_env = StreamExecutionEnvironment.get_execution_environment() b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build() st_env = StreamTableEnvironment.create(s_env, environment_settings=b_s_settings) configuration = st_env.get_config().get_configuration() configuration.set_string("taskmanager.memory.task.off-heap.size", "500m")

s_env.set_parallelism(1)

kafka_source = """CREATE TABLE source ( flow_name STRING, flow_id STRING, component STRING, filename STRING, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'cep', 'properties.bootstrap.servers' = 'localhost:9092', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' )"""

postgres_sink = """ CREATE TABLE cep_result ( filename STRING, start_tstamp TIMESTAMP(3), end_tstamp TIMESTAMP(3) ) WITH ( 'connector.type' = 'jdbc', 'connector.url' = 'jdbc:postgresql://127.0.0.1:5432/postgres', 'connector.table' = 'cep_result', 'connector.driver' = 'org.postgresql.Driver', 'connector.username' = 'postgres', 'connector.password' = 'my_password', 'connector.write.flush.max-rows' = '1' ) """

st_env.sql_update(kafka_source) st_env.sql_update(postgres_sink)

postgres_sink_sql = ''' INSERT INTO cep_result SELECT * FROM source MATCH_RECOGNIZE ( PARTITION BY filename ORDER BY event_time MEASURES (A.event_time) AS start_tstamp, (D.event_time) AS end_tstamp ONE ROW PER MATCH AFTER MATCH SKIP PAST LAST ROW PATTERN (A B C D) DEFINE A AS component = 'XXX', B AS component = 'YYY', C AS component = 'ZZZ', D AS component = 'WWW' ) MR '''

sql_result = st_env.execute_sql(postgres_sink_sql)

*来自志愿者整理的flink邮件归档



参考答案:

execute_sql是一个异步非阻塞的方法,所以你需要在你的代码末尾加上 sql_result.get_job_client().get_job_execution_result().result() 对此我已经创建了JIRA[1]

[1] https://issues.apache.org/jira/browse/FLINK-18598*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370249?spm=a2c6h.12873639.article-detail.55.6f9243783Lv0fl



问题四:官方pyflink 例子的执行问题

官方例子: https://flink.apache.org/2020/04/09/pyflink-udf-support-flink.html 按照例子写了程序,也安装了pyflink | python -m pip install apache-flink | 代码: | from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env)

add = udf(lambda i, j: i + j, [DataTypes.BIGINT(), DataTypes.BIGINT()], DataTypes.BIGINT())

t_env.register_function("add", add)

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/src.txt'))

.with_format(OldCsv() .field('a', DataTypes.BIGINT()) .field('b', DataTypes.BIGINT()))

.with_schema(Schema() .field('a', DataTypes.BIGINT()) .field('b', DataTypes.BIGINT()))

.create_temporary_table('mySource')

t_env.connect(FileSystem().path('C:/Users/xuyin/Desktop/docker_compose_test/tar.txt'))

.with_format(OldCsv() .field('sum', DataTypes.BIGINT()))

.with_schema(Schema() .field('sum', DataTypes.BIGINT()))

.create_temporary_table('mySink')

t_env.from_path('mySource')

.select("add(a, b)")

.insert_into('mySink')

t_env.execute("tutorial_job") |

执行:

| python test_pyflink.py |

报错:

| Traceback (most recent call last): File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 147, in deco return f(*a, **kw) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o2.execute. : org.apache.flink.table.api.TableException: The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'. at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.checkPythonWorkerMemory(CommonPythonBase.scala:158) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getMergedConfiguration(CommonPythonBase.scala:119) at org.apache.flink.table.planner.plan.nodes.common.CommonPythonBase$class.getConfig(CommonPythonBase.scala:102) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.getConfig(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:61) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecPythonCalc.translateToPlanInternal(StreamExecPythonCalc.scala:35) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecCalcBase.translateToPlan(StreamExecCalcBase.scala:38) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToTransformation(StreamExecLegacySink.scala:158) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:106) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58) at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67) at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) at scala.collection.AbstractIterable.foreach(Iterable.scala:54) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66) at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248) at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282) at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79) at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:745)

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "test_pyflink.py", line 34, in t_env.execute("tutorial_job") File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\table\table_environment.py", line 1057, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\py4j\java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "C:\Users\xuyin\AppData\Local\Programs\Python\Python37\lib\site-packages\pyflink\util\exceptions.py", line 154, in deco raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace) pyflink.util.exceptions.TableException: "The configured Task Off-Heap Memory 0 bytes is less than the least required Python worker Memory 79 mb. The Task Off-Heap Memory can be configured using the configuration key 'taskmanager.memory.task.off-heap.size'." |

里面提到还要配置taskmanager.memory.task.off-heap.size这个属性吗 ,

我找到..\Python\Python37\Lib\site-packages\pyflink\conf下面的flink-conf.yaml

增加了taskmanager.memory.task.off-heap.size: 100m

但是还是报一样的错误

请问用python安装的flink去哪里配置属性

*来自志愿者整理的flink邮件归档



参考答案:

你需要添加配置,如果你没有使用RocksDB作为statebackend的话,你直接配置t_env.get_config().get_configuration().set_boolean("python.fn-execution.memory.managed", True)就行,如果你用了的话,就需要配置off-heap memory了,table_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m')。你可以参考文档上的例子,以及对应的note说明[1]

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/python/python_udfs.html#scalar-functions*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370237?spm=a2c6h.12873639.article-detail.56.6f9243783Lv0fl



问题五:Flink整合hive之后,通过flink创建的表,hive beeline可见表,不可见字段?

参照文档https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/#connecting-to-hive 通过flink创建表:CREATE TABLE Orders (product STRING, amount INT) 在beeline端可见表,但是desc看不到字段,select * from orders也不可用

*来自志愿者整理的flink邮件归档



参考答案:

默认创建的是Flink表,Hive端不可见。

你想创建Hive表的话,用Hive dialect。*来自志愿者整理的flink邮件归档



关于本问题的更多回答可点击进行查看:

https://developer.aliyun.com/ask/370236?spm=a2c6h.12873639.article-detail.57.6f9243783Lv0fl

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
2月前
|
消息中间件 Kafka Apache
Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
【2月更文挑战第6天】Flink 提供了与 Kafka 集成的官方 Connector,使得 Flink 能够消费 Kafka 数据
72 2
|
1月前
|
Java 数据库连接 数据库
Flink Connector JDBC已经被移到了一个独立的仓库
【2月更文挑战第23天】Flink Connector JDBC已经被移到了一个独立的仓库
14 1
|
1月前
|
SQL 关系型数据库 MySQL
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
Flink CDC + Hudi + Hive + Presto构建实时数据湖最佳实践
147 0
|
1月前
|
SQL JSON 监控
使用 SPL 高效实现 Flink SLS Connector 下推
SLS 推出了 SPL 语言,可以高效的对日志数据的清洗,加工。对 SPL 及 SPL 在阿里云 Flink SLS Connector 中应用进行介绍及举例。
56027 153
|
2月前
|
SQL Java 数据库连接
Flink扩展问题之jdbc connector扩展失败如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Kafka
Flink部署问题之hive表没有数据如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL Java Apache
Flink报错问题之flink-1.11写hive报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Apache
Flink报错问题之使用hive udf函数报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
2月前
|
SQL 消息中间件 Java
Flink报错问题之写入Hive报错如何解决
Apache Flink是由Apache软件基金会开发的开源流处理框架,其核心是用Java和Scala编写的分布式流数据流引擎。本合集提供有关Apache Flink相关技术、使用技巧和最佳实践的资源。
|
4月前
|
SQL 数据采集 数据挖掘
大数据行业应用之Hive数据分析航班线路相关的各项指标
大数据行业应用之Hive数据分析航班线路相关的各项指标
103 1

相关产品

  • 实时计算 Flink版