flink测试redis sink报错

本文涉及的产品
实时计算 Flink 版,5000CU*H 3个月
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
简介: flink测试redis sink报错

具体报错

1    [Source: Custom Source -> Sink: Unnamed (1/1)#0] ERROR org.apache.flink.streaming.connectors.redis.RedisSink  - Redis has not been properly initialized: 
redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
  at redis.clients.util.Pool.getResource(Pool.java:50)
  at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
  at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
  at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
  at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
  at java.lang.Thread.run(Thread.java:750)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
  at redis.clients.jedis.Connection.connect(Connection.java:164)
  at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
  at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
  at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
  at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
  at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
  at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
  at redis.clients.util.Pool.getResource(Pool.java:48)
  ... 14 more
Caused by: java.net.ConnectException: Connection refused: connect
  at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
  at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:607)
  at redis.clients.jedis.Connection.connect(Connection.java:158)
  ... 21 more
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
  at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
  at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137)
  at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616)
  at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
  at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237)
  at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774)
  at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750)
  at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488)
  at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975)
  at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081)
  at akka.dispatch.OnComplete.internal(Future.scala:264)
  at akka.dispatch.OnComplete.internal(Future.scala:261)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191)
  at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
  at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73)
  at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
  at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
  at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
  at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
  at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573)
  at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22)
  at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21)
  at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
  at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
  at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
  at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
  at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55)
  at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
  at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
  at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91)
  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40)
  at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44)
  at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
  at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
  at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
  at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
  at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197)
  at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188)
  at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677)
  at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
  at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
  at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
  at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
  at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
  at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
  at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
  at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
  at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
  at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
  at akka.actor.Actor.aroundReceive(Actor.scala:517)
  at akka.actor.Actor.aroundReceive$(Actor.scala:515)
  at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
  at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
  at akka.actor.ActorCell.invoke(ActorCell.scala:561)
  at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
  at akka.dispatch.Mailbox.run(Mailbox.scala:225)
  at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
  ... 4 more
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool
  at redis.clients.util.Pool.getResource(Pool.java:50)
  at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99)
  at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250)
  at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85)
  at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174)
  at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34)
  at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
  at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46)
  at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574)
  at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
  at java.lang.Thread.run(Thread.java:750)
Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
  at redis.clients.jedis.Connection.connect(Connection.java:164)
  at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80)
  at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676)
  at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87)
  at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861)
  at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435)
  at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363)
  at redis.clients.util.Pool.getResource(Pool.java:48)
  ... 14 more
Caused by: java.net.ConnectException: Connection refused: connect
  at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method)
  at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85)
  at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
  at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
  at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
  at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172)
  at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
  at java.net.Socket.connect(Socket.java:607)
  at redis.clients.jedis.Connection.connect(Connection.java:158)
  ... 21 more

根据报错定位到一个错误栈:Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect


应该是redis连接不上,确认配置(host、password、port、timeout等)没有问题之后检查逐步排查其他问题


1、先检查了redis状态

[root@master src]# ps -aux | grep redis
root      11455  0.0  0.0 162396  7796 ?        Ssl  20:42   0:01 redis-server 127.0.0.1:6379
root      58166  0.0  0.0 112828   988 pts/0    S+   21:19   0:00 grep --color=auto redis
[root@master src]# netstat -nltp | grep redis
tcp        0      0 127.0.0.1:6379          0.0.0.0:*               LISTEN      11455/redis-server
[root@master src]# redis-cli
127.0.0.1:6379>

redis正常状态且能正常使用

2、检查防火墙是否关闭或者是否放行redis端口(redis默认端口6379)

[root@master src]# systemctl status firewalld
● firewalld.service - firewalld - dynamic firewall daemon
   Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled)
   Active: inactive (dead)
     Docs: man:firewalld(1)
[root@master src]#

防火墙为关闭状态

3、查看redis配置里的保护模式是否关闭(redis3.2版本以上有这个属性)

查看redis配置里的bind是否关闭

