选用的方案实现过程
最终选用了基于Redis
的有序集合Sorted Set
和Quartz
短轮询进行实现。具体方案是:
- 订单创建的时候,订单ID和当前时间戳分别作为
Sorted Set
的member和score添加到订单队列Sorted Set
中。 - 订单创建的时候,订单ID和推送内容
JSON
字符串分别作为field和value添加到订单队列内容Hash
中。 - 第1步和第2步操作的时候用
Lua
脚本保证原子性。 - 使用一个异步线程通过
Sorted Set
的命令ZREVRANGEBYSCORE
弹出指定数量的订单ID对应的订单队列内容Hash
中的订单推送内容数据进行处理。
对于第4点处理有两种方案:
- 方案一:弹出订单内容数据的同时进行数据删除,也就是
ZREVRANGEBYSCORE
、ZREM
和HDEL
命令要在同一个Lua
脚本中执行,这样的话Lua
脚本的编写难度大,并且由于弹出数据已经在Redis
中删除,如果数据处理失败则可能需要从数据库重新查询补偿。 - 方案二:弹出订单内容数据之后,在数据处理完成的时候再主动删除订单队列
Sorted Set
和订单队列内容Hash
中对应的数据,这样的话需要控制并发,有重复执行的可能性。
最终暂时选用了方案一,也就是从Sorted Set
弹出订单ID并且从Hash
中获取完推送数据之后马上删除这两个集合中对应的数据。方案的流程图大概是这样:
这里先详细说明一下用到的Redis
命令。
Sorted Set相关命令
ZADD
命令 - 将一个或多个成员元素及其分数值加入到有序集当中。
ZADD KEY SCORE1 VALUE1.. SCOREN VALUEN
ZREVRANGEBYSCORE
命令 - 返回有序集中指定分数区间内的所有的成员。有序集成员按分数值递减(从大到小)的次序排列。
ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
- max:分数区间 - 最大分数。
- min:分数区间 - 最小分数。
- WITHSCORES:可选参数,是否返回分数值,指定则会返回得分值。
- LIMIT:可选参数,offset和count原理和
MySQL
的LIMIT offset,size
一致,如果不指定此参数则返回整个集合的数据。
ZREM
命令 - 用于移除有序集中的一个或多个成员,不存在的成员将被忽略。
ZREM key member [member ...]
Hash相关命令
HMSET
命令 - 同时将多个field-value(字段-值)对设置到哈希表中。
HMSET KEY_NAME FIELD1 VALUE1 ...FIELDN VALUEN
HDEL
命令 - 删除哈希表key中的一个或多个指定字段,不存在的字段将被忽略。
HDEL KEY_NAME FIELD1.. FIELDN
Lua相关
- 加载
Lua
脚本并且返回脚本的SHA-1
字符串:SCRIPT LOAD script
。 - 执行已经加载的
Lua
脚本:EVALSHA sha1 numkeys key [key ...] arg [arg ...]
。 unpack
函数可以把table
类型的参数转化为可变参数,不过需要注意的是unpack
函数必须使用在非变量定义的函数调用的最后一个参数,否则会失效,详细见Stackoverflow
的提问table.unpack() only returns the first element。
PS:如果不熟悉Lua语言,建议系统学习一下,因为想用好Redis,一定离不开Lua。
引入依赖:
<dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-dependencies</artifactId> <version>2.1.7.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>org.quartz-scheduler</groupId> <artifactId>quartz</artifactId> <version>2.3.1</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>3.1.0</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-context-support</artifactId> <version>5.1.9.RELEASE</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.8</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.59</version> </dependency> </dependencies> 复制代码
编写Lua
脚本/lua/enqueue.lua
和/lua/dequeue.lua
:
-- /lua/enqueue.lua local zset_key = KEYS[1] local hash_key = KEYS[2] local zset_value = ARGV[1] local zset_score = ARGV[2] local hash_field = ARGV[3] local hash_value = ARGV[4] redis.call('ZADD', zset_key, zset_score, zset_value) redis.call('HSET', hash_key, hash_field, hash_value) return nil -- /lua/dequeue.lua -- 参考jesque的部分Lua脚本实现 local zset_key = KEYS[1] local hash_key = KEYS[2] local min_score = ARGV[1] local max_score = ARGV[2] local offset = ARGV[3] local limit = ARGV[4] -- TYPE命令的返回结果是{'ok':'zset'}这样子,这里利用next做一轮迭代 local status, type = next(redis.call('TYPE', zset_key)) if status ~= nil and status == 'ok' then if type == 'zset' then local list = redis.call('ZREVRANGEBYSCORE', zset_key, max_score, min_score, 'LIMIT', offset, limit) if list ~= nil and #list > 0 then -- unpack函数能把table转化为可变参数 redis.call('ZREM', zset_key, unpack(list)) local result = redis.call('HMGET', hash_key, unpack(list)) redis.call('HDEL', hash_key, unpack(list)) return result end end end return nil 复制代码
编写核心API代码:
// Jedis提供者 @Component public class JedisProvider implements InitializingBean { private JedisPool jedisPool; @Override public void afterPropertiesSet() throws Exception { jedisPool = new JedisPool(); } public Jedis provide(){ return jedisPool.getResource(); } } // OrderMessage @Data public class OrderMessage { private String orderId; private BigDecimal amount; private Long userId; } // 延迟队列接口 public interface OrderDelayQueue { void enqueue(OrderMessage message); List<OrderMessage> dequeue(String min, String max, String offset, String limit); List<OrderMessage> dequeue(); String enqueueSha(); String dequeueSha(); } // 延迟队列实现类 @RequiredArgsConstructor @Component public class RedisOrderDelayQueue implements OrderDelayQueue, InitializingBean { private static final String MIN_SCORE = "0"; private static final String OFFSET = "0"; private static final String LIMIT = "10"; private static final String ORDER_QUEUE = "ORDER_QUEUE"; private static final String ORDER_DETAIL_QUEUE = "ORDER_DETAIL_QUEUE"; private static final String ENQUEUE_LUA_SCRIPT_LOCATION = "/lua/enqueue.lua"; private static final String DEQUEUE_LUA_SCRIPT_LOCATION = "/lua/dequeue.lua"; private static final AtomicReference<String> ENQUEUE_LUA_SHA = new AtomicReference<>(); private static final AtomicReference<String> DEQUEUE_LUA_SHA = new AtomicReference<>(); private static final List<String> KEYS = Lists.newArrayList(); private final JedisProvider jedisProvider; static { KEYS.add(ORDER_QUEUE); KEYS.add(ORDER_DETAIL_QUEUE); } @Override public void enqueue(OrderMessage message) { List<String> args = Lists.newArrayList(); args.add(message.getOrderId()); args.add(String.valueOf(System.currentTimeMillis())); args.add(message.getOrderId()); args.add(JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } } @Override public List<OrderMessage> dequeue() { // 30分钟之前 String maxScore = String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000); return dequeue(MIN_SCORE, maxScore, OFFSET, LIMIT); } @SuppressWarnings("unchecked") @Override public List<OrderMessage> dequeue(String min, String max, String offset, String limit) { List<String> args = new ArrayList<>(); args.add(min); args.add(max); args.add(offset); args.add(limit); List<OrderMessage> result = Lists.newArrayList(); try (Jedis jedis = jedisProvider.provide()) { List<String> eval = (List<String>) jedis.evalsha(DEQUEUE_LUA_SHA.get(), KEYS, args); if (null != eval) { for (String e : eval) { result.add(JSON.parseObject(e, OrderMessage.class)); } } } return result; } @Override public String enqueueSha() { return ENQUEUE_LUA_SHA.get(); } @Override public String dequeueSha() { return DEQUEUE_LUA_SHA.get(); } @Override public void afterPropertiesSet() throws Exception { // 加载Lua脚本 loadLuaScript(); } private void loadLuaScript() throws Exception { try (Jedis jedis = jedisProvider.provide()) { ClassPathResource resource = new ClassPathResource(ENQUEUE_LUA_SCRIPT_LOCATION); String luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); String sha = jedis.scriptLoad(luaContent); ENQUEUE_LUA_SHA.compareAndSet(null, sha); resource = new ClassPathResource(DEQUEUE_LUA_SCRIPT_LOCATION); luaContent = StreamUtils.copyToString(resource.getInputStream(), StandardCharsets.UTF_8); sha = jedis.scriptLoad(luaContent); DEQUEUE_LUA_SHA.compareAndSet(null, sha); } } public static void main(String[] as) throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); JedisProvider jedisProvider = new JedisProvider(); jedisProvider.afterPropertiesSet(); RedisOrderDelayQueue queue = new RedisOrderDelayQueue(jedisProvider); queue.afterPropertiesSet(); // 写入测试数据 OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(10086)); message.setOrderId("ORDER_ID_10086"); message.setUserId(10086L); message.setTimestamp(LocalDateTime.now().format(f)); List<String> args = Lists.newArrayList(); args.add(message.getOrderId()); // 测试需要,score设置为30分钟之前 args.add(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000)); args.add(message.getOrderId()); args.add(JSON.toJSONString(message)); try (Jedis jedis = jedisProvider.provide()) { jedis.evalsha(ENQUEUE_LUA_SHA.get(), KEYS, args); } List<OrderMessage> dequeue = queue.dequeue(); System.out.println(dequeue); } } 复制代码
这里先执行一次main()
方法验证一下延迟队列是否生效:
[OrderMessage(orderId=ORDER_ID_10086, amount=10086, userId=10086, timestamp=2019-08-21 08:32:22.885)] 复制代码
确定延迟队列的代码没有问题,接着编写一个Quartz
的Job
类型的消费者OrderMessageConsumer
:
@DisallowConcurrentExecution @Component public class OrderMessageConsumer implements Job { private static final AtomicInteger COUNTER = new AtomicInteger(); private static final ExecutorService BUSINESS_WORKER_POOL = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors(), r -> { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("OrderMessageConsumerWorker-" + COUNTER.getAndIncrement()); return thread; }); private static final Logger LOGGER = LoggerFactory.getLogger(OrderMessageConsumer.class); @Autowired private OrderDelayQueue orderDelayQueue; @Override public void execute(JobExecutionContext jobExecutionContext) throws JobExecutionException { StopWatch stopWatch = new StopWatch(); stopWatch.start(); LOGGER.info("订单消息处理定时任务开始执行......"); List<OrderMessage> messages = orderDelayQueue.dequeue(); if (!messages.isEmpty()) { // 简单的列表等分放到线程池中执行 List<List<OrderMessage>> partition = Lists.partition(messages, 2); int size = partition.size(); final CountDownLatch latch = new CountDownLatch(size); for (List<OrderMessage> p : partition) { BUSINESS_WORKER_POOL.execute(new ConsumeTask(p, latch)); } try { latch.await(); } catch (InterruptedException ignore) { //ignore } } stopWatch.stop(); LOGGER.info("订单消息处理定时任务执行完毕,耗时:{} ms......", stopWatch.getTotalTimeMillis()); } @RequiredArgsConstructor private static class ConsumeTask implements Runnable { private final List<OrderMessage> messages; private final CountDownLatch latch; @Override public void run() { try { // 实际上这里应该单条捕获异常 for (OrderMessage message : messages) { LOGGER.info("处理订单信息,内容:{}", message); } } finally { latch.countDown(); } } } } 复制代码
上面的消费者设计的时候需要有以下考量:
- 使用
@DisallowConcurrentExecution
注解不允许Job
并发执行,其实多个Job
并发执行意义不大,因为我们采用的是短间隔的轮询,而Redis
是单线程处理命令,在客户端做多线程其实效果不佳。 - 线程池
BUSINESS_WORKER_POOL
的线程容量或者队列应该综合LIMIT
值、等分订单信息列表中使用的size
值以及ConsumeTask
里面具体的执行时间进行考虑,这里只是为了方便使用了固定容量的线程池。 ConsumeTask
中应该对每一条订单信息的处理单独捕获异常和吞并异常,或者把处理单个订单信息的逻辑封装成一个不抛出异常的方法。
其他Quartz
相关的代码:
// Quartz配置类 @Configuration public class QuartzAutoConfiguration { @Bean public SchedulerFactoryBean schedulerFactoryBean(QuartzAutowiredJobFactory quartzAutowiredJobFactory) { SchedulerFactoryBean factory = new SchedulerFactoryBean(); factory.setAutoStartup(true); factory.setJobFactory(quartzAutowiredJobFactory); return factory; } @Bean public QuartzAutowiredJobFactory quartzAutowiredJobFactory() { return new QuartzAutowiredJobFactory(); } public static class QuartzAutowiredJobFactory extends AdaptableJobFactory implements BeanFactoryAware { private AutowireCapableBeanFactory autowireCapableBeanFactory; @Override public void setBeanFactory(BeanFactory beanFactory) throws BeansException { this.autowireCapableBeanFactory = (AutowireCapableBeanFactory) beanFactory; } @Override protected Object createJobInstance(TriggerFiredBundle bundle) throws Exception { Object jobInstance = super.createJobInstance(bundle); // 这里利用AutowireCapableBeanFactory从新建的Job实例做一次自动装配,得到一个原型(prototype)的JobBean实例 autowireCapableBeanFactory.autowireBean(jobInstance); return jobInstance; } } } 复制代码
这里暂时使用了内存态的RAMJobStore
去存放任务和触发器的相关信息,如果在生产环境最好替换成基于MySQL
也就是JobStoreTX
进行集群化,最后是启动函数和CommandLineRunner
的实现:
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class, TransactionAutoConfiguration.class}) public class Application implements CommandLineRunner { @Autowired private Scheduler scheduler; @Autowired private JedisProvider jedisProvider; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Override public void run(String... args) throws Exception { // 准备一些测试数据 prepareOrderMessageData(); JobDetail job = JobBuilder.newJob(OrderMessageConsumer.class) .withIdentity("OrderMessageConsumer", "DelayTask") .build(); // 触发器5秒触发一次 Trigger trigger = TriggerBuilder.newTrigger() .withIdentity("OrderMessageConsumerTrigger", "DelayTask") .withSchedule(SimpleScheduleBuilder.simpleSchedule().withIntervalInSeconds(5).repeatForever()) .build(); scheduler.scheduleJob(job, trigger); } private void prepareOrderMessageData() throws Exception { DateTimeFormatter f = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"); try (Jedis jedis = jedisProvider.provide()) { List<OrderMessage> messages = Lists.newArrayList(); for (int i = 0; i < 100; i++) { OrderMessage message = new OrderMessage(); message.setAmount(BigDecimal.valueOf(i)); message.setOrderId("ORDER_ID_" + i); message.setUserId((long) i); message.setTimestamp(LocalDateTime.now().format(f)); messages.add(message); } // 这里暂时不使用Lua Map<String, Double> map = Maps.newHashMap(); Map<String, String> hash = Maps.newHashMap(); for (OrderMessage message : messages) { // 故意把score设计成30分钟前 map.put(message.getOrderId(), Double.valueOf(String.valueOf(System.currentTimeMillis() - 30 * 60 * 1000))); hash.put(message.getOrderId(), JSON.toJSONString(message)); } jedis.zadd("ORDER_QUEUE", map); jedis.hmset("ORDER_DETAIL_QUEUE", hash); } } } 复制代码
输出结果如下:
2019-08-21 22:45:59.518 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行...... 2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_91, amount=91, userId=91, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_95, amount=95, userId=95, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_97, amount=97, userId=97, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_99, amount=99, userId=99, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.525 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_93, amount=93, userId=93, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_94, amount=94, userId=94, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_96, amount=96, userId=96, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-3] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_92, amount=92, userId=92, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-0] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_98, amount=98, userId=98, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.539 INFO 33000 --- [onsumerWorker-4] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_90, amount=90, userId=90, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:45:59.540 INFO 33000 --- [ryBean_Worker-1] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:22 ms...... 2019-08-21 22:46:04.515 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务开始执行...... 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_89, amount=89, userId=89, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_87, amount=87, userId=87, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_85, amount=85, userId=85, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-5] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_88, amount=88, userId=88, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_83, amount=83, userId=83, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_81, amount=81, userId=81, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-6] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_86, amount=86, userId=86, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-2] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_82, amount=82, userId=82, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-7] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_84, amount=84, userId=84, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [onsumerWorker-1] club.throwable.OrderMessageConsumer : 处理订单信息,内容:OrderMessage(orderId=ORDER_ID_80, amount=80, userId=80, timestamp=2019-08-21 22:45:59.475) 2019-08-21 22:46:04.516 INFO 33000 --- [ryBean_Worker-2] club.throwable.OrderMessageConsumer : 订单消息处理定时任务执行完毕,耗时:1 ms...... ...... 复制代码
首次执行的时候涉及到一些组件的初始化,会比较慢,后面看到由于我们只是简单打印订单信息,所以定时任务执行比较快。如果在不调整当前架构的情况下,生产中需要注意:
- 切换
JobStore
为JDBC
模式,Quartz
官方有完整教程,或者看笔者之前翻译的Quartz
文档。 - 需要监控或者收集任务的执行状态,添加预警等等。
这里其实有一个性能隐患,命令ZREVRANGEBYSCORE
的时间复杂度可以视为为O(N)
,N
是集合的元素个数,由于这里把所有的订单信息都放进了同一个Sorted Set
(ORDER_QUEUE
)中,所以在一直有新增数据的时候,dequeue
脚本的时间复杂度一直比较高,后续订单量升高之后会此处一定会成为性能瓶颈,后面会给出解决的方案。
小结
这篇文章主要从一个实际生产案例的仿真例子入手,分析了当前延时任务的一些实现方案,还基于Redis
和Quartz
给出了一个完整的示例。当前的示例只是处于可运行的状态,有些问题尚未解决。下一篇文章会着眼于解决两个方面的问题:
- 分片。
- 监控。
还有一点,架构是基于业务形态演进出来的,很多东西需要结合场景进行方案设计和改进,思路仅供参考,切勿照搬代码。
附件
- Markdown和PPT原件:github.com/zjcscut/blo…
- Github Page:www.throwable.club/2019/08/21/…
- Coding Page:throwable.coding.me/2019/08/21/…