Importing and Exporting StarRocks Data with Spark/Flink

Summary: importing data into and exporting data from StarRocks with Spark and Flink.


Loading data into StarRocks with Flink

Supported data formats

  • CSV
  • JSON

Procedure
Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14; keep only the version that matches your Scala version -->
    <version>x.x.x_flink-1.14_2.11</version>
    <!-- <version>x.x.x_flink-1.14_2.12</version> -->
</dependency>

Step 2: Use flink-connector-starrocks

bean2starrocks

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.row.StarRocksSinkRowBuilder;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import com.starrocks.funcs.BeanDataJava;
import com.starrocks.funcs.MySourceJava;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
/**
 *  Demo1
 *   - define Class BeanData,
 *   - sink to StarRocks via flink-connector-starrocks
 */
public class Bean2StarRocksJava {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        DataStream<BeanDataJava> sourceStream = env
                .addSource(new MySourceJava())
                    .uid("sourceStream-uid").name("sourceStream")
                    .setParallelism(1)
                .map(new MapFunction<Row, BeanDataJava>() {
                    @Override
                    public BeanDataJava map(Row value) throws Exception {
                        String name = value.getField(0).toString();
                        int score = Integer.parseInt(value.getField(1).toString());
                        return new BeanDataJava(name,score);
                    }
                })
                    .uid("sourceStreamMap-uid").name("sourceStreamMap")
                    .setParallelism(1);
        sourceStream
                .addSink(
                        StarRocksSink.sink(
                                // the table structure
                                TableSchema.builder()
                                        .field("name", DataTypes.VARCHAR(20))
                                        .field("score", DataTypes.INT())
                                        .build(),
                                /*
                                The sink options for this demo:
                                - hostname: master1
                                - fe http port: 8030
                                - database name: starrocks_demo
                                - table names: demo2_flink_tb1
                                - TODO: customize above args to fit your environment.
                                */
                                StarRocksSinkOptions.builder()
                                        .withProperty("jdbc-url", "jdbc:mysql://master1:9030/starrocks_demo")
                                        .withProperty("load-url", "master1:8030")
                                        .withProperty("username", "root")
                                        .withProperty("password", "")
                                        .withProperty("table-name", "demo2_flink_tb1")
                                        .withProperty("database-name", "starrocks_demo")
                                        .withProperty("sink.properties.row_delimiter","\\x02")      // in case the raw data contains a common row delimiter such as '\n'
                                        .withProperty("sink.properties.column_separator","\\x01")   // in case the raw data contains a common column separator such as '\t'
                                        .withProperty("sink.buffer-flush.interval-ms","5000")
                                        .build(),
                                // set the slots with streamRowData
                                new StarRocksSinkRowBuilder<BeanDataJava>() {
                                    @Override
                                    public void accept(Object[] objects, BeanDataJava beanDataJava) {
                                        objects[0] = beanDataJava.getName();
                                        objects[1] = Integer.valueOf(beanDataJava.getScore());
                                    }
                                }
                        )
                )
                .uid("sourceSink-uid").name("sourceSink")
                .setParallelism(1);
        try {
            env.execute("StarRocksSink_BeanDataJava");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static StreamExecutionEnvironment getExecutionEnvironment(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, //failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }
}
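
The demo above references two helper classes (BeanDataJava and MySourceJava in the com.starrocks.funcs package) that are not listed in the post. Below is a minimal sketch of what they might look like, assuming a simple random name/score generator as the source; the field names and sample values are illustrative:

// BeanDataJava.java
package com.starrocks.funcs;

public class BeanDataJava {
    private String name;
    private int score;

    public BeanDataJava() {}

    public BeanDataJava(String name, int score) {
        this.name = name;
        this.score = score;
    }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
    public int getScore() { return score; }
    public void setScore(int score) { this.score = score; }
}

// MySourceJava.java (same package, separate file)
package com.starrocks.funcs;

import java.util.Random;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.types.Row;

public class MySourceJava implements SourceFunction<Row> {
    private volatile boolean running = true;

    @Override
    public void run(SourceContext<Row> ctx) throws Exception {
        String[] names = {"lisa", "jack", "tom"};
        Random rnd = new Random();
        while (running) {
            // emit one (name, score) Row per second
            ctx.collect(Row.of(names[rnd.nextInt(names.length)], rnd.nextInt(100)));
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() { running = false; }
}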

json2Starrocks
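
The connector can also load records that are already JSON strings: pass them straight to the single-argument StarRocksSink.sink(StarRocksSinkOptions) variant together with sink.properties.format=json. A minimal sketch, assuming the same cluster and demo2_flink_tb1 table as above; the inline sample records are illustrative:

import com.starrocks.connector.flink.StarRocksSink;
import com.starrocks.connector.flink.table.StarRocksSinkOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Json2StarRocksJava {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(
                "{\"name\": \"lisa\", \"score\": 100}",
                "{\"name\": \"jack\", \"score\": 90}")
           .addSink(
                StarRocksSink.sink(
                        StarRocksSinkOptions.builder()
                                .withProperty("jdbc-url", "jdbc:mysql://master1:9030/starrocks_demo")
                                .withProperty("load-url", "master1:8030")
                                .withProperty("username", "root")
                                .withProperty("password", "")
                                .withProperty("database-name", "starrocks_demo")
                                .withProperty("table-name", "demo2_flink_tb1")
                                // tell Stream Load that the payload is JSON
                                .withProperty("sink.properties.format", "json")
                                .withProperty("sink.properties.strip_outer_array", "true")
                                .build()))
           .uid("jsonSink-uid").name("jsonSink");
        env.execute("StarRocksSink_JsonJava");
    }
}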


sql2Starrocks

import com.starrocks.funcs.MySourceJava;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import java.util.concurrent.TimeUnit;
import static org.apache.flink.table.api.Expressions.$;
/**
  * Demo3:
  *    - Construct TemporaryView via org.apache.flink.types.Row
  *    - FlinkSql -> flink-connector-starrocks -> StarRocks
  */
public class Sql2StarRocksJava {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = getExecutionEnvironment();
        EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
        StreamTableEnvironment streamTableEnv = StreamTableEnvironment.create(env,settings);
        DataStream<Row> source = env
                .addSource(new MySourceJava(),getRowTypeInfo())
                .uid("sourceStream-uid").name("sourceStream")
                .setParallelism(1);
        Table sourceTable = streamTableEnv.fromDataStream(source,$("name"),$("score"));
        streamTableEnv.createTemporaryView("sourceTable",sourceTable);
        /*
        The sink options for this demo:
        - hostname: master1
        - fe http port: 8030
        - database name: starrocks_demo
        - table names: demo2_flink_tb3
        - TODO: customize above args to fit your environment.
        */
        streamTableEnv.executeSql(
                "CREATE TABLE testTable( "+
                          "  `name` VARCHAR, "+
                          "  `score` INT "+
                          " ) WITH ( "+
                          "  'connector' = 'starrocks', "+
                          "  'jdbc-url'='jdbc:mysql://master1:9030/starrocks_demo', "+
                          "  'load-url'='master1:8030', "+
                          "  'database-name' = 'starrocks_demo', "+
                          "  'table-name' = 'demo2_flink_tb3', "+
                          "  'username' = 'root', "+
                          "  'password' = '', "+
                          "  'sink.buffer-flush.max-rows' = '1000000', "+
                          "  'sink.buffer-flush.max-bytes' = '300000000', "+
                          "  'sink.buffer-flush.interval-ms' = '15000', "+
                          "  'sink.max-retries' = '3', "+
                          "  'sink.properties.row_delimiter' = '\\x02', "+
                          "  'sink.properties.column_separator' = '\\x01', "+
                          "  'sink.properties.columns' = 'NAME, SCORE' "+
                          " )"
        );
        // Notes for the Scala version of this demo:
        // 1. Triple-quoted strings need no escaping, so '\x02' and '\x01' can be written directly.
        // 2. When concatenating multiple double-quoted strings (as above), write "\\x02" and "\\x01" instead, e.g.:
        //  ...
        //  + "'sink.properties.row_delimiter' = '\\x02',"
        //  + "'sink.properties.column_separator' = '\\x01' "
        //  + ...
        streamTableEnv.executeSql(
                "insert into testTable select `name`,`score` from sourceTable");
        try {
            env.execute("StarRocksSink_SQLJava");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    private static StreamExecutionEnvironment getExecutionEnvironment(){
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setMaxParallelism(3);
        env.setParallelism(3);
        env.setRestartStrategy(RestartStrategies.failureRateRestart(
                3, //failureRate
                org.apache.flink.api.common.time.Time.of(5, TimeUnit.MINUTES), // failureInterval
                org.apache.flink.api.common.time.Time.of(10, TimeUnit.SECONDS) // delayInterval
        ));
        // checkpoint options
        env.enableCheckpointing(1000 * 30);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.AT_LEAST_ONCE);
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
        env.getCheckpointConfig().setCheckpointTimeout(1000 * 60 * 10);
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION);
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(Integer.MAX_VALUE);
        return env;
    }
  public static RowTypeInfo getRowTypeInfo(){
    return new RowTypeInfo(
        TypeInformation.of(String.class),TypeInformation.of(int.class));
  }
}

Exporting data from StarRocks with Flink

The source function of flink-connector-starrocks cannot yet guarantee exactly-once semantics. If a read task fails, repeat this step and create the read task again.


Step 1: Add the pom dependency

<dependency>    
    <groupId>com.starrocks</groupId>
    <artifactId>flink-connector-starrocks</artifactId>
    <!-- for flink-1.14; keep only the version that matches your Scala version -->
    <version>x.x.x_flink-1.14_2.11</version>
    <!-- <version>x.x.x_flink-1.14_2.12</version> -->
</dependency>

Step 2: Use flink-connector-starrocks

Call flink-connector-starrocks to read data from StarRocks, as shown in the sample code below. The key options used here are scan-url (the FE host and HTTP port used for scanning; multiple addresses are separated by commas), jdbc-url (the FE MySQL-protocol address), username, password, database-name, and table-name.

import com.starrocks.connector.flink.StarRocksSource;
import com.starrocks.connector.flink.table.source.StarRocksSourceOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
public class StarRocksSourceApp {
    public static void main(String[] args) {
        StarRocksSourceOptions options = StarRocksSourceOptions.builder()
               .withProperty("scan-url", "192.168.xxx.xxx:8030,192.168.xxx.xxx:8030")
               .withProperty("jdbc-url", "jdbc:mysql://192.168.xxx.xxx:9030")
               .withProperty("username", "root")
               .withProperty("password", "xxxxxx")
               .withProperty("table-name", "flink_test")
               .withProperty("database-name", "test")
               .build();
        TableSchema tableSchema = TableSchema.builder()
               .field("date_1", DataTypes.DATE())
               .field("datetime_1", DataTypes.TIMESTAMP(6))
               .field("char_1", DataTypes.CHAR(20))
               .field("varchar_1", DataTypes.STRING())
               .field("boolean_1", DataTypes.BOOLEAN())
               .field("tinyint_1", DataTypes.TINYINT())
               .field("smallint_1", DataTypes.SMALLINT())
               .field("int_1", DataTypes.INT())
               .field("bigint_1", DataTypes.BIGINT())
               .field("largeint_1", DataTypes.STRING())
               .field("float_1", DataTypes.FLOAT())
               .field("double_1", DataTypes.DOUBLE())
               .field("decimal_1", DataTypes.DECIMAL(27, 9))
               .build();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(StarRocksSource.source(tableSchema, options)).setParallelism(5).print();
        env.execute("StarRocks flink source");
    }
}
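
The same connector can also be registered as a Flink SQL source table. A minimal sketch, assuming the same cluster and a subset of the flink_test columns from the DataStream example above; hosts, credentials, and the column list are placeholders to adjust:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StarRocksSqlSourceApp {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());
        // register flink_test as a StarRocks-backed source table
        tEnv.executeSql(
                "CREATE TABLE flink_test_src ( " +
                "  `int_1` INT, " +
                "  `varchar_1` STRING " +
                ") WITH ( " +
                "  'connector' = 'starrocks', " +
                "  'scan-url' = '192.168.xxx.xxx:8030', " +
                "  'jdbc-url' = 'jdbc:mysql://192.168.xxx.xxx:9030', " +
                "  'username' = 'root', " +
                "  'password' = 'xxxxxx', " +
                "  'database-name' = 'test', " +
                "  'table-name' = 'flink_test' " +
                ")");
        // print the scanned rows
        tEnv.executeSql("SELECT * FROM flink_test_src").print();
    }
}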

Loading data into StarRocks with Spark

Supported data formats

  • CSV
  • ORC (supported since version 2.0)
  • PARQUET (supported since version 2.0)

Procedure
Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks.connector</groupId>
    <artifactId>spark</artifactId>
    <version>1.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/src/main/resources/starrocks-spark2_2.11-1.0.0.jar</systemPath>
</dependency>

Step 2: Use spark-connector-starrocks
sparkLoad2StarRocks

    import com.starrocks.utils.{Consts, LoggerUtil, MySrSink}
    import org.apache.spark.SparkConf
    import org.apache.spark.sql.SparkSession
    import com.starrocks.connector.spark._
    object SparkConnector2StarRocks {
      // parameters
      val starRocksName =  "starrocks_demo"
      val tblNameSrc =  "demo1_spark_tb1"
      val tblNameDst =  "demo1_spark_tb2"
      val userName =  "root"
      val password =  ""
      val srFe = "master1"   // fe hostname
      val port =  8030          // fe http port
      val filterRatio =  0.2
      val columns = "uid,date,hour,minute,site"
      val master = "local"
      val appName = "app_spark_demo2"
      val partitions =   2   // computing parallelism
      val buckets =   1      // sink parallelism
      val debug = false
      LoggerUtil.setSparkLogLevels()
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName(appName)
          .setMaster(master)
          .set("spark.serializer","org.apache.spark.serializer.KryoSerializer")
        val spark = SparkSession.builder().config(conf).master(master).enableHiveSupport().getOrCreate()
        val sc = spark.sparkContext
        sc.setLogLevel("WARN")
        import spark.implicits._
        val starrocksSparkDF = spark.read.format("starrocks")
          .option("starrocks.table.identifier", s"${starRocksName}.${tblNameSrc}")
          .option("starrocks.fenodes", s"${srFe}:${port}")
          .option("user", s"${userName}")
          .option("password", s"${password}")
          .load().repartition(partitions)
        starrocksSparkDF.show(5, false)
        starrocksSparkDF.createOrReplaceTempView("view_tb1")
        val resDf = spark.sql(
          """
            |select uid, date, hour, minute, site
            |from view_tb1
            |lateral view explode(split(uid_list_str,',')) temp_tbl as uid
            |""".stripMargin)
        resDf.show(5, false)  // IDEA/REPL local outputs
        resDf.map( x => x.toString().replaceAll("\\[|\\]","").replace(",",Consts.starrocksSep))
          .repartition(buckets).foreachPartition(
          itr => {
            val sink = new MySrSink(Map(
              "max_filter_ratio" -> s"${filterRatio}",
              "columns" -> columns,
              "column_separator" -> Consts.starrocksSep),
              starRocksName,
              userName,
              password,
              tblNameDst,
              srFe,
              port,
              debug,
              debug)
            if (itr.hasNext) sink.invoke(itr.mkString("\n"))
          }
        )
        spark.close()
      }
    }

SparkStreaming2StarRocks
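
A minimal Structured Streaming sketch, assuming the same MySrSink helper and cluster parameters as the batch demo above; the socket source, the label format, and the input line layout ("uid,date,hour,minute,site" as CSV) are illustrative placeholders. Stream Load labels are unique per load, so deriving the label from the batch id and the partition id prevents the same micro-batch from being loaded twice on retry:

import com.starrocks.utils.{Consts, MySrSink}
import org.apache.spark.SparkConf
import org.apache.spark.TaskContext
import org.apache.spark.sql.{Dataset, SparkSession}

object SparkStreaming2StarRocks {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("app_spark_streaming_demo")
      .setMaster("local[2]")
    val spark = SparkSession.builder().config(conf).getOrCreate()
    import spark.implicits._
    // Illustrative source: one CSV-like line per record, e.g. "u001,2022-01-02,08,30,site01".
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", "9999")
      .load()
      .as[String]
    val query = lines.writeStream
      .foreachBatch { (batch: Dataset[String], batchId: Long) =>
        batch.repartition(1).foreachPartition { itr: Iterator[String] =>
          val sink = new MySrSink(Map(
            // unique Stream Load label per batch and partition
            "label" -> s"demo_${batchId}_${TaskContext.get.partitionId()}",
            "max_filter_ratio" -> "0.2",
            "columns" -> "uid,date,hour,minute,site",
            "column_separator" -> Consts.starrocksSep),
            "starrocks_demo", "root", "", "demo1_spark_tb2", "master1", 8030, false, false)
          if (itr.hasNext) {
            sink.invoke(itr.map(_.replace(",", Consts.starrocksSep)).mkString("\n"))
          }
        }
      }
      .start()
    query.awaitTermination()
  }
}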


Exporting StarRocks data to other systems with Spark

Usage notes

  • The current version only supports reading data from StarRocks; it does not support writing data to StarRocks through a sink.
  • Data filtering can be pushed down to StarRocks, reducing the amount of data transferred.
  • If reading the data is expensive, use sensible table design and filter conditions to keep Spark from reading too much data at once, so that the read load does not put excessive I/O pressure on disks and the network or affect regular query workloads.

Version requirements

[Table: version requirements]

Procedure

Step 1: Add the pom dependency

<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-stream-load-sdk</artifactId>
    <version>1.0-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>2.12.10</version>
</dependency>
<dependency>
    <groupId>com.starrocks</groupId>
    <artifactId>starrocks-thrift-sdk</artifactId>
    <version>1.0.1</version>
</dependency>

Step 2: Read StarRocks data with Spark
Read data with a Spark DataFrame

val starrocksSparkDF = spark.read.format("starrocks")
  .option("starrocks.table.identifier", s"test.score_board")
  .option("starrocks.fenodes", s"<fe_host>:<fe_http_port>")
  .option("user", s"root")
  .option("password", s"")
  // partition pruning
  .option("starrocks.filter.query", "dt='2022-01-02 08:00:00'")
  // bucket pruning (alternative)
  // .option("starrocks.filter.query", "k=1")
  // partition + bucket pruning (alternative)
  // .option("starrocks.filter.query", "k=7 and dt='2022-01-02 08:00:00'")
  .load()
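
A short usage sketch for the DataFrame read above (the view name is illustrative):

starrocksSparkDF.printSchema()
starrocksSparkDF.show(5, false)
starrocksSparkDF.createOrReplaceTempView("score_board_view")
spark.sql("select count(*) from score_board_view").show()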

Read data with a Spark RDD

    import com.starrocks.connector.spark._
    val starrocksSparkRDD = sc.starrocksRDD(
      tableIdentifier = Some("$YOUR_STARROCKS_DATABASE_NAME.$YOUR_STARROCKS_TABLE_NAME"),
      cfg = Some(Map(
        "starrocks.fenodes" -> "$YOUR_STARROCKS_FE_HOSTNAME:$YOUR_STARROCKS_FE_RESTFUL_PORT",
        "starrocks.request.auth.user" -> "$YOUR_STARROCKS_USERNAME",
        "starrocks.request.auth.password" -> "$YOUR_STARROCKS_PASSWORD"
      ))
    )

Read StarRocks data with Spark SQL

val resDf = spark.sql(
  """
    |CREATE TEMPORARY VIEW spark_starrocks
    |  USING starrocks
    |  OPTIONS
    |  (
    |      "starrocks.table.identifier" = "test.score_board",
    |      "starrocks.fenodes" = "<fe_host>:<fe_http_port>",
    |      "user" = "root",
    |      "password" = ""
    |  )
    |""".stripMargin)

Parameter settings

Common parameters

The following parameters apply to all three read methods: Spark SQL, Spark DataFrame, and Spark RDD.

[Table: common parameters]

Parameters specific to Spark SQL and Spark DataFrame

The following parameters apply only to the Spark SQL and Spark DataFrame read methods.

[Table: Spark SQL / DataFrame parameters]

Parameters specific to Spark RDD

The following parameters apply only to the Spark RDD read method.

[Table: Spark RDD parameters]






