Question 1: No DAG in the Flink 1.11 web UI
I installed standalone Flink on a local Linux machine. After submitting my Flink job it runs normally and produces logs, but the web UI does not show the number of records received and sent. Could someone explain why?
*From the volunteer-curated Flink mailing list archive
Reference answer:
Could you attach a screenshot of the UI not showing the received/sent record counts, so everyone can understand the problem better? Also, does the Flink job actually have data coming in and being processed?
*From the volunteer-curated Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370326?spm=a2c6h.12873639.article-detail.32.29d04378ApxdqJ
Question 2: How can a Flink stream generate an auto-increment primary key for each record?
How can a Flink stream generate an auto-increment primary key for each record? Timestamps don't seem to work: multiple records can be produced with the same timestamp, so the order of the records cannot be distinguished.
*From the volunteer-curated Flink mailing list archive
Reference answer:
You want each record to carry an id that increases with the data, right? You could use a RichMapFunction[1] for this: add the custom id in each map call, and also keep that id in state, so that even if the job fails over the auto-increment id stays correct.
[1]
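For illustration (this sketch is not part of the original answer), here is a minimal Java version of that idea, assuming the input records are plain strings and that a per-subtask counter is acceptable; the counter is kept in operator state so the sequence continues after a failover:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// Assigns an increasing id to every record of this subtask and keeps the
// counter in operator state, so the sequence continues after a failover.
public class IdAssigner extends RichMapFunction<String, Tuple2<Long, String>>
        implements CheckpointedFunction {

    private transient ListState<Long> counterState; // checkpointed counter
    private long counter;                           // current id of this subtask

    @Override
    public Tuple2<Long, String> map(String value) {
        counter++;                                  // one id per record
        return Tuple2.of(counter, value);
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        counterState.clear();
        counterState.add(counter);                  // persist the counter on checkpoint
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        counterState = context.getOperatorStateStore()
                .getListState(new ListStateDescriptor<>("id-counter", Types.LONG));
        for (Long restored : counterState.get()) {  // restore after failover
            counter = restored;
        }
    }
}

Usage would be along the lines of stream.map(new IdAssigner()). The id is only monotonic within each parallel subtask; for a globally unique key you would additionally have to mix in the subtask index (getRuntimeContext().getIndexOfThisSubtask()).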
*From the volunteer-curated Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370324?spm=a2c6h.12873639.article-detail.33.29d04378ApxdqJ
Question 3: After upgrading from Flink 1.9.2 to 1.10.0, a failed job cannot recover from its checkpoint
After upgrading to 1.10.0, when the program fails it tries to recover from the checkpoint, but recovery always fails with:
Caused by: java.nio.file.NoSuchFileException: /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/db/000009.sst -> /data/hadoop/yarn/local/usercache/hdfs/appcache/application_1589438582606_30760/flink-io-26af2be2-2b14-4eab-90d8-9ebb32ace6e3/job_6b6cacb02824b8521808381113f57eff_op_StreamGroupedReduce_54cc3719665e6629c9000e9308537a5e__1_1__uuid_afda2b8b-0b79-449e-88b5-c34c27c1a079/8f609663-4fbb-483f-83c0-de04654310f7/000009.sst
The configuration is the same as in 1.9.2:

state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints/wc/
state.savepoints.dir: hdfs:///flink/savepoints/wc/
state.backend.incremental: true

The code also has:

env.enableCheckpointing(10000);
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS)));

Is there any special configuration required for 1.10.0?
*From the volunteer-curated Flink mailing list archive
Reference answer:
This exception comes from restoring an incremental checkpoint: the files had already been downloaded to local disk, and when the hard link was being created [1] the source file was found to be missing. Most likely some other exception occurred at that moment and caused the restore procedure to exit, so this exception is probably not the root cause.
*From the volunteer-curated Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370322?spm=a2c6h.12873639.article-detail.34.29d04378ApxdqJ
Question 4: Flink 1.11 SQL writing to HDFS does not commit partitions automatically
While using Flink 1.11 SQL to consume from Kafka and write to HDFS, I found that partitions are not committed automatically. What could be the reason? Thanks.
My checkpoint interval is set to 10s. With the configuration below, a _SUCCESS file should normally appear in the corresponding HDFS partition every 10 seconds, but even after a long time none showed up, although the ORC result data itself was written correctly.
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
    bsEnv.enableCheckpointing(10000);
    bsEnv.setParallelism(1);
    StreamTableEnvironment tEnv = StreamTableEnvironment.create(bsEnv);

    String sqlSource = "CREATE TABLE source_kafka (\n" +
            "  appName STRING,\n" +
            "  appVersion STRING,\n" +
            "  uploadTime STRING\n" +
            ") WITH (\n" +
            "  'connector.type' = 'kafka',\n" +
            "  'connector.version' = '0.10',\n" +
            "  'connector.topic' = 'mytest',\n" +
            "  'connector.properties.zookeeper.connect' = 'localhost:2181',\n" +
            "  'connector.properties.bootstrap.servers' = 'localhost:9092',\n" +
            "  'connector.properties.group.id' = 'testGroup',\n" +
            "  'format.type' = 'json',\n" +
            "  'update-mode' = 'append'\n" +
            ")";
    tEnv.executeSql(sqlSource);

    String sql = "CREATE TABLE fs_table (\n" +
            "  appName STRING,\n" +
            "  appVersion STRING,\n" +
            "  uploadTime STRING,\n" +
            "  dt STRING,\n" +
            "  h STRING\n" +
            ") PARTITIONED BY (dt, h) WITH (\n" +
            "  'connector' = 'filesystem',\n" +
            "  'path' = 'hdfs://localhost/tmp/',\n" +
            "  'sink.partition-commit.policy.kind' = 'success-file',\n" +
            "  'format' = 'orc'\n" +
            ")";
    tEnv.executeSql(sql);

    String insertSql = "insert into fs_table SELECT appName, appVersion, uploadTime, " +
            "DATE_FORMAT(LOCALTIMESTAMP, 'yyyy-MM-dd'), DATE_FORMAT(LOCALTIMESTAMP, 'HH') FROM source_kafka";
    tEnv.executeSql(insertSql);
}
*From the volunteer-curated Flink mailing list archive
Reference answer:
By default, for ORC, a corresponding SUCCESS file should in theory be produced as soon as an actual data file is generated. How did you confirm that there is no SUCCESS file?
With the same SQL I do get one in my environment.
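For reference, a minimal sketch (not part of the original answer) of the same fs_table DDL with the partition-commit options of the 1.11 filesystem connector written out explicitly; the 'process-time' trigger and '0s' delay values are assumptions, not settings confirmed by the thread, and tEnv is the table environment from the question's code:

// Sketch only: fs_table from question 4, with the partition-commit
// trigger/delay options spelled out (values here are assumptions).
String fsTableDdl = "CREATE TABLE fs_table (\n" +
        "  appName STRING,\n" +
        "  appVersion STRING,\n" +
        "  uploadTime STRING,\n" +
        "  dt STRING,\n" +
        "  h STRING\n" +
        ") PARTITIONED BY (dt, h) WITH (\n" +
        "  'connector' = 'filesystem',\n" +
        "  'path' = 'hdfs://localhost/tmp/',\n" +
        "  'format' = 'orc',\n" +
        "  'sink.partition-commit.trigger' = 'process-time',\n" +
        "  'sink.partition-commit.delay' = '0s',\n" +
        "  'sink.partition-commit.policy.kind' = 'success-file'\n" +
        ")";
tEnv.executeSql(fsTableDdl);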
*From the volunteer-curated Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370320?spm=a2c6h.12873639.article-detail.35.29d04378ApxdqJ
Question 5: Flink 1.11 SQL
Flink supports the Hive dialect; can Flink then directly use the UDFs and UDTFs defined in Hive?
*From the volunteer-curated Flink mailing list archive
Reference answer:
While using PyFlink 1.11, I declared a primary key on the table used as a dimension table in Flink SQL, but I still get an error saying that no primary key was declared. Also, when I use an inner join instead of a left join the problem goes away. What is happening here?
The error message and code are attached below. Thanks!
Error appendix:
Traceback (most recent call last):
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 147, in deco
return f(*a, **kw)
File "/usr/local/lib/python3.7/site-packages/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o5.execute.
: org.apache.flink.table.api.TableException: UpsertStreamTableSink requires that Table has a full primary keys if it is updated.
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:93)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlanInternal(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.plan.nodes.exec.ExecNode$class.translateToPlan(ExecNode.scala:58)
at org.apache.flink.table.planner.plan.nodes.physical.stream.StreamExecLegacySink.translateToPlan(StreamExecLegacySink.scala:48)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:67)
at org.apache.flink.table.planner.delegation.StreamPlanner$$anonfun$translateToPlan$1.apply(StreamPlanner.scala:66)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
at scala.collection.Iterator$class.foreach(Iterator.scala:891)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
at scala.collection.AbstractTraversable.map(Traversable.scala:104)
at org.apache.flink.table.planner.delegation.StreamPlanner.translateToPlan(StreamPlanner.scala:66)
at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:166)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1248)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.translateAndClearBuffer(TableEnvironmentImpl.java:1240)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1197)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "mysql_join.py", line 90, in
from_kafka_to_kafka_demo()
File "mysql_join.py", line 22, in from_kafka_to_kafka_demo
st_env.execute("2-from_kafka_to_kafka")
File "/usr/local/lib/python3.7/site-packages/pyflink/table/table_environment.py", line 1057, in execute
return JobExecutionResult(self._j_tenv.execute(job_name))
File "/usr/local/lib/python3.7/site-packages/py4j/java_gateway.py", line 1286, in call
answer, self.gateway_client, self.target_id, self.name)
File "/usr/local/lib/python3.7/site-packages/pyflink/util/exceptions.py", line 154, in deco
raise exception_mapping[exception](s.split(': ', 1)[1], stack_trace)
pyflink.util.exceptions.TableException: 'UpsertStreamTableSink requires that Table has a full primary keys if it is updated.'
Code appendix:
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings, CsvTableSource, CsvTableSink
from pyflink.table.descriptors import Schema, Kafka, Json, Rowtime
from pyflink.table.window import Tumble
def from_kafka_to_kafka_demo():
    # use blink table planner
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
    env_settings = EnvironmentSettings.Builder().use_blink_planner().build()
    st_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=env_settings)
    # register source and sink
    register_rides_source(st_env)
    register_rides_sink(st_env)
    register_mysql_source(st_env)
    st_env.sql_update("insert into flink_result select cast(t1.id as int) as id, cast(t2.type as varchar), cast(t1.time1 as bigint) as rowtime from source1 t1 left join dim_mysql t2 on t1.type = cast(t2.id as varchar)")
    st_env.execute("2-from_kafka_to_kafka")
def register_rides_source(st_env):
    source_ddl = \
        """
        create table source1(
            id int,
            time1 varchar,
            type string
        ) with (
            'connector.type' = 'kafka',
            'connector.topic' = 'tp1',
            'connector.startup-mode' = 'latest-offset',
            'connector.properties.bootstrap.servers' = 'localhost:9092',
            'connector.properties.zookeeper.connect' = 'localhost:2181',
            'format.type' = 'json',
            'connector.version' = 'universal',
            'update-mode' = 'append'
        )
        """
    st_env.sql_update(source_ddl)
def register_mysql_source(st_env):
    source_ddl = \
        """
        CREATE TABLE dim_mysql (
            id int,
            type varchar
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_test',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.lookup.cache.max-rows' = '5000',
            'connector.lookup.cache.ttl' = '1min',
            'connector.lookup.max-retries' = '3'
        )
        """
    st_env.sql_update(source_ddl)
def register_rides_sink(st_env):
    sink_ddl = \
        """
        CREATE TABLE flink_result (
            id int,
            type varchar,
            rtime bigint,
            primary key(id) NOT ENFORCED
        ) WITH (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://localhost:3390/test',
            'connector.table' = 'flink_result',
            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
            'connector.username' = '***',
            'connector.password' = '***',
            'connector.write.flush.max-rows' = '5000',
            'connector.write.flush.interval' = '2s',
            'connector.write.max-retries' = '3'
        )
        """
    st_env.sql_update(sink_ddl)


if __name__ == '__main__':
    from_kafka_to_kafka_demo()
*From the volunteer-curated Flink mailing list archive
For more answers to this question, see:
https://developer.aliyun.com/ask/370318?spm=a2c6h.12873639.article-detail.36.29d04378ApxdqJ