开发者社区 > 大数据与机器学习 > 实时计算 Flink > 正文

flinkcdc模式去读holo表的binlog的时候报错,怎么解决?

flinkcdc模式去读holo表的binlog的时候报错,怎么解决?Realtime ingestion service (holohub) not enabled or its endpoint invalid. org.apache.flink.table.api.ValidationException: SQL validation failed. Realtime ingestion service (holohub) not enabled or its endpoint invalid.
at org.apache.flink.table.sqlserver.utils.FormatValidatorExceptionUtils.newValidationException(FormatValidatorExceptionUtils.java:41)
at org.apache.flink.table.sqlserver.utils.ErrorConverter.formatException(ErrorConverter.java:125)
at org.apache.flink.table.sqlserver.utils.ErrorConverter.toErrorDetail(ErrorConverter.java:60)
at org.apache.flink.table.sqlserver.utils.ErrorConverter.toGrpcException(ErrorConverter.java:54)
at org.apache.flink.table.sqlserver.FlinkSqlServiceImpl.validateAndGeneratePlan(FlinkSqlServiceImpl.java:1077)
at org.apache.flink.table.sqlserver.proto.FlinkSqlServiceGrpc$MethodHandlers.invoke(FlinkSqlServiceGrpc.java:3692)
at io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:172)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:331)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:820)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:123)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1147)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:622)
at java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.RuntimeException: Realtime ingestion service (holohub) not enabled or its endpoint invalid.
at com.alibaba.ververica.connectors.hologres.utils.JDBCUtils.getHolohubEndpoint(JDBCUtils.java:206)
at com.alibaba.ververica.connectors.hologres.source.HologresTableSource.getScanRuntimeProvider(HologresTableSource.java:354)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.validateScanSource(DynamicSourceUtils.java:580)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.prepareDynamicSource(DynamicSourceUtils.java:246)
at org.apache.flink.table.planner.connectors.DynamicSourceUtils.convertSourceToRel(DynamicSourceUtils.java:176)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.java:118)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3619)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2523)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2156)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2105)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2062)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:666)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:647)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3472)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:573)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:336)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.java:329)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:1398)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:1326)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(SqlToOperationConverter.java:384)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:274)
at org.apache.flink.table.planner.delegation.OperationIterator.convertNext(OperationIterator.java:102)
at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:86)
at org.apache.flink.table.planner.delegation.OperationIterator.next(OperationIterator.java:49)
at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validate(OperationExecutorImpl.java:396)
at org.apache.flink.table.sqlserver.execution.OperationExecutorImpl.validateAndGeneratePlan(OperationExecutorImpl.java:357)
at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$validateAndGeneratePlan$29(DelegateOperationExecutor.java:263)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1729)
at org.apache.flink.table.sqlserver.context.SqlServerSecurityContext.runSecured(SqlServerSecurityContext.java:72)
at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.wrapClassLoader(DelegateOperationExecutor.java:311)
at org.apache.flink.table.sqlserver.execution.DelegateOperationExecutor.lambda$wrapExecutor$35(DelegateOperationExecutor.java:333)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more

展开
收起
三分钟热度的鱼 2024-05-16 08:34:58 24 0
1 条回答
写回答
取消 提交回答
  • 使用这个方式创建临时表,就没问题了。CREATE TEMPORARY TABLE IF NOT EXISTS source_student(
    id int,
    。。。
    PRIMARY KEY(id) NOT ENFORCED
    )WITH(
    'connector' = 'hologres',
    'dbname' = 'holo_test', --Hologres的数据库名称
    'tablename' = 'public.source_student', --Hologres用于接收数据的表名称
    'username' = '', --当前阿里云账号的AccessKey ID
    'password' = '
    ', --当前阿里云账号的AccessKey Secret
    'endpoint' = 'hgpostcn-**ncs.com:80', --当前Hologres实例VPC网络的Endpoint
    'binlogStartUpMode' = 'initial', -- 先读取历史全量数据,再增量消费Binlog。
    'binlog' = 'true', -- 开启开启实时同步
    'cdcMode' = 'true', -- 开启开启实时同步
    'binlogMaxRetryTimes' = '10', -- 开启开启实时同步
    'binlogRetryIntervalMs' = '500', -- 开启开启实时同步
    'binlogBatchReadSize' = '100', -- 开启开启实时同步
    'jdbcretrycount' = '1', --连接故障时的重试次数
    'partitionrouter' = 'true', --是否写入分区表
    'createparttable' = 'true', --是否自动创建分区
    'mutatetype' = 'insertorignore' --数据写入模式
    );
    -- 设置表属性开启Binlog功能
    begin;
    call set_table_property('source_student', 'binlog.level', 'replica');
    commit;

    -- 设置表属性,配置Binlog TTL时间,单位秒
    begin;
    call set_table_property('source_student', 'binlog.ttl', '2592000');
    commit; 此回答整理自钉群“实时计算Flink产品交流群”

    2024-05-16 09:16:17
    赞同 4 展开评论 打赏

实时计算Flink版是阿里云提供的全托管Serverless Flink云服务,基于 Apache Flink 构建的企业级、高性能实时大数据处理系统。提供全托管版 Flink 集群和引擎,提高作业开发运维效率。

相关电子书

更多
PolarDB-X 2.0 全局 Binlog 与备份恢复能 立即下载
低代码开发师(初级)实战教程 立即下载
阿里巴巴DevOps 最佳实践手册 立即下载