问题一:flink cdc 在从Oracle拉取数据的时候造成Oracle归档日志暴涨,是什么原因?
flink cdc 在从Oracle拉取数据的时候造成Oracle归档日志暴涨,是什么原因?
参考答案:
因为flink cdc会捕获所有数据的操作,比如insert、update、delete等等,这些操作会被记录在Oracle的归档日志中。特别是对于频繁更新的操作,每次更新会被表示为一条update_before和update_after记录,这会增加归档日志的数量。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/617643
问题二:Flink为什么两套api的算子不能同时渲染?
Flink为什么两套api的算子不能同时渲染?
参考答案:
StreamExecutionEnvironment.exec调用了吗
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/617153
问题三:Flink有个需求是我们写了一个Function,在哪看指标信息呢控制台里面没有看见?
Flink有个需求是我们写了一个Function,暂且叫RuleFuntion,这个是过规则的,然后一条数据循环过几千个规则,现在我想记录每个规则的耗时,然后存入到MySQL中,现在的问题是,如果我直接在Function中使用单例连接MySQL,定期写入MySQL的话,我觉得不够优雅我想试试使用指标上报metrics.reporter,看着有如下的指标上报想调研一下我们没有普罗米修斯,所以想试试jmx或者 日志slf4j或者http调用我们的服务先试试jmx,然后代码如下这个运行起来,在哪看指标信息呢控制台里面没有看见?
参考答案:
对于流计算来说,你这应该以流的思想来处理这个问题,把你想记录的东西直接推消息队列,下游消费入库持久化就可以了,做存算分离,首先你想用暴露指标的方式来做,消耗的资源一点不少,而且不好协调,会消耗整个集群的性能和内存,如果把这个指标数据扔进kafka那么压力由kafka来承受,也就是所谓的,普罗的指标暴露是flink源码实现的,定期推送给pushgateway,再由pushgateway推送到普罗的时序数据库
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/617148
问题四:Flink中mongo 的connection 必须指定 有什么方法像 jdbc哪种的吗 ?
Flink中mongo 的connection 必须指定 conn.agg() conn.find() 才能执行并返回对应结果 有什么方法像 jdbc哪种的吗 conn.excute( sql) 的方式 达成这种通用查询的方式吗?
类似这种 可以随意写然后就统一执行 目前java api 只能先指定查询方式
参考答案:
在Flink中,连接MongoDB时需要指定MongoDB的连接信息,包括主机名、端口号、数据库名称等。与JDBC类似,Flink提供了MongoDB Connector来连接和操作MongoDB数据库。
以下是使用Flink MongoDB Connector连接MongoDB的示例代码:
java
复制代码运行
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.mongodb.MongoSink;
import org.apache.flink.streaming.connectors.mongodb.MongoSource;
import org.apache.flink.streaming.connectors.mongodb.config.MongoConfigUtil;
import org.apache.flink.streaming.connectors.mongodb.config.MongoConnectionOptions;
import org.apache.flink.streaming.connectors.mongodb.config.WriteConcern;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
// 创建MongoDB连接配置
MongoConnectionOptions connectionOptions = MongoConfigUtil.createConnectionOptions(
"mongodb://localhost:27017", // MongoDB连接字符串
"myDatabase", // 数据库名称
null, // 用户名(可选)
null // 密码(可选)
);
// 创建MongoDB源
MongoSource mongoSource = MongoSource.builder()
.setCollection("myCollection") // 集合名称
.setConnectionOptions(connectionOptions)
.setDeserializer(new SimpleStringSchema()) // 反序列化器
.build();
// 创建MongoDB目标
MongoSink mongoSink = MongoSink.builder()
.setConnectionOptions(connectionOptions)
.setCollectionName("myCollection") // 集合名称
.setWriteConcern(WriteConcern.UNACKNOWLEDGED) // 写入策略
.setSerializer(new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema())) // 序列化器
.build();
上述代码中,我们首先创建了一个MongoConnectionOptions对象,用于指定MongoDB的连接信息。然后,通过MongoSource和MongoSink类分别创建了MongoDB的源和目标。在源中,我们使用了SimpleStringSchema作为反序列化器,将读取到的数据转换为Java字符串类型。在目标中,我们使用了KeyedSerializationSchemaWrapper包装了SimpleStringSchema作为序列化器,将数据转换为适合写入MongoDB的格式。
请注意,上述代码仅为示例,您需要根据实际情况进行适当的修改和调整。另外,确保您的项目中包含了Flink MongoDB Connector的相关依赖项。
关于本问题的更多回答可点击进行查看:
https://developer.aliyun.com/ask/616496
问题五:Flink如果改成一个job里面只有一个insert into的语句执行?
"Flink如果改成一个job里面只有一个insert into的语句执行,是可以顺利恢复的,
有多个语句就不能恢复了,但是恢复时只执行第一个语句是没有问题的。
现在我是一个job里面多个insert 任务嘛,请问我要插入多个宽表的话,如何在一个job里面的一个任务里面完成?
参考答案:
修改了代码的话就要加上--allowNonRestoredState这个参数,否则无法从ck或者sp重启的
关于本问题的更多回答可点击进行查看: