This tutorial involves the multiprocessing framework Ray and the graph-computation framework NetworkX.
The native NetworkX implementation of node betweenness centrality can only drive a single core to 100% CPU utilization. With a few lines of changes to the source code, we can reach 100% utilization across every core. Let's walk through how that is done.
def f_one():
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})
    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")

# Load the 2020 corporate annual-report causal-relation dataset.
bracket_name = json.load(open("2020_extract_result_center.json", "r"))
graph = {}
for i in tqdm(list(bracket_name)):
    if (i[0], i[4]) not in graph:
        graph[(i[0], i[4])] = 1
    else:
        graph[(i[0], i[4])] += 1
print(len(graph))
graph_relation = []
for graph_id, graph_count in graph.items():
    if graph_count > 1:
        graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation = graph_relation[:10000]
G = nx.Graph()
G.add_edges_from(graph_relation)  # add the edges
ray_G = ray.put(G)
f_one()
Observant readers may have noticed the ray.put call in the code above: it stages the undirected graph G for multiprocessing. Before going further, let's explain what betweenness centrality is. An undirected graph contains a huge number of nodes, and between every node and each of its non-adjacent nodes there is a shortest path. In the betweenness centrality algorithm, the more often a node appears on the shortest paths of the whole graph, the more important that node is. A quick Baidu search shows that the metric has important applications in maritime and flight-route scenarios, and the white paper for Baidu's open-source hugegraph graph database notes that betweenness centrality can serve as a key algorithm for anti-money-laundering, while community detection can be used for gang fraud detection.
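To make the definition concrete, here is a tiny sanity check using NetworkX's stock single-core API on a hypothetical four-node path graph (not the tutorial's dataset): the two middle nodes sit on more shortest paths, so they score highest.

import networkx as nx

# Minimal sketch: betweenness on a toy path graph a-b-c-d.
G_demo = nx.path_graph(["a", "b", "c", "d"])
print(nx.betweenness_centrality(G_demo))
# ≈ {'a': 0.0, 'b': 0.67, 'c': 0.67, 'd': 0.0} -- the middle nodes lie on the most shortest paths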
So how does the source code implement the betweenness centrality metric? Let's work through it step by step.
def betweenness_centrality(
    G, k=None, normalized=True, weight=None, endpoints=False, seed=None
):
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
    if k is None:
        nodes = G
    else:
        nodes = seed.sample(list(G.nodes()), k)
    ray_betweenness = ray.put(betweenness)
    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]
    betweenness = ray.get(ray_betweenness)
    betweenness_compute = {}
    for betweenness_one in betweenness:
        for i, j in betweenness_one.items():
            if i in betweenness_compute:
                betweenness_compute[i] += j
            else:
                betweenness_compute[i] = j
    betweenness = _rescale(
        betweenness_compute,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        k=k,
        endpoints=endpoints,
    )
    return betweenness
Here k is the number of sampled source nodes: the fewer nodes we sample, the faster the computation, because fewer single-source shortest-path traversals have to be performed.
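As a rough sketch (assuming seed is a random.Random-style object, which is what the seed.sample call in the code above expects), sampling k sources simply restricts how many single-source traversals are launched:

import random
import networkx as nx

# Sketch only: sampling k source nodes trades accuracy for speed.
G_demo = nx.gnm_random_graph(1000, 5000, seed=7)      # hypothetical stand-in graph
rng = random.Random(42)
sampled_nodes = rng.sample(list(G_demo.nodes()), 100)  # only 100 single-source traversals instead of 1000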
First, we define a betweenness dictionary that stores, for each node, how many of the shortest paths it appears on.
Second, we iterate over the source nodes so that, for each one, we obtain the shortest paths it lies on.
Third, we merge the per-source results back into a single dictionary.
Fourth, _rescale aggregates and normalizes the result across all nodes (a sketch of that normalization follows this list).
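For reference, the normalized, no-endpoints branch of _rescale (taken from the full listing at the end of this post) boils down to the following sketch; when only k of the n sources are sampled, the result is additionally scaled by n/k to compensate for the sampling.

# Sketch of the normalized, no-endpoints branch of _rescale (see the full listing below).
def rescale_sketch(betweenness_compute, n, k=None):
    scale = 1 / ((n - 1) * (n - 2))    # scale used when normalized=True and endpoints=False (n > 2)
    if k is not None:
        scale = scale * n / k          # compensate for traversing only k of the n sources
    return {v: b * scale for v, b in betweenness_compute.items()}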
Now for the centerpiece: how did we modify the code that finds the shortest paths from a given node?
@ray.remote
def shortest_path_basic(s, ray_betweenness=None):
    G = ray.get(ray_G)
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
    # single source shortest paths
    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
    # accumulation
    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)
    # ray_betweenness = ray.put(betweenness)
    del G
    return betweenness
First, we pull the graph out of Ray's shared object store (the general Ray pattern is sketched after this list).
Second, we build a dictionary keyed by every node in the graph.
Third, given the graph G and the source node s, we compute all the shortest paths that start from s.
Fourth, we accumulate the results into the betweenness dictionary.
Fifth, to save memory, we delete the memory-hungry graph object G.
Sixth, we return the accumulated result.
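The Ray pattern underneath these steps is: ray.put the graph once on the driver, ray.get it inside each worker, and fan the per-source work out across cores with remote calls. A stripped-down sketch with a hypothetical stand-in task looks like this:

import networkx as nx
import ray

ray.init()

big_graph = nx.path_graph(1000)      # hypothetical stand-in for the annual-report graph
graph_ref = ray.put(big_graph)       # put the graph into the shared object store once

@ray.remote
def per_source_task(s):
    G = ray.get(graph_ref)           # every worker reads the same shared copy
    return nx.single_source_shortest_path_length(G, s)

refs = [per_source_task.remote(s) for s in [0, 250, 500]]
results = ray.get(refs)              # blocks until all workers are done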
With that in place, we can compute each node's final weight from the per-source shortest-path counts.
The complete code is below. Interested readers are encouraged to try this single-machine, multi-process implementation of betweenness centrality for themselves.
import json
import networkx as nx
import pandas as pd
from collections import deque
from networkx.algorithms.shortest_paths.weighted import _weight_function
from tqdm import tqdm


# helpers for betweenness centrality
def _single_source_shortest_path_basic(G, s):
    S = []
    P = {}
    for v in G:
        P[v] = []
    sigma = dict.fromkeys(G, 0.0)  # sigma[v]=0 for v in G
    D = {}
    sigma[s] = 1.0
    D[s] = 0
    Q = deque([s])
    while Q:  # use BFS to find shortest paths
        v = Q.popleft()
        S.append(v)
        Dv = D[v]
        sigmav = sigma[v]
        for w in G[v]:
            if w not in D:
                Q.append(w)
                D[w] = Dv + 1
            if D[w] == Dv + 1:  # this is a shortest path, count paths
                sigma[w] += sigmav
                P[w].append(v)  # predecessors
    return S, P, sigma, D


def _accumulate_basic(betweenness, S, P, sigma, s):
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w]
    return betweenness, delta


def _accumulate_endpoints(betweenness, S, P, sigma, s):
    betweenness[s] += len(S) - 1
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            delta[v] += sigma[v] * coeff
        if w != s:
            betweenness[w] += delta[w] + 1
    return betweenness, delta


def _accumulate_edges(betweenness, S, P, sigma, s):
    delta = dict.fromkeys(S, 0)
    while S:
        w = S.pop()
        coeff = (1 + delta[w]) / sigma[w]
        for v in P[w]:
            c = sigma[v] * coeff
            if (v, w) not in betweenness:
                betweenness[(w, v)] += c
            else:
                betweenness[(v, w)] += c
            delta[v] += c
        if w != s:
            betweenness[w] += delta[w]
    return betweenness


def _rescale(betweenness, n, normalized, directed=False, k=None, endpoints=False):
    if normalized:
        if endpoints:
            if n < 2:
                scale = None  # no normalization
            else:
                # Scale factor should include endpoint nodes
                scale = 1 / (n * (n - 1))
        elif n <= 2:
            scale = None  # no normalization, b=0 for all nodes
        else:
            scale = 1 / ((n - 1) * (n - 2))
    else:  # rescale by 2 for undirected graphs
        if not directed:
            scale = 0.5
        else:
            scale = None
    if scale is not None:
        if k is not None:
            scale = scale * n / k
        for v in betweenness:
            betweenness[v] *= scale
    return betweenness


def _rescale_e(betweenness, n, normalized, directed=False, k=None):
    if normalized:
        if n <= 1:
            scale = None  # no normalization, b=0 for all nodes
        else:
            scale = 1 / (n * (n - 1))
    else:  # rescale by 2 for undirected graphs
        if not directed:
            scale = 0.5
        else:
            scale = None
    if scale is not None:
        if k is not None:
            scale = scale * n / k
        for v in betweenness:
            betweenness[v] *= scale
    return betweenness


import ray

ray.init()


@ray.remote
def shortest_path_basic(s, ray_betweenness=None):
    G = ray.get(ray_G)
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
    # single source shortest paths
    S, P, sigma, _ = _single_source_shortest_path_basic(G, s)
    # accumulation
    betweenness, delta = _accumulate_basic(betweenness, S, P, sigma, s)
    # ray_betweenness = ray.put(betweenness)
    del G
    return betweenness


def betweenness_centrality(
    G, k=None, normalized=True, weight=None, endpoints=False, seed=None
):
    betweenness = dict.fromkeys(G, 0.0)  # b[v]=0 for v in G
    if k is None:
        nodes = G
    else:
        nodes = seed.sample(list(G.nodes()), k)
    ray_betweenness = ray.put(betweenness)
    ray_betweenness = [shortest_path_basic.remote(s, ray_betweenness) for s in nodes]
    betweenness = ray.get(ray_betweenness)
    # print(betweenness[:3])
    # rescaling
    # json.dump(betweenness, open("betweenness.json", "w", encoding="utf-8"), ensure_ascii=False)
    betweenness_compute = {}
    for betweenness_one in betweenness:
        for i, j in betweenness_one.items():
            if i in betweenness_compute:
                betweenness_compute[i] += j
            else:
                betweenness_compute[i] = j
    betweenness = _rescale(
        betweenness_compute,
        len(G),
        normalized=normalized,
        directed=G.is_directed(),
        k=k,
        endpoints=endpoints,
    )
    return betweenness


@ray.remote
def f(index):
    bracket_name = ray.get(bracket_name_ray)
    graph = {}
    print(index)
    for i in tqdm(list(bracket_name)):
        if (i[0], i[2]) not in graph:
            graph[(i[0], i[2])] = 1
        else:
            graph[(i[0], i[2])] += 1
    print(len(graph))
    graph_relation = []
    for graph_id, graph_count in graph.items():
        if graph_count > 1:
            graph_relation.append(graph_id)
    G = nx.Graph()
    G.add_edges_from(graph_relation)  # add the edges
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})
    pd.DataFrame(out).to_csv("betweenness_centrality_result" + str(index) + ".csv")


def f_one():
    degree_centrality_result = betweenness_centrality(G)
    out = []
    for i, j in degree_centrality_result.items():
        out.append({"user_id": i, "weight": j})
    pd.DataFrame(out).to_csv("betweenness_centrality_result.csv")


bracket_name = json.load(open("2020_extract_result_center.json", "r"))
bracket_name_ray = ray.put(bracket_name)
# futures = [f.remote(i) for i in range(len(bracket_name) // 3000)]
# print(ray.get(futures))  # [0, 1, 4, 9]
bracket_name = ray.get(bracket_name_ray)
graph = {}
for i in tqdm(list(bracket_name)):
    if (i[0], i[4]) not in graph:
        graph[(i[0], i[4])] = 1
    else:
        graph[(i[0], i[4])] += 1
print(len(graph))
graph_relation = []
for graph_id, graph_count in graph.items():
    if graph_count > 1:
        graph_relation.append(graph_id)
print(len(graph_relation))
graph_relation = graph_relation[:10000]
G = nx.Graph()
G.add_edges_from(graph_relation)  # add the edges
ray_G = ray.put(G)
f_one()