Flink实现PageRank算法

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
简介: PageRank估计是很多面试场合上镜率比较高的吧,面试Spark的时候会被问到,最近flink热,估计也会被问到吧,浪尖就在这里帮大家解决这个疑难杂症。算法常见的原题是:pagerank的算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。将每个页面的排序值初始化为1.0。在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。将每个页面的排序值设为0.15 +

PageRank估计是很多面试场合上镜率比较高的吧,面试Spark的时候会被问到,最近flink热,估计也会被问到吧,浪尖就在这里帮大家解决这个疑难杂症。

算法常见的原题是:

pagerank的算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算。

将每个页面的排序值初始化为1.0。
在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。
最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。
下面废话少说直接上案例吧。

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

import java.util.ArrayList;

import static org.apache.flink.api.java.aggregation.Aggregations.SUM;

/**
算法会维护两个数据集:一个由(pageID,linkList)的元素组成,包含每个页面的相邻页面的列表;
另一个由(pageID,rank)元素组成,包含每个页面的当前排序值。它按如下步骤进行计算:
将每个页面的排序值初始化为1.0。
在每次迭代中,对页面p,向其每个相邻页面(有直接链接的页面)发送一个值为rank(p)/numNeighbors(p)的贡献值。
将每个页面的排序值设为0.15 + 0.85 * contributionsReceived。
最后两个步骤会重复几个循环,在此过程中,算法会逐渐收敛于每个游戏拍卖平台的实际PageRank值。在实际操作中,收敛通常需要大约10轮迭代。
*/
@SuppressWarnings("serial")
public class PageRank {

private static final double DAMPENING_FACTOR = 0.85;
private static final double EPSILON = 0.0001;

// *
// PROGRAM
// *

public static void main(String[] args) throws Exception {

ParameterTool params = ParameterTool.fromArgs(args);

final int numPages = params.getInt("numPages", PageRankData.getNumberOfPages());
final int maxIterations = params.getInt("iterations", 10);

// set up execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// make the parameters available to the web ui
env.getConfig().setGlobalJobParameters(params);

// get input data
DataSet<Long> pagesInput = getPagesDataSet(env, params);
DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env, params);

// 初始化rank <pageID,rank>
DataSet<Tuple2<Long, Double>> pagesWithRanks = pagesInput.
    map(new RankAssigner((1.0d / numPages)));

// 获取<pageID,NeighborsLinkList>
DataSet<Tuple2<Long, Long[]>> adjacencyListInput =
    linksInput.groupBy(0).reduceGroup(new BuildOutgoingEdgeList());

// set iterative data set
IterativeDataSet<Tuple2<Long, Double>> iteration = pagesWithRanks.iterate(maxIterations);

DataSet<Tuple2<Long, Double>> newRanks = iteration
    // join pages with outgoing edges and distribute rank
    .join(adjacencyListInput).where(0).equalTo(0).flatMap(new JoinVertexWithEdgesMatch())
    // collect and sum ranks
    .groupBy(0).aggregate(SUM, 1)
    // apply dampening factor
    .map(new Dampener(DAMPENING_FACTOR, numPages));

DataSet<Tuple2<Long, Double>> finalPageRanks = iteration.closeWith(
    newRanks,
    newRanks.join(iteration).where(0).equalTo(0)
    // termination condition
    .filter(new EpsilonFilter())); //自定义了一个终止条件

// emit result
if (params.has("output")) {
  finalPageRanks.writeAsCsv(params.get("output"), "\n", " ");
  // execute program
  env.execute("Basic Page Rank Example");
} else {
  System.out.println("Printing result to stdout. Use --output to specify output path.");
  finalPageRanks.print();
}

}

// *
// USER FUNCTIONS
// *

/**

  • A map function that assigns an initial rank to all pages.

*/
public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {

Tuple2<Long, Double> outPageWithRank;

public RankAssigner(double rank) {
  this.outPageWithRank = new Tuple2<Long, Double>(-1L, rank);
}

@Override
public Tuple2<Long, Double> map(Long page) {
  outPageWithRank.f0 = page;
  return outPageWithRank;
}

}

