开发者社区> 问答> 正文

pyflink 如何正确设置高速度?(如何提速)

尊敬的开发者您好, 我的需求是这样的, 拥有数据: 现拥有两个表,一个表为出租车起点的经纬度坐标(13782492行),另一个表为交叉口的经纬度坐标(4000多行,每个坐标具备一个id,从0开始的id) 需要做什么? 将将一千多万的起点坐标匹配到距离最近的交叉口上去,返回该匹配的id,设置了一个距离阈值为100m,如果据最近的交叉口仍超过100m,则返回-1。 我现在的代码如下: import pandas as pd import numpy as np from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment, DataTypes from pyflink.table.descriptors import Schema, OldCsv, FileSystem from pyflink.table.udf import udf import os import time

环境等设置,目前使用的并行数为1,batchsize为10万(我不知道这个有没有用)

env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) t_env = StreamTableEnvironment.create(env) t_env.get_config().get_configuration().set_string("taskmanager.memory.task.off-heap.size", '80m') t_env.get_config().get_configuration().set_string("python.fn-execution.arrow.batch.size", '100000')

输出表创建

if os.path.exists('output'):     os.remove('output')

t_env.connect(FileSystem().path('output'))
    .with_format(OldCsv()                  .field('id', DataTypes.BIGINT()))
    .with_schema(Schema()                  .field('id', DataTypes.BIGINT()))
    .create_temporary_table('mySink')

交叉口经纬度数据读取

data = pd.read_csv(r'D:\大论文\项目代码\data\trip\graph_data.csv') coor_o = pd.DataFrame(dict(zip(data['O_ID'], zip(data['O_X'], data['O_Y'])))).T coor_d = pd.DataFrame(dict(zip(data['D_ID'], zip(data['D_X'], data['D_Y'])))).T coor = coor_o.append(coor_d).drop_duplicates() coor.columns = ['lng', 'lat'] coor = coor.sort_index() coor = coor.to_numpy()

udf编写与注册

@udf(input_types=[DataTypes.FLOAT(), DataTypes.FLOAT(),      DataTypes.ARRAY(DataTypes.FLOAT()), DataTypes.ARRAY(DataTypes.FLOAT())], result_type=DataTypes.BIGINT()) def distance_meters(lng1, lat1, lng2=coor[:, 0], lat2=coor[:, 1]):     temp = (np.sin((lng2-lng1)/2np.pi/180)**2+          +np.cos(lng1np.pi/180)np.cos(lng2np.pi/180)np.sin((lat2-lat1)/2np.pi/180)**2)     distance = 2np.arctan2(np.sqrt(temp), np.sqrt(1-temp))     distance = distance3958.8*1609.344

    buffer=100     if (distance <= buffer).sum() > 0:         return distance.argmin()     else:         return -1

出行起点数据读取

df = pd.read_csv(r'data\trip\yellow_tripdata_2014-01.csv') use_data = df[['pickup_longitude', 'pickup_latitude']]

处理流程

t_env.from_pandas(use_data)
     .select("distance_meters(pickup_longitude, pickup_latitude)")
     .insert_into('mySink')

执行与计时

start_time = time.time() t_env.execute("tutorial_job") print(time.time() - start_time) 我电脑的CPU为12核24线程

目前处理一千多万数据所耗费的时间为2607秒(43分钟),我不知道为什么要花这么长的时间,按理来说即使设置并行数为1,批大小为10万,应该要比这个快很多吧.. 我尝试了一下设置并行数为8,但是返现结果会变为8个文件,我就打断了,没有运行完(我需要保持原表的输入顺序,该怎么做呢) 请问,我这种情况应该如何去提速呢?可否解释一下batch.size? 期待您的回答,感谢!*来自志愿者整理的flink

展开
收起
毛毛虫雨 2021-12-06 14:56:11 527 0
1 条回答
写回答
取消 提交回答
  • Hi, 1. from_pandas性能不太好的,不是用在生产上的。你可以直接用flink的csv的connector来读取你的数据呀。 2. arrow.batch.size,表示的是会把多少条数据变成一个pandas.series,然后作为你的udf的一个列传给你*来自志愿者整理的flink

    2021-12-06 15:56:20
    赞同 展开评论 打赏
问答分类:
问答地址:
问答排行榜
最热
最新

相关电子书

更多
SparkSQL实践与优化 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载