基于ray 多进程调度管理能力优化networks节点最短路径的并行计算

简介: 原生的networkx实现的只能在节点介数度量性任务上达到单核心100的cpu利用率。通过对源码的几行改造我们可以实现多核心的100的利用率。接下来要我们来一起看看是如何实现的多核心100的利用率。

本教程设计到多进程框架ray与networkx的图计算框架。

原生的networkx实现的只能在节点介数度量性任务上达到单核心100的cpu利用率。通过对源码的几行改造我们可以实现多核心的100的利用率。接下来要我们来一起看看是如何实现的多核心100的利用率。

deff_one():
degree_centrality_result=betweenness_centrality(G)
out= []
fori, jindegree_centrality_result.items():
out.append({"user_id": i, "weight": j})
pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")
# 读取2020年企业年报因果关系数据集。bracket_name=json.load(open("2020_extract_result_center.json", "r"))
graph= {}
foriintqdm(list(bracket_name)):
if (i[0], i[4]) notingraph:
graph[(i[0], i[4])] =1else:
graph[(i[0], i[4])] +=1print(len(graph))
graph_relation= []
forgraph_id, graph_countingraph.items():
ifgraph_count>1:
graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation=graph_relation[:10000]
G=nx.Graph()
G.add_edges_from(graph_relation)  # 添加多条边ray_G=ray.put(G)
f_one()

心细的小伙伴可能已经发现了,我们在代码中有一个ray.put的操作,对G这个无向图图谱数据集进行了多进程的准备。那么接下来,我们需要解释一下什么叫做介数中心性。在一张无向图图谱中存在着海量的节点。每一个节点到非相邻的节点都存在着一条最短路径。在介数中心性这个算法中,当前节点出现在无向图图谱所有的最短路径中出现的次数越多意味着节点的重要性越高。通过百度搜索我们可以知道,介数中心性指标在航海、飞行航路场景中有着重要的应用。在百度开源的hugegraph图数据库白皮书中介绍到介数重要性可以作为反洗钱的重要算法、社区发现可以进行团伙反欺诈。

那么源码如何实现的介数中心性这个指标的呢。我们慢慢往下看。

defbetweenness_centrality(
G, k=None, normalized=True, weight=None, endpoints=False, seed=None):
betweenness=dict.fromkeys(G, 0.0)  # b[v]=0 for v in GifkisNone:
nodes=Gelse:
nodes=seed.sample(list(G.nodes()), k)
ray_betweenness= [shortest_path_basic.remote(s, ray_betweenness) forsinnodes]
betweenness=ray.get(ray_betweenness)
betweenness_compute= {}
forbetweenness_oneinbetweenness:
fori,jinbetweenness_one.items():
ifiinbetweenness_compute:
betweenness_compute[i]+=jelse:
betweenness_compute[i]=jbetweenness=_rescale(
betweenness_compute,
len(G),
normalized=normalized,
directed=G.is_directed(),
k=k,
endpoints=endpoints,
    )
returnbetweenness

其中k代表着节点取样数,节点取样数量越少计算速度越快。(因为通过节点进行最短路径的遍历过程最少。)

首先我们需要定义一个betweeness的字典。用以储存每一个节点在其所经过的最短路径中的次数。

第二我们需要遍历所有的节点,用以在计算最短路径这个事情上获取到每一个节点所在的最短路径。

第三我们将每一个节点造成的最短路径的结果给重新合并到一个字典上。

第四,通过rescale重新对我们的所有节点结果进行汇总计算。

那么接下来让我们看看重头戏寻找当前节点的最短路径的代码我们是怎么进行修改的。

@ray.remotedefshortest_path_basic(s, ray_betweenness=None):
G=ray.get(ray_G)
betweenness=dict.fromkeys(G, 0.0)  # b[v]=0 for v in G# single source shortest pathsS, P, sigma, _=_single_source_shortest_path_basic(G, s)
# accumulationbetweenness, delta=_accumulate_basic(betweenness, S, P, sigma, s)
#     ray_betweenness = ray.put(betweenness)delGreturnbetweenness

首先我们从ray的大对象共享代理池中把我们的图拉出来。

