Giraph源码分析(八)—— 统计每个SuperStep中参与计算的顶点数目

简介: 科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次迭代完成后,所有顶点都是InActive状态。

作者|白松

目的:科研中,需要分析在每次迭代过程中参与计算的顶点数目,来进一步优化系统。比如,在SSSP的compute()方法最后一行,都会把当前顶点voteToHalt,即变为InActive状态。所以每次迭代完成后,所有顶点都是InActive状态。在大同步后,收到消息的顶点会被激活,变为Active状态,然后调用顶点的compute()方法。本文的目的就是统计每次迭代过程中,参与计算的顶点数目。下面附上SSSP的compute()方法:

@Override
  public void compute(Iterable messages) {
    if (getSuperstep() == 0) {
      setValue(new DoubleWritable(Double.MAX_VALUE));
    }
    double minDist = isSource() ? 0d : Double.MAX_VALUE;
    for (DoubleWritable message : messages) {
      minDist = Math.min(minDist, message.get());
    }
    if (minDist < getValue().get()) {
      setValue(new DoubleWritable(minDist));
      for (Edge edge : getEdges()) {
        double distance = minDist + edge.getValue().get();
        sendMessage(edge.getTargetVertexId(), new DoubleWritable(distance));
      }
    }
    //把顶点置为InActive状态
    voteToHalt();
  }

附:giraph中算法的终止条件是:没有活跃顶点且worker间没有消息传递。

hama-0.6.0中算法的终止条件只是:判断是否有活跃顶点。不是真正的pregel思想,半成品。

修改过程如下:

  1. org.apache.giraph.partition. PartitionStats 类

添加变量和方法,用来统计每个Partition在每个超步中参与计算的顶点数目。添加的变量和方法如下:

/** computed vertices in this partition */
private long computedVertexCount=0;
 
/**
* Increment the computed vertex count by one.
*/
public void incrComputedVertexCount() {
    ++ computedVertexCount;
}
 
/**
 * @return the computedVertexCount
 */
public long getComputedVertexCount() {
    return computedVertexCount;
}

修改readFields()和write()方法,每个方法追加最后一句。当每个Partition计算完成后,会把自己的computedVertexCount发送给Master,Mater再读取汇总。

@Override
public void readFields(DataInput input) throws IOException {
    partitionId = input.readInt();
    vertexCount = input.readLong();
    finishedVertexCount = input.readLong();
    edgeCount = input.readLong();
    messagesSentCount = input.readLong();
    //添加下条语句
    computedVertexCount=input.readLong();
}
 
@Override
public void write(DataOutput output) throws IOException {
    output.writeInt(partitionId);
    output.writeLong(vertexCount);
    output.writeLong(finishedVertexCount);
    output.writeLong(edgeCount);
    output.writeLong(messagesSentCount);
    //添加下条语句
    output.writeLong(computedVertexCount);
}
  1. org.apache.giraph.graph. GlobalStats 类

    
    添加变量和方法,用来统计每个超步中参与计算的顶点总数目,包含每个Worker上的所有Partitions。
    
    
 /** computed vertices in this partition 
  *  Add by BaiSong 
  */
  private long computedVertexCount=0;
     /**
     * @return the computedVertexCount
     */
    public long getComputedVertexCount() {
        return computedVertexCount;
    }

修改addPartitionStats(PartitionStats partitionStats)方法,增加统计computedVertexCount功能。

/**
  * Add the stats of a partition to the global stats.
  *
  * @param partitionStats Partition stats to be added.
  */
  public void addPartitionStats(PartitionStats partitionStats) {
    this.vertexCount += partitionStats.getVertexCount();
    this.finishedVertexCount += partitionStats.getFinishedVertexCount();
    this.edgeCount += partitionStats.getEdgeCount();
    //Add by BaiSong,添加下条语句
    this.computedVertexCount+=partitionStats.getComputedVertexCount();
 }

当然为了Debug方便,也可以修改该类的toString()方法(可选),修改后的如下:

public String toString() {
        return "(vtx=" + vertexCount + ", computedVertexCount="
                + computedVertexCount + ",finVtx=" + finishedVertexCount
                + ",edges=" + edgeCount + ",msgCount=" + messageCount
                + ",haltComputation=" + haltComputation + ")";
    }
  1. org.apache.giraph.graph. ComputeCallable

添加统计功能。在computePartition()方法中,添加下面一句。

if (!vertex.isHalted()) {
        context.progress();
        TimerContext computeOneTimerContext = computeOneTimer.time();
        try {
            vertex.compute(messages);
        //添加下面一句,当顶点调用完compute()方法后,就把该Partition的computedVertexCount加1
            partitionStats.incrComputedVertexCount();
        } finally {
           computeOneTimerContext.stop();
        }
……
  1. 添加Counters统计,和我的博客Giraph源码分析(七)—— 添加消息统计功能) 类似,此处不再详述。添加的类为:org.apache.giraph.counters.GiraphComputedVertex,下面附上该类的源码:

package org.apache.giraph.counters;
 
import java.util.Iterator;
import java.util.Map;
 
import org.apache.hadoop.mapreduce.Mapper.Context;
import com.google.common.collect.Maps;
 
/**
 * Hadoop Counters in group "Giraph Messages" for counting every superstep
 * message count.
 */
 
public class GiraphComputedVertex extends HadoopCountersBase {
    /** Counter group name for the giraph Messages */
    public static final String GROUP_NAME = "Giraph Computed Vertex";
 
