具体报错
1 [Source: Custom Source -> Sink: Unnamed (1/1)#0] ERROR org.apache.flink.streaming.connectors.redis.RedisSink - Redis has not been properly initialized: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:50) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99) at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250) at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85) at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:750) Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect at redis.clients.jedis.Connection.connect(Connection.java:164) at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80) at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676) at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87) at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) at redis.clients.util.Pool.getResource(Pool.java:48) ... 14 more Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at redis.clients.jedis.Connection.connect(Connection.java:158) ... 21 more Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:137) at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:616) at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$0(AkkaInvocationHandler.java:237) at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:774) at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:750) at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) at java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:1975) at org.apache.flink.runtime.concurrent.FutureUtils$1.onComplete(FutureUtils.java:1081) at akka.dispatch.OnComplete.internal(Future.scala:264) at akka.dispatch.OnComplete.internal(Future.scala:261) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:191) at akka.dispatch.japi$CallbackBridge.apply(Future.scala:188) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at org.apache.flink.runtime.concurrent.Executors$DirectExecutionContext.execute(Executors.java:73) at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284) at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284) at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:573) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:22) at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:21) at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532) at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29) at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29) at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60) at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:55) at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:91) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12) at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81) at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:91) at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:40) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:44) at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138) at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82) at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:207) at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:197) at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:188) at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:677) at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79) at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212) at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) at akka.actor.Actor.aroundReceive(Actor.scala:517) at akka.actor.Actor.aroundReceive$(Actor.scala:515) at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) at akka.actor.ActorCell.invoke(ActorCell.scala:561) at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) at akka.dispatch.Mailbox.run(Mailbox.scala:225) at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ... 4 more Caused by: redis.clients.jedis.exceptions.JedisConnectionException: Could not get a resource from the pool at redis.clients.util.Pool.getResource(Pool.java:50) at redis.clients.jedis.JedisPool.getResource(JedisPool.java:99) at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.getInstance(RedisContainer.java:250) at org.apache.flink.streaming.connectors.redis.common.container.RedisContainer.open(RedisContainer.java:85) at org.apache.flink.streaming.connectors.redis.RedisSink.open(RedisSink.java:174) at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:34) at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:46) at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:437) at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:574) at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.call(StreamTaskActionExecutor.java:100) at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:554) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:756) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563) at java.lang.Thread.run(Thread.java:750) Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect at redis.clients.jedis.Connection.connect(Connection.java:164) at redis.clients.jedis.BinaryClient.connect(BinaryClient.java:80) at redis.clients.jedis.BinaryJedis.connect(BinaryJedis.java:1676) at redis.clients.jedis.JedisFactory.makeObject(JedisFactory.java:87) at org.apache.commons.pool2.impl.GenericObjectPool.create(GenericObjectPool.java:861) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:435) at org.apache.commons.pool2.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:363) at redis.clients.util.Pool.getResource(Pool.java:48) ... 14 more Caused by: java.net.ConnectException: Connection refused: connect at java.net.DualStackPlainSocketImpl.waitForConnect(Native Method) at java.net.DualStackPlainSocketImpl.socketConnect(DualStackPlainSocketImpl.java:85) at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350) at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) at java.net.PlainSocketImpl.connect(PlainSocketImpl.java:172) at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) at java.net.Socket.connect(Socket.java:607) at redis.clients.jedis.Connection.connect(Connection.java:158) ... 21 more
根据报错定位到一个错误栈:Caused by: redis.clients.jedis.exceptions.JedisConnectionException: java.net.ConnectException: Connection refused: connect
应该是redis连接不上,确认配置(host、password、port、timeout等)没有问题之后检查逐步排查其他问题
1、先检查了redis状态
[root@master src]# ps -aux | grep redis root 11455 0.0 0.0 162396 7796 ? Ssl 20:42 0:01 redis-server 127.0.0.1:6379 root 58166 0.0 0.0 112828 988 pts/0 S+ 21:19 0:00 grep --color=auto redis [root@master src]# netstat -nltp | grep redis tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 11455/redis-server [root@master src]# redis-cli 127.0.0.1:6379>
redis正常状态且能正常使用
2、检查防火墙是否关闭或者是否放行redis端口(redis默认端口6379)
[root@master src]# systemctl status firewalld ● firewalld.service - firewalld - dynamic firewall daemon Loaded: loaded (/usr/lib/systemd/system/firewalld.service; disabled; vendor preset: enabled) Active: inactive (dead) Docs: man:firewalld(1) [root@master src]#
防火墙为关闭状态
3、查看redis配置里的保护模式是否关闭(redis3.2版本以上有这个属性)
查看redis配置里的bind是否关闭
[root@master redis-6.0.8]# cat redis.conf | grep protect # Protected mode is a layer of security protection, in order to avoid that # When protected mode is on and if: # By default protected mode is enabled. You should disable it only if protected-mode yes # If the master is password protected (using the "requirepass" configuration # on the internet. It's just a protection layer against misuse of the instance. # So use the 'requirepass' option to protect your instance. [root@master redis-6.0.8] # cat redis.conf |grep bind # By default, if no "bind" configuration directive is specified, Redis listens # the "bind" configuration directive, followed by one or more IP addresses. # bind 192.168.1.100 10.0.0.1 # bind 127.0.0.1 ::1 # internet, binding to all the interfaces is dangerous and will expose the # following bind directive, that will force Redis to listen only into bind 127.0.0.1 # 1) The server is not binding explicitly to a set of addresses using the # "bind" directive. # are explicitly listed using the "bind" directive.
可以看到redis保护模式合bind 参数是开启的
- 将配置中的yes改为no
- 将bind哪一行注释掉
- 之后重新启动即可解决问题