Spring-Batch读取数据 文本数据 数据库数据 XML数据 JSON数据

本文涉及的产品
日志服务 SLS,月写入数据量 50GB 1个月
简介: Spring-Batch读取数据 文本数据 数据库数据 XML数据 JSON数据

Spring Batch读取数据通过 ItemReader接口的实现类来完成,包括 FlatFileItemReader文本数据读取、 StaxEventItemReader XML文件数据读取、 JsonItemReader JSON文件数据读取、 JdbcPagingItemReader数据库分页数据读取等实现,更多可用的实现可以参考: https://docs.spring.io/spring-batch/docs/4.2.x/reference/html/appendix.html#itemReadersAppendix,本文只介绍这四种比较常用的读取数据方式。


1、框架搭建


新建一个Spring Boot项目,版本为2.5.9,artifactId为spring-batch-itemreader


剩下的数据库层的准备,项目配置,依赖引入和Spring Batch入门文章中的框架搭建步骤一致,这里就不再赘述。


2、简单数据读取


前面提到,Spring Batch读取数据是通过ItemReader接口的实现类来完成的,所以我们可以自定义一个ItemReader的实现类,实现简单数据的读取。


新建reader包,然后在该包下新建ItemReader接口的实现类MySimpleIteamReader:

public class MySimpleIteamReader implements ItemReader<String> {
    private Iterator<String> iterator;
    public MySimpleIteamReader(List<String> data) {
        this.iterator = data.iterator();
    }
    @Override
    public String read() {
        // 数据一个接着一个读取
        return iterator.hasNext() ? iterator.next() : null;
    }
}


泛型指定读取数据的格式,这里读取的是String类型的List,read()方法的实现也很简单,就是遍历集合数据。


接着新建job包,然后在该包下新建MySimpleItemReaderDemo类,用于测试我们定义的MySimpleIteamReader,MySimpleItemReaderDemo类代码如下:

@Component
public class MySimpleItemReaderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job mySimpleItemReaderJob() {
        return jobBuilderFactory.get("mySimpleItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<String, String>chunk(2)
                .reader(mySimpleItemReader())
                .writer(list -> list.forEach(System.out::println))  // 简单输出,后面再详细介绍writer
                .build();
    }
    private ItemReader<String> mySimpleItemReader() {
        List<String> data = Arrays.asList("java", "c++", "javascript", "python");
        return new MySimpleIteamReader(data);
    }
}


上面代码中,我们通过mySimpleItemReader()方法创建了一个MySimpleIteamReader,并且传入了List数据。上面代码大体和上一节中介绍的差不多,最主要的区别就是Step的创建过程稍有不同。


在MySimpleItemReaderDemo类中,我们通过StepBuilderFactory创建步骤Step,不过不再是使用tasklet()方法创建,而是使用chunk()方法。chunk字面上的意思是“块”的意思,可以简单理解为数据块,泛型<String, String>用于指定读取的数据和输出的数据类型,构造器入参指定了数据块的大小,比如指定为2时表示每当读取2组数据后做一次数据输出处理。接着reader()方法指定读取数据的方式,该方法接收ItemReader的实现类,这里使用的是我们自定义的MySimpleIteamReader。writer()方法指定数据输出方式,因为这块不是本文的重点,所以先简单遍历输出即可。


启动项目,控制台日志打印如下:

1673404528533.jpg


3、文本数据读取


Spring Batch读取文本类型数据可以通过FlatFileItemReader实现,在演示怎么使用之前,我们先准备好数据文件。


在resources目录下新建file文件,内容如下:

// 演示文件数据读取
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63


file的数据是一行一行以逗号分隔的数据(在批处理业务中,文本类型的数据文件一般都是有一定规律的)。在文本数据读取的过程中,我们需要将读取的数据转换为POJO对象存储,所以我们需要创建一个与之对应的POJO对象。在包下新建entity包,然后在该包下新建TestData类:

