Flink使用过程中的几个疑难点,求各路大神解答-问答-阿里云开发者社区-阿里云

开发者社区> 问答> 正文

Flink使用过程中的几个疑难点,求各路大神解答

于飞007 2019-03-01 14:10:51 1260

flink实时处理的多个事件,来自kafka的3个topic。因此代码中实现了3个流解析不同的事件,最终写入redis的hset,想问下:1、以上的任务启动过程中存在事件解析丢失的情况,即kafka中相应的事件上报了,但是最终解析的时候没有解析到。
2、一个job中启动三个流和启动3个job来解析区别大不大?
3、以上任务否需要对每个流设置slotSharingGroup?
4、以上三个流的任务在submit启动的时候比一个流submit启动满了太多了,一般需要5分钟左右才能启动,是否是自己的处理方式有问题?
示例代码:
/*
流一:
*/

    SourceFunction<TestLog> kafkaConsumer = basicSource();
    DataStream<TestLog> dataStream = environment.addSource(kafkaConsumer);
    DataStream<TestLog> message = dataStream
            .filter(event -> event.getEvent_code().equals("search") || event.getEvent_code().equals("gps"));

    /*
    测试消费kafka写入redis,异步执行,这里是异步ASYNC.IO的应用
     */
    AsyncDataStream.unorderedWait(message, new basicRequest(),
            1000, TimeUnit.SECONDS, 1000);

/*
流二:
*/
SourceFunction kafkaConsumerServer = createServerSource();

    DataStream<ServerLog> ugcStream = environment.addSource(kafkaConsumerServer)
            .filter(pageEvent ->  pageEvent.getEvent_code().contains("ugc_hudong_action"))
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getOpen_udid()))
            .filter(event -> StringUtils.isNotBlank(event.getAttr().get("content_id").toString()))
            .filter(event -> StringUtils.isNotBlank(event.getAttr().get("content").toString())).name("ugc").slotSharingGroup("ugcInfoSlot");

    AsyncDataStream.unorderedWait(ugcStream, new ugcRedisRequest(),
            1000, TimeUnit.SECONDS, 1000);

/*
流三:
*/
SourceFunction kafkaConsumerPage = PageSource();

    DataStream<MobilePageEvent> pageDataStream = environment.addSource(kafkaConsumerPage);
    // 提取item_type和item_id放到redis里,并且记录处理时间
    SinkFunction<MobilePageEvent> pagesink = createRedisSink();
     pageDataStream
            .filter(Objects::nonNull)
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemType()))
            .filter(pageEvent -> StringUtils.isNotBlank(pageEvent.getItemId())).name("pageinfo").slotSharingGroup("pageInfoSlot").addSink(pagesink);
消息中间件 NoSQL Kafka Redis 流计算
分享到
取消 提交回答
全部回答(1)
数据库
使用钉钉扫一扫加入圈子
+ 订阅

分享数据库前沿,解构实战干货,推动数据库技术变革

推荐文章
相似问题
推荐课程