
MySQL deadlocks appear after the job has run for a few hours. How do you handle writing multiple records per element in the sink?

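We currently use Flink to process Kafka data and sink the results to MySQL. After processing, one user ID maps to multiple records, all of which have to be inserted into MySQL. The sink therefore loops over the list, assembles the rows into a batch and submits it, and a MySQL transaction is used so that each batch is committed atomically. After the job has run for a few hours, MySQL deadlocks start to occur. How do others handle multi-record writes on the sink side?

The sink's invoke code:

    @Override
    public void invoke(Tuple5<String, String, String, String, List<BroadBandReq>> value, Context context) throws Exception {
        // One transaction per incoming element: all rows for this user commit together.
        connection.setAutoCommit(false);
        List<BroadBandReq> f4 = value.f4;
        for (BroadBandReq rs : f4) {
            statement.setString(1, rs.getUserId());
            statement.setString(2, rs.getPhoneNum());
            statement.setString(3, rs.getProvId());
            statement.addBatch();
        }
        try {
            statement.executeBatch();
            connection.commit();
        } catch (Exception e) {
            LOG.info(" add data for rds ; operTag:{}, userId:{},removeTag:{},stateCode:{}, phoneList:{}", value.f0, value.f1, value.f2, value.f3, f4);
            // Undo the partial batch, then fail the task.
            connection.rollback();
            e.printStackTrace();
            throw new Exception(e);
        }
    }

The exception: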

java.lang.Exception: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:73)
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:18)
    at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:627)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:718)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:736)
    at org.apache.flink.streaming.api.operators.ProcessOperator$ContextImpl.output(ProcessOperator.java:102)
    at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:162)
    at com.unicom.ogg.ExtractOggApplicationV2$6.processElement(ExtractOggApplicationV2.java:157)
    at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:641)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:40)
    at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:173)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.processElement(StreamTaskNetworkInput.java:151)
    at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:128)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:69)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:311)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:187)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:487)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:470)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:707)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:532)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.sql.BatchUpdateException: Deadlock found when trying to get lock; try restarting transaction
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createBatchUpdateException(SQLError.java:1163)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1772)
    at com.mysql.jdbc.PreparedStatement.executeBatchInternal(PreparedStatement.java:1262)
    at com.mysql.jdbc.StatementImpl.executeBatch(StatementImpl.java:970)
    at com.mycom.ogg.RdsOperaterSink.invoke(RdsOperaterSink.java:67)
    ... 33 more
Caused by: com.mysql.jdbc.exceptions.jdbc4.MySQLTransactionRollbackException: Deadlock found when trying to get lock; try restarting transaction
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at com.mysql.jdbc.Util.handleNewInstance(Util.java:425)
    at com.mysql.jdbc.Util.getInstance(Util.java:408)
    at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:952)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3976)
    at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3912)
    at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2530)
    at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2683)
    at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2486)
    at com.mysql.jdbc.PreparedStatement.executeInternal(PreparedStatement.java:1858)
    at com.mysql.jdbc.PreparedStatement.executeUpdateInternal(PreparedStatement.java:2079)
    at com.mysql.jdbc.PreparedStatement.executeBatchSerially(PreparedStatement.java:1756)
    ... 36 more
[flink-akka.actor.default-dispatcher-783]

*From the Flink mailing list archive, compiled by volunteers
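The error message itself says "try restarting transaction", so one option is to retry the batch when MySQL reports a deadlock. MySQL signals a deadlock with error code 1213 and SQLSTATE 40001. Below is a minimal sketch in plain JDBC terms; the names `DeadlockRetry`, `isDeadlock`, `runWithRetry`, `maxAttempts` and `backoffMillis` are hypothetical and not part of the original sink, and a retry only hides the symptom rather than removing the underlying lock conflict.

```java
import java.sql.SQLException;
import java.util.concurrent.Callable;

// Hypothetical helper, not part of the original sink.
final class DeadlockRetry {
    // MySQL reports a deadlock as error 1213 with SQLSTATE 40001.
    static final int MYSQL_DEADLOCK_ERROR_CODE = 1213;
    static final String SERIALIZATION_FAILURE_SQLSTATE = "40001";

    private DeadlockRetry() {}

    /** Walks the cause chain and returns true if any SQLException signals a deadlock. */
    static boolean isDeadlock(Throwable t) {
        for (Throwable cur = t; cur != null; cur = cur.getCause()) {
            if (cur instanceof SQLException) {
                SQLException se = (SQLException) cur;
                if (se.getErrorCode() == MYSQL_DEADLOCK_ERROR_CODE
                        || SERIALIZATION_FAILURE_SQLSTATE.equals(se.getSQLState())) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Runs the transaction, retrying only when it fails with a deadlock.
     * Any other failure, or a deadlock on the last attempt, is rethrown.
     */
    static <T> T runWithRetry(Callable<T> transaction, int maxAttempts, long backoffMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        for (int attempt = 1; ; attempt++) {
            try {
                return transaction.call();
            } catch (Exception e) {
                if (!isDeadlock(e) || attempt >= maxAttempts) {
                    throw e;
                }
                // Linear backoff so the competing transaction has time to finish.
                Thread.sleep(backoffMillis * attempt);
            }
        }
    }
}
```

In the sink, the callable passed to `runWithRetry` would need to contain the whole unit of work: filling the batch, `executeBatch()`, `commit()`, and a `rollback()` on failure. Refilling the batch on every attempt is the safer assumption, because what a driver leaves in the batch after a failed `executeBatch()` should not be relied on.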

玛丽莲梦嘉 2021-12-03 18:37:20 1534 0
1 answer
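  •       Could you share more context: what is the scenario, and what SQL does the job run against MySQL? It is worth first working out what is actually causing the MySQL deadlock. My understanding is that if you can keyBy the primary key upstream, then only one downstream operator instance handles a given primary key at any time. *From the Flink mailing list archive, compiled by volunteers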
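    The keyBy idea can be illustrated without Flink. The sketch below is a simplified stand-in for key-based partitioning: it hashes the user ID and takes it modulo the parallelism, so every record for one user lands on the same sink instance. Flink's real assignment goes through key groups and a different hash, so `KeyRoutingSketch`, `subtaskFor` and `route` are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified illustration only; this is not Flink's actual partitioner.
final class KeyRoutingSketch {
    private KeyRoutingSketch() {}

    /** Maps a user ID to a sink instance index; the same ID always gets the same index. */
    static int subtaskFor(String userId, int parallelism) {
        // floorMod keeps the result in [0, parallelism) even for negative hash codes.
        return Math.floorMod(userId.hashCode(), parallelism);
    }

    /** Groups user IDs by the sink instance that would handle them. */
    static Map<Integer, List<String>> route(List<String> userIds, int parallelism) {
        Map<Integer, List<String>> bySubtask = new TreeMap<>();
        for (String id : userIds) {
            bySubtask.computeIfAbsent(subtaskFor(id, parallelism), k -> new ArrayList<>()).add(id);
        }
        return bySubtask;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("u1", "u2", "u1", "u3", "u2", "u1");
        route(ids, 4).forEach((subtask, handled) ->
                System.out.println("subtask " + subtask + " -> " + handled));
    }
}
```

    In the job itself this would presumably mean keying the stream by user ID before the sink, for example `stream.keyBy(t -> t.f1).addSink(new RdsOperaterSink())`, assuming `f1` is the user ID as the sink's log statement suggests. Keying only serializes writes that share a key; whether that removes the deadlock depends on the SQL and indexes involved, which is why the cause should be confirmed first.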

    2021-12-03 18:59:46