开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

大家有遇到Flink这问题吗?

大家有遇到Flink这问题吗?在1.15.4里报错。Caused by: java.util.concurrent.CompletionException: org.apache.flink.api.common.InvalidProgramException: The job graph is cyclic.
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1592)

展开
收起
真的很搞笑 2024-01-04 12:19:14 72 0
2 条回答
写回答
取消 提交回答
  • 这个问题可能是由于Flink的任务图(job graph)出现了循环。在Flink中,任务图是由数据流和转换操作组成的,如果任务图中出现了循环,那么Flink将无法确定任务的执行顺序,从而导致无法生成有效的执行计划。

    出现这种情况的原因可能有以下几种:

    1. 数据流连接错误:你可能在两个数据流之间建立了错误的连接,导致数据流形成了循环。

    2. 转换操作嵌套错误:你可能在一个转换操作中嵌套了另一个转换操作,导致转换操作形成了循环。

    为了解决这个问题,你需要检查你的代码,确保所有的数据流和转换操作都是正确的,没有任何循环。你可以从你的数据的源头开始,逐步检查每一个转换操作和数据流,直到找到问题的根源。

    如果问题仍然存在,你可能需要提供更详细的代码信息,以便我们能够更好地帮助你解决问题。

    2024-01-05 16:11:24
    赞同 展开评论 打赏
  • 面对过去,不要迷离;面对未来,不必彷徨;活在今天,你只要把自己完全展示给别人看。

    这个问题是由于Flink作业图中存在循环依赖导致的。在Flink中,作业图是一个有向无环图(DAG),如果作业图中存在循环依赖,那么Flink将无法正确执行作业。

    为了解决这个问题,您需要检查您的Flink作业代码,确保作业图中没有循环依赖。您可以使用Flink的拓扑排序功能来检查作业图是否存在循环依赖。以下是一个使用Java API进行拓扑排序的示例:

    import org.apache.flink.api.common.JobVertex;
    import org.apache.flink.graph.DefaultEdge;
    import org.apache.flink.graph.Graph;
    import org.apache.flink.util.Collector;
    
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    
    public class TopologicalSortExample {
        public static void main(String[] args) throws Exception {
            // 创建一个作业图
            Graph<JobVertex, DefaultEdge> graph = new Graph<>();
            Map<JobVertex, Integer> inDegrees = new ConcurrentHashMap<>();
    
            // 添加作业顶点和边到作业图中
            // ...
    
            // 计算作业顶点的入度
            for (JobVertex vertex : graph.getVertices()) {
                inDegrees.put(vertex, graph.inDegreeOf(vertex));
            }
    
            // 拓扑排序
            List<JobVertex> sortedVertices = topologicalSort(graph, inDegrees);
    
            // 输出排序后的作业顶点
            for (JobVertex vertex : sortedVertices) {
                System.out.println(vertex.getName());
            }
        }
    
        private static <K, VV, EV> List<K> topologicalSort(Graph<K, EV> graph, Map<K, Integer> inDegrees) {
            List<K> result = new ArrayList<>();
            List<K> zeroInDegreeVertices = new ArrayList<>();
    
            // 初始化入度为0的顶点列表
            for (K vertex : graph.getVertices()) {
                if (inDegrees.get(vertex) == 0) {
                    zeroInDegreeVertices.add(vertex);
                }
            }
    
            // 执行拓扑排序
            while (!zeroInDegreeVertices.isEmpty()) {
                K currentVertex = zeroInDegreeVertices.remove(0);
                result.add(currentVertex);
    
                // 更新相邻顶点的入度
                for (K neighbor : graph.getNeighborsOf(currentVertex)) {
                    inDegrees.put(neighbor, inDegrees.get(neighbor) - 1);
                    if (inDegrees.get(neighbor) == 0) {
                        zeroInDegreeVertices.add(neighbor);
                    }
                }
            }
    
            return result;
        }
    }
    

    请根据您的实际作业代码修改上述示例,并确保作业图中没有循环依赖。

    2024-01-04 16:37:24
    赞同 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

热门讨论

热门文章

相关电子书

更多
低代码开发师(初级)实战教程 立即下载
冬季实战营第三期:MySQL数据库进阶实战 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载