第二我们构建一个包含所有节点为key的字典。

第三输入图谱数据G和节点s。通过s来计算所覆盖到的最短路径。

第四我们对所产生的betweenness字典对象进行积累。

第五,我们为了节约内存,所以删掉了特别占用内存的图谱数据G。

第六,我们将累计好的结果返回。

接下来我们就可以通过对基于节点的最短路径查找出来的节点权重进行权重的计算了。

整体代码如下,感兴趣的小伙伴们快来试一试这样实现的单机多进程betweenness节点介数中心性的效果吧。

importjsonimportnetworkxasnximportpandasaspdfromcollectionsimportdequefromnetworkx.algorithms.shortest_paths.weightedimport_weight_functionfromtqdmimporttqdm# helpers for betweenness centralitydef_single_source_shortest_path_basic(G, s):
S= []
P= {}
forvinG:
P[v] = []
sigma=dict.fromkeys(G, 0.0)  # sigma[v]=0 for v in GD= {}
sigma[s] =1.0D[s] =0Q=deque([s])
whileQ:  # use BFS to find shortest pathsv=Q.popleft()
S.append(v)
Dv=D[v]
sigmav=sigma[v]
forwinG[v]:
ifwnotinD:
Q.append(w)
D[w] =Dv+1ifD[w] ==Dv+1:  # this is a shortest path, count pathssigma[w] +=sigmavP[w].append(v)  # predecessorsreturnS, P, sigma, Ddef_accumulate_basic(betweenness, S, P, sigma, s):
delta=dict.fromkeys(S, 0)
whileS:
w=S.pop()
coeff= (1+delta[w]) /sigma[w]
forvinP[w]:
delta[v] +=sigma[v] *coeffifw!=s:
betweenness[w] +=delta[w]
returnbetweenness, deltadef_accumulate_endpoints(betweenness, S, P, sigma, s):
betweenness[s] +=len(S) -1delta=dict.fromkeys(S, 0)
whileS:
w=S.pop()
coeff= (1+delta[w]) /sigma[w]
forvinP[w]:
delta[v] +=sigma[v] *coeffifw!=s:
betweenness[w] +=delta[w] +1returnbetweenness, deltadef_accumulate_edges(betweenness, S, P, sigma, s):
delta=dict.fromkeys(S, 0)
whileS:
w=S.pop()
coeff= (1+delta[w]) /sigma[w]
forvinP[w]:
c=sigma[v] *coeffif (v, w) notinbetweenness:
betweenness[(w, v)] +=celse:
betweenness[(v, w)] +=cdelta[v] +=cifw!=s:
betweenness[w] +=delta[w]
returnbetweennessdef_rescale(betweenness, n, normalized, directed=False, k=None, endpoints=False):
ifnormalized:
ifendpoints:
ifn<2:
scale=None# no normalizationelse:
# Scale factor should include endpoint nodesscale=1/ (n* (n-1))
elifn<=2:
scale=None# no normalization b=0 for all nodeselse:
scale=1/ ((n-1) * (n-2))
else:  # rescale by 2 for undirected graphsifnotdirected:
scale=0.5else:
scale=NoneifscaleisnotNone:
ifkisnotNone:
scale=scale*n/kforvinbetweenness:
betweenness[v] *=scalereturnbetweennessdef_rescale_e(betweenness, n, normalized, directed=False, k=None):
ifnormalized:
ifn<=1:
scale=None# no normalization b=0 for all nodeselse:
scale=1/ (n* (n-1))
else:  # rescale by 2 for undirected graphsifnotdirected:
scale=0.5else:
scale=NoneifscaleisnotNone:
ifkisnotNone:
scale=scale*n/kforvinbetweenness:
betweenness[v] *=scalereturnbetweennessimportrayray.init()
@ray.remotedefshortest_path_basic(s, ray_betweenness=None):
G=ray.get(ray_G)
betweenness=dict.fromkeys(G, 0.0)  # b[v]=0 for v in G# single source shortest pathsS, P, sigma, _=_single_source_shortest_path_basic(G, s)
# accumulationbetweenness, delta=_accumulate_basic(betweenness, S, P, sigma, s)
#     ray_betweenness = ray.put(betweenness)delGreturnbetweennessdefbetweenness_centrality(
G, k=None, normalized=True, weight=None, endpoints=False, seed=None):
betweenness=dict.fromkeys(G, 0.0)  # b[v]=0 for v in GifkisNone:
nodes=Gelse:
nodes=seed.sample(list(G.nodes()), k)
ray_betweenness=ray.put(betweenness)
ray_betweenness= [shortest_path_basic.remote(s, ray_betweenness) forsinnodes]
betweenness=ray.get(ray_betweenness)
#     print(betweenness[:3])# rescaling#     json.dump(betweenness, open("betweenness.json", "w", encoding="utf-8"), ensure_ascii=False)betweenness_compute= {}
forbetweenness_oneinbetweenness:
fori,jinbetweenness_one.items():
ifiinbetweenness_compute:
betweenness_compute[i]+=jelse:
betweenness_compute[i]=jbetweenness=_rescale(
betweenness_compute,
len(G),
normalized=normalized,
directed=G.is_directed(),
k=k,
endpoints=endpoints,
    )
