flinksql读写redis

本文涉及的产品
云原生内存数据库 Tair,内存型 2GB
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: flinksql读写redis


0、前言

  最近有个需求,需要使用flinksql读写redis,由于官网上并没有redis的connector,在网上找了很久,开源的几个connector又没法满足要求,所有这里就自己动手实现了一个。已经适配了各个版本的flink,从flink1.12到flink1.15。

  简单介绍一下功能吧:

  • 将redis作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP等命令;使用lua脚本封装的批量弹出提高消费性能
  • 将redis作为维表时支持GET、HGET等命令;支持lookup缓存
  • 将redis作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET等命令;支持指定key的ttl时间
  • 支持flink常见的序列化反序列化方式,如json、csv等,具体参见flink官网:https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/

1、redis作为流表

1.1、数据准备


1

2

3

4

5

6

7

8

9

10

11

12

13

14

15

16

17

18

19

20

21

22

23

24

25

26

27

28

29

30

31

32

33

34

35

36

37

38

@Before

publicvoidinit() {

    /**

        设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false

    */

    RedisOptions.IS_TEST = true;

    RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);

    List<String> lists = newArrayList<>();

    for(inti = 0; i < 1000; i++) {

        lists.add("{\n"+

                "  \"number\": "+ i + ",\n"+

                "  \"name\": \"学生"+ i + "\",\n"+

                "  \"school\": \"学校"+ ((i % 3) + 1) +"\",\n"+

                "  \"class_id\": "+ ((i % 10) + 1) +"\n"+

                "}");

    }

    /**

     * 初始化学生数据

     */

    for(inti = 0; i < 1; i++) {

        redisOperator.rpush("students", lists.subList(1000* i, 1000* (i + 1)));

    }

    /**

     * 初始化班级数据

     */

    for(inti = 0;i < 10;i++) {

        redisOperator.set(String.valueOf(i + 1),"银河"+ (i + 1) + "班");

    }

 

    /**

     * 初始化学校班级数据

     */

    for(intj = 1;j < 4;j++) {

        for(inti = 1; i < 11; i++) {

            redisOperator.hset("学校"+ j, String.valueOf(i), "银河"+ i + "班");

        }

    }

}

1.2、使用BLPOP、BRPOP、LPOP、RPOP、SPOP消费指定的key的list或者set的数据

网络异常,图片无法展示
|

   @Test

   publicvoid testBlpopSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";




       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='print'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students select * from students";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

2、redis作为维表(不带format)

2.1、数据准备

网络异常,图片无法展示
|

   @Before

   publicvoid init() {

       /**

           设置当前属于测试模式,在这个测试模式下,当流表数据消费完成后程序会停止,方便测试,这个模式默认false

       */

       RedisOptions.IS_TEST = true;

       RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);

       List<String> lists = new ArrayList<>();

       for (int i = 0; i < 1000; i++) {

           lists.add("{\n" +

                   "  \"number\": " + i + ",\n" +

                   "  \"name\": \"学生" + i + "\",\n" +

                   "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +

                   "  \"class_id\": " + ((i % 10) + 1) +"\n" +

                   "}");

       }

       /**

        * 初始化学生数据

        */

       for (int i = 0; i < 1; i++) {

           redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));

       }

       /**

        * 初始化班级数据

        */

       for(int i = 0;i < 10;i++) {

           redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");

       }


       /**

        * 初始化学校班级数据

        */

       for(int j = 1;j < 4;j++) {

           for (int i = 1; i < 11; i++) {

               redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");

           }

       }

   }

网络异常,图片无法展示
|

2.2、使用GET作为维表查询命令

网络异常,图片无法展示
|

   @Test

   publicvoid testGetSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       /**

           这里需要注意的是,由于使用get命令,而且没有加format属性,所以维表只能有两个字段,多了也识别不到,

           详细可以看源码里的注释

       */

       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='GET'\n" +

                       " )";



       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='print'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);

       /**

           这里join的字段必须是GET命令的key

       */

       String sql =

               " insert into sink_students "

               + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"

               + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

2.3、使用HGET作为维表查询命令

网络异常,图片无法展示
|

   @Test

   publicvoid testHGetSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";

       /**

           这里需要注意的是,由于使用hget命令,而且没有加format属性,所以维表只能有三个字段,多了也识别不到,

           详细可以看源码里的注释

       */

       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";



       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='print'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);

       /**

           这里需要注意的是,由于使用hget命令,这里join的参数两个参数顺序没有关系,真正执行hget命令哪个字段作为key,

           哪个字段作为field只与维表定义的时候的字段顺序有关系

       */

       String sql =

               " insert into sink_students "

                       + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"

                       + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

