目录
一、背景
最近,碰到了一个业务,是将数据库中所有的地址信息请求百度接口获取经纬度保存起来。有38万多个地址,想到的方案就是查出所有的地址字段加上主键字段,然后导出csv文件,读取这个文件,遍历请求百度api接口,获取经纬度信息,生成一个新的文件,作为一张表导入数据库,使用sql给地址刷一遍经纬度。前面已经写过具体怎么实现了,请查看java实现调用百度接口将大量数据库中保存的地址转换为经纬度,但是由于是单线程效率有点低,20分钟大约跑一万条吧,我需要转换37万,得15个小时左右,太慢了,就想到了可以通过多线程拿到每一条数据请求百度接口,这样速度就上去了,先剧透一下结果,多线程下地址转换经纬度40分钟5万条,大约三个多小时就可以跑完,效率提升了好几倍,这次代码在上一篇的基础上做了一些优化,现在就来看看具体怎么实现吧。
二、具体实现
先创建一个线程池,后面会将每一个将要转换的地址提交到线程池中请求百度接口进行转换,将转换成功的放到共享集合中,将转换失败的也放到另一个共享集合中,当所有的地址都提交到线程池中请求转换以后,等待线程全部执行完毕,如果有转换失败的地址执行重试机制,最终将转换成功的共享集合中的数据写入csv文件中。
1、这里我们设置线程池核心线程个数为当前物理机的CPU核数,最大线程个数为当前物理机CPU核数的2倍;设置线程池阻塞队列的大小为5;需要注意的是,我们将线程池的拒绝策略设置为CallerRunsPolicy,即当线程池任务饱和,执行拒绝策略时不会丢弃新的任务,而是会使用调用线程来执行,创建线程池代码如下:
private final static int AVALIABLE_PROCESSORS = Runtime.getRuntime().availableProcessors(); private final static ThreadPoolExecutor POOL_EXECUTOR = new ThreadPoolExecutor(AVALIABLE_PROCESSORS, AVALIABLE_PROCESSORS * 2, 1, TimeUnit.MINUTES, new LinkedBlockingQueue<>(5), new ThreadPoolExecutor.CallerRunsPolicy());
2、由于我们用多线程操作共享集合,一定要考虑到线程安全问题,使用Collections.synchronizedList方法创建集合就可以保证线程安全,或者new CopyOnWriteArrayList<>(),再或者Vector类,所以共享变量声明如下,成功的保存进datas,失败的保存进failData中。对于转换成功的共享集合,可以设置一个初始容量,避免list集合扩容影响效率。
List<ResultBean> failData = Collections.synchronizedList(new ArrayList<>()); List<ResultBean> datas = Collections.synchronizedList(new ArrayList<>(35000));
3、我们通过for循环遍历所有要转换的地址,将地址提交到线程池中的部分代码片段如下:
for (String[] next : strings) { POOL_EXECUTOR.submit(new Runnable() { @Override public void run() { String address = next[1].replaceAll("\\s*", ""); ResultBean resultBean = new ResultBean(next[0], address, null, null); //百度接口地址转换经纬度方法 try { getLngLat(datas, failData, resultBean); } catch (Exception e) { } } }); }
4、getLngLat()方法就是请求百度接口转换,上一节介绍过,只是加了一些注释,方便执行的时候控制台提示执行情况,再展示一下。
/** * 封装的获取经纬度方法 * * @param datas * @param failData * @param resultBean */ private static void getLngLat(List<ResultBean> datas, List<ResultBean> failData, ResultBean resultBean) { Map<String, String> map = Maps.newHashMap(); map.put("address", resultBean.getAddress()); String response = restTemplate.getForObject(URL, String.class, map); if (response != null && response.contains("lng") && response.contains("lat")) { final Map<String, Object> geocoderResult = new JacksonJsonParser().parseMap(response); Map location = (Map) ((Map) geocoderResult.get("result")).get("location"); Double lng = (Double) location.get("lng"); Double lat = (Double) location.get("lat"); ResultBean bean = new ResultBean(resultBean.getExternal_id(), resultBean.getAddress(), String.valueOf(lng), String.valueOf(lat)); datas.add(bean); if (datas.size() % 1000 == 0) { System.out.println("------------------已经转换成功: " + datas.size() + "条-----------------------------------------"); } } else { failData.add(resultBean); } }
5、我们将地址全部提交到线程池中肯定很快,我们需要 判断线程中的线程是否全部执行完毕以后才能执行后续的转换失败地址重试,成功地址写入csv文件,通过下面方法,用一个while死循环不断判断是否活跃线程数为0,即所有任务结束。
public static boolean isEndTask() { while (true) { if (POOL_EXECUTOR.getActiveCount() == 0) { return true; } } }
6、在所有子线程执行结束以后,关掉线程池,执行失败地址转换重试,方法如下:
if (isEndTask()) { //关掉线程池 POOL_EXECUTOR.shutdown(); //失败数据再次请求百度接口,最多循环一千次,防止失败数据出现程序永不停止 int i = 1000; while (failData.size() > 0 && i > 0) { List<ResultBean> tempFailData = Collections.synchronizedList(new ArrayList<>(failData)); failData.clear(); for (ResultBean resultBean : tempFailData) { try { getLngLat(datas, failData, resultBean); } catch (Exception e) { } } System.out.println("重试剩余次数:" + i); i--; } }
7、上一篇已经说过原始文件格式,和导出的文件格式,请自行查看查看。