开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

用flink doris1.1.1的版本 ,可以对doris维表进行 lookup join么?

用flink doris connector 1.1.1的版本 ,在doris 2.0版本上,可以对doris维表进行 lookup join么?

展开
收起
三分钟热度的鱼 2023-11-30 17:22:36 75 0
2 条回答
写回答
取消 提交回答
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看

    根据Flink官方文档和Flink Doris Connector的说明,Flink Doris Connector 1.1.1版本可以与Doris 2.0版本兼容,并支持在Flink中对Doris维表进行lookup join。

    通过使用Flink Doris Connector,您可以将Doris作为Flink的外部系统进行连接,并利用Doris表的数据进行lookup join操作。这样可以在Flink作业中使用Doris表的数据来丰富和补充流数据。例如,将实时事件流与Doris维度表进行关联查询,以获取更多有关事件的信息。

    注意,为了进行lookup join操作,您需要确保正确配置Flink Doris Connector,并正确设置lookup表和主表之间的关联条件。此外,还应注意Flink任务的并行性和资源管理,以优化join操作的性能和稳定性。

    2023-11-30 21:17:23
    赞同 1 展开评论 打赏
  • 十分耕耘,一定会有一分收获!

    楼主你好,看了你的问题,根据阿里云 Flink Doris Connector 1.1.1 的文档,支持对 Doris 维表进行 Lookup Join。但是Doris 2.0 版本中的一些新特性可能会影响到该功能的使用,所以需要在使用时需要进行测试和验证。

    在使用 Flink Doris Connector 进行 Lookup Join 时,需要创建一个维表 TableSource,并在 StreamTableEnvironment 中注册。示例代码如下:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableSchema;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.types.logical.VarCharType;
    import org.apache.flink.types.Row;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.TimestampedSourceContext;
    import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
    import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext;
    import org.apache.flink.table.api.TableEnvironment;
    import org.apache.flink.table.api.java.BatchTableEnvironment;
    import org.apache.flink.table.api.java.StreamTableEnvironment;
    import org.apache.flink.table.api.java.TableEnvironment;
    import org.apache.flink.table.descriptors.Kafka;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.Rowtime;
    import org.apache.flink.table.descriptors.StreamTableDescriptor;
    import org.apache.flink.table.descriptors.StreamTableEnvironmentDescriptor;
    import org.apache.flink.table.descriptors.StreamTableSourceDescriptor;
    import org.apache.flink.table.descriptors.TimedRowtime;
    import org.apache.flink.table.descriptors.TimedRowtime;
    import org.apache.flink.table.sources.LookupableTableSource;
    import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
    import org.apache.flink.table.sources.TableSource;
    import org.apache.flink.table.types.DataType;
    import org.apache.flink.util.CloseableIterator;
    
    import com.alibaba.fastjson.JSONObject;
    import com.google.common.collect.ImmutableList;
    import com.google.common.collect.Lists;
    
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    
    public class DorisLookupJoinExample {
      public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
    
        // 定义流表的 schema
        TableSchema schema = new TableSchema(new String[]{"id", "name", "age"}, new DataType[]{
            DataTypes.INT(),
            DataTypes.STRING(),
            DataTypes.INT()
        });
    
        // 定义一个维表的 schema
        TableSchema lookupSchema = new TableSchema(new String[]{"id", "address"}, new DataType[]{
            DataTypes.INT(),
            DataTypes.STRING()
        });
    
        // 创建一个数据流作为流表
        DataStream<Row> stream = env.addSource(new SourceFunction<Row>() {
          private boolean isRunning = true;
          private int count = 0;
    
          @Override
          public void run(SourceContext<Row> ctx) throws Exception {
            while (isRunning) {
              count++;
              Row row = Row.of(count, "abc", count % 30);
              ctx.collect(row);
              Thread.sleep(1000);
            }
          }
    
          @Override
          public void cancel() {
            isRunning = false;
          }
        }).returns(new RowTypeInfo(schema.getFieldTypes(), schema.getFieldNames()));
    
        // 创建一个维表 TableSource
        DorisTableSource lookupTable = DorisTableSource.builder()
            .withSchema(lookupSchema)
            .fromProperties(props)
            .build();
    
        // 在 TableEnvironment 中注册流表和维表
        tableEnv.registerDataStream("stream_table", stream, "id, name, age");
        tableEnv.registerTableSource("lookup_table", lookupTable);
    
        // 执行 Lookup Join 操作
        Table result = tableEnv.sqlQuery(
            "SELECT id, name, age, address FROM stream_table " +
            "LEFT JOIN lookup_table FOR SYSTEM_TIME AS OF PROCTIME() " +
            "ON stream_table.id = lookup_table.id");
    
        // 输出结果
        result.printSchema();
        result.toAppendStream(Row.class).print();
    
        // 执行 Flink 任务
        env.execute("Doris Lookup Join Example");
      }
    
      public static class DorisTableSource implements LookupableTableSource<Row> {
        private final TableSchema schema;
        private final Map<String, String> properties;
    
        public DorisTableSource(TableSchema schema, Map<String, String> properties) {
          this.schema = schema;
          this.properties = properties;
        }
    
        @Override
        public TableSchema getTableSchema() {
          return schema;
        }
    
        @Override
        public boolean isAsyncEnabled() {
          return false;
        }
    
        @Override
        public DataType getProducedDataType() {
          return DataTypes.createRowType(schema.getFieldDataTypes(), schema.getFieldNames());
        }
    
        @Override
        public void applyLookup(List<Row> rows, LookupContext context, ResultFuture<Row> resultFuture) throws Exception {
          for (Row row : rows) {
            // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中
            // TODO: 实现从 Doris 维表中查询数据的逻辑
            resultFuture.complete(Lists.newArrayList(row));
          }
        }
    
        @Override
        public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
          return LookupRuntimeProvider.of(new DorisLookupFunction(this));
        }
    
        public static DorisTableSourceBuilder builder() {
          return new DorisTableSourceBuilder();
        }
    
        private static class DorisTableSourceBuilder {
          private TableSchema schema;
          private Map<String, String> properties;
    
          public DorisTableSourceBuilder withSchema(TableSchema schema) {
            this.schema = schema;
            return this;
          }
    
          public DorisTableSourceBuilder fromProperties(Map<String, String> properties) {
            this.properties = properties;
            return this;
          }
    
          public DorisTableSource build() {
            return new DorisTableSource(schema, properties);
          }      
        }
      }
    
      public static class DorisLookupFunction extends AsyncTableFunction<Row> {
        private final DorisTableSource tableSource;
    
        public DorisLookupFunction(DorisTableSource tableSource) {
          this.tableSource = tableSource;
        }
    
        public void eval(Integer id, CloseableIterator<Row> resultFuture) {
          // 从 Doris 维表中查询数据,并将查询结果添加到输出结果中
          // TODO: 实现从 Doris 维表中查询数据的逻辑
          resultFuture.close();
        }
      }
    }
    

    所以还是需要你根据实际情况实现 DorisLookupFunction 中的逻辑,从 Doris 维表中查询数据。

    2023-11-30 21:05:28
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 热门讨论

    热门文章

    相关电子书

    更多
    Flink CDC Meetup PPT - 龚中强 立即下载
    Flink CDC Meetup PPT - 王赫 立即下载
    Flink CDC Meetup PPT - 覃立辉 立即下载