3、redis作为维表(带format)

3.1、数据准备

网络异常,图片无法展示
|

   @Before

   publicvoid init() {

       RedisOptions.IS_TEST = true;

       RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);

       List<String> lists = new ArrayList<>();

       for (int i = 0; i < 1000; i++) {

           lists.add("{\n" +

                   "  \"number\": " + i + ",\n" +

                   "  \"name\": \"学生" + i + "\",\n" +

                   "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +

                   "  \"class_id\": " + ((i % 10) + 1) +"\n" +

                   "}");

       }

       /**

        * 初始化学生数据

        */

       for (int i = 0; i < 1; i++) {

           redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));

       }

       /**

        * 初始化班级数据

        */

       for(int i = 0;i < 10;i++) {

           JSONObject jsonObject = new JSONObject();

           jsonObject.put("class_id",String.valueOf(i + 1));

           jsonObject.put("class_name","银河" + (i + 1) + "班");

           jsonObject.put("remark","remark" + i);

           redisOperator.set(String.valueOf(i + 1),jsonObject.toString());

       }


       /**

        * 初始化学校班级数据

        */

       for(int j = 1;j < 4;j++) {

           for (int i = 1; i < 11; i++) {

               JSONObject jsonObject = new JSONObject();

               jsonObject.put("class_id",String.valueOf(i));

               jsonObject.put("class_name","银河" + i + "班");

               jsonObject.put("remark","remark" + i);

               jsonObject.put("school","学校" + j);

               redisOperator.hset("学校" + j, String.valueOf(i), jsonObject.toString());

           }

       }

   }

网络异常,图片无法展示
|

3.2、使用GET作为维表查询命令

网络异常,图片无法展示
|

   @Test

   publicvoid testGetSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       /**

        * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为get命令的key的字段

        * 一定要放在表申明的第一位,并且get命令的value的值使用format格式化后,比如是json格式,则json里一定要包含作为维表查询的

        *  join on后面带的作为key的查询列,不然会报空指针异常

        */ 

       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string ,\n   " +

                       "    remark  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'format'='json', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='GET'\n" +

                       " )";



       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string, \n" +

                       "    remark  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='print'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

                       + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"

                       + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

3.3、使用HGET作为维表查询命令

网络异常,图片无法展示
|

   @Test

   publicvoid testHGetSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       /**

        * 这里测试的核心是维表有format=json配置项,有了format配置项后,字段个数不受限制,但是需要注意的是,作为hget命令的key的字段

        * 一定要放在表申明的第一位,field的字段一定要放在申明的第二位,并且hget命令的value的值使用format格式化后,比如是json格式,          * 则json里一定要包含作为维表查询的 join on后面带的作为key和field的查询列,不然会报空指针异常

        */ 

       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string,   " +

                       "    remark  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'format'='json', \n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";



       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string, \n" +

                       "    remark  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='print'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

                       + " select s.number,s.name,s.school,s.class_id,d.class_name,d.remark  from students s"

                       + "  left join classes for system_time as of s.proctime as d  on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

4、redis作为sink表

4.1、数据准备

网络异常,图片无法展示
|

   @Before

   publicvoid init() {

       RedisOptions.IS_TEST = true;

       RedisOperator redisOperator = RedisOperators.getSimple("10.201.0.33",6379,"123456",0);

       List<String> lists = new ArrayList<>();

       for (int i = 0; i < 1000; i++) {

           lists.add("{\n" +

                   "  \"number\": " + i + ",\n" +

                   "  \"name\": \"学生" + i + "\",\n" +

                   "  \"school\": \"学校" + ((i % 3) + 1) +"\",\n" +

                   "  \"class_id\": " + ((i % 10) + 1) +"\n" +

                   "}");

       }

       /**

        * 初始化学生数据

        */

       for (int i = 0; i < 1; i++) {

           redisOperator.rpush("students", lists.subList(1000 * i, 1000 * (i + 1)));

       }

       /**

        * 初始化班级数据

        */

       for(int i = 0;i < 10;i++) {

           redisOperator.set(String.valueOf(i + 1),"银河" + (i + 1) + "班");

       }


       /**

        * 初始化学校班级数据

        */

       for(int j = 1;j < 4;j++) {

           for (int i = 1; i < 11; i++) {

               redisOperator.hset("学校" + j, String.valueOf(i), "银河" + i + "班");

           }

       }

   }

网络异常,图片无法展示
|

4.2、使用LPush、RPUSH、SADD命令作为sink表写入命令

网络异常,图片无法展示
|

