要求
conf 文件
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.bind=master
a1.sources.r1.port=9999
a1.sinks.k1.type=avro
a1.sinks.k1.hostname=slave1
a1.sinks.k1.port=7777
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
第二个conf
a1.sources=r1
a1.sinks=k1
a1.channels=c1
a1.sources.r1.type=avro
a1.sources.r1.bind=slave1
a1.sources.r1.port=7777
a1.sinks.k1.type=logger
a1.channels.c1.type=memory
a1.channels.c1.capacity=1000
a1.channels.c1.transactionCapacity=100
a1.sources.r1.channels=c1
a1.sinks.k1.channel=c1
当运行第第一个conf
出现错误
org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave1, port: 8888 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:181)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:120)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:90)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:210)
at org.apache.flume.sink.AbstractRpcSink.start(AbstractRpcSink.java:290)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)
at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:249)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Error connecting to slave1/192.168.111.199:8888
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:169)
... 16 more
Caused by: java.net.ConnectException: Connection refused: slave1/192.168.111.199:8888
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
... 3 more
17/10/06 21:52:54 INFO sink.AbstractRpcSink: Rpc sink s1 started.
17/10/06 21:52:54 INFO sink.AbstractRpcSink: Rpc sink s1: Building RpcClient with hostname: slave1, port: 8888
17/10/06 21:52:54 INFO sink.AvroSink: Attempting to create Avro Rpc client.
17/10/06 21:52:54 WARN api.NettyAvroRpcClient: Using default maxIOWorkers
17/10/06 21:52:54 ERROR flume.SinkRunner: Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to send events
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:389)
at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:67)
at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:145)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flume.FlumeException: NettyAvroRpcClient { host: slave1, port: 8888 }: RPC connection error
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:181)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:120)
at org.apache.flume.api.NettyAvroRpcClient.configure(NettyAvroRpcClient.java:638)
at org.apache.flume.api.RpcClientFactory.getInstance(RpcClientFactory.java:90)
at org.apache.flume.sink.AvroSink.initializeRpcClient(AvroSink.java:127)
at org.apache.flume.sink.AbstractRpcSink.createConnection(AbstractRpcSink.java:210)
at org.apache.flume.sink.AbstractRpcSink.verifyConnection(AbstractRpcSink.java:270)
at org.apache.flume.sink.AbstractRpcSink.process(AbstractRpcSink.java:346)
... 3 more
Caused by: java.io.IOException: Error connecting to slave1/192.168.111.199:8888
at org.apache.avro.ipc.NettyTransceiver.getChannel(NettyTransceiver.java:261)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:203)
at org.apache.avro.ipc.NettyTransceiver.<init>(NettyTransceiver.java:152)
at org.apache.flume.api.NettyAvroRpcClient.connect(NettyAvroRpcClient.java:169)
... 10 more
Caused by: java.net.ConnectException: Connection refused: slave1/192.168.111.199:8888
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:318)
at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
原因是先后顺序不对 得先打开第二个conf这样才可以保证sink的avro输出有接收方