/**

  • A reduce function that takes a sequence of edges and builds the adjacency list for the vertex where the edges
  • originate. Run as a pre-processing step.

*/
@ForwardedFields("0")
public static final class BuildOutgoingEdgeList implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long[]>> {

private final ArrayList<Long> neighbors = new ArrayList<Long>();

@Override
public void reduce(Iterable<Tuple2<Long, Long>> values, Collector<Tuple2<Long, Long[]>> out) {
  neighbors.clear();
  Long id = 0L;

  for (Tuple2<Long, Long> n : values) {
    id = n.f0;
    neighbors.add(n.f1);
  }
  out.collect(new Tuple2<Long, Long[]>(id, neighbors.toArray(new Long[neighbors.size()])));
}

}

/**

  • Join function that distributes a fraction of a vertex's rank to all neighbors.

*/
public static final class JoinVertexWithEdgesMatch implements FlatMapFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>>, Tuple2<Long, Double>> {

@Override
public void flatMap(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Long[]>> value, Collector<Tuple2<Long, Double>> out){
  Long[] neighbors = value.f1.f1;
  double rank = value.f0.f1;
  double rankToDistribute = rank / ((double) neighbors.length);

  for (Long neighbor: neighbors) {
    out.collect(new Tuple2<Long, Double>(neighbor, rankToDistribute));
  }
}

}

/**

  • The function that applies the page rank dampening formula.

*/
@ForwardedFields("0")
public static final class Dampener implements MapFunction<Tuple2<Long, Double>, Tuple2<Long, Double>> {

private final double dampening;
private final double randomJump;

public Dampener(double dampening, double numVertices) {
  this.dampening = dampening;
  this.randomJump = (1 - dampening) / numVertices;
}

@Override
public Tuple2<Long, Double> map(Tuple2<Long, Double> value) {
  value.f1 = (value.f1 * dampening) + randomJump;
  return value;
}

}

/**

  • Filter that filters vertices where the rank difference is below a threshold.

*/
public static final class EpsilonFilter implements FilterFunction<Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>>> {

@Override
public boolean filter(Tuple2<Tuple2<Long, Double>, Tuple2<Long, Double>> value) {
  return Math.abs(value.f0.f1 - value.f1.f1) > EPSILON;
}

}

// *
// UTIL METHODS
// *

private static DataSet getPagesDataSet(ExecutionEnvironment env, ParameterTool params) {

if (params.has("pages")) {
  return env.readCsvFile(params.get("pages"))
    .fieldDelimiter(" ")
    .lineDelimiter("\n")
    .types(Long.class)
    .map(new MapFunction<Tuple1<Long>, Long>() {
      @Override
      public Long map(Tuple1<Long> v) {
        return v.f0;
      }
    });
} else {
  System.out.println("Executing PageRank example with default pages data set.");
  System.out.println("Use --pages to specify file input.");
  return PageRankData.getDefaultPagesDataSet(env);
}

}

private static DataSet<Tuple2<Long, Long>> getLinksDataSet(ExecutionEnvironment env, ParameterTool params) {

if (params.has("links")) {
  return env.readCsvFile(params.get("links"))
    .fieldDelimiter(" ")
    .lineDelimiter("\n")
    .types(Long.class, Long.class);
} else {
  System.out.println("Executing PageRank example with default links data set.");
  System.out.println("Use --links to specify file input.");
  return PageRankData.getDefaultEdgeDataSet(env);
}

}
}

