1. Requirements Analysis
1. A Kafka source supplies user-activity events; for each record, query the AMap (Gaode) API with its longitude/latitude to attach location information.
2. A single lookup may not return promptly; the job must not stall waiting on it, which is the case Flink's Async I/O is built for.
1.1 Sample Data
user001,A1,2020-10-10 10:10:10,1,115.963,39.3659
user002,A2,2020-10-10 10:10:10,1,123.544,36.2356
user003,A3,2020-10-10 10:10:10,1,123.568,47.3215
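Field order, as consumed by the code below: uid, aid (activity id), event time, event type, longitude, latitude.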
2. Preparing an AMap (Gaode Map) API Key
2.1 Open the developer console
Register at the AMap Open Platform (https://lbs.amap.com), create an application in the console, and apply for a Web-service API key; every request passes this key as the key query parameter.
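For reference, reverse geocoding is a plain HTTP GET. The request below uses a placeholder key, and the response is illustrative and abridged, but the regeocode / addressComponent / province fields are exactly what the parsing code later in this article reads:

https://restapi.amap.com/v3/geocode/regeo?key=<your-key>&location=115.963,39.3659

{
    "status": "1",
    "info": "OK",
    "regeocode": {
        "addressComponent": {
            "province": "河北省"
        }
    }
}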
2.2 Add the HTTP client dependency
<!-- HttpClient dependency -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpclient</artifactId>
    <version>4.5.6</version>
</dependency>
3. Flink Development (Synchronous Version)
3.1 ActivityBean.java
public class ActivityBean {

    public String uid;
    public String aid;
    public String activityName;
    public String time;
    public int eventType;
    public String province;
    public double longitude;
    public double latitude;

    public ActivityBean() {
    }

    public ActivityBean(String uid, String aid, String activityName, String time,
                        int eventType, double longitude, double latitude, String province) {
        this.uid = uid;
        this.aid = aid;
        this.activityName = activityName;
        this.time = time;
        this.eventType = eventType;
        this.province = province;
        this.longitude = longitude;
        this.latitude = latitude;
    }

    @Override
    public String toString() {
        return "ActivityBean{" +
                "uid='" + uid + '\'' +
                ", aid='" + aid + '\'' +
                ", activityName='" + activityName + '\'' +
                ", time='" + time + '\'' +
                ", eventType=" + eventType +
                ", province='" + province + '\'' +
                ", longitude=" + longitude +
                ", latitude=" + latitude +
                '}';
    }

    public static ActivityBean of(String uid, String aid, String activityName, String time,
                                  int eventType, double longitude, double latitude, String province) {
        return new ActivityBean(uid, aid, activityName, time, eventType, longitude, latitude, province);
    }
}
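The public fields plus the no-argument constructor let Flink recognize ActivityBean as a POJO type, so it can be serialized between operators without any extra configuration.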
3.2 GeoToActivityBeanFunction.java (extends RichMapFunction)
public class GeoToActivityBeanFunction extends RichMapFunction<String, ActivityBean> {

    private CloseableHttpClient httpClient = null;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        httpClient = HttpClients.createDefault();
    }

    @Override
    public ActivityBean map(String line) throws Exception {
        String[] fields = line.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];

        String url = "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location="
                + longitude + "," + latitude;
        String province = null;
        HttpGet httpGet = new HttpGet(url);
        // Synchronous call: map() blocks here until AMap responds.
        CloseableHttpResponse response = httpClient.execute(httpGet);
        try {
            int status = response.getStatusLine().getStatusCode();
            if (status == 200) {
                // Read the raw JSON response body.
                String result = EntityUtils.toString(response.getEntity());
                System.out.println(result);
                JSONObject jsonObject = JSONObject.parseObject(result);
                JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                if (regeocode != null && !regeocode.isEmpty()) {
                    JSONObject address = regeocode.getJSONObject("addressComponent");
                    // Extract the province; city and businessAreas are available the same way.
                    province = address.getString("province");
                    //String city = address.getString("city");
                    //String businessAreas = address.getString("businessAreas");
                }
            }
        } finally {
            response.close();
        }
        // Activity name is a placeholder: the database lookup is out of scope here.
        return ActivityBean.of(uid, aid, "not queried", time, eventType,
                Double.parseDouble(longitude), Double.parseDouble(latitude), province);
    }

    @Override
    public void close() throws Exception {
        super.close();
        httpClient.close();
    }
}
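Note that open() and close() run once per parallel subtask, so each subtask reuses a single CloseableHttpClient across all of its records instead of creating one per event.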
3.3 Flink job (main program)
public class GeoQueryLocationActivityCount {

    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Kafka configuration.
        String topic = "activity";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.52.200:9092"); // multiple brokers can be listed, comma-separated
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "earliest");
        prop.setProperty("group.id", "consumer1");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);

        // 3. Read the data and enrich it with the synchronous map function.
        DataStream<String> lines = env.addSource(myConsumer);
        SingleOutputStreamOperator<ActivityBean> beans = lines.map(new GeoToActivityBeanFunction());
        beans.print();

        // 4. Execute.
        env.execute("GeoQueryLocationActivityCount");
    }
}
3.4 Testing
1. Start a Kafka console producer and paste in the sample lines from 1.1:
bin/kafka-console-producer.sh --broker-list 192.168.52.200:9092,192.168.52.201:9092,192.168.52.202:9092 --topic activity
Test result: each input line is printed as an ActivityBean whose province field has been filled in by the AMap lookup.
Consider a problem here: every map() call blocks until the AMap API returns. If a response is slow or never arrives, the Flink subtask sits idle on that one request and the whole pipeline stalls behind it; throughput is bounded by the round-trip latency of each individual lookup.
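To put rough numbers on it: at ~100 ms per AMap round trip, one blocking map() subtask tops out near 10 records per second, while async I/O with 10 requests in flight per subtask can approach 100 records per second at the same per-request latency.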
4. Flink Async I/O
4.1 Add the async HttpClient dependency
<!-- Efficient asynchronous HttpClient dependency -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpasyncclient</artifactId>
    <version>4.1.4</version>
</dependency>
4.2 AsyncGeoToActivityBeanFunction.java (extends RichAsyncFunction)
public class AsyncGeoToActivityBeanFunction extends RichAsyncFunction<String, ActivityBean> {

    private transient CloseableHttpAsyncClient httpAsyncClient;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // 1. Initialize the async HTTP client with connect/socket timeouts
        //    and a bounded connection pool.
        RequestConfig requestConfig = RequestConfig.custom()
                .setSocketTimeout(3000)
                .setConnectTimeout(3000)
                .build();
        httpAsyncClient = HttpAsyncClients.custom()
                .setMaxConnTotal(20)
                .setDefaultRequestConfig(requestConfig)
                .build();
        httpAsyncClient.start();
    }

    @Override
    public void asyncInvoke(String line, ResultFuture<ActivityBean> resultFuture) throws Exception {
        String[] fields = line.split(",");
        String uid = fields[0];
        String aid = fields[1];
        String time = fields[2];
        int eventType = Integer.parseInt(fields[3]);
        String longitude = fields[4];
        String latitude = fields[5];

        String url = "https://restapi.amap.com/v3/geocode/regeo?key=32451f2e804ded152cef65c3cd16452d&location="
                + longitude + "," + latitude;
        HttpGet httpGet = new HttpGet(url);
        // Fire the request; this does not block the Flink task thread.
        Future<HttpResponse> future = httpAsyncClient.execute(httpGet, null);

        CompletableFuture.supplyAsync(new Supplier<String>() {
            @Override
            public String get() {
                try {
                    // This get() blocks, but on a ForkJoinPool worker thread,
                    // not on the Flink task thread.
                    HttpResponse response = future.get();
                    String province = null;
                    if (response.getStatusLine().getStatusCode() == 200) {
                        // Read the raw JSON response body.
                        String result = EntityUtils.toString(response.getEntity());
                        System.out.println(result);
                        JSONObject jsonObject = JSONObject.parseObject(result);
                        JSONObject regeocode = jsonObject.getJSONObject("regeocode");
                        if (regeocode != null && !regeocode.isEmpty()) {
                            JSONObject address = regeocode.getJSONObject("addressComponent");
                            // Extract the province; city and businessAreas are available the same way.
                            province = address.getString("province");
                            //String city = address.getString("city");
                            //String businessAreas = address.getString("businessAreas");
                        }
                    }
                    return province;
                } catch (Exception e) {
                    return null;
                }
            }
        }).thenAccept((String province) -> {
            // Hand the result back to Flink; this completes the async request.
            resultFuture.complete(Collections.singleton(ActivityBean.of(uid, aid, null, time, eventType,
                    Double.parseDouble(longitude), Double.parseDouble(latitude), province)));
        });
    }

    @Override
    public void close() throws Exception {
        super.close();
        httpAsyncClient.close();
    }
}
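One caveat: the supplyAsync wrapper above still parks a ForkJoinPool worker on future.get() for every request. Since httpasyncclient accepts a completion callback (org.apache.http.concurrent.FutureCallback), the body of asyncInvoke can be made fully non-blocking; a minimal sketch, using the same fields and parsing as above:

// Replace the Future + supplyAsync combination with the client's own callback.
httpAsyncClient.execute(httpGet, new FutureCallback<HttpResponse>() {
    @Override
    public void completed(HttpResponse response) {
        String province = null;
        try {
            if (response.getStatusLine().getStatusCode() == 200) {
                String result = EntityUtils.toString(response.getEntity());
                JSONObject regeocode = JSONObject.parseObject(result).getJSONObject("regeocode");
                if (regeocode != null && !regeocode.isEmpty()) {
                    province = regeocode.getJSONObject("addressComponent").getString("province");
                }
            }
        } catch (Exception ignored) {
            // fall through with province = null
        }
        resultFuture.complete(Collections.singleton(ActivityBean.of(uid, aid, null, time, eventType,
                Double.parseDouble(longitude), Double.parseDouble(latitude), province)));
    }

    @Override
    public void failed(Exception ex) {
        resultFuture.completeExceptionally(ex);
    }

    @Override
    public void cancelled() {
        resultFuture.complete(Collections.emptyList());
    }
});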
4.3 Flink main program
public class AsyncGeoQueryLocationActivityCount {

    public static void main(String[] args) throws Exception {
        // 1. Set up the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2. Kafka configuration.
        String topic = "activity";
        Properties prop = new Properties();
        prop.setProperty("bootstrap.servers", "192.168.52.200:9092"); // multiple brokers can be listed, comma-separated
        prop.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        prop.setProperty("auto.offset.reset", "earliest");
        prop.setProperty("group.id", "consumer1");
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), prop);

        // 3. Read the data and enrich it asynchronously; results may be emitted out of order.
        DataStream<String> lines = env.addSource(myConsumer);
        SingleOutputStreamOperator<ActivityBean> beans = AsyncDataStream.unorderedWait(
                lines, new AsyncGeoToActivityBeanFunction(),
                5000, TimeUnit.MILLISECONDS, // per-request timeout
                10);                         // at most 10 requests in flight per subtask
        beans.print();

        // 4. Execute.
        env.execute("AsyncGeoQueryLocationActivityCount");
    }
}
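If downstream operators need results in the original input order, orderedWait takes the same arguments but buffers completed results so they are emitted in input order, at the cost of extra latency and state:

SingleOutputStreamOperator<ActivityBean> ordered = AsyncDataStream.orderedWait(
        lines, new AsyncGeoToActivityBeanFunction(), 5000, TimeUnit.MILLISECONDS, 10);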
4.4 Summary
The same pattern applies to other external systems:
Async lookups against MySQL (no async driver here, so use a thread pool; see the sketch below)
Async lookups against ES
Async lookups against …
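For the MySQL case, the usual pattern offloads a blocking JDBC lookup to a dedicated thread pool inside a RichAsyncFunction. A minimal sketch under stated assumptions: the JDBC URL, credentials, and the t_activity table and its columns are purely illustrative, and a pooled DataSource (e.g. HikariCP or Druid) would replace DriverManager in a real job:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

public class AsyncMysqlLookupFunction extends RichAsyncFunction<String, String> {

    private transient ExecutorService executor;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Pool size bounds how many JDBC lookups run concurrently per subtask.
        executor = Executors.newFixedThreadPool(10);
    }

    @Override
    public void asyncInvoke(String aid, ResultFuture<String> resultFuture) {
        CompletableFuture.supplyAsync(() -> {
            // Blocking JDBC call, but on a pool thread rather than the Flink task thread.
            // Opening a connection per call is kept only for brevity; use a pooled DataSource in practice.
            try (Connection conn = DriverManager.getConnection(
                         "jdbc:mysql://localhost:3306/test", "user", "password");
                 PreparedStatement ps = conn.prepareStatement(
                         "SELECT name FROM t_activity WHERE id = ?")) {
                ps.setString(1, aid);
                try (ResultSet rs = ps.executeQuery()) {
                    return rs.next() ? rs.getString(1) : null;
                }
            } catch (Exception e) {
                return null;
            }
        }, executor).thenAccept(name ->
                resultFuture.complete(Collections.singleton(aid + "," + name)));
    }

    @Override
    public void close() throws Exception {
        super.close();
        executor.shutdown();
    }
}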