开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

请问做过sqlserver的批量写入不?

请问做过sqlserver的批量写入不?

展开
收起
十一0204 2023-04-05 10:14:10 241 0
1 条回答
写回答
取消 提交回答
  • 可以参考 MySQLDialect 以及 flinkStreamSql 项目的实现来编写 SqlServerDialect。

    package org.apache.flink.connector.jdbc.dialect;
    
    /**
     * SqlServerDialect
     *
     * @author zhanjian@pcuuu.com
     * @date 2021/4/20 10:13
     */
    public class SqlServerDialect extends AbstractDialect {
    
        private static final long serialVersionUID = 1L;
    
        // Define MAX/MIN precision of TIMESTAMP type according to Mysql docs:
        // https://dev.mysql.com/doc/refman/8.0/en/fractional-seconds.html
        private static final int MAX_TIMESTAMP_PRECISION = 6;
        private static final int MIN_TIMESTAMP_PRECISION = 1;
    
        // Define MAX/MIN precision of DECIMAL type according to Mysql docs:
        // https://dev.mysql.com/doc/refman/8.0/en/fixed-point-types.html
        private static final int MAX_DECIMAL_PRECISION = 65;
        private static final int MIN_DECIMAL_PRECISION = 1;
    
        @Override
        public String dialectName() {
            return "SqlServer";
        }
    
        @Override
        public boolean canHandle(String url) {
            return url.startsWith("jdbc:jtds:");
        }
    
        @Override
        public JdbcRowConverter getRowConverter(RowType rowType) {
            return new SqlServerConverter(rowType);
        }
    
        @Override
        public Optional<String> defaultDriverName() {
            return Optional.of("net.sourceforge.jtds.jdbc.Driver");
        }
    
        @Override
        public String quoteIdentifier(String identifier) {
            return identifier;
        }
    
        @Override
        public Optional<String> getUpsertStatement(
                String tableName, String[] fieldNames, String[] uniqueKeyFields) {
    
            StringBuilder sb = new StringBuilder();
            sb.append(
                    "MERGE INTO "
                            + tableName
                            + " T1 USING "
                            + "("
                            + buildDualQueryStatement(fieldNames)
                            + ") T2 ON ("
                            + buildConnectionConditions(uniqueKeyFields)
                            + ") ");
    
            String updateSql = buildUpdateConnection(fieldNames, uniqueKeyFields, true);
    
            if (StringUtils.isNotEmpty(updateSql)) {
                sb.append(" WHEN MATCHED THEN UPDATE SET ");
                sb.append(updateSql);
            }
    
            sb.append(
                    " WHEN NOT MATCHED THEN "
                            + "INSERT ("
                            + Arrays.stream(fieldNames)
                                    .map(this::quoteIdentifier)
                                    .collect(Collectors.joining(","))
                            + ") VALUES ("
                            + Arrays.stream(fieldNames)
                                    .map(col -> "T2." + quoteIdentifier(col))
                                    .collect(Collectors.joining(","))
                            + ")");
            sb.append(";");
            return Optional.of(sb.toString());
        }
    
        /**
         * build T1."A"=T2."A" or T1."A"=nvl(T2."A",T1."A")
         *
         * @param fieldNames
         * @param uniqueKeyFields
         * @param allReplace
         * @return
         */
        private String buildUpdateConnection(
                String[] fieldNames, String[] uniqueKeyFields, boolean allReplace) {
            List<String> uniqueKeyList = Arrays.asList(uniqueKeyFields);
            return Arrays.stream(fieldNames)
                    .filter(col -> !uniqueKeyList.contains(col))
                    .map(
                            col -> {
                                return allReplace
                                        ? quoteIdentifier("T1")
                                                + "."
                                                + quoteIdentifier(col)
                                                + " ="
                                                + quoteIdentifier("T2")
                                                + "."
                                                + quoteIdentifier(col)
                                        : quoteIdentifier("T1")
                                                + "."
                                                + quoteIdentifier(col)
                                                + " =ISNULL("
                                                + quoteIdentifier("T2")
                                                + "."
                                                + quoteIdentifier(col)
                                                + ","
                                                + quoteIdentifier("T1")
                                                + "."
                                                + quoteIdentifier(col)
                                                + ")";
                            })
                    .collect(Collectors.joining(","));
        }
    
        private String buildConnectionConditions(String[] uniqueKeyFields) {
            return Arrays.stream(uniqueKeyFields)
                    .map(col -> "T1." + quoteIdentifier(col) + "=T2." + quoteIdentifier(col))
                    .collect(Collectors.joining(","));
        }
    
        /**
         * build select sql , such as (SELECT ? "A",? "B" FROM DUAL)
         *
         * @param column destination column
         * @return
         */
        public String buildDualQueryStatement(String[] column) {
            StringBuilder sb = new StringBuilder("SELECT ");
            String collect =
                    Arrays.stream(column)
                            .map(col -> ":" + quoteIdentifier(col) + " " + quoteIdentifier(col))
                            .collect(Collectors.joining(", "));
            sb.append(collect);
            return sb.toString();
        }
    
        @Override
        public int maxDecimalPrecision() {
            return MAX_DECIMAL_PRECISION;
        }
    
        @Override
        public int minDecimalPrecision() {
            return MIN_DECIMAL_PRECISION;
        }
    
        @Override
        public int maxTimestampPrecision() {
            return MAX_TIMESTAMP_PRECISION;
        }
    
        @Override
        public int minTimestampPrecision() {
            return MIN_TIMESTAMP_PRECISION;
        }
    
        @Override
        public List<LogicalTypeRoot> unsupportedTypes() {
            // The data types used in Mysql are list at:
            // https://dev.mysql.com/doc/refman/8.0/en/data-types.html
    
            // TODO: We can't convert BINARY data type to
            //  PrimitiveArrayTypeInfo.BYTE_PRIMITIVE_ARRAY_TYPE_INFO in
            // LegacyTypeInfoDataTypeConverter.
            return Arrays.asList(
                    LogicalTypeRoot.BINARY,
                    LogicalTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE,
                    LogicalTypeRoot.TIMESTAMP_WITH_TIME_ZONE,
                    LogicalTypeRoot.INTERVAL_YEAR_MONTH,
                    LogicalTypeRoot.INTERVAL_DAY_TIME,
                    LogicalTypeRoot.ARRAY,
                    LogicalTypeRoot.MULTISET,
                    LogicalTypeRoot.MAP,
                    LogicalTypeRoot.ROW,
                    LogicalTypeRoot.DISTINCT_TYPE,
                    LogicalTypeRoot.STRUCTURED_TYPE,
                    LogicalTypeRoot.NULL,
                    LogicalTypeRoot.RAW,
                    LogicalTypeRoot.SYMBOL,
                    LogicalTypeRoot.UNRESOLVED);
        }
    
    
    2023-04-06 08:51:46
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载