@Test

   publicvoid testLPushSQL() throws Exception {

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";


       /**

        *  1、这里因为command是LPUSH,所以不需要primary key(number) not enforced, 因为这种命令只支持INSERT语义

        *  2、并行度配置项sink.parallelism没有配置,默认为核心数

        */

       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='sink_students_list',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='LPUSH'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

               + " select s.number,s.name,s.school,s.class_id,d.class_name  from students s"

               + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

   }

网络异常,图片无法展示
|

4.2、使用SET命令作为sink表写入命令

网络异常,图片无法展示
|

   @Test

   publicvoid testSet() throws Exception {

       long start = System.currentTimeMillis();

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";


       /**

        *  1、这里因为command是SET,所以需要一个key,这里key就是使用主键,多个就用下划线拼接起来,

        *  2、并行度配置项sink.parallelism没有配置,默认为核心数

        */

       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string, \n" +

                       "    primary key(school,number) not enforced" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='SET'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

                       + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"

                       + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

       long end = System.currentTimeMillis();

       System.out.println("耗时:" + (end - start) + "ms");

   }

网络异常,图片无法展示
|

4.3、使用HSET命令作为sink表写入命令(不指定key)

网络异常,图片无法展示
|

   @Test

   publicvoid testHSet() throws Exception {

       long start = System.currentTimeMillis();

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";


       /**

        *  1、这里因为command是HSET,所以需要一个key和一个field,这里是按照表申明的顺序,第一个作为key,

        *  第二个作为field,由于需要更新,也需要一个主键,这里最好把前两个字段一起作为主键

        *  2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认为-1表示长期保存

        */

       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string, \n" +

                       "    primary key(school,number) not enforced" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'sink.parallelism' = '16',\n" +

                       "  'sink.key.ttl' = '300',\n" +

                       "  'command'='HSET'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

                       + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"

                       + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

       long end = System.currentTimeMillis();

       System.out.println("耗时:" + (end - start) + "ms");

   }

网络异常,图片无法展示
|

4.4、使用HSET命令作为sink表写入命令(指定key)

网络异常,图片无法展示
|

   @Test

   publicvoid testHSetWithKey() throws Exception {

       long start = System.currentTimeMillis();

       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();


       EnvironmentSettings environmentSettings =

               EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();

       StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, environmentSettings);


       String source =

               "CREATE TABLE students\n" +

                       "(\n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    school   string, \n" +

                       "    class_id   BIGINT, \n" +

                       "    proctime as PROCTIME() \n" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'key'='students',\n" +

                       "  'format'='json',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'command'='BLPOP'\n" +

                       " )";


       String daeamon =

               "CREATE TABLE classes\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    class_id  BIGINT   ,\n" +

                       "    class_name  string   " +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'lookup.cache.max-rows'='1000',\n" +

                       "  'lookup.cache.ttl'='3600',\n" +

                       "  'lookup.cache.load-all'='true',\n" +

                       "  'database'='0',\n" +

                       "  'command'='HGET'\n" +

                       " )";


       /**

        *  1、这里因为command是HSET,所以需要一个key和一个field,这里配置项指定了key,那么主键拼接就作为field,

        *  使用hset保存到redis

        *  2、作为sink有一个sink.key.ttl参数可以设置key保存在redis的ttl生存时间,单位秒,默认-1表示长期保存

        */

       String sink =

               "CREATE TABLE sink_students\n" +

                       "(\n" +

                       "    school   string, \n" +

                       "    number  BIGINT ,\n" +

                       "    name  string,\n" +

                       "    class_id   BIGINT, \n" +

                       "    class_name   string, \n" +

                       "    primary key(number) not enforced" +

                       ") \n" +

                       "WITH (\n" +

                       "  'connector'='redis',\n" +

                       "  'host'='10.201.0.33', \n" +

                       "  'port'='6379',\n" +

                       "  'redis-mode'='single', \n" +

                       "  'password'='123456',\n" +

                       "  'database'='0',\n" +

                       "  'format'='json',\n" +

                       "  'key'='sink_students_hset',\n" +

                       "  'batch-fetch-rows'='1000',\n" +

                       "  'json.fail-on-missing-field' = 'false',\n" +

                       "  'json.ignore-parse-errors' = 'true',\n" +

                       "  'sink.parallelism' = '16',\n" +

                       "  'sink.key.ttl' = '300',\n" +

                       "  'command'='HSET'\n" +

                       " )";


       tEnv.executeSql(source);

       tEnv.executeSql(daeamon);

       tEnv.executeSql(sink);


       String sql =

               " insert into sink_students "

                       + " select s.school,s.number,s.name,s.class_id,d.class_name  from students s"

                       + " left join classes for system_time as of s.proctime as d on d.class_id = s.class_id and d.school = s.school";

       TableResult tableResult = tEnv.executeSql(sql);

       tableResult.getJobClient().get().getJobExecutionResult().get();

       long end = System.currentTimeMillis();

       System.out.println("耗时:" + (end - start) + "ms");

   }

