用flink doris connector 1.1.1的版本 ,在doris 2.0版本上,可以对doris维表进行 lookup join么?
根据Flink官方文档和Flink Doris Connector的说明,Flink Doris Connector 1.1.1版本可以与Doris 2.0版本兼容,并支持在Flink中对Doris维表进行lookup join。
通过使用Flink Doris Connector,您可以将Doris作为Flink的外部系统进行连接,并利用Doris表的数据进行lookup join操作。这样可以在Flink作业中使用Doris表的数据来丰富和补充流数据。例如,将实时事件流与Doris维度表进行关联查询,以获取更多有关事件的信息。
注意,为了进行lookup join操作,您需要确保正确配置Flink Doris Connector,并正确设置lookup表和主表之间的关联条件。此外,还应注意Flink任务的并行性和资源管理,以优化join操作的性能和稳定性。
楼主你好,看了你的问题,根据阿里云 Flink Doris Connector 1.1.1 的文档,支持对 Doris 维表进行 Lookup Join。但是Doris 2.0 版本中的一些新特性可能会影响到该功能的使用,所以需要在使用时需要进行测试和验证。
在使用 Flink Doris Connector 进行 Lookup Join 时,需要创建一个维表 TableSource
,并在 StreamTableEnvironment
中注册。示例代码如下:
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.types.Row;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.streaming.api.functions.source.SourceFunction.TimestampedSourceContext;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.BatchTableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.api.java.TableEnvironment;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.Rowtime;
import org.apache.flink.table.descriptors.StreamTableDescriptor;
import org.apache.flink.table.descriptors.StreamTableEnvironmentDescriptor;
import org.apache.flink.table.descriptors.StreamTableSourceDescriptor;
import org.apache.flink.table.descriptors.TimedRowtime;
import org.apache.flink.table.descriptors.TimedRowtime;
import org.apache.flink.table.sources.LookupableTableSource;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.TableSource;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.CloseableIterator;
import com.alibaba.fastjson.JSONObject;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
public class DorisLookupJoinExample {
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
// 定义流表的 schema
TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new DataType[]{
DataTypes.INT(),
DataTypes.STRING(),
DataTypes.INT()
});
// 定义一个维表的 schema
TableSchema lookupSchema = new TableSchema(new String[]{"id", "address"}, new DataType[]{
DataTypes.INT(),
DataTypes.STRING()
});
// 创建一个数据流作为流表
DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
private boolean isRunning = true;
private int count = 0;
@Override
public void run(SourceContext<Row> ctx) throws Exception {
while (isRunning) {
count++;
Row row = Row.of(count, "abc", count % 30);
ctx.collect(row);
Thread.sleep(1000);
}
}
@Override
public void cancel() {
isRunning = false;
}
}).returns(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
// 创建一个维表 TableSource
DorisTableSource lookupTable = DorisTableSource.builder()
.withSchema(lookupSchema)
.fromProperties(props)
.build();
// 在 TableEnvironment 中注册流表和维表
tableEnv.registerDataStream("stream_table", stream, "id, name, age");
tableEnv.registerTableSource("lookup_table", lookupTable);
// 执行 Lookup Join 操作
Table result = tableEnv.sqlQuery(
"SELECT id, name, age, address FROM stream_table " +
"LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF PROCTIME() " +
"ON stream_table.id = lookup_table.id");
// 输出结果
result.printSchema();
result.toAppendStream(Row.class).print();
// 执行 Flink 任务
env.execute("Doris Lookup Join Example");
}
public static class DorisTableSource implements LookupableTableSource<Row> {
private final TableSchema schema;
private final Map<String, String> properties;
public DorisTableSource(TableSchema schema, Map<String, String> properties) {
this.schema = schema;
this.properties = properties;
}
@Override
public TableSchema getTableSchema() {
return schema;
}
@Override
public boolean isAsyncEnabled() {
return false;
}
@Override
public DataType getProducedDataType() {
return DataTypes.createRowType(schema.getFieldDataTypes(), schema.getFieldNames());
}
@Override
public void applyLookup(List<Row> rows, LookupContext context, ResultFuture<Row> resultFuture) throws Exception {
for (Row row : rows) {
// 从 Doris 维表中查询数据,并将查询结果添加到输出结果中
// TODO: 实现从 Doris 维表中查询数据的逻辑
resultFuture.complete(Lists.newArrayList(row));
}
}
@Override
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
return LookupRuntimeProvider.of(new DorisLookupFunction(this));
}
public static DorisTableSourceBuilder builder() {
return new DorisTableSourceBuilder();
}
private static class DorisTableSourceBuilder {
private TableSchema schema;
private Map<String, String> properties;
public DorisTableSourceBuilder withSchema(TableSchema schema) {
this.schema = schema;
return this;
}
public DorisTableSourceBuilder fromProperties(Map<String, String> properties) {
this.properties = properties;
return this;
}
public DorisTableSource build() {
return new DorisTableSource(schema, properties);
}
}
}
public static class DorisLookupFunction extends AsyncTableFunction<Row> {
private final DorisTableSource tableSource;
public DorisLookupFunction(DorisTableSource tableSource) {
this.tableSource = tableSource;
}
public void eval(Integer id, CloseableIterator<Row> resultFuture) {
// 从 Doris 维表中查询数据,并将查询结果添加到输出结果中
// TODO: 实现从 Doris 维表中查询数据的逻辑
resultFuture.close();
}
}
}
所以还是需要你根据实际情况实现 DorisLookupFunction
中的逻辑,从 Doris 维表中查询数据。
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。
实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。