[root@master redis-6.0.8]# cat redis.conf | grep protect
# Protected mode is a layer of security protection, in order to avoid that
# When protected mode is on and if:
# By default protected mode is enabled. You should disable it only if
protected-mode yes
# If the master is password protected (using the "requirepass" configuration
# on the internet. It's just a protection layer against misuse of the instance.
# So use the 'requirepass' option to protect your instance.
[root@master redis-6.0.8] # cat redis.conf |grep bind
# By default, if no "bind" configuration directive is specified, Redis listens
# the "bind" configuration directive, followed by one or more IP addresses.
# bind 192.168.1.100 10.0.0.1
# bind 127.0.0.1 ::1
# internet, binding to all the interfaces is dangerous and will expose the
# following bind directive, that will force Redis to listen only into
bind 127.0.0.1
# 1) The server is not binding explicitly to a set of addresses using the
#    "bind" directive.
# are explicitly listed using the "bind" directive.

可以看到redis保护模式合bind 参数是开启的

  • 将配置中的yes改为no
  • 将bind哪一行注释掉
  • 之后重新启动即可解决问题
相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore     ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库 ECS 实例和一台目标数据库 RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
相关文章
|
12天前
|
Java 测试技术 Maven
Spring Boot单元测试报错java.lang.IllegalStateException: Could not load TestContextBootstrapper [null]
Spring Boot单元测试报错java.lang.IllegalStateException: Could not load TestContextBootstrapper [null]
|
12天前
|
数据采集 DataWorks 关系型数据库
DataWorks操作报错合集之在DataWorks运行任务时出现链接超时,但在测试连通性时显示正常连通是什么原因导致的
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
22 0
|
12天前
|
DataWorks NoSQL 关系型数据库
DataWorks操作报错合集之在使用 DataWorks 进行 MongoDB 同步时遇到了连通性测试失败,实例配置和 MongoDB 白名单配置均正确,且同 VPC 下 MySQL 可以成功连接并同步,但 MongoDB 却无法完成同样的操作如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
30 1
|
13天前
|
SQL DataWorks Java
DataWorks操作报错合集之在阿里云 DataWorks 中,代码在开发测试阶段能够成功运行,但在提交后失败并报错“不支持https”如何解决
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
26 1
DataWorks操作报错合集之在阿里云 DataWorks 中,代码在开发测试阶段能够成功运行,但在提交后失败并报错“不支持https”如何解决
|
17天前
|
消息中间件 关系型数据库 MySQL
[flink 实时流基础] 输出算子(Sink)
[flink 实时流基础] 输出算子(Sink)
|
24天前
|
缓存 NoSQL 中间件
redis内存溢出报错--OOM command not allowed when used memory > 'maxmemory'
该内容是关于Redis缓存服务器的使用指南。通过Xshell连接IP地址为25.218.153.193或206的主机,进入/data/iuap/middleware/redis-30001/bin目录,使用`redis-cli`连接到IP为206的30003端口。登录时需`auth yonyou*123`,可运行`info`和`info memory`查看状态,`flushall`清理缓存。在清理前,要备份/data/iuap/middleware/redis-30003/data/下的.aof和.rdb文件,利用tar命令打包并移至/tmp目录。
|
2月前
|
测试技术 数据库连接 数据库
测试环境的数据库连不了,打包报错怎么办
测试环境的数据库连不了,打包报错怎么办
16 0
|
2月前
|
算法 物联网 网络安全
MQTT常见问题之使用MQTTSendMessage2MQTT.py测试报错如何解决
MQTT(Message Queuing Telemetry Transport)是一个轻量级的、基于发布/订阅模式的消息协议,广泛用于物联网(IoT)中设备间的通信。以下是MQTT使用过程中可能遇到的一些常见问题及其答案的汇总:
|
10月前
|
消息中间件 Java Kafka
Flink的sink实战之二:kafka
实践如何将flink数据集sink到kafka
126 0
Flink的sink实战之二:kafka
|
消息中间件 Java Kafka
Flink的sink实战之二:kafka
实践如何将flink数据集sink到kafka
843 0
Flink的sink实战之二:kafka