网络异常,图片无法展示
|

5、配置说明

配置项 描述
host redis的host
port redis的port
password redis的password
cluster-nodes redis的集群节点,ip和端口之间用英文冒号分隔,多个ip端口用英文逗号分割
master.name redis的sentinel模式的master节点的名称
sentinels.info redis的sentinel模式的info信息
sentinels.password redis的sentinel模式的密码
database redis的database,一般是0~15
command redis的命令,作为流表时支持BLPOP、BRPOP、LPOP、RPOP、SPOP;作为维表时支持GET、HGET;作为sink表时支持LPUSH、RPUSH、SADD、SET、HSET
redis-mode redis的部署模式,single、cluster、sentinel
key redis需要访问的key,比如数据是以某个固定的key存放在redis里,值是一个list;redis执行lpush、rpush、sadd、hset等sink使用的命令时的key;
timeout 连接redis的超时时间,单位毫秒
max-total 连接redis的连接池的最大连接数
max-idle 连接redis的连接池的最大空闲数
min-idle 连接redis的连接池的最小空闲数
format 格式化数据格式,如json、csv
batch-fetch-rows 像LPOP、BLPOP、RPOP、BRPOP这种命令每次从redis拿到数据的条数
lookup.cache.max-rows 作为维表lookup模式,缓存在内存中的数据的最大条数
lookup.cache.ttl 作为维表lookup模式,缓存在内存中的数据的ttl超时时间,单位秒
lookup.max-retries 作为维表lookup模式,查找数据的失败重试次数
lookup.cache.load-all 作为维表lookup模式,查找数据是否加载所有,主要是针对hget命令,如:HGET KEY_NAME FIELD_NAME;是否根据key查出所有field的值,这里可以根据实际hash表的大小决定是否要查询所有出来缓存起来
sink.max-retries redis作为sink源时,最大重试次数
sink.parallelism redis作为sink源时,sink的并行数,默认并行度为核心数
sink.key.ttl redis作为sink源时,sink的数据保存在redis的ttl超时时间,单位秒,默认为-1表示长期保存
lookup.max-retries 作为维表lookup模式,查找数据的失败重试次数
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
4月前
|
缓存 NoSQL 数据库
探秘Redis读写策略:CacheAside、读写穿透、异步写入
本文介绍了 Redis 的三种高可用性读写模式:CacheAside、Read/Write Through 和 Write Behind Caching。CacheAside 简单易用,但可能引发数据不一致;Read/Write Through 保证数据一致性,但性能可能受限于数据库;Write Behind Caching 提高写入性能,但有数据丢失风险。开发者应根据业务需求选择合适模式。
411 2
探秘Redis读写策略:CacheAside、读写穿透、异步写入
|
15天前
|
缓存 NoSQL Redis
【Azure Redis 缓存】Azure Redis读写比较慢/卡的问题排查
【Azure Redis 缓存】Azure Redis读写比较慢/卡的问题排查
|
2月前
|
负载均衡 NoSQL Java
|
3月前
|
NoSQL Java 关系型数据库
非关系型数据库NoSQL数据层解决方案 之 redis springboot整合与读写操作 2024详解以及window版redis5.0.14下载
非关系型数据库NoSQL数据层解决方案 之 redis springboot整合与读写操作 2024详解以及window版redis5.0.14下载
35 0
|
运维 NoSQL Java
【服务器运维】硬盘满了,Redis读写失败 MISCONF Redis is configured to save RDB snapshots, but is currently not able
【服务器运维】硬盘满了,Redis读写失败 MISCONF Redis is configured to save RDB snapshots, but is currently not able
104 0
|
存储 缓存 NoSQL
koa-redis进行数据的读写
koa-redis进行数据的读写
|
存储 缓存 NoSQL
读写 Redis RESP3 协议以及Redis 6.0客户端缓存
  在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。
410 0
|
缓存 NoSQL Java
补习系列(14)-springboot redis 整合-数据读写
一、简介 在 [补习系列(A3)-springboot redis 与发布订阅]() 一文中,我们介绍了使用 Redis 实现消息订阅发布的机制,并且给出了一个真实用例。然而,绝大多数场景下 Redis 是作为缓存被使用的(这是其主要优势)。
2067 0
|
缓存 NoSQL Redis
使用ServiceStack.Redis实现Redis数据读写
原文:使用ServiceStack.Redis实现Redis数据读写 User.cs实体类 public class User { public string Name { get; set; ...
1703 0
|
Web App开发 NoSQL Redis
下一篇
DDNS