tranformation
reduce
对按key分类的数据流进行"滚动"压缩。可以将当前元素与前一个元素进行整合处理,并返回一个新值。可用于前后数据的计算或拼接。
例如下面例子,通过 id 字段进行分类,然后使用 reduce 进行压缩处理,每次将学生的名字字段进行拼接,年龄进行相加,返回一个新的对象:
SingleOutputStreamOperator operator = source .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value))) .keyBy((KeySelector<Student, Integer>) value -> value == null ? 0 : value.getId()) .reduce((ReduceFunction<Student>) (value1, value2) -> { Student student = new Student(); student.setId(value1.getId() + value2.getId()); student.setName(value1.getName() + " || " + value2.getName()); student.setAge(value1.getAge() + value2.getAge()); return student; });
16> Student(id=2, name=name2, age=22) 11> Student(id=1, name=name1, age=21) 15> Student(id=3, name=name13, age=28) 16> Student(id=5, name=name10, age=25) 16> Student(id=10, name=name10 || name15, age=55) 16> Student(id=4, name=name2 || name7, age=44) 16> Student(id=6, name=name2 || name7 || name12, age=71) ... 16> Student(id=15, name=name10 || name15 || name5, age=80) 1> Student(id=12, name=name4 || name9 || name14, age=77) 11> Student(id=3, name=name1 || name6 || name11, age=68)
从结果可以看到,id 相同的都分到同一个分区(测试中可以简单通过前面的线程 id 确认,属于同一分区处理),然后传入新对象,按照 reduce 的操作进行了处理,返回了拼装之后的对象。
Fold
滚动折叠。合并当前元素和上一个被折叠的值,输入值可以与返回值类型不一样。
例如数据流是一组数字 [1, 5, 7],想要输出一个拼接后的字符串,可以通过下面进行处理:
// 标准格式 keyedStream.fold(${initialValue}, (s1, s2) -> s1 + " || " + s2); SingleOutputStreamOperator operator = source .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value))) .keyBy("id") .fold("strat", new FoldFunction<Student, String>() { @Override public String fold(String accumulator, Student value) throws Exception { return accumulator + " || " + value; } });
16> strat || Student(id=2, name=name2, age=22) 11> strat || Student(id=1, name=name1, age=21) 15> strat || Student(id=3, name=name13, age=28) 16> strat || Student(id=2, name=name2, age=22) || Student(id=2, name=name12, age=27) 1> strat || Student(id=4, name=name9, age=24) 11> strat || Student(id=1, name=name1, age=21) || Student(id=1, name=name6, age=21) 1> strat || Student(id=4, name=name9, age=24) || Student(id=4, name=name14, age=29) 16> strat || Student(id=2, name=name2, age=22) || Student(id=2, name=name12, age=27) || Student(id=2, name=name7, age=22) ...
从输出结果可以看到,初始值和每次处理的对象进行了拼接,最后返回的是折叠后的对象,不过该方法被标注为 @Deprecated,不建议继续使用。
Project
选择部分字段。注意,只对元组 Tuple 类型的输入流有效,输出的也是选择下标的新元组数据流。例如下面,选择是下标 1 和 3 的属性:
DataStreamSource<Tuple4<String, Integer, Long, BigDecimal>> customSource = env.fromCollection( Lists.newArrayList( Tuple4.of("one", 1, 1L, BigDecimal.ONE), Tuple4.of("two", 2, 2L, BigDecimal.ZERO), Tuple4.of("three", 3, 3L, BigDecimal.TEN), Tuple4.of("four", 4, 4L, BigDecimal.TEN) ) ); // 分离下标 1,3 到新到数据流 DataStream<Tuple2<Integer, BigDecimal>> tuple2DataStreamSource = customSource.project(1, 3);
[1, 1] [2, 0] [3, 10] [4, 10]
window
窗口。该函数允许在已分区的 KeyedStream 上定义窗口。例如最近 5 秒内到达的数据:
dataStream.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(5))); dataStream.keyBy(0).window(SlidingEventTimeWindows.of(Time.seconds(3),Time.seconds(2)))
AllWindowedStream
在 DataStream 上定义窗口。与上面不同的是,此次定义的范围是所有流,windows 会根据某些特征(例如,最近 5 秒内到达的数据)对所有流事件进行划分窗口。
SingleOutputStreamOperator<Student> operator = source .map((MapFunction<String, Student>) value -> parseTokens2Object(parseString2Tokens(value))) .assignTimestampsAndWatermarks(new MyTimestampExtractor()) .windowAll(TumblingEventTimeWindows.of(Time.milliseconds(1))) .apply(new AllWindowFunction<Student, Student, TimeWindow>(){...};
reduce、apply、process、Aggregate、 AggregateFunction、ProcessWindowFunction
全量函数:窗口先缓存所有元素,等到触发条件后对窗口内的全量元素执行计算。 增量函数:窗口保存一份中间数据,每流入一个新元素,新元素与中间数据两两合一,生成新的中间数据。 增量聚合函数计算性能较高,占用存储空间少,主要因为基于中间状态的计算结果,窗 口中只维护中间结果状态值,不需要缓存原始数据。而全量窗口函数使用的代价相对较高, 性能比较弱,主要因为此时算子需要对所有属于该窗口的接入数据进行缓存,然后等到窗口 触发的时候,对所有的原始数据进行汇总计算。
apply和process都是处理全量计算,但工作中正常用process。 process更加底层,更加强大,有open/close生命周期方法,又可获取RuntimeContext。可以自己定时触发计算的定时器,形成自己的定时任务在processElement方法定义定时器 context.timerService().registerEventTimeTimer(timestamp); ,当定时器时间到达,会回调onTimer()方法的计算任务。 Apply只能作用窗口流,而Process既可以作用于窗口流也可以作用于普通流
reduce接受两个相同类型的输入,生成一个同类型输出,所以泛型就一个 <T> maxBy、minBy、sum这3个底层都是由reduce实现的 aggregate的输入值、中间结果值、输出值它们3个类型可以各不相同,泛型有<T, ACC, R> 归约函数(ReduceFunction) 将窗口中收集到的数据两两进行归约。当我们进行流处理时,就是要保存一个状态;每来一个新的数据,就和之前的聚合状态做归约,这样就实现了增量式的聚合。 所有聚合的操作保存在flink的状态内存中,因为他需要跨越多条记录,需要根据key保存状态。数据流入的过程,就是不断计算并更新flink中保存的状态的过程。 ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数 据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦。 使用ReduceFunction能够快速对两个相同类型的数据元素按照指定的方法进行聚合逻辑,实现sum功能。 聚合函数(aggregatefunction) aggregate需要指定一个aggregatefunction函数,可以看做reduce函数的通用版本,这里有三种类型IN,ACC,OUT,分别代表输入类型,累加器类型,输出类型。 接口中有四个方法: createAccumulator():创建一个累加器,这就是为聚合创建一个初始状态,每个聚合任务只会调用一次 add():将输入的元素添加到累加器中,这就是聚合状态,对于新来的数据进行进一步聚合的过程。传入两个参数,当前新到来的数据value,和当前的累加器accumulator;返回一个新的累加器值,对聚合状态进行更新。 getResult():从累加器提取聚合的输出结果。也就是说我们可以定义多个状态,然后基于这些聚合的状态计算出一个结果进行输出。比如计算平均,我们可以设置sum和count两个状态,最终调用这个方法时相除得到最终的结果。这个方法只在窗口要输出结果时调用。 merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口的场景就是会话窗口。 与reduce相比,aggregate的输入格式与输出格式可以不同。更加灵活。
reduceFunction统计每一小时用户的访问量:
package com.rosh.flink.pojo; import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; @AllArgsConstructor @NoArgsConstructor @Data public class UserPojo { private Integer userId; private String name; private String uri; private Long timestamp; } package com.rosh.flink.wartermark; import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.ArrayList; import java.util.List; import java.util.Random; public class WindowTS { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<UserPojo> dataDS = env.fromCollection(getUserLists()); //生成有序水位线 SingleOutputStreamOperator<UserPojo> orderStreamDS = dataDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() { @Override public long extractTimestamp(UserPojo element, long recordTimestamp) { return element.getTimestamp(); } })); //聚合 SingleOutputStreamOperator<Tuple2<Integer, Long>> userDS = orderStreamDS.map(new MapFunction<UserPojo, Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> map(UserPojo value) throws Exception { return Tuple2.of(value.getUserId(), 1L); } }); //开窗统计每1小时用户访问了多少次 SingleOutputStreamOperator<Tuple2<Integer, Long>> resultDS = userDS.keyBy(tuple -> tuple.f0) .window(TumblingEventTimeWindows.of(Time.hours(1))) .reduce(new ReduceFunction<Tuple2<Integer, Long>>() { @Override public Tuple2<Integer, Long> reduce(Tuple2<Integer, Long> value1, Tuple2<Integer, Long> value2) throws Exception { value1.f1 = value1.f1 + value2.f1; return value1; } }); resultDS.print(); env.execute("WarterMarkTest"); } private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException { List<UserPojo> lists = new ArrayList<>(); Random random = SecureRandom.getInstanceStrong(); for (int i = 1; i <= 1000; i++) { String uri = "/goods/" + i; int userId = random.nextInt(10); //有序时间 UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, (long) (1000 * i)); //无序时间 lists.add(userPojo); } return lists; } }
AggregateFunction统计人均访问次数
package com.rosh.flink.wartermark; import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.util.*; public class AggWindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); SingleOutputStreamOperator<UserPojo> userDS = env.fromCollection(getUserLists()).assignTimestampsAndWatermarks( WatermarkStrategy.<UserPojo>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() { @Override public long extractTimestamp(UserPojo element, long recordTimestamp) { return element.getTimestamp(); } }) ); //统计5秒内,人均访问次数 SingleOutputStreamOperator<Double> resultDS = userDS.keyBy(key -> true) .window(SlidingEventTimeWindows.of(Time.seconds(10), Time.seconds(5))) .aggregate(new PeopleHourAvgCount()); resultDS.print("人均访问次数为:"); env.execute("AggWindowTest"); } private static class PeopleHourAvgCount implements AggregateFunction<UserPojo, Tuple2<HashSet<Integer>, Long>, Double> { /** * 初始化累加器 */ @Override public Tuple2<HashSet<Integer>, Long> createAccumulator() { return Tuple2.of(new HashSet<>(), 0L); } /** * */ @Override public Tuple2<HashSet<Integer>, Long> add(UserPojo value, Tuple2<HashSet<Integer>, Long> accumulator) { //distinct userId accumulator.f0.add(value.getUserId()); //次数+1 accumulator.f1 = accumulator.f1 + 1; //返回累加器 return accumulator; } @Override public Double getResult(Tuple2<HashSet<Integer>, Long> accumulator) { return accumulator.f1 * 1.0 / accumulator.f0.size(); } @Override public Tuple2<HashSet<Integer>, Long> merge(Tuple2<HashSet<Integer>, Long> a, Tuple2<HashSet<Integer>, Long> b) { return null; } } /** * 获取随机人数的1000次访问 */ private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException { List<UserPojo> lists = new ArrayList<>(); Random random = SecureRandom.getInstanceStrong(); //获取随机人数 int peopleCount = random.nextInt(20); System.out.println("随机人数为:" + peopleCount); for (int i = 1; i <= 1000; i++) { String uri = "/goods/" + i; int userId = random.nextInt(peopleCount); //有序时间 UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime()); //无序时间 lists.add(userPojo); } return lists; } }
全窗口函数
窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。 这种计算方式相比于流处理是低效的,但是有的时候必须获取到所有数据才能计算,或者需要获取窗口的起始时间等,那么就必须使用全窗口函数。这是典型的批处理思想。 1.窗口函数(WindowFunction) WindowFunction字面上就是“窗口函数”,他其实就是老版本的通用窗口函数接口。我们可以基于WindowedStream调用apply方法,传入一个WindowFunction的实现类。 WindowFunction可用的功能较少,一般使用ProcessWindowFunction。 ProcessWindowFunction 是 Window API 中最底层的通用窗口函数接口。之所以说它“最底层”,是因为除了可以拿到窗口中的所有数据之外,ProcessWindowFunction 还可以获取到一个“上下文对象”(Context)。这个上下文对象非常强大,不仅能够获取窗口信息,还可以访问当前的时间和状态信息。这里的时间就包括了处理时间(processing time)和事件时间水位线(event time watermark)。这就使得 ProcessWindowFunction 更加灵活、功能更加丰富。事实上,ProcessWindowFunction 是 Flink 底层 API——处理函数(process function)中的一员。
统计10秒访问UV:
package com.rosh.flink.wartermark; import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.sql.Timestamp; import java.util.*; public class ProcessWindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists()); //水位线 SingleOutputStreamOperator<UserPojo> watermarks = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() { @Override public long extractTimestamp(UserPojo element, long recordTimestamp) { return element.getTimestamp(); } })); //开窗10秒UV统计 SingleOutputStreamOperator<String> resultDS = watermarks.keyBy(key -> true) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .process(new UserUVCount()); resultDS.print("UV:"); env.execute("ProcessWindowTest"); } private static class UserUVCount extends ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow> { @Override public void process(Boolean aBoolean, ProcessWindowFunction<UserPojo, String, Boolean, TimeWindow>.Context context, Iterable<UserPojo> elements, Collector<String> out) throws Exception { //用户集合 HashSet<Integer> hashSet = new HashSet<>(); for (UserPojo user : elements) { hashSet.add(user.getUserId()); } //获取时间信息 long start = context.window().getStart(); long end = context.window().getEnd(); String rs = "窗口信息,startTime:" + new Timestamp(start) + ",endTime: " + new Timestamp(end) + ",用户访问的次数为:" + hashSet.size(); out.collect(rs); } } private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException { List<UserPojo> lists = new ArrayList<>(); Random random = SecureRandom.getInstanceStrong(); int userCount = random.nextInt(100); for (int i = 1; i <= 1000; i++) { String uri = "/goods/" + i; int userId = random.nextInt(userCount); //有序时间 UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime()); //无序时间 lists.add(userPojo); } return lists; } }
增量聚合和全窗口函数的结合使用
增量聚合函数处理计算会更高效。全窗口函数的优势在于提供了更多的信息,可以认为是更加“通用”的窗口操作。所以在实际应用中,我们往往希望兼具这两者的优点,把它们结合在一起使用。Flink 的Window API 就给我们实现了这样的用法。 在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个 ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction 或者 ProcessWindowFunction。 # ReduceFunction 与 WindowFunction 结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) # ReduceFunction 与 ProcessWindowFunction 结合 public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, ProcessWindowFunction<T, R, K, W> function) # AggregateFunction 与 WindowFunction 结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate(AggregateFunction<T, ACC, V> aggFunction, WindowFunction<V, R, K, W> windowFunction) # AggregateFunction 与 ProcessWindowFunction 结合 public <ACC, V, R> SingleOutputStreamOperator<R> aggregate( AggregateFunction<T, ACC, V> aggFunction, ProcessWindowFunction<V, R, K, W> windowFunction) 这样调用的处理机制是:基于第一个参数(增量聚合函数)来处理窗口数据,每来一个数据就做一次聚合;等到窗口需要触发计算时,则调用第二个参数(全窗口函数)的处理逻辑输 出结果。需要注意的是,这里的全窗口函数就不再缓存所有数据了,而是直接将增量聚合函数的结果拿来当作了 Iterable 类型的输入。一般情况下,这时的可迭代集合中就只有一个元素了。 ProcessWindowFunction和其他窗口函数可以一起组合使用,满足你的一切需求,因为将ProcessWindowFunction用于简单的聚合(比如count)是非常低效的。ProcessWindowFunction可以与ReduceFunction、AggregateFunction或FoldFunction组合,以便在元素到达窗口时就增量地聚合它们,减少 ProcessWindowFunction 处理的数据量。当窗口关闭时,ProcessWindowFunction将提供聚合的结果。这允许它在访问ProcessWindowFunction的附加窗口元信息时以增量方式计算窗口。
统计10秒的url浏览量:
package com.rosh.flink.wartermark; import com.alibaba.fastjson.JSONObject; import com.rosh.flink.pojo.UserPojo; import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; import java.security.NoSuchAlgorithmException; import java.security.SecureRandom; import java.sql.Timestamp; import java.util.ArrayList; import java.util.Date; import java.util.List; import java.util.Random; public class UrlWindowTest { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); //读取数据源 DataStreamSource<UserPojo> userDS = env.fromCollection(getUserLists()); //水位线 SingleOutputStreamOperator<UserPojo> waterDS = userDS.assignTimestampsAndWatermarks(WatermarkStrategy.<UserPojo>forMonotonousTimestamps() .withTimestampAssigner(new SerializableTimestampAssigner<UserPojo>() { @Override public long extractTimestamp(UserPojo element, long recordTimestamp) { return element.getTimestamp(); } })); //url count SingleOutputStreamOperator<Tuple2<String, Long>> urlDS = waterDS.map(new MapFunction<UserPojo, Tuple2<String, Long>>() { @Override public Tuple2<String, Long> map(UserPojo value) throws Exception { return Tuple2.of(value.getUri(), 1L); } }); SingleOutputStreamOperator<JSONObject> resultDS = urlDS.keyBy(data -> data.f0) .window(TumblingEventTimeWindows.of(Time.seconds(10))) .reduce(new ReduceFunction<Tuple2<String, Long>>() { @Override public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception { value1.f1 = value1.f1 + value2.f1; return value1; } }, new WindowFunction<Tuple2<String, Long>, JSONObject, String, TimeWindow>() { @Override public void apply(String s, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<JSONObject> out) throws Exception { Tuple2<String, Long> tuple2 = input.iterator().next(); JSONObject jsonObject = new JSONObject(); jsonObject.put("url", tuple2.f0); jsonObject.put("count", tuple2.f1); new Timestamp(window.getStart()); jsonObject.put("startTime", new Timestamp(window.getStart()).toString()); jsonObject.put("endTime", new Timestamp(window.getEnd()).toString()); out.collect(jsonObject); } }); resultDS.print(); env.execute("UrlWindowTest"); } private static List<UserPojo> getUserLists() throws NoSuchAlgorithmException { List<UserPojo> lists = new ArrayList<>(); Random random = SecureRandom.getInstanceStrong(); for (int i = 1; i <= 1000; i++) { //随机生成userId、goodId int userId = random.nextInt(100); int goodId = random.nextInt(50); String uri = "/goods/" + goodId; //有序时间 UserPojo userPojo = new UserPojo(userId, "name" + userId, uri, new Date().getTime()); //无序时间 lists.add(userPojo); } return lists; } }