尊敬的开发者您好, 我的需求是这样的, 拥有数据: 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id) 需要做什么? 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。 我现在的代码如下: import pandas as pd import numpy as np from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf import os import time
env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m') t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", '100000')
if os.path.exists('output'): os.remove('output')
t_env.connect(FileSystem().path('output'))
.with_format(OldCsv() .field('id', DataTypes.BIGINT()))
.with_schema(Schema() .field('id', DataTypes.BIGINT()))
.create_temporary_table('mySink')
data = pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv') coor_o = pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], data['O_Y'])))).T coor_d = pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], data['D_Y'])))).T coor = coor_o.append(coor_d).drop_duplicates() coor.columns = ['lng', 'lat'] coor = coor.sort_index() coor = coor.to_numpy()
@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(), DataTypes.ARRAY(DataTypes.FLOAT()), DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT()) def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]): temp = (np.sin((lng2-lng1)/2np.pi/180)**2+ +np.cos(lng1np.pi/180)np.cos(lng2np.pi/180)np.sin((lat2-lat1)/2np.pi/180)**2) distance = 2np.arctan2(np.sqrt(temp), np.sqrt(1-temp)) distance = distance3958.8*1609.344
buffer=100 if (distance <= buffer).sum() > 0: return distance.argmin() else: return -1
df = pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv') use_data = df[['pickup_longitude', 'pickup_latitude']]
t_env.from_pandas(use_data)
.select("distance_meters(pickup_longitude, pickup_latitude)")
.insert_into('mySink')
start_time = time.time() t_env.execute("tutorial_job") print(time.time() - start_time) 我电脑的CPU为12核24线程
目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧.. 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢) 请问,我这种情况应该如何去提速呢?可否解释一下batch.size? 期待您的回答,感谢!*来自志愿者整理的flink
Hi, 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你*来自志愿者整理的flink
版权声明:本文内容由阿里云实名注册用户自发贡献,版权归原作者所有,阿里云开发者社区不拥有其著作权,亦不承担相应法律责任。具体规则请查看《阿里云开发者社区用户服务协议》和《阿里云开发者社区知识产权保护指引》。如果您发现本社区中有涉嫌抄袭的内容,填写侵权投诉表单进行举报,一经查实,本社区将立刻删除涉嫌侵权内容。