public class PageRankData {

public static final Object[][] EDGES = {

{1L, 2L},
{1L, 15L},
{2L, 3L},
{2L, 4L},
{2L, 5L},
{2L, 6L},
{2L, 7L},
{3L, 13L},
{4L, 2L},
{5L, 11L},
{5L, 12L},
{6L, 1L},
{6L, 7L},
{6L, 8L},
{7L, 1L},
{7L, 8L},
{8L, 1L},
{8L, 9L},
{8L, 10L},
{9L, 14L},
{9L, 1L},
{10L, 1L},
{10L, 13L},
{11L, 12L},
{11L, 1L},
{12L, 1L},
{13L, 14L},
{14L, 12L},
{15L, 1L},

};

private static int numPages = 15;

public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {

List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
for (Object[] e : EDGES) {
  edges.add(new Tuple2<Long, Long>((Long) e[0], (Long) e[1]));
}
return env.fromCollection(edges);

}

public static DataSet getDefaultPagesDataSet(ExecutionEnvironment env) {

return env.generateSequence(1, 15);

}

public static int getNumberOfPages() {

return numPages;

}

}
是不是很简单,大家可以自己搞个案例测试一下哦~

相关实践学习
基于Hologres轻松玩转一站式实时仓库
本场景介绍如何利用阿里云MaxCompute、实时计算Flink和交互式分析服务Hologres开发离线、实时数据融合分析的数据大屏应用。
Linux入门到精通
本套课程是从入门开始的Linux学习课程,适合初学者阅读。由浅入深案例丰富,通俗易懂。主要涉及基础的系统操作以及工作中常用的各种服务软件的应用、部署和优化。即使是零基础的学员,只要能够坚持把所有章节都学完,也一定会受益匪浅。
目录
相关文章
|
4天前
|
机器学习/深度学习 数据采集 自然语言处理
经典机器学习算法——Pagerank算法(二)
PageRank 算法由 Google 创始人 Larry Page 在斯坦福读大学时提出,又称 PR——佩奇排名。主要针对网页进行排名,计算网站的重要性,优化搜索引擎的搜索结果。PR 值是表示其重要性的因子
|
4天前
|
机器学习/深度学习 数据采集 算法
经典机器学习算法——Pagerank算法(一)
PageRank 算法由 Google 创始人 Larry Page 在斯坦福读大学时提出,又称 PR——佩奇排名。主要针对网页进行排名,计算网站的重要性,优化搜索引擎的搜索结果。PR 值是表示其重要性的因子
经典机器学习算法——Pagerank算法(一)
|
2月前
|
存储 SQL 算法
flink cdc 算法问题之low hign点位有重叠如何解决
Flink CDC(Change Data Capture)是一个基于Apache Flink的实时数据变更捕获库,用于实现数据库的实时同步和变更流的处理;在本汇总中,我们组织了关于Flink CDC产品在实践中用户经常提出的问题及其解答,目的是辅助用户更好地理解和应用这一技术,优化实时数据处理流程。
|
4月前
|
机器学习/深度学习 算法 搜索推荐
Flink中的流式机器学习是什么?请解释其作用和常用算法。
Flink中的流式机器学习是什么?请解释其作用和常用算法。
51 0
|
4月前
|
算法 搜索推荐 Java
图计算中的PageRank算法是什么?请解释其作用和计算原理。
图计算中的PageRank算法是什么?请解释其作用和计算原理。
29 0
|
7月前
|
消息中间件 存储 算法
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
Flink---13、容错机制(检查点(保存、恢复、算法、配置)、状态一致性、端到端精确一次)
|
10月前
|
存储 SQL Kubernetes
GeaFlow图计算快速上手之PageRank算法
GeaFlow图计算快速上手之PageRank算法
GeaFlow图计算快速上手之PageRank算法
|
机器学习/深度学习 算法 数据可视化
【复杂网络建模】——Python可视化重要节点识别(PageRank算法)
【复杂网络建模】——Python可视化重要节点识别(PageRank算法)
556 0
|
机器学习/深度学习 移动开发 算法
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
255 0
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(下)
|
分布式计算 算法 搜索推荐
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(上)
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。
319 0
大数据分析实验,包含五个子实验:wordCount实验,PageRank实验,关系挖掘实验,k-means算法,推荐系统算法。(上)