flink实战 —— 定时器实现已完成订单自动五星好评

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 云栖号资讯:【点击查看更多行业资讯】在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来! 背景需求 在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,24小时之内没有做出评价,系统自动给与五星好评,我们今天主要使用flink的定时器来简单实现这一功能。

云栖号资讯:【点击查看更多行业资讯
在这里您可以找到不同行业的第一手的上云资讯,还在等什么,快来!


背景需求

在电商领域会有这么一个场景,如果用户买了商品,在订单完成之后,24小时之内没有做出评价,系统自动给与五星好评,我们今天主要使用flink的定时器来简单实现这一功能。

案例详解

自定义source

首先我们还是通过自定义source来模拟生成一些订单数据。
在这里,我们生了一个最简单的二元组Tuple2,包含订单id和订单完成时间两个字段。

 public static class MySource implements SourceFunction<Tuple2<String,Long>>{
  private volatile boolean isRunning = true;

  @Override
  public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{
   while (isRunning){
    Thread.sleep(1000);
    //订单id
    String orderid = UUID.randomUUID().toString();
    //订单完成时间
    long orderFinishTime = System.currentTimeMillis();
    ctx.collect(Tuple2.of(orderid, orderFinishTime));
   }
  }

  @Override
  public void cancel(){
   isRunning = false;
  }
 }

定时处理逻辑

先上代码, 我们再来依次解释代码

 public static class TimerProcessFuntion
   extends KeyedProcessFunction<Tuple,Tuple2<String,Long>,Object>{

  private MapState<String,Long> mapState;
  //超过多长时间(interval,单位:毫秒) 没有评价,则自动五星好评
  private long interval = 0l;

  public TimerProcessFuntion(long interval){
   this.interval = interval;
  }

  @Override
  public void open(Configuration parameters){
   MapStateDescriptor<String,Long> mapStateDesc = new MapStateDescriptor<>(
     "mapStateDesc",
     String.class, Long.class);
   mapState = getRuntimeContext().getMapState(mapStateDesc);
  }

  @Override
  public void onTimer(
    long timestamp, OnTimerContext ctx, Collector<Object> out) throws Exception{
   Iterator iterator = mapState.iterator();
   while (iterator.hasNext()){
    Map.Entry<String,Long> entry = (Map.Entry<String,Long>) iterator.next();

    String orderid = entry.getKey();
    boolean f = isEvaluation(entry.getKey());
    mapState.remove(orderid);
    if (f){
     LOG.info("订单(orderid: {}) 在  {} 毫秒时间内已经评价,不做处理", orderid, interval);
    }
    if (f){
     //如果用户没有做评价,在调用相关的接口给与默认的五星评价
     LOG.info("订单(orderid: {}) 超过  {} 毫秒未评价,调用接口给与五星自动好评", orderid, interval);
    }
   }
  }

  /**
   * 用户是否对该订单进行了评价,在生产环境下,可以去查询相关的订单系统.
   * 我们这里只是随便做了一个判断
   *
   * @param key
   * @return
   */
  private boolean isEvaluation(String key){
   return key.hashCode() % 2 == 0;
  }

  @Override
  public void processElement(
    Tuple2<String,Long> value, Context ctx, Collector<Object> out) throws Exception{
   mapState.put(value.f0, value.f1);
   ctx.timerService().registerProcessingTimeTimer(value.f1 + interval);
  }
 }
  • 首先我们定义一个MapState类型的状态,key是订单号,value是订单完成时间。
  • 在processElement处理数据的时候,把每个订单的信息存入状态中,这个时候不做任何处理,并且注册一个比订单完成时间大于间隔时间(interval)的定时器.
  • 注册的定时任务在到达了定时器的时间就会触发onTimer方法,我们主要在这个里面进行处理。我们调用外部的接口来判断用户是否做过评价,如果没做评价,调用接口给与五星好评,如果做过评价,则什么也不处理,最后记得把相应的订单从MapState删除。

【云栖号在线课堂】每天都有产品技术专家分享!
课程地址:https://yqh.aliyun.com/live

立即加入社群,与专家面对面,及时了解课程最新动态!
【云栖号在线课堂 社群】https://c.tb.cn/F3.Z8gvnK

