> 大家好, > > 我遇到一个问题一直想不明白原因,想请教大家 > > 我的代码keyby userid 然后使用session window 实现一个按userid聚合 并执行了一个 topN方法。 > > 代码大致如下 > // Topn聚合 > DataStream itemList = resultDataStream > .assignTimestampsAndWatermarks( > new > > BoundedOutOfOrdernessTimestampExtractor<PredictResult>(Time.milliseconds(100)) > { > @Override > public long extractTimestamp(PredictResult > predictResult) { > return predictResult.getDate_timestamp(); > } > } > ) > .keyBy("userId") > > .window(EventTimeSessionWindows.withGap(Time.milliseconds(100))) > .process(new TopNService(11)); > itemList.print("IRS_RESULT: "); > > > 作业的延迟特别的高,高达30秒,完全无法接受。 起初我以为是自己的 topN方法有问题,但我采用 > ProcessTimeSessionWindow后,延迟降低为一秒以内。 > 使用processtime 的弊端是gap是不好估计,高了影响作业延迟,低了 无法完成预期的聚合,导致报错(且运行不稳定)。 > 我不太理解为什么会出现这样的情况~还烦请大家给与一点解决思路~~ > > > 谢谢 > > // top n方法 > > public static class TopNService extends > ProcessWindowFunction<PredictResult, Object, Tuple, TimeWindow> { > > private final int topSize; > > public TopNService(int topSize) { > this.topSize = topSize; > } > @Override > public void process(Tuple tuple, Context context, > Iterable<PredictResult> iterable, Collector<Object> collector) throws > Exception { > List<PredictResult> allItems = new ArrayList<>(); > for (PredictResult predictResult:iterable){ > allItems.add(predictResult); > } > allItems.sort(new Comparator<PredictResult>() { > @Override > public int compare(PredictResult o1, PredictResult o2) { > return o2.probability.compareTo(o1.probability); > } > }); > int userId = allItems.get(0).userId ; > String logonType=allItems.get(0).getLogonType(); > StringBuilder result = new StringBuilder(); > for (int i=0;i<topSize;i++) { > PredictResult currentItem = allItems.get(i); > result.append(currentItem.serviceId).append(","); > } > LocalDate localDate = LocalDate.now(); > LocalTime localTime = LocalTime.now(); > //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 start > JSONObject resultJson = new JSONObject(); > resultJson.put("user_id", userId); > resultJson.put("logon_type", logonType); > resultJson.put("date", localDate + " " + localTime); > JSONArray jsonArray = new JSONArray(); > jsonArray.add(resultJson); > resultJson.put("service_id", result.toString()); > //NXZW_ZNTJ_TOPIC_IRS_RESULT 的数据格式 end > collector.collect(jsonArray.toString()); > } > } >
*来自志愿者整理的flink邮件归档
个人理解可以从以下几个角度分析下 1.你说的30s和1s的延迟分别都是怎样对比出来的,参照物是否一致 2.如果上述参照物都是事件数据的原始时间戳的话,可以按照Congxian说的方法查看下 3.如果不是原始时间戳,并且MaxOutOfOrderness = 100ms的情况下,可以先查看是否是根源问题,查看是否是Flink消费作业时,上游事件时间戳是否已经有比较大的延迟
*来自志愿者整理的flink邮件归档
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。