We are using Flink to process Kafka data and sink the result to MySQL. After processing, each user ID maps to multiple records that all have to be inserted, so inside the sink we loop over the list, assemble the rows into a JDBC batch, and commit them with autocommit disabled, i.e. one MySQL transaction per batch. After the job has been running for a few hours we start hitting MySQL deadlocks. How do you handle multi-record writes on the sink side?

The sink's invoke code:

    @Override
    public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value,
                       Context context) throws Exception {
        connection.setAutoCommit(false);   // one transaction per batch
        List<BroadBandReq> f4 = value.f4;
        for (BroadBandReq rs : f4) {
            statement.setString(1, rs.getUserId());
            statement.setString(2, rs.getPhoneNum());
            statement.setString(3, rs.getProvId());
            statement.addBatch();
        }
        try {
            statement.executeBatch();
            connection.commit();
        } catch (Exception e) {
            LOG.info("add data for rds; operTag:{}, userId:{}, removeTag:{}, stateCode:{}, phoneList:{}",
                    value.f0, value.f1, value.f2, value.f3, f4);
            connection.rollback();
            e.printStackTrace();
            throw new Exception(e);
        }
    }

The exception after a few hours of uptime:

java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
    at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
    at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
    ... 33 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
    ... 36 more
*From the volunteer-curated Flink mailing list archive
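One commonly suggested mitigation is hinted at by the error text itself ("try restarting transaction"): retry the batch a bounded number of times, and order the rows deterministically so that concurrent transactions take InnoDB locks in the same order. The sketch below is illustrative only, layered on top of the snippet above; the helper name writeBatchWithRetry, the retry limit, and the sort keys are assumptions, not part of the original sink.

    import java.sql.BatchUpdateException;
    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLTransactionRollbackException;
    import java.util.Comparator;
    import java.util.List;

    // Illustrative helper meant to live inside the sink: writes one batch per
    // transaction, in a deterministic row order, and retries when MySQL reports
    // a deadlock. MAX_RETRIES and the sort keys are assumed, not from the post.
    void writeBatchWithRetry(Connection connection, PreparedStatement statement,
                             List<BroadBandReq> rows) throws Exception {
        final int MAX_RETRIES = 3;

        // Concurrent transactions that touch the same rows in different orders
        // are a classic source of InnoDB deadlocks, so fix the order first.
        rows.sort(Comparator.comparing(BroadBandReq::getUserId)
                            .thenComparing(BroadBandReq::getPhoneNum));

        for (int attempt = 1; ; attempt++) {
            try {
                connection.setAutoCommit(false);
                for (BroadBandReq rs : rows) {
                    statement.setString(1, rs.getUserId());
                    statement.setString(2, rs.getPhoneNum());
                    statement.setString(3, rs.getProvId());
                    statement.addBatch();
                }
                statement.executeBatch();
                connection.commit();
                return;
            } catch (BatchUpdateException e) {
                connection.rollback();
                statement.clearBatch();
                // The MySQL driver wraps the deadlock in a BatchUpdateException
                // whose cause is a SQLTransactionRollbackException (see the trace).
                boolean deadlock = e.getCause() instanceof SQLTransactionRollbackException;
                if (!deadlock || attempt >= MAX_RETRIES) {
                    throw e;   // give up and let Flink fail/restart the task
                }
            }
        }
    }

Because the whole batch runs in a single transaction that is rolled back before each retry, re-running it does not duplicate rows.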
We also handle this kind of thing with idempotent writes. Either 1. process records one at a time, or 2. make sure the individual transactions cannot conflict with each other, for example with keyBy as mentioned in the earlier reply.
*From the volunteer-curated Flink mailing list archive
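To make the second option concrete: the sink's own log line maps f1 to the userId, so keying the stream by that field before the sink sends every record of a given user to the same sink subtask, and parallel subtasks stop updating the same rows in overlapping transactions. A minimal sketch, assuming the processed stream comes from a hypothetical buildProcessedStream() helper (the thread does not show the upstream wiring):

    // Hedged sketch, not the original job wiring: route all records of one
    // userId to the same sink subtask so parallel transactions cannot contend
    // on the same rows. buildProcessedStream() is a hypothetical stand-in for
    // the real upstream pipeline.
    DataStream<Tuple5<String, String, String, String, List<BroadBandReq>>> processed =
            buildProcessedStream(env);

    processed
            .keyBy(value -> value.f1)          // f1 carries the userId per the sink's log line
            .addSink(new RdsOperaterSink());   // the sink from the question, constructed as in the real job

For the idempotence part, writing each row with INSERT ... ON DUPLICATE KEY UPDATE makes a replayed batch harmless after a failure or restart.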