    /** Singleton instance for everyone to use */
    private static GiraphComputedVertex INSTANCE;
 
    /** superstep time in msec */
    private final Map superstepVertexCount;
 
    private GiraphComputedVertex(Context context) {
        super(context, GROUP_NAME);
        superstepVertexCount = Maps.newHashMap();
    }
 
    /**
     * Instantiate with Hadoop Context.
     * 
     * @param context
     *            Hadoop Context to use.
     */
    public static void init(Context context) {
        INSTANCE = new GiraphComputedVertex(context);
    }
 
    /**
     * Get singleton instance.
     * 
     * @return singleton GiraphTimers instance.
     */
    public static GiraphComputedVertex getInstance() {
        return INSTANCE;
    }
 
    /**
     * Get counter for superstep messages
     * 
     * @param superstep
     * @return
     */
    public GiraphHadoopCounter getSuperstepVertexCount(long superstep) {
        GiraphHadoopCounter counter = superstepVertexCount.get(superstep);
        if (counter == null) {
            String counterPrefix = "Superstep: " + superstep+" ";
            counter = getCounter(counterPrefix);
            superstepVertexCount.put(superstep, counter);
        }
        return counter;
    }
 
    @Override
    public Iterator iterator() {
        return superstepVertexCount.values().iterator();
    }
}
  1. 实验结果,运行程序后。会在终端输出每次迭代参与计算的顶点总数目。 测试SSSP(SimpleShortestPathsVertex类),输入图中共有9个顶点和12条边。输出结果如下:

上图测试中,共有6次迭代。红色框中,显示出了每次迭代过冲参与计算的顶点数目,依次是:9,4,4,3,4,0

解释:在第0个超步,每个顶点都是活跃的,所有共有9个顶点参与计算。在第5个超步,共有0个顶点参与计算,那么就不会向外发送消息,加上每个顶点都是不活跃的,所以算法迭代终止。

阅读更多文章请访问数澜社区

相关文章
|
6月前
|
数据采集 人工智能 监控
40.8K star!让AI帮你读懂整个互联网:Crawl4AI开源爬虫工具深度解析
Crawl4AI 是2025年GitHub上备受瞩目的开源网络爬虫工具,专为AI时代设计。它不仅能抓取网页内容,还能理解页面语义结构,生成适配大语言模型的训练数据格式。上线半年获4万+星标,应用于1200+AI项目。其功能亮点包括智能内容提取引擎、AI就绪数据管道和企业级特性,支持动态页面处理、多语言识别及分布式部署。技术架构基于Python 3.10与Scrapy框架,性能卓越,适用于AI训练数据采集、行业情报监控等场景。相比Scrapy、BeautifulSoup等传统工具,Crawl4AI在动态页面支持、PDF解析和语义分块方面更具优势
2161 0
40.8K star!让AI帮你读懂整个互联网:Crawl4AI开源爬虫工具深度解析
|
9月前
|
人工智能 搜索推荐
SoulChat2.0:低成本构建 AI 心理咨询师,华南理工开源心理咨询师数字孪生大语言模型
SoulChat2.0 是华南理工大学推出的心理咨询师数字孪生大语言模型,能够低成本、快速构建个性化咨询风格的心理健康大模型,辅助心理咨询师工作。
1325 9
SoulChat2.0:低成本构建 AI 心理咨询师,华南理工开源心理咨询师数字孪生大语言模型
|
自然语言处理 数据可视化 NoSQL
精通 Transformers(四)(3)
精通 Transformers(四)
210 0
|
JavaScript 数据可视化
JS如何优雅的实现模块自动滚动展示
【8月更文挑战第22天】JS如何优雅的实现模块自动滚动展示
261 1
JS如何优雅的实现模块自动滚动展示
|
Python
Python实用记录(十六):PyQt/PySide6联动VSCode便捷操作指南
本文提供了一份详细的PySide6与VSCode联动的操作指南,包括安装配置VSCode、安装必要的扩展、配置扩展以及编辑和运行PySide6项目。文中还提到了相关工具如uic.exe、rcc.exe和designer.exe的用途,并提供了进一步学习的资源。
1751 1
Python实用记录(十六):PyQt/PySide6联动VSCode便捷操作指南
|
移动开发 前端开发
ruoyi-nbcio-plus基于vue3的flowable流程设计器主界面升级修改
ruoyi-nbcio-plus基于vue3的flowable流程设计器主界面升级修改
287 1
|
Python
在Python中,多分支结构(if-elif-else语句)
在Python中,多分支结构(if-elif-else语句)
1573 4
|
存储 弹性计算 运维
CPU 利用率从 10% 提升至 60%:中型企业云原生成本优化实战指南
在互联网早期迅速发展时,相关领域企业更多注重于扩展业务,为了迅速占领市场,往往会投入较高的成本。然而,随着互联网人口红利逐渐消退,以及近几年的疫情影响,越来越多企业开始重视成本管理,从“粗放式经营”转变为“精细化运营”模式,成本优化成为企业重点关注事项。
833 0
CPU 利用率从 10% 提升至 60%:中型企业云原生成本优化实战指南
|
机器人 Linux 网络安全
利用GrayLog告警功能实现钉钉群机器人定时工作提醒
利用GrayLog告警功能实现钉钉群机器人定时工作提醒
558 0
利用GrayLog告警功能实现钉钉群机器人定时工作提醒
|
存储 监控 网络安全
【KVM虚拟化】· 虚拟机的冷迁移和热迁移
【KVM虚拟化】· 虚拟机的冷迁移和热迁移
1661 0