flink实时处理的多个事件,来自kafka的3个topic。因此代码中实现了3个流解析不同的事件,最终写入redis的hset,想问下:1、以上的任务启动过程中存在事件解析丢失的情况,即kafka中相应的事件上报了,但是最终解析的时候没有解析到。
2、一个job中启动三个流和启动3个job来解析区别大不大?
3、以上任务否需要对每个流设置slotSharingGroup?
4、以上三个流的任务在submit启动的时候比一个流submit启动满了太多了,一般需要5分钟左右才能启动,是否是自己的处理方式有问题?
示例代码:
/*
流一:
*/
SourceFunction<TestLog> kafkaConsumer = basicSource();
DataStream<TestLog> dataStream = environment.addSource(kafkaConsumer);
DataStream<TestLog> message = dataStream
.filter(event -> event.getEvent_code().equals("search") || event.getEvent_code().equals("gps"));
/*
测试消费kafka写入redis,异步执行,这里是异步ASYNC.IO的应用
*/
AsyncDataStream.unorderedWait(message, new basicRequest(),
1000, TimeUnit.SECONDS, 1000);
/*
流二:
*/
SourceFunction kafkaConsumerServer = createServerSource();
DataStream<ServerLog> ugcStream = environment.addSource(kafkaConsumerServer)
.filter(pageEvent -> pageEvent.getEvent_code().contains("ugc_hudong_action"))
.filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getOpen_udid()))
.filter(event -> StringUtils.isNotBlank(event.getAttr().get("content_id").toString()))
.filter(event -> StringUtils.isNotBlank(event.getAttr().get("content").toString())).name("ugc").slotSharingGroup("ugcInfoSlot");
AsyncDataStream.unorderedWait(ugcStream, new ugcRedisRequest(),
1000, TimeUnit.SECONDS, 1000);
/*
流三:
*/
SourceFunction kafkaConsumerPage = PageSource();
DataStream<MobilePageEvent> pageDataStream = environment.addSource(kafkaConsumerPage);
// 提取item_type和item_id放到redis里,并且记录处理时间
SinkFunction<MobilePageEvent> pagesink = createRedisSink();
pageDataStream
.filter(Objects::nonNull)
.filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemType()))
.filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemId())).name("pageinfo").slotSharingGroup("pageInfoSlot").addSink(pagesink);
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。