版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
在使用 DataStream 方式而非 SQL 方式进行开发时,您需要确保以下关键步骤和配置正确无误。以下是针对 Flink 1.13.6 和 Scala 2.11 版本的详细操作指南:
在开始开发之前,请确保满足以下条件: - 已安装并配置好本地开发环境(如 IntelliJ IDEA)。 - 已准备好 Flink 集群,并确认其版本为 1.13.6。 - 已下载并引入所需的依赖包,包括 Flink DataStream Connector 和其他相关依赖。
在 Maven 项目中,您需要在 pom.xml
文件中添加以下依赖项,以支持 DataStream API 的开发:
<dependencies>
<!-- Flink Core Dependency -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>1.13.6</version>
</dependency>
<!-- Flink DataStream Connector for Hologres -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>ververica-connector-hologres</artifactId>
<version>1.13-vvr-6.0.7</version>
</dependency>
<!-- MySQL CDC Connector (如果需要读取 MySQL 数据) -->
<dependency>
<groupId>com.ververica</groupId>
<artifactId>ververica-connector-mysql-cdc</artifactId>
<version>1.13-vvr-6.0.7</version>
</dependency>
<!-- JDBC Connector (如果需要写入 ClickHouse 或其他数据库) -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc_2.11</artifactId>
<version>1.13.6</version>
</dependency>
</dependencies>
注意:请根据实际使用的连接器类型调整依赖版本号,并确保与 Flink 引擎版本兼容。
使用 DataStream API 读取数据时,可以通过以下方式实现:
Hologres 源表: 使用 HologresBulkreadInputFormat
实现类读取 Hologres 表数据。示例代码如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HologresBulkreadInputFormat inputFormat = new HologresBulkreadInputFormat(
"jdbc:hologres://<endpoint>:<port>/<dbname>",
"<username>",
"<password>",
"<tablename>"
);
DataStream<Row> sourceStream = env.createInput(inputFormat);
MySQL CDC 数据源: 如果需要从 MySQL 数据库读取 Binlog 数据,可以使用 MySqlSource
,并显式配置唯一的 server-id
:
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
.hostname("<hostname>")
.port(3306)
.databaseList("<database>")
.tableList("<table>")
.username("<username>")
.password("<password>")
.deserializer(new JsonDebeziumDeserializationSchema())
.serverId("56000") // 必须为每个作业配置唯一值
.build();
DataStream<String> mysqlStream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
在 DataStream 中,您可以使用 KeyedProcessFunction
或 ProcessFunction
对数据进行自定义处理。例如:
sourceStream
.keyBy(row -> row.getField(0)) // 按照指定字段分组
.process(new KeyedProcessFunction<Object, Row, String>() {
@Override
public void processElement(Row value, Context ctx, Collector<String> out) {
// 自定义处理逻辑
out.collect(value.toString());
}
});
重要提示:避免在 processElement
和 onTimer
方法中重复注册 Timer,否则可能导致 Timer 数量急剧膨胀。
通过 DataStream API 写入 Hologres 时,可以使用 HologresSink
:
HologresSink<Row> hologresSink = HologresSink.builder()
.setEndpoint("<endpoint>")
.setDbName("<dbname>")
.setTableName("<tablename>")
.setUsername("<username>")
.setPassword("<password>")
.build();
sourceStream.addSink(hologresSink);
如果需要将数据写入 ClickHouse,可以使用 JdbcSink
,并通过批量插入提高性能:
class CkSinkBuilder implements JdbcStatementBuilder<Tuple3<String, Long, Float>> {
@Override
public void accept(PreparedStatement ps, Tuple3<String, Long, Float> value) throws SQLException {
ps.setString(1, value.f0);
ps.setLong(2, value.f1);
ps.setFloat(3, value.f2);
}
}
JdbcSink.sink(
"INSERT INTO clickhouse_table (col1, col2, col3) VALUES (?, ?, ?)",
new CkSinkBuilder(),
JdbcExecutionOptions.builder().withBatchSize(1000).build(),
new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
.withUrl("jdbc:clickhouse://<host>:8123/<database>")
.withDriverName("ru.yandex.clickhouse.ClickHouseDriver")
.withUsername("<username>")
.withPassword("<password>")
.build()
);
在本地调试时,请确保使用正确的 Uber JAR 文件(如 ververica-connector-hologres-1.13-vvr-6.0.7-uber.jar
),并将其添加到项目的 classpath 中。
mvn clean package -DskipTests
问题1:运行时找不到连接器相关类。
问题2:缺少常见的 Flink 类。
pom.xml
中添加相关依赖,或在 IntelliJ IDEA 中勾选“Add dependencies with provided scope to classpath”。问题3:运行中报错“Incompatible magic value”。
通过以上步骤,您可以成功使用 DataStream 方式开发并运行 Flink 作业。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。