public class TestData {
    private int id;
    private String field1;
    private String field2;
    private String field3;
    // get,set,toString略
}


因为file文本中的一行数据经过逗号分隔后为1、11、12、13,所以我们创建的与之对应的POJO TestData包含4个属性id、field1、field2和field3。


接着在job包下新建FileItemReaderDemo:

@Component
public class FileItemReaderDemo {
    // 任务创建工厂
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    // 步骤创建工厂
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job fileItemReaderJob() {
        return jobBuilderFactory.get("fileItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(fileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> fileItemReader() {
        FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("file")); // 设置文件资源地址
        reader.setLinesToSkip(1); // 忽略第一行
        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // 设置属性名,类似于表头
        tokenizer.setNames("id", "field1", "field2", "field3");
        // 将每行数据转换为TestData对象
        DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
        // 设置LineTokenizer
        mapper.setLineTokenizer(tokenizer);
        // 设置映射方式,即读取到的文本怎么转换为对应的POJO
        mapper.setFieldSetMapper(fieldSet -> {
            TestData data = new TestData();
            data.setId(fieldSet.readInt("id"));
            data.setField1(fieldSet.readString("field1"));
            data.setField2(fieldSet.readString("field2"));
            data.setField3(fieldSet.readString("field3"));
            return data;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}


上面代码中,我们在fileItemReader()方法里编写了具体的文本数据读取代码,过程参考注释即可。DelimitedLineTokenizer分隔符行处理器的默认构造器源码如下所示:

1673404582270.jpg


常量DELIMITER_COMMA的值为public static final String DELIMITER_COMMA = ",";,假如我们的数据并不是用逗号分隔,而是用|等字符分隔的话,可以使用它的有参构造器指定:

DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer("|");

DelimitedLineTokenizer是AbstractLineTokenizer三个实现类之一:

1673404597665.jpg

顾名思义,FixedLengthTokenizer通过指定的固定长度来截取数据,RegexLineTokenizer通过正则表达式来匹配数据,这里就不演示了,有兴趣的可以自己玩玩。


编写好FileItemReaderDemo后,启动项目,控制台日志打印如下:

1673404607721.jpg


4、数据库数据读取


在演示从数据库中读取数据之前,我们先准备好测试数据。在springbatch数据库中新建一张TEST表,SQL语句如下所示:

-- ----------------------------
-- Table structure for TEST
-- ----------------------------
DROP TABLE IF EXISTS `TEST`;
CREATE TABLE `TEST` (
  `id` bigint(10) NOT NULL COMMENT 'ID',
  `field1` varchar(10) NOT NULL COMMENT '字段一',
  `field2` varchar(10) NOT NULL COMMENT '字段二',
  `field3` varchar(10) NOT NULL COMMENT '字段三',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- ----------------------------
-- Records of TEST
-- ----------------------------
BEGIN;
INSERT INTO `TEST` VALUES (1, '11', '12', '13');
INSERT INTO `TEST` VALUES (2, '21', '22', '23');
INSERT INTO `TEST` VALUES (3, '31', '32', '33');
INSERT INTO `TEST` VALUES (4, '41', '42', '43');
INSERT INTO `TEST` VALUES (5, '51', '52', '53');
INSERT INTO `TEST` VALUES (6, '61', '62', '63');
COMMIT;


TEST表的字段和上面创建的TestData实体类一致。


然后在job包下新建DataSourceItemReaderDemo类,测试从数据库中读取数据:

@Component
public class DataSourceItemReaderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    // 注入数据源
    @Autowired
    private DataSource dataSource;
    @Bean
    public Job dataSourceItemReaderJob() throws Exception {
        return jobBuilderFactory.get("dataSourceItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() throws Exception {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(dataSourceItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> dataSourceItemReader() throws Exception {
        JdbcPagingItemReader<TestData> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(dataSource); // 设置数据源
        reader.setFetchSize(5); // 每次取多少条记录
        reader.setPageSize(5); // 设置每页数据量
        // 指定sql查询语句 select id,field1,field2,field3 from TEST
        MySqlPagingQueryProvider provider = new MySqlPagingQueryProvider();
        provider.setSelectClause("id,field1,field2,field3"); //设置查询字段
        provider.setFromClause("from TEST"); // 设置从哪张表查询
        // 将读取到的数据转换为TestData对象
        reader.setRowMapper((resultSet, rowNum) -> {
            TestData data = new TestData();
            data.setId(resultSet.getInt(1));
            data.setField1(resultSet.getString(2)); // 读取第一个字段,类型为String
            data.setField2(resultSet.getString(3));
            data.setField3(resultSet.getString(4));
            return data;
        });
        Map<String, Order> sort = new HashMap<>(1);
        sort.put("id", Order.ASCENDING);
        provider.setSortKeys(sort); // 设置排序,通过id 升序
        reader.setQueryProvider(provider);
        // 设置namedParameterJdbcTemplate等属性
        reader.afterPropertiesSet();
        return reader;
    }
}


dataSourceItemReader()方法中的主要步骤就是:通过JdbcPagingItemReader设置对应的数据源,然后设置数据量、获取数据的sql语句、排序规则和查询结果与POJO的映射规则等。方法末尾之所以需要调用JdbcPagingItemReader的afterPropertiesSet()方法是因为需要设置JDBC模板


(afterPropertiesSet()方法源码):

1673404649232.jpg

启动项目,控制台日志打印如下:

1673404657061.jpg


5、XML数据读取


Spring Batch借助Spring OXM可以轻松地实现xml格式数据文件读取。在resources目录下新建file.xml,内容如下所示:

<?xml version="1.0" encoding="utf-8" ?>
<tests>
    <test>
        <id>1</id>
        <field1>11</field1>
        <field2>12</field2>
        <field3>13</field3>
    </test>
    <test>
        <id>2</id>
        <field1>21</field1>
        <field2>22</field2>
        <field3>23</field3>
    </test>
    <test>
        <id>3</id>
        <field1>31</field1>
        <field2>32</field2>
        <field3>33</field3>
    </test>
    <test>
        <id>4</id>
        <field1>41</field1>
        <field2>42</field2>
        <field3>43</field3>
    </test>
    <test>
        <id>5</id>
        <field1>51</field1>
        <field2>52</field2>
        <field3>53</field3>
    </test>
    <test>
        <id>6</id>
        <field1>61</field1>
        <field2>62</field2>
        <field3>63</field3>
    </test>
</tests>


xml文件内容由一组一组的<test></test>标签组成,<test>标签又包含四组子标签,标签名称和TestData实体类属性一一对应。


准备好xml文件后,在pom中引入spring-oxm依赖:

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-oxm</artifactId>
</dependency>
<dependency>
    <groupId>com.thoughtworks.xstream</groupId>
    <artifactId>xstream</artifactId>
    <version>1.4.11.1</version>
</dependency>


接着在job包下新建XmlFileItemReaderDemo,演示xml文件数据获取:

@Component
public class XmlFileItemReaderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job xmlFileItemReaderJob() {
        return jobBuilderFactory.get("xmlFileItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(xmlFileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> xmlFileItemReader() {
        StaxEventItemReader<TestData> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("file.xml")); // 设置xml文件源
        reader.setFragmentRootElementName("test"); // 指定xml文件的根标签
        // 将xml数据转换为TestData对象
        XStreamMarshaller marshaller = new XStreamMarshaller();
        // 指定需要转换的目标数据类型
        Map<String, Class<TestData>> map = new HashMap<>(1);
        map.put("test", TestData.class);
        marshaller.setAliases(map);
        reader.setUnmarshaller(marshaller);
        return reader;
    }
}


在xmlFileItemReader()方法中,我们通过StaxEventItemReader读取xml文件,代码较简单,看注释即可。


6、JSON数据读取


在resources目录下新建file.json文件,内容如下:

[
  {
    "id": 1,
    "field1": "11",
    "field2": "12",
    "field3": "13"
  },
  {
    "id": 2,
    "field1": "21",
    "field2": "22",
    "field3": "23"
  },
  {
    "id": 3,
    "field1": "31",
    "field2": "32",
    "field3": "33"
  }
]


JSON对象属性和TestData对象属性一一对应。在job包下新建JSONFileItemReaderDemo,用于测试JSON文件数据读取:

@Component
public class JSONFileItemReaderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job jsonFileItemReaderJob() {
        return jobBuilderFactory.get("jsonFileItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(jsonItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> jsonItemReader() {
        // 设置json文件地址
        ClassPathResource resource = new ClassPathResource("file.json");
        // 设置json文件转换的目标对象类型
        JacksonJsonObjectReader<TestData> jacksonJsonObjectReader = new JacksonJsonObjectReader<>(TestData.class);
        JsonItemReader<TestData> reader = new JsonItemReader<>(resource, jacksonJsonObjectReader);
        // 给reader设置一个别名
        reader.setName("testDataJsonItemReader");
        return reader;
    }
}


7、多文本数据读取


多文本的数据读取本质还是单文件数据读取,区别就是多文件读取需要在单文件读取的方式上设置一层代理。


在resources目录下新建两个文件file1和file2,file1内容如下所示:

// 演示文件数据读取
1,11,12,13
2,21,22,23
3,31,32,33
4,41,42,43
5,51,52,53
6,61,62,63


file2内容如下所示:

// 演示文件数据读取
7,71,72,73
8,81,82,83


然后在job包下新建MultiFileIteamReaderDemo,演示多文件数据读取:

@Component
public class MultiFileIteamReaderDemo {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
    @Bean
    public Job multiFileItemReaderJob() {
        return jobBuilderFactory.get("multiFileItemReaderJob")
                .start(step())
                .build();
    }
    private Step step() {
        return stepBuilderFactory.get("step")
                .<TestData, TestData>chunk(2)
                .reader(multiFileItemReader())
                .writer(list -> list.forEach(System.out::println))
                .build();
    }
    private ItemReader<TestData> multiFileItemReader() {
        MultiResourceItemReader<TestData> reader = new MultiResourceItemReader<>();
        reader.setDelegate(fileItemReader()); // 设置文件读取代理,方法可以使用前面文件读取中的例子
        Resource[] resources = new Resource[]{
                new ClassPathResource("file1"),
                new ClassPathResource("file2")
        };
        reader.setResources(resources); // 设置多文件源
        return reader;
    }
    private FlatFileItemReader<TestData> fileItemReader() {
        FlatFileItemReader<TestData> reader = new FlatFileItemReader<>();
        reader.setLinesToSkip(1); // 忽略第一行
        // AbstractLineTokenizer的三个实现类之一,以固定分隔符处理行数据读取,
        // 使用默认构造器的时候,使用逗号作为分隔符,也可以通过有参构造器来指定分隔符
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        // 设置属姓名,类似于表头
        tokenizer.setNames("id", "field1", "field2", "field3");
        // 将每行数据转换为TestData对象
        DefaultLineMapper<TestData> mapper = new DefaultLineMapper<>();
        mapper.setLineTokenizer(tokenizer);
        // 设置映射方式
        mapper.setFieldSetMapper(fieldSet -> {
            TestData data = new TestData();
            data.setId(fieldSet.readInt("id"));
            data.setField1(fieldSet.readString("field1"));
            data.setField2(fieldSet.readString("field2"));
            data.setField3(fieldSet.readString("field3"));
            return data;
        });
        reader.setLineMapper(mapper);
        return reader;
    }
}


上面代码中fileItemReader()方法在文本数据读取中介绍过了,多文件读取的关键在于multiFileItemReader()方法,该方法通过MultiResourceItemReader对象设置了多个文件的目标地址,并且将单文件的读取方式设置为代理。


启动项目,控制台日志打印如下:

1673403992040.jpg

相关实践学习
日志服务之使用Nginx模式采集日志
本文介绍如何通过日志服务控制台创建Nginx模式的Logtail配置快速采集Nginx日志并进行多维度分析。
相关文章
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
【YashanDB知识库】python驱动查询gbk字符集崖山数据库CLOB字段,数据被驱动截断
有哪些方法可以验证用户输入数据的格式是否符合数据库的要求?
有哪些方法可以验证用户输入数据的格式是否符合数据库的要求?
178 75
【YashanDB 知识库】用 yasldr 配置 Bulkload 模式作单线程迁移 300G 的业务数据到分布式数据库,迁移任务频繁出错
问题描述 详细版本:YashanDB Server Enterprise Edition Release 23.2.4.100 x86_64 6db1237 影响范围: 离线数据迁移场景,影响业务数据入库。 外场将部分 NewCIS 的报表业务放到分布式数据库,验证 SQL 性能水平。 操作系统环境配置: 125G 内存 32C CPU 2T 的 HDD 磁盘 问题出现的步骤/操作: 1、部署崖山分布式数据库 1mm 1cn 3dn 单线启动 yasldr 数据迁移任务,设置 32 线程的 bulk load 模式 2、观察 yasldr.log 是否出现如下错
Hutool创建数据源工厂动态查询不同数据库不同数据表的数据
Hutool创建数据源工厂动态查询不同数据库不同数据表的数据
38 2
|
3月前
|
从建模到运维:联犀如何完美融入时序数据库 TDengine 实现物联网数据流畅管理
本篇文章是“2024,我想和 TDengine 谈谈”征文活动的三等奖作品。文章从一个具体的业务场景出发,分析了企业在面对海量时序数据时的挑战,并提出了利用 TDengine 高效处理和存储数据的方法,帮助企业解决在数据采集、存储、分析等方面的痛点。通过这篇文章,作者不仅展示了自己对数据处理技术的理解,还进一步阐释了时序数据库在行业中的潜力与应用价值,为读者提供了很多实际的操作思路和技术选型的参考。
82 1
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——封装统一返回的数据结构
本文介绍了在Spring Boot中封装统一返回的数据结构的方法。通过定义一个泛型类`JsonResult&lt;T&gt;`,包含数据、状态码和提示信息三个属性,满足不同场景下的JSON返回需求。例如,无数据返回时可设置默认状态码&quot;0&quot;和消息&quot;操作成功!&quot;,有数据返回时也可自定义状态码和消息。同时,文章展示了如何在Controller中使用该结构,通过具体示例(如用户信息、列表和Map)说明其灵活性与便捷性。最后总结了Spring Boot中JSON数据返回的配置与实际项目中的应用技巧。
43 0
|
5天前
|
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——使用 fastJson 处理 null
本文介绍如何使用 fastJson 处理 null 值。与 Jackson 不同,fastJson 需要通过继承 `WebMvcConfigurationSupport` 类并覆盖 `configureMessageConverters` 方法来配置 null 值的处理方式。例如,可将 String 类型的 null 转为 &quot;&quot;,Number 类型的 null 转为 0,避免循环引用等。代码示例展示了具体实现步骤,包括引入相关依赖、设置序列化特性及解决中文乱码问题。
21 0
|
5天前
|
微服务——SpringBoot使用归纳——Spring Boot返回Json数据及数据封装——Spring Boot 默认对Json的处理
本文介绍了在Spring Boot中返回Json数据的方法及数据封装技巧。通过使用`@RestController`注解,可以轻松实现接口返回Json格式的数据,默认使用的Json解析框架是Jackson。文章详细讲解了如何处理不同数据类型(如类对象、List、Map)的Json转换,并提供了自定义配置以应对null值问题。此外,还对比了Jackson与阿里巴巴FastJson的特点,以及如何在项目中引入和配置FastJson,解决null值转换和中文乱码等问题。
22 0

热门文章

最新文章