开发者学堂课程【大数据 Spark2020版(知识精讲与实战演练)第四阶段:行政区统计_功能实现】学习笔记,与课程紧密联系,让用户快速学习知识。
课程地址:https://developer.aliyun.com/learning/course/691/detail/12123
行政区统计_功能实现
功能实现
通过解析 JSON、读取 Geometry,通过两个步骤进行后续工作,通过两个步骤将 GeoJSON 数据表示为 Geometry 对象的形式,后期可以找到某个点在那个行政区中,做具体功能实现,在出租车数据集中增加行政区信息。
1.步骤
将 Geometry 数据集按照区域大小排序
广播 Geometry 信息,发给每一个 Executor
创建 UDF ,通过经纬度获取行政区信息
统计行政区信息
进入代码中,在代码中实现各步骤,第七步增加行政区信息,做好准备解析 GeoJSON,在解析 GeoJSON 前要读取数据集,这是第一步,第二步按照区域大小进行排序,第三步广播 GeoJSON 数据集,第四步进行相应的 UDF 创建,完成功能,第五步统计信息,以上是整个的五个步骤。
第一步读取数据集,在 Scala 中读取文件用 source,读取数据集用 source,fromFile,File 路径是在 dataset 下的 borough-boundaries,读取数据集,变成字符串的形式,mrString 的形式就是 GeoJSON 的数据集,解析数据集 FeatureExtraction,parseJson 进行相应操作,传入 geojson,解析出的结果。 FeatureCollection,对结果进行排序,排序的目的,后续需要得到每一个出租车在哪个行政区,本质拿到经纬度,遍历 features,搜索其所在的行政区,在搜索的过程中,行政区越大(地理面积)命中的几率就越高,所以把大的行政区放在前面,容易命中数据,减少遍历次数。
排序 FeatureCollectio,FeatureCollection 找到其中的 features,进行 sort,sort使用 sortBy 中返回排序依据,根据排序依据进行排序,得到 feature。排序依据两个值有关,一是与行政区编号有关,feature.properties,properties 是解析出的map数据,就可以找到其中的 boroughCode 行政区编号,二是行政区大小有关,附上 FeatureCollection,找到 getGeometry,getGeometry 获取的是 Geometry 的对象,通过 calculateArea2D 判断在二维坐标系的面积,排序完成后获取最终的结构sortedFeatures,后续在每一个算子中使用,拿到每一条数据 map,在后续spark中使用数据集,数据集较小,可以广播发给大家,减少数据集的拷贝量,得到 spark.sparkContext.broadcast(),将 sortedFeatures 发出,发出后得到 featuresBC
创建UDF,目的是完成具体功能,对 UDF 命名,boroughLookUp,LookUp 是检索,在函数中接收X、Y,X、Y 是经纬度,根据XY求得所在的行政区,搜索经纬度所在的行政区,进行相应的转换,转为行政区信息。
得到 GeometryEngine 这样一个类,找到其中静态方法 contains,contains 判断 feature,判断行政区,传入经纬度new point,加入地理坐标系参数,SpatialReference.create(4326)是一个固定写法,需要行政区的 feature 对象,featureBC 获取行政区列表,value 得到广播中的每一条数据,进行 find,feature 中有一个参数 Geometry,得到 Geometry 对象,在生成的行政区列表中进行搜索,搜索的过程判断是否在行政区内,相应转换,转为行政区信息,featureHit 是一个命中对象,featureHit 是一个 option,使用 map,接收 feature,获取行政区名字,option 对象 map 后转为另一个 option 对象,option 取得名字,得到行政区定义,borough,将定义返回,在 UDF 中接收经纬度,返回行政区名字,执行相应的测试统计是否正确,生成新的 UDF,通过 UDF 方法传 boroughLookup,生成新的UDF,进行相应技术,groupBy 对应新生成的行政区列,UDF 传入其中的X、Y,找到下车点的X、Y经纬度,对行政区进行分组,求 count 看每一个行政区下车点有多少个出租车在行政区停留,
运行代码
//7.增加行政区信息
//7.1.读取数据集
Val
geo
J
son=Source.fromFile("dataset/nyc-borough-boundaries-polygon.geojson").mkString
val featurecollection = FeatureExtraction.parseJson(geoJson)
//7.2.排序
//后续需要得到每一个出租车在哪个行政区,拿到经纬度,遍历 features 搜索其所在的行政区
//在搜索的过程中,行政区越大命中的几率就越高,所以把大的行政区放在前面,更容易命中,减少遍历次数
val sortedFeatures = featurecollection.features.sortBy(feature => {
(feature.properties(""boroughCode"),- feature.getGeometry().calculateArea2D()))
//7.3.广播
val featuresBC = spark.sparkContext.broadcast(sortedFeatures)
//7.4. UDF创建,完成功能
val boroughLookup = (x: Double, y: Double) =>
{
//7.4.1.搜索经纬度所在的行政区
val featureHit: option[Feature] = featuresBC.value.find(feature =>
{
GeometryEngine.contains(feature.getGeometry(), new Point(x,y),SpatialReference.create(4326))})
//7.4.2.转为行政区信息
val borough = featureHit.map(feature => feature.properties("borought" )).getorElse("NA")borough
}
//7.5.统计信息
val boroughUDF = udf ( boroughLookup)
taxiClean.groupBy(boroughUDF( " dropoffx,"dropoffY))
.count()
.show
结果
Queens 有869条数据,NA 有306条数据,Brooklyn 有751条数据,Staten Island 有6条数据,Manhattan 有7980条数据,Bronx 有88条数据,Manhattan 数据多,相对密集频繁
数据不够全面,相对来说较少,课后在集群中使用较大的数据集运算,将整个行政区信息添加进入,还剩最后一步按照行政区统计行政区的平局等待时间。