flink实时处理的多个事件,来自kafka的3个topic。因此代码中实现了3个流解析不同的事件,最终写入redis的hset,想问下:1、以上的任务启动过程中存在事件解析丢失的情况,即kafka中相应的事件上报了,但是最终解析的时候没有解析到。
2、一个job中启动三个流和启动3个job来解析区别大不大?
3、以上任务否需要对每个流设置slotSharingGroup?
4、以上三个流的任务在submit启动的时候比一个流submit启动满了太多了,一般需要5分钟左右才能启动,是否是自己的处理方式有问题?
示例代码:
/*
流一:
*/
    SourceFunction<TestLog> kafkaConsumer = basicSource();
    DataStream<TestLog> dataStream = environment.addSource(kafkaConsumer);
    DataStream<TestLog> message = dataStream
            .filter(event -> event.getEvent_code().equals("search") || event.getEvent_code().equals("gps"));
    /*
    测试消费kafka写入redis,异步执行,这里是异步ASYNC.IO的应用
     */
    AsyncDataStream.unorderedWait(message, new basicRequest(),
            1000, TimeUnit.SECONDS, 1000);
/*
流二:
*/
 SourceFunction kafkaConsumerServer = createServerSource();
    DataStream<ServerLog> ugcStream = environment.addSource(kafkaConsumerServer)
            .filter(pageEvent ->  pageEvent.getEvent_code().contains("ugc_hudong_action"))
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getOpen_udid()))
            .filter(event -> StringUtils.isNotBlank(event.getAttr().get("content_id").toString()))
            .filter(event -> StringUtils.isNotBlank(event.getAttr().get("content").toString())).name("ugc").slotSharingGroup("ugcInfoSlot");
    AsyncDataStream.unorderedWait(ugcStream, new ugcRedisRequest(),
            1000, TimeUnit.SECONDS, 1000);
/*
流三:
*/
 SourceFunction kafkaConsumerPage = PageSource();
    DataStream<MobilePageEvent> pageDataStream = environment.addSource(kafkaConsumerPage);
    // 提取item_type和item_id放到redis里,并且记录处理时间
    SinkFunction<MobilePageEvent> pagesink = createRedisSink();
     pageDataStream
            .filter(Objects::nonNull)
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemType()))
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemId())).name("pageinfo").slotSharingGroup("pageInfoSlot").addSink(pagesink);
                    版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。