原文发布时间:2020-06-16
本文作者:大数据技术与应用实战
本文来自:“掘金”,了解相关信息可以关注“掘金”

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
相关文章
|
7月前
|
SQL Java 关系型数据库
Flink DataSet API迁移到DataStream API实战
本文介绍了作者的Flink项目从DataSet API迁移到DataStream API的背景、方法和遇到的问题以及解决方案。
224 3
|
1月前
|
分布式计算 大数据 Apache
ClickHouse与大数据生态集成:Spark & Flink 实战
【10月更文挑战第26天】在当今这个数据爆炸的时代,能够高效地处理和分析海量数据成为了企业和组织提升竞争力的关键。作为一款高性能的列式数据库系统,ClickHouse 在大数据分析领域展现出了卓越的能力。然而,为了充分利用ClickHouse的优势,将其与现有的大数据处理框架(如Apache Spark和Apache Flink)进行集成变得尤为重要。本文将从我个人的角度出发,探讨如何通过这些技术的结合,实现对大规模数据的实时处理和分析。
143 2
ClickHouse与大数据生态集成:Spark & Flink 实战
|
4天前
|
消息中间件 JSON 数据库
探索Flink动态CEP:杭州银行的实战案例
本文由杭州银行大数据工程师唐占峰、欧阳武林撰写,介绍Flink动态CEP的定义、应用场景、技术实现及使用方式。Flink动态CEP是基于Flink的复杂事件处理库,支持在不重启服务的情况下动态更新规则,适应快速变化的业务需求。文章详细阐述了其在反洗钱、反欺诈和实时营销等金融领域的应用,并展示了某金融机构的实际应用案例。通过动态CEP,用户可以实时调整规则,提高系统的灵活性和响应速度,降低维护成本。文中还提供了具体的代码示例和技术细节,帮助读者理解和使用Flink动态CEP。
236 2
探索Flink动态CEP:杭州银行的实战案例
|
4月前
|
大数据 API 数据处理
揭秘!Flink如何从默默无闻到大数据界的璀璨明星?起源、设计理念与实战秘籍大公开!
【8月更文挑战第24天】Apache Flink是一款源自Stratosphere项目的开源流处理框架,由柏林理工大学等机构于2010至2014年间开发,并于2014年捐赠给Apache软件基金会。Flink设计之初即聚焦于提供统一的数据处理模型,支持事件时间处理、精确一次状态一致性等特性,实现了流批一体化处理。其核心优势包括高吞吐量、低延迟及强大的容错机制。
84 1
|
4月前
|
API C# Shell
WPF与Windows Shell完美融合:深入解析文件系统操作技巧——从基本文件管理到高级Shell功能调用,全面掌握WPF中的文件处理艺术
【8月更文挑战第31天】Windows Presentation Foundation (WPF) 是 .NET Framework 的关键组件,用于构建 Windows 桌面应用程序。WPF 提供了丰富的功能来创建美观且功能强大的用户界面。本文通过问题解答的形式,探讨了如何在 WPF 应用中集成 Windows Shell 功能,并通过具体示例代码展示了文件系统的操作方法,包括列出目录下的所有文件、创建和删除文件、移动和复制文件以及打开文件夹或文件等。
101 0
|
4月前
|
SQL 存储 缓存
实时计算 Flink版产品使用问题之在处理数据流时,有些订单被监听到有些没有被监听到,是什么原因
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
5月前
|
关系型数据库 数据处理 对象存储
实时计算 Flink版产品使用问题之定时器执行存在延迟好几个小时,该如何处理
实时计算Flink版作为一种强大的流处理和批处理统一的计算框架,广泛应用于各种需要实时数据处理和分析的场景。实时计算Flink版通常结合SQL接口、DataStream API、以及与上下游数据源和存储系统的丰富连接器,提供了一套全面的解决方案,以应对各种实时计算需求。其低延迟、高吞吐、容错性强的特点,使其成为众多企业和组织实时数据处理首选的技术平台。以下是实时计算Flink版的一些典型使用合集。
|
7月前
|
BI 数据处理 Apache
[AIGC] 深入理解Flink中的窗口、水位线和定时器
[AIGC] 深入理解Flink中的窗口、水位线和定时器
110 2
|
7月前
|
传感器 存储 缓存
[尚硅谷flink学习笔记] 实战案例TopN 问题
这段内容是关于如何使用Apache Flink解决实时统计水位传感器数据中,在一定时间窗口内出现次数最多的水位问题,即&quot;Top N&quot;问题。首先,介绍了一个使用滑动窗口的简单实现,通过收集传感器数据,按照水位计数,然后排序并输出前两名。接着,提出了全窗口和优化方案,其中优化包括按键分区(按水位vc分组)、开窗操作(增量聚合计算count)和过程函数处理(聚合并排序输出Top N结果)。最后,给出了一个使用`KeyedProcessFunction`进行优化的示例代码,通过按键by窗口结束时间,确保每个窗口的所有数据到达后再进行处理,提高了效率。
197 1
|
Java 程序员 网络安全
Flink处理函数实战之四:窗口处理
学习Flink低阶处理函数中的ProcessAllWindowFunction和ProcessWindowFunction
142 0
Flink处理函数实战之四:窗口处理