【Flink-需求】KafkaSource根据经纬度查询高德API关联位置信息

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: 【Flink-需求】KafkaSource根据经纬度查询高德API关联位置信息

一、需求分析


1.KafkaSource根据经纬度查询高德API关联位置信息

2.查询一条数据及时没有及时的返回,也可以异步Flink IO


1.1 数据样式


user001,A1,2020-10-10 10:10:10,1,115.963,39.3659

user002,A2,2020-10-10 10:10:10,1,123.544,36.2356

user003,A3,2020-10-10 10:10:10,1,123.568,47.3215


二、高德地图key准备


2.1 进入开发者中心


20200923124600863.png


20200923125235751.png


2.2 引入http的依赖

        <!--引入httpClient的依赖-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.6</version>
        </dependency>

三、Flink开发


3.1 ActivityBean.java

public class ActivityBean {
    public String uid;
    public String aid;
    public String activityName;
    public String time;
    public int eventType;
    public String province;
    public double longitude;
    public double latitude;
    public ActivityBean() {
    }
    public ActivityBean(String uid, String aid, String activityName, String time, int eventType, double longitude,double latitude,String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.province = province;
        this.latitude = latitude;
        this.longitude = longitude;
    }
    @Override
    public String toString() {
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", activityName='" + activityName + '\'' +
                ", time='" + time + '\'' +
                ", eventType=" + eventType +
                ", province='" + province + '\'' +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                '}';
    }
    public static ActivityBean of(String uid,String aid,String activityName,String time,int eventType,double longitude,double latitude,String province){
        return new ActivityBean(uid,aid,activityName,time,eventType,longitude,latitude,province);
    }
}

3.2 RichMapFunction.java

public class GeoToActivityBeanFunction extends RichMapFunction<String,ActivityBean> {
    private CloseableHttpClient httpClient = null;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        httpClient = HttpClients.createDefault();
    }
    @Override
    public ActivityBean map(String line) throws Exception {
        String[] fields = line.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];
        String url = "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location=" + longitude + "," + latitude;
        String province = null;
        HttpGet httpGet = new HttpGet(url);
        CloseableHttpResponse response = httpClient.execute(httpGet);
        try{
            int status = response.getStatusLine().getStatusCode();
            if (status == 200) {
                //获取请求的json字符串
                String result = EntityUtils.toString(response.getEntity());
                System.out.println(result);
                JSONObject jsonObject = JSONObject.parseObject(result);
                JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                if (regeocode != null && !regeocode.isEmpty()) {
                    JSONObject address = regeocode.getJSONObject("addressComponent");
                    //获取省市区
                    province = address.getString("province");
                    //String city = address.getString("city");
                    //String businessAreas = address.getString("businessAreas");
                }
            }
        }finally {
            response.close();
        }
        return ActivityBean.of(uid,aid,"未查询数据库",time,eventType,Double.parseDouble(longitude),Double.parseDouble(latitude),province);
    }
    @Override
    public void close() throws Exception {
        super.close();
        httpClient.close();
    }
}


3.3 Flink执行程序

public class GeoQueryLocationActivityCount {
        public static void main(String[] args) throws Exception {
            //1.获取环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.kafka配置
            String topic = "activity";
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", "192.168.52.200:9092");//多个的话可以指定
            prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("auto.offset.reset", "earliest");
            prop.setProperty("group.id", "consumer1");
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);
            //3.获取数据
            DataStream<String> lines = env.addSource(myConsumer);
            SingleOutputStreamOperator<ActivityBean> beans = lines.map(new GeoToActivityBeanFunction());
            beans.print();
            //7.执行
            env.execute("GeoQueryLocationActivityCount");
        }
}

3.4 测试


1.开启kafka生产者


bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,1.168.52.201:9092,19


测试结果:


20200923133307141.png

这里我们想一个问题,每次flink查询计算时候,需要依赖于查询高德地图的返回结果,如果没有返回结果,那么是不是flink程序就会一直卡在这里,不会执行下去


四、Flink异步Async I/O

2020092313403940.png


4.1 引入异步的httpclient依赖

        <!--引入高效的异步httpClient的依赖-->
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpasyncclient</artifactId>
            <version>4.1.4</version>
        </dependency>

4.2 RichAsyncFunction.java

public class AsyncGeoToActivityBeanFunction extends RichAsyncFunction<String,ActivityBean> {
    private transient CloseableHttpAsyncClient httpAsyncClient;
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        //1.初始化异步的httpclient
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectTimeout(3000)
                .build();
        httpAsyncClient = HttpAsyncClients.custom()
                .setMaxConnTotal(20)
                .setDefaultRequestConfig(requestConfig).build();
        httpAsyncClient.start();
    }
    @Override
    public void asyncInvoke(String line, ResultFuture<ActivityBean> resultFuture) throws Exception {
        String[] fields = line.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];
        String url = "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location=" + longitude + "," + latitude;
        HttpGet httpGet = new HttpGet(url);
        Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);
        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try{
                    HttpResponse response = future.get();
                    String province = null;
                    if (response.getStatusLine().getStatusCode() == 200) {
                        //获取请求的json字符串
                        String result = EntityUtils.toString(response.getEntity());
                        System.out.println(result);
                        JSONObject jsonObject = JSONObject.parseObject(result);
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            //获取省市区
                            province = address.getString("province");
                            //String city = address.getString("city");
                            //String businessAreas = address.getString("businessAreas");
                        }
                    }
                    return province;
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String province) ->{
            resultFuture.complete(Collections.singleton(ActivityBean.of(uid,aid,null,time,eventType,0,0,province)));
        });
    }
    @Override
    public void close() throws Exception {
        super.close();
        httpAsyncClient.close();
    }
}

