开发者社区> 问答> 正文

flink定时加载外部文件

读取本地一个配置文件,生成广播流,用来过滤数据。 flink任务启动的时候使用-ys将文件分发到各节点。 我该如何修改这个配置文件?

java @Override public void run(SourceContext sourceContext) throws Exception {

//读取外部文件 while (isRun) { String tmp; try (BufferedReader br = new BufferedReader(new FileReader(properties.getProperty("iot.filter.conf.file")))) { tmp = br.readLine(); }

// System.out.println(tmp); if (!StringUtils.equals(value, tmp) && tmp != null) { value = tmp; sourceContext.collect(value); } Thread.sleep(3600); } }

展开
收起
frankbood 2020-11-12 15:56:20 2704 0
1 条回答
写回答
取消 提交回答
  • 下一站是幸福

    1、实时流:

    基于flink1.9.2,必须使用FlinkKafkaConsumer

    // Kafka consumer for the real-time stream; record values are deserialized with SimpleStringSchema.
    // NOTE(review): this is a raw type. Declaring it as FlinkKafkaConsumer<String> would be type-safe,
    // but the declared element type of timeStream in the "connect" step would then have to be String too.
    FlinkKafkaConsumer ssConsumer = new FlinkKafkaConsumer(READ_TOPIC, new SimpleStringSchema(), properties);

    2、文件流:

    // The custom source emits JSONObject elements, so declare the element type instead of using a raw type.
    DataStreamSource<JSONObject> fileStreamSource = env.addSource(new MyRishSourceFileReader());

    3、自定义Source:

    自定义的Source,继承RichSourceFunction,重写函数。在open函数中读取文件,存入ConcurrentHashMap中,在run函数中ctx.collect()出去,然后在BroadcastProcessFunction中的processBroadcastElement函数里接收。

    import com.alibaba.fastjson.JSONObject;
    import com.maxmind.geoip2.DatabaseReader;
    import com.qianxin.ida.dto.DeviceUserBaseLineDto;
    import com.qianxin.ida.dto.GpsBaseLineDto;
    import com.qianxin.ida.dto.TimeBaseLineDto;
    import com.qianxin.ida.dto.UserDeviceBaseLineDto;
    import com.qianxin.ida.enrich.BuildBaseLineDto;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    import java.time.LocalDateTime;
    import java.time.format.DateTimeFormatter;
    import java.util.List;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
     
    /**
     * Source that bundles the time baseline data and the JSON config file into one
     * {@link JSONObject} and emits it, so that it can be broadcast to other operators.
     *
     * <p>The bundle is emitted once when the source starts and again after every successful
     * scheduled reload of the baseline data. The source keeps running until {@link #cancel()}
     * is called; previously {@code run()} returned after the first element, so reloaded data
     * never reached the stream and the reload threads were never stopped.
     *
     * <p>Emitted element layout: key {@code "configJsonFile"} holds the parsed config file,
     * key {@code "timeBaseLineDtos"} holds the most recently loaded baseline list.
     */
    public class MyRishSourceFileReader extends RichSourceFunction<JSONObject> {
        public static DatabaseReader reader;
        public final static ConcurrentHashMap<String, Object> map = new ConcurrentHashMap<>();
        private static final Logger logger = LoggerFactory.getLogger(MyRishSourceFileReader.class);

        /** Formatter for the timestamp written to the log on each reload (thread-safe, so shared). */
        private static final DateTimeFormatter LOG_TIME_FORMAT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
        /** How often run() wakes up to check for a cancel request or a completed reload. */
        private static final long POLL_INTERVAL_MILLIS = 1000L;

        private List<TimeBaseLineDto> timeBaseLineDtos;
        /** Reload scheduler; transient because the function object is serialized when the job is submitted. */
        private transient ScheduledExecutorService service;
        /** Cleared by cancel() to make run() return. */
        private volatile boolean running = true;
        /** Set by the reload task after a successful reload, cleared by run() before re-emitting. */
        private volatile boolean refreshPending;

        /**
         * Loads the baseline data once, initializes the shared {@link #reader} and schedules the
         * periodic reload. Failures are logged and do not fail the task (best effort, as before).
         *
         * @param configuration Flink configuration passed to the function (not used here)
         */
        @Override
        public void open(Configuration configuration) {
            try {
                // Initial load at startup.
                query();
                reader = TransUtil.getDatabaseReader();
                // Only one periodic task is scheduled, so a single scheduler thread is enough.
                service = Executors.newSingleThreadScheduledExecutor();
                // First reload after 10 hours, then 23 hours after each reload finishes.
                service.scheduleWithFixedDelay(() -> {
                    try {
                        query();
                        refreshPending = true;
                    } catch (Exception e) {
                        // Must not escape: an exception would cancel all subsequent executions.
                        logger.error("定时读取基线文件失败", e);
                    }
                }, 10L, 23L, TimeUnit.HOURS);
            } catch (Exception e) {
                logger.error("读取文件失败", e);
            }
        }

        /**
         * Reloads the time baseline list and publishes it in {@link #map} under the key
         * {@code "timeBaseLineDtos"}. If the loader returns {@code null} the previously
         * published list is kept, because {@link ConcurrentHashMap} rejects null values.
         */
        public void query() {
            logger.info("当前读取基线文件的时间:{}", LocalDateTime.now().format(LOG_TIME_FORMAT));
            List<TimeBaseLineDto> loaded = BuildBaseLineDto.getTimeBaseLine();
            if (loaded == null) {
                logger.warn("基线文件读取结果为空,保留上一次的数据");
                return;
            }
            timeBaseLineDtos = loaded;
            map.put("timeBaseLineDtos", loaded);
        }

        /**
         * Emits the current bundle, then re-emits it after each successful scheduled reload
         * until the source is cancelled or the thread is interrupted.
         *
         * @param ctx source context used to emit elements
         */
        @Override
        public void run(SourceContext<JSONObject> ctx) {
            emitSnapshot(ctx);
            while (running) {
                try {
                    Thread.sleep(POLL_INTERVAL_MILLIS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag for the caller and stop the source.
                    Thread.currentThread().interrupt();
                    break;
                }
                if (running && refreshPending) {
                    // Clear the flag first: a reload finishing during the emit triggers another emit.
                    refreshPending = false;
                    emitSnapshot(ctx);
                }
            }
        }

        /** Reads the config file, combines it with the latest baseline list and emits the result. */
        private void emitSnapshot(SourceContext<JSONObject> ctx) {
            try {
                JSONObject out = new JSONObject();
                JSONObject configJsonFile = JSONObject.parseObject(JsonFileReaderUtil.readJsonData(PropertyReaderUtil.getStrValue("config.json.path")));
                out.put("configJsonFile", configJsonFile);
                out.put("timeBaseLineDtos", map.get("timeBaseLineDtos"));
                // Emit under the checkpoint lock, as required for SourceFunction implementations.
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(out);
                }
            } catch (Exception e) {
                logger.error("读取配置文件或发送广播数据失败", e);
            }
        }

        /** Requests run() to stop; it returns within one poll interval. */
        @Override
        public void cancel() {
            running = false;
        }

        /** Stops the reload scheduler so its thread does not outlive the task. */
        @Override
        public void close() {
            if (service != null) {
                service.shutdownNow();
                service = null;
            }
        }

    }
    

    将文件流广播,connect实时流ssConsumer,自定义广播流函数。

    4、广播:

    需要自己实现两个方法:processBroadcastElement()负责处理广播流中的传入元素,processElement()负责处理非广播流中的传入元素。在processElement()中通过ReadOnlyContext取到广播状态(BroadcastState,其中保存了自定义Source发出的数据),实时流数据和广播流数据汇聚,进行业务逻辑处理,最后out输出,进行sink等操作。

    import com.alibaba.fastjson.JSON;
    import com.alibaba.fastjson.JSONObject;
    import com.qianxin.ida.dto.DeviceUserBaseLineDto;
    import com.qianxin.ida.dto.GpsBaseLineDto;
    import com.qianxin.ida.dto.TimeBaseLineDto;
    import com.qianxin.ida.dto.UserDeviceBaseLineDto;
    import com.qianxin.ida.utils.TransUtil;
    import org.apache.flink.api.common.state.BroadcastState;
    import org.apache.flink.api.common.state.MapStateDescriptor;
    import org.apache.flink.shaded.netty4.io.netty.util.internal.StringUtil;
    import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
    import org.apache.flink.util.Collector;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
     
    import java.math.BigDecimal;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;
     
    /**
     * Joins the real-time stream (JSON strings) with the broadcast configuration/baseline data.
     *
     * <p>Broadcast elements are stored in broadcast state under the fixed key {@code "broadcast"}.
     * Each data element is parsed as JSON and evaluated against every entry in broadcast state;
     * non-empty results are emitted as strings.
     *
     * <p>NOTE(review): {@code doTimeOutierEvent} is called below but is not defined in this
     * excerpt — it is assumed to be a method of this class that was omitted from the listing.
     */
    public class MyBroadcastProcessFunction extends BroadcastProcessFunction<String, JSONObject, String> {

        private static final Logger logger = LoggerFactory.getLogger(MyBroadcastProcessFunction.class);
        private final MapStateDescriptor<String, JSONObject> ruleStateDescriptor;
        private final String eventType;

        /**
         * @param ruleStateDescriptor descriptor of the broadcast state that holds the broadcast data
         * @param eventType           selects the business logic to run; only {@code "1"} is handled here
         */
        public MyBroadcastProcessFunction(MapStateDescriptor<String, JSONObject> ruleStateDescriptor, String eventType) {
            this.ruleStateDescriptor = ruleStateDescriptor;
            this.eventType = eventType;
        }

        /**
         * Handles an element of the broadcast stream: stores it in broadcast state, replacing
         * the previously stored element.
         */
        @Override
        public void processBroadcastElement(JSONObject jsonObject, Context ctx, Collector<String> collector) throws Exception {
            BroadcastState<String, JSONObject> broadcastState = ctx.getBroadcastState(ruleStateDescriptor);
            broadcastState.put("broadcast", jsonObject);
        }

        /**
         * Handles an element of the data stream. Any failure, including malformed JSON in
         * {@code value}, is logged and the element is skipped instead of failing the job.
         *
         * @param value JSON text of the incoming record
         * @param ctx   read-only access to the broadcast state
         * @param out   collector for the non-empty results of the business logic
         */
        @Override
        public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) {
            try {
                // Parsed inside the try so that one bad record is handled like every other error here.
                JSONObject currentStreamData = JSON.parseObject(value);
                if (currentStreamData == null) {
                    return;
                }
                for (Map.Entry<String, JSONObject> entry : ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                    JSONObject jsonObject = entry.getValue();
                    JSONObject configJsonFile = (JSONObject) JSON.toJSON(jsonObject.get("configJsonFile"));
                    // The broadcast element stores the list untyped, hence the unchecked cast.
                    @SuppressWarnings("unchecked")
                    List<TimeBaseLineDto> timeBaseLineDto = (List<TimeBaseLineDto>) jsonObject.get("timeBaseLineDtos");
                    String outStr = "";
                    if ("1".equals(eventType)) {
                        // Business logic.
                        outStr = doTimeOutierEvent(timeBaseLineDto, currentStreamData, configJsonFile);
                    }
                    // Plain null/empty check instead of Flink's shaded netty-internal StringUtil.
                    if (outStr != null && !outStr.isEmpty()) {
                        out.collect(outStr);
                    }
                }
            } catch (Exception e) {
                logger.error("处理广播流和数据流数据出错:", e);
            }
        }
    }
    

    5、连接两个流:

    将实时流和广播流连接,非广播流上调用connect()

        // Broadcast the file stream (single source instance) using the rule state descriptor.
        BroadcastStream<JSONObject> timeBroadcast = fileStreamSource.setParallelism(1).broadcast(ruleStateDesc);
        // MyBroadcastProcessFunction emits String elements, so the resulting stream is DataStream<String>.
        DataStream<String> timeStream = env.addSource(ssConsumer)
                .connect(timeBroadcast).process(new MyBroadcastProcessFunction(ruleStateDesc,"1"));
    

    6、Sink:

    // Attach the sink created by FlinkKafkaProducerCustom for WRITE_TOPIC and give the sink operator a name.
    timeStream.addSink(FlinkKafkaProducerCustom.create(WRITE_TOPIC, properties)).name("flink-kafka-timeStream");

    ————————————— 原文链接:https://blog.csdn.net/saranjiao/article/details/105436295

    2021-04-02 22:02:20
    赞同 展开评论 打赏
问答排行榜
最热
最新

相关电子书

更多
Flink CDC Meetup PPT - 龚中强 立即下载
Flink CDC Meetup PPT - 王赫 立即下载
Flink CDC Meetup PPT - 覃立辉 立即下载