问题一:不能实时读取实时写入到 Hive 的数据
试验了一下 Flink-1.11 hive streaming 的功能
https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/hive/hive_streaming.html
创建 kafka 表,通过 SQL 实时写入 Hive.
但我再通过 flink sql-client 客户端 select * from hive_table 客户端没有任何返回,通过 flink webUI 页面观察 这个 select * from hive_table 的 job 已经结束了。
*来自志愿者整理的flink邮件归档
参考答案:
你开启了 streaming-source.enable 吗?这个参数用于指定如何读取是batch读,还是stream读,如果你要实时读的话应该把这个值设定为true, 可以使用tablehints 方便地指定参数。
SELECT * FROM hive_table /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;
就在你看得这个页面应该有对应的文档说明如何读取hive数据。*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370205?spm=a2c6h.12873639.article-detail.83.6f9243783Lv0fl
问题二:flink cep 如何处理超时事件?
我有个用户开户超时断点的场景。调研了一下,想通过flink cep 来实现。
但是我定义pattern 后发现,我的这个没办法在一条事件数据上完成判定。必须借助和上一事件数据比较之后判断是不是超时。
想知道该如何定义pattern 能够,取到排序之后前后两个两个事件。
*来自志愿者整理的flink邮件归档
参考答案:
flink使用event time,然后类似下面这样可以吗?
Pattern.begin("a").next("b").within(Time.minutes(1));
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370204?spm=a2c6h.12873639.article-detail.84.6f9243783Lv0fl
问题三:flink 1.11运算结果存mysql出错
各位大佬好,请教一个问题flink从Kafka读数,写入mysql,程序没有报错,但是没有写入mysql任何数据。代码如下,是在linux下,直接python *.py执行的。完整代码如下
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic, CheckpointingMode
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
source="""
CREATE TABLE kafka_source_tab (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'kafka',
'topic' = 'alarm_test_g',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = '10.2.2.73:2181',
'properties.bootstrap.servers' = '10.2.2.73:9092',
'format' = 'json'
)
"""
sink="""
CREATE TABLE g_source_tab (
id VARCHAR,
alarm_id VARCHAR,
trck_id VARCHAR
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://10.2.2.77:3306/bdt?useSSL=false',
'table-name' = 'g',
'username' = 'root',
'password' = '123456t',
'sink.buffer-flush.interval' = '1s'
)
"""
env = StreamExecutionEnvironment.get_execution_environment()
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
env.set_parallelism(1)
env_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
t_env = StreamTableEnvironment.create(env, environment_settings=env_settings)
t_env.execute_sql(source)
t_env.execute_sql(sink)
source = t_env.from_path("kafka_source_tab")\
.select("id,alarm_id,trck_id")
source.execute_insert("g_source_tab")
*来自志愿者整理的flink邮件归档
参考答案:
简单看了下代码应该没啥问题,alarm_test_g 这个kafka topic里有数据吗?可以检查下是否有脏数据,直接用./bin/kafka-console-consumer.sh 检查下?我有点怀疑这点
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370203?spm=a2c6h.12873639.article-detail.85.6f9243783Lv0fl
问题四:flink on yarn日志问题
请问一下两个问题 1 flink on yarn的时候 taskmanager 挂掉的时候 上面的日志会被删除掉 无法查看 ,除了使用es收集日志的这种方案, 还有没有可以使taskmanager 挂掉,相关日志仍然可以保留。 2 flink on yarn模式 当由于错误导致taskmanager 挂掉,但是jobmanager 却一直存在, 有没有好的方式或者策略 , 可以是当task失败 达到重试次数之后 taskmanager挂掉,jobmanager也挂掉
*来自志愿者整理的flink邮件归档
参考答案:
第一个问题,您可以尝试开启Yarn的日志收集功能[1]
第二个问题,您可以尝试一下per-job mode [2][3]
[1] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/yarn_setup.html#log-files
[2] https://ci.apache.org/projects/flink/flink-docs-master/zh/ops/deployment/#per-job-mode
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370202?spm=a2c6h.12873639.article-detail.86.6f9243783Lv0fl
问题五:[flink-sql] 如何在sql运行时动态修改kafka的scan.startup.mode
现在有个需求,就是一段用sql-client提交的sql任务需要动态修改kafka的scan.startup.mode,以支持不同的消费需求。请问有什么好的办法吗?
谢谢
*来自志愿者整理的flink邮件归档
参考答案:
可以尝试下1.11中引入的LIKE[1]或者是Table Hint[2]
[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/create.html#create-table
[2]
https://ci.apache.org/projects/flink/flink-docs-master/dev/table/sql/hints.html
*来自志愿者整理的flink邮件归档
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/370201?spm=a2c6h.12873639.article-detail.87.6f9243783Lv0fl