4.3 Flink主程序

public class AsyncGeoQueryLocationActivityCount {
        public static void main(String[] args) throws Exception {
            //1.获取环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            //2.kafka配置
            String topic = "activity";
            Properties prop = new Properties();
            prop.setProperty("bootstrap.servers", "192.168.52.200:9092");//多个的话可以指定
            prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            prop.setProperty("auto.offset.reset", "earliest");
            prop.setProperty("group.id", "consumer1");
            FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);
            //3.获取数据
            DataStream<String> lines = env.addSource(myConsumer);
            // 无序的异步
            SingleOutputStreamOperator<ActivityBean> beans = AsyncDataStream.unorderedWait(lines, new AsyncGeoToActivityBeanFunction(), 0, TimeUnit.MICROSECONDS, 10);
            beans.print();
            //7.执行
            env.execute("AsyncGeoQueryLocationActivityCount");
        }
}

4.4 总结


异步请求MYSQL 【线程池】

异步请求ES

异步请求…

目录
相关文章
|
2月前
|
JSON 搜索推荐 API
抖音商品详情API接口:获取商品信息的指南
抖音商品详情API接口由抖音开放平台提供,允许第三方应用访问抖音小店的商品数据,包括基本信息、价格、库存及用户评价等。其优势在于数据实时性、自动化处理、市场分析及个性化推荐。通过注册账号、获取API密钥、阅读文档和构建请求,用户可高效获取商品信息,提升运营效率。未来,该接口将在电商领域发挥更大作用。
|
12天前
|
JSON 安全 API
抖音店铺商品信息的 API
抖音店铺商品信息的 API 主要用于获取商品的详细信息,包括基本信息、属性、库存、评价、推广信息等。开发者需注册账号、申请权限、阅读文档、发送请求并处理响应。此外,还提供商品搜索和管理接口,帮助商家优化商品展示和管理订单,提高运营效率。使用时需遵守平台规则,确保数据安全和合法性。
|
13天前
|
API
淘宝API接口( item_detail - 淘宝商品详情查询)
淘宝商品详情查询 API(item_detail)用于获取淘宝商品的详细信息。请求参数包括商品唯一 ID(num_iid)和是否获取促销价(is_promotion)。响应参数包含商品标题、价格、库存、图片链接、品牌等详细信息。
|
20天前
|
Prometheus 监控 Cloud Native
调用淘宝 API 时如何处理错误信息?
调用淘宝API时,需熟悉其错误码体系,处理客户端(如参数错误、权限不足)和服务器(如内部错误、网络问题)错误,编写错误处理逻辑,并进行充分测试与监控,确保API调用稳定可靠。
|
3天前
|
Java 测试技术 API
如何利用 1688 API 接口获取商品信息?
利用 1688 API 获取商品信息的步骤包括:注册开发者账号并创建应用,申请 API 权限,获取 API 密钥,了解 API 文档,编写代码调用 API,并进行测试与调试。最终在生产环境中上线应用,确保合法合规使用。
|
29天前
|
数据采集 人工智能 自然语言处理
Python实时查询股票API的FinanceAgent框架构建股票(美股/A股/港股)AI Agent
金融领域Finance AI Agents方面的工作,发现很多行业需求和用户输入的 query都是和查询股价/行情/指数/财报汇总/金融理财建议相关。如果需要准确的 金融实时数据就不能只依赖LLM 来生成了。常规的方案包括 RAG (包括调用API )再把对应数据和prompt 一起拼接送给大模型来做文本生成。稳定的一些商业机构的金融数据API基本都是收费的,如果是以科研和demo性质有一些开放爬虫API可以使用。这里主要介绍一下 FinanceAgent,github地址 https://github.com/AI-Hub-Admin/FinanceAgent
|
12天前
|
监控 供应链 搜索推荐
获取店铺商品信息的 API 接口有哪些?
本文介绍了五个常用的电商平台获取店铺商品信息的 API 接口:淘宝、京东、1688、拼多多和慢慢买。每个接口的功能、使用方式及优势各不相同,涵盖商品详情、价格、销量、库存等信息,适用于商品分析、竞品分析、价格监控等场景,帮助商家提升业务效率和市场竞争力。
|
2月前
|
XML JSON API
淘宝商品详情API接口:获取商品信息的指南
淘宝详情API接口是淘宝开放平台提供的一种API接口,它允许开发者通过编程方式获取淘宝商品的详细信息。这些信息包括商品的基本属性、价格、库存状态、销售策略、卖家信息等,对于电商分析、市场研究或者商品信息管理等场景非常有用。
81 1
|
2月前
|
API 搜索推荐
|
2月前
|
SQL 分布式计算 BI
Dataphin中集成SelectDB以支持报表分析和API查询
本文介绍了一家零售企业如何利用SelectDB进行BI分析及数据服务API的查询。通过Dataphin的数据集成、SQL研发等功能,将CRM、ERP等系统数据汇聚加工,并推送至SelectDB构建销售数据集市层,以支持报表分析及API查询。SelectDB具备实时、统一、弹性及开放特性,适用于多种实时分析场景。文章详细描述了在Dataphin中集成SelectDB的整体方案、数据源配置、数据集成、数据开发及数据服务流程。
105 0