returnbetweenness@ray.remotedeff(index):
bracket_name=ray.get(bracket_name_ray)
graph= {}
print(index)
foriintqdm(list(bracket_name)):
if (i[0], i[2]) notingraph:
graph[(i[0], i[2])] =1else:
graph[(i[0], i[2])] +=1print(len(graph))
graph_relation= []
forgraph_id, graph_countingraph.items():
ifgraph_count>1:
graph_relation.append(graph_id)
G=nx.Graph()
G.add_edges_from(graph_relation)  # 添加多条边degree_centrality_result=betweenness_centrality(G)
out= []
fori, jindegree_centrality_result.items():
out.append({"user_id": i, "weight": j})
pd.DataFrame(out).to_csv("betweenness_centrality_result"+str(index) +".csv")
deff_one():
degree_centrality_result=betweenness_centrality(G)
out= []
fori, jindegree_centrality_result.items():
out.append({"user_id": i, "weight": j})
pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")
bracket_name=json.load(open("2020_extract_result_center.json", "r"))
bracket_name_ray=ray.put(bracket_name)
# futures = [f.remote(i) for i in range(len(bracket_name) // 3000)]# print(ray.get(futures))  # [0, 1, 4, 9]bracket_name=ray.get(bracket_name_ray)
graph= {}
foriintqdm(list(bracket_name)):
if (i[0], i[4]) notingraph:
graph[(i[0], i[4])] =1else:
graph[(i[0], i[4])] +=1print(len(graph))
graph_relation= []
forgraph_id, graph_countingraph.items():
ifgraph_count>1:
graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation=graph_relation[:10000]
G=nx.Graph()
G.add_edges_from(graph_relation)  # 添加多条边ray_G=ray.put(G)
f_one()
相关文章
|
19天前
|
算法 调度 UED
深入理解操作系统:进程调度与优先级队列
【10月更文挑战第31天】在计算机科学的广阔天地中,操作系统扮演着枢纽的角色,它不仅管理着硬件资源,还为应用程序提供了运行的环境。本文将深入浅出地探讨操作系统的核心概念之一——进程调度,以及如何通过优先级队列来优化资源分配。我们将从基础理论出发,逐步过渡到实际应用,最终以代码示例巩固知识点,旨在为读者揭开操作系统高效管理的神秘面纱。
|
16天前
|
算法 调度 UED
深入理解操作系统:进程管理与调度策略
【10月更文挑战第34天】本文旨在探讨操作系统中至关重要的一环——进程管理及其调度策略。我们将从基础概念入手,逐步揭示进程的生命周期、状态转换以及调度算法的核心原理。文章将通过浅显易懂的语言和具体实例,引导读者理解操作系统如何高效地管理和调度进程,保证系统资源的合理分配和利用。无论你是初学者还是有一定经验的开发者,这篇文章都能为你提供新的视角和深入的理解。
38 3
|
20天前
|
算法 Linux 定位技术
Linux内核中的进程调度算法解析####
【10月更文挑战第29天】 本文深入剖析了Linux操作系统的心脏——内核中至关重要的组成部分之一,即进程调度机制。不同于传统的摘要概述,我们将通过一段引人入胜的故事线来揭开进程调度算法的神秘面纱,展现其背后的精妙设计与复杂逻辑,让读者仿佛跟随一位虚拟的“进程侦探”,一步步探索Linux如何高效、公平地管理众多进程,确保系统资源的最优分配与利用。 ####
58 4
|
21天前
|
缓存 负载均衡 算法
Linux内核中的进程调度算法解析####
本文深入探讨了Linux操作系统核心组件之一——进程调度器,着重分析了其采用的CFS(完全公平调度器)算法。不同于传统摘要对研究背景、方法、结果和结论的概述,本文摘要将直接揭示CFS算法的核心优势及其在现代多核处理器环境下如何实现高效、公平的资源分配,同时简要提及该算法如何优化系统响应时间和吞吐量,为读者快速构建对Linux进程调度机制的认知框架。 ####
|
21天前
|
消息中间件 算法 调度
深入理解操作系统:进程管理与调度策略
【10月更文挑战第29天】本文将带领读者深入探讨操作系统中的核心组件之一——进程,并分析进程管理的重要性。我们将从进程的生命周期入手,逐步揭示进程状态转换、进程调度算法以及优先级调度等关键概念。通过理论讲解与代码演示相结合的方式,本文旨在为读者提供对进程调度机制的全面理解,从而帮助读者更好地掌握操作系统的精髓。
31 1
|
21天前
|
算法 调度 UED
深入理解操作系统中的进程调度
【10月更文挑战第29天】探索进程调度的奥秘,本文将带你深入了解在操作系统中如何管理和控制多个并发执行的程序。从简单的调度算法到复杂的多级反馈队列,我们将逐步揭示如何优化系统性能和提高资源利用率。准备好一起揭开进程调度的神秘面纱吧!
|
26天前
|
算法 大数据 Linux
深入理解操作系统之进程调度算法
【10月更文挑战第24天】本文旨在通过浅显易懂的语言,带领读者深入了解操作系统中的进程调度算法。我们将从进程的基本概念出发,逐步解析进程调度的目的、重要性以及常见的几种调度算法。文章将通过比喻和实例,使复杂的技术内容变得生动有趣,帮助读者建立对操作系统进程调度机制的清晰认识。最后,我们还将探讨这些调度算法在现代操作系统中的应用和发展趋势。
|
19天前
|
算法 Linux 调度
深入理解操作系统之进程调度
【10月更文挑战第31天】在操作系统的心脏跳动中,进程调度扮演着关键角色。本文将深入浅出地探讨进程调度的机制和策略,通过比喻和实例让读者轻松理解这一复杂主题。我们将一起探索不同类型的调度算法,并了解它们如何影响系统性能和用户体验。无论你是初学者还是资深开发者,这篇文章都将为你打开一扇理解操作系统深层工作机制的大门。
26 0
|
21天前
|
算法 调度 开发者
探索操作系统的核心:进程管理与调度
【10月更文挑战第29天】本文深入探讨了操作系统中至关重要的一环——进程管理。通过浅显易懂的语言,我们将了解到什么是进程,进程如何被创建和管理,以及操作系统如何决定哪个进程应该获得CPU时间。文章还将揭示进程调度对系统性能的影响,并分享一些优化技巧。无论你是编程新手还是有一定经验的开发者,这篇文章都将为你提供宝贵的知识。
|
1月前
|
算法 Unix Linux
深入理解操作系统:进程管理与调度策略
【10月更文挑战第9天】本文将带你进入操作系统的核心,探索进程管理的奥秘。我们将从基础的概念出发,逐步深入到进程的创建、调度和同步等关键机制。通过理论与实际代码示例的结合,你将获得对操作系统中进程管理更深层次的理解和应用能力。无论你是初学者还是有一定经验的开发者,这篇文章都将为你提供新的视角和知识,让你在操作系统的学习之旅上更进一步。

热门文章

最新文章

相关实验场景

更多
下一篇
无影云桌面