开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请教下目前 flink-sql,在写入sink的时候能够按主键去shuffle吗?需要配置相关东西吗

请教下目前 flink-sql,在写入sink的时候能够按主键去shuffle吗?需要配置相关东西吗

展开
收起
游客3oewgrzrf6o5c 2022-07-12 10:49:44 269 0
1 条回答
写回答
取消 提交回答
  • 十分耕耘,一定会有一分收获!

    楼主你好,在阿里云Flink SQL中,可以通过使用Flink Table API或者Flink SQL语句来实现按主键进行Shuffle写入Sink的操作。具体实现方法如下:

    1. 使用Flink Table API实现按主键进行Shuffle写入Sink:
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableConfig;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.descriptors.ConnectorDescriptor;
    import org.apache.flink.table.descriptors.FileSystem;
    import org.apache.flink.table.descriptors.Schema;
    import org.apache.flink.table.descriptors.StreamTableDescriptor;
    
    public class ShuffleSinkDemo {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
            TableConfig tableConfig = tableEnv.getConfig().getConfiguration();
            tableConfig.setBoolean("table.exec.emit.early-fire.enabled", true);
            tableConfig.setBoolean("table.exec.emit.early-fire.allow-late-fire", true);
    
            String ddl = "CREATE TABLE source_table (\n" +
                    "  id BIGINT,\n" +
                    "  name STRING,\n" +
                    "  PRIMARY KEY (id) NOT ENFORCED\n" +
                    ") WITH (\n" +
                    "  'connector' = 'filesystem',\n" +
                    "  'path' = '/path/to/source_table',\n" +
                    "  'format' = 'csv'\n" +
                    ")";
    
            StreamTableDescriptor tableDescriptor = tableEnv.sqlQuery(ddl).getTableDescriptor();
            ConnectorDescriptor connectorDescriptor = new FileSystem().path("/path/to/sink_table");
            tableDescriptor.withFormat(new Csv().fieldDelimiter(",")).withSchema(new Schema()
                    .field("id", DataTypes.BIGINT())
                    .field("name", DataTypes.STRING()))
                    .withConnector(connectorDescriptor)
                    .inAppendMode()
                    .createTemporaryTable("sink_table");
    
            Table table = tableEnv.sqlQuery("SELECT id, name FROM source_table");
            tableEnv.insertInto(table, "sink_table").execute().await();
        }
    }
    

    在以上示例中,创建了一个source_table和一个sink_table,并且指定了source_table的主键为id。在向sink_table写入数据时,Flink会自动按照id进行Shuffle操作,从而保证相同id的数据被写入到同一个节点上。

    1. 使用Flink SQL语句实现按主键进行Shuffle写入Sink:
    CREATE TABLE source_table (
      id BIGINT,
      name STRING,
      PRIMARY KEY (id) NOT ENFORCED
    ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/source_table',
      'format' = 'csv'
    );
    
    CREATE TABLE sink_table (
      id BIGINT,
      name STRING
    ) WITH (
      'connector' = 'filesystem',
      'path' = '/path/to/sink_table',
      'format' = 'csv'
    );
    
    INSERT INTO sink_table
    SELECT id, name FROM source_table;
    

    在以上示例中,创建了一个source_table和一个sink_table,并且指定了source_table的主键为id。在向sink_table写入数据时,Flink会自动按照id进行Shuffle操作,从而保证相同id的数据被写入到同一个节点上。

    需要注意的是,为了实现按主键进行Shuffle写入Sink的操作,需要确保source_table和sink_table都有主键,并且主键的值在相同节点上。如果主键分布不均匀,可能会导致数据倾斜和性能问题。同时,也需要根据具体的业务需求和数据特性,合理选择Shuffle的键和分区规则,以达到最优的性能和效率。

    2023-07-23 13:00:06
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关产品

  • 实时计算 Flink版
  • 相关电子书

    更多
    SQL Server在电子商务中的应用与实践 立即下载
    GeoMesa on Spark SQL 立即下载
    原生SQL on Hadoop引擎- Apache HAWQ 2.x最